Messages - Python SDK feature guide
Signals
How to develop with Signals using the Python SDK.
A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. It can only deliver data to a Workflow Execution that has not already closed.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
There are two steps for adding support for a Signal to your Workflow code:
- Defining the Signal - You specify the name and data structure used by Temporal Clients when sending the Signal.
- Handling the Signal - You write code that will be invoked when the Signal is received from a Temporal Client.
After defining and handling your Signal, you can send it from a Temporal Client or from another Workflow Execution.
Define Signal
How to define a Signal using the Python SDK.
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.
Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.
Return values from Signal methods are ignored.
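The single-argument suggestion above can be illustrated without any SDK calls. The sketch below assumes a hypothetical SubmitGreetingInput data class (it is not part of the original sample) and uses plain dicts to stand in for deserialized Signal payloads. It only shows why a data class argument can gain fields without breaking existing senders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SubmitGreetingInput:
    """Hypothetical single Signal argument; the name is illustrative."""

    name: str
    # Field added in a later version. The default keeps payloads from
    # older senders, which omit this field, valid.
    language: Optional[str] = None


# A payload produced by a sender that predates the `language` field.
old_payload = {"name": "User 1"}
# A payload produced by an updated sender.
new_payload = {"name": "User 2", "language": "fr"}

old_input = SubmitGreetingInput(**old_payload)
new_input = SubmitGreetingInput(**new_payload)
```

A Signal method would then take this data class as its one positional argument, for example submit_greeting(self, input: SubmitGreetingInput).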
How to customize names
Pass the name parameter to the decorator to customize the Signal's name; otherwise it defaults to the name of the Signal method.
from temporalio import workflow
# ...
    @workflow.signal
    async def submit_greeting(self, name: str) -> None:
        await self._pending_greetings.put(name)

    @workflow.signal
    def exit(self) -> None:
        # ...
        ...  # body elided in the source

    @workflow.signal(name="Custom Signal Name")
    async def custom_signal(self, name: str) -> None:
        await self._pending_greetings.put(name)
Handle Signal
How to handle a Signal using the Python SDK.
Workflows listen for Signals by the Signal's name.
Signal handlers are functions defined in the Workflow that listen for incoming Signals of a given type. These handlers define how a Workflow should react when it receives a specific type of Signal.
To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.
from temporalio.client import Client
# ...
# ...
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
Send a Signal from a Temporal Client
How to send a Signal from a Temporal Client using the Python SDK.
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
To send a Signal from the Client, use the signal() function on the Workflow handle.
To get the Workflow handle, you can use any of the following options.
- Use the get_workflow_handle() method.
- Use the get_workflow_handle_for() method to get a type-safe Workflow handle by its Workflow Id.
- Use the start_workflow() method to start a Workflow and return its handle.
from temporalio.client import Client
# ...
# ...
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
    GreetingWorkflow.run,
    id="your-greeting-workflow",
    task_queue="signal-tq",
)
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
Send a Signal from a Workflow
How to send a Signal from a Workflow using the Python SDK.
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier. Use get_external_workflow_handle when you don't know the type of the other Workflow.
The Workflow Type passed is only for type annotations and not for validation.
# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")
Signal-With-Start
How to send a Signal-With-Start using the Python SDK.
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.
from temporalio.client import Client
# ...
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
How to define a Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.
Customize names
Pass the name parameter to the decorator to customize the Query's name; otherwise it defaults to the name of the Query method.
You can set either the name or the dynamic parameter in a Query's decorator, but not both.
# ...
    @workflow.query
    def greeting(self) -> str:
        return self._greeting
Handle a Query
How to handle a Query
Queries are handled by your Workflow.
Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.
To send a Query to the Workflow, use the query method from the WorkflowHandle class.
# ...
result = await handle.query(GreetingWorkflow.greeting)
Send a Query
How to send a Query
Queries are sent from a Temporal Client.
To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.
# ...
result = await handle.query(GreetingWorkflow.greeting)
Developing with Updates
An Update is a trackable request sent synchronously to a running Workflow Execution. It can change the state and control the flow of the Workflow Execution, and it can return a result. The sender must wait until the Update is at least accepted or rejected by a Worker, and will often wait further to receive the value returned by the Update handler, or an exception indicating what went wrong. Like Signal handlers and the main Workflow method, Update handlers can perform arbitrarily long-running async operations.
Writing Update handlers and validators as a Workflow author
Here's a Workflow Definition that illustrates how to create an Update handler, and an associated validator:
from dataclasses import dataclass
from typing import Optional

from temporalio import workflow


@dataclass
class HelloWorldInput:
    entity_to_be_greeted: str


@workflow.defn
class HelloWorldWorkflow:
    def __init__(self):
        self.entity_to_be_greeted: Optional[str] = None

    @workflow.run
    async def run(self) -> str:
        # Block until an Update has set the entity to greet.
        await workflow.wait_condition(lambda: self.entity_to_be_greeted is not None)
        return self.greeting()

    @workflow.update
    def set_greeting(self, input: HelloWorldInput) -> str:
        self.entity_to_be_greeted = input.entity_to_be_greeted
        return self.greeting()

    @set_greeting.validator
    def set_greeting_validator(self, input: HelloWorldInput) -> None:
        # Raising here rejects the Update before the handler runs.
        if input.entity_to_be_greeted not in {"world", "World"}:
            raise Exception(f"invalid entity: {input.entity_to_be_greeted}")

    def greeting(self) -> str:
        return f"Hello, {self.entity_to_be_greeted}!"
Note the following:
- Update handlers and validators are defined using decorators, in a similar way to Signal and Query handlers.
- This handler method does not do any long-running async operations; if it did, it would need to be an async def.
  - Examples of async operations that can be done in an update handler include asyncio.sleep(...), workflow.wait_condition(...), and execution of Activities and Child Workflows.
- The handler method signature defines the argument type and return type that a client will use when sending an Update.
  - It is possible to use multiple arguments, but this is not recommended: instead use a single dataclass argument in which fields can be added/removed as needed.
- Validators are optional; if you don't want to be able to reject updates then you don't need a validator.
  - To reject an Update, you raise an exception (of any type) in the validator.
  - The name of the decorator you use to define the validator is based on the name that you give to the handler.
  - The validator must take the same argument type as the handler, but always returns None.
- The update decorator accepts arguments.
Sending an Update to a Workflow Execution
Recall that when sending an Update, the client will not receive a response until a Worker is available and the Update has been delivered to the Worker. If you want the server to send a response as soon as it receives your request, then you must use a Signal instead.
To send an Update to a Workflow Execution, you have two choices:
1. Use execute_update to wait for the update to complete
execute_update sends an Update and waits until it has been completed by a Worker. It returns the Update result:
# Wait until the update is completed
update_result = await workflow_handle.execute_update(
    HelloWorldWorkflow.set_greeting,
    HelloWorldInput("World"),
)
2. Use start_update to receive a handle as soon as the update is accepted or rejected
start_update sends an Update and waits until the Update has been accepted or rejected by a Worker. It returns an UpdateHandle:
# Wait until the update is accepted
update_handle = await workflow_handle.start_update(
    HelloWorldWorkflow.set_greeting,
    HelloWorldInput("World"),
)
# Wait until the update is completed
update_result = await update_handle.result()
Exceptions
The following exceptions might be raised by execute_update, or when calling update_handle.result() on a handle obtained from start_update:
- temporalio.client.WorkflowUpdateFailedError
  The Update was either rejected by the validator, or the Workflow author deliberately failed the Update by raising ApplicationError in the handler.
- temporalio.service.RPCError "workflow execution already completed"
  This will happen in any of the following situations:
  - The WorkflowHandle that was used to send the Update referenced a non-existent Workflow.
  - The Workflow finished while the Update handler execution was in progress, for example because
    - The Workflow was canceled or was deliberately failed (the Workflow author raised ApplicationError outside an Update handler)
    - The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
If the Workflow handle references a Workflow that doesn't exist then execute_update and start_update will both raise temporalio.service.RPCError "sql: no rows in result set".
Dynamic Handler
What is a Dynamic Handler?
Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.
Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known until run time.
Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.
Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.
Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.
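The fallback rule can be pictured with a small plain-Python model. This is not SDK code and does not reflect how the SDK is implemented; dispatch and its parameters are hypothetical names used only to show the lookup order of a static handler first and the dynamic handler second.

```python
from typing import Callable, Dict, Optional, Sequence

StaticHandler = Callable[[Sequence[object]], str]
DynamicHandler = Callable[[str, Sequence[object]], str]


def dispatch(
    name: str,
    args: Sequence[object],
    static_handlers: Dict[str, StaticHandler],
    dynamic_handler: Optional[DynamicHandler] = None,
) -> str:
    """Model of the lookup: a named handler wins, the dynamic one is the fallback."""
    handler = static_handlers.get(name)
    if handler is not None:
        return handler(args)
    if dynamic_handler is not None:
        # The dynamic handler also receives the name it was invoked for.
        return dynamic_handler(name, args)
    raise LookupError(f"no handler registered for {name!r}")


static = {"greet": lambda args: f"Hello, {args[0]}!"}


def fallback(name: str, args: Sequence[object]) -> str:
    return f"dynamic:{name}"
```

In this model dispatch("greet", ["World"], static, fallback) uses the static handler, while any other name reaches fallback.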
Set a Dynamic Signal
How to set a Dynamic Signal
A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same name is registered.
A Signal can be made dynamic by adding dynamic=True to the @workflow.signal decorator.
The Signal Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        await self._pending_greetings.put(name)
Set a Dynamic Query
How to set a Dynamic Query
A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered.
A Query can be made dynamic by adding dynamic=True to the @workflow.query decorator.
The Query Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
    @workflow.query(dynamic=True)
    def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str:
        return self._greeting
Set a Dynamic Workflow
How to set a Dynamic Workflow
A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered.
A Workflow can be made dynamic by adding dynamic=True to the @workflow.defn decorator.
You must register the Workflow with the Worker before it can be invoked.
The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
Set a Dynamic Activity
How to set a Dynamic Activity
A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered.
An Activity can be made dynamic by adding dynamic=True to the @activity.defn decorator.
You must register the Activity with the Worker before it can be invoked.
The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue].
The payload_converter() function is used to convert a RawValue object to the desired type.
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )
# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )