Fault Tolerant Systems with Temporal

This article will explain why I like to build distributed, fault-tolerant systems with Temporal. We will start by discussing choreography versus orchestration, and then get into Temporal.

Choreography

In a choreographed system, perhaps consisting of microservices and event queues, processes are spread out across services that communicate through events:

# Payment Service
async def process_payment(order_id: str, amount: float):
    try:
        result = await payment_provider.charge(amount)
        await message_queue.publish("payment_completed", {
            "order_id": order_id,
            "transaction_id": result.id
        })
    except PaymentError as e:
        await message_queue.publish("payment_failed", {
            "order_id": order_id,
            "error": str(e)
        })
 
# Inventory Service
async def handle_payment_completed(event):
    order_id = event["order_id"]
    await database.update_inventory(order_id)
    await message_queue.publish("inventory_updated", {
        "order_id": order_id
    })
 
# Notification Service
async def handle_inventory_updated(event):
    order_id = event["order_id"]
    await email_service.send_confirmation(order_id)

This approach promises compelling benefits. Services can be developed and deployed independently. Teams work autonomously. Each service owns its logic and data. New services can be added without changing existing ones. The system scales naturally as each component handles its own load.

Choreography works well when processes are simple, state is minimal, timing is not critical, and failures are tolerable. Otherwise, reality gets messy.

When Choreography Breaks

Questions like “what’s the status of order X?” become difficult to answer. The payment service knows about the charge but not the inventory. The inventory service hasn’t seen the payment. The notification service is completely in the dark.

The problems compound when we add real-world requirements like:

  • Long-running processes with multi-step approvals, human intervention, and complex retry policies.
  • Failure handling across internal services and third-party APIs.
  • Visibility into the process for debugging, SLA monitoring, and state inspection.
  • Complex flows with conditional paths, dynamic routing, parallel processing, and complex error recovery.

We need a model that centralizes process flow and state management. We need orchestration.

Orchestration

Instead of a process being defined across services coordinating through events, orchestration defines a central workflow which executes the process. This is exactly the approach that Temporal enables:

@workflow
async def process_order(order: Order) -> str:
    inventory = await workflow.execute_activity(
        check_inventory,
        order.items,
        start_to_close_timeout=timedelta(minutes=5)
    )
    
    payment = await workflow.execute_activity(
        process_payment,
        order.payment,
        start_to_close_timeout=timedelta(minutes=5)
    )
    
    await workflow.execute_activity(
        update_inventory,
        order.items,
        start_to_close_timeout=timedelta(minutes=5)
    )
    
    return "Order completed"

This now means that:

  1. State becomes explicit - the workflow maintains a clear record of what happened.
  2. Failure handling becomes explicit - as defined in the workflow.
  3. Process tracking becomes trivial - we can see exactly where each workflow stands.
  4. Recovery becomes automatic - if the system goes down, it will resume from the last workflow state after recovering.

The complexity moves from individual services that each team builds, into reliable infrastructure that has proven itself across many companies over many years.

But how does it work?

Temporal Overview

We will give a high-level overview of Temporal, and avoid introducing too many concepts like namespaces, versioning, signals, timers, child workflows, cluster replication, and so on.

Service Architecture

The Temporal Server maintains the source of truth about what should happen. It stores workflow histories, dispatches tasks, tracks timeouts, and schedules retries. Internally, it consists of multiple services and a database. It provides a crucial guarantee: a durable record of every workflow history.

Workers execute your code, and are hosted by you. They pick up tasks, run your workflow and activity code, and then report the results back to the server, which updates the workflow history. Workers are stateless, and can encrypt payload data before sending it to the server.

This separation between the server and workers creates a crucial property: program state exists independently from execution. Even if all your workers crash, the server maintains a durable record of what needs to happen next.

Workflows

Workflow code must run deterministically. Given the same inputs, it must produce an identical event log across runs, while potentially appending new events that trigger server tasks.

This might seem counterintuitive. Most code contains side effects and runs from top to bottom exactly once. Workflow code, however, pushes side effects out into activities and executes many times within a single workflow execution.
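One way to build intuition for this is a tiny pure-Python sketch (this is not Temporal SDK code; the Recorder class is invented for illustration). Results of side effects are recorded the first time the code runs and replayed on every later run, which is the same trick behind the deterministic stand-ins the SDKs expose, such as workflow.now() and workflow.random():

```python
import random

class Recorder:
    """Records side-effect results on first execution, replays them afterwards."""
    def __init__(self, history=None):
        self.history = history if history is not None else []
        self.pos = 0

    def side_effect(self, fn):
        if self.pos < len(self.history):
            value = self.history[self.pos]   # replaying: reuse the recorded value
        else:
            value = fn()                     # first run: execute and record
            self.history.append(value)
        self.pos += 1
        return value

def workflow_body(rec):
    # Routing random.random() through the recorder keeps re-execution deterministic.
    roll = rec.side_effect(lambda: random.random())
    return "big" if roll > 0.5 else "small"

first = Recorder()
result1 = workflow_body(first)

# Replaying against the recorded history always yields the same decision.
replay = Recorder(history=list(first.history))
result2 = workflow_body(replay)
assert result1 == result2
```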

Consider this example:

@workflow
async def process_order(order: Order) -> str:
    inventory = await workflow.execute_activity(
        check_inventory,
        order.items,
        start_to_close_timeout=timedelta(minutes=5)
    )
    
    payment = await workflow.execute_activity(
        process_payment,
        order.payment,
        start_to_close_timeout=timedelta(days=1)
    )
    
    await workflow.execute_activity(
        update_inventory,
        order.items,
        start_to_close_timeout=timedelta(minutes=5)
    )
    
    return "Order completed"

A single workflow execution spans multiple workflow tasks, and builds a workflow history stored in the server:

WorkflowExecutionStarted
  - workflow_type: "process_order"
  - input: [serialized order data]

WorkflowTaskScheduled
WorkflowTaskStarted
WorkflowTaskCompleted
  - Commands: [ActivityTaskScheduled(check_inventory)]
  
ActivityTaskScheduled(check_inventory)
  - activity_type: "check_inventory"
  - input: [serialized items]
  - start_to_close_timeout: 5 minutes
ActivityTaskStarted(check_inventory)
ActivityTaskCompleted(check_inventory)
  - result: {"available": true}
  
WorkflowTaskScheduled
WorkflowTaskStarted
WorkflowTaskCompleted
  - Commands: [ActivityTaskScheduled(process_payment)]
  
ActivityTaskScheduled(process_payment)
  - activity_type: "process_payment"
  - input: [payment details]
  - start_to_close_timeout: 1 day
ActivityTaskStarted(process_payment)
ActivityTaskCompleted(process_payment)
  - result: {"success": true}
  
WorkflowTaskScheduled
WorkflowTaskStarted
WorkflowTaskCompleted
  - Commands: [ActivityTaskScheduled(update_inventory)]

ActivityTaskScheduled(update_inventory)
  - activity_type: "update_inventory" 
  - input: [items to update]
  - start_to_close_timeout: 5 minutes
ActivityTaskStarted(update_inventory)
ActivityTaskCompleted(update_inventory)
  
WorkflowTaskScheduled
WorkflowTaskStarted
WorkflowTaskCompleted
  - Commands: [CompleteWorkflowExecution]
  
WorkflowExecutionCompleted
  - result: "Order completed"

If our system crashes after processing payment but before updating inventory, Temporal knows exactly where we were and what needs to happen next. When the system comes back online, the server dispatches the task to continue executing on the worker.
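As a rough mental model of that recovery (plain Python, far simpler than the real Temporal engine; run_workflow and the lambdas are invented for illustration), resuming looks like replaying recorded results and executing only the step that never completed:

```python
def run_workflow(history, activities):
    """Replay completed activity results from history; execute only what is missing."""
    results, executed = [], []
    for i, (name, fn) in enumerate(activities):
        if i < len(history):
            results.append(history[i])       # already completed before the crash
        else:
            out = fn()                       # resume point: actually run it
            history.append(out)
            executed.append(name)
            results.append(out)
    return results, executed

activities = [
    ("check_inventory", lambda: {"available": True}),
    ("process_payment", lambda: {"success": True}),
    ("update_inventory", lambda: {"updated": True}),
]

# The crash happened after payment: history holds the first two results.
history = [{"available": True}, {"success": True}]
results, executed = run_workflow(history, activities)
assert executed == ["update_inventory"]   # only the pending step actually ran
```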

Activities

Pure deterministic execution within workflows has limits. Real systems must interact with the non-deterministic outside world, such as calling an API. Activities provide this bridge in Temporal.

Consider an example activity to call the inventory service API:

@activity.defn
async def check_inventory(items):
    return await inventory_service.check(items)

If it completes successfully, the server stores its output in the workflow history as part of the corresponding ActivityTaskCompleted event.

If it fails, it is retried according to the retry policy the workflow executed the activity with. If it times out or exhausts its retries, the workflow receives an explicit error it can handle.

Note that activities should be idempotent to ensure safe retries, and good workflow design means the workflow itself never fails.
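A common way to achieve idempotency is an idempotency key, sketched here against a fake in-memory provider (PaymentProvider and its charge method are made up for illustration; many real payment APIs accept a similar key):

```python
class PaymentProvider:
    """Fake provider: a retried charge with the same key is not charged twice."""
    def __init__(self):
        self.charges = {}   # idempotency_key -> transaction record
        self.calls = 0

    def charge(self, idempotency_key, amount):
        self.calls += 1
        if idempotency_key in self.charges:
            return self.charges[idempotency_key]   # retry: return the prior result
        tx = {"id": f"tx-{len(self.charges) + 1}", "amount": amount}
        self.charges[idempotency_key] = tx
        return tx

provider = PaymentProvider()
first = provider.charge("order-123-payment", 49.99)
# A retry after, say, a timeout re-invokes the activity with the same key.
retry = provider.charge("order-123-payment", 49.99)
assert retry["id"] == first["id"]      # same transaction, money moved once
assert len(provider.charges) == 1
```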

Workflow Design Patterns

Now that you have a basic understanding of what Temporal is, we will go into design patterns.

Treat this as pseudo-code. I fed an LLM some of the Temporal documentation and Python SDK to generate these because blogs can take a long time to write and I do not have a lot of time.

There are no silver bullets in software, and that applies to workflow design as well. Do not cargo cult design patterns without understanding their trade-offs. Set retries and timeouts appropriate for your specific use-case.
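To make that advice concrete, this sketch (plain Python; retry_intervals is an invented helper, not SDK code) computes the wait schedule implied by retry-policy parameters such as Temporal's initial_interval, backoff_coefficient, and maximum_attempts:

```python
from datetime import timedelta

def retry_intervals(initial, backoff, maximum=None, attempts=5):
    """Wait before each retry: initial * backoff**n, optionally capped at maximum."""
    intervals = []
    for n in range(attempts - 1):        # the first attempt has no preceding wait
        interval = initial * (backoff ** n)
        if maximum is not None and interval > maximum:
            interval = maximum
        intervals.append(interval)
    return intervals

# Exponential backoff: waits of 1s, 2s, 4s, 8s between five attempts.
exp = retry_intervals(timedelta(seconds=1), 2.0, attempts=5)

# backoff_coefficient=1.0 degenerates into fixed-interval polling: 60s each time.
poll = retry_intervals(timedelta(seconds=60), 1.0, attempts=4)
```

The fixed-interval case is exactly the trick the polling pattern later in this post relies on.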

Saga Pattern: Coordinated Transactions

You may require multiple things to happen together, or not at all, but you are subject to failures such as network partitions. The saga pattern can help you ensure that operations are atomic:

@workflow
async def transfer_funds(transfer: TransferRequest) -> str:
    # Debit source account
    debit = await workflow.execute_activity(
        debit_account,
        DebitRequest(
            account_id=transfer.source_id,
            amount=transfer.amount
        ),
        start_to_close_timeout=timedelta(minutes=5)
    )

    try:
        # Credit destination account
        await workflow.execute_activity(
            credit_account,
            CreditRequest(
                account_id=transfer.dest_id,
                amount=transfer.amount
            ),
            start_to_close_timeout=timedelta(minutes=5)
        )
        return "Transfer complete"

    except Exception:
        # Compensation: reverse the debit
        await workflow.execute_activity(
            reverse_debit,
            debit.transaction_id,
            start_to_close_timeout=timedelta(minutes=5)
        )
        return "Transfer failed"

Admittedly, this is a classic example to give, but also the least-used pattern in my experience. I believe a major reason is that most of the workflows I have written are designed to eventually succeed even if a fault occurs.

Polling Pattern: External Process Integration

If you ever call an API that is asynchronous, meaning it starts a long-running process in the background, you can use the polling pattern to elegantly await its completion:

@workflow
async def track_payment(payment: Payment) -> str:
    # Submit payment to external system
    tracking_id = await workflow.execute_activity(
        submit_payment,
        payment
    )
    
    # Checks every 60s, fails after four hours.
    status = await workflow.execute_activity(
        check_payment_status,
        CheckPaymentStatusInput(tracking_id=tracking_id),
        start_to_close_timeout=timedelta(seconds=5),
        schedule_to_close_timeout=timedelta(hours=4),
        retry_policy=RetryPolicy(
            backoff_coefficient=1.0,
            initial_interval=timedelta(seconds=60),
        ),
    )
    
    return status

We did not go into workflow history size limits in this post, but the advantage of this method is that it is simple and keeps the workflow history small, since server-side retries of a single activity do not add an event per attempt.

Batch Processing Pattern: High-Volume Operations

If you have a large amount of work to complete, as in batch processing, you can fan out the work to child workflows.

@workflow
async def process_batch(batch: BatchConfig) -> BatchResult:
    futures = []
    
    for chunk in create_chunks(batch.items, size=100):
        handle = await workflow.start_child_workflow(
            process_chunk,
            chunk,
        )
        futures.append(handle)
        
    return await asyncio.gather(*futures)

More advanced use-cases add a concurrency limit on these calls, using a wait condition, so that you don't overwhelm your system when handling large batches.
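Here is a rough sketch of that limiter in plain asyncio (wait_condition below is a hand-rolled stand-in for the SDK's workflow.wait_condition, and the tasks stand in for child workflows):

```python
import asyncio

async def wait_condition(pred):
    """Hand-rolled stand-in for workflow.wait_condition."""
    while not pred():
        await asyncio.sleep(0)

async def process_batch(chunks, limit=3):
    running, tasks = set(), []
    peak = 0

    async def process_chunk(i):
        nonlocal peak
        peak = max(peak, len(running))   # observe how many run concurrently
        await asyncio.sleep(0)           # stand-in for real child-workflow work
        running.discard(i)

    for i in chunks:
        # Throttle fan-out: never more than `limit` chunks in flight at once.
        await wait_condition(lambda: len(running) < limit)
        running.add(i)
        tasks.append(asyncio.create_task(process_chunk(i)))

    await asyncio.gather(*tasks)
    return peak

peak = asyncio.run(process_batch(range(10), limit=3))
assert peak <= 3
```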

Actor Pattern: Entity Management

This is, in my opinion, the holy grail of workflow use-cases, because it embodies the idea of orchestration introduced at the beginning of this post:

@workflow
class PlayerWorkflow:
    def __init__(self):
        self._state = "INITIALIZING"
        self._inventory = {}
        self._health = 100
        self._position = Position(0, 0)
        self._pending_actions = []
        
    @workflow.run
    async def run(self, player_id: str) -> None:
        self._state = "ACTIVE"
        
        while True:
            # Process any pending actions
            while self._pending_actions:
                action = self._pending_actions.pop(0)
                await self._process_action(action)
                
            # Check if we need to continue-as-new
            if self._should_continue_as_new():
                workflow.continue_as_new(
                    player_id,
                    state=self._get_continuation_state()
                )
                
            # Wait for next action
            await workflow.wait_condition(
                lambda: bool(self._pending_actions)
            )
    
    @workflow.signal
    async def submit_action(self, action: PlayerAction):
        self._pending_actions.append(action)
        
    @workflow.query
    def get_player_state(self) -> PlayerState:
        return PlayerState(
            health=self._health,
            position=self._position,
            inventory=self._inventory
        )
        
    async def _process_action(self, action: PlayerAction):
        if action.type == "MOVE":
            await self._handle_move(action.data)
        elif action.type == "INVENTORY":
            await self._handle_inventory(action.data)
        elif action.type == "COMBAT":
            await self._handle_combat(action.data)

My caution is that this is an advanced use-case, and if I were to write an actor workflow I would do it very differently. For example, safe_message_handlers in temporalio/samples-python shows one nice way of handling messages in a workflow.

But the concept of having a single workflow that processes all messages for an entity is accurately represented.

And So Forth

We have barely scratched the surface of workflow design and patterns. The one real cost of adopting Temporal is that you must learn how to design workflows. This makes the adoption cost somewhat higher, although I believe it is often overstated, and it is also the strength of the tool: it forces you to make things like retry policies and timeouts explicit.

Focus on Solving Problems

It should be evident why I like Temporal for building distributed systems: it lets me focus on solving problems rather than building reliable, usable event infrastructure.

If you would like to learn more, you can find real workflow examples at github.com/temporalio/samples-python. There are sample repos for every language they support.

There are also several great talks about Temporal on YouTube.

Hope you found this interesting.