Cost Optimization
An agent that works perfectly but costs a dollar per task is a prototype. An agent that costs a tenth of a cent per task at the same quality level is a product. The difference is engineering around the model: intercepting redundant work before it reaches the API, compressing what you send, batching what you can defer, and building architectures that are aware of their own budget.
Cost in agent systems is dominated by token consumption. Every reasoning step, every tool call description, every context window stuffed with history — these are tokens flowing through the meter. The levers are straightforward: send fewer tokens, send them to cheaper models, avoid sending them at all when the answer already exists. But applying those levers without degrading quality requires careful design.
Where the Money Goes
Before optimizing, you need to know what you are optimizing. Agent costs break down into a few dominant categories:
- Input tokens. The system prompt, conversation history, tool schemas, retrieved context (RAG chunks), and the user's actual request. For a well-equipped agent with 20 tools and a detailed system prompt, the input can be 3,000-5,000 tokens before the user says a word.
- Output tokens. The model's reasoning, tool call arguments, and final response. Output tokens are typically 3-5x more expensive than input tokens per unit.
- Iteration multiplier. A ReAct agent making 8 tool calls means 8 round trips to the model. Each round trip carries the full (growing) context. By iteration 8, you are paying for the system prompt, all previous reasoning, all tool results, and the new step, accumulated.
- Embedding calls. RAG systems embed queries and sometimes re-embed documents. At scale, embedding costs add up.
- Tool execution. External API calls, database queries, and compute for code execution. Not LLM costs, but part of the total.
┌────────────────────────────────────────────────────────────────┐
│  Cost Anatomy of a Typical Agent Task                          │
│                                                                │
│  Step 1: Initial call                                          │
│  ├─ System prompt:        1,200 tokens                         │
│  ├─ Tool schemas:         1,800 tokens                         │
│  ├─ User message:           100 tokens                         │
│  ├─ Output (reasoning):     300 tokens                         │
│  └─ Subtotal:             3,100 in + 300 out                   │
│                                                                │
│  Step 2: After tool result                                     │
│  ├─ Previous context:     3,400 tokens                         │
│  ├─ Tool result:            500 tokens                         │
│  ├─ Output (next action):   200 tokens                         │
│  └─ Subtotal:             3,900 in + 200 out                   │
│                                                                │
│  Step 3-8: Accumulating context...                             │
│  └─ By step 8:            ~8,000 in + 400 out per call         │
│                                                                │
│  Total for task: ~42,000 input tokens + ~2,500 output tokens   │
│                                                                │
│  At $3/M input, $15/M output:                                  │
│  Cost = $0.126 + $0.0375 ≈ $0.16 per task                      │
│                                                                │
│  At 100,000 tasks/day ≈ $16,000/day                            │
└────────────────────────────────────────────────────────────────┘
The accumulating context is the real killer. Each iteration pays for everything that came before, so cost grows faster than the step count. In the example above, the 8-step task costs about 12x the first call ($0.16 vs. roughly $0.014), not 8x, and the gap widens with every extra step or larger tool result.
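To make the accumulation concrete, here is a minimal cost model. It assumes the context grows by a fixed number of tokens per step (700 here, chosen so that step 8 lands near the ~8,000 input tokens in the diagram) and uses the same illustrative prices. The function name and parameters are hypothetical, not part of any library.

```python
def task_cost(
    base_input_tokens: int,
    growth_per_step: int,
    output_per_step: int,
    steps: int,
    input_price_per_m: float = 3.0,
    output_price_per_m: float = 15.0,
) -> dict:
    """Estimate the cost of an agent task whose context grows linearly."""
    # Step i (0-based) re-sends the base prompt plus everything added so far.
    input_tokens = sum(
        base_input_tokens + i * growth_per_step for i in range(steps)
    )
    output_tokens = output_per_step * steps
    cost = (
        input_tokens * input_price_per_m + output_tokens * output_price_per_m
    ) / 1_000_000
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost": cost,
    }


single = task_cost(3_100, 700, 300, steps=1)
eight = task_cost(3_100, 700, 300, steps=8)
ratio = eight["cost"] / single["cost"]
print(f"1 step:  ${single['cost']:.4f}")
print(f"8 steps: ${eight['cost']:.4f} ({ratio:.1f}x one call)")
```

Under these assumptions the 8-step task uses 44,400 input tokens, close to the ~42,000 in the diagram. Varying `growth_per_step` shows how strongly large tool results drive the total.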
Semantic Caching
The most effective cost optimization is not doing the work at all. Semantic caching stores model responses indexed by the meaning of the request (an embedding) rather than only its exact text. When a new request is semantically similar to one already answered, the cache returns the stored response without calling the model.
Unlike exact-match caching (which only helps with identical inputs), semantic caching handles the natural variation in how users phrase the same question.
import time
from dataclasses import dataclass, field

import numpy as np


@dataclass
class CacheEntry:
    """A cached model response with metadata."""

    key_embedding: list[float]
    request_text: str
    response: str
    model_id: str
    created_at: float
    hit_count: int = 0
    ttl_seconds: int = 3600
    metadata: dict = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        return time.time() - self.created_at > self.ttl_seconds


class SemanticCache:
    """Cache model responses by semantic similarity."""

    def __init__(
        self,
        embedding_model,
        similarity_threshold: float = 0.95,
        max_entries: int = 100_000,
    ):
        # embedding_model is expected to expose `async embed(text) -> list[float]`
        self.embedding_model = embedding_model
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.entries: list[CacheEntry] = []
        self._embeddings_matrix: np.ndarray | None = None

    async def get(self, request: str, model_id: str) -> str | None:
        """Look up a semantically similar cached response."""
        if not self.entries:
            return None
        query_embedding = await self.embedding_model.embed(request)
        similarities = self._compute_similarities(query_embedding)
        # Walk candidates from most to least similar so that an expired or
        # wrong-model best match does not hide a valid runner-up
        for idx in np.argsort(similarities)[::-1]:
            if similarities[idx] < self.threshold:
                break
            entry = self.entries[idx]
            if not entry.is_expired and entry.model_id == model_id:
                entry.hit_count += 1
                return entry.response
        return None

    async def put(
        self, request: str, response: str, model_id: str, ttl: int = 3600
    ):
        """Store a response in the cache."""
        embedding = await self.embedding_model.embed(request)
        entry = CacheEntry(
            key_embedding=embedding,
            request_text=request,
            response=response,
            model_id=model_id,
            created_at=time.time(),
            ttl_seconds=ttl,
        )
        self.entries.append(entry)
        # Evict if over capacity (eviction rebuilds the matrix itself).
        # Rebuilding on every insert is O(n); a vector index scales better.
        if len(self.entries) > self.max_entries:
            self._evict()
        else:
            self._rebuild_matrix()

    def _compute_similarities(self, query: list[float]) -> np.ndarray:
        """Cosine similarity against all cached embeddings."""
        query_vec = np.asarray(query, dtype=float)
        norms = np.linalg.norm(
            self._embeddings_matrix, axis=1
        ) * np.linalg.norm(query_vec)
        # Guard against zero-norm vectors to avoid division by zero
        return (self._embeddings_matrix @ query_vec) / np.maximum(norms, 1e-12)

    def _evict(self):
        """Remove expired entries, then least-used entries."""
        # Remove expired first
        self.entries = [e for e in self.entries if not e.is_expired]
        # If still over capacity, keep the most-hit entries; ties are broken
        # in favor of newer entries
        if len(self.entries) > self.max_entries:
            self.entries.sort(
                key=lambda e: (e.hit_count, e.created_at), reverse=True
            )
            self.entries = self.entries[: self.max_entries]
        self._rebuild_matrix()

    def _rebuild_matrix(self):
        """Rebuild the embedding matrix for fast similarity search."""
        if self.entries:
            self._embeddings_matrix = np.array(
                [e.key_embedding for e in self.entries]
            )
        else:
            self._embeddings_matrix = None
When Semantic Caching Works
Semantic caching is most effective when:
- Many users ask similar questions (customer support, FAQ-style queries).
- The agent performs repeatable tasks with stable inputs (daily report generation, standard analyses).
- Tool results are deterministic for the same inputs (database lookups, API calls with stable data).
It is dangerous when:
- Answers depend on real-time data (stock prices, live system state).
- Subtle wording differences change the correct answer ("cancel my last order" vs. "cancel my first order").
- The agent needs to maintain conversational context that differs between users.
The similarity threshold is the critical tuning parameter. Too high (0.99) and the cache rarely hits. Too low (0.85) and you return wrong answers for superficially similar but semantically different requests. Start at 0.95 and adjust based on your false-positive rate.
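One way to tune the threshold is to label a sample of near-matches and sweep candidate values. The sketch below assumes you have, for each sampled query, the similarity to its nearest cached request and a human judgment of whether the cached answer would have been correct. The sample data and the function name are made up for illustration.

```python
def sweep_thresholds(samples, thresholds):
    """samples: list of (similarity, cached_answer_is_correct) pairs."""
    rows = []
    for t in thresholds:
        hits = [ok for sim, ok in samples if sim >= t]
        hit_rate = len(hits) / len(samples)
        # A false positive is a cache hit whose stored answer would be wrong
        false_positive_rate = hits.count(False) / len(hits) if hits else 0.0
        rows.append((t, hit_rate, false_positive_rate))
    return rows


# Hypothetical labeled sample
samples = [
    (0.99, True), (0.97, True), (0.96, True), (0.93, True),
    (0.91, False), (0.88, True), (0.86, False), (0.80, False),
]
rows = sweep_thresholds(samples, [0.85, 0.90, 0.95])
for t, hit_rate, fp_rate in rows:
    print(f"threshold={t:.2f}  hit_rate={hit_rate:.2f}  "
          f"false_positive_rate={fp_rate:.2f}")
```

On this toy sample, lowering the threshold from 0.95 to 0.85 raises the hit rate from 3/8 to 7/8 but takes the false-positive rate among hits from 0 to 2/7. Real data will look different; the point is to measure both numbers before picking a value.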
Caching at Multiple Levels
A sophisticated agent system caches at several layers:
| Cache Level | What is Cached | Hit Rate | Risk |
|---|---|---|---|
| Full response | Complete agent output for a task | Low-medium | Stale answers |
| Tool results | Individual tool call responses | High | Stale data |
| Intermediate reasoning | Partial chains for sub-problems | Medium | Context mismatch |
| Embeddings | Vector representations of text | Very high | Minimal |
| Prompt prefix | KV cache for static system prompts | Very high | None |
Tool-result caching is particularly effective. If your agent calls a get_user_profile tool and the profile has not changed since the last call, there is no reason to hit the database again. A TTL-based cache on tool results can eliminate 30-50% of external calls in many workloads.
Prompt Compression
Every token in the prompt costs money. Prompt compression reduces token count while preserving the information the model needs to do its job.
Static Compression - Trimming the System Prompt
The system prompt is sent on every single call. A 200-token reduction here saves 200 tokens × every iteration × every task × every day. Small absolute savings compound enormously.
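As a back-of-the-envelope check, the multiplication can be written out. The figures below reuse numbers that appear elsewhere in this article (8 calls per task, 100,000 tasks per day, $3 per million input tokens) and ignore any prefix-caching discount; the function name is hypothetical.

```python
def daily_prompt_savings(
    tokens_removed: int,
    calls_per_task: int,
    tasks_per_day: int,
    input_price_per_m: float = 3.0,
) -> float:
    """Dollars saved per day by removing tokens from the system prompt."""
    tokens_saved = tokens_removed * calls_per_task * tasks_per_day
    return tokens_saved * input_price_per_m / 1_000_000


savings = daily_prompt_savings(200, calls_per_task=8, tasks_per_day=100_000)
print(f"${savings:,.0f} per day")
```

Under these assumptions a 200-token trim is worth $480 per day, which is why the system prompt deserves line-by-line scrutiny.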
Techniques:
- Remove redundant instructions. If the model already does something correctly without being told (because it was fine-tuned for it), delete that instruction.
- Abbreviate tool schemas. Instead of verbose descriptions, use terse parameter names and minimal descriptions. A well-named tool (search_documents) needs less explanation than a generic one (run_query).
- Conditional inclusion. Only include tool schemas for tools relevant to the current task. If the user is asking a factual question, do not inject the 15 tool schemas for code execution, file management, and database access.
class DynamicPromptAssembler:
    """Assemble prompts with only the components needed for this task."""

    def __init__(self, base_prompt: str, tool_registry: dict):
        # tool_registry maps a tool name to an object with `.parameters`
        # (a dict) and `.short_description` (a str)
        self.base_prompt = base_prompt
        self.tool_registry = tool_registry

    def assemble(self, task: str, task_category: str) -> str:
        """Build a minimal prompt for the given task."""
        # `task` is not used by this category-based selector
        parts = [self.base_prompt]
        # Only include relevant tool schemas
        relevant_tools = self._select_tools(task_category)
        if relevant_tools:
            schemas = self._format_schemas(relevant_tools)
            parts.append(schemas)
        return "\n\n".join(parts)

    def _select_tools(self, category: str) -> list[str]:
        """Select tools relevant to the task category."""
        category_tools = {
            "research": ["web_search", "read_document", "summarize"],
            "coding": ["read_file", "write_file", "run_code", "search_code"],
            "data": ["query_db", "visualize", "export_csv"],
            "general": ["web_search", "calculator"],
        }
        # Unknown categories fall back to every registered tool
        names = category_tools.get(category, list(self.tool_registry))
        # Skip unregistered names to avoid a KeyError when formatting
        return [name for name in names if name in self.tool_registry]

    def _format_schemas(self, tool_names: list[str]) -> str:
        """Format only selected tool schemas, minimizing token count."""
        schemas = []
        for name in tool_names:
            tool = self.tool_registry[name]
            # Compact format: no verbose descriptions
            schemas.append(
                f"{name}({', '.join(tool.parameters.keys())}): "
                f"{tool.short_description}"
            )
        return "Tools:\n" + "\n".join(schemas)
Dynamic Compression - Shrinking Context Between Iterations
As the agent loop iterates, the conversation history grows. By iteration 8, you are carrying reasoning from steps 1-7 that may no longer be relevant. Dynamic compression summarizes or prunes old context to keep the window manageable.
class ContextCompressor:
    """Compress conversation history to reduce token usage."""

    def __init__(self, model, max_context_tokens: int = 4000):
        self.model = model
        self.max_tokens = max_context_tokens

    def _count_tokens(self, messages: list[dict]) -> int:
        """Rough estimate (~4 characters per token); swap in a real tokenizer."""
        return sum(len(m.get("content") or "") for m in messages) // 4

    async def compress(self, messages: list[dict]) -> list[dict]:
        """Compress message history while preserving essential information."""
        current_tokens = self._count_tokens(messages)
        if current_tokens <= self.max_tokens:
            return messages  # No compression needed
        # Strategy 1: Truncate verbose tool results
        messages = self._truncate_tool_results(messages)
        if self._count_tokens(messages) <= self.max_tokens:
            return messages
        # Strategy 2: Summarize early reasoning steps
        messages = await self._summarize_early_steps(messages)
        if self._count_tokens(messages) <= self.max_tokens:
            return messages
        # Strategy 3: Drop intermediate steps, keep first and recent
        messages = self._keep_bookends(messages)
        return messages

    def _truncate_tool_results(self, messages: list[dict]) -> list[dict]:
        """Truncate verbose tool results, keeping the head and tail."""
        compressed = []
        for msg in messages:
            if msg["role"] == "tool" and len(msg["content"]) > 500:
                # Keep first 200 chars + last 100 chars
                content = msg["content"]
                truncated = (
                    content[:200] + "\n...[truncated]...\n" + content[-100:]
                )
                compressed.append({**msg, "content": truncated})
            else:
                compressed.append(msg)
        return compressed

    async def _summarize_early_steps(
        self, messages: list[dict]
    ) -> list[dict]:
        """Replace early reasoning steps with a summary."""
        # Keep system prompt and last 3 exchanges intact
        system = messages[0]
        recent = messages[-6:]  # Last 3 turns (assistant + tool/user)
        middle = messages[1:-6]
        if not middle:
            return messages
        # Summarize the middle section (content may be None on tool calls)
        middle_text = "\n".join(
            f"{m['role']}: {(m.get('content') or '')[:200]}" for m in middle
        )
        summary = await self.model.generate(
            f"Summarize these agent steps concisely, preserving key "
            f"findings and decisions:\n{middle_text}"
        )
        summary_msg = {
            "role": "system",
            "content": f"[Previous steps summary: {summary}]",
        }
        return [system, summary_msg] + recent

    def _keep_bookends(self, messages: list[dict]) -> list[dict]:
        """Keep first exchange and last few exchanges, drop middle."""
        if len(messages) <= 9:
            return messages  # Bookends would overlap; nothing to drop
        system = messages[0]
        first_exchange = messages[1:3]  # First user + assistant
        recent = messages[-6:]
        return [system] + first_exchange + recent
The trade-off is clear: compression loses information. Aggressive compression saves tokens but risks the agent "forgetting" something important from earlier steps. Conservative compression preserves fidelity but saves less. A practical approach is to compress only tool results and early reasoning — the model rarely needs the full text of a tool result from 5 steps ago, but it does need to remember what it concluded from that result.
Response Deduplication
Agents often re-derive the same intermediate result. In a multi-agent system, different agents may independently call the same tool with the same arguments, or compute the same sub-answer. Deduplication catches these redundancies.
import asyncio
import hashlib
import json
import time
from collections.abc import Awaitable, Callable


class RequestDeduplicator:
    """Prevent redundant in-flight and recently-completed requests."""

    def __init__(self, ttl_seconds: int = 60):
        self.in_flight: dict[str, asyncio.Future] = {}
        self.recent_results: dict[str, tuple[str, float]] = {}
        self.ttl = ttl_seconds

    def _make_key(self, model_id: str, messages: list[dict]) -> str:
        """Create a deduplication key from the request."""
        # Hash the meaningful parts of the request
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(
            f"{model_id}:{content}".encode()
        ).hexdigest()

    async def deduplicated_call(
        self,
        model_id: str,
        messages: list[dict],
        call_fn: Callable[[str, list[dict]], Awaitable[str]],
    ) -> str:
        """Execute a model call, deduplicating concurrent identical requests."""
        key = self._make_key(model_id, messages)
        # Check recent results first, dropping the entry once it has expired
        if key in self.recent_results:
            result, timestamp = self.recent_results[key]
            if time.time() - timestamp < self.ttl:
                return result
            del self.recent_results[key]
        # Check if an identical request is already in flight
        if key in self.in_flight:
            return await self.in_flight[key]
        # Execute the call, sharing the result with any concurrent duplicates
        future = asyncio.get_running_loop().create_future()
        self.in_flight[key] = future
        try:
            result = await call_fn(model_id, messages)
            future.set_result(result)
            self.recent_results[key] = (result, time.time())
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            # If this call was cancelled, release waiters instead of
            # leaving them blocked on a future that never resolves
            if not future.done():
                future.cancel()
            del self.in_flight[key]
Deduplication is especially valuable in parallel execution patterns. When a coordinator agent spawns five sub-agents that all need the same background context computed, deduplication ensures that computation happens once and the result is shared.
Batching
Model API calls have per-request overhead — network latency, queue scheduling, and cold-start penalties. Batching groups multiple independent requests into a single API call (where the provider supports it) or a tight burst, amortizing that overhead.
Two batching strategies:
Micro-Batching - Collecting Requests Over a Short Window
When multiple agent tasks arrive concurrently, their initial model calls are independent. Instead of sending them one by one, collect requests over a short window (10-50ms) and send them as a batch.
import asyncio
from dataclasses import dataclass


@dataclass
class PendingRequest:
    messages: list[dict]
    model_id: str
    future: asyncio.Future


class MicroBatcher:
    """Batch concurrent model requests for throughput and cost savings."""

    def __init__(
        self,
        model_client,
        max_batch_size: int = 8,
        window_ms: int = 20,
    ):
        # model_client is assumed to expose an async `batch_generate` method
        self.client = model_client
        self.max_batch_size = max_batch_size
        self.window_ms = window_ms
        self.pending: list[PendingRequest] = []
        self._flush_task: asyncio.Task | None = None

    async def call(self, model_id: str, messages: list[dict]) -> str:
        """Submit a request for batched execution."""
        future = asyncio.get_running_loop().create_future()
        request = PendingRequest(
            messages=messages, model_id=model_id, future=future
        )
        self.pending.append(request)
        if len(self.pending) >= self.max_batch_size:
            # Batch is full: cancel the window timer and flush immediately
            if self._flush_task is not None:
                self._flush_task.cancel()
                self._flush_task = None
            await self._flush()
        elif self._flush_task is None:
            # Start the collection window if not already running
            self._flush_task = asyncio.create_task(self._flush_after_window())
        return await future

    async def _flush_after_window(self):
        """Wait for the collection window, then flush."""
        await asyncio.sleep(self.window_ms / 1000)
        # Clear the handle before flushing so call() never cancels a flush
        # that is already talking to the provider
        self._flush_task = None
        await self._flush()

    async def _flush(self):
        """Send all pending requests as a batch."""
        if not self.pending:
            return
        batch = self.pending[:]
        self.pending = []
        # Group by model for efficient batching
        by_model: dict[str, list[PendingRequest]] = {}
        for req in batch:
            by_model.setdefault(req.model_id, []).append(req)
        for model_id, requests in by_model.items():
            try:
                results = await self.client.batch_generate(
                    model_id=model_id,
                    messages_batch=[r.messages for r in requests],
                )
                for req, result in zip(requests, results):
                    if not req.future.done():  # Caller may have cancelled
                        req.future.set_result(result)
            except Exception as e:
                for req in requests:
                    if not req.future.done():
                        req.future.set_exception(e)
Offline Batching - Deferred Processing for Non-Urgent Work
Several model providers offer batch APIs at a discount of roughly 50%: you submit a batch of requests and get the results asynchronously, typically within hours. For any agent work that is not time-sensitive (nightly report generation, bulk document processing, periodic evaluations), offline batching cuts model cost roughly in half with no quality trade-off. The price you pay is latency.
from collections.abc import Callable


class OfflineBatchQueue:
    """Queue non-urgent agent tasks for discounted batch processing."""

    def __init__(self, batch_client, max_batch_size: int = 1000):
        # batch_client is a placeholder wrapper around a provider's batch
        # API; `create_batch` and `on_complete` are assumed methods
        self.client = batch_client
        self.max_batch_size = max_batch_size
        self.queue: list[dict] = []
        self.callbacks: dict[str, Callable[[str], None]] = {}

    def enqueue(
        self,
        task_id: str,
        messages: list[dict],
        callback: Callable[[str], None],
    ):
        """Add a task to the batch queue."""
        self.queue.append({
            "custom_id": task_id,
            "messages": messages,
        })
        self.callbacks[task_id] = callback
        if len(self.queue) >= self.max_batch_size:
            self._submit_batch()

    def flush(self):
        """Submit a partial batch, e.g. from a nightly scheduler."""
        self._submit_batch()

    def _submit_batch(self):
        """Submit the current queue as a batch job."""
        if not self.queue:
            return
        batch = self.queue[:]
        self.queue = []
        # Submit to provider's batch API
        job_id = self.client.create_batch(requests=batch)
        # Register a webhook or poll for completion
        self.client.on_complete(job_id, self._handle_results)

    def _handle_results(self, results: list[dict]):
        """Distribute batch results to waiting callbacks."""
        for result in results:
            task_id = result["custom_id"]
            if task_id in self.callbacks:
                self.callbacks[task_id](result["response"])
                del self.callbacks[task_id]
Budget-Aware Architectures
The techniques above are optimizations applied to individual calls. Budget-aware architecture is a system-level concern: the agent knows what it is spending and adjusts its behavior accordingly.
Per-Task Budget Enforcement
Each agent task gets a token or cost budget. The orchestrator tracks spending across iterations and takes action when the budget is running low.
from dataclasses import dataclass


@dataclass
class TaskBudget:
    """Token and cost budget for a single agent task."""

    max_input_tokens: int = 50_000
    max_output_tokens: int = 10_000
    max_cost_dollars: float = 0.50
    max_iterations: int = 15
    # Tracking
    input_tokens_used: int = 0
    output_tokens_used: int = 0
    cost_used: float = 0.0
    iterations_used: int = 0

    def record_usage(
        self, input_tokens: int, output_tokens: int, cost: float
    ):
        self.input_tokens_used += input_tokens
        self.output_tokens_used += output_tokens
        self.cost_used += cost
        self.iterations_used += 1

    @property
    def remaining_fraction(self) -> float:
        """What fraction of the tightest budget remains (0 to 1)."""
        fractions = (
            1 - self.input_tokens_used / self.max_input_tokens,
            1 - self.output_tokens_used / self.max_output_tokens,
            1 - self.cost_used / self.max_cost_dollars,
            1 - self.iterations_used / self.max_iterations,
        )
        # Clamp at 0 so an overspent budget does not report a negative value
        return max(0.0, min(fractions))

    @property
    def is_exhausted(self) -> bool:
        return self.remaining_fraction <= 0

    @property
    def is_low(self) -> bool:
        return self.remaining_fraction < 0.2


class BudgetAwareAgent:
    """An agent that adapts behavior based on remaining budget."""

    def __init__(self, model_router, tools, budget: TaskBudget):
        self.router = model_router
        self.tools = tools
        self.budget = budget

    async def step(self, state: dict) -> dict:
        """Execute one agent step with budget awareness."""
        if self.budget.is_exhausted:
            return self._force_final_answer(state)
        # Adapt model selection based on remaining budget
        model_id = self._select_model_for_budget()
        # Adapt context size based on remaining budget
        messages = self._trim_context_for_budget(state["messages"])
        response = await self.router.call(model_id, messages)
        # Record usage (`usage.cost` and `content` are assumed response fields)
        self.budget.record_usage(
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost=response.usage.cost,
        )
        return {
            "type": "step",
            "content": response.content,
            "metadata": {"model_used": model_id},
        }

    def _select_model_for_budget(self) -> str:
        """Pick a model based on remaining budget."""
        if self.budget.remaining_fraction > 0.5:
            return "large-model"  # Plenty of budget: use the best
        elif self.budget.remaining_fraction > 0.2:
            return "medium-model"  # Getting tight: use mid-tier
        else:
            return "small-model"  # Almost out: use cheapest

    def _trim_context_for_budget(self, messages: list[dict]) -> list[dict]:
        """Aggressively compress context when budget is low."""
        # Only trim when there is a middle to drop; otherwise the slice
        # below would duplicate the system prompt
        if self.budget.is_low and len(messages) > 5:
            # Keep only system prompt and last 2 exchanges
            return [messages[0]] + messages[-4:]
        return messages

    def _force_final_answer(self, state: dict) -> dict:
        """Budget exhausted: produce best answer with what we have."""
        return {
            "type": "final_answer",
            "content": self._synthesize_from_current_state(state),
            "metadata": {"budget_exhausted": True},
        }

    def _synthesize_from_current_state(self, state: dict) -> str:
        """Placeholder: return the most recent assistant message, if any."""
        for msg in reversed(state["messages"]):
            if msg["role"] == "assistant" and msg.get("content"):
                return msg["content"]
        return "Budget exhausted before an answer was produced."
Cascading Model Tiers
A budget-aware architecture routes requests through a cascade of models, starting with the cheapest and escalating only when the cheap model cannot handle the task confidently.
┌──────────────────────────────────────────────────────────────┐
│  Model Cascade Architecture                                  │
│                                                              │
│  Request ──► Small Model (fast, cheap)                       │
│                │                                             │
│                ├── Confident? ──► Return response            │
│                │                                             │
│                └── Uncertain? ──► Medium Model               │
│                                     │                        │
│                                     ├── Confident? ──►       │
│                                     │     Return response    │
│                                     │                        │
│                                     └── Uncertain? ──►       │
│                                           Large Model        │
│                                             │                │
│                                             └── Return       │
│                                                 response     │
│                                                              │
│  Cost: 80% of requests resolved at tier 1 ($0.001/call)      │
│        15% escalated to tier 2 ($0.01/call)                  │
│         5% escalated to tier 3 ($0.05/call)                  │
│                                                              │
│  Blended cost: ~$0.0055/call (vs. $0.05 for always-large)    │
│  (escalated requests also pay for the cheaper attempts)      │
└──────────────────────────────────────────────────────────────┘
class ModelCascade:
    """Route requests through a cascade of increasingly capable models."""

    def __init__(self, tiers: list[dict]):
        """
        tiers: [
            {"model_id": "small", "client": ..., "confidence_threshold": 0.8},
            {"model_id": "medium", "client": ..., "confidence_threshold": 0.7},
            {"model_id": "large", "client": ..., "confidence_threshold": 0.0},
        ]
        """
        if not tiers:
            raise ValueError("ModelCascade needs at least one tier")
        self.tiers = tiers

    async def call(self, messages: list[dict]) -> dict:
        """Try each tier in order, escalating on low confidence."""
        last = len(self.tiers) - 1
        for i, tier in enumerate(self.tiers):
            # `return_confidence` is an assumed client feature, not a
            # standard API parameter
            response = await tier["client"].generate(
                model=tier["model_id"],
                messages=messages,
                return_confidence=True,
            )
            # The final tier always returns, regardless of confidence
            if i == last or response.confidence >= tier["confidence_threshold"]:
                return {
                    "content": response.content,
                    "model_used": tier["model_id"],
                    "confidence": response.confidence,
                    "escalated": i > 0,
                }
The confidence signal can come from the model itself (logprobs on the output), a separate classifier, or heuristics (short responses for complex questions often indicate uncertainty).
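As one illustration of the logprob option, the sketch below turns per-token log probabilities into a single score using the geometric mean of the token probabilities. It assumes your provider can return a list of token logprobs for the generated output; the function names and the sample values are hypothetical, and the score is a rough proxy rather than a calibrated probability of correctness.

```python
import math


def confidence_from_logprobs(token_logprobs: list[float]) -> float:
    """Geometric-mean token probability: exp(mean(logprob))."""
    if not token_logprobs:
        return 0.0  # No tokens, no evidence: treat as not confident
    return math.exp(sum(token_logprobs) / len(token_logprobs))


def should_escalate(token_logprobs: list[float], threshold: float) -> bool:
    """Escalate to the next tier when confidence is below the threshold."""
    return confidence_from_logprobs(token_logprobs) < threshold


confident = [-0.05, -0.10, -0.02]  # made-up values for a decisive answer
hesitant = [-1.2, -0.9, -2.0]      # made-up values for a shaky answer
print(f"{confidence_from_logprobs(confident):.3f}",
      should_escalate(confident, 0.8))
print(f"{confidence_from_logprobs(hesitant):.3f}",
      should_escalate(hesitant, 0.8))
```

Averaging in log space keeps long responses from being penalized simply for having more tokens. Whatever signal you use, validate it against labeled outcomes before trusting it to gate escalation.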
Prompt Prefix Caching
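Several major API providers support prompt prefix caching: if consecutive requests share the same prefix (system prompt, tool schemas, static context), the provider caches the key-value computations for that prefix and charges a reduced rate for the cached tokens. Details vary by provider (minimum prefix length, cache lifetime, whether caching is automatic or must be requested, and whether cache writes carry a surcharge), but for agent systems that resend a long static prefix on every iteration it is close to free money.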
The requirement is simple: structure your prompts so the static parts come first and the dynamic parts come last.
class PrefixOptimizedPrompt:
    """Structure prompts to maximize prefix cache hits."""

    def __init__(self, system_prompt: str, tool_schemas: str):
        # Static prefix, cached across all calls
        self.prefix = f"{system_prompt}\n\n{tool_schemas}"

    def build_messages(
        self, conversation: list[dict], task_context: str = ""
    ) -> list[dict]:
        """Build messages with cacheable prefix ordering."""
        messages = [
            # Static system prompt: always the same, always first
            {"role": "system", "content": self.prefix},
        ]
        # Dynamic context goes in a separate system message or user message
        if task_context:
            messages.append({
                "role": "system",
                "content": task_context,
            })
        # Conversation history
        messages.extend(conversation)
        return messages
For an agent making 8 calls per task with a 3,000-token system prompt, prefix caching means the system prompt is processed (and paid for at full price) only on the first call. The remaining 7 calls pay the reduced cached-token rate, which is often 50-90% cheaper for input tokens depending on the provider. Those 7 calls resend 7 × 3,000 = 21,000 prefix tokens; at a 90% discount, that is equivalent to saving about 18,900 full-price input tokens per task.
The key constraint: the prefix must be identical across requests. If you inject dynamic content (like a timestamp or user ID) into the system prompt, you break the cache. Move all dynamic content after the static prefix.
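The effect of ordering is easy to demonstrate. The sketch below compares how many leading characters two consecutive prompts share when a timestamp is placed first versus last. Providers match on tokens rather than characters, so treat this as a proxy; the prompt text and function names are invented for the example.

```python
import os

STATIC_PREFIX = (
    "You are a support agent. Follow the refund policy.\n\n"
    "Tools:\nsearch_documents(query): search the knowledge base"
)


def shared_prefix_len(a: str, b: str) -> int:
    """Number of leading characters two prompts have in common."""
    return len(os.path.commonprefix([a, b]))


def dynamic_first(timestamp: str) -> str:
    # Anti-pattern: the timestamp changes near the very start of the prompt
    return f"Current time: {timestamp}\n\n{STATIC_PREFIX}"


def static_first(timestamp: str) -> str:
    # Static prefix first, dynamic content appended after it
    return f"{STATIC_PREFIX}\n\nCurrent time: {timestamp}"


t1, t2 = "2025-01-01T09:00:00Z", "2025-01-01T09:05:00Z"
bad = shared_prefix_len(dynamic_first(t1), dynamic_first(t2))
good = shared_prefix_len(static_first(t1), static_first(t2))
print(f"dynamic first: {bad} shared chars; static first: {good} shared chars")
```

With the timestamp first, the two prompts diverge inside the timestamp and none of the static text is shared. With the static text first, the whole prefix is shared and only the tail differs. A check like this in a unit test can catch accidental cache-breaking changes to prompt assembly.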
Putting It All Together
A production agent system layers these techniques:
┌─────────────────────────────────────────────────────────┐
│  Cost-Optimized Agent Architecture                      │
│                                                         │
│  Incoming task                                          │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Semantic     │── Hit ──► Return cached response      │
│  │ cache        │                                       │
│  └──────┬───────┘                                       │
│         │ Miss                                          │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Task         │── Route to category                   │
│  │ classifier   │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Prompt       │── Minimal tools + compressed context  │
│  │ assembler    │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Model        │── Cheapest model first                │
│  │ cascade      │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Dedup +      │── Batch concurrent calls              │
│  │ batcher      │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Budget       │── Track spending, adapt behavior      │
│  │ enforcer     │                                       │
│  └──────┬───────┘                                       │
│         │                                               │
│         ▼                                               │
│  ┌──────────────┐                                       │
│  │ Context      │── Compress history between iterations │
│  │ compressor   │                                       │
│  └──────────────┘                                       │
│                                                         │
└─────────────────────────────────────────────────────────┘
The compound effect is dramatic. Consider a workload of 100,000 agent tasks per day:
| Optimization | Savings | How |
|---|---|---|
| Semantic cache (30% hit rate) | 30% | Avoid model calls entirely |
| Prefix caching | 15-20% | Reduced input token cost |
| Conditional tool schemas | 10-15% | Fewer tokens per call |
| Context compression | 15-20% | Shorter context in later iterations |
| Model cascade | 40-60% | Cheaper model handles most tasks |
| Offline batching (non-urgent) | 50% on subset | Discounted batch API |
| Deduplication | 5-10% | Eliminate redundant work |
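These savings compound rather than add, because each optimization applies to the cost left over by the previous one. A model cascade (60% saving) combined with semantic caching (30% of the remainder) and prefix caching (15% of the remainder) leaves 0.40 × 0.70 × 0.85 ≈ 24% of the original cost, a saving of about 76% compared to the naive approach of always using the largest model with full context. Adding context compression and conditional tool schemas pushes the total toward 80-85%.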
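The compounding can be checked with a few lines of code. This sketch treats each optimization as an independent fractional saving applied to whatever cost remains, which is a simplifying assumption: in practice the techniques interact (a cache hit, for example, skips the cascade entirely). The function name is hypothetical.

```python
def combined_savings(savings_fractions: list[float]) -> float:
    """Total saving when each fraction applies to the remaining cost."""
    remaining = 1.0
    for saving in savings_fractions:
        remaining *= 1.0 - saving
    return 1.0 - remaining


# Cascade 60%, semantic cache 30% of the rest, prefix caching 15% of the rest
core = combined_savings([0.60, 0.30, 0.15])
# Add context compression (15%) and conditional tool schemas (10%)
extended = combined_savings([0.60, 0.30, 0.15, 0.15, 0.10])
print(f"core: {core:.1%}, extended: {extended:.1%}")
```

Simply adding the three core percentages would give 105%, which is impossible; multiplying the remainders gives about 76%, and the two extra layers bring it to roughly 82%.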
Conclusion
Cost optimization for agents is a layered architecture where each layer removes waste at a different level. Semantic caching eliminates redundant work entirely. Prompt compression reduces what you send. Model cascades route simple tasks to cheap models. Batching amortizes overhead. Budget enforcement prevents runaway spending. Context compression keeps the accumulating history from making every later iteration more expensive than the one before: per-call input grows roughly linearly with the step count, so total input cost grows roughly quadratically.
The foundational principle is that tokens are the unit of cost, and the agent loop is a token multiplier. Every iteration carries the weight of all previous iterations. Optimizations that reduce per-iteration tokens therefore compound across the entire task: a token trimmed from an early tool result or reasoning step is saved again on every subsequent call that would have re-sent it.
Start with measurement — instrument your agent to report tokens consumed per task, cache hit rates, and cost per iteration. Once you see where the tokens go, the highest-leverage optimizations become obvious. In most systems, prefix caching and conditional tool inclusion are free wins that require only prompt restructuring. Semantic caching and model cascades require more infrastructure but deliver the largest absolute savings.
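A minimal starting point for that instrumentation might look like the following. It is an in-memory sketch with illustrative prices; the class and method names are hypothetical, and the hit rate assumes every model call was preceded by a cache miss. A production system would export these numbers to its metrics backend instead.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class TaskMetrics:
    """Running totals for one agent task."""
    input_tokens: int = 0
    output_tokens: int = 0
    model_calls: int = 0
    cache_hits: int = 0
    cost: float = 0.0


class CostTracker:
    """Accumulate per-task token usage, cache hits, and cost."""

    def __init__(self, input_price_per_m: float = 3.0,
                 output_price_per_m: float = 15.0):
        self.input_price = input_price_per_m
        self.output_price = output_price_per_m
        self.tasks: dict[str, TaskMetrics] = defaultdict(TaskMetrics)

    def record_model_call(self, task_id: str, input_tokens: int,
                          output_tokens: int) -> None:
        m = self.tasks[task_id]
        m.input_tokens += input_tokens
        m.output_tokens += output_tokens
        m.model_calls += 1
        m.cost += (input_tokens * self.input_price
                   + output_tokens * self.output_price) / 1_000_000

    def record_cache_hit(self, task_id: str) -> None:
        self.tasks[task_id].cache_hits += 1

    def summary(self) -> dict:
        n = len(self.tasks)
        calls = sum(m.model_calls for m in self.tasks.values())
        hits = sum(m.cache_hits for m in self.tasks.values())
        total_cost = sum(m.cost for m in self.tasks.values())
        lookups = calls + hits  # assumes one cache lookup per request
        return {
            "tasks": n,
            "cost_per_task": total_cost / n if n else 0.0,
            "cost_per_model_call": total_cost / calls if calls else 0.0,
            "cache_hit_rate": hits / lookups if lookups else 0.0,
        }


tracker = CostTracker()
tracker.record_model_call("task-1", input_tokens=3_100, output_tokens=300)
tracker.record_model_call("task-1", input_tokens=3_900, output_tokens=200)
tracker.record_cache_hit("task-2")
print(tracker.summary())
```

Even a tracker this small answers the first questions worth asking: what a task costs on average, what a single call costs, and how often the cache spares you a call.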