FastAPI Request Streaming
Introduction
When developing web applications, you'll often encounter situations where you need to handle large files or data streams. Loading these entirely into memory before processing can cause performance issues or even crash your application. FastAPI provides elegant solutions for handling these scenarios through request streaming.
Request streaming allows you to process incoming data in chunks as it arrives rather than waiting for the complete request to be received. This approach is particularly valuable when:
- Handling large file uploads
- Processing real-time data streams
- Working with memory-constrained environments
- Building applications that need to respond to partial data
In this tutorial, we'll explore how to implement request streaming in FastAPI applications, starting with the basics and progressing to more complex real-world examples.
Understanding Request Streaming Basics
By default, FastAPI reads the entire request body into memory before calling your path operation function. While convenient for most use cases, this approach isn't ideal for large payloads.
With request streaming, FastAPI can pass the incoming request body to your application as it arrives, allowing you to process it incrementally.
Request Body as a Stream
FastAPI allows you to receive the request body as an asynchronous stream using the Request object:
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/stream-request/")
async def stream_request(request: Request):
    body = await request.body()
    return {"request_body_size": len(body)}
However, this still reads the entire body into memory. For true streaming, we need to use the .stream() method:
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/stream-request/")
async def stream_request(request: Request):
    total_bytes = 0
    async for chunk in request.stream():
        total_bytes += len(chunk)
    return {"request_body_size": total_bytes}
In this example, we're processing the request body in chunks as they arrive, counting the total bytes without storing the entire body in memory.
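To see the chunked delivery in action, the client can also stream the request body rather than sending it all at once. Here is a rough sketch using the httpx library (an assumption on my part; any HTTP client that supports streamed request bodies will do), with the server assumed to be running locally on port 8000:

import asyncio
import httpx

async def generate_chunks():
    # Yield the body in small pieces so the server receives it incrementally
    for i in range(5):
        yield f"chunk number {i}\n".encode("utf-8")
        await asyncio.sleep(0.1)

async def main():
    async with httpx.AsyncClient() as client:
        # Passing an async iterable as `content` makes httpx stream the body
        response = await client.post(
            "http://127.0.0.1:8000/stream-request/",
            content=generate_chunks(),
        )
        print(response.json())

asyncio.run(main())

Because the body length isn't known up front, httpx sends it with chunked transfer encoding, so the server-side async for loop receives it piece by piece.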
Setting Up Your Environment
Before we dive deeper into request streaming, ensure you have the required dependencies:
pip install fastapi uvicorn python-multipart
The python-multipart package is essential for handling streaming form data and file uploads.
File Uploading with Request Streaming
One of the most common use cases for request streaming is handling large file uploads. Let's implement a file upload endpoint that streams the data to a file on disk:
import os
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/upload-file/")
async def upload_file(request: Request):
    # Ensure the uploads directory exists
    os.makedirs("uploads", exist_ok=True)

    # Generate a unique filename
    filename = f"uploads/file_{os.urandom(8).hex()}.data"
    total_bytes = 0

    try:
        with open(filename, "wb") as file:
            async for chunk in request.stream():
                total_bytes += len(chunk)
                file.write(chunk)

        return JSONResponse({
            "filename": filename,
            "size_bytes": total_bytes,
            "message": f"File uploaded successfully with {total_bytes} bytes"
        })
    except Exception as e:
        # Clean up in case of errors
        if os.path.exists(filename):
            os.unlink(filename)
        raise HTTPException(status_code=500, detail=str(e))
This endpoint:
- Creates an 'uploads' directory if it doesn't exist
- Generates a unique filename for the uploaded file
- Opens a file for writing on the server
- Streams the incoming request data directly to the file in chunks
- Returns a response with the file details once complete
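One caveat with this version: open() and file.write() are blocking calls, so a slow disk can stall the event loop while the handler runs. If that matters for your workload, a minimal variant using the aiofiles library (assumed installed via pip install aiofiles; the endpoint path below is just for illustration) keeps the writes non-blocking:

import os
import aiofiles
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/upload-file-async/")
async def upload_file_async(request: Request):
    os.makedirs("uploads", exist_ok=True)
    filename = f"uploads/file_{os.urandom(8).hex()}.data"
    total_bytes = 0

    # aiofiles performs file I/O in a thread pool, so writes don't block the event loop
    async with aiofiles.open(filename, "wb") as file:
        async for chunk in request.stream():
            total_bytes += len(chunk)
            await file.write(chunk)

    return {"filename": filename, "size_bytes": total_bytes}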
Processing Streaming JSON Data
Another common scenario is processing streaming JSON data. Here's how you can implement an endpoint to handle streaming JSON records:
from fastapi import FastAPI, Request
import json

app = FastAPI()

@app.post("/process-json-stream/")
async def process_json_stream(request: Request):
    results = []
    buffer = b""

    async for chunk in request.stream():
        buffer += chunk

        # Try to process complete JSON objects from the buffer
        try:
            # Process multiple JSON objects if present
            while buffer:
                # Try to decode a JSON object
                data = json.loads(buffer)

                # If we got here, we successfully parsed one object
                processed_data = {"original": data, "processed": True}
                results.append(processed_data)
                buffer = b""
        except json.JSONDecodeError:
            # If we can't decode, it might be incomplete - continue buffering
            pass

    return {"processed_items": len(results), "results": results}
This is a simplified example. In a real-world application, you might need more sophisticated JSON parsing to handle multiple JSON objects in a stream or partial objects spanning multiple chunks.
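One pragmatic way to make the framing unambiguous is newline-delimited JSON (NDJSON), where the client sends exactly one JSON object per line. Here is a sketch of that approach (it assumes the client follows that convention; the endpoint path is illustrative):

from fastapi import FastAPI, Request
import json

app = FastAPI()

@app.post("/process-ndjson-stream/")
async def process_ndjson_stream(request: Request):
    results = []
    buffer = b""

    async for chunk in request.stream():
        buffer += chunk
        # Every complete line is one JSON object; keep the trailing partial line buffered
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                results.append({"original": json.loads(line), "processed": True})

    # Handle a final object that wasn't newline-terminated
    if buffer.strip():
        results.append({"original": json.loads(buffer), "processed": True})

    return {"processed_items": len(results), "results": results}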
Real-World Example: CSV Processing Service
Let's build a more practical example: a service that processes CSV files line by line, performing data transformation without loading the entire file into memory:
from fastapi import FastAPI, Request, HTTPException
import csv
import io

app = FastAPI()

@app.post("/process-csv/")
async def process_csv(request: Request):
    # Set up statistics
    total_rows = 0
    valid_rows = 0
    results = []
    buffer = ""

    # Create a text line reader from the binary stream
    async for chunk in request.stream():
        text_chunk = chunk.decode("utf-8", errors="replace")
        buffer += text_chunk

        # Process complete lines
        lines = buffer.split("\n")
        # Keep the last (potentially incomplete) line in the buffer
        buffer = lines.pop()

        for line in lines:
            if not line.strip():  # Skip empty lines
                continue

            total_rows += 1
            try:
                # Parse CSV line (assumes comma-separated)
                reader = csv.reader([line])
                row = next(reader)

                # Example processing: Calculate sum and average of numeric fields
                numeric_values = []
                for value in row:
                    try:
                        numeric_values.append(float(value))
                    except ValueError:
                        pass

                # Only process rows with at least one numeric value
                if numeric_values:
                    valid_rows += 1
                    sum_value = sum(numeric_values)
                    avg_value = sum_value / len(numeric_values) if numeric_values else 0

                    results.append({
                        "original_row": row,
                        "sum": sum_value,
                        "average": avg_value
                    })

                    # Keep only the recent results to avoid memory issues
                    if len(results) > 10:
                        results.pop(0)
            except Exception as e:
                # Log errors but continue processing
                print(f"Error processing row: {e}")

    # Process any remaining content in the buffer
    if buffer.strip():
        total_rows += 1
        try:
            reader = csv.reader([buffer])
            row = next(reader)
            # Similar processing as above...
            # (omitted for brevity)
        except Exception:
            pass

    return {
        "total_rows_processed": total_rows,
        "valid_rows_processed": valid_rows,
        "sample_results": results
    }
This example processes a CSV file line by line, performing calculations on numeric values while keeping memory usage low even for large files.
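To exercise this endpoint with a genuinely large file, the client should also stream rather than read the whole file into memory. A rough sketch with httpx (assuming a local data.csv and the server running on port 8000):

import httpx

def read_in_chunks(path, chunk_size=64 * 1024):
    # Generator that yields the file in fixed-size chunks
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

response = httpx.post(
    "http://127.0.0.1:8000/process-csv/",
    content=read_in_chunks("data.csv"),  # an iterable of bytes is streamed as the body
    timeout=None,  # disable the default timeout for long uploads
)
print(response.json())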
Working with Form Data Streaming
FastAPI can also handle streaming of form data, which is useful for uploading files through HTML forms:
import os
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.get("/upload-form/")
async def get_upload_form():
    # A simple HTML form for file upload
    return HTMLResponse("""
    <!DOCTYPE html>
    <html>
    <head>
        <title>File Upload</title>
    </head>
    <body>
        <h1>Upload a large file</h1>
        <form action="/handle-form-upload/" method="post" enctype="multipart/form-data">
            <input type="file" name="file"/>
            <input type="submit" value="Upload"/>
        </form>
    </body>
    </html>
    """)

@app.post("/handle-form-upload/")
async def handle_form_upload(request: Request):
    # Need to parse the multipart form data manually when streaming
    content_type = request.headers.get("Content-Type", "")
    if not content_type.startswith("multipart/form-data"):
        raise HTTPException(status_code=400, detail="Expected multipart form data")

    # This is a simplified example - handling multipart/form-data streaming
    # properly requires parsing boundaries and headers, which is complex.
    # In real applications, consider using libraries like python-multipart.

    # For demonstration purposes only (not a complete implementation):
    total_bytes = 0
    os.makedirs("uploads", exist_ok=True)
    filename = f"uploads/form_upload_{os.urandom(8).hex()}.data"

    with open(filename, "wb") as f:
        async for chunk in request.stream():
            total_bytes += len(chunk)
            f.write(chunk)

    return {
        "filename": filename,
        "size_bytes": total_bytes,
        "message": "File received and stored (simplified implementation)"
    }
Note: The proper handling of multipart/form-data streaming is complex. In practical applications, you'll typically use FastAPI's built-in File and UploadFile classes, which handle multipart parsing for you, rather than implementing manual parsing as shown in this simplified example.
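For comparison, here is a minimal sketch of that approach (the chunk size and endpoint path below are arbitrary choices). With python-multipart installed, FastAPI parses the multipart body for you, and you can still read the upload in chunks to keep memory usage low:

import os
from fastapi import FastAPI, UploadFile, File

app = FastAPI()

@app.post("/upload-with-uploadfile/")
async def upload_with_uploadfile(file: UploadFile = File(...)):
    os.makedirs("uploads", exist_ok=True)
    destination = f"uploads/{os.urandom(8).hex()}_{file.filename}"
    total_bytes = 0

    with open(destination, "wb") as out:
        # Read the upload in 1 MB chunks instead of calling file.read() once
        while chunk := await file.read(1024 * 1024):
            total_bytes += len(chunk)
            out.write(chunk)

    return {"filename": destination, "size_bytes": total_bytes}

FastAPI spools the upload to a temporary file while parsing, so memory stays bounded even for large uploads.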
Best Practices for Request Streaming
When working with request streaming in FastAPI, keep these best practices in mind:
- Use streaming for large data: Only use streaming for requests that might be large; for small payloads, regular request handling is simpler and more efficient.
- Set appropriate timeouts: Configure proper timeouts to prevent slow streams from keeping connections open indefinitely (see the sketch after this list).
- Implement error handling: Always have proper error handling for incomplete or malformed streams.
- Consider backpressure: If processing is slower than receiving, implement backpressure mechanisms to avoid memory issues.
- Test with large payloads: Always test your streaming endpoints with realistically large data to ensure they perform as expected.
- Monitor memory usage: Keep an eye on your application's memory usage during streaming operations.
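For the timeout recommendation above, one application-level option is to bound how long you wait for each chunk. A sketch (the 10-second value and endpoint path are assumptions to tune for your workload):

import asyncio
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

CHUNK_TIMEOUT_SECONDS = 10.0  # assumed value; adjust as needed

@app.post("/stream-with-timeout/")
async def stream_with_timeout(request: Request):
    total_bytes = 0
    stream = request.stream().__aiter__()

    while True:
        try:
            # Fail fast if the client stalls for too long between chunks
            chunk = await asyncio.wait_for(stream.__anext__(), timeout=CHUNK_TIMEOUT_SECONDS)
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            raise HTTPException(status_code=408, detail="Client sent data too slowly")
        total_bytes += len(chunk)

    return {"request_body_size": total_bytes}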
Performance Considerations
Request streaming can significantly improve your application's performance when handling large data, but there are some considerations:
- Async requirements: Request streaming in FastAPI requires async handlers
- Worker configuration: Ensure your ASGI server (like Uvicorn) is configured properly for streaming workloads
- Connection stability: Long-running streaming operations are more susceptible to network issues
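As a concrete starting point (the values below are assumptions to adapt, not recommendations), Uvicorn can be launched programmatically with explicit keep-alive and concurrency settings:

import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "main:app",              # assumes your FastAPI app lives in main.py
        host="0.0.0.0",
        port=8000,
        timeout_keep_alive=30,   # keep idle connections open longer for slow streams
        limit_concurrency=100,   # cap simultaneous connections to protect memory
    )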
Summary
Request streaming in FastAPI provides a powerful way to handle large incoming data without consuming excessive memory. By processing data incrementally as it arrives, you can build more efficient and scalable applications.
In this tutorial, we covered:
- The basics of request streaming in FastAPI
- How to implement file uploads with streaming
- Processing streaming JSON and CSV data
- Best practices for efficient stream handling
With these techniques, you can build FastAPI applications capable of handling large data streams efficiently, ensuring your applications remain performant and reliable even under heavy loads.
Further Learning
To deepen your understanding of request streaming in FastAPI, consider exploring:
- FastAPI's official documentation on request handling
- The ASGI specification that enables streaming in FastAPI
- Design patterns for stream processing architectures
Exercises
- Simple Streaming Counter: Create an endpoint that counts the number of lines in a streamed text file.
- JSON Transformer: Build a streaming endpoint that transforms incoming JSON objects by adding a timestamp to each.
- Advanced CSV Processor: Extend the CSV processing example to validate each row against a schema and return statistics on valid/invalid rows.
- Streaming Echo Server: Create an endpoint that echoes back streamed data in uppercase, character by character.
- File Checksum Calculator: Build an endpoint that calculates SHA-256 checksums of uploaded files using streaming to avoid memory issues.