Skip to content

Commit accf9b3

Browse files
meiliang86yux0
authored andcommitted
Fix workflow and activity registration race (#980)
1 parent 4e06a49 commit accf9b3

File tree

7 files changed

+42
-48
lines changed

7 files changed

+42
-48
lines changed

internal/internal_activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,8 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat
395395
case reflect.String:
396396
// If we know about this function through registration then we will try to return corresponding result type.
397397
fnName := reflect.ValueOf(f).String()
398-
if fnRegistered, ok := registry.getActivityFn(fnName); ok {
399-
return deSerializeFnResultFromFnType(reflect.TypeOf(fnRegistered), result, to, dataConverter)
398+
if activity, ok := registry.GetActivity(fnName); ok {
399+
return deSerializeFnResultFromFnType(reflect.TypeOf(activity.GetFunction()), result, to, dataConverter)
400400
}
401401
}
402402

internal/internal_task_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1833,7 +1833,7 @@ func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
18331833
return ath.activityProvider(name)
18341834
}
18351835

1836-
if a, ok := ath.registry.getActivity(name); ok {
1836+
if a, ok := ath.registry.GetActivity(name); ok {
18371837
return a
18381838
}
18391839

internal/internal_task_handlers_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,7 +1270,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
12701270
}
12711271
a := &testActivityDeadline{logger: t.logger}
12721272
registry := getGlobalRegistry()
1273-
registry.addActivity(a.ActivityType().Name, a)
1273+
registry.addActivityWithLock(a.ActivityType().Name, a)
12741274

12751275
mockCtrl := gomock.NewController(t.T())
12761276
mockService := workflowservicetest.NewMockClient(mockCtrl)
@@ -1324,7 +1324,10 @@ func activityWithWorkerStop(ctx context.Context) error {
13241324
func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() {
13251325
a := &testActivityDeadline{logger: t.logger}
13261326
registry := getGlobalRegistry()
1327-
registry.addActivityFn(a.ActivityType().Name, activityWithWorkerStop)
1327+
registry.RegisterActivityWithOptions(
1328+
activityWithWorkerStop,
1329+
RegisterActivityOptions{Name: a.ActivityType().Name, DisableAlreadyRegisteredCheck: true},
1330+
)
13281331

13291332
mockCtrl := gomock.NewController(t.T())
13301333
mockService := workflowservicetest.NewMockClient(mockCtrl)

internal/internal_worker.go

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -539,28 +539,32 @@ func (r *registry) RegisterWorkflow(af interface{}) {
539539
}
540540

541541
func (r *registry) RegisterWorkflowWithOptions(
542-
af interface{},
542+
wf interface{},
543543
options RegisterWorkflowOptions,
544544
) {
545545
// Validate that it is a function
546-
fnType := reflect.TypeOf(af)
546+
fnType := reflect.TypeOf(wf)
547547
if err := validateFnFormat(fnType, true); err != nil {
548548
panic(err)
549549
}
550-
fnName := getFunctionName(af)
550+
fnName := getFunctionName(wf)
551551
alias := options.Name
552552
registerName := fnName
553553
if len(alias) > 0 {
554554
registerName = alias
555555
}
556+
557+
r.Lock()
558+
defer r.Unlock()
559+
556560
if !options.DisableAlreadyRegisteredCheck {
557561
if _, ok := r.workflowFuncMap[registerName]; ok {
558562
panic(fmt.Sprintf("workflow name \"%v\" is already registered", registerName))
559563
}
560564
}
561-
r.addWorkflowFn(registerName, af)
565+
r.workflowFuncMap[registerName] = wf
562566
if len(alias) > 0 {
563-
r.addWorkflowAlias(fnName, alias)
567+
r.workflowAliasMap[fnName] = alias
564568
}
565569
}
566570

@@ -594,20 +598,27 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
594598
if len(alias) > 0 {
595599
registerName = alias
596600
}
601+
602+
r.Lock()
603+
defer r.Unlock()
604+
597605
if !options.DisableAlreadyRegisteredCheck {
598606
if _, ok := r.activityFuncMap[registerName]; ok {
599607
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
600608
}
601609
}
602-
r.addActivityFn(registerName, af)
610+
r.activityFuncMap[registerName] = &activityExecutor{registerName, af}
603611
if len(alias) > 0 {
604-
r.addActivityAlias(fnName, alias)
612+
r.activityAliasMap[fnName] = alias
605613
}
606614

607615
return nil
608616
}
609617

610618
func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterActivityOptions) error {
619+
r.Lock()
620+
defer r.Unlock()
621+
611622
structValue := reflect.ValueOf(aStruct)
612623
structType := structValue.Type()
613624
count := 0
@@ -629,11 +640,11 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
629640
}
630641
registerName = prefix + name
631642
if !options.DisableAlreadyRegisteredCheck {
632-
if _, ok := r.getActivityFn(registerName); ok {
643+
if _, ok := r.getActivityNoLock(registerName); ok {
633644
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
634645
}
635646
}
636-
r.addActivityFn(registerName, methodValue.Interface())
647+
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface()}
637648
count++
638649
}
639650

@@ -644,12 +655,6 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
644655
return nil
645656
}
646657

647-
func (r *registry) addWorkflowAlias(fnName string, alias string) {
648-
r.Lock()
649-
defer r.Unlock()
650-
r.workflowAliasMap[fnName] = alias
651-
}
652-
653658
func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
654659
r.Lock() // do not defer for Unlock to call next.getWorkflowAlias without lock
655660
alias, ok := r.workflowAliasMap[fnName]
@@ -661,12 +666,6 @@ func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
661666
return alias, ok
662667
}
663668

664-
func (r *registry) addWorkflowFn(fnName string, wf interface{}) {
665-
r.Lock()
666-
defer r.Unlock()
667-
r.workflowFuncMap[fnName] = wf
668-
}
669-
670669
func (r *registry) getWorkflowFn(fnName string) (interface{}, bool) {
671670
r.Lock() // do not defer for Unlock to call next.getWorkflowFn without lock
672671
fn, ok := r.workflowFuncMap[fnName]
@@ -692,12 +691,6 @@ func (r *registry) getRegisteredWorkflowTypes() []string {
692691
return result
693692
}
694693

695-
func (r *registry) addActivityAlias(fnName string, alias string) {
696-
r.Lock()
697-
defer r.Unlock()
698-
r.activityAliasMap[fnName] = alias
699-
}
700-
701694
func (r *registry) getActivityAlias(fnName string) (string, bool) {
702695
r.Lock() // do not defer for Unlock to call next.getActivityAlias without lock
703696
alias, ok := r.activityAliasMap[fnName]
@@ -709,32 +702,30 @@ func (r *registry) getActivityAlias(fnName string) (string, bool) {
709702
return alias, ok
710703
}
711704

712-
func (r *registry) addActivity(fnName string, a activity) {
705+
// Use in unit test only, otherwise deadlock will occur.
706+
func (r *registry) addActivityWithLock(fnName string, a activity) {
713707
r.Lock()
714708
defer r.Unlock()
715709
r.activityFuncMap[fnName] = a
716710
}
717711

718-
func (r *registry) addActivityFn(fnName string, af interface{}) {
719-
r.addActivity(fnName, &activityExecutor{fnName, af})
720-
}
721-
722-
func (r *registry) getActivity(fnName string) (activity, bool) {
723-
r.Lock() // do not defer for Unlock to call next.getActivity without lock
712+
func (r *registry) GetActivity(fnName string) (activity, bool) {
713+
r.Lock() // do not defer for Unlock to call next.GetActivity without lock
724714
a, ok := r.activityFuncMap[fnName]
725715
if !ok && r.next != nil {
726716
r.Unlock()
727-
return r.next.getActivity(fnName)
717+
return r.next.GetActivity(fnName)
728718
}
729719
r.Unlock()
730720
return a, ok
731721
}
732722

733-
func (r *registry) getActivityFn(fnName string) (interface{}, bool) {
734-
if a, ok := r.getActivity(fnName); ok {
735-
return a.GetFunction(), ok
723+
func (r *registry) getActivityNoLock(fnName string) (activity, bool) {
724+
a, ok := r.activityFuncMap[fnName]
725+
if !ok && r.next != nil {
726+
return r.next.getActivityNoLock(fnName)
736727
}
737-
return nil, false
728+
return a, ok
738729
}
739730

740731
func (r *registry) getRegisteredActivities() []activity {

internal/internal_workers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (s *WorkersTestSuite) TestActivityWorker() {
127127
overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()}
128128
a := &greeterActivity{}
129129
registry := getGlobalRegistry()
130-
registry.addActivity(a.ActivityType().Name, a)
130+
registry.addActivityWithLock(a.ActivityType().Name, a)
131131
activityWorker := newActivityWorker(
132132
s.service, domain, executionParameters, overrides, registry, nil,
133133
)
@@ -176,7 +176,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
176176
overrides := &workerOverrides{activityTaskHandler: activityTaskHandler}
177177
a := &greeterActivity{}
178178
registry := getGlobalRegistry()
179-
registry.addActivity(a.ActivityType().Name, a)
179+
registry.addActivityWithLock(a.ActivityType().Name, a)
180180
worker := newActivityWorker(
181181
s.service, domain, executionParameters, overrides, registry, nil,
182182
)

internal/internal_workflow_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,7 @@ func getGetWorkflowExecutionHistoryRequest(filterType shared.HistoryEventFilterT
934934
},
935935
WaitForNewEvent: common.BoolPtr(isLongPoll),
936936
HistoryEventFilterType: &filterType,
937-
SkipArchival: common.BoolPtr(true),
937+
SkipArchival: common.BoolPtr(true),
938938
}
939939

940940
return request

internal/internal_workflow_testsuite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1517,7 +1517,7 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri
15171517
}
15181518
}
15191519

1520-
activity, ok := registry.getActivity(name)
1520+
activity, ok := registry.GetActivity(name)
15211521
if !ok {
15221522
return nil
15231523
}

0 commit comments

Comments
 (0)