Long-Running and Durable Agents

Most agent demonstrations last seconds. A user asks a question, the model reasons through a few steps, calls a tool or two, and returns an answer. The entire lifecycle fits inside a single HTTP request. Production agents are different. A research agent synthesizing a market report might run for twenty minutes. A code migration agent refactoring a large codebase could work for hours. An agent managing a multi-step approval workflow might span days — waiting for human input, resuming when approvals arrive, and picking up exactly where it left off.

The challenge is durability. How do you build an agent that survives process restarts, infrastructure failures, and the simple passage of time — without losing its progress or corrupting its state?

Standard agent loops assume they run to completion in a single process lifetime. That assumption breaks the moment a task takes longer than a request timeout, longer than a container's lifespan, or longer than the patience of a load balancer. Durable agents replace this assumption with a different contract: execution progress is persisted, and the agent can resume from any checkpoint at any time, in any process.

Why Agents Need Durability

Three forces push agents toward long-running execution:

Task complexity. Some tasks are inherently large. Migrating a 500-file codebase, analyzing a year of financial filings, or generating a comprehensive competitor report involves hundreds of tool calls and model invocations. You cannot squeeze this into a single request-response cycle.

External dependencies with latency. Agents that interact with humans (waiting for approvals), external APIs (rate-limited to N requests per minute), or asynchronous processes (CI pipelines, deployment checks) must wait. The wait might be milliseconds or days. An agent that holds open a thread while waiting wastes resources and risks timeout.

Reliability requirements. If a four-hour research task fails at the three-hour mark because of a transient network error, restarting from scratch is unacceptable. The cost — in tokens, time, and API calls — is too high. The agent needs to resume from the last known-good state.

┌───────────────────────────────────────────────────────────────┐
│                    Short-Lived Agent                          │
│                                                               │
│  Request ──► Reason ──► Act ──► Observe ──► ... ──► Response  │
│                                                               │
│  Lifecycle: seconds to minutes                                │
│  State: in-memory only                                        │
│  Failure mode: total loss, restart from scratch               │
└───────────────────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────┐
│                    Durable Agent                              │
│                                                               │
│  Trigger ──► [Checkpoint] ──► Act ──► [Checkpoint] ──► Wait   │
│                                         │                     │
│          Resume ◄── External Event ─────┘                     │
│             │                                                 │
│             ▼                                                 │
│  [Checkpoint] ──► Act ──► [Checkpoint] ──► Complete           │
│                                                               │
│  Lifecycle: minutes to days                                   │
│  State: persisted after every step                            │
│  Failure mode: resume from last checkpoint                    │
└───────────────────────────────────────────────────────────────┘

Checkpointing - The Core Primitive

A checkpoint is a snapshot of the agent's complete state at a specific point in execution. If the process dies after a checkpoint, the agent can be rehydrated from that snapshot and continue as if nothing happened. The key question is: what exactly do you persist?

An agent's state includes several components:

  • The task definition. The original goal, constraints, and parameters.
  • Execution history. Every reasoning step, tool call, and observation so far.
  • Working memory. Accumulated facts, intermediate results, and scratchpad notes.
  • Plan state. Which sub-tasks are complete, which are pending, and what the current step is.
  • Cursor position. Where in the execution graph the agent currently sits.

The simplest checkpoint strategy is event sourcing: persist every event (model call, tool response, decision) as an append-only log. To resume, replay the log from the beginning to reconstruct state. This is conceptually clean but can be expensive for long-running agents — replaying thousands of events to reconstruct state introduces latency on resume.

A more practical approach combines event sourcing with periodic state snapshots: persist the full state every N steps (or after expensive operations), and keep the event log for fine-grained replay between snapshots.

from dataclasses import dataclass, field
from typing import Any
import json
import time


@dataclass
class Checkpoint:
    checkpoint_id: str
    task_id: str
    step_index: int
    timestamp: float = field(default_factory=time.time)
    agent_state: dict[str, Any] = field(default_factory=dict)
    execution_history: list[dict] = field(default_factory=list)
    plan: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)


class CheckpointStore:
    """Persist and retrieve agent checkpoints."""

    def __init__(self, backend):
        self.backend = backend

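    async def save(self, checkpoint: Checkpoint) -> None:
        key = f"{checkpoint.task_id}/{checkpoint.checkpoint_id}"
        # default=str silently stringifies values json cannot encode, so step
        # results should be plain JSON types if they must survive a reload intact.
        payload = json.dumps(checkpoint.__dict__, default=str)
        await self.backend.put(key, payload)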

    async def load_latest(self, task_id: str) -> Checkpoint | None:
        # The trailing slash keeps "task-1" from also matching "task-10".
        # Backend entries are assumed to expose .key and .timestamp.
        entries = await self.backend.list_prefix(f"{task_id}/")
        if not entries:
            return None
        latest = max(entries, key=lambda e: e.timestamp)
        data = json.loads(await self.backend.get(latest.key))
        return Checkpoint(**data)

    async def list_checkpoints(self, task_id: str) -> list[Checkpoint]:
        """Return all checkpoints for a task, oldest first."""
        entries = await self.backend.list_prefix(f"{task_id}/")
        return [
            Checkpoint(**json.loads(await self.backend.get(e.key)))
            for e in sorted(entries, key=lambda e: e.timestamp)
        ]
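
The combination of an append-only log with periodic snapshots, described before the checkpoint store, might look roughly like the sketch below. It is an in-memory illustration only: EventLog, apply_event, and the event shape ({"type": "fact", ...}) are hypothetical names chosen for this example, and a real system would write both the log and the snapshots to durable storage.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EventLog:
    """Append-only event log with periodic snapshots (in-memory sketch)."""
    snapshot_every: int = 100
    events: list[dict[str, Any]] = field(default_factory=list)
    # Each snapshot is (number_of_events_covered, state_at_that_point).
    snapshots: list[tuple[int, dict[str, Any]]] = field(default_factory=list)

    def append(self, event: dict[str, Any]) -> None:
        self.events.append(event)
        if len(self.events) % self.snapshot_every == 0:
            # Snapshot the fully reconstructed state at this position.
            self.snapshots.append((len(self.events), self.rebuild()))

    def rebuild(self) -> dict[str, Any]:
        """Reconstruct state from the latest snapshot plus newer events."""
        if self.snapshots:
            covered, snap = self.snapshots[-1]
            # Copy so that replaying never mutates the stored snapshot.
            state = {"facts": dict(snap["facts"]), "steps": snap["steps"]}
        else:
            covered, state = 0, {"facts": {}, "steps": 0}
        for event in self.events[covered:]:
            apply_event(state, event)
        return state


def apply_event(state: dict[str, Any], event: dict[str, Any]) -> None:
    """Fold one event into the state (hypothetical event shape)."""
    state["steps"] += 1
    if event["type"] == "fact":
        state["facts"][event["key"]] = event["value"]
```

With snapshot_every set to N, a resume replays at most N - 1 events after loading the latest snapshot, which bounds resume latency regardless of how long the agent has been running.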

What to Checkpoint and When

Not every micro-step needs a checkpoint. Persisting after every single token generation would be wasteful. The right granularity depends on the cost of replaying from the last checkpoint versus the cost of storing checkpoints.

Practical heuristics:

  • After every tool call. Tool calls are the most expensive and side-effect-producing operations. If the agent crashes after executing a tool but before recording the result, you risk re-executing a side effect (sending an email twice, creating a duplicate record).
  • After plan changes. When the agent revises its plan — adding sub-tasks, reprioritizing, or marking steps complete — persist immediately. Plan state is cheap to store and expensive to recompute.
  • Before long waits. If the agent is about to wait for an external event (human approval, CI result, rate-limit cooldown), checkpoint first. The process can safely shut down during the wait and resume when the event arrives.
  • At regular time intervals. As a safety net, checkpoint every N minutes regardless of what is happening. This bounds the worst-case replay cost.

The Durable Execution Model

Durable execution is a pattern where the runtime — not the developer — handles persistence and replay. The developer writes sequential code as if it runs to completion in one shot. The runtime intercepts each operation, persists it, and guarantees that on replay, completed operations return their previous results without re-executing.

This is the core idea behind durable workflow engines. The developer writes:

async def research_agent(ctx: DurableContext, topic: str):
    # Step 1: Generate research plan
    plan = await ctx.run("plan", generate_plan, topic)

    # Step 2: Execute each research step
    findings = []
    for step in plan.steps:
        result = await ctx.run(f"research_{step.id}", execute_step, step)
        findings.append(result)

    # Step 3: Wait for human review (may take hours/days)
    approval = await ctx.wait_for_event("human_review", timeout_hours=72)

    if approval.status == "approved":
        # Step 4: Synthesize final report
        report = await ctx.run("synthesize", synthesize_report, findings)
        return report
    else:
        return await ctx.run("revise", revise_and_resubmit, findings, approval.feedback)

From the developer's perspective, this looks like a normal async function. But the runtime makes it durable:

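  • Each ctx.run() result is persisted as soon as the step completes. If the process dies, the runtime on restart replays all completed ctx.run() calls by returning their stored results (without re-executing), then resumes from the first step that has no stored result. A step that was interrupted mid-flight runs again, which is why side effects need the idempotency protections discussed later.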
  • ctx.wait_for_event() suspends the workflow. The process can shut down entirely. When the event arrives (hours or days later), the runtime spins up a new process, replays to the suspension point, delivers the event, and continues.
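  • Replay is deterministic as long as the workflow code is. Completed steps return cached outcomes, so the function must reach the same ctx.run() calls, with the same step IDs, in the same order. Any randomness, clock reads, or I/O belongs inside a step rather than between steps.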

Implementing Durable Steps

The critical mechanism is step identity. Each operation must have a stable identifier so the runtime knows whether it has already been executed. The pattern:

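class SuspendExecution(Exception):
    """Raised to tell the runtime to park the workflow until an event arrives."""

    def __init__(self, task_id: str, resume_on: str, timeout_hours: float):
        super().__init__(f"{task_id} suspended until {resume_on}")
        self.task_id = task_id
        self.resume_on = resume_on
        self.timeout_hours = timeout_hours


class DurableContext: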
    def __init__(self, task_id: str, store: CheckpointStore, resume_event=None):
        self.task_id = task_id
        self.store = store
        self.resume_event = resume_event
        self.completed_steps: dict[str, Any] = {}
        self.current_step: int = 0

    async def run(self, step_id: str, fn, *args, **kwargs):
        """Execute a step durably. Returns cached result on replay."""
        if step_id in self.completed_steps:
            return self.completed_steps[step_id]

        result = await fn(*args, **kwargs)

        self.completed_steps[step_id] = result
        self.current_step += 1
        await self._persist()
        return result

    async def wait_for_event(self, event_type: str, timeout_hours: float = 24):
        """Suspend execution until an external event arrives."""
        event_id = f"wait_{event_type}_{self.current_step}"

        if event_id in self.completed_steps:
            return self.completed_steps[event_id]

        resume_event_type = (
            getattr(self.resume_event, "type", None)
            if self.resume_event is not None
            else None
        )
        if resume_event_type is None and isinstance(self.resume_event, dict):
            resume_event_type = self.resume_event.get("type")

        if self.resume_event and resume_event_type == event_type:
            event = self.resume_event
            # Consume the event so a later wait of the same type cannot reuse it.
            self.resume_event = None
            self.completed_steps[event_id] = event
            self.current_step += 1
            await self._persist()
            return event

        await self._persist()
        # Signal the runtime to suspend this workflow
        raise SuspendExecution(
            task_id=self.task_id,
            resume_on=event_type,
            timeout_hours=timeout_hours,
        )

    async def _persist(self):
        checkpoint = Checkpoint(
            checkpoint_id=f"step_{self.current_step}",
            task_id=self.task_id,
            step_index=self.current_step,
            agent_state={"completed_steps": self.completed_steps},
            plan={},
        )
        await self.store.save(checkpoint)
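
To see the replay behavior in isolation, the following self-contained sketch strips the idea down to a dictionary journal. MiniContext, fetch, and the simulated crash are all hypothetical stand-ins for this demo; the journal dict plays the role of persistent storage that outlives a process.

```python
import asyncio

calls = []  # records every real execution so replay can be observed


class MiniContext:
    """Tiny stand-in for a durable context; the journal is a plain dict."""

    def __init__(self, journal: dict):
        self.journal = journal  # survives "restarts" in this demo

    async def run(self, step_id, fn, *args):
        if step_id in self.journal:
            return self.journal[step_id]  # replay: no re-execution
        result = await fn(*args)
        self.journal[step_id] = result  # a real runtime would write to storage here
        return result


async def fetch(n):
    calls.append(n)
    if n == 2 and calls.count(2) == 1:
        raise RuntimeError("simulated crash")  # fails on the first attempt only
    return n * 10


async def workflow(ctx):
    return [await ctx.run(f"fetch_{n}", fetch, n) for n in (1, 2, 3)]


async def main():
    journal = {}
    try:
        await workflow(MiniContext(journal))
    except RuntimeError:
        pass  # the "process" died during step 2
    # A fresh context over the same journal models a restarted process.
    return await workflow(MiniContext(journal))


result = asyncio.run(main())
```

After the run, calls records that step 1 executed once, step 2 executed twice (the interrupted attempt plus the retry), and step 3 once. The interrupted step re-running is exactly the case that idempotency protections have to cover.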

State Machines for Agent Lifecycles

Long-running agents benefit from an explicit state machine that governs their lifecycle. Rather than an implicit "running" or "stopped" dichotomy, a durable agent moves through well-defined states with clear transitions:

┌──────────┐    trigger    ┌──────────┐   checkpoint   ┌───────────┐
│  PENDING │──────────────►│ RUNNING  │───────────────►│ SUSPENDED │
└──────────┘               └──────────┘                └───────────┘
                                │  │                         │
                          fail  │  │ complete          event │
                                ▼  ▼                         ▼
                         ┌──────────┐                  ┌───────────┐
                         │  FAILED  │                  │  RUNNING  │
                         └──────────┘                  └───────────┘
                                │                            │
                          retry │                            │ complete
                                ▼                            ▼
                         ┌──────────┐                  ┌───────────┐
                         │ RUNNING  │                  │ COMPLETED │
                         └──────────┘                  └───────────┘

from enum import Enum


class AgentState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUSPENDED = "suspended"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class DurableAgent:
    def __init__(self, task_id: str, store: CheckpointStore):
        self.task_id = task_id
        self.store = store
        self.state = AgentState.PENDING
        self.retry_count = 0
        self.max_retries = 3

    async def start(self):
        self.state = AgentState.RUNNING
        await self._execute()

    async def resume(self, event=None):
        """Resume from last checkpoint, optionally with a triggering event."""
        checkpoint = await self.store.load_latest(self.task_id)
        if not checkpoint:
            raise NoCheckpointFound(self.task_id)

        self.state = AgentState.RUNNING
        await self._execute(from_checkpoint=checkpoint, event=event)

    async def _execute(self, from_checkpoint=None, event=None):
        try:
            ctx = DurableContext(self.task_id, self.store, resume_event=event)
            if from_checkpoint:
                ctx.completed_steps = from_checkpoint.agent_state.get(
                    "completed_steps", {}
                )
                ctx.current_step = from_checkpoint.step_index

            result = await self._run_workflow(ctx, event)
            self.state = AgentState.COMPLETED
            return result

        except SuspendExecution as suspend:
            self.state = AgentState.SUSPENDED
            await self._register_wake_trigger(suspend)

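        except Exception:
            if self.retry_count < self.max_retries:
                self.retry_count += 1
                self.state = AgentState.RUNNING
                # Retry from the newest checkpoint so steps finished during the
                # failed attempt are not repeated, and keep the triggering event.
                latest = await self.store.load_latest(self.task_id)
                return await self._execute(from_checkpoint=latest, event=event)
            else:
                self.state = AgentState.FAILED
                raise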

Each state transition should be persisted as well; the sketch above tracks state only in memory for brevity. With the state stored durably, an external scheduler or event router can inspect it to decide what action to take: retry a failed agent, deliver an event to a suspended one, or alert an operator about a stuck task.
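
One way to keep transitions honest is an explicit table of allowed moves that is checked, and persisted, on every change. The sketch below is an assumption-laden illustration: the Lifecycle class, the save callable, and in particular the transitions involving CANCELLED are not specified in the diagram above and are guesses at reasonable behavior.

```python
from enum import Enum


class State(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUSPENDED = "suspended"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Allowed transitions; the CANCELLED edges are assumptions for this sketch.
ALLOWED = {
    State.PENDING: {State.RUNNING, State.CANCELLED},
    State.RUNNING: {State.SUSPENDED, State.COMPLETED, State.FAILED, State.CANCELLED},
    State.SUSPENDED: {State.RUNNING, State.CANCELLED},
    State.FAILED: {State.RUNNING},  # retry
    State.COMPLETED: set(),
    State.CANCELLED: set(),
}


class Lifecycle:
    def __init__(self, save):
        self.state = State.PENDING
        self._save = save  # callable that durably records the new state

    def transition(self, new_state: State) -> None:
        if new_state not in ALLOWED[self.state]:
            raise ValueError(
                f"illegal transition {self.state.value} -> {new_state.value}"
            )
        self._save(new_state.value)  # persist first, then update memory
        self.state = new_state
```

Persisting before updating the in-memory value means a crash between the two lines leaves storage ahead of memory, which is the safer direction: the restarted process reads the stored state and never acts on a transition that was not recorded.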

Idempotency and Side Effects

Durability introduces a subtle but critical problem: side effects must be idempotent. If an agent sends an email at step 7 and crashes before that step's result is checkpointed, the runtime has no record that the step ran, so resuming executes step 7 again. Without protection, the email is sent twice.

Three strategies address this:

Idempotency keys. Every side-effecting operation carries a unique key derived from the task ID and step ID. The receiving system uses this key to deduplicate. If the same key arrives twice, the second request is a no-op.

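async def send_email_idempotent(ctx: DurableContext, recipient: str, body: str):
    # email_client is a placeholder for whatever mail API the agent uses.
    # Deriving the key from the task ID and step ID means a retry of the
    # same step always presents the same key to the receiving system.
    step_id = f"email_{recipient}"
    idempotency_key = f"{ctx.task_id}_{step_id}"

    result = await ctx.run(
        step_id,
        email_client.send,
        to=recipient,
        body=body,
        idempotency_key=idempotency_key,
    )
    return result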

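Outbox pattern. Instead of executing the side effect directly, the agent writes the intent to an outbox table within the same transaction that updates its checkpoint. A separate process reads the outbox, executes each side effect, and marks it as delivered. Because that relay can itself crash between sending and marking, delivery is at-least-once in the strict sense; pairing the outbox with idempotency keys is what makes the effect appear exactly once.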
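
A minimal sketch of the outbox idea, using SQLite from the Python standard library, is shown below. The table layout and the function names (checkpoint_with_effect, drain_outbox) are hypothetical, and send stands in for whatever delivery call a real relay would make.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE checkpoints (task_id TEXT PRIMARY KEY, state TEXT NOT NULL);
    CREATE TABLE outbox (
        effect_id TEXT PRIMARY KEY,
        payload TEXT NOT NULL,
        delivered INTEGER NOT NULL DEFAULT 0
    );
""")


def checkpoint_with_effect(task_id: str, state: dict, effect_id: str, payload: dict) -> None:
    """Write the checkpoint and the intended side effect atomically."""
    with db:  # one transaction: both rows commit or neither does
        db.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?)",
            (task_id, json.dumps(state)),
        )
        # The primary key on effect_id makes a repeated write a no-op.
        db.execute(
            "INSERT OR IGNORE INTO outbox (effect_id, payload) VALUES (?, ?)",
            (effect_id, json.dumps(payload)),
        )


def drain_outbox(send) -> int:
    """Deliver pending effects; `send` is a hypothetical delivery callable."""
    rows = db.execute(
        "SELECT effect_id, payload FROM outbox WHERE delivered = 0"
    ).fetchall()
    for effect_id, payload in rows:
        send(json.loads(payload))
        with db:
            db.execute(
                "UPDATE outbox SET delivered = 1 WHERE effect_id = ?", (effect_id,)
            )
    return len(rows)
```

If the agent replays the step and calls checkpoint_with_effect again with the same effect_id, the outbox still holds a single row, so the relay sends the email once. A crash inside drain_outbox between send and the UPDATE could still cause a resend, so the receiver should deduplicate on effect_id as well.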

Compensation. For operations that cannot be made idempotent (rare in practice), the agent records a compensation action — the inverse operation. If a duplicate is detected after the fact, the compensation undoes the extra effect. This is common in financial systems where a double-charge must be refunded rather than prevented.

Managing Context Across Long Lifetimes

A long-running agent accumulates context over time. A research agent that has been running for four hours might have thousands of tool results, dozens of reasoning traces, and a multi-level plan tree. The context window cannot hold all of it simultaneously.

This demands a progressive summarization strategy:

class LongRunningContextManager:
    def __init__(self, max_context_tokens: int = 100_000):
        self.max_tokens = max_context_tokens
        self.active_context: list[dict] = []
        self.archived_summaries: list[str] = []
        self.total_steps: int = 0

    async def add_step_result(self, step: dict, model):
        self.active_context.append(step)
        self.total_steps += 1

        if self._estimate_tokens(self.active_context) > self.max_tokens * 0.8:
            await self._compress(model)

    def _estimate_tokens(self, steps: list[dict]) -> int:
        """Rough size estimate, assuming about four characters per token."""
        return len(json.dumps(steps)) // 4

    async def _compress(self, model):
        """Summarize older context to free space for new work."""
        # Keep the most recent 20% of steps (at least five) in full detail
        keep_count = max(5, len(self.active_context) // 5)
        to_summarize = self.active_context[:-keep_count]
        to_keep = self.active_context[-keep_count:]
        if not to_summarize:
            return  # too few steps to compress; nothing would be freed

        summary = await model.generate(
            f"Summarize these {len(to_summarize)} agent steps into a concise "
            f"paragraph preserving key facts, decisions, and results:\n\n"
            + json.dumps(to_summarize, indent=2)
        )

        self.archived_summaries.append(summary)
        self.active_context = to_keep

    def build_prompt_context(self) -> str:
        """Assemble context for the next model call."""
        parts = []
        if self.archived_summaries:
            parts.append("## Previous Work (summarized)\n")
            for s in self.archived_summaries:
                parts.append(f"- {s}\n")
        parts.append("\n## Recent Steps (full detail)\n")
        for step in self.active_context:
            parts.append(json.dumps(step) + "\n")
        return "\n".join(parts)

The tradeoff is clear: summarization loses detail. A fact buried in step 47 might be relevant at step 200, but if it was summarized away, the agent cannot access it. Mitigation strategies include:

  • Structured working memory. Key facts (variable values, decisions, discovered constraints) are stored in a separate key-value store outside the context window, queryable on demand.
  • Selective retrieval. Before each reasoning step, the agent can query its own history ("What did I learn about X?") using vector search over past steps.
  • Plan-level annotations. Important findings are attached to plan nodes, which remain accessible even after individual step details are compressed.
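
The first two mitigations can be approximated with a very small store that lives outside the prompt. The sketch below is deliberately naive: WorkingMemory is a hypothetical class, and its keyword matching is only a stand-in for the vector search mentioned above.

```python
from dataclasses import dataclass, field


@dataclass
class WorkingMemory:
    """Key facts kept outside the context window (illustrative sketch)."""
    facts: dict[str, dict] = field(default_factory=dict)

    def remember(self, key: str, value: str, step: int) -> None:
        # Record which step produced the fact so it can be traced later.
        self.facts[key] = {"value": value, "step": step}

    def recall(self, query: str) -> list[tuple[str, str]]:
        """Return facts whose key or value mentions any query word."""
        words = {w.lower() for w in query.split()}
        hits = []
        for key, fact in self.facts.items():
            text = f"{key} {fact['value']}".lower()
            if any(w in text for w in words):
                hits.append((key, fact["value"]))
        return hits


memory = WorkingMemory()
memory.remember("db_version", "PostgreSQL 14", step=47)
memory.remember("auth_scheme", "OAuth2", step=52)
```

A fact recorded at step 47 stays retrievable at step 200 through memory.recall("postgresql"), even if the step that produced it has long since been summarized away. The store itself would need to be included in each checkpoint to survive restarts.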

Scheduling and Wake-Up Triggers

A suspended agent needs something to wake it up. The scheduling layer sits between the outside world and the agent runtime, routing events to the correct agent instance:

class AgentScheduler:
    def __init__(self, store: CheckpointStore, agent_registry: dict):
        self.store = store
        self.registry = agent_registry
        self.timers: dict[str, float] = {}

    async def handle_event(self, event_type: str, payload: dict):
        """Route an external event to the correct suspended agent."""
        task_id = payload.get("task_id")
        if not task_id:
            return

        agent = self.registry.get(task_id)
        if agent and agent.state == AgentState.SUSPENDED:
            # Attach the event type so wait_for_event() can match it on resume.
            await agent.resume(event={**payload, "type": event_type})

    async def schedule_timer(self, task_id: str, delay_seconds: float):
        """Wake an agent after a delay (rate-limit cooldown, polling)."""
        wake_time = time.time() + delay_seconds
        self.timers[task_id] = wake_time

    async def tick(self):
        """Called periodically to fire expired timers."""
        now = time.time()
        expired = [tid for tid, t in self.timers.items() if t <= now]
        for task_id in expired:
            del self.timers[task_id]
            await self.handle_event("timer", {"task_id": task_id})

Common wake-up triggers:

  • Human input. An approval arrives, or a user provides clarification.
  • Webhook. A CI pipeline completes, a payment processes, or a document finishes OCR.
  • Timer. A rate-limit cooldown expires, or a polling interval elapses.
  • Dependency completion. Another agent (or sub-task) finishes and posts its result.

Handling Partial Failures in Multi-Step Tasks

Long-running agents face a harder failure recovery problem than short-lived ones. If step 47 of 200 fails, you need to decide: retry the step? Skip it and continue? Roll back to a known-good state? Abort the entire task?

The answer depends on the dependency structure of the plan:

import asyncio


class TaskNode:
    def __init__(self, node_id: str, depends_on: list[str] | None = None):
        self.node_id = node_id
        self.depends_on = depends_on or []
        self.status = "pending"  # pending, running, done, failed, skipped
        self.result = None
        self.error = None


class DurablePlanExecutor:
    def __init__(self, nodes: list[TaskNode], ctx: DurableContext):
        self.nodes = {n.node_id: n for n in nodes}
        self.ctx = ctx

    async def execute(self):
        while self._has_pending_work():
            ready = self._get_ready_nodes()
            if not ready:
                # All remaining nodes are blocked by failures
                break

            # Execute independent nodes in parallel. _execute_node (not shown)
            # is assumed to run the node's work, e.g. through self.ctx.run().
            results = await asyncio.gather(
                *[self._execute_node(n) for n in ready],
                return_exceptions=True,
            )

            for node, result in zip(ready, results):
                if isinstance(result, Exception):
                    node.status = "failed"
                    node.error = str(result)
                    self._propagate_failure(node)
                else:
                    node.status = "done"
                    node.result = result

            await self.ctx._persist()

    def _get_ready_nodes(self) -> list[TaskNode]:
        """Return nodes whose dependencies are all satisfied."""
        return [
            n for n in self.nodes.values()
            if n.status == "pending"
            and all(
                self.nodes[dep].status == "done"
                for dep in n.depends_on
            )
        ]

    def _propagate_failure(self, failed_node: TaskNode):
        """Mark downstream dependents as skipped."""
        for node in self.nodes.values():
            if failed_node.node_id in node.depends_on and node.status == "pending":
                node.status = "skipped"
                self._propagate_failure(node)

    def _has_pending_work(self) -> bool:
        return any(n.status == "pending" for n in self.nodes.values())

This gives the agent a partial success model: independent branches continue even if one branch fails. After execution, the orchestrator can report which tasks succeeded, which failed, and which were skipped — giving a human operator actionable information for recovery.
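
The partial success model is easiest to see on a tiny graph. The following self-contained sketch reimplements the same scheduling loop without persistence; run_plan, the node names, and the ok and boom actions are all hypothetical and exist only for this demonstration.

```python
import asyncio


async def run_plan(actions: dict, deps: dict) -> dict:
    """Run a dependency graph; returns node_id -> final status."""
    status = {node: "pending" for node in actions}

    def skip_dependents(failed: str) -> None:
        # Recursively mark everything downstream of a failure as skipped.
        for node, parents in deps.items():
            if failed in parents and status[node] == "pending":
                status[node] = "skipped"
                skip_dependents(node)

    while True:
        ready = [
            n for n in actions
            if status[n] == "pending"
            and all(status[d] == "done" for d in deps.get(n, []))
        ]
        if not ready:
            break  # finished, or everything left is blocked
        results = await asyncio.gather(
            *(actions[n]() for n in ready), return_exceptions=True
        )
        for node, result in zip(ready, results):
            if isinstance(result, Exception):
                status[node] = "failed"
                skip_dependents(node)
            else:
                status[node] = "done"
    return status


async def ok():
    return "ok"


async def boom():
    raise RuntimeError("tool error")


# fetch -> parse -> report form one branch; lint is independent.
actions = {"fetch": ok, "parse": boom, "report": ok, "lint": ok}
deps = {"parse": ["fetch"], "report": ["parse"]}
status = asyncio.run(run_plan(actions, deps))
```

Here parse fails, so report is skipped, while fetch and the independent lint node both finish. The resulting status map is the kind of report an operator could act on: rerun parse, and report becomes eligible again.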

Budget and Time Guards

Long-running agents can consume significant resources. Without guards, a confused agent in a retry loop can burn through thousands of dollars in API costs or run indefinitely without producing useful output.

@dataclass
class ExecutionBudget:
    max_tokens: int = 1_000_000
    max_cost_usd: float = 50.0
    max_wall_time_seconds: float = 3600 * 8  # 8 hours
    max_steps: int = 500

    # Running totals
    tokens_used: int = 0
    cost_usd: float = 0.0
    start_time: float = field(default_factory=time.time)
    steps_executed: int = 0

    def check(self):
        """Raise if any budget limit is exceeded."""
        if self.tokens_used > self.max_tokens:
            raise BudgetExceeded(f"Token limit: {self.tokens_used}/{self.max_tokens}")
        if self.cost_usd > self.max_cost_usd:
            raise BudgetExceeded(f"Cost limit: ${self.cost_usd:.2f}/${self.max_cost_usd:.2f}")
        elapsed = time.time() - self.start_time
        if elapsed > self.max_wall_time_seconds:
            raise BudgetExceeded(f"Time limit: {elapsed:.0f}s/{self.max_wall_time_seconds:.0f}s")
        if self.steps_executed > self.max_steps:
            raise BudgetExceeded(f"Step limit: {self.steps_executed}/{self.max_steps}")

    def record_step(self, tokens: int, cost: float):
        self.tokens_used += tokens
        self.cost_usd += cost
        self.steps_executed += 1
        self.check()

Budget enforcement must itself be durable. If an agent resumes after a crash, it must restore its running totals from the checkpoint — otherwise it resets to zero and the limits become meaningless.

A more sophisticated approach uses budget tiers: the agent operates freely within a low-cost tier, requests human approval before entering a higher tier, and hard-stops at an absolute ceiling. This balances autonomy with cost control.
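
A tiered budget could be sketched as follows. The class name, the tier values, and the three return strings are illustrative assumptions; the point is only that crossing an unapproved tier pauses the agent, while the final tier stops it outright.

```python
from dataclasses import dataclass


@dataclass
class TieredBudget:
    # Ascending cost ceilings in USD; the last one is the hard stop.
    tiers: tuple[float, ...] = (5.0, 20.0, 50.0)
    approved_tier: int = 0  # index of the highest tier a human has approved
    cost_usd: float = 0.0

    def record(self, cost: float) -> str:
        """Add a cost and report what the agent should do next."""
        self.cost_usd += cost
        if self.cost_usd >= self.tiers[-1]:
            return "stop"  # absolute ceiling, no approval can lift it
        if self.cost_usd >= self.tiers[self.approved_tier]:
            return "needs_approval"  # suspend and wait for a human
        return "ok"

    def approve_next_tier(self) -> None:
        self.approved_tier = min(self.approved_tier + 1, len(self.tiers) - 1)
```

In a durable agent, a "needs_approval" result would naturally map onto a ctx.wait_for_event() call, with approve_next_tier() invoked once the approval event arrives.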

Concrete Example: A Durable Code Migration Agent

Consider an agent tasked with migrating a large codebase from one framework to another. The migration touches 300 files, requires running tests after each batch, and needs human review at three checkpoints.

async def code_migration_agent(ctx: DurableContext, config: MigrationConfig):
    # Phase 1: Analysis (fast, cheap)
    analysis = await ctx.run(
        "analyze_codebase",
        analyze_files,
        config.source_dir,
        config.source_framework,
    )

    # Phase 2: Generate migration plan
    plan = await ctx.run(
        "generate_plan",
        create_migration_plan,
        analysis,
        config.target_framework,
    )

    # Checkpoint: human reviews the plan before execution
    approval = await ctx.wait_for_event("plan_review", timeout_hours=48)
    if approval.status != "approved":
        plan = await ctx.run("revise_plan", revise_plan, plan, approval.feedback)
        approval = await ctx.wait_for_event("plan_review_v2", timeout_hours=48)
        if approval.status != "approved":
            return MigrationResult(status="rejected", reason=approval.feedback)

    # Phase 3: Execute migration in batches
    batch_size = 10
    batches = [plan.files[i:i+batch_size] for i in range(0, len(plan.files), batch_size)]

    for i, batch in enumerate(batches):
        # Migrate batch
        results = await ctx.run(f"migrate_batch_{i}", migrate_files, batch, config)

        # Run tests after each batch
        test_result = await ctx.run(f"test_batch_{i}", run_tests, config.test_command)

        if not test_result.passed:
            # Attempt auto-fix
            fix = await ctx.run(f"fix_batch_{i}", auto_fix_tests, test_result, results)
            retest = await ctx.run(f"retest_batch_{i}", run_tests, config.test_command)

            if not retest.passed:
                # Escalate to human
                await ctx.wait_for_event(
                    f"batch_{i}_fix",
                    timeout_hours=24,
                )

    # Phase 4: Final validation
    final_tests = await ctx.run("final_tests", run_full_test_suite, config)

    # Final human sign-off
    await ctx.wait_for_event("final_review", timeout_hours=72)

    return MigrationResult(
        files_migrated=len(plan.files),
        test_status=final_tests,
    )

This agent might run for days. It suspends at least twice for human input (the plan review and the final sign-off), and once more for each plan revision or batch that needs a manual fix. If the process crashes at any point, whether during file migration, during tests, or during a wait, it resumes from the step where it left off. Completed migrations are not repeated. Tests that already passed are not re-run. The human sees the same coherent workflow regardless of how many times the infrastructure restarted underneath.

Trade-Offs and Design Decisions

Building durable agents introduces complexity. Not every agent needs full durability. Here is the decision framework:

Use simple in-memory execution when:

  • Tasks complete in under a minute.
  • Failure cost is low (just re-run).
  • No side effects that cannot be repeated.
  • No human-in-the-loop waits.

Use lightweight checkpointing when:

  • Tasks run 1–30 minutes.
  • Failures are moderately expensive.
  • Side effects are idempotent or retryable.
  • State can be serialized simply (JSON blob).

Use full durable execution when:

  • Tasks span hours or days.
  • Human interaction creates unbounded waits.
  • Side effects require exactly-once semantics.
  • Partial progress must be preserved across infrastructure failures.
  • Budget/cost constraints demand no-rework guarantees.

The cost of durability is threefold. First, latency: every checkpoint adds a write to persistent storage. For fast agents doing many small steps, this overhead can dominate execution time. Second, complexity: developers must reason about replay semantics, idempotency, and state serialization — concepts that do not exist in simple request-response agents. Third, storage: long-running agents accumulate large checkpoint histories. You need retention policies to prune old checkpoints without losing the ability to debug past executions.
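
A retention policy can be as simple as keeping the most recent checkpoints for fast resume plus sparse milestones for debugging. The function below is a hypothetical sketch of that selection step only; the parameter names and default values are assumptions, and actual deletion would go through the checkpoint store's backend.

```python
def select_for_pruning(
    step_indices: list[int], keep_last: int = 5, keep_every: int = 50
) -> list[int]:
    """Return the step indices whose checkpoints can be deleted.

    Keeps the `keep_last` most recent checkpoints and every
    `keep_every`-th step as a long-term milestone.
    """
    ordered = sorted(step_indices)
    recent = set(ordered[-keep_last:]) if keep_last > 0 else set()
    return [s for s in ordered if s not in recent and s % keep_every != 0]


prunable = select_for_pruning(list(range(121)), keep_last=3, keep_every=50)
```

For 121 checkpoints (steps 0 through 120), this keeps steps 118 to 120 and the milestones 0, 50, and 100, and marks the rest as prunable.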

Conclusion

Long-running agents demand a fundamentally different architecture from their short-lived counterparts. The core insight is that durability is not an add-on — it is a runtime property that must be designed in from the start. Retrofitting persistence into an agent loop that assumes continuous execution is fragile and error-prone.

The key patterns are: checkpointing after side-effecting steps, durable execution frameworks that make sequential code resumable, explicit state machines for lifecycle management, idempotency keys for safe replay, progressive summarization for managing unbounded context, and budget guards that persist across restarts.

When implemented well, durable agents feel effortless to their users. A task that takes three days completes as naturally as one that takes three seconds — the agent simply picks up where it left off, every time, regardless of what happened underneath.