22import socket
33import uuid
44from datetime import timedelta
5- from typing import TypedDict , Unpack , Any , cast , Union , Callable
5+ from typing import TypedDict , Unpack , Any , cast , Union
66
77from grpc import ChannelCredentials , Compression
88from google .protobuf .duration_pb2 import Duration
1717from cadence .api .v1 .service_workflow_pb2 import (
1818 StartWorkflowExecutionRequest ,
1919 StartWorkflowExecutionResponse ,
20+ SignalWithStartWorkflowExecutionRequest ,
21+ SignalWithStartWorkflowExecutionResponse ,
2022)
2123from cadence .api .v1 .common_pb2 import WorkflowType , WorkflowExecution
2224from cadence .api .v1 .tasklist_pb2 import TaskList
2325from cadence .data_converter import DataConverter , DefaultDataConverter
2426from cadence .metrics import MetricsEmitter , NoOpMetricsEmitter
27+ from cadence .workflow import WorkflowDefinition
2528
2629
2730class StartWorkflowOptions (TypedDict , total = False ):
@@ -132,7 +135,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
132135
133136 def _build_start_workflow_request (
134137 self ,
135- workflow : Union [str , Callable ],
138+ workflow : Union [str , WorkflowDefinition ],
136139 args : tuple [Any , ...],
137140 options : StartWorkflowOptions ,
138141 ) -> StartWorkflowExecutionRequest :
@@ -144,8 +147,8 @@ def _build_start_workflow_request(
144147 if isinstance (workflow , str ):
145148 workflow_type_name = workflow
146149 else :
147- # For callable , use function name or __name__ attribute
148- workflow_type_name = getattr ( workflow , "__name__" , str ( workflow ))
150+ # For WorkflowDefinition , use the name property
151+ workflow_type_name = workflow . name
149152
150153 # Encode input arguments
151154 input_payload = None
@@ -186,15 +189,15 @@ def _build_start_workflow_request(
186189
187190 async def start_workflow (
188191 self ,
189- workflow : Union [str , Callable ],
192+ workflow : Union [str , WorkflowDefinition ],
190193 * args ,
191194 ** options_kwargs : Unpack [StartWorkflowOptions ],
192195 ) -> WorkflowExecution :
193196 """
194197 Start a workflow execution asynchronously.
195198
196199 Args:
197- workflow: Workflow function or workflow type name string
200+ workflow: WorkflowDefinition or workflow type name string
198201 *args: Arguments to pass to the workflow
199202 **options_kwargs: StartWorkflowOptions as keyword arguments
200203
@@ -229,6 +232,69 @@ async def start_workflow(
229232 except Exception :
230233 raise
231234
235+ async def signal_with_start_workflow (
236+ self ,
237+ workflow : Union [str , WorkflowDefinition ],
238+ signal_name : str ,
239+ signal_args : list [Any ],
240+ * workflow_args : Any ,
241+ ** options_kwargs : Unpack [StartWorkflowOptions ],
242+ ) -> WorkflowExecution :
243+ """
244+ Signal a workflow execution, starting it if it is not already running.
245+
246+ Args:
247+ workflow: WorkflowDefinition or workflow type name string
248+ signal_name: Name of the signal
249+ signal_args: List of arguments to pass to the signal handler
250+ *workflow_args: Arguments to pass to the workflow if it needs to be started
251+ **options_kwargs: StartWorkflowOptions as keyword arguments
252+
253+ Returns:
254+ WorkflowExecution with workflow_id and run_id
255+
256+ Raises:
257+ ValueError: If required parameters are missing or invalid
258+ Exception: If the gRPC call fails
259+ """
260+ # Convert kwargs to StartWorkflowOptions and validate
261+ options = _validate_and_apply_defaults (StartWorkflowOptions (** options_kwargs ))
262+
263+ # Build the start workflow request
264+ start_request = self ._build_start_workflow_request (
265+ workflow , workflow_args , options
266+ )
267+
268+ # Encode signal input
269+ signal_payload = None
270+ if signal_args :
271+ try :
272+ signal_payload = self .data_converter .to_data (signal_args )
273+ except Exception as e :
274+ raise ValueError (f"Failed to encode signal input: { e } " )
275+
276+ # Build the SignalWithStartWorkflowExecution request
277+ request = SignalWithStartWorkflowExecutionRequest (
278+ start_request = start_request ,
279+ signal_name = signal_name ,
280+ )
281+
282+ if signal_payload :
283+ request .signal_input .CopyFrom (signal_payload )
284+
285+ # Execute the gRPC call
286+ try :
287+ response : SignalWithStartWorkflowExecutionResponse = (
288+ await self .workflow_stub .SignalWithStartWorkflowExecution (request )
289+ )
290+
291+ execution = WorkflowExecution ()
292+ execution .workflow_id = start_request .workflow_id
293+ execution .run_id = response .run_id
294+ return execution
295+ except Exception :
296+ raise
297+
232298
233299def _validate_and_copy_defaults (options : ClientOptions ) -> ClientOptions :
234300 if "target" not in options :
0 commit comments