Include transient and speculative WFT events in GetWorkflowExecutionHistoryResponse#9138
Include transient and speculative WFT events in GetWorkflowExecutionHistoryResponse#9138
Conversation
…-premature-end-stream
…-premature-end-stream
| string inherited_build_id = 23; | ||
| repeated temporal.server.api.persistence.v1.VersionedTransition transition_history = 24; | ||
| temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 25; | ||
| // Transient or speculative workflow task events which are not yet persisted in the history. |
There was a problem hiding this comment.
For reviewer: this was resurrected from PR #7732
| bool is_workflow_running = 5; | ||
| bytes persistence_token = 6; | ||
| temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_workflow_task = 7; | ||
| reserved 7; // Was: transient_workflow_task - no longer passed through continuation token |
| 2 WorkflowTaskScheduled | ||
| 3 WorkflowTaskStarted | ||
| 4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: search attribute INVALIDKEY is not defined"}}`, historyEvents) | ||
| 4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: search attribute INVALIDKEY is not defined"}} |
There was a problem hiding this comment.
All of the test files except transient_workflow_task_history_test.go have this addition of a WorkflowTaskScheduled because of the shipped spec/trans events. I let Claude iterate on these tests.
| @@ -0,0 +1,410 @@ | |||
| package tests | |||
There was a problem hiding this comment.
For reviewer: I used CC to help generate these tests, especially with syntax for pollers, and validate the tests were testing the actual premature end of stream issue
| 4 WorkflowTaskCompleted | ||
| 5 WorkflowTaskScheduled // Speculative WT2 which was created while completing WT1. | ||
| 6 WorkflowTaskStarted`, task.History) | ||
| // Message handler rejects update. |
There was a problem hiding this comment.
AI agents have the unfortunate habit to remove comments; I think we want to keep the comments in this file.
| @@ -0,0 +1,410 @@ | |||
| package tests | |||
There was a problem hiding this comment.
minor thing; but why don't we rename this to workflow_task_transient_history_test.go so it appears next to the other workflow task test file?
| tag.WorkflowRunID(execution.GetRunId()), | ||
| tag.Error(err)) | ||
| // Don't append events, but don't fail request | ||
| } else { |
There was a problem hiding this comment.
Interesting! So before we actually appended the events even if validation failed. (just found that curious)
| // clientSupportsTranOrSpecEvents detects if client supports transient events | ||
| // Default to include transient events for clients, only CLI and UI are | ||
| // explicitly excluded for backward compatability |
There was a problem hiding this comment.
I suppose this was on the original PR by Alex already; I wonder what this would be breaking exactly. Maybe just the user experience as there might be "phantom" events and that's odd.
Anyway safe(r) to keep it this way for now.
| logger := shardContext.GetLogger() | ||
| metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.OperationTag(metrics.HistoryGetRawHistoryScope)) | ||
| metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1) | ||
| logger.Warn("Transient event validation failed, skipping events", |
There was a problem hiding this comment.
optional: should we put a softassert.Fail here (and for the other validation)? Sounds like this should never happen. Then again, it's logged as warning. Hm. Ignore this, just leaving this as a thought I had.
| useRawHistory bool, | ||
| history *historypb.History, | ||
| historyBlob *[]*commonpb.DataBlob, | ||
| ) error { |
There was a problem hiding this comment.
It looks like this never returns an error? Might as well remove the error return.
| // Manually append transient events to the response | ||
| if useRawHistory { | ||
| transientEventsBlob, err := shardContext.GetPayloadSerializer().SerializeEvents(transientWorkflowTask.GetHistorySuffix()) | ||
| if err == nil { |
There was a problem hiding this comment.
optional: this looks like sth that should never happen; could add a softassert here (requires logger param).
| temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 25; | ||
| // Transient or speculative workflow task events which are not yet persisted in the history. | ||
| // These events should be appended to the history when it is returned to the worker. | ||
| temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_or_speculative_events = 26; |
There was a problem hiding this comment.
What do you think about the name transient_or_speculative_events? I was a bit surprised by this in the code
transientWorkflowTask := msResp.GetTransientOrSpeculativeEvents()
One thing says "events" another says "task".
There was a problem hiding this comment.
The name is revived from a previous PR, I can make a change to say transient_or_speculative_tasks.
| clientName, _ := headers.GetClientNameAndVersion(ctx) | ||
|
|
||
| switch clientName { | ||
| case headers.ClientNameCLI, headers.ClientNameUI: |
There was a problem hiding this comment.
Is there a unit or functional test for this? I can see this being changed/removed by accident.
| history *historypb.History, | ||
| historyBlob *[]*commonpb.DataBlob, | ||
| ) error { | ||
| msResp, err := api.GetOrPollWorkflowMutableState( |
There was a problem hiding this comment.
I think the 2nd GetOrPollWorkflowMutableState call is problematic. It adds latency since it has to go through the workflow lock again - but more importantly, I think there's a possible race here where the first call found a running workflow, but then it closes, and then we add these regardless.
What changed?
Include transient and speculative WFT events in
GetWorkflowExecutionHistoryReponseresponse, unless UI or CLI made request.transient_or_speculative_eventsback toGetMutableStateResponsetransient_workflow_taskinHisotryCOntinuationtokenRe-implements #7732
Why?
Fix "premature end of stream" errors when workers request history after cache eviction w/ transient/speculative workflow tasks present. This adds transient & speculative WFT events in
GetWorkflowExecutionHistory(already inPollWorkflowTask). Worker cache eviction w/ speculative workflow tasks causes the expected and actual event counts to be different. #7732 passed transient events through continuation tokens, which could become stale during pagination. This PR implements mutable state querying at both start and end of pagination and compares transient event IDs to detect if WFT state changed during pagination and return a retryable error.How did you test it?
Potential risks
Same risks from #7732