When building a streaming handler that yields results incrementally, you can enable the return_aggregate_stream option to automatically collect all yielded outputs into a single aggregated response. This eliminates the need to manually collect and format streaming results, making your handlers easier to implement and consume. This guide shows you how to use output aggregation effectively in your applications.

Understanding output aggregation

By default, streaming handlers that yield results are only accessible via the /stream operation, which returns results as they become available. When you enable return_aggregate_stream, Runpod automatically:
  1. Collects all yielded results as your handler produces them.
  2. Aggregates them into a single list.
  3. Makes the complete aggregated results available via /run and /runsync operations.
This allows clients to choose between streaming individual results as they arrive or waiting for the complete aggregated response.
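Conceptually, the aggregation step is equivalent to exhausting your generator into a list. A minimal sketch of the idea (not the SDK's actual internals, which also stream each value out as it arrives):

```python
def aggregate_stream(handler, job):
    """Collect everything a generator handler yields into one list.

    This mirrors the aggregated output that return_aggregate_stream
    makes available via /run and /runsync.
    """
    return list(handler(job))

def example_handler(job):
    for item in job["input"]["items"]:
        yield f"Processed: {item}"

print(aggregate_stream(example_handler, {"input": {"items": ["a", "b"]}}))
# → ['Processed: a', 'Processed: b']
```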

Basic aggregation example

Here’s a simple handler that processes multiple items and yields results incrementally:
handler.py
import runpod

def handler(job):
    job_input = job["input"]
    items = job_input.get("items", [])

    for item in items:
        # Process each item
        result = f"Processed: {item}"

        # Yield each result as soon as it's ready; with
        # return_aggregate_stream enabled, Runpod collects these
        # yields into the aggregated /run and /runsync output
        yield result

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True
})
When a client calls this handler with multiple items, they can:
  • Use /stream to receive each result as it’s yielded.
  • Use /run or /runsync to receive all results aggregated into a list.
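On the client side, the choice is just a different endpoint path. A sketch of the request payload, assuming Runpod's standard v2 endpoint URL layout (the endpoint ID and API key are placeholders; the actual HTTP calls are shown as comments):

```python
import json

ENDPOINT_ID = "your_endpoint_id"  # placeholder
BASE_URL = f"https://api.runpod.ai/v2/{ENDPOINT_ID}"

payload = {"input": {"items": ["Item 1", "Item 2", "Item 3"]}}
headers = {
    "Authorization": "Bearer YOUR_API_KEY",  # placeholder
    "Content-Type": "application/json",
}

# Aggregated list in a single response:
#   requests.post(f"{BASE_URL}/runsync", json=payload, headers=headers)
# Incremental results (submit with /run, then read the stream endpoint):
#   requests.post(f"{BASE_URL}/run", json=payload, headers=headers)
print(json.dumps(payload))
```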

Processing multiple items

A common pattern is processing a batch of items and yielding results as each completes. This is useful for tasks like:
  • Analyzing sentiment for multiple text samples.
  • Generating images from multiple prompts.
  • Running inference on multiple inputs.
Here’s a practical example:
handler.py
import runpod
import time

def analyze_items(items, task_type):
    """Process items based on task type, yielding each result as it completes."""
    for item in items:
        # Simulate processing time
        time.sleep(0.5)

        # Process based on type
        if task_type == "sentiment":
            result = {"text": item, "sentiment": "positive", "score": 0.92}
        elif task_type == "classify":
            result = {"text": item, "category": "technology", "confidence": 0.88}
        else:
            result = {"error": f"Unknown task type: {task_type}"}

        # Yield each result as it completes
        yield result

def handler(job):
    job_input = job["input"]
    task_type = job_input.get("task_type", "sentiment")
    items = job_input.get("items", [])

    # Validate input
    if not items:
        return {"error": "No items provided"}

    # Process items and yield results
    return analyze_items(items, task_type)

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True
})
This handler processes each item sequentially, yielding each result as soon as it's ready; Runpod collects the yields into the complete aggregated list.
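Because analyze_items is an ordinary generator, you can also exercise it directly before deploying. A sketch using a trimmed copy of the generator (the simulated delay removed for speed):

```python
def analyze_items(items, task_type):
    """Trimmed copy of the generator above, without the simulated delay."""
    for item in items:
        if task_type == "sentiment":
            yield {"text": item, "sentiment": "positive", "score": 0.92}
        elif task_type == "classify":
            yield {"text": item, "category": "technology", "confidence": 0.88}
        else:
            yield {"error": f"Unknown task type: {task_type}"}

# Consuming the generator with list() mimics the aggregated output
results = list(analyze_items(["Great!", "Awful."], "sentiment"))
print(results)
```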

Local testing with aggregation

When testing locally, generator handlers behave differently than in production, so you need to account for the difference between the two environments:
handler.py
import runpod

def handler(job):
    job_input = job["input"]
    items = job_input.get("items", [])

    for item in items:
        result = f"Processed: {item}"
        yield result

def start_handler():
    """Wrapper to handle local testing vs. production."""
    def wrapper(job):
        generator = handler(job)

        # In local testing, convert generator to list
        if job.get("id") == "local_test":
            return list(generator)

        # In production, return the generator
        return generator

    runpod.serverless.start({
        "handler": wrapper,
        "return_aggregate_stream": True
    })

if __name__ == "__main__":
    start_handler()
The wrapper function checks whether the job ID is local_test (indicating a local test run) and, if so, converts the generator to a list. In production, it returns the generator directly so that Runpod can handle the aggregation.
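You can verify both branches of the wrapper without deploying. A quick sketch showing that a local_test job gets a plain list while any other job ID leaves the generator intact:

```python
def handler(job):
    for item in job["input"].get("items", []):
        yield f"Processed: {item}"

def wrapper(job):
    generator = handler(job)
    # Local test runs get the fully materialized list
    if job.get("id") == "local_test":
        return list(generator)
    # Production returns the generator for the SDK to consume
    return generator

local = wrapper({"id": "local_test", "input": {"items": ["a"]}})
prod = wrapper({"id": "abc-123", "input": {"items": ["a"]}})
print(type(local).__name__, type(prod).__name__)
# → list generator
```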

Testing locally

Create a test input file to verify your aggregation works correctly:
test_input.json
{
  "input": {
    "task_type": "sentiment",
    "items": [
      "I love this product!",
      "The service was okay.",
      "Not great, could be better."
    ]
  }
}
Run your handler:
python handler.py --test_input '{"input": {"task_type": "sentiment", "items": ["Item 1", "Item 2", "Item 3"]}}'
You should see output showing each result being processed and the final aggregated list:
--- Starting Serverless Worker |  Version 1.6.2 ---
INFO   | Using test_input.json as job input.
DEBUG  | Retrieved local job: {'input': {'task_type': 'sentiment', 'items': ['Item 1', 'Item 2', 'Item 3']}, 'id': 'local_test'}
INFO   | local_test | Started.
DEBUG  | local_test | Handler output: ['Processed: Item 1', 'Processed: Item 2', 'Processed: Item 3']
INFO   | Job local_test completed successfully.

Understanding the output format

When return_aggregate_stream is enabled, the final output structure includes all yielded results in a list.
Without aggregation (streaming only):
  • Results arrive one at a time via /stream.
  • No combined output is available via /run or /runsync.
With aggregation enabled:
  • Individual results are still available via /stream as they’re yielded.
  • The complete aggregated list is available via /run and /runsync:
{
  "output": [
    {"text": "Item 1", "sentiment": "positive", "score": 0.92},
    {"text": "Item 2", "sentiment": "neutral", "score": 0.54},
    {"text": "Item 3", "sentiment": "negative", "score": 0.78}
  ]
}
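Clients consuming the aggregated response just parse the output list. A sketch using a hypothetical /runsync response body like the one above:

```python
import json

# Hypothetical /runsync response body
body = """{
  "output": [
    {"text": "Item 1", "sentiment": "positive", "score": 0.92},
    {"text": "Item 2", "sentiment": "neutral", "score": 0.54}
  ]
}"""

data = json.loads(body)
for result in data["output"]:
    print(f'{result["text"]}: {result["sentiment"]} ({result["score"]})')
```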

When to use output aggregation

Use return_aggregate_stream for:
  • Batch processing: You process multiple items and clients need the complete set of results.
  • Progress tracking: Clients want to see incremental progress but also need the final aggregated results.
  • Flexible consumption: You want to support both streaming and batch consumption patterns.
  • Simplified integration: Clients don’t want to implement streaming logic but still benefit from incremental processing.
Don’t use it for:
  • Large result sets: Aggregating thousands of results can create memory pressure and large response payloads.
  • True streaming only: Results should only ever be consumed as a stream (like real-time chat).
  • Single results: The handler returns only one result, so there is nothing to aggregate.

Best practices

  1. Memory management: Be mindful of memory usage when aggregating large numbers of results.
  2. Payload limits: Remember the payload size limits:
    • /run operation: 10 MB
    • /runsync operation: 20 MB
    If aggregated results exceed these limits, consider using streaming only or storing results in cloud storage.
  3. Error handling: Handle errors for individual items without breaking the entire batch:
    def handler(job):
        items = job["input"].get("items", [])
    
        for item in items:
            try:
                # process_item is a placeholder for your per-item logic
                result = process_item(item)
                yield {"success": True, "result": result}
            except Exception as e:
                yield {"success": False, "error": str(e), "item": item}
    
  4. Consistent output structure: Yield results in a consistent format to simplify client-side processing.
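As a concrete guard for the payload limits above, you can measure the serialized size of the aggregated results before returning them. A sketch (the limits are encoded as constants; the cloud-storage fallback is left as a stub):

```python
import json

RUN_LIMIT_BYTES = 10 * 1024 * 1024      # /run payload limit: 10 MB
RUNSYNC_LIMIT_BYTES = 20 * 1024 * 1024  # /runsync payload limit: 20 MB

def serialized_size(results):
    """Bytes the results occupy once JSON-serialized into the response."""
    return len(json.dumps(results).encode("utf-8"))

results = [{"text": f"Item {i}", "score": 0.9} for i in range(1000)]
size = serialized_size(results)
print(size, size < RUN_LIMIT_BYTES)
if size >= RUN_LIMIT_BYTES:
    # Too large to aggregate: fall back to streaming only, or upload
    # the results to cloud storage and return a URL instead.
    pass
```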

Combining with async handlers

You can also use aggregation with async generator handlers:
handler.py
import runpod
import asyncio

async def async_handler(job):
    items = job["input"].get("items", [])

    for item in items:
        # Simulate async processing
        await asyncio.sleep(0.5)

        result = f"Async processed: {item}"
        yield result

runpod.serverless.start({
    "handler": async_handler,
    "return_aggregate_stream": True
})
This combines the benefits of async processing with automatic output aggregation.
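Note that the handler above still awaits each item in turn. For genuinely concurrent processing, you could start all items at once and yield each result as it finishes — a sketch using asyncio.as_completed (yield order then follows completion order, not input order):

```python
import asyncio

async def process_item(item):
    await asyncio.sleep(0.01)  # stand-in for real async work
    return f"Async processed: {item}"

async def async_handler(job):
    items = job["input"].get("items", [])
    # Start every item concurrently, then yield each as it completes
    tasks = [asyncio.create_task(process_item(item)) for item in items]
    for finished in asyncio.as_completed(tasks):
        yield await finished

async def collect(job):
    # Helper for local checks; in production Runpod consumes the generator
    return [r async for r in async_handler(job)]

results = asyncio.run(collect({"input": {"items": ["a", "b", "c"]}}))
print(sorted(results))
```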

Next steps