Flask Server-Sent Events
Real-time web applications are increasingly important in today's digital landscape. There are multiple ways to achieve real-time updates in web applications, and Server-Sent Events (SSE) is one of the simplest yet powerful approaches. In this tutorial, we'll explore how to implement Server-Sent Events in Flask applications.
What are Server-Sent Events?
Server-Sent Events (SSE) is a technology that enables a server to push real-time updates to a client over a single HTTP connection. Unlike WebSockets, which provide full-duplex communication, SSE is unidirectional—data flows only from server to client. This makes SSE ideal for scenarios where you need to send updates from the server without needing to receive messages from the client.
Key characteristics of Server-Sent Events:
- One-way communication (server to client)
- Built on standard HTTP, not requiring special protocols
- Automatic reconnection if the connection is lost
- Event filtering capabilities
- Works with standard web servers
When to Use Server-Sent Events
SSE is particularly useful for:
- Real-time notifications
- Live feeds or social media updates
- Stock tickers or cryptocurrency price updates
- Progress indicators for long-running tasks
- Dashboard updates
Setting Up Flask for SSE
Let's start by setting up a basic Flask application with SSE support:
from flask import Flask, Response, render_template
import time
import json
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/stream')
def stream():
def event_stream():
count = 0
while True:
count += 1
# Format the data according to SSE specification
yield f"data: {json.dumps({'count': count, 'time': time.time()})}\n\n"
time.sleep(1) # Send an update every second
return Response(event_stream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run(debug=True)
And here's a simple HTML template (templates/index.html
):
<!DOCTYPE html>
<html>
<head>
<title>Flask SSE Example</title>
</head>
<body>
<h1>Server-Sent Events Demo</h1>
<div id="counter">Waiting for updates...</div>
<script>
const evtSource = new EventSource("/stream");
const counterElement = document.getElementById("counter");
evtSource.onmessage = function(event) {
const data = JSON.parse(event.data);
counterElement.textContent = `Count: ${data.count}, Time: ${new Date(data.time * 1000).toLocaleTimeString()}`;
}
evtSource.onerror = function() {
counterElement.textContent += "\nConnection lost. Reconnecting...";
}
</script>
</body>
</html>
Understanding the Code
Server Side (Flask)
-
We create a regular route (
/
) to serve our HTML page. -
We define a special
/stream
endpoint that:- Returns a custom response with the MIME type set to
text/event-stream
- Contains a generator function (
event_stream
) that yields data in the SSE format - Each message must end with a double newline (
\n\n
) - Data is prefixed with
data:
according to the SSE protocol
- Returns a custom response with the MIME type set to
Client Side (JavaScript)
-
We create an
EventSource
object that connects to our/stream
endpoint. -
We define handlers for:
onmessage
: Called when a new message arrives from the serveronerror
: Called if the connection encounters an error
-
When messages arrive, we parse the JSON data and update the page content.
Enhancing Our SSE Implementation
Let's improve our example to demonstrate more SSE features:
Named Events
We can send different types of events from the server:
@app.route('/advanced-stream')
def advanced_stream():
def event_stream():
count = 0
while True:
count += 1
# Send different event types
if count % 2 == 0:
yield f"event: counter\ndata: {json.dumps({'count': count})}\n\n"
else:
yield f"event: time\ndata: {json.dumps({'time': time.time()})}\n\n"
time.sleep(2)
return Response(event_stream(), mimetype="text/event-stream")
And the client can listen for specific event types:
<script>
const advancedSource = new EventSource("/advanced-stream");
const counterElement = document.getElementById("counter");
const timeElement = document.getElementById("time");
// Listen for specific events
advancedSource.addEventListener("counter", function(event) {
const data = JSON.parse(event.data);
counterElement.textContent = `Counter: ${data.count}`;
});
advancedSource.addEventListener("time", function(event) {
const data = JSON.parse(event.data);
timeElement.textContent = `Time: ${new Date(data.time * 1000).toLocaleTimeString()}`;
});
</script>
Including Event IDs
Event IDs help the client keep track of which events have been received, which is useful for reconnection:
@app.route('/stream-with-id')
def stream_with_id():
def event_stream():
count = 0
while True:
count += 1
yield f"id: {count}\ndata: {json.dumps({'message': f'Update #{count}'})}\n\n"
time.sleep(1)
return Response(event_stream(), mimetype="text/event-stream")
Real-World Example: Task Progress Monitor
Let's implement a more practical example: monitoring the progress of a long-running background task:
from flask import Flask, Response, render_template, request
import time
import json
import threading
app = Flask(__name__)
# Dictionary to store task progress
tasks = {}
@app.route('/')
def index():
return render_template('task_monitor.html')
@app.route('/start-task', methods=['POST'])
def start_task():
task_id = str(int(time.time()))
tasks[task_id] = {"status": "running", "progress": 0}
# Start a background thread to process the task
def process_task(id):
for i in range(1, 11):
time.sleep(1) # Simulate work
tasks[id]["progress"] = i * 10
tasks[id]["status"] = "completed"
thread = threading.Thread(target=process_task, args=(task_id,))
thread.daemon = True
thread.start()
return {"task_id": task_id}
@app.route('/task-progress/<task_id>')
def task_progress(task_id):
def stream():
while task_id in tasks:
task = tasks[task_id]
yield f"data: {json.dumps(task)}\n\n"
if task["status"] == "completed":
# Clean up task data after completion
if task["progress"] >= 100:
time.sleep(3) # Give client time to process completion
del tasks[task_id]
break
time.sleep(0.5)
return Response(stream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run(debug=True)
And the corresponding HTML template:
<!DOCTYPE html>
<html>
<head>
<title>Task Progress Monitor</title>
<style>
.progress-bar {
height: 20px;
background-color: #e0e0e0;
border-radius: 5px;
margin-bottom: 10px;
overflow: hidden;
}
.progress-fill {
height: 100%;
background-color: #4CAF50;
transition: width 0.3s ease;
}
.task {
margin-bottom: 20px;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
}
</style>
</head>
<body>
<h1>Task Progress Monitor</h1>
<button id="startTask">Start New Task</button>
<div id="taskContainer"></div>
<script>
document.getElementById('startTask').addEventListener('click', async () => {
// Start a new task
const response = await fetch('/start-task', { method: 'POST' });
const data = await response.json();
const taskId = data.task_id;
// Create UI for this task
const taskElement = document.createElement('div');
taskElement.className = 'task';
taskElement.innerHTML = `
`;
document.getElementById('taskContainer').prepend(taskElement);
// Connect to SSE stream for this task
const eventSource = new EventSource(`/task-progress/${taskId}`);
eventSource.onmessage = function(event) {
const taskData = JSON.parse(event.data);
const progressBar = taskElement.querySelector('.progress-fill');
const statusElement = taskElement.querySelector('.status');
// Update UI
progressBar.style.width = `${taskData.progress}%`;
statusElement.textContent = `${taskData.status} - ${taskData.progress}% complete`;
// Close connection when task is finished
if (taskData.status === "completed" && taskData.progress >= 100) {
setTimeout(() => {
eventSource.close();
}, 1000);
}
};
eventSource.onerror = function() {
eventSource.close();
taskElement.querySelector('.status').textContent += ' (Connection lost)';
};
});
</script>
</body>
</html>
This example demonstrates a practical use case where:
- The user starts a long-running task by clicking a button
- The server initiates the task and assigns a unique ID
- The client connects to an SSE stream specific to that task
- The server sends real-time progress updates
- The client displays the progress visually
- Once the task completes, the connection is closed and resources are cleaned up
Considerations and Best Practices
Connection Management
- Be mindful of the number of connections your server can handle
- Implement timeouts to close inactive connections
- Consider using a message queue for high-traffic applications
Error Handling
Always implement proper error handling on both the client and server sides:
// Client-side error handling
eventSource.onerror = function(error) {
console.error("EventSource failed:", error);
// Implement reconnection logic if needed
// eventSource.close();
// setTimeout(() => connectEventSource(), 5000);
};
# Server-side error handling
@app.route('/stream')
def stream():
def event_stream():
try:
while True:
# Your stream logic here
yield f"data: {json.dumps({'message': 'update'})}\n\n"
time.sleep(1)
except:
yield f"data: {json.dumps({'error': 'Stream closed due to error'})}\n\n"
return Response(event_stream(), mimetype="text/event-stream")
Scalability
For production applications, consider using frameworks designed for handling many concurrent connections:
# Example using gevent for better concurrency
from gevent.pywsgi import WSGIServer
if __name__ == '__main__':
http_server = WSGIServer(('', 5000), app)
http_server.serve_forever()
Summary
Server-Sent Events provide an efficient way to implement real-time updates in Flask applications. They're simpler than WebSockets for one-way communication and work well with the Flask ecosystem.
Key takeaways:
- SSE uses a single HTTP connection to push data from server to client
- The format includes
data:
prefixes and double newlines (\n\n
) - Event types can be specified with
event:
prefixes - SSE automatically handles reconnection
- It's ideal for notifications, progress updates, and live feeds
Additional Resources
Exercises
- Modify the task progress example to support multiple simultaneous tasks for the same client.
- Implement a simple chat notification system using SSE for new message alerts.
- Create a dashboard that shows real-time system metrics (CPU usage, memory, etc.) using SSE.
- Build a stock ticker application that streams price updates for selected stocks.
- Enhance the SSE implementation with reconnection handling and error recovery.
By mastering Server-Sent Events in Flask, you've added a powerful tool to your web development arsenal that enables real-time updates without the complexity of WebSockets.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)