Skip to main content
Production patterns for deploying cascadeflow at scale.

Retry with Exponential Backoff

import asyncio
from cascadeflow import CascadeAgent

async def execute_with_retry(agent, query, max_retries=3, base_delay=1.0):
    """Run ``agent.run(query)``, retrying failures with exponential backoff.

    Args:
        agent: Object exposing an awaitable ``run(query)`` method.
        query: Value passed unchanged to ``agent.run`` on every attempt.
        max_retries: Total number of attempts (not additional retries);
            must be at least 1.
        base_delay: Seconds to wait after the first failed attempt; the
            wait doubles after each further failure.

    Returns:
        Whatever the first successful ``agent.run(query)`` call returns.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The error raised by the final attempt, re-raised
            unchanged.
    """
    if max_retries < 1:
        # Without this guard the loop body never executes and the function
        # would silently return None without ever calling the agent.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await agent.run(query)
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == max_retries - 1:
                raise
        # Back off outside the handler: base_delay, 2x, 4x, ...
        await asyncio.sleep(base_delay * (2 ** attempt))

Rate Limiting

import time
from collections import deque

class RateLimiter:
    """Sliding-window rate limiter for coroutines on one event loop.

    At most ``max_requests`` calls to :meth:`acquire` are allowed to
    complete within any span of ``window_seconds``; further callers are
    suspended until a slot frees up.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        """Create a limiter.

        Args:
            max_requests: Maximum acquisitions per window; must be >= 1.
            window_seconds: Length of the sliding window in seconds.

        Raises:
            ValueError: If ``max_requests`` is less than 1.
        """
        if max_requests < 1:
            # A non-positive limit could never be satisfied; previously it
            # surfaced as an IndexError on the first acquire().
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.window = window_seconds
        # Monotonic timestamps of granted requests, oldest first.
        self.requests = deque()

    async def acquire(self) -> None:
        """Wait until a request slot is free, then claim it."""
        while True:
            now = time.monotonic()
            cutoff = now - self.window
            # Drop timestamps that have aged out of the window.
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                # No await between the check and the append, so two
                # coroutines can never claim the same slot.
                self.requests.append(now)
                return
            # Sleep until the oldest timestamp expires, then re-check:
            # other waiters may have taken the freed slot in the meantime.
            await asyncio.sleep(self.requests[0] + self.window - now)

Budget Management

import cascadeflow

# Module-level cascadeflow initialisation; executes once when this code is loaded.
# NOTE(review): mode="enforce" presumably makes budgets hard limits rather
# than advisory — confirm against the cascadeflow documentation.
cascadeflow.init(mode="enforce")

# Per-user daily budget
async def handle_user_request(user_id: str, query: str):
    """Run ``query`` for a user under a cost cap and record what was spent.

    The cap for a single request is the smaller of the user's remaining
    budget and 0.50. The spend reported by the session is passed to
    ``update_user_budget`` whether the run succeeds or fails.

    Args:
        user_id: Identifier passed to ``get_user_remaining_budget`` and
            ``update_user_budget``.
        query: Query forwarded to ``agent.run``.

    Returns:
        The value returned by ``agent.run(query)``.

    Raises:
        Exception: Anything raised by ``agent.run``; the spend is recorded
            before the error propagates.
    """
    # NOTE(review): `agent` is a module-level name defined outside this snippet.
    user_budget = get_user_remaining_budget(user_id)

    with cascadeflow.run(budget=min(user_budget, 0.50)) as session:
        try:
            return await agent.run(query)
        finally:
            # Record the spend even when the run fails part-way; otherwise
            # cost incurred before the error is never charged to the user.
            spent = session.summary()['cost_total']
            update_user_budget(user_id, spent)

Circuit Breaker

from cascadeflow import CircuitBreaker, CircuitBreakerConfig

# Circuit-breaker settings.
# NOTE(review): the per-field meanings below are inferred from the parameter
# names — confirm against the cascadeflow CircuitBreakerConfig documentation.
config = CircuitBreakerConfig(
    failure_threshold=5,  # presumably failures tolerated before the breaker opens
    recovery_timeout=30.0,  # presumably seconds to wait before probing again
    half_open_max_calls=2,  # presumably trial calls allowed while half-open
)

# Module-level breaker instance built from the settings above; safe_call()
# consults this single shared object.
breaker = CircuitBreaker(config=config)

async def safe_call(agent, query):
    """Run ``agent.run(query)`` behind the module-level circuit breaker.

    When the breaker refuses the request, ``fallback_response(query)`` is
    returned and the agent is not called. Otherwise the outcome of the call
    is reported to the breaker: success on a normal return, failure on any
    ``Exception``, which is then re-raised to the caller.
    """
    if breaker.allow_request():
        try:
            response = await agent.run(query)
            breaker.record_success()
        except Exception:
            breaker.record_failure()
            raise
        return response
    return fallback_response(query)

Response Caching

from cascadeflow import ResponseCache

# Module-level response cache consulted by cached_run().
# NOTE(review): presumably holds at most 1000 entries, each expiring after
# 300 seconds — confirm against the cascadeflow ResponseCache documentation.
cache = ResponseCache(max_size=1000, ttl_seconds=300)

async def cached_run(agent, query):
    """Return the cached result for ``query``, running the agent on a miss.

    Args:
        agent: Object exposing an awaitable ``run(query)`` method.
        query: Used both as the cache key and as the agent input.

    Returns:
        The cached value when one exists, otherwise the fresh result of
        ``agent.run(query)``, which is stored in the cache before returning.
    """
    cached = cache.get(query)
    # Test against None instead of truthiness so that legitimately falsy
    # results ("", 0, []) are served from the cache rather than recomputed.
    # NOTE(review): assumes cache.get() returns None on a miss — confirm
    # against the ResponseCache documentation.
    if cached is not None:
        return cached
    result = await agent.run(query)
    cache.set(query, result)
    return result

Health Monitoring

async def monitor_production_queries(budget: float = 10.00):
    """Run every production query in one budgeted session and raise alerts.

    ``await`` is only valid inside a coroutine, so the monitoring loop lives
    in an ``async def``; start it with
    ``asyncio.run(monitor_production_queries())`` or await it from other
    async code.

    Args:
        budget: Budget passed to ``cascadeflow.run`` for the whole batch.
    """
    # NOTE(review): `agent`, `production_queries` and `alert` are
    # module-level names defined outside this snippet.
    with cascadeflow.run(budget=budget) as session:
        for query in production_queries:
            await agent.run(query)

        summary = session.summary()

        # Alert on anomalies
        # The cost threshold is derived from the budget (80%) so the two
        # values cannot drift apart.
        if summary['cost_total'] > 0.8 * budget:
            alert("Budget 80% consumed")
        if summary['steps'] > 100:
            alert("High step count")

Source

examples/production_patterns.py