## Overview
Stim and rollout outputs support pagination so large datasets can be fetched efficiently. Use the `offset` and `limit` parameters to page through results.

- `limit`: maximum number of items to return per request (max 1000)
- `offset`: starting position in the dataset (0-indexed)
- `has_more`: response field indicating whether more data is available
```python
# First page
output = client.targets.stim.get_output(
    target_name="my_target",
    repo_name="my_repo",
    limit=100,
    offset=0,
)
print(f"Retrieved {len(output.output)} items")
print(f"Has more: {output.has_more}")

# Next page
if output.has_more:
    next_page = client.targets.stim.get_output(
        target_name="my_target",
        repo_name="my_repo",
        limit=100,
        offset=100,
    )
```
## Fetch All Results
```python
def get_all_output(target_name: str, repo_name: str, output_type: str = "stim"):
    """Fetch all stim or rollout output with pagination."""
    all_output = []
    offset = 0
    limit = 1000  # Maximum allowed

    # Select the appropriate method; the lambdas read the current offset at call time
    if output_type == "stim":
        get_fn = lambda: client.targets.stim.get_output(
            target_name, repo_name, limit=limit, offset=offset
        )
    else:
        get_fn = lambda: client.targets.rollout.get_output(
            target_name, repo_name, limit=limit, offset=offset
        )

    while True:
        chunk = get_fn()
        all_output.extend(chunk.output)
        print(f"Fetched {len(chunk.output)} items (total: {len(all_output)})")
        if not chunk.has_more:
            break
        offset += limit

    return all_output


# Usage
stimuli = get_all_output("my_target", "my_repo", "stim")
trajectories = get_all_output("my_target", "my_repo", "rollout")
```
## Async Pagination

```python
import asyncio

from aibread import AsyncBread


async def get_all_output_async(target_name: str, repo_name: str):
    """Async pagination for better performance."""
    async with AsyncBread() as client:
        all_output = []
        offset = 0
        limit = 1000

        while True:
            chunk = await client.targets.stim.get_output(
                target_name=target_name,
                repo_name=repo_name,
                limit=limit,
                offset=offset,
            )
            all_output.extend(chunk.output)
            if not chunk.has_more:
                break
            offset += limit

        return all_output


# Run
output = asyncio.run(get_all_output_async("my_target", "my_repo"))
```
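Awaiting pages one at a time behaves much like the synchronous loop; the larger async win comes from requesting pages concurrently once the total size is known. Below is a minimal sketch, assuming the response's `lines` field reports the total item count (as used in Progress Tracking below) and that pages can be requested in any order:

```python
import asyncio

from aibread import AsyncBread


async def get_all_output_concurrent(target_name: str, repo_name: str, limit: int = 1000):
    """Sketch: fetch all pages concurrently once the total count is known."""
    async with AsyncBread() as client:
        # Probe with a single item to learn the total count
        first = await client.targets.stim.get_output(
            target_name=target_name, repo_name=repo_name, limit=1, offset=0
        )
        total = first.lines

        # Request every page at once; results come back in offset order
        tasks = [
            client.targets.stim.get_output(
                target_name=target_name, repo_name=repo_name, limit=limit, offset=offset
            )
            for offset in range(0, total, limit)
        ]
        chunks = await asyncio.gather(*tasks)

        all_output = []
        for chunk in chunks:
            all_output.extend(chunk.output)
        return all_output
```

In practice you may want to cap concurrency (for example with an `asyncio.Semaphore`) to stay within rate limits.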
## Generator Pattern
Use generators/iterators for memory-efficient iteration:
```python
def iterate_output(target_name: str, repo_name: str, chunk_size: int = 1000):
    """Generator that yields chunks of output."""
    offset = 0
    while True:
        chunk = client.targets.stim.get_output(
            target_name=target_name,
            repo_name=repo_name,
            limit=chunk_size,
            offset=offset,
        )
        yield chunk.output
        if not chunk.has_more:
            break
        offset += chunk_size


# Usage
for chunk in iterate_output("my_target", "my_repo"):
    for item in chunk:
        process(item)  # Process items without loading all into memory
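```

If per-item iteration is more convenient than handling chunks, a thin wrapper (a hypothetical helper, not part of the SDK) can flatten the chunk generator above:

```python
def iterate_items(target_name: str, repo_name: str, chunk_size: int = 1000):
    """Yield individual items by flattening the chunk generator above."""
    for chunk in iterate_output(target_name, repo_name, chunk_size):
        yield from chunk


# Usage
for item in iterate_items("my_target", "my_repo"):
    process(item)
```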
## Progress Tracking
```python
def fetch_with_progress(target_name: str, repo_name: str):
    """Fetch output with progress tracking."""
    # Get the total count first
    initial = client.targets.stim.get_output(
        target_name=target_name,
        repo_name=repo_name,
        limit=1,
        offset=0,
    )
    total_lines = initial.lines

    all_output = []
    offset = 0
    limit = 1000

    print(f"Total items: {total_lines}")

    while True:
        chunk = client.targets.stim.get_output(
            target_name=target_name,
            repo_name=repo_name,
            limit=limit,
            offset=offset,
        )
        all_output.extend(chunk.output)

        # Progress percentage
        progress = (len(all_output) / total_lines) * 100
        print(f"Progress: {progress:.1f}% ({len(all_output)}/{total_lines})")

        if not chunk.has_more:
            break
        offset += limit

    return all_output
```
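For a richer progress display, the same total count can drive a third-party progress bar such as `tqdm` (an optional dependency, not required by the SDK); a sketch:

```python
from tqdm import tqdm


def fetch_with_progress_bar(target_name: str, repo_name: str, limit: int = 1000):
    """Sketch: the same pagination loop, reporting progress via a tqdm bar."""
    initial = client.targets.stim.get_output(
        target_name=target_name, repo_name=repo_name, limit=1, offset=0
    )
    all_output = []
    offset = 0
    with tqdm(total=initial.lines, unit="item") as bar:
        while True:
            chunk = client.targets.stim.get_output(
                target_name=target_name, repo_name=repo_name, limit=limit, offset=offset
            )
            all_output.extend(chunk.output)
            bar.update(len(chunk.output))
            if not chunk.has_more:
                break
            offset += limit
    return all_output
```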
## Best Practices

- Set `limit=1000` (the maximum) to minimize API calls.
- Always check the `has_more` field instead of guessing when to stop.
- For very large datasets, use generators to avoid loading everything into memory.
- Implement error handling and retry logic for robust pagination (see the sketch after this list).
- Show progress indicators for better UX when fetching large datasets.
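A minimal sketch of retry logic around the pagination loop, assuming a generic exception type since the SDK's specific error classes are not shown here:

```python
import time


def get_all_output_with_retry(target_name: str, repo_name: str, max_retries: int = 3):
    """Sketch: paginate with simple exponential backoff on transient failures."""
    all_output = []
    offset = 0
    limit = 1000
    while True:
        for attempt in range(max_retries):
            try:
                chunk = client.targets.stim.get_output(
                    target_name=target_name, repo_name=repo_name, limit=limit, offset=offset
                )
                break
            except Exception:  # Replace with the SDK's specific error types if available
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Back off: 1s, 2s, 4s, ...
        all_output.extend(chunk.output)
        if not chunk.has_more:
            break
        offset += limit
    return all_output
```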
## Next Steps