FastAPI Response Streaming
Response streaming is a powerful feature in FastAPI that allows you to send data to clients incrementally instead of waiting for the entire response to be ready. This can significantly improve your application's performance when dealing with large datasets or implementing real-time features.
Introduction to Response Streaming
In traditional HTTP responses, the server prepares the entire response before sending it to the client. This approach works well for small responses, but it becomes inefficient when dealing with:
- Large datasets that take time to generate or retrieve
- Real-time data that needs to be sent as it becomes available
- Long-running operations where you want to provide incremental feedback
Response streaming solves these problems by allowing your application to send data in chunks, as soon as each piece is ready. The client can start processing the data immediately, without waiting for the complete response.
How Response Streaming Works in FastAPI
FastAPI provides several ways to implement response streaming:
- Using `StreamingResponse`
- Using Server-Sent Events (SSE)
- Using WebSockets (for bidirectional communication)
In this guide, we'll focus primarily on the first two methods, as they're most commonly used for response streaming.
Implementing Basic Response Streaming
Using StreamingResponse
The `StreamingResponse` class in FastAPI lets you return a stream of data. Here's a simple example:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def number_generator():
    for i in range(1, 11):
        yield f"Number: {i}\n"
        await asyncio.sleep(1)  # Simulate processing time

@app.get("/stream-numbers")
async def stream_numbers():
    return StreamingResponse(number_generator(), media_type="text/plain")
```
In this example:
- We create an async generator function that yields numbers from 1 to 10
- We wrap this generator in a `StreamingResponse`
- The client receives each number as it's generated, with a 1-second delay between them
When you access the `/stream-numbers` endpoint, you'll see numbers appearing one by one, rather than waiting 10 seconds for all numbers to be returned at once.
Output:

```text
Number: 1
Number: 2
Number: 3
...and so on
```
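Two things are worth knowing at this point. First, you can watch the chunks arrive from the command line with `curl -N http://localhost:8000/stream-numbers` (`-N` disables curl's output buffering). Second, `StreamingResponse` also accepts a plain synchronous generator, which Starlette iterates in a threadpool so it doesn't block the event loop. A minimal sketch, continuing the app above (the endpoint name is illustrative):

```python
import time

def sync_number_generator():
    # A synchronous generator works too; Starlette runs it in a
    # threadpool, so the blocking sleep stays off the event loop.
    for i in range(1, 11):
        yield f"Number: {i}\n"
        time.sleep(1)

@app.get("/stream-numbers-sync")
async def stream_numbers_sync():
    return StreamingResponse(sync_number_generator(), media_type="text/plain")
```

The async form is still preferable when producing each chunk involves awaiting I/O.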
Streaming Large Data Sets
One of the most common use cases for response streaming is efficiently handling large datasets. Let's see how we can stream a large CSV file:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import csv
import io

app = FastAPI()

@app.get("/download-large-csv")
async def download_large_csv():
    # Function to generate CSV data
    async def generate_csv():
        # Create a StringIO object to write CSV data
        buffer = io.StringIO()
        writer = csv.writer(buffer)

        # Write header
        writer.writerow(["id", "name", "value"])
        yield buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)

        # Write rows
        for i in range(1, 100001):
            writer.writerow([i, f"Item {i}", i * 3.14])
            # Yield after every 1000 rows to avoid building up too much in memory
            if i % 1000 == 0:
                yield buffer.getvalue()
                buffer.seek(0)
                buffer.truncate(0)

    # Set appropriate headers for file download
    headers = {
        'Content-Disposition': 'attachment; filename="large_data.csv"'
    }

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers=headers
    )
```
In this example:
- We create a generator function that produces CSV data in chunks
- Instead of building the entire 100,000-row CSV in memory, we yield after every 1,000 rows
- The client can start downloading and processing the CSV immediately, rather than waiting for all data to be ready
This approach is much more memory-efficient for both server and client, especially when dealing with very large datasets.
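The same chunked delivery helps on the consuming side: a client can write the file to disk without ever holding all 100,000 rows in memory. A minimal sketch using `httpx` (an assumption; any HTTP client with streaming reads works), with the server above assumed to run on `localhost:8000`:

```python
import httpx

# Download the streamed CSV chunk by chunk, appending each chunk to disk,
# so memory use stays bounded regardless of the file's total size.
with httpx.stream("GET", "http://localhost:8000/download-large-csv") as response:
    response.raise_for_status()
    with open("large_data.csv", "wb") as f:
        for chunk in response.iter_bytes():
            f.write(chunk)
```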
Implementing Server-Sent Events (SSE)
Server-Sent Events provide a standard way to stream updates to clients. They're particularly useful for real-time dashboards, notifications, and other scenarios where you need to push data from server to client.
```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
import time

app = FastAPI()

@app.get("/sse-updates")
async def sse_updates(request: Request):
    async def event_generator():
        # Send an initial message
        yield "data: Connection established\n\n"

        count = 0
        while True:
            if await request.is_disconnected():
                break

            # Simulate getting new data
            count += 1
            data = json.dumps({"count": count, "timestamp": time.time()})

            # Format as SSE
            yield f"data: {data}\n\n"

            await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
```
Client-side JavaScript to consume SSE:

```javascript
const eventSource = new EventSource('/sse-updates');

eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log('Received update:', data);
    // Update your UI here
};

eventSource.onerror = function(error) {
    console.error('EventSource error:', error);
    eventSource.close();
};
```
This implementation allows the server to push updates to the client continuously. The client-side EventSource API handles reconnection automatically if the connection is lost.
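One practical caveat: reverse proxies often buffer responses, which can delay or batch SSE events. A common mitigation, sketched here assuming nginx sits in front of the app (other proxies have their own settings), is to send headers that discourage buffering and caching:

```python
return StreamingResponse(
    event_generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",   # event streams shouldn't be cached
        "X-Accel-Buffering": "no",     # tells nginx not to buffer this response
    },
)
```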
Real-World Example: Progress Updates for Long-Running Tasks
Let's implement a more practical example: a file processing endpoint that provides progress updates while the task is running.
```python
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
import json
import uuid

app = FastAPI()

# In-memory store for task status (in a real app, use Redis or similar)
task_status = {}

async def process_file(task_id: str, total_steps: int):
    """Simulates processing a large file with multiple steps"""
    for step in range(1, total_steps + 1):
        # Update progress
        progress = (step / total_steps) * 100
        task_status[task_id].update({
            "progress": progress,
            "step": step,
            "status": "processing" if step < total_steps else "completed"
        })
        # Simulate processing time
        await asyncio.sleep(2)

@app.post("/process-file")
async def start_file_processing(background_tasks: BackgroundTasks):
    """Endpoint to start file processing"""
    task_id = str(uuid.uuid4())
    task_status[task_id] = {
        "progress": 0,
        "step": 0,
        "status": "starting"
    }

    # Start processing in background
    background_tasks.add_task(process_file, task_id, 10)

    return {"task_id": task_id, "message": "Processing started"}

@app.get("/task-progress/{task_id}")
async def task_progress_updates(task_id: str):
    """Stream task progress updates using SSE"""
    if task_id not in task_status:
        return {"error": "Task not found"}

    async def progress_generator():
        last_status = None

        # Keep checking until task is completed
        while True:
            current_status = task_status.get(task_id)

            # If status changed, send an update
            if current_status != last_status:
                yield f"data: {json.dumps(current_status)}\n\n"
                last_status = current_status.copy() if current_status else None

            # Break the loop if task is completed or failed
            if current_status and current_status.get("status") in ["completed", "failed"]:
                # Clean up after completion (optional)
                # del task_status[task_id]
                break

            await asyncio.sleep(0.5)

    return StreamingResponse(
        progress_generator(),
        media_type="text/event-stream"
    )
```
Client-side code to monitor progress:

```javascript
function monitorProgress(taskId) {
    const progressBar = document.getElementById('progress-bar');
    const statusElement = document.getElementById('status-message');

    const eventSource = new EventSource(`/task-progress/${taskId}`);

    eventSource.onmessage = function(event) {
        const data = JSON.parse(event.data);

        // Update UI
        progressBar.value = data.progress;
        statusElement.textContent = `Step ${data.step}: ${data.status} (${data.progress.toFixed(1)}%)`;

        // Close connection when completed
        if (data.status === 'completed' || data.status === 'failed') {
            eventSource.close();
        }
    };

    eventSource.onerror = function() {
        statusElement.textContent = 'Connection lost. Refresh to try again.';
        eventSource.close();
    };
}

// Call this when you start the task
// monitorProgress('task-id-returned-from-server');
```
This example demonstrates how streaming responses can provide real-time feedback for long-running operations, creating a better user experience.
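As the comment in the example notes, the in-memory `task_status` dict only works for a single process and is lost on restart. A minimal sketch of the Redis-backed alternative it alludes to, using the `redis` package's asyncio client (the key naming, TTL, and helper names are assumptions for illustration):

```python
import json
import redis.asyncio as redis

r = redis.Redis()  # assumes a local Redis server on the default port

async def set_task_status(task_id: str, status: dict) -> None:
    # Store the status as JSON under a per-task key, with a TTL so
    # finished tasks don't accumulate forever.
    await r.set(f"task:{task_id}", json.dumps(status), ex=3600)

async def get_task_status(task_id: str) -> dict | None:
    raw = await r.get(f"task:{task_id}")
    return json.loads(raw) if raw else None
```

With helpers like these, `process_file` and `progress_generator` would read and write Redis instead of the module-level dict, so progress survives restarts and works across multiple workers.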
Handling Large File Downloads with Streaming
When serving large files, streaming is crucial for efficiency. Here's how to implement a file download endpoint using streaming:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import os

app = FastAPI()

@app.get("/download/{filename}")
async def download_file(filename: str):
    # Use basename to prevent path traversal (e.g. "../secret.txt")
    safe_name = os.path.basename(filename)
    file_path = f"./files/{safe_name}"  # Adjust path as needed

    if not os.path.isfile(file_path):
        return {"error": "File not found"}

    async def file_iterator():
        with open(file_path, mode="rb") as file:
            # Read and yield file in chunks
            chunk_size = 1024 * 1024  # 1MB chunks
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    # Determine content type based on file extension
    # This is a simple example - consider using a library for accurate MIME type detection
    extension = safe_name.split('.')[-1].lower()
    content_type_map = {
        'pdf': 'application/pdf',
        'mp4': 'video/mp4',
        'zip': 'application/zip',
        # Add other types as needed
    }
    content_type = content_type_map.get(extension, 'application/octet-stream')

    headers = {
        'Content-Disposition': f'attachment; filename="{safe_name}"'
    }

    return StreamingResponse(
        file_iterator(),
        media_type=content_type,
        headers=headers
    )
```
This implementation:
- Reads the file in manageable chunks to avoid loading the entire file into memory
- Streams these chunks directly to the client
- Sets appropriate headers to trigger a download in the browser
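Worth knowing: for files that already exist on disk, FastAPI (via Starlette) also ships a `FileResponse` that streams the file in chunks for you and fills in `Content-Length` and the media type automatically, which is often simpler than a hand-rolled iterator (the route path here is illustrative):

```python
from fastapi.responses import FileResponse
import os

@app.get("/download-simple/{filename}")
async def download_file_simple(filename: str):
    # basename guards against path traversal; an existence check is
    # omitted for brevity (FileResponse errors if the file is missing).
    file_path = f"./files/{os.path.basename(filename)}"
    return FileResponse(file_path, filename=filename)
```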
Best Practices for Response Streaming
When implementing streaming responses in FastAPI, keep these best practices in mind:
- **Memory Management**: Use appropriate chunk sizes to avoid consuming too much memory on either the server or the client side.
- **Error Handling**: Implement proper error handling within your streaming generators:

```python
async def protected_generator():
    try:
        async for item in original_generator():
            yield item
    except Exception as e:
        # Log the error (assumes a configured `logger` and imported `json`)
        logger.error(f"Error in stream: {str(e)}")
        # Yield an error message if appropriate
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
```

- **Connection Monitoring**: For long-lived connections, check whether the client is still connected.
- **Backpressure Handling**: Be mindful of how fast you're generating data versus how fast clients can consume it.
- **Timeouts**: Implement appropriate timeouts for long-running operations (see the sketch after this list).
- **Compression**: Consider enabling compression for text-based streams:

```python
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
```
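To make the timeout point concrete, one simple pattern (a sketch, not the only approach; the wrapper name and 30-second default are illustrative) is to enforce an overall deadline inside the generator itself:

```python
import time

async def bounded_generator(source, max_seconds: float = 30.0):
    # Stop streaming once an overall deadline passes, so a slow or stuck
    # data source can't hold the connection open indefinitely.
    deadline = time.monotonic() + max_seconds
    async for item in source:
        if time.monotonic() > deadline:
            yield "data: {\"error\": \"stream timed out\"}\n\n"
            break
        yield item
```

You can wrap any of the earlier event generators with it, e.g. `StreamingResponse(bounded_generator(event_generator()), media_type="text/event-stream")`.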
Limitations and Considerations
While response streaming offers many benefits, it's important to be aware of its limitations:
- Server Resources: Long-lived connections consume server resources for their duration.
- Load Balancers: Some load balancers or proxies might buffer responses, negating streaming benefits.
- Client Support: Not all clients support streaming responses efficiently.
- Connection Limits: Browsers typically limit the number of concurrent connections to a domain.
Summary
Response streaming in FastAPI provides an efficient way to handle large data transfers, real-time updates, and long-running operations. By sending data incrementally, you can improve both application performance and user experience.
Key takeaways:
- Use `StreamingResponse` for basic streaming needs
- Implement Server-Sent Events for real-time updates
- Stream large files in chunks to conserve memory
- Consider connection management and error handling
- Monitor client connection status for long-lived streams
With these techniques, you can build more responsive and efficient FastAPI applications that provide a better experience for your users, especially when dealing with large datasets or real-time information.
Additional Resources
- FastAPI Official Documentation on Custom Response Classes
- MDN Web Docs: Server-Sent Events
- Starlette Documentation on Streaming Responses
Exercises
- Create a streaming endpoint that returns the current server time every second for 30 seconds.
- Implement a file upload endpoint with progress tracking using SSE.
- Build a simple chat application that streams new messages using response streaming.
- Enhance the file download example to include progress percentage in the SSE events.
- Create an endpoint that streams the results of a database query with many rows, using pagination behind the scenes.