使用服务器发送事件 (SSE) 实现 Neo4j 实时更新
本文展示的所有代码均可在 neo4j-sse-simple-app Github 仓库中找到。
此应用程序演示了如何使用服务器发送事件 (SSE) 将 Neo4j 查询结果实时流式传输到浏览器。非常适合实时仪表板、活动摘要或任何需要实时图数据的应用程序。

什么是服务器发送事件 (SSE)?
SSE 是一项网络标准,允许服务器通过简单的 HTTP 连接将数据推送到浏览器。与 WebSockets 不同,SSE 具有以下特点:
- 单向通信(从服务器到客户端)——非常适合广播
- 连接断开时自动重连
- 内置于浏览器中(无需额外库)
- 通过标准 HTTP 运行
设置与依赖项
依赖项极简——仅需 Express 和 Neo4j 驱动程序。
包初始化 – package.json
{
"name": "neo4j-sse-simple",
"version": "1.0.0",
"type": "module",
"scripts": {
"start": "node src/index.js"
},
"dependencies": {
"neo4j-driver": "^5.14.0",
"express": "^4.18.2"
}
}Code language: JSON / JSON with Comments (json)
Neo4j 连接
连接到只读演示数据库的简单示例。
数据库连接 – src/db.js
class Database {
constructor() {
this.driver = neo4j.driver( 'neo4j+s://demo.neo4jlabs.com', neo4j.auth.basic('goodreads', 'goodreads') );
}
async query(cypher, params = {}) {
const session = this.driver.session();
try {
const result = await session.run(cypher, params);
return result.records.map(record => record.toObject());
} finally {
await session.close();
}
}
async close() {
await this.driver.close();
}
}
export default new Database();Code language: JavaScript (javascript)
带有 SSE 端点的服务器
每隔几秒流式传输书籍推荐的主服务器。
带有 SSE 的 Express 服务器 – src/index.js
import express from 'express';
import db from './db.js';
const app = express();
// SSE endpoint - streams book recommendations
app.get('/stream', async (req, res) => {
console.log('📡 Client connected to SSE stream');
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// Send initial message
res.write('event: connected\n');
res.write('data: {"message": "Connected to book recommendation stream"}\n\n');
// Function to send a random book recommendation
async function sendRecommendation() {
try {
// Query for a random highly-rated book
const books = await db.query(`
MATCH (b:Book)<-[:AUTHORED]-(a:Author)
WHERE b.average_rating > 4.2
AND b.ratings_count > 50000
WITH b, collect(a.name) AS authors, rand() AS r
ORDER BY r
LIMIT 1
RETURN b.title AS title,
b.average_rating AS rating,
b.ratings_count AS ratingsCount,
authors
`);
if (books.length > 0) {
const book = books[0];
// Send as SSE event
res.write('event: recommendation\n');
res.write(`data: ${JSON.stringify({
title: book.title,
authors: book.authors.slice(0, 3),
rating: book.rating,
ratingsCount: book.ratingsCount,
timestamp: new Date().toISOString()
})}\n\n`);
}
} catch (error) {
console.error('Query error:', error.message);
}
}
// Send a recommendation every 3 seconds
const interval = setInterval(sendRecommendation, 3000);
// Send first recommendation immediately
sendRecommendation();
// Clean up on client disconnect
req.on('close', () => {
clearInterval(interval);
console.log('📴 Client disconnected from SSE stream');
});
});
// Simple HTML client for testing
app.get('/', (req, res) => {
res.send(` <!DOCTYPE html> <html> <head> <title>Neo4j SSE Book Stream</title> <style> body { font-family: system-ui, -apple-system, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; } h1 { color: white; text-align: center; } .container { background: white; border-radius: 12px; padding: 30px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); } button { background: #667eea; color: white; border: none; padding: 12px 24px; border-radius: 6px; font-size: 16px; cursor: pointer; margin-bottom: 20px; } button:hover { background: #5a67d8; } button:disabled { background: #cbd5e0; cursor: not-allowed; } .book { padding: 15px; margin: 10px 0; border-left: 4px solid #667eea; background: #f7fafc; border-radius: 4px; animation: slideIn 0.5s ease; } @keyframes slideIn { from { opacity: 0; transform: translateX(-20px); } } .book-title { font-weight: bold; color: #2d3748; font-size: 18px; } .book-meta { color: #718096; font-size: 14px; margin-top: 5px; } #status { text-align: center; padding: 10px; margin-bottom: 20px; border-radius: 6px; } .connected { background: #c6f6d5; color: #22543d; } .disconnected { background: #fed7d7; color: #742a2a; } #books { max-height: 400px; overflow-y: auto; } </style> </head> <body> <h1>📚 Real-time Book Recommendations</h1> <div class="container"> <button id="toggleBtn">Start Streaming</button> <div id="status" class="disconnected">Disconnected</div> <div id="books"></div> </div>
<script>
let eventSource = null;
const toggleBtn = document.getElementById('toggleBtn');
const statusDiv = document.getElementById('status');
const booksDiv = document.getElementById('books');
let bookCount = 0;
function connect() {
// Create SSE connection
eventSource = new EventSource('/stream');
eventSource.addEventListener('connected', (e) => {
const data = JSON.parse(e.data);
console.log('Connected:', data.message);
statusDiv.textContent = 'Connected - Receiving recommendations...';
statusDiv.className = 'connected';
toggleBtn.textContent = 'Stop Streaming';
});
eventSource.addEventListener('recommendation', (e) => {
const book = JSON.parse(e.data);
displayBook(book);
});
eventSource.onerror = (e) => {
console.error('SSE error:', e);
disconnect();
};
}
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
}
statusDiv.textContent = 'Disconnected';
statusDiv.className = 'disconnected';
toggleBtn.textContent = 'Start Streaming';
}
function displayBook(book) {
const bookDiv = document.createElement('div');
bookDiv.className = 'book';
bookDiv.innerHTML = \`
<div class="book-title">\${book.title}</div>
<div class="book-meta">
⭐ \${book.rating.toFixed(1)} ·
👥 \${book.ratingsCount.toLocaleString()} reviews ·
✍️ \${book.authors.join(', ')}
</div>
\`;
booksDiv.insertBefore(bookDiv, booksDiv.firstChild);
bookCount++;
// Keep only last 10 books
while (booksDiv.children.length > 10) {
booksDiv.removeChild(booksDiv.lastChild);
}
}
toggleBtn.addEventListener('click', () => {
if (eventSource) {
disconnect();
} else {
connect();
}
});
</script>
</body>
</html>
`);
});
const PORT = 3000;
app.listen(PORT, () => {
console.log('\n🚀 SSE Server Running!');
console.log(`📺 Open https://:${PORT} in your browser`);
console.log(`📡 SSE endpoint: https://:${PORT}/stream`);
console.log(`\n💡 The server will stream a new book recommendation every 3 seconds`);
});Code language: JavaScript (javascript)
工作原理
- 客户端连接到 /stream 端点
- 服务器发送一条随机的高评分书籍信息,每 3 秒一次
- 浏览器接收事件并实时显示
- 自动重连(如果连接断开,EventSource 会自动处理)
运行应用程序
# Install Dependencies
npm install
# Run the Server
npm startCode language: JavaScript (javascript)
在浏览器中打开 https://:3000 并点击“开始流式传输 (Start Streaming)”,即可查看实时书籍推荐!
在 JavaScript 中使用 SSE
消费该流非常简单,如下所示:
// Connect to SSE endpoint
const eventSource = new EventSource('/stream');
// Listen for recommendations
eventSource.addEventListener('recommendation', (event) => { const book = JSON.parse(event.data); console.log('New recommendation:', book.title); });
// Connection opens
eventSource.onopen = () => console.log('Connected!');
// Handle errors (auto-reconnects)
eventSource.onerror = () => console.log('Connection lost, reconnecting...');
// Close when done
eventSource.close();Code language: JavaScript (javascript)
使用多个客户端进行测试
打开多个浏览器标签页,查看 SSE 如何同时向多个客户端进行广播
# Terminal 1
npm start
# Open Multiple browser tabs
https://:3000 # Tab 1
https://:3000 # Tab 2
https://:3000 # Tab 3
# All tabs receive the same stream of recommendations!Code language: Shell Session (shell)
应用场景
此模式非常适合:
- 显示图指标的实时仪表板
- 活动摘要(新书、评论、评分)
- 实时推荐
- 监控图的变化
- 长时间运行的图算法的进度更新
SSE 的魅力在于其简洁性——没有 WebSocket 的复杂性,没有轮询的开销,只有一个简单的 HTTP 事件流!
资源
- 文档:MDN 服务器发送事件
- Neo4j JavaScript:Neo4j JavaScript 开发者指南


