These patterns build on the basic baking workflows. Use them when deploying to production.
Overview
Production-ready baking workflows need: - Job polling - Wait for async operations (stim, rollout, bake) to complete
- Error handling - Gracefully handle auth failures, timeouts, API errors
- Retry logic - Automatically retry transient failures
- Logging - Track progress for debugging and monitoring
- Timeouts - Prevent jobs from running indefinitely
- Async support - Better concurrency for high-throughput pipelines
Job Polling Pattern
The Problem
Stim, rollout, and bake jobs run asynchronously. You need to wait for completion:
# This starts the job but doesn't wait — run() returns as soon as the job is queued
client.targets.stim.run(target_name="my_target", repo_name="my_repo")
# How do you know when it's done? You have to poll for status (see below)
The Solution
Poll for job status with proper intervals:
import time
from typing import Callable, Any
def wait_for_job(
    get_status_fn: Callable[[], Any],
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
) -> Any:
    """Poll job status until completion or timeout.

    Args:
        get_status_fn: Zero-argument callable that returns a status object
            with a ``status`` attribute ("complete", "failed", or an
            in-progress value).
        job_name: Human-readable job name used in error messages.
        poll_interval: Seconds to sleep between polls.
        timeout: Overall deadline in seconds.

    Returns:
        The final status object once ``status == "complete"``.

    Raises:
        RuntimeError: If the job reports ``status == "failed"``.
        TimeoutError: If the deadline passes before completion.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        # Only the status fetch sits in the try block, so a job-failure
        # RuntimeError raised below cannot be swallowed by the retry logic.
        try:
            status = get_status_fn()
        except Exception as e:
            # Retry transient network-ish errors; surface everything else.
            if "timeout" in str(e).lower() or "connection" in str(e).lower():
                time.sleep(poll_interval)
                continue
            raise
        if status.status == "complete":
            return status
        if status.status == "failed":
            # Not every failure payload carries an error field.
            error_msg = getattr(status, "error", "Unknown error")
            raise RuntimeError(f"{job_name} failed: {error_msg}")
        time.sleep(poll_interval)
    raise TimeoutError(f"{job_name} exceeded {timeout}s timeout")
Example usage:
from aibread import Bread
client = Bread()

# Kick off the stim job; run() returns as soon as the job is queued.
client.targets.stim.run(target_name="my_target", repo_name="my_repo")

# Block until it finishes: poll every 5 seconds, give up after an hour.
stim_status = wait_for_job(
    lambda: client.targets.stim.get("my_target", repo_name="my_repo"),
    job_name="Stim",
    poll_interval=5,
    timeout=3600
)
print(f"Generated {stim_status.lines} lines")
Recommended Poll Intervals
| Job Type | Interval | Typical Duration |
|---|---|---|
| Stim | 5 seconds | 30s - 5min |
| Rollout | 10 seconds | 2min - 30min |
| Bake | 30 seconds | 10 min - 2 hours |
Error Handling Pattern
Comprehensive Error Handling
Handle specific error types with appropriate responses:
import aibread
from aibread import Bread
def safe_bake_workflow(repo_name: str, target_name: str):
    """Run the stim and rollout phases for a target with broad error handling.

    Creates the repository if it does not exist, then runs stim and rollout
    sequentially, translating SDK exceptions into actionable console
    messages before re-raising them to the caller.
    """
    try:
        client = Bread()

        # Make sure the repository exists before touching any targets.
        try:
            client.repo.get(repo_name)
        except aibread.NotFoundError:
            print(f"Repository '{repo_name}' not found, creating...")
            client.repo.set(repo_name=repo_name)

        # Stim and rollout share the same start-then-poll shape; only the
        # endpoint, label, and deadline differ.
        phases = [
            (client.targets.stim, "Stim", 3600),
            (client.targets.rollout, "Rollout", 7200),
        ]
        for api, phase_name, phase_timeout in phases:
            try:
                api.run(target_name=target_name, repo_name=repo_name)
                wait_for_job(
                    lambda: api.get(target_name, repo_name=repo_name),
                    phase_name,
                    timeout=phase_timeout
                )
            except TimeoutError as exc:
                print(f"{phase_name} timed out: {exc}")
                raise

        print("Workflow completed successfully")

    except aibread.AuthenticationError:
        print("Authentication failed - check BREAD_API_KEY")
        raise
    except aibread.APIConnectionError as exc:
        print(f"Connection failed: {exc}")
        print("Check your internet connection and API endpoint")
        raise
    except aibread.RateLimitError as exc:
        print(f"Rate limited: {exc}")
        print("Wait before retrying")
        raise
    except aibread.APIStatusError as exc:
        print(f"API error {exc.status_code}: {exc.response}")
        raise
    except Exception as exc:
        print(f"Unexpected error: {exc}")
        raise
# Usage: create the repo if needed, then run stim + rollout to completion
safe_bake_workflow("my_repo", "my_target")
Retry Pattern
Automatic Retries for Transient Failures
Some failures are temporary (network blips, rate limits). Retry with exponential backoff:
import time
from functools import wraps
import aibread
def retry_on_failure(max_attempts=3, backoff_factor=2):
    """
    Decorator for automatic retries with exponential backoff.

    The final attempt's exception always propagates, so the loop can never
    fall through: callers get either a result or the last error.

    Args:
        max_attempts: Maximum retry attempts
        backoff_factor: Multiplier for wait time (2 = double each retry)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except aibread.APIConnectionError:
                    # Out of attempts: let the failure propagate.
                    if attempt == max_attempts - 1:
                        raise
                    wait_time = backoff_factor ** attempt
                    print(f"Connection error, retrying in {wait_time}s... (attempt {attempt + 1}/{max_attempts})")
                    time.sleep(wait_time)
                except aibread.RateLimitError as e:
                    if attempt == max_attempts - 1:
                        raise
                    # Rate limit errors often include retry-after
                    wait_time = getattr(e, 'retry_after', backoff_factor ** attempt)
                    print(f"Rate limited, retrying in {wait_time}s...")
                    time.sleep(wait_time)
        return wrapper
    return decorator
# Usage
@retry_on_failure(max_attempts=3, backoff_factor=2)
def create_target(client, target_name, repo_name, config):
    """Create target with automatic retries."""
    # template="default" starts from the stock target template; `config`
    # supplies the per-target overrides.
    return client.targets.set(
        target_name=target_name,
        repo_name=repo_name,
        template="default",
        overrides=config
    )

# Calls will automatically retry on connection/rate limit errors
# NOTE: `client` must be an existing Bread client; {...} is a placeholder
# for a real overrides dict.
create_target(client, "my_target", "my_repo", {...})
Logging Pattern
Structured Logging for Production
Use proper logging libraries for log levels and formatting:
import logging
from aibread import Bread
# Configure logging
# Root-logger setup: INFO level with timestamped, module-tagged lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
def production_bake_workflow(
    repo_name: str,
    target_name: str,
    bake_name: str
):
    """Run stim, rollout, then bake for one target, logging every phase."""
    logger.info(f"Starting bake workflow: {bake_name}")
    try:
        client = Bread()
        logger.info("Client initialized")

        # Phase 1: stim.
        logger.info(f"Starting stim for target: {target_name}")
        client.targets.stim.run(target_name=target_name, repo_name=repo_name)
        stim_status = wait_for_job(
            lambda: client.targets.stim.get(target_name, repo_name=repo_name),
            "Stim"
        )
        logger.info(f"Stim completed: {stim_status.lines} lines generated")

        # Phase 2: rollout.
        logger.info(f"Starting rollout for target: {target_name}")
        client.targets.rollout.run(target_name=target_name, repo_name=repo_name)
        rollout_status = wait_for_job(
            lambda: client.targets.rollout.get(target_name, repo_name=repo_name),
            "Rollout"
        )
        logger.info(f"Rollout completed: {rollout_status.lines} trajectories")

        # Phase 3: bake — the long-running job, so poll less often.
        logger.info(f"Starting bake: {bake_name}")
        client.bakes.run(bake_name=bake_name, repo_name=repo_name)
        bake_status = wait_for_job(
            lambda: client.bakes.get(bake_name, repo_name=repo_name),
            "Bake",
            poll_interval=30
        )
        logger.info(f"Bake completed successfully: {bake_name}")
        return bake_status

    except Exception as exc:
        # Log with traceback before letting the caller decide what to do.
        logger.error(f"Workflow failed: {exc}", exc_info=True)
        raise

# Usage
production_bake_workflow("my_repo", "my_target", "my_bake")
Async Pattern
Async/Await for Better Concurrency
Use async workflows when running multiple bakes in parallel:
import asyncio
from aibread import AsyncBread
async def async_wait_for_job(
    client,
    get_status_coro,
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
):
    """Async version of wait_for_job.

    Args:
        client: Unused; kept for signature compatibility with callers that
            pass their AsyncBread client.
        get_status_coro: Zero-argument callable returning an awaitable that
            resolves to a status object with a ``status`` attribute.
        job_name: Name used in progress and error messages.
        poll_interval: Seconds between polls.
        timeout: Overall deadline in seconds.

    Returns:
        The final status object once the job reports "complete".

    Raises:
        RuntimeError: If the job reports "failed".
        TimeoutError: If the deadline passes before completion.
    """
    print(f"Waiting for {job_name}...")
    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; get_event_loop() is deprecated in that context.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        status = await get_status_coro()
        if status.status == "complete":
            print(f"{job_name} complete")
            return status
        if status.status == "failed":
            raise RuntimeError(f"{job_name} failed")
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"{job_name} exceeded timeout")
async def async_target_workflow(
    repo_name: str,
    target_name: str
):
    """Run the stim and rollout phases for one target asynchronously.

    Opens its own AsyncBread client so several workflows can run
    concurrently without sharing connection state.
    """
    async with AsyncBread() as client:
        # Both phases follow the same start-then-poll shape; rollout is the
        # slower job, so it polls less frequently.
        phases = (
            (client.targets.stim, "Stim", 5),
            (client.targets.rollout, "Rollout", 10),
        )
        for api, label, interval in phases:
            await api.run(target_name=target_name, repo_name=repo_name)
            await async_wait_for_job(
                client,
                lambda: api.get(target_name, repo_name=repo_name),
                label,
                poll_interval=interval
            )
        print(f"Workflow complete: {target_name}")
# Run multiple workflows in parallel
async def run_parallel_workflows():
    """Run multiple target workflows concurrently."""
    # gather() schedules every workflow at once and waits for all of them;
    # each workflow opens its own client, so they share no state.
    workflows = [
        async_target_workflow(f"repo{i}", f"target{i}")
        for i in (1, 2, 3)
    ]
    await asyncio.gather(*workflows)

# Usage
asyncio.run(run_parallel_workflows())
Complete Production Example
Putting it all together:
import logging
import time
import aibread
from aibread import Bread
from typing import Callable
# Setup logging: INFO level on the root logger, module-named logger below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def wait_for_job(
    get_status_fn: Callable,
    job_name: str,
    poll_interval: int = 5,
    timeout: int = 3600
):
    """Poll job until completion with timeout.

    Args:
        get_status_fn: Zero-argument callable returning a status object with
            a ``status`` attribute (and ``lines`` when complete).
        job_name: Name used in log and error messages.
        poll_interval: Seconds between polls.
        timeout: Overall deadline in seconds.

    Returns:
        The final status object once the job reports "complete".

    Raises:
        RuntimeError: If the job reports "failed".
        TimeoutError: If the deadline passes before completion.
    """
    logger.info(f"Waiting for {job_name}...")
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = get_status_fn()
        if status.status == "complete":
            logger.info(f"{job_name} complete ({status.lines} lines)")
            return status
        if status.status == "failed":
            # Not every failure payload carries an error field; don't let
            # the error report itself raise AttributeError.
            error_msg = getattr(status, "error", "Unknown error")
            raise RuntimeError(f"{job_name} failed: {error_msg}")
        logger.debug(f"{job_name}: {status.status}")
        time.sleep(poll_interval)
    raise TimeoutError(f"{job_name} exceeded {timeout}s timeout")
def retry_on_failure(max_attempts=3):
    """Decorator for automatic retries.

    Retries the wrapped call on connection / rate-limit errors with
    exponential backoff (1s, 2s, 4s, ...); the final attempt's exception
    propagates to the caller.

    Args:
        max_attempts: Total number of attempts before giving up.
    """
    # Local import: this snippet's top-of-file imports don't include functools.
    from functools import wraps

    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (aibread.APIConnectionError, aibread.RateLimitError):
                    if attempt == max_attempts - 1:
                        raise
                    wait_time = 2 ** attempt
                    logger.warning(f"Retry {attempt + 1}/{max_attempts} after {wait_time}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator
@retry_on_failure(max_attempts=3)
def create_target_with_retry(client, target_name, repo_name, config):
    """Create target with automatic retries.

    Thin wrapper around ``client.targets.set`` so transient connection or
    rate-limit failures are retried by the decorator.
    """
    # template="default" starts from the stock template; `config` supplies
    # the per-target overrides.
    return client.targets.set(
        target_name=target_name,
        repo_name=repo_name,
        template="default",
        overrides=config
    )
def production_workflow(
    repo_name: str,
    target_name: str,
    target_config: dict,
    bake_name: str,
    bake_config: dict
):
    """
    Complete production workflow with error handling, retries, and logging.

    Phases: ensure the repo exists -> create the target (with retries) ->
    stim -> rollout -> configure and run the bake. Each long-running job is
    polled to completion before the next phase starts.
    """
    logger.info(f"Starting production workflow: {bake_name}")
    try:
        client = Bread()

        # Ensure the repository exists, creating it on first use.
        try:
            client.repo.get(repo_name)
            logger.info(f"Repository '{repo_name}' exists")
        except aibread.NotFoundError:
            logger.info(f"Creating repository: {repo_name}")
            client.repo.set(repo_name=repo_name)

        # Target creation goes through the retry wrapper to absorb
        # transient connection / rate-limit errors.
        logger.info(f"Creating target: {target_name}")
        create_target_with_retry(client, target_name, repo_name, target_config)

        # Stim and rollout share the same start-then-poll shape; only the
        # endpoint, poll interval, and deadline differ.
        for phase, interval, limit in (("stim", 5, 3600), ("rollout", 10, 7200)):
            logger.info(f"Starting {phase} job")
            api = getattr(client.targets, phase)
            api.run(target_name=target_name, repo_name=repo_name)
            wait_for_job(
                lambda: api.get(target_name, repo_name=repo_name),
                phase.capitalize(),
                poll_interval=interval,
                timeout=limit
            )

        # The bake needs its config written before the job is started.
        logger.info(f"Configuring bake: {bake_name}")
        client.bakes.set(
            bake_name=bake_name,
            repo_name=repo_name,
            template="default",
            overrides=bake_config
        )
        logger.info("Starting bake")
        client.bakes.run(bake_name=bake_name, repo_name=repo_name)
        wait_for_job(
            lambda: client.bakes.get(bake_name, repo_name=repo_name),
            "Bake",
            poll_interval=30,
            timeout=14400  # 4 hours
        )

        logger.info(f"Production workflow completed: {bake_name}")
        return True

    except aibread.AuthenticationError:
        logger.error("Authentication failed - check BREAD_API_KEY")
        raise
    except aibread.APIConnectionError as exc:
        logger.error(f"Connection failed: {exc}")
        raise
    except TimeoutError as exc:
        logger.error(f"Job timeout: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Workflow failed: {exc}", exc_info=True)
        raise
# Usage: end-to-end run. The [...] placeholder must be replaced with real
# generator configs before this executes.
production_workflow(
    repo_name="production_repo",
    target_name="yoda_target",
    target_config={
        "generators": [...],
        "model_name": "Qwen/Qwen3-32B",
        "teacher_prompt": "yoda_teacher",
        "student_prompt": "empty_student"
    },
    bake_name="yoda_production_v1",
    bake_config={
        # Weighted mix of dataset targets; here a single target at weight 1.
        "datasets": [{"target": "yoda_target", "weight": 1.0}],
        "epochs": 3
    }
)
Best Practices
Use Appropriate Timeouts
Set realistic timeouts based on job type. Stim: 1h, Rollout: 2h, Bake: 4h.
Log Everything
Log the start and completion of each phase, error details, and timing info for debugging.
Retry Transient Failures
Automatically retry connection errors and rate limits with exponential backoff.
Handle All Error Types
Catch specific exceptions (AuthenticationError, APIConnectionError) rather than bare Exception.
Use Async for Parallel Jobs
Process multiple targets concurrently with asyncio for better throughput.