Skip to main content

Messages - Python SDK feature guide

This page shows how to do the following:

Signals

How to develop with Signals using the Python SDK.

A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. It can only deliver data to a Workflow Execution that has not already closed.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

There are two steps for adding support for a Signal to your Workflow code:

  1. Defining the Signal - You specify the name and data structure used by Temporal Clients when sending the Signal.
  2. Handling the Signal - You write code that will be invoked when the Signal is received from a Temporal Client.

After defining and handling your Signal, you can send it from a Temporal Client or from another Workflow Execution.

Define Signal

How to define a Signal using the Python SDK.

A Signal has a name and can have arguments.

  • The name, also called a Signal type, is a string.
  • The arguments must be serializable. To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.

Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.

Return values from Signal methods are ignored.

How to customize names

You can have a name parameter to customize the Signal's name, otherwise it defaults to the name of the Signal method.

View the source code

in the context of the rest of the application code.

from temporalio import workflow
# ...
@workflow.signal
async def submit_greeting(self, name: str) -> None:
await self._pending_greetings.put(name)

@workflow.signal
def exit(self) -> None:
# ...
@workflow.signal(name="Custom Signal Name")
async def custom_signal(self, name: str) -> None:
await self._pending_greetings.put(name)

Handle Signal

How to handle a Signal using the Python SDK.

Workflows listen for Signals by the Signal's name.

Signal handlers are functions defined in the Workflow that listen for incoming Signals of a given type. These handlers define how a Workflow should react when it receives a specific type of Signal.

To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.

from temporalio.client import Client
# ...
# ...
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

Send a Signal from a Temporal Client

How to send a Signal from a Temporal Client using the Python SDK.

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

To send a Signal from the Client, use the signal() function on the Workflow handle.

To get the Workflow handle, you can use any of the following options.

View the source code

in the context of the rest of the application code.

from temporalio.client import Client
# ...
# ...
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
GreetingWorkflow.run,
id="your-greeting-workflow",
task_queue="signal-tq",
)
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

Send a Signal from a Workflow

How to send a Signal from a Workflow using the Python SDK.

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

When an External Signal is sent:

Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier. Use get_external_workflow_handle when you don't know the type of the other Workflow.

note

The Workflow Type passed is only for type annotations and not for validation.

# ...
@workflow.defn
class WorkflowB:
@workflow.run
async def run(self) -> None:
handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
await handle.signal(WorkflowA.your_signal, "signal argument")

Signal-With-Start

How to send a Signal-With-Start using the Python SDK.

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.

from temporalio.client import Client
# ...
# ...
async def main():
client = await Client.connect("localhost:7233")
await client.start_workflow(
GreetingWorkflow.run,
id="your-signal-with-start-workflow",
task_queue="signal-tq",
start_signal="submit_greeting",
start_signal_args=["User Signal with Start"],
)

Queries

A Query is a synchronous operation that is used to get the state of a Workflow Execution.

How to define a Query

A Query has a name and can have arguments.

  • The name, also called a Query type, is a string.
  • The arguments must be serializable.

To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.

Customize names

You can have a name parameter to customize the Query's name, otherwise it defaults to the name of the Query method.

note

You can either set the name or the dynamic parameter in a Query's decorator, but not both.

# ...
@workflow.query
def greeting(self) -> str:
return self._greeting

Handle a Query

How to handle a Query

Queries are handled by your Workflow.

Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

To send a Query to the Workflow, use the query method from the WorkflowHandle class.

# ...
result = await handle.query(GreetingWorkflow.greeting)

Send a Query

How to send a Query

Queries are sent from a Temporal Client.

To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.

# ...
result = await handle.query(GreetingWorkflow.greeting)

Developing with Updates

An Update is a trackable request sent synchronously to a running Workflow Execution that can change the state and control the flow of a Workflow Execution, and return a result. The sender of the request must wait until the update is at least accepted or rejected by a Worker, and will often opt to wait further to receive the value returned by the Update handler, or an exception indicating what went wrong. Update handlers can do arbitrarily long-running async operations (like signal handlers, and the main workflow method).

Writing Update handlers and validators as a Workflow author

Here's a Workflow Definition that illustrates how to create an Update handler, and an associated validator:

@dataclass
class HelloWorldInput:
entity_to_be_greeted: str


@workflow.defn
class HelloWorldWorkflow:
def __init__(self):
self.entity_to_be_greeted: Optional[str] = None

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self.entity_to_be_greeted is not None)
return self.greeting()

@workflow.update
def set_greeting(self, input: HelloWorldInput) -> str:
self.entity_to_be_greeted = input.entity_to_be_greeted
return self.greeting()

@set_greeting.validator
def set_greeting_validator(self, input: HelloWorldInput) -> None:
if input.entity_to_be_greeted not in {"world", "World"}:
raise Exception(f"invalid entity: {input.entity_to_be_greeted}")

def greeting(self) -> str:
return f"Hello, {self.entity_to_be_greeted}!"

Note the following:

  • Update handlers and validators are defined using decorators, in a similar way to Signal and Query handlers.
  • This handler method does not do any long-running async operations; if it did, it would need to be an async def.
  • Examples of async operations that can be done in an update handler include asyncio.sleep(...), workflow.wait_condition(...), and execution of Activities and Child Workflows.
  • The handler method signature defines the argument type and return type that a client will use when sending an Update.
  • It is possible to use multiple arguments, but this is not recommended: instead use a single dataclass argument in which fields can be added/removed as needed.
  • Validators are optional; if you don't want to be able to reject updates then you don't need a validator.
  • To reject an Update, you raise an exception (of any type) in the validator.
  • The name of the decorator you use to define the validator is based on the name that you give to the handler.
  • The validator must take the same argument type as the handler, but always returns None.
  • The update decorator accepts arguments.

Sending an Update to a Workflow Execution

Recall that when sending an Update, the client will not receive a response until a Worker is available and the Update has been delivered to the Worker. If you want the server to send a response as soon as it receives your request, then you must use a Signal instead.

To send an Update to a Workflow Execution, you have two choices:

1. Use execute_update to wait for the update to complete

execute_update sends an Update and waits until it has been completed by a Worker. It returns the Update result:

# Wait until the update is completed
update_result = await workflow_handle.execute_update(
HelloWorldWorkflow.set_greeting,
HelloWorldInput("World"),
)

2. Use start_update to receive a handle as soon as the update is accepted or rejected

start_update sends an Update and waits until the Update has been accepted or rejected by a Worker. It returns an UpdateHandle:

# Wait until the update is accepted
update_handle = await workflow_handle.start_update(
HelloWorldWorkflow.set_greeting,
HelloWorldInput("World"),
)
# Wait until the update is completed
update_result = await update_handle.result()

Exceptions

The following exceptions might be raised by execute_update, or when calling update_handle.result() on a handle obtained from start_update:

  • temporalio.client.WorkflowUpdateFailedError

    The Update was either rejected by the validator, or the Workflow author deliberately failed the Update by raising ApplicationError in the handler.

  • temporalio.service.RPCError "workflow execution already completed"`

    This will happen in any of the following situations:

    • The WorkflowHandle that was used to send the Update referenced a non-existent Workflow.
    • The Workflow finished while the Update handler execution was in progress, for example because
      • The Workflow was canceled or was deliberately failed (the Workflow author raised ApplicationError outside an Update handler)
      • The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.

If the Workflow handle references a Workflow that doesn't exist then execute_update and start_update will both raise temporalio.service.RPCError "sql: no rows in result set".

Dynamic Handler

What is a Dynamic Handler?

Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.

Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known at run time.

caution

Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.

Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.

Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.

Set a Dynamic Signal

How to set a Dynamic Signal

A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered. A Signal can be made dynamic by adding dynamic=True to the @signal.defn decorator.

The Signal Handler should accept self, a string input, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@workflow.signal(dynamic=True)
async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
await self._pending_greetings.put(name)

Set a Dynamic Query

How to set a Dynamic Query

A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered. A Query can be made dynamic by adding dynamic=True to the @query.defn decorator.

The Query Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@workflow.query(dynamic=True)
def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str:
return self._greeting

Set a Dynamic Workflow

How to set a Dynamic Workflow

A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered. A Workflow can be made dynamic by adding dynamic=True to the @workflow.defn decorator. You must register the Workflow with the Worker before it can be invoked.

The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
@workflow.run
async def run(self, args: Sequence[RawValue]) -> str:
name = workflow.payload_converter().from_payload(args[0].payload, str)
return await workflow.execute_activity(
default_greeting,
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)

Set a Dynamic Activity

How to set a Dynamic Activity

A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered. An Activity can be made dynamic by adding dynamic=True to the @activity.defn decorator. You must register the Activity with the Worker before it can be invoked.

The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
return (
f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
)
# ...
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
"unregistered_activity",
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)