
Budget controls without visibility are purely reactive. Cost monitoring closes the loop: it tells you what is actually happening with your token spend in real time, surfaces anomalies before they become invoices, and provides the data needed to make informed optimization decisions. In agentic systems — where costs compound across loops, tools, and users — the difference between monitored and unmonitored spend can be an order of magnitude.

This topic covers how to build a comprehensive cost monitoring and alerting system for agentic workflows, from instrumentation through dashboarding and incident response.


The Monitoring Stack for Token Costs

Effective cost monitoring requires four layers:

  1. Instrumentation — Capturing token usage at every API call
  2. Aggregation — Rolling up individual calls into meaningful metrics (by task, session, project, model, time period)
  3. Storage — Persisting metrics in a queryable time-series or relational store
  4. Alerting — Notifying the right people when thresholds are crossed

Each layer has specific tooling requirements. Let's build them from the ground up.

Tip: Don't wait until you have a "perfect" monitoring system to start tracking costs. A Google Sheet updated by a daily cron job beats no visibility at all. Start simple, iterate toward sophistication as your agentic workloads mature.


Layer 1: Instrumentation — Capturing Every API Call

The first requirement is that every API call to any LLM provider records its token usage, model, task type, and cost. The cleanest way to do this is a wrapper or middleware layer.

Universal API Wrapper

import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timezone
import anthropic

@dataclass
class TokenUsageEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    # Context
    project_id: str = ""
    session_id: str = ""
    task_type: str = ""
    user_id: str = ""

    # Model info
    provider: str = ""
    model: str = ""

    # Token counts
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0    # For providers with caching
    cache_write_tokens: int = 0

    # Cost (USD)
    input_cost: float = 0.0
    output_cost: float = 0.0
    cache_read_cost: float = 0.0
    cache_write_cost: float = 0.0
    total_cost: float = 0.0

    # Performance
    latency_ms: int = 0

    # Agent context
    loop_step: Optional[int] = None
    tool_name: Optional[str] = None
    finish_reason: str = ""


# Prices in USD per 1 million tokens; keep in sync with provider price lists
PRICING = {
    "anthropic": {
        "claude-3-5-sonnet-20241022": {
            "input": 3.00, "output": 15.00,
            "cache_write": 3.75, "cache_read": 0.30
        },
        "claude-3-5-haiku-20241022": {
            "input": 0.80, "output": 4.00,
            "cache_write": 1.00, "cache_read": 0.08
        },
        "claude-3-haiku-20240307": {
            "input": 0.25, "output": 1.25,
            "cache_write": 0.30, "cache_read": 0.03
        },
    },
    "openai": {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
        "o1": {"input": 15.00, "output": 60.00},
        "o3-mini": {"input": 1.10, "output": 4.40},
    },
    "google": {
        "gemini-1.5-pro": {"input": 3.50, "output": 10.50},
        "gemini-1.5-flash": {"input": 0.075, "output": 0.30},
        "gemini-2.0-flash": {"input": 0.10, "output": 0.40},
    }
}

def calculate_cost(
    provider: str, 
    model: str, 
    input_tokens: int, 
    output_tokens: int,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0
) -> dict:
    # Unknown models fall back to conservative defaults (USD per 1M tokens)
    pricing = PRICING.get(provider, {}).get(model, {"input": 3.00, "output": 15.00})

    input_cost = (input_tokens / 1_000_000) * pricing.get("input", 0)
    output_cost = (output_tokens / 1_000_000) * pricing.get("output", 0)
    cache_read_cost = (cache_read_tokens / 1_000_000) * pricing.get("cache_read", 0)
    cache_write_cost = (cache_write_tokens / 1_000_000) * pricing.get("cache_write", 0)

    return {
        "input_cost": input_cost,
        "output_cost": output_cost,
        "cache_read_cost": cache_read_cost,
        "cache_write_cost": cache_write_cost,
        "total_cost": input_cost + output_cost + cache_read_cost + cache_write_cost
    }
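
A quick sanity check of the arithmetic, using the claude-3-5-sonnet rates from the PRICING table above:

# 12,000 input + 800 output tokens:
# (12_000 / 1_000_000) * $3.00 = $0.036; (800 / 1_000_000) * $15.00 = $0.012
costs = calculate_cost("anthropic", "claude-3-5-sonnet-20241022", 12_000, 800)
assert abs(costs["total_cost"] - 0.048) < 1e-9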


class InstrumentedAnthropicClient:
    """Wrapper around Anthropic client that emits usage events."""

    def __init__(
        self, 
        project_id: str,
        event_sink,  # Any object with a .record(event) method
        **anthropic_kwargs
    ):
        self.client = anthropic.Anthropic(**anthropic_kwargs)
        self.project_id = project_id
        self.event_sink = event_sink

    def create_message(
        self,
        model: str,
        messages: list,
        max_tokens: int,
        session_id: str = "",
        task_type: str = "",
        user_id: str = "",
        loop_step: Optional[int] = None,
        tool_name: Optional[str] = None,
        **kwargs
    ):
        start_time = time.time()

        response = self.client.messages.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            **kwargs
        )

        latency_ms = int((time.time() - start_time) * 1000)

        # Extract usage including cache tokens if present
        usage = response.usage
        cache_read = getattr(usage, 'cache_read_input_tokens', 0) or 0
        cache_write = getattr(usage, 'cache_creation_input_tokens', 0) or 0

        costs = calculate_cost(
            provider="anthropic",
            model=model,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            cache_read_tokens=cache_read,
            cache_write_tokens=cache_write
        )

        event = TokenUsageEvent(
            project_id=self.project_id,
            session_id=session_id,
            task_type=task_type,
            user_id=user_id,
            provider="anthropic",
            model=model,
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            cache_read_tokens=cache_read,
            cache_write_tokens=cache_write,
            latency_ms=latency_ms,
            loop_step=loop_step,
            tool_name=tool_name,
            finish_reason=response.stop_reason or "",
            **costs
        )

        self.event_sink.record(event)
        return response
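
The wrapper is deliberately agnostic about where events go: anything exposing a .record() method works. A minimal sketch of a sink that appends JSON lines to a local file (the class name and path are illustrative; swap in a database insert or message queue for production):

import json
from dataclasses import asdict

class JsonlEventSink:
    """Appends each usage event as one JSON line: the simplest durable sink."""

    def __init__(self, path: str = "token_usage_events.jsonl"):
        self.path = path

    def record(self, event: TokenUsageEvent) -> None:
        # asdict() flattens the dataclass into a JSON-serializable dict
        with open(self.path, "a") as f:
            f.write(json.dumps(asdict(event)) + "\n")

client = InstrumentedAnthropicClient(project_id="demo", event_sink=JsonlEventSink())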

Tip: Instrument at the client wrapper level rather than in individual task functions. Wrapper-level instrumentation is guaranteed to capture every call regardless of where in your codebase it happens. Task-level logging misses calls in dependencies, libraries, and future code written by other team members.


Layer 2: Aggregation — Meaningful Metrics from Raw Events

Raw events are necessary but not sufficient. You need aggregations that answer business questions:

  • What did we spend on the CI/CD agent today?
  • Which task type is most expensive?
  • Which users are consuming the most tokens?
  • Is today's spend trending above or below budget?
  • What percentage of tokens came from cache hits?

Aggregation with PostgreSQL

-- Schema for token usage events
CREATE TABLE token_usage_events (
    event_id UUID PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    project_id VARCHAR(64) NOT NULL,
    session_id VARCHAR(64),
    task_type VARCHAR(64),
    user_id VARCHAR(64),
    provider VARCHAR(32) NOT NULL,
    model VARCHAR(64) NOT NULL,
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0,
    cache_read_tokens INTEGER NOT NULL DEFAULT 0,
    cache_write_tokens INTEGER NOT NULL DEFAULT 0,
    total_cost DECIMAL(10, 8) NOT NULL DEFAULT 0,
    latency_ms INTEGER,
    loop_step INTEGER,
    tool_name VARCHAR(64),
    finish_reason VARCHAR(32)
);

CREATE INDEX idx_token_events_project_time 
    ON token_usage_events (project_id, timestamp DESC);
CREATE INDEX idx_token_events_task_type 
    ON token_usage_events (task_type, timestamp DESC);

-- Daily spend by project
CREATE VIEW daily_project_spend AS
SELECT 
    project_id,
    DATE(timestamp) as date,
    SUM(input_tokens + output_tokens) as total_tokens,
    SUM(total_cost) as total_cost_usd,
    COUNT(*) as request_count,
    AVG(latency_ms) as avg_latency_ms
FROM token_usage_events
GROUP BY project_id, DATE(timestamp)
ORDER BY date DESC, total_cost_usd DESC;

-- Top task types by cost
CREATE VIEW task_type_cost_breakdown AS
SELECT
    project_id,
    task_type,
    DATE_TRUNC('month', timestamp) as month,
    SUM(total_cost) as total_cost_usd,
    SUM(input_tokens) as total_input_tokens,
    SUM(output_tokens) as total_output_tokens,
    AVG(total_cost) as avg_cost_per_request,
    COUNT(*) as request_count
FROM token_usage_events
GROUP BY project_id, task_type, DATE_TRUNC('month', timestamp)
ORDER BY total_cost_usd DESC;

-- Cache efficiency
CREATE VIEW cache_efficiency AS
SELECT
    project_id,
    DATE(timestamp) as date,
    SUM(cache_read_tokens) as cache_hits_tokens,
    SUM(cache_write_tokens) as cache_write_tokens,
    SUM(input_tokens) as total_input_tokens,
    ROUND(
        100.0 * SUM(cache_read_tokens) / NULLIF(SUM(input_tokens), 0), 
        2
    ) as cache_hit_rate_pct
FROM token_usage_events
GROUP BY project_id, DATE(timestamp);

Tip: Create a "cost anomaly" view that flags days where any project's spend exceeded 2x its 7-day rolling average. This catches runaway loops, misconfigured deployments, and test scripts accidentally running against production — the most common sources of unexpected spikes.
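
A sketch of that anomaly view (the view and column names are illustrative), comparing each day's spend to the trailing 7-day average:

-- Days where a project spent more than 2x its trailing 7-day average
CREATE VIEW cost_anomalies AS
WITH daily AS (
    SELECT project_id, DATE(timestamp) AS date, SUM(total_cost) AS day_cost
    FROM token_usage_events
    GROUP BY project_id, DATE(timestamp)
),
with_avg AS (
    SELECT *,
        AVG(day_cost) OVER (
            PARTITION BY project_id
            ORDER BY date
            ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
        ) AS rolling_avg
    FROM daily
)
SELECT project_id, date, day_cost, rolling_avg,
       ROUND(day_cost / NULLIF(rolling_avg, 0), 2) AS spike_ratio
FROM with_avg
WHERE day_cost > 2 * rolling_avg;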


Layer 3: Real-Time Alerting

Alerts transform monitoring from a historical record into an operational tool. Effective alerting requires defining threshold types and routing them to the right channels.

Alert Threshold Types

Alert Type            Trigger Condition                            Severity    Recipient
Daily budget 50%      Project hits 50% of daily budget by noon     Info        Team lead
Daily budget 80%      Project hits 80% of daily budget             Warning     Team lead + engineer
Daily budget 100%     Project hits 100% of daily budget            Critical    Team lead + engineer + PagerDuty
Monthly budget 80%    Project hits 80% of monthly budget           Warning     Engineering manager
Cost spike            Single request >10x project average          Warning     Engineer
Runaway loop          >30 steps in a single task                   Critical    Engineer + PagerDuty
Error rate spike      >20% of requests ending in non-success       Warning     Engineer
Latency spike         P99 latency >30s                             Warning     Engineer

Alert Implementation with Slack and PagerDuty

import time
import httpx
from enum import Enum
from typing import Optional

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

class AlertManager:
    def __init__(
        self,
        slack_webhook_url: str,
        pagerduty_routing_key: Optional[str] = None
    ):
        self.slack_webhook = slack_webhook_url
        self.pagerduty_key = pagerduty_routing_key
        self._alert_cooldowns = {}  # Prevent alert storms

    async def send_alert(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        context: dict,
        cooldown_key: Optional[str] = None,
        cooldown_minutes: int = 30
    ) -> None:
        # Check cooldown to prevent alert floods
        if cooldown_key:
            last_alert = self._alert_cooldowns.get(cooldown_key, 0)
            if time.time() - last_alert < cooldown_minutes * 60:
                return
            self._alert_cooldowns[cooldown_key] = time.time()

        # Always send to Slack
        await self._send_slack(title, message, severity, context)

        # Send to PagerDuty for critical alerts
        if severity == AlertSeverity.CRITICAL and self.pagerduty_key:
            await self._send_pagerduty(title, message, context)

    async def _send_slack(
        self, 
        title: str, 
        message: str, 
        severity: AlertSeverity,
        context: dict
    ) -> None:
        color_map = {
            AlertSeverity.INFO: "#36a64f",
            AlertSeverity.WARNING: "#ff9900",
            AlertSeverity.CRITICAL: "#ff0000"
        }

        severity_emoji = {
            AlertSeverity.INFO: "ℹ️",
            AlertSeverity.WARNING: "⚠️",
            AlertSeverity.CRITICAL: "🚨"
        }

        payload = {
            "attachments": [{
                "color": color_map[severity],
                "title": f"{severity_emoji[severity]} {title}",
                "text": message,
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in context.items()
                ],
                "footer": "Token Cost Monitor",
                "ts": int(time.time())
            }]
        }

        async with httpx.AsyncClient() as client:
            await client.post(self.slack_webhook, json=payload)

    async def _send_pagerduty(
        self, 
        title: str, 
        message: str, 
        context: dict
    ) -> None:
        payload = {
            "routing_key": self.pagerduty_key,
            "event_action": "trigger",
            "payload": {
                "summary": title,
                "severity": "critical",
                "source": "token-cost-monitor",
                "custom_details": context
            }
        }

        async with httpx.AsyncClient() as client:
            await client.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload
            )


class CostAlertTriggers:
    def __init__(self, alert_manager: AlertManager, enforcer):
        self.alerts = alert_manager
        self.enforcer = enforcer

    async def check_budget_thresholds(self, project_id: str) -> None:
        status = self.enforcer.get_project_status(project_id)

        daily_pct = status["daily_percent"]
        monthly_pct = status["monthly_percent"]

        if daily_pct >= 100:
            await self.alerts.send_alert(
                title=f"BUDGET EXCEEDED: {project_id}",
                message=f"Daily budget 100% consumed. Requests are being blocked.",
                severity=AlertSeverity.CRITICAL,
                context={
                    "project": project_id,
                    "daily_spend": f"${status['daily_spend']:.4f}",
                    "daily_limit": f"${status['daily_limit']:.2f}"
                },
                cooldown_key=f"{project_id}:daily:100",
                cooldown_minutes=60
            )
        elif daily_pct >= 80:
            await self.alerts.send_alert(
                title=f"Budget Warning: {project_id}",
                message=f"Daily budget is {daily_pct:.1f}% consumed.",
                severity=AlertSeverity.WARNING,
                context={
                    "project": project_id,
                    "remaining": f"${status['daily_limit'] - status['daily_spend']:.4f}"
                },
                cooldown_key=f"{project_id}:daily:80",
                cooldown_minutes=120
            )
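
    # Sketch of the per-request "Cost spike" trigger from the threshold table
    # above. get_recent_avg_cost() is a hypothetical helper backed by the
    # Layer 2 store; call this from the event sink's record() path.
    async def check_request_spike(self, event: TokenUsageEvent) -> None:
        avg_cost = get_recent_avg_cost(event.project_id)  # hypothetical
        if avg_cost > 0 and event.total_cost > 10 * avg_cost:
            await self.alerts.send_alert(
                title=f"Cost spike: {event.project_id}",
                message=(
                    f"Single request cost ${event.total_cost:.4f}, more than "
                    f"10x the recent average of ${avg_cost:.4f}."
                ),
                severity=AlertSeverity.WARNING,
                context={"task_type": event.task_type, "model": event.model},
                cooldown_key=f"{event.project_id}:spike",
                cooldown_minutes=30
            )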

Tip: Set up a weekly "cost digest" that goes to your entire engineering team — not just leads — showing total spend, top cost drivers, and any threshold breaches from the past week. Broad visibility normalizes cost awareness as a team responsibility, not just an infrastructure concern.
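
A sketch of such a digest job, assuming the daily_project_spend view from Layer 2, the psycopg driver, and a POSTGRES_DSN environment variable (the latter two are illustrative choices):

import os
import httpx
import psycopg  # assumed driver; any PostgreSQL client works

def send_weekly_digest(slack_webhook_url: str) -> None:
    # Roll up the last 7 days from the daily_project_spend view (Layer 2)
    with psycopg.connect(os.environ["POSTGRES_DSN"]) as conn:
        rows = conn.execute(
            """
            SELECT project_id, SUM(total_cost_usd), SUM(request_count)
            FROM daily_project_spend
            WHERE date >= CURRENT_DATE - INTERVAL '7 days'
            GROUP BY project_id
            ORDER BY 2 DESC
            """
        ).fetchall()

    lines = [f"• {p}: ${c:.2f} across {n} requests" for p, c, n in rows]
    total = sum(c for _, c, _ in rows)
    httpx.post(slack_webhook_url, json={
        "text": f"*Weekly token cost digest* -- total ${total:.2f}\n" + "\n".join(lines)
    })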


Provider-Native Monitoring Tools

Each provider offers native monitoring capabilities that supplement custom instrumentation:

OpenAI Usage Dashboard

  • Access via platform.openai.com/usage
  • Shows daily token usage and cost by model and API key
  • Supports setting "usage limits" — soft and hard limits per month
  • Exportable via CSV; programmatic access is limited, so rely on your own instrumentation for real-time, per-task data

Setting limits on OpenAI:

Platform → Settings → Limits → Usage limits
  Monthly soft limit: $X (receive email warning)
  Monthly hard limit: $Y (API returns 429 after this amount)

Anthropic Console

  • console.anthropic.com → Usage
  • Shows token usage by model and workspace
  • API key management with per-key tracking
  • Organization-level spend visible to admins

Google Cloud Billing + Vertex AI

gcloud billing budgets create \
  --billing-account=YOUR_BILLING_ACCOUNT_ID \
  --display-name="Gemini Monthly Budget" \
  --budget-amount=500USD \
  --threshold-rule=percent=0.5,basis=CURRENT_SPEND \
  --threshold-rule=percent=0.8,basis=CURRENT_SPEND \
  --threshold-rule=percent=1.0,basis=CURRENT_SPEND \
  --notifications-rule-pubsub-topic=projects/YOUR_PROJECT/topics/billing-alerts

AWS Bedrock + Cost Explorer

aws budgets create-budget \
  --account-id YOUR_ACCOUNT_ID \
  --budget '{
    "BudgetName": "Bedrock-Monthly",
    "BudgetLimit": {"Amount": "500", "Unit": "USD"},
    "TimeUnit": "MONTHLY",
    "BudgetType": "COST",
    "CostFilters": {"Service": ["Amazon Bedrock"]}
  }' \
  --notifications-with-subscribers '[{
    "Notification": {
      "NotificationType": "ACTUAL",
      "ComparisonOperator": "GREATER_THAN",
      "Threshold": 80.0
    },
    "Subscribers": [{
      "SubscriptionType": "EMAIL",
      "Address": "[email protected]"
    }]
  }]'

Tip: Use provider-native limits as a "last resort" safety net, not as your primary budget control. Provider limits are coarse (monthly only) and take effect too late. Your application-level budget enforcement (from Topic 3) should catch 99% of overruns before provider limits trigger.


Building a Cost Dashboard

For teams with Grafana or similar tooling, a token cost dashboard should include:

Key Panels

  1. Daily spend by project (bar chart, last 30 days) — Spot trends and anomalies
  2. Budget utilization gauges (one per project) — At-a-glance health
  3. Cost by task type (pie chart, current month) — Identify top cost drivers
  4. Token volume over time (time series, input vs. output vs. cache hits) — Efficiency trends
  5. Requests per model (stacked bar) — Track model tier distribution
  6. P99 latency vs. cost (scatter plot) — Correlate speed/cost tradeoffs
  7. Anomaly table (requests >2x average cost) — Runaway detection

Grafana Dashboard Queries (Key Panel Examples)

-- Panel: Daily spend by project (PostgreSQL data source)
SELECT
  DATE(timestamp) as time,
  project_id,
  SUM(total_cost) as cost_usd
FROM token_usage_events
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY DATE(timestamp), project_id
ORDER BY time;

-- Panel: Cache hit rate trend
SELECT
  DATE_TRUNC('hour', timestamp) as time,
  project_id,
  ROUND(100.0 * SUM(cache_read_tokens) / NULLIF(SUM(input_tokens), 0), 1) as cache_hit_pct
FROM token_usage_events
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY DATE_TRUNC('hour', timestamp), project_id
ORDER BY time;
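
For the budget-utilization gauges (panel 2), one option is to join against a small project_budgets table (hypothetical) that stores each project's daily limit:

-- Panel: Daily budget utilization gauge ($project is a Grafana dashboard variable)
SELECT
  100.0 * SUM(e.total_cost) / MAX(b.daily_limit_usd) AS budget_used_pct
FROM token_usage_events e
JOIN project_budgets b ON b.project_id = e.project_id  -- hypothetical table
WHERE e.project_id = '$project'
  AND e.timestamp >= CURRENT_DATE;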

Tip: Publish your cost dashboard to a shared company URL and link it from your team's Slack channel topic. When the dashboard is passively visible, engineers naturally start optimizing before they're asked to — cost awareness becomes ambient rather than episodic.


Incident Response for Cost Spikes

When an alert fires, your team needs a runbook. Here is a template:

COST SPIKE RUNBOOK

1. IDENTIFY
   - Which project/model is spiking?
   - When did it start? (Check dashboard timestamp)
   - Is it many small requests or a few large ones?
     → Query: SELECT event_id, total_cost, input_tokens, task_type 
               FROM token_usage_events 
               WHERE project_id = 'X' AND timestamp > 'Y' 
               ORDER BY total_cost DESC LIMIT 20

2. CONTAIN
   - If runaway loop: deploy emergency max_steps=5 override (see sketch below)
   - If runaway users: temporarily lower per-session budget
   - If model misconfiguration: revert to last known good config

3. ROOT CAUSE
   - Was a new feature deployed? Check git log
   - Did input data distribution change? Check input token histogram
   - Did a test script run in production?

4. REMEDIATE
   - Fix the underlying issue
   - Add a specific guardrail to prevent recurrence
   - Document in incident log

5. REVIEW
   - Post-incident: update budget estimates and alert thresholds
   - Share learnings with team in weekly digest
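
For the CONTAIN step, the override mechanism needs to exist before the incident. A minimal sketch, assuming an AGENT_MAX_STEPS_OVERRIDE environment variable (hypothetical) that your agent loop consults on each run:

import os

def effective_max_steps(configured_max_steps: int) -> int:
    # Clamp every agent loop via an env var: no code deploy needed mid-incident
    override = os.environ.get("AGENT_MAX_STEPS_OVERRIDE")
    if override is not None:
        return min(configured_max_steps, int(override))
    return configured_max_steps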

Tip: Run a "fire drill" quarterly — intentionally trigger your cost spike alert with a test request and time how long it takes your team to identify the cause and contain it. Fire drills reveal gaps in your runbook and monitoring coverage before a real incident does.


Summary

Real-time cost monitoring for agentic workflows requires instrumentation at the API wrapper level, aggregation into queryable metrics, threshold-based alerting with appropriate routing, and a defined incident response process. Provider-native monitoring tools provide a useful last-resort safety net but are insufficient as primary controls. A well-instrumented system turns cost management from a monthly invoice surprise into a continuous operational practice — one that naturally drives optimization behavior across engineering, QA, and product teams.