Skip to content

Include transient and speculative WFT events in GetWorkflowExecutionHistoryResponse#9138

Open
spkane31 wants to merge 21 commits intomainfrom
spk/update-premature-end-stream
Open

Include transient and speculative WFT events in GetWorkflowExecutionHistoryResponse#9138
spkane31 wants to merge 21 commits intomainfrom
spk/update-premature-end-stream

Conversation

@spkane31
Copy link
Contributor

@spkane31 spkane31 commented Jan 26, 2026

What changed?

Include transient and speculative WFT events in GetWorkflowExecutionHistoryReponse response, unless UI or CLI made request.

  • Adds transient_or_speculative_events back to GetMutableStateResponse
  • Reserve transient_workflow_task in HisotryCOntinuation token
  • Add validation helpers
  • Add query-compare-query for transient events at request start and end

Re-implements #7732

Why?

Fix "premature end of stream" errors when workers request history after cache eviction w/ transient/speculative workflow tasks present. This adds transient & speculative WFT events in GetWorkflowExecutionHistory (already in PollWorkflowTask). Worker cache eviction w/ speculative workflow tasks causes the expected and actual event counts to be different. #7732 passed transient events through continuation tokens, which could become stale during pagination. This PR implements mutable state querying at both start and end of pagination and compares transient event IDs to detect if WFT state changed during pagination and return a retryable error.

How did you test it?

  • built
  • run locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

Potential risks

Same risks from #7732

@spkane31 spkane31 requested review from a team as code owners January 26, 2026 19:00
@spkane31 spkane31 requested a review from stephanos January 27, 2026 23:36
string inherited_build_id = 23;
repeated temporal.server.api.persistence.v1.VersionedTransition transition_history = 24;
temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 25;
// Transient or speculative workflow task events which are not yet persisted in the history.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: this was resurrected from PR #7732

bool is_workflow_running = 5;
bytes persistence_token = 6;
temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_workflow_task = 7;
reserved 7; // Was: transient_workflow_task - no longer passed through continuation token
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: resurrected from PR #7732

2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: search attribute INVALIDKEY is not defined"}}`, historyEvents)
4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: search attribute INVALIDKEY is not defined"}}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the test files except transient_workflow_task_history_test.go have this addition of a WorkflowTaskScheduled because of the shipped spec/trans events. I let Claude iterate on these tests.

@@ -0,0 +1,410 @@
package tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: I used CC to help generate these tests, especially with syntax for pollers, and validate the tests were testing the actual premature end of stream issue

4 WorkflowTaskCompleted
5 WorkflowTaskScheduled // Speculative WT2 which was created while completing WT1.
6 WorkflowTaskStarted`, task.History)
// Message handler rejects update.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI agents have the unfortunate habit to remove comments; I think we want to keep the comments in this file.

@@ -0,0 +1,410 @@
package tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor thing; but why don't we rename this to workflow_task_transient_history_test.go so it appears next to the other workflow task test file?

tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err))
// Don't append events, but don't fail request
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! So before we actually appended the events even if validation failed. (just found that curious)

Comment on lines +446 to +448
// clientSupportsTranOrSpecEvents detects if client supports transient events
// Default to include transient events for clients, only CLI and UI are
// explicitly excluded for backward compatability
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this was on the original PR by Alex already; I wonder what this would be breaking exactly. Maybe just the user experience as there might be "phantom" events and that's odd.

Anyway safe(r) to keep it this way for now.

logger := shardContext.GetLogger()
metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.OperationTag(metrics.HistoryGetRawHistoryScope))
metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1)
logger.Warn("Transient event validation failed, skipping events",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: should we put a softassert.Fail here (and for the other validation)? Sounds like this should never happen. Then again, it's logged as warning. Hm. Ignore this, just leaving this as a thought I had.

useRawHistory bool,
history *historypb.History,
historyBlob *[]*commonpb.DataBlob,
) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this never returns an error? Might as well remove the error return.

// Manually append transient events to the response
if useRawHistory {
transientEventsBlob, err := shardContext.GetPayloadSerializer().SerializeEvents(transientWorkflowTask.GetHistorySuffix())
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: this looks like sth that should never happen; could add a softassert here (requires logger param).

temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 25;
// Transient or speculative workflow task events which are not yet persisted in the history.
// These events should be appended to the history when it is returned to the worker.
temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_or_speculative_events = 26;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about the name transient_or_speculative_events? I was a bit surprised by this in the code

	transientWorkflowTask := msResp.GetTransientOrSpeculativeEvents()

One thing says "events" another says "task".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is revived from a previous PR, I can make a change to say transient_or_speculative_tasks.

clientName, _ := headers.GetClientNameAndVersion(ctx)

switch clientName {
case headers.ClientNameCLI, headers.ClientNameUI:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a unit or functional test for this? I can see this being changed/removed by accident.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding

history *historypb.History,
historyBlob *[]*commonpb.DataBlob,
) error {
msResp, err := api.GetOrPollWorkflowMutableState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 2nd GetOrPollWorkflowMutableState call is problematic. It adds latency since it has to go through the workflow lock again - but more importantly, I think there's a possible race here where the first call found a running workflow, but then it closes, and then we add these regardless.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants