Production workflows need polling, error handling, retries, and monitoring. This guide shows battle-tested patterns for building robust baking pipelines.
These patterns build on the basic baking workflows. Use them when deploying to production.

Overview

Production-ready baking workflows need:
  • Job polling - Wait for async operations (stim, rollout, bake) to complete
  • Error handling - Gracefully handle auth failures, timeouts, API errors
  • Retry logic - Automatically retry transient failures
  • Logging - Track progress for debugging and monitoring
  • Timeouts - Prevent jobs from running indefinitely
  • Async support - Better concurrency for high-throughput pipelines

Job Polling Pattern

The Problem

Stim, rollout, and bake jobs run asynchronously. You need to wait for completion:
# This starts the job but doesn't wait
client.targets.stim.run(target_name="my_target", repo_name="my_repo")

# How do you know when it's done?

The Solution

Poll for job status with proper intervals:
import time
from typing import Callable, Any

def wait_for_job(
    get_status_fn: Callable[[], Any],
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
) -> Any:
    """Poll job status until completion or timeout."""
    start_time = time.time()
    deadline = start_time + timeout
    
    while time.time() < deadline:
        try:
            status = get_status_fn()
        except Exception as e:
            # Retry transient network errors; re-raise anything else
            if "timeout" in str(e).lower() or "connection" in str(e).lower():
                time.sleep(poll_interval)
                continue
            raise

        if status.status == "complete":
            return status

        if status.status == "failed":
            error_msg = getattr(status, "error", "Unknown error")
            raise RuntimeError(f"{job_name} failed: {error_msg}")

        time.sleep(poll_interval)
    
    raise TimeoutError(f"{job_name} exceeded {timeout}s timeout")
Usage:
from aibread import Bread

client = Bread()

# Start job and wait for completion
client.targets.stim.run(target_name="my_target", repo_name="my_repo")
status = wait_for_job(
    lambda: client.targets.stim.get("my_target", repo_name="my_repo"),
    job_name="Stim",
    poll_interval=5,
    timeout=3600
)
print(f"Generated {status.lines} lines")
Job Type | Poll Interval | Typical Duration
Stim     | 5 seconds     | 30s - 5min
Rollout  | 10 seconds    | 2min - 30min
Bake     | 30 seconds    | 10min - 2hours
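If you standardize on these values, a small lookup table keeps polling settings consistent across scripts. A minimal sketch building on the wait_for_job helper and client above (POLL_SETTINGS is just an illustrative name, not an SDK constant):
# Suggested polling settings per job type (mirrors the table above)
POLL_SETTINGS = {
    "Stim":    {"poll_interval": 5,  "timeout": 3600},   # 1 hour
    "Rollout": {"poll_interval": 10, "timeout": 7200},   # 2 hours
    "Bake":    {"poll_interval": 30, "timeout": 14400},  # 4 hours
}

# Unpack the settings for the job type you are waiting on
status = wait_for_job(
    lambda: client.targets.stim.get("my_target", repo_name="my_repo"),
    job_name="Stim",
    **POLL_SETTINGS["Stim"],
)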

Error Handling Pattern

Comprehensive Error Handling

Handle specific error types with appropriate responses:
import aibread
from aibread import Bread

def safe_bake_workflow(repo_name: str, target_name: str):
    """
    Production workflow with comprehensive error handling.
    """
    try:
        client = Bread()
        
        # Verify repository exists
        try:
            client.repo.get(repo_name)
        except aibread.NotFoundError:
            print(f"Repository '{repo_name}' not found, creating...")
            client.repo.set(repo_name=repo_name)
        
        # Run stim with error handling
        try:
            client.targets.stim.run(
                target_name=target_name,
                repo_name=repo_name
            )
            wait_for_job(
                lambda: client.targets.stim.get(target_name, repo_name=repo_name),
                "Stim",
                timeout=3600
            )
        except TimeoutError as e:
            print(f"Stim timed out: {e}")
            raise
        
        # Run rollout with error handling
        try:
            client.targets.rollout.run(
                target_name=target_name,
                repo_name=repo_name
            )
            wait_for_job(
                lambda: client.targets.rollout.get(target_name, repo_name=repo_name),
                "Rollout",
                timeout=7200
            )
        except TimeoutError as e:
            print(f"Rollout timed out: {e}")
            raise
        
        print("Workflow completed successfully")
        
    except aibread.AuthenticationError:
        print("Authentication failed - check BREAD_API_KEY")
        raise
    
    except aibread.APIConnectionError as e:
        print(f"Connection failed: {e}")
        print("Check your internet connection and API endpoint")
        raise
    
    except aibread.RateLimitError as e:
        print(f"Rate limited: {e}")
        print("Wait before retrying")
        raise
    
    except aibread.APIStatusError as e:
        print(f"API error {e.status_code}: {e.response}")
        raise
    
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise


# Usage
safe_bake_workflow("my_repo", "my_target")

Retry Pattern

Automatic Retries for Transient Failures

Some failures are temporary (network blips, rate limits). Retry with exponential backoff:
import time
from functools import wraps

import aibread
from aibread import Bread

def retry_on_failure(max_attempts=3, backoff_factor=2):
    """
    Decorator for automatic retries with exponential backoff.
    
    Args:
        max_attempts: Maximum retry attempts
        backoff_factor: Multiplier for wait time (2 = double each retry)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                
                except aibread.APIConnectionError as e:
                    if attempt == max_attempts - 1:
                        raise
                    
                    wait_time = backoff_factor ** attempt
                    print(f"Connection error, retrying in {wait_time}s... (attempt {attempt + 1}/{max_attempts})")
                    time.sleep(wait_time)
                
                except aibread.RateLimitError as e:
                    if attempt == max_attempts - 1:
                        raise
                    
                    # Rate limit errors often include retry-after
                    wait_time = getattr(e, 'retry_after', backoff_factor ** attempt)
                    print(f"Rate limited, retrying in {wait_time}s...")
                    time.sleep(wait_time)
            
            return None
        return wrapper
    return decorator


# Usage
@retry_on_failure(max_attempts=3, backoff_factor=2)
def create_target(client, target_name, repo_name, config):
    """Create target with automatic retries."""
    return client.targets.set(
        target_name=target_name,
        repo_name=repo_name,
        template="default",
        overrides=config
    )


# Calls will automatically retry on connection/rate limit errors
client = Bread()
create_target(client, "my_target", "my_repo", {...})

Logging Pattern

Structured Logging for Production

Use proper logging libraries for log levels and formatting:
import logging
from aibread import Bread

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def production_bake_workflow(
    repo_name: str,
    target_name: str,
    bake_name: str
):
    """Production workflow with comprehensive logging."""
    logger.info(f"Starting bake workflow: {bake_name}")
    
    try:
        client = Bread()
        logger.info("Client initialized")
        
        # Stim
        logger.info(f"Starting stim for target: {target_name}")
        client.targets.stim.run(target_name=target_name, repo_name=repo_name)
        
        status = wait_for_job(
            lambda: client.targets.stim.get(target_name, repo_name=repo_name),
            "Stim"
        )
        logger.info(f"Stim completed: {status.lines} lines generated")
        
        # Rollout
        logger.info(f"Starting rollout for target: {target_name}")
        client.targets.rollout.run(target_name=target_name, repo_name=repo_name)
        
        status = wait_for_job(
            lambda: client.targets.rollout.get(target_name, repo_name=repo_name),
            "Rollout"
        )
        logger.info(f"Rollout completed: {status.lines} trajectories")
        
        # Bake
        logger.info(f"Starting bake: {bake_name}")
        client.bakes.run(bake_name=bake_name, repo_name=repo_name)
        
        status = wait_for_job(
            lambda: client.bakes.get(bake_name, repo_name=repo_name),
            "Bake",
            poll_interval=30
        )
        logger.info(f"Bake completed successfully: {bake_name}")
        
        return status
        
    except Exception as e:
        logger.error(f"Workflow failed: {e}", exc_info=True)
        raise


# Usage
production_bake_workflow("my_repo", "my_target", "my_bake")

Async Pattern

Async/Await for Better Concurrency

Use async workflows when running multiple bakes in parallel:
import asyncio
from aibread import AsyncBread

async def async_wait_for_job(
    client,
    get_status_coro,
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
):
    """Async version of wait_for_job."""
    print(f"Waiting for {job_name}...")
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while loop.time() < deadline:
        # Get status
        status = await get_status_coro()
        
        if status.status == "complete":
            print(f"{job_name} complete")
            return status
        
        if status.status == "failed":
            raise RuntimeError(f"{job_name} failed")
        
        await asyncio.sleep(poll_interval)
    
    raise TimeoutError(f"{job_name} exceeded timeout")


async def async_target_workflow(
    repo_name: str,
    target_name: str
):
    """Complete async target workflow."""
    async with AsyncBread() as client:
        # Run stim
        await client.targets.stim.run(target_name=target_name, repo_name=repo_name)
        await async_wait_for_job(
            client,
            lambda: client.targets.stim.get(target_name, repo_name=repo_name),
            "Stim"
        )
        
        # Run rollout
        await client.targets.rollout.run(target_name=target_name, repo_name=repo_name)
        await async_wait_for_job(
            client,
            lambda: client.targets.rollout.get(target_name, repo_name=repo_name),
            "Rollout",
            poll_interval=10
        )
        
        print(f"Workflow complete: {target_name}")


# Run multiple workflows in parallel
async def run_parallel_workflows():
    """Run multiple target workflows concurrently."""
    await asyncio.gather(
        async_target_workflow("repo1", "target1"),
        async_target_workflow("repo2", "target2"),
        async_target_workflow("repo3", "target3")
    )


# Usage
asyncio.run(run_parallel_workflows())
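By default, asyncio.gather propagates the first exception, so one failed workflow hides the outcomes of the others. If you want each workflow's result individually, a variant with return_exceptions=True collects failures instead of raising (run_parallel_workflows_collect is an illustrative name, building on the snippet above):
async def run_parallel_workflows_collect():
    """Run workflows concurrently and report each outcome separately."""
    targets = [("repo1", "target1"), ("repo2", "target2"), ("repo3", "target3")]

    # return_exceptions=True puts exceptions in the result list instead of raising
    results = await asyncio.gather(
        *(async_target_workflow(repo, target) for repo, target in targets),
        return_exceptions=True,
    )

    for (repo, target), result in zip(targets, results):
        if isinstance(result, Exception):
            print(f"{repo}/{target} failed: {result}")
        else:
            print(f"{repo}/{target} succeeded")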

Complete Production Example

Putting it all together:
import logging
import time
import aibread
from aibread import Bread
from typing import Callable

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def wait_for_job(
    get_status_fn: Callable,
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
):
    """Poll job until completion with timeout."""
    logger.info(f"Waiting for {job_name}...")
    start_time = time.time()
    deadline = start_time + timeout
    
    while time.time() < deadline:
        status = get_status_fn()
        
        if status.status == "complete":
            # Not every job type reports line counts, so read it defensively
            lines = getattr(status, "lines", None)
            suffix = f" ({lines} lines)" if lines is not None else ""
            logger.info(f"{job_name} complete{suffix}")
            return status

        if status.status == "failed":
            error_msg = getattr(status, "error", "Unknown error")
            raise RuntimeError(f"{job_name} failed: {error_msg}")
        
        logger.debug(f"{job_name}: {status.status}")
        time.sleep(poll_interval)
    
    raise TimeoutError(f"{job_name} exceeded {timeout}s timeout")


def retry_on_failure(max_attempts=3):
    """Decorator for automatic retries."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (aibread.APIConnectionError, aibread.RateLimitError) as e:
                    if attempt == max_attempts - 1:
                        raise
                    wait_time = 2 ** attempt
                    logger.warning(f"Retry {attempt + 1}/{max_attempts} after {wait_time}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator


@retry_on_failure(max_attempts=3)
def create_target_with_retry(client, target_name, repo_name, config):
    """Create target with automatic retries."""
    return client.targets.set(
        target_name=target_name,
        repo_name=repo_name,
        template="default",
        overrides=config
    )


def production_workflow(
    repo_name: str,
    target_name: str,
    target_config: dict,
    bake_name: str,
    bake_config: dict
):
    """
    Complete production workflow with error handling, retries, and logging.
    """
    logger.info(f"Starting production workflow: {bake_name}")
    
    try:
        client = Bread()
        
        # Create/verify repository
        try:
            client.repo.get(repo_name)
            logger.info(f"Repository '{repo_name}' exists")
        except aibread.NotFoundError:
            logger.info(f"Creating repository: {repo_name}")
            client.repo.set(repo_name=repo_name)
        
        # Create target with retry
        logger.info(f"Creating target: {target_name}")
        create_target_with_retry(client, target_name, repo_name, target_config)
        
        # Run stim
        logger.info("Starting stim job")
        client.targets.stim.run(target_name=target_name, repo_name=repo_name)
        wait_for_job(
            lambda: client.targets.stim.get(target_name, repo_name=repo_name),
            "Stim",
            poll_interval=5,
            timeout=3600
        )
        
        # Run rollout
        logger.info("Starting rollout job")
        client.targets.rollout.run(target_name=target_name, repo_name=repo_name)
        wait_for_job(
            lambda: client.targets.rollout.get(target_name, repo_name=repo_name),
            "Rollout",
            poll_interval=10,
            timeout=7200
        )
        
        # Configure and run bake
        logger.info(f"Configuring bake: {bake_name}")
        client.bakes.set(
            bake_name=bake_name,
            repo_name=repo_name,
            template="default",
            overrides=bake_config
        )
        
        logger.info("Starting bake")
        client.bakes.run(bake_name=bake_name, repo_name=repo_name)
        wait_for_job(
            lambda: client.bakes.get(bake_name, repo_name=repo_name),
            "Bake",
            poll_interval=30,
            timeout=14400  # 4 hours
        )
        
        logger.info(f"Production workflow completed: {bake_name}")
        return True
        
    except aibread.AuthenticationError:
        logger.error("Authentication failed - check BREAD_API_KEY")
        raise
    
    except aibread.APIConnectionError as e:
        logger.error(f"Connection failed: {e}")
        raise
    
    except TimeoutError as e:
        logger.error(f"Job timeout: {e}")
        raise
    
    except Exception as e:
        logger.error(f"Workflow failed: {e}", exc_info=True)
        raise


# Usage
production_workflow(
    repo_name="production_repo",
    target_name="yoda_target",
    target_config={
        "generators": [...],
        "model_name": "Qwen/Qwen3-32B",
        "teacher_prompt": "yoda_teacher",
        "student_prompt": "empty_student"
    },
    bake_name="yoda_production_v1",
    bake_config={
        "datasets": [{"target": "yoda_target", "weight": 1.0}],
        "epochs": 3
    }
)

Best Practices

  • Set realistic timeouts based on job type: Stim 1 hour, Rollout 2 hours, Bake 4 hours.
  • Log the start and completion of each phase, error details, and timing information for debugging (see the timing sketch after this list).
  • Automatically retry connection errors and rate limits with exponential backoff.
  • Catch specific exceptions (AuthenticationError, APIConnectionError) rather than a bare Exception.
  • Process multiple targets concurrently with asyncio for better throughput.
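For the timing point above, a small context manager can log how long each phase takes. A minimal sketch (log_phase is an illustrative helper, not part of the SDK):
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def log_phase(name: str):
    """Log the start, completion, and duration of a workflow phase."""
    logger.info(f"{name} started")
    start = time.time()
    try:
        yield
        logger.info(f"{name} completed in {time.time() - start:.1f}s")
    except Exception:
        logger.error(f"{name} failed after {time.time() - start:.1f}s")
        raise

# Usage inside a workflow:
# with log_phase("Stim"):
#     client.targets.stim.run(target_name="my_target", repo_name="my_repo")
#     wait_for_job(lambda: client.targets.stim.get("my_target", repo_name="my_repo"), "Stim")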

Next Steps