Conversation
5a868c0 to
9f1d783
Compare
cretz
left a comment
There was a problem hiding this comment.
Mostly minor things, overall LGTM
temporalio/client.py
Outdated
|
|
||
| # Overload for no-param update | ||
| @overload | ||
| async def execute_update_with_start( |
There was a problem hiding this comment.
I think this needs to be execute_update_with_start_workflow to differentiate from all of the non-workflow stuff on the client (same for start_update_with_start_workflow)
There was a problem hiding this comment.
Thanks, I hadn't appreciated the distinction from Java/TS's "workflow clients". Done.
temporalio/client.py
Outdated
| update: Update function or name on the workflow. arg: Single argument to the | ||
| update. args: Multiple arguments to the update. Cannot be set if arg is. | ||
| start_workflow_operation: a WithStartWorkflowOperation definining the | ||
| WorkflowIDConflictPolicy and how to start the workflow in the event that a | ||
| workflow is started. | ||
| id: ID of the update. If not set, the default is a new UUID. result_type: For |
There was a problem hiding this comment.
Some newlines aren't showing here to separate the args
|
|
||
| # Overload for no-param workflow, with_start | ||
| @overload | ||
| def __init__( |
There was a problem hiding this comment.
This may need to new user metadata stuff that was added in #701
| ) | ||
| self._workflow_handle: Future[WorkflowHandle[SelfType, ReturnType]] = Future() | ||
|
|
||
| async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]: |
There was a problem hiding this comment.
No strong opinion here, but would be ok if this was just called handle since it's in the start-workflow class
There was a problem hiding this comment.
I think I'll leave it as workflow_handle. It's an "operation" which is not really a standard SDK concept, so the clarity probably helps IMO, plus handle could be mistaken for a verb (a lot of non-English-first-language-speakers find the handle-verb, handle-noun, handler-noun terms confusing in software.)
There was a problem hiding this comment.
Ok. May be worth noting Go, Java, and .NET WithStartWorkflowOperation classes to not qualify their methods with workflow either which makes sense.
There was a problem hiding this comment.
As decided in group discussion, we're going with await start_op.workflow_handle()
temporalio/client.py
Outdated
| """Called for every :py:meth:`WorkflowHandle.update` and :py:meth:`WorkflowHandle.start_update` call.""" | ||
| return await self.next.start_workflow_update(input) | ||
|
|
||
| async def start_workflow_update_with_start( |
There was a problem hiding this comment.
I think this would be more clearly named start_update_with_start_workflow (and changing input class name too). IMO it makes sense to have the method name match the client call (which should also be that IMO).
There was a problem hiding this comment.
I agree. But signal/query/update/terminate do not do this; they have query_workflow, signal_workflow, start_workflow_update, terminate_workflow, etc.
There was a problem hiding this comment.
What do you think? It seems unfortunately that the most consistent name is start_workflow_update_with_start_workflow, in order to match start_workflow_update
There was a problem hiding this comment.
OK discussed offline; it is now named start_update_with_start_workflow to match the client call. (The idea behind start_workflow_update is that the workflow handle can name it start_update but in other contexts we need to be more explicit about what "update" is).
| start_req = ( | ||
| await self._build_update_with_start_start_workflow_execution_request( | ||
| start_input | ||
| ) | ||
| ) | ||
| update_req = await self._build_update_workflow_execution_request( | ||
| update_input, workflow_id=start_input.id | ||
| ) |
There was a problem hiding this comment.
I am concerned that exceptions that happen here (e.g. serializing workflow/update args) will leave someone waiting on the workflow handle hanging. Same for things like cancel.
Is it possible to make sure that no matter how this method exits, the start operation handle awaiter is updated? I didn't check if other langs did this, but I think it makes sense.
There was a problem hiding this comment.
Yes absolutely, good call. Done. Both the exception handling and the inner logic are involved so the inner logic is in a separate function, with a finally case in the outer function ensuring that the promise is rejected in all cases.
temporalio/client.py
Outdated
| ), | ||
| None, | ||
| ) | ||
| if status and status.code in RPCStatusCode: |
There was a problem hiding this comment.
Should add special handling for WorkflowAlreadyStartedError here I think
There was a problem hiding this comment.
Thank you, that's done now, and in general the error handling and poll loop has been rewritten.
temporalio/client.py
Outdated
| temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse | ||
| ] = [ | ||
| r.start_workflow | ||
| for r in multiop_response.responses |
There was a problem hiding this comment.
Doesn't really matter, but I think we should be able to assume the indexes of responses match 1:1 with the request objects. So may be able to just do multiop_response.responses[0].start_workflow and not have to loop.
There was a problem hiding this comment.
Yeah I decided that you're right and I didn't need to program so defensively here. So more minimal/cleaner now.
temporalio/client.py
Outdated
| # Build the handle. If the user's wait stage is COMPLETED, make sure we | ||
| # poll for result. | ||
| handle: WorkflowUpdateHandle[Any] = WorkflowUpdateHandle( | ||
| return WorkflowUpdateHandle( |
There was a problem hiding this comment.
When we do execute_update (or the user set wait stage to completed), we don't return the handle until we have polled for outcome
There was a problem hiding this comment.
You're right, there's a test that I'd intended to be confirming that the result is fetched in that context, but the test was misconceived. It's not the easiest thing to test for, but I've put the poll call in. Maybe we can add a test later based on manipulating the history long poll timeout.
break
handle = WorkflowUpdateHandle(
client=self._client,
id=update_req.request.meta.update_id,
workflow_id=start_input.id,
workflow_run_id=start_response.run_id,
known_outcome=known_outcome,
)
if update_input.wait_for_stage == WorkflowUpdateStage.COMPLETED:
await handle._poll_until_outcome()
return handle0b37fd2 to
3c17209
Compare
temporalio/client.py
Outdated
|
|
||
| # TODO (dan): | ||
| # temporalio/client.py:926: error: Overloaded function implementation does not accept all possible arguments of signature 1 [misc] | ||
| async def start_update_with_start_workflow( # type: ignore |
There was a problem hiding this comment.
I think here and other user facing entry points (e.g. execute equivalent and the class doc for WithStartWorkflowOperation) should have the "experimental" warning that looks similar to what we're removing on #707.
There was a problem hiding this comment.
Added experimental warnings
temporalio/client.py
Outdated
| id=req.request.meta.update_id, | ||
| workflow_id=input.id, | ||
| workflow_id=workflow_id, | ||
| # TODO: Why don't we use the run ID from the update response here? |
There was a problem hiding this comment.
This is a bug we believe (and it exists in .NET too)
There was a problem hiding this comment.
Deleted comment from this PR
| if ( | ||
| st.details | ||
| and not st.details[0].Is( | ||
| temporalio.api.failure.v1.MultiOperationExecutionAborted.DESCRIPTOR | ||
| ) | ||
| ) |
There was a problem hiding this comment.
What did we end up deciding here about what a successful start but failed update looks like? I think server side today st.details is never None correct? Should we ignore OK statuses? This is super rare and so non-blocking for this PR, but whatever the decision we may need to apply to other SDKs too.
There was a problem hiding this comment.
I'm skipping OK statuses in Python and TS.
temporalio/client.py
Outdated
| st | ||
| for st in multiop_failure.statuses | ||
| if ( | ||
| st.details |
There was a problem hiding this comment.
I think this logic is a bit off. I don't think we should require details to be considered an error
faf4140 to
10f3026
Compare
Add an update-with-start API, using the MultiOperation gRPC API.
The test suite is not complete yet, but please feel free to review.
In addition to the tests, an example of using the new API is temporalio/samples-python#156:
From the docstring: