FastAPI Stream Response

In modern web applications, there are many scenarios where you need to stream data to clients rather than sending complete responses at once. FastAPI's StreamingResponse allows you to efficiently handle these situations, enabling you to work with large datasets, real-time updates, and more.

What is a StreamingResponse?

StreamingResponse is a response type in FastAPI that allows you to return a stream of data to the client instead of sending the entire response at once. This is particularly useful when:

  • Working with large files or datasets that would consume too much memory
  • Providing real-time updates or continuous data feeds
  • Implementing server-sent events
  • Creating long-running operations with progressive output

Basic Usage of StreamingResponse

Let's start with the basic structure of a StreamingResponse in FastAPI:

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import io

app = FastAPI()

@app.get("/stream")
def get_stream():
    def generate_content():
        for i in range(10):
            yield f"Line {i}\n"

    return StreamingResponse(generate_content())

In this example, we create a generator function generate_content() that yields lines of text. The StreamingResponse takes this generator and streams the output to the client one item at a time.
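You can see this incremental behavior without running a server. A minimal sketch showing that the generator is lazy, producing each line only when consumed, which is exactly how StreamingResponse drains it while sending the response:

```python
# The same generator as in the endpoint above, consumed directly to
# show that items are produced one at a time, on demand.
def generate_content():
    for i in range(10):
        yield f"Line {i}\n"

gen = generate_content()
first = next(gen)   # only the first line has been produced so far
rest = list(gen)    # consuming the rest yields the remaining nine
```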

Understanding Generator Functions

The core of StreamingResponse is the generator function that provides the content to be streamed. In Python, a generator function:

  • Uses the yield keyword instead of return
  • Maintains its state between yields
  • Returns an iterator that can be consumed incrementally

Here's a more detailed example:

python
@app.get("/countdown")
def countdown_stream():
    def countdown_generator():
        count = 5
        while count > 0:
            yield f"Counting: {count}\n"
            count -= 1
            # In a real application, you might add a time.sleep() here
        yield "Blast off!\n"

    return StreamingResponse(countdown_generator())

When a client requests /countdown, they'll receive each line as it's generated, rather than waiting for the entire countdown to complete.

Content Types and Headers

You can specify the content type and additional headers when creating a StreamingResponse:

python
@app.get("/stream-csv")
def stream_csv():
    def generate_csv():
        yield "id,name,value\n"
        for i in range(1000):
            yield f"{i},item-{i},{i*10}\n"

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=data.csv"}
    )

This example streams a large CSV file and instructs the browser to download it as an attachment named "data.csv".
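Hand-built f-strings work for simple values, but fields containing commas or quotes need escaping. A sketch of the same generator (same columns assumed) that delegates quoting to the standard library's csv module:

```python
import csv
import io

# Build each CSV row with csv.writer so commas and quotes inside fields
# are escaped correctly; the StringIO buffer is reused for every row.
def generate_csv():
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    def render_row(row):
        writer.writerow(row)
        data = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return data

    yield render_row(["id", "name", "value"])
    for i in range(1000):
        yield render_row([i, f"item-{i}", i * 10])
```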

Real-World Example: Streaming a Large File

One common use case for StreamingResponse is serving large files without loading them entirely into memory:

python
@app.get("/download-large-file/{file_path:path}")
def download_large_file(file_path: str):
    # The ":path" converter lets the parameter match paths containing "/".
    # Note: in production, validate file_path against an allowed directory,
    # or a client could request arbitrary files (path traversal).
    def file_iterator(file_path):
        with open(file_path, mode="rb") as file:
            yield from file

    # Determine content type based on file extension
    content_type = "application/octet-stream"  # Default binary
    if file_path.endswith(".pdf"):
        content_type = "application/pdf"
    elif file_path.endswith(".txt"):
        content_type = "text/plain"
    # Add more types as needed

    return StreamingResponse(
        file_iterator(file_path),
        media_type=content_type,
        headers={"Content-Disposition": f"attachment; filename={file_path.split('/')[-1]}"}
    )

This endpoint streams the file from disk incrementally instead of loading it entirely into memory. Note that `yield from file` iterates line by line, which suits text files; for arbitrary binary data with few or no newlines, reading fixed-size chunks is usually preferable.

Server-Sent Events (SSE)

StreamingResponse is perfect for implementing Server-Sent Events, a technology for pushing updates to browsers:

python
@app.get("/sse")
def server_sent_events():
    def event_generator():
        for i in range(10):
            # Format according to the SSE specification
            yield f"data: {{'id': {i}, 'message': 'Update {i}'}}\n\n"
            # In a real application, add time.sleep() or await asyncio.sleep()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

The client would connect to this endpoint and receive updates as they're generated.
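One caveat: the f-string above emits Python-dict syntax, while SSE consumers usually expect the data field to be JSON. A variant of the generator that serializes each payload with json.dumps:

```python
import json

# Serialize each event payload as real JSON before framing it as an
# SSE "data:" line; each event is terminated by a blank line.
def event_generator():
    for i in range(10):
        payload = json.dumps({"id": i, "message": f"Update {i}"})
        yield f"data: {payload}\n\n"
```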

Async Support

FastAPI is built for asynchronous operations, and StreamingResponse works perfectly with async generators:

python
import asyncio

@app.get("/async-stream")
async def async_stream():
    async def async_generator():
        for i in range(10):
            await asyncio.sleep(0.5)  # Non-blocking sleep
            yield f"Async line {i}\n"

    return StreamingResponse(async_generator())

This allows your application to handle many concurrent streaming connections efficiently.
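The async generator itself can be exercised without FastAPI. A sketch that collects its output with asyncio (the delay is made configurable and set to zero so the demo finishes instantly):

```python
import asyncio

# Same shape as the generator in the endpoint above, with the delay
# exposed as a parameter.
async def async_generator(delay: float = 0.5):
    for i in range(10):
        await asyncio.sleep(delay)  # non-blocking sleep
        yield f"Async line {i}\n"

async def collect():
    # An async generator is consumed with `async for`.
    return [line async for line in async_generator(delay=0)]

lines = asyncio.run(collect())
```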

Real-Time Processing Example

Here's a more complex example that processes data in real-time as it's being streamed:

python
@app.get("/process-stream")
def process_stream():
    def data_processor():
        # Imagine this is a large dataset or continuous data source
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        # Stream header
        yield "Processing data in real-time...\n"

        running_sum = 0
        for item in data:
            # Process each item
            processed = item * 10
            running_sum += processed

            # Yield progress update
            yield f"Processed item: {item} -> {processed}, Running total: {running_sum}\n"

        # Final summary
        yield f"Processing complete. Final total: {running_sum}\n"

    return StreamingResponse(data_processor())

This pattern is useful for data transformation pipelines, log processing, or any scenario where you want to show processing progress to users.

Error Handling in Streams

When working with streams, proper error handling is essential:

python
@app.get("/stream-with-error-handling")
def stream_with_error_handling():
    def generator_with_error_handling():
        try:
            for i in range(10):
                if i == 5:
                    # Simulate an error
                    raise ValueError("Something went wrong!")
                yield f"Line {i}\n"
        except Exception as e:
            yield f"Error occurred: {str(e)}\n"
            # You might want to log the error here too
        finally:
            yield "Stream complete (with or without errors).\n"

    return StreamingResponse(generator_with_error_handling())

Because the 200 status code and headers are sent before the generator body runs, an error that occurs mid-stream can no longer change the response status; reporting it within the stream body, as shown here, is often the only way to inform the client. The finally block also guarantees cleanup runs whether or not an error occurred.
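One more failure mode is worth handling: the client disconnecting mid-stream. When that happens, the generator is closed and a GeneratorExit is raised at the paused yield, so a finally block is the right place to release resources. A runnable sketch, with an explicit close() standing in for the disconnect:

```python
cleanup_ran = False

def generator_with_cleanup():
    global cleanup_ran
    try:
        for i in range(10):
            yield f"Line {i}\n"
    finally:
        # Runs on normal completion, on errors, and when the client
        # disconnects (which closes the generator at the paused yield).
        cleanup_ran = True

gen = generator_with_cleanup()
next(gen)    # the stream has started
gen.close()  # simulates the client disconnecting
```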

Practical Use Cases

Here are some practical applications for StreamingResponse:

  1. Log file streaming: Display logs in real-time on a web interface
  2. Progress reporting: Report progress of long-running tasks
  3. Data export: Stream large datasets as CSV or other formats
  4. Media streaming: Stream audio or video content
  5. Real-time analytics: Stream results of ongoing analytics processes
  6. Chat applications: Implement server-to-client message streaming

Performance Considerations

While streaming responses are powerful, keep these considerations in mind:

  1. Connection management: Clients may disconnect before the stream completes
  2. Resource usage: Long-lived connections require server resources
  3. Buffering: Some proxies or clients might buffer responses anyway
  4. Timeouts: Configure appropriate timeouts for your streaming endpoints

Summary

FastAPI's StreamingResponse provides an elegant solution for streaming data to clients, enabling efficient handling of large datasets, real-time updates, and progressive responses. By using generator functions, you can process and send data incrementally, reducing memory usage and improving user experience.

Remember these key points:

  • Use yield statements in generator functions to produce stream content
  • Specify the appropriate media_type for your content
  • Leverage async generators for non-blocking streaming
  • Implement proper error handling in your streaming functions
  • Consider performance implications for long-lived streams

Exercises

  1. Create a streaming endpoint that serves lines from a large log file in real-time.
  2. Implement a streaming endpoint that processes uploaded CSV data on-the-fly and returns transformed data.
  3. Build a simple chat application using StreamingResponse for message delivery.
  4. Create a progress tracker for a long-running task that updates clients in real-time.
  5. Implement an endpoint that streams the results of a database query as it processes each row.
