Runpod's serverless Python SDK provides the `return_aggregate_stream` feature to automatically collect all yielded outputs into a single aggregated response. This simplifies result handling by eliminating the need to manually collect and format streaming results, making your handlers easier to implement and consume.
This guide shows you how to use output aggregation effectively in your applications.
## Understanding output aggregation
By default, streaming handlers that yield results are only accessible via the `/stream` operation, which returns results as they become available. When you enable `return_aggregate_stream`, Runpod automatically:

- Collects all yielded results as your handler produces them.
- Aggregates them into a single list.
- Makes the complete aggregated results available via the `/run` and `/runsync` operations.
## Basic aggregation example

Here’s a simple handler that processes multiple items and yields results incrementally.
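As a rough sketch (the `items` input key and the uppercase transform are invented for illustration; `handler`, `return_aggregate_stream`, and `runpod.serverless.start` are the actual Runpod API):

```python
def handler(job):
    """Yield one result per input item."""
    for i, item in enumerate(job["input"].get("items", [])):
        # Each yield is streamed immediately via /stream; with
        # return_aggregate_stream enabled, Runpod also collects
        # every yield into a single list for /run and /runsync.
        yield {"index": i, "processed": item.upper()}

# When deploying on Runpod, start the worker with aggregation enabled:
# import runpod
# runpod.serverless.start({"handler": handler, "return_aggregate_stream": True})
```

The aggregated list preserves the order in which results were yielded.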
- Use `/stream` to receive each result as it’s yielded.
- Use `/run` or `/runsync` to receive all results aggregated into a list.
## Processing multiple items

A common pattern is processing a batch of items and yielding results as each completes. This is useful for tasks like:

- Analyzing sentiment for multiple text samples.
- Generating images from multiple prompts.
- Running inference on multiple inputs.
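A minimal sketch of the batch pattern, using sentiment scoring as the example task (the `texts` input key and the `score_sentiment` helper are illustrative stand-ins, not part of the Runpod API):

```python
def score_sentiment(text):
    # Toy heuristic standing in for a real model call.
    positive = {"good", "great", "love"}
    words = text.lower().split()
    return sum(w in positive for w in words) / len(words) if words else 0.0

def handler(job):
    for i, text in enumerate(job["input"].get("texts", [])):
        # Yield each sample's score as soon as it is computed, so
        # /stream clients see progress while /run clients get the full list.
        yield {"index": i, "score": score_sentiment(text)}

# Deploy with:
# import runpod
# runpod.serverless.start({"handler": handler, "return_aggregate_stream": True})
```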
## Local testing with aggregation
When testing locally, generators behave differently than in production, so your handler needs to account for the difference between the local test environment and production. Your handler can check the input for a `local_test` flag (indicating local testing) and convert the generator to a list. In production, it returns the generator directly, allowing Runpod to handle the aggregation.
## Testing locally

Create a `test_input.json` file to verify your aggregation works correctly.
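For example, a `test_input.json` along these lines (the `items` and `local_test` keys are illustrative and must match whatever your handler reads from `job["input"]`):

```json
{
  "input": {
    "items": ["first", "second", "third"],
    "local_test": true
  }
}
```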
## Understanding the output format

When `return_aggregate_stream` is enabled, the final output structure includes all yielded results in a list.
Without aggregation (streaming only):

- Results arrive one at a time via `/stream`.
- No combined output is available via `/run` or `/runsync`.
With aggregation enabled:

- Individual results are still available via `/stream` as they’re yielded.
- The complete aggregated list is available via `/run` and `/runsync`.
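A trimmed sketch of what a `/runsync` response might look like with aggregation enabled (the `id` and `output` values are placeholders, and timing metadata fields are omitted):

```json
{
  "id": "sync-0f6e-example",
  "status": "COMPLETED",
  "output": [
    { "index": 0, "result": "first-processed" },
    { "index": 1, "result": "second-processed" }
  ]
}
```

The `output` field holds every yielded result, in yield order, as a single list.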
## When to use output aggregation

Use `return_aggregate_stream` for:
- Batch processing: Processing multiple items and clients need the complete set of results.
- Progress tracking: Clients want to see incremental progress but also need the final aggregated results.
- Flexible consumption: Supporting both streaming and batch consumption patterns.
- Simplified integration: Clients don’t want to implement streaming logic but still benefit from incremental processing.
Avoid `return_aggregate_stream` for:

- Large result sets: Aggregating thousands of results can create memory pressure and large response payloads.
- True streaming only: Results should only be consumed as a stream (like real-time chat).
- Single result: Handler only returns one result (no need for aggregation).
## Best practices

- Memory management: Be mindful of memory usage when aggregating large numbers of results.
- Payload limits: Remember the payload size limits: 10 MB for the `/run` operation and 20 MB for `/runsync`.
- Error handling: Handle errors for individual items without breaking the entire batch.
- Consistent output structure: Yield results in a consistent format to simplify client-side processing.
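The error-handling practice above can be sketched as follows (the `process` helper and `items` key are illustrative stand-ins for your real per-item work):

```python
def process(item):
    # Stand-in for real per-item work; raises on non-string input.
    return item.upper()

def handler(job):
    for i, item in enumerate(job["input"].get("items", [])):
        try:
            yield {"index": i, "result": process(item)}
        except Exception as exc:
            # Report the failure for this item and keep going, so one
            # bad input doesn't fail the whole aggregated batch.
            yield {"index": i, "error": str(exc)}
```

Yielding an error object per failed item also keeps the output structure consistent: every item produces exactly one entry in the aggregated list.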
## Combining with async handlers

You can also use aggregation with async handlers for concurrent processing.
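A minimal async sketch, assuming the per-item work is I/O-bound (the `run_model` helper and `items` key are illustrative; async generator handlers are started the same way as synchronous ones):

```python
import asyncio

async def run_model(item):
    # Stand-in for an I/O-bound model or API call.
    await asyncio.sleep(0)
    return item * 2

async def handler(job):
    items = job["input"].get("items", [])
    # Process all items concurrently, then yield each result; yields
    # are streamed and, with return_aggregate_stream enabled,
    # aggregated exactly as in the synchronous case.
    results = await asyncio.gather(*(run_model(i) for i in items))
    for i, r in enumerate(results):
        yield {"index": i, "result": r}

# import runpod
# runpod.serverless.start({"handler": handler, "return_aggregate_stream": True})
```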
## Next steps
- Learn more about streaming handlers.
- Explore async handlers for concurrent processing.
- Understand error handling for robust batch processing.
- Review payload limits to avoid oversized responses.