
Flask Server-Sent Events

Many web applications need to show updates as they happen. There are several ways to deliver real-time updates, and Server-Sent Events (SSE) is one of the simplest. In this tutorial, we'll explore how to implement Server-Sent Events in Flask applications.

What are Server-Sent Events?

Server-Sent Events (SSE) is a technology that enables a server to push real-time updates to a client over a single HTTP connection. Unlike WebSockets, which provide full-duplex communication, SSE is unidirectional—data flows only from server to client. This makes SSE ideal for scenarios where you need to send updates from the server without needing to receive messages from the client.

Key characteristics of Server-Sent Events:

  • One-way communication (server to client)
  • Built on standard HTTP, not requiring special protocols
  • Automatic reconnection if the connection is lost
  • Named event types, so the client can listen only for the events it cares about
  • Works with standard web servers

When to Use Server-Sent Events

SSE is particularly useful for:

  • Real-time notifications
  • Live feeds or social media updates
  • Stock tickers or cryptocurrency price updates
  • Progress indicators for long-running tasks
  • Dashboard updates

Setting Up Flask for SSE

Let's start by setting up a basic Flask application with SSE support:

python
from flask import Flask, Response, render_template
import time
import json

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/stream')
def stream():
    def event_stream():
        count = 0
        while True:
            count += 1
            # Format the data according to SSE specification
            yield f"data: {json.dumps({'count': count, 'time': time.time()})}\n\n"
            time.sleep(1)  # Send an update every second

    return Response(event_stream(), mimetype="text/event-stream")

if __name__ == '__main__':
    app.run(debug=True)

And here's a simple HTML template (templates/index.html):

html
<!DOCTYPE html>
<html>
<head>
    <title>Flask SSE Example</title>
</head>
<body>
    <h1>Server-Sent Events Demo</h1>
    <div id="counter">Waiting for updates...</div>

    <script>
        const evtSource = new EventSource("/stream");
        const counterElement = document.getElementById("counter");

        evtSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            counterElement.textContent = `Count: ${data.count}, Time: ${new Date(data.time * 1000).toLocaleTimeString()}`;
        }

        evtSource.onerror = function() {
            counterElement.textContent += "\nConnection lost. Reconnecting...";
        }
    </script>
</body>
</html>

Understanding the Code

Server Side (Flask)

  1. We create a regular route (/) to serve our HTML page.

  2. We define a special /stream endpoint that:

    • Returns a custom response with the MIME type set to text/event-stream
    • Wraps a generator function (event_stream) that yields data in the SSE format
    • Prefixes each payload with data: as the SSE protocol requires
    • Ends each message with a double newline (\n\n)
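
The formatting rules above can be wrapped in a small helper. The following is a minimal sketch, not part of Flask: format_sse and its parameters are hypothetical names introduced here. It also handles payloads that contain newlines, which the SSE format requires to be split across several data: lines.

```python
import json


def format_sse(data, event=None, event_id=None, retry=None):
    """Build one SSE message as a string (hypothetical helper, not a Flask API)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        # Reconnection delay hint for the browser, in milliseconds
        lines.append(f"retry: {int(retry)}")
    # A payload containing newlines must be sent as one data: line per line
    for part in str(data).split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse(json.dumps({"count": 1})), end="")
    print(format_sse("first line\nsecond line", event="note", event_id=7), end="")
```

Inside the /stream generator, a call such as format_sse(json.dumps({...})) could stand in for the hand-built f-string.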

Client Side (JavaScript)

  1. We create an EventSource object that connects to our /stream endpoint.

  2. We define handlers for:

    • onmessage: Called when a message without an event: field arrives from the server
    • onerror: Called if the connection encounters an error

  3. When messages arrive, we parse the JSON data and update the page content.

Enhancing Our SSE Implementation

Let's improve our example to demonstrate more SSE features:

Named Events

We can send different types of events from the server:

python
@app.route('/advanced-stream')
def advanced_stream():
    def event_stream():
        count = 0
        while True:
            count += 1
            # Send different event types
            if count % 2 == 0:
                yield f"event: counter\ndata: {json.dumps({'count': count})}\n\n"
            else:
                yield f"event: time\ndata: {json.dumps({'time': time.time()})}\n\n"

            time.sleep(2)

    return Response(event_stream(), mimetype="text/event-stream")

And the client can listen for specific event types:

html
<script>
    // Assumes the page contains elements with the ids "counter" and "time"
    const advancedSource = new EventSource("/advanced-stream");
    const counterElement = document.getElementById("counter");
    const timeElement = document.getElementById("time");

    // Listen for specific events
    advancedSource.addEventListener("counter", function(event) {
        const data = JSON.parse(event.data);
        counterElement.textContent = `Counter: ${data.count}`;
    });

    advancedSource.addEventListener("time", function(event) {
        const data = JSON.parse(event.data);
        timeElement.textContent = `Time: ${new Date(data.time * 1000).toLocaleTimeString()}`;
    });
</script>
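
To make the wire format behind named events concrete, here is a rough Python sketch of how a client separates a stream into events and routes each one by its event: field. It is a simplified illustration, not the full parsing algorithm that browsers implement, and parse_sse and dispatch are hypothetical names.

```python
def parse_sse(text):
    """Split raw SSE text into dicts with 'event', 'data', and 'id' keys.

    Simplified: assumes LF line endings and ignores the retry: field.
    """
    events = []
    for block in text.split("\n\n"):
        event = {"event": "message", "data": [], "id": None}
        for line in block.split("\n"):
            if not line or line.startswith(":"):
                continue  # Skip blank lines and comment lines
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "data":
                event["data"].append(value)
            elif field in ("event", "id"):
                event[field] = value
        if event["data"]:
            event["data"] = "\n".join(event["data"])
            events.append(event)
    return events


def dispatch(events, handlers):
    """Call the handler registered for each event's type, if any."""
    for event in events:
        handler = handlers.get(event["event"])
        if handler is not None:
            handler(event["data"])


if __name__ == "__main__":
    raw = 'event: counter\ndata: {"count": 2}\n\nevent: time\ndata: {"time": 1.5}\n\n'
    dispatch(parse_sse(raw), {
        "counter": lambda data: print("counter event:", data),
        "time": lambda data: print("time event:", data),
    })
```

Events without an event: field default to the type "message", which is why a plain onmessage handler never sees the named events above.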

Including Event IDs

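Event IDs let the client keep track of the last event it received. When the connection drops, the browser reconnects and sends that ID in the Last-Event-ID request header, so the server can tell where the client left off: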

python
@app.route('/stream-with-id')
def stream_with_id():
    def event_stream():
        count = 0
        while True:
            count += 1
            yield f"id: {count}\ndata: {json.dumps({'message': f'Update #{count}'})}\n\n"
            time.sleep(1)

    return Response(event_stream(), mimetype="text/event-stream")
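
The route above restarts its counter at 1 on every connection. To resume instead, a Flask view can read the header with request.headers.get("Last-Event-ID") and continue from that value. The sketch below keeps the resume logic in plain Python so it can be tried without a server. parse_last_event_id and events_after are hypothetical helpers, and the limit parameter exists only so the demo terminates.

```python
import json


def parse_last_event_id(header_value):
    """Return the last ID the client saw, or 0 if the header is missing or invalid."""
    try:
        return max(int(header_value), 0)
    except (TypeError, ValueError):
        return 0


def events_after(last_id, limit):
    """Yield `limit` SSE messages whose IDs continue after `last_id`."""
    for count in range(last_id + 1, last_id + 1 + limit):
        payload = json.dumps({"message": f"Update #{count}"})
        yield f"id: {count}\ndata: {payload}\n\n"


if __name__ == "__main__":
    # In a Flask view, the header value would come from
    # request.headers.get("Last-Event-ID")
    last_seen = parse_last_event_id("41")
    for message in events_after(last_seen, limit=2):
        print(message, end="")
```

Treating a missing or malformed header as 0 means a first-time client and a confused client both start from the beginning, which seems the safest default for a counter like this one.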

Real-World Example: Task Progress Monitor

Let's implement a more practical example: monitoring the progress of a long-running background task:

python
from flask import Flask, Response, render_template
import time
import json
import threading
import uuid

app = Flask(__name__)

# Dictionary to store task progress, keyed by task ID
tasks = {}

@app.route('/')
def index():
    return render_template('task_monitor.html')

@app.route('/start-task', methods=['POST'])
def start_task():
    # A random ID avoids collisions when two tasks start within the same second
    task_id = uuid.uuid4().hex
    tasks[task_id] = {"status": "running", "progress": 0}

    # Start a background thread to process the task
    def process_task(tid):
        for i in range(1, 11):
            time.sleep(1)  # Simulate work
            tasks[tid]["progress"] = i * 10
        tasks[tid]["status"] = "completed"

    thread = threading.Thread(target=process_task, args=(task_id,))
    thread.daemon = True
    thread.start()

    return {"task_id": task_id}

@app.route('/task-progress/<task_id>')
def task_progress(task_id):
    def stream():
        while task_id in tasks:
            task = tasks[task_id]
            yield f"data: {json.dumps(task)}\n\n"

            if task["status"] == "completed":
                # Clean up task data after completion
                if task["progress"] >= 100:
                    time.sleep(3)  # Give client time to process completion
                    # pop() tolerates a second stream having removed the task already
                    tasks.pop(task_id, None)
                break

            time.sleep(0.5)

    return Response(stream(), mimetype="text/event-stream")

if __name__ == '__main__':
    app.run(debug=True)

And the corresponding HTML template:

html
<!DOCTYPE html>
<html>
<head>
    <title>Task Progress Monitor</title>
    <style>
        .progress-bar {
            height: 20px;
            background-color: #e0e0e0;
            border-radius: 5px;
            margin-bottom: 10px;
            overflow: hidden;
        }
        .progress-fill {
            height: 100%;
            background-color: #4CAF50;
            transition: width 0.3s ease;
        }
        .task {
            margin-bottom: 20px;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 5px;
        }
    </style>
</head>
<body>
    <h1>Task Progress Monitor</h1>

    <button id="startTask">Start New Task</button>

    <div id="taskContainer"></div>

    <script>
        document.getElementById('startTask').addEventListener('click', async () => {
            // Start a new task
            const response = await fetch('/start-task', { method: 'POST' });
            const data = await response.json();
            const taskId = data.task_id;

            // Create UI for this task
            const taskElement = document.createElement('div');
            taskElement.className = 'task';
            taskElement.innerHTML = `
                <h3>Task ${taskId}</h3>
                <div class="progress-bar">
                    <div class="progress-fill" style="width: 0%"></div>
                </div>
                <div class="status">Starting...</div>
            `;
            document.getElementById('taskContainer').prepend(taskElement);

            // Connect to SSE stream for this task
            const eventSource = new EventSource(`/task-progress/${taskId}`);

            eventSource.onmessage = function(event) {
                const taskData = JSON.parse(event.data);
                const progressBar = taskElement.querySelector('.progress-fill');
                const statusElement = taskElement.querySelector('.status');

                // Update UI
                progressBar.style.width = `${taskData.progress}%`;
                statusElement.textContent = `${taskData.status} - ${taskData.progress}% complete`;

                // Close connection when task is finished
                if (taskData.status === "completed" && taskData.progress >= 100) {
                    setTimeout(() => {
                        eventSource.close();
                    }, 1000);
                }
            };

            eventSource.onerror = function() {
                eventSource.close();
                taskElement.querySelector('.status').textContent += ' (Connection lost)';
            };
        });
    </script>
</body>
</html>

This example demonstrates a practical use case where:

  1. The user starts a long-running task by clicking a button
  2. The server initiates the task and assigns a unique ID
  3. The client connects to an SSE stream specific to that task
  4. The server sends real-time progress updates
  5. The client displays the progress visually
  6. Once the task completes, the connection is closed and resources are cleaned up
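
In this example the worker thread and the streaming generator share the tasks dictionary without any locking. That tends to be fine for a demo, but a reader can observe a task part-way through an update. One possible refinement, shown here as a sketch with a hypothetical TaskStore class, guards the dictionary with a lock and hands out copies.

```python
import threading


class TaskStore:
    """Thread-safe task registry (hypothetical helper for illustration)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks = {}

    def create(self, task_id):
        with self._lock:
            self._tasks[task_id] = {"status": "running", "progress": 0}

    def update(self, task_id, **fields):
        with self._lock:
            if task_id in self._tasks:
                self._tasks[task_id].update(fields)

    def snapshot(self, task_id):
        """Return a copy of the task, or None if the ID is unknown."""
        with self._lock:
            task = self._tasks.get(task_id)
            return dict(task) if task is not None else None

    def remove(self, task_id):
        with self._lock:
            self._tasks.pop(task_id, None)


if __name__ == "__main__":
    store = TaskStore()
    store.create("demo")

    def work():
        for step in range(1, 11):
            store.update("demo", progress=step * 10)
        store.update("demo", status="completed")

    worker = threading.Thread(target=work)
    worker.start()
    worker.join()
    print(store.snapshot("demo"))
```

Because update accepts several fields at once, the worker could also set progress and status in a single locked step, so a stream never sees 100% progress paired with a "running" status.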

Considerations and Best Practices

Connection Management

  • Be mindful of the number of connections your server can handle
  • Implement timeouts to close inactive connections
  • Consider using a message queue for high-traffic applications
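
As one way to picture the message-queue idea, the sketch below uses an in-process broker built on the standard library's queue.Queue. Each connected client gets its own queue, and a generator turns queued messages into SSE output. While it waits, it emits comment lines (lines starting with a colon, which clients ignore) as keep-alives. The names Broker and sse_stream, plus the heartbeat and max_idle parameters, are assumptions made for this example. Messages are assumed to be single-line strings, and a multi-process deployment would need an external broker instead.

```python
import queue
import threading


class Broker:
    """Fan out published messages to every subscriber queue (illustrative)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def subscribe(self, maxsize=10):
        q = queue.Queue(maxsize=maxsize)
        with self._lock:
            self._subscribers.append(q)
        return q

    def unsubscribe(self, q):
        with self._lock:
            if q in self._subscribers:
                self._subscribers.remove(q)

    def publish(self, message):
        # Work on a copy so slow subscribers can be dropped while iterating
        with self._lock:
            subscribers = list(self._subscribers)
        for q in subscribers:
            try:
                q.put_nowait(message)
            except queue.Full:
                self.unsubscribe(q)


def sse_stream(broker, heartbeat=15.0, max_idle=None):
    """Yield SSE text for one client; stop after `max_idle` silent heartbeats."""
    q = broker.subscribe()
    idle = 0
    try:
        while max_idle is None or idle < max_idle:
            try:
                message = q.get(timeout=heartbeat)
            except queue.Empty:
                idle += 1
                yield ": keep-alive\n\n"  # Comment line, ignored by clients
            else:
                idle = 0
                yield f"data: {message}\n\n"
    finally:
        # Runs when the loop ends or the server closes the generator
        broker.unsubscribe(q)


if __name__ == "__main__":
    broker = Broker()
    stream = sse_stream(broker, heartbeat=0.1, max_idle=2)
    print(repr(next(stream)))  # Nothing published yet, so a keep-alive
    broker.publish("hello")
    print(repr(next(stream)))
    stream.close()
```

In a Flask route, the generator returned by sse_stream(broker) could be passed to Response in the same way as the earlier event_stream generators.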

Error Handling

Always implement proper error handling on both the client and server sides:

javascript
// Client-side error handling
eventSource.onerror = function(error) {
    console.error("EventSource failed:", error);
    // Implement reconnection logic if needed
    // eventSource.close();
    // setTimeout(() => connectEventSource(), 5000);
};

python
# Server-side error handling
@app.route('/stream')
def stream():
    def event_stream():
        try:
            while True:
                # Your stream logic here
                yield f"data: {json.dumps({'message': 'update'})}\n\n"
                time.sleep(1)
        except Exception:
            # Catch Exception rather than using a bare except: a bare except
            # would also catch the GeneratorExit raised when the client
            # disconnects, and yielding after that raises RuntimeError
            yield f"data: {json.dumps({'error': 'Stream closed due to error'})}\n\n"

    return Response(event_stream(), mimetype="text/event-stream")
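
Client disconnects are a separate case from errors. When the connection goes away, a WSGI server typically closes the response iterable, which raises GeneratorExit inside the generator at its current yield. A try/finally block is a common way to release resources at that point. The sketch below simulates the disconnect by calling close() directly. active_streams is a hypothetical registry used only to make the cleanup visible.

```python
import json

# Hypothetical registry of open streams, used here only to observe cleanup
active_streams = set()


def event_stream(stream_id):
    active_streams.add(stream_id)
    count = 0
    try:
        while True:
            count += 1
            yield f"data: {json.dumps({'count': count})}\n\n"
    finally:
        # Runs on normal exit, on errors, and when the generator is closed
        active_streams.discard(stream_id)


if __name__ == "__main__":
    gen = event_stream("client-1")
    print(next(gen), end="")
    print("open streams:", active_streams)
    gen.close()  # Simulates what happens when the client goes away
    print("open streams:", active_streams)
```

The finally block does not yield anything, so it is safe to run while the generator is being closed.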

Scalability

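For production applications, consider running the app on a server designed for handling many concurrent connections, such as gevent. Note that gevent only helps if blocking calls like time.sleep() are patched to be cooperative:

python
# Example using gevent for better concurrency
from gevent import monkey
monkey.patch_all()  # Make time.sleep() and sockets cooperative; run before other imports

from gevent.pywsgi import WSGIServer

if __name__ == '__main__':
    http_server = WSGIServer(('', 5000), app)
    http_server.serve_forever()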

Summary

Server-Sent Events provide an efficient way to implement real-time updates in Flask applications. They're simpler than WebSockets for one-way communication and work well with the Flask ecosystem.

Key takeaways:

  • SSE uses a single HTTP connection to push data from server to client
  • Each message consists of data: lines and ends with a double newline (\n\n)
  • Event types can be specified with event: prefixes
  • The browser's EventSource reconnects automatically when the connection drops
  • It's ideal for notifications, progress updates, and live feeds

Exercises

  1. Modify the task progress example to support multiple simultaneous tasks for the same client.
  2. Implement a simple chat notification system using SSE for new message alerts.
  3. Create a dashboard that shows real-time system metrics (CPU usage, memory, etc.) using SSE.
  4. Build a stock ticker application that streams price updates for selected stocks.
  5. Enhance the SSE implementation with reconnection handling and error recovery.

By mastering Server-Sent Events in Flask, you've added a powerful tool to your web development arsenal that enables real-time updates without the complexity of WebSockets.


