开发者中心 » 编程语言 » Javascript » 代码指南 » 使用服务器发送事件 (SSE) 实现 Neo4j 实时更新

使用服务器发送事件 (SSE) 实现 Neo4j 实时更新

本文展示的所有代码均可在 neo4j-sse-simple-app Github 仓库中找到。

此应用程序演示了如何使用服务器发送事件 (SSE) 将 Neo4j 查询结果实时流式传输到浏览器。非常适合实时仪表板、活动摘要或任何需要实时图数据的应用程序。

什么是服务器发送事件 (SSE)?

SSE 是一项网络标准,允许服务器通过简单的 HTTP 连接将数据推送到浏览器。与 WebSockets 不同,SSE 具有以下特点:

  • 单向通信(从服务器到客户端)——非常适合广播
  • 连接断开时自动重连
  • 内置于浏览器中(无需额外库)
  • 通过标准 HTTP 运行

设置与依赖项

依赖项极简——仅需 Express 和 Neo4j 驱动程序。

包初始化 – package.json

{
    "name": "neo4j-sse-simple",
    "version": "1.0.0",
    "type": "module",
    "scripts": {
        "start": "node src/index.js"
    },
    "dependencies": {
        "neo4j-driver": "^5.14.0",
        "express": "^4.18.2"
    }
}Code language: JSON / JSON with Comments (json)

Neo4j 连接

连接到只读演示数据库的简单示例。

数据库连接 – src/db.js

class Database {

    constructor() {
        this.driver = neo4j.driver( 'neo4j+s://demo.neo4jlabs.com', neo4j.auth.basic('goodreads', 'goodreads') );
    }

    async query(cypher, params = {}) {
        const session = this.driver.session();
        try {
            const result = await session.run(cypher, params);
            return result.records.map(record => record.toObject());
        } finally {
            await session.close();
        }
    }

    async close() {
        await this.driver.close();
    }
}

export default new Database();Code language: JavaScript (javascript)

带有 SSE 端点的服务器

每隔几秒流式传输书籍推荐的主服务器。

带有 SSE 的 Express 服务器 – src/index.js

import express from 'express';
import db from './db.js';

const app = express();
// SSE endpoint - streams book recommendations
app.get('/stream', async (req, res) => {
    console.log('📡 Client connected to SSE stream');
    // Set SSE headers
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive'
    });

    // Send initial message
    res.write('event: connected\n');
    res.write('data: {"message": "Connected to book recommendation stream"}\n\n');

    // Function to send a random book recommendation
    async function sendRecommendation() {
        try {
            // Query for a random highly-rated book
            const books = await db.query(`
            MATCH (b:Book)<-[:AUTHORED]-(a:Author)
            WHERE b.average_rating > 4.2 
              AND b.ratings_count > 50000
            WITH b, collect(a.name) AS authors, rand() AS r
            ORDER BY r
            LIMIT 1
            RETURN b.title AS title, 
                   b.average_rating AS rating,
                   b.ratings_count AS ratingsCount,
                   authors
        `);

            if (books.length > 0) {
                const book = books[0];

                // Send as SSE event
                res.write('event: recommendation\n');
                res.write(`data: ${JSON.stringify({
                    title: book.title,
                    authors: book.authors.slice(0, 3),
                    rating: book.rating,
                    ratingsCount: book.ratingsCount,
                    timestamp: new Date().toISOString()
                })}\n\n`);
            }
        } catch (error) {
            console.error('Query error:', error.message);
        }
    }

    // Send a recommendation every 3 seconds
    const interval = setInterval(sendRecommendation, 3000);

    // Send first recommendation immediately
    sendRecommendation();

    // Clean up on client disconnect
    req.on('close', () => {
        clearInterval(interval);
        console.log('📴 Client disconnected from SSE stream');
    });
});

// Simple HTML client for testing
app.get('/', (req, res) => {
    res.send(` <!DOCTYPE html> <html> <head> <title>Neo4j SSE Book Stream</title> <style> body { font-family: system-ui, -apple-system, sans-serif; max-width: 800px; margin: 50px auto; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; } h1 { color: white; text-align: center; } .container { background: white; border-radius: 12px; padding: 30px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); } button { background: #667eea; color: white; border: none; padding: 12px 24px; border-radius: 6px; font-size: 16px; cursor: pointer; margin-bottom: 20px; } button:hover { background: #5a67d8; } button:disabled { background: #cbd5e0; cursor: not-allowed; } .book { padding: 15px; margin: 10px 0; border-left: 4px solid #667eea; background: #f7fafc; border-radius: 4px; animation: slideIn 0.5s ease; } @keyframes slideIn { from { opacity: 0; transform: translateX(-20px); } } .book-title { font-weight: bold; color: #2d3748; font-size: 18px; } .book-meta { color: #718096; font-size: 14px; margin-top: 5px; } #status { text-align: center; padding: 10px; margin-bottom: 20px; border-radius: 6px; } .connected { background: #c6f6d5; color: #22543d; } .disconnected { background: #fed7d7; color: #742a2a; } #books { max-height: 400px; overflow-y: auto; } </style> </head> <body> <h1>📚 Real-time Book Recommendations</h1> <div class="container"> <button id="toggleBtn">Start Streaming</button> <div id="status" class="disconnected">Disconnected</div> <div id="books"></div> </div>
 <script>
            let eventSource = null;
            const toggleBtn = document.getElementById('toggleBtn');
            const statusDiv = document.getElementById('status');
            const booksDiv = document.getElementById('books');
            let bookCount = 0;

            function connect() {
                // Create SSE connection
                eventSource = new EventSource('/stream');
                
                eventSource.addEventListener('connected', (e) => {
                    const data = JSON.parse(e.data);
                    console.log('Connected:', data.message);
                    statusDiv.textContent = 'Connected - Receiving recommendations...';
                    statusDiv.className = 'connected';
                    toggleBtn.textContent = 'Stop Streaming';
                });

                eventSource.addEventListener('recommendation', (e) => {
                    const book = JSON.parse(e.data);
                    displayBook(book);
                });

                eventSource.onerror = (e) => {
                    console.error('SSE error:', e);
                    disconnect();
                };
            }

            function disconnect() {
                if (eventSource) {
                    eventSource.close();
                    eventSource = null;
                }
                statusDiv.textContent = 'Disconnected';
                statusDiv.className = 'disconnected';
                toggleBtn.textContent = 'Start Streaming';
            }

            function displayBook(book) {
                const bookDiv = document.createElement('div');
                bookDiv.className = 'book';
                bookDiv.innerHTML = \`
                    <div class="book-title">\${book.title}</div>
                    <div class="book-meta">
                        ⭐ \${book.rating.toFixed(1)} · 
                        👥 \${book.ratingsCount.toLocaleString()} reviews · 
                        ✍️ \${book.authors.join(', ')}
                    </div>
                \`;
                
                booksDiv.insertBefore(bookDiv, booksDiv.firstChild);
                bookCount++;
                
                // Keep only last 10 books
                while (booksDiv.children.length > 10) {
                    booksDiv.removeChild(booksDiv.lastChild);
                }
            }

            toggleBtn.addEventListener('click', () => {
                if (eventSource) {
                    disconnect();
                } else {
                    connect();
                }
            });
        </script>
    </body>
    </html>
`);
});

const PORT = 3000;
app.listen(PORT, () => {
    console.log('\n🚀 SSE Server Running!');
    console.log(`📺 Open https://:${PORT} in your browser`);
    console.log(`📡 SSE endpoint: https://:${PORT}/stream`);
    console.log(`\n💡 The server will stream a new book recommendation every 3 seconds`);
});Code language: JavaScript (javascript)

工作原理

  1. 客户端连接到 /stream 端点
  2. 服务器发送一条随机的高评分书籍信息,每 3 秒一次
  3. 浏览器接收事件并实时显示
  4. 自动重连(如果连接断开,EventSource 会自动处理)

运行应用程序

# Install Dependencies
npm install

# Run the Server
npm startCode language: JavaScript (javascript)

在浏览器中打开 https://:3000 并点击“开始流式传输 (Start Streaming)”,即可查看实时书籍推荐!

在 JavaScript 中使用 SSE

消费该流非常简单,如下所示:

// Connect to SSE endpoint 
const eventSource = new EventSource('/stream');

// Listen for recommendations 
eventSource.addEventListener('recommendation', (event) => { const book = JSON.parse(event.data); console.log('New recommendation:', book.title); });

// Connection opens 
eventSource.onopen = () => console.log('Connected!');

// Handle errors (auto-reconnects) 
eventSource.onerror = () => console.log('Connection lost, reconnecting...');

// Close when done
eventSource.close();Code language: JavaScript (javascript)

使用多个客户端进行测试

打开多个浏览器标签页,查看 SSE 如何同时向多个客户端进行广播

# Terminal 1
npm start

# Open Multiple browser tabs
https://:3000 # Tab 1 
https://:3000 # Tab 2 
https://:3000 # Tab 3

# All tabs receive the same stream of recommendations!Code language: Shell Session (shell)

应用场景

此模式非常适合:

  • 显示图指标的实时仪表板
  • 活动摘要(新书、评论、评分)
  • 实时推荐
  • 监控图的变化
  • 长时间运行的图算法的进度更新

SSE 的魅力在于其简洁性——没有 WebSocket 的复杂性,没有轮询的开销,只有一个简单的 HTTP 事件流!

资源

分享文章