Databricks Agent Bricks — 自定义 MCP 服务器

简介

本指南演示了如何使用 Databricks Apps 创建一个 自定义 Neo4j MCP 服务器

这种设置允许你定义 MCP 工具,直接从 Databricks 应用程序对远程 Neo4j 实例执行查询,而无需使用专用的 Neo4j MCP 服务器。

通过遵循本指南,你可以公开基于 Neo4j 的 MCP 工具,并使用 Databricks Agent 功能与它们进行交互,从而实现与 LLM(大语言模型)代理或其他工作流的集成。

该示例展示了一个简单的 MCP 服务器实现,它返回给定公司名称的竞争对手。


初步说明

当你需要创建既能利用 Neo4j 查询,又能结合 Rest API 和/或 Databricks SQL Warehouse 查询的 MCP 服务器时,这种集成模式非常适用。如果你需要具有完整功能(聊天记忆等)的 Neo4j MCP 服务器,建议参阅 OFFICIAL_MCP_SERVER 示例。


架构概述

→ Databricks Agent / Playground(工作区/游乐场)

→ 基于自定义 MCP 服务器工具定义的 Databricks App

→ Neo4j 驱动程序连接

→ Neo4j 数据库(例如:demo.neo4jlabs.com / 公司数据集)

关键点

  • 自定义 Python MCP 服务器封装了查询逻辑并处理与 Neo4j 的连接。

  • Databricks App 与服务器保持同步。

  • LLM 或代理与 MCP 工具进行交互。

  • Neo4j 连接使用 SSL 加密。

优势

  • 低代码与低基础设施要求(Databricks App)。

  • 快速原型设计(本地测试)。

  • 允许进行复杂的 MCP 定义。

  • 自动权限继承。

  • 凭据隐藏在 Databricks 机密(Secrets)之后。

  • 模式级公开(多个函数 → 多个工具)。

  • 在 Playground 中可直接使用。

限制

  • 纯 Python 实现。

  • Neo4j MCP 服务器功能有限。

先决条件

  • 需要具备计算能力的 Databricks 订阅。

  • 电脑上已安装 Databricks CLI。

实施

第 1 步 - 设置环境

首先要做的是为 Neo4j 凭据定义 Databricks 机密。可以定义一个 env 文件和一个脚本来自动化该过程。

env
NEO4J_URI=bolt+ssc://<your-neo4j>:7687
NEO4J_USERNAME=
NEO4J_PASSWORD=

setup_secrets.sh

#!/bin/bash

set -e

SCOPE="neo4j-agent"

auth=$(databricks current-user me)
if [[ $auth != *'"active":true'* ]]; then
  echo "❌ Databricks CLI unauthenticated."
  echo ""
  echo "You must login first:"
  echo "databricks auth login --host https://<your-databricks-workspace>"
  echo ""
  exit 1
fi

echo "✅ Databricks CLI authenticated"

# ---- load .env ----
if [ ! -f .env ]; then
  echo "❌ File .env not found. Please create one with the necessary environment variables."
  exit 1
fi

set -o allexport
source .env
set +o allexport

# ---- create scope ----
echo "Creating scope (if not exists)..."
databricks secrets create-scope $SCOPE >/dev/null 2>&1 || echo "Scope already exists, skipping creation."

# ---- upload secrets ----
echo "Uploading secrets..."

databricks secrets put-secret $SCOPE neo4j-uri \
  --string-value "$NEO4J_URI"

databricks secrets put-secret $SCOPE username \
  --string-value "$NEO4J_USERNAME"

databricks secrets put-secret $SCOPE password \
  --string-value "$NEO4J_PASSWORD"

echo "✅ Secrets uploaded successfully"

在终端运行脚本后,机密将被存储在 Databricks 环境中。

第 2 步 - 实现 MCP 服务器

在本指南中,MCP 服务器保持得尽可能简单,但你可以轻松对其进行扩展。

项目结构

custom_mcp_server
└───app
   │   app.py
   │   app.yaml
   │   requirements.txt

首先,我们定义一个 YAML 文件,指示 Databricks App 将 Databricks 机密绑定到环境变量。

app.env

env:
  - name: NEO4J_URI
    valueFrom: neo4j-uri

  - name: NEO4J_USER
    valueFrom: username

  - name: NEO4J_PASS
    valueFrom: password

其次,我们定义 Python 需求文件。

requirements.txt

uvicorn==0.41.0
neo4j==6.1.0
mcp==1.26.0
fastapi==0.135.1

最后,我们实现 Python MCP 服务器。

app.py

import os
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI
from fastapi.responses import FileResponse
from neo4j import GraphDatabase, RoutingControl, bearer_auth
import uvicorn

# Get the Env Variables - Secrets
try:
    URI = os.getenv("NEO4J_URI")
    NEO4J_USER = os.getenv("NEO4J_USER")
    NEO4J_PASS = os.getenv("NEO4J_PASS")
except Exception as e:
    print(f"Warning: Secrets not found ({e}). Check that the application has been configured to access the necessary secrets, that the resource keys are correctly set and that the app.yaml is properly configured to map the resource keys into environment variables.")
    # Fallback for local tests
    URI = os.getenv("NEO4J_URI", "bolt://:7687")
    NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
    NEO4J_PASS = os.getenv("NEO4J_PASS", "password")

mcp = FastMCP("Custom MCP Server on Databricks App using Neo4j against companies database")
AUTH = (NEO4J_USER, NEO4J_PASS)

# mcp tool using Neo4j driver to find competitors for a given company
@mcp.tool()
def find_competitors(company_name: str, limit: int) -> list[dict]:
    """Find competitors for a given company"""

    cypher_query = f"""
        MATCH (c:Organization {{name: $company_name}})-[:HAS_COMPETITOR]->(competitor:Organization)
        RETURN competitor.name as name, competitor.revenue as revenue
        LIMIT $limit
        """

    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        records, _, _ = driver.execute_query(
            cypher_query,
            parameters_={"company_name": company_name, "limit": limit},
            database_="companies",
            routing_=RoutingControl.READ,
        )
        return records

# define more tools as needed
# @mcp.tool()
# def get_employees_count(...

mcp_app = mcp.streamable_http_app()

app = FastAPI(
    lifespan=lambda _: mcp.session_manager.run(),
)

# Add a landing page if needed
@app.get("/", include_in_schema=False)
async def serve_index():
    return FileResponse(Path(__file__).parent / "index.html")

app.mount("/", mcp_app)

if __name__ == "__main__":
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=8000,
        reload=True, # Useful in Test, remove in Production
    )

我们还可以通过以下客户端在本地测试服务器。

client.py

from mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession
import asyncio

async def main():
    app_url = "https://:8000/mcp/"
    async with streamable_http_client(app_url) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tool_result = await session.call_tool("find_competitors", {"company_name": "BigFix", "limit": 5})
            print(tool_result)

if __name__ == "__main__":
    asyncio.run(main())

第 3 步 - 创建 Databricks App

现在我们可以创建使用自定义 Python MCP 服务器的 Databricks App 了。

在 custom-mcp-server 文件夹的根目录下打开终端,运行命令以创建 Databricks App。

重要的是,应用程序名称必须以 "mcp-" 开头,否则 Databricks 将无法将其视为 MCP。

databricks apps create mcp-<app_name>

然后,我们将代码与 Databricks App 同步,并部署/启动该应用程序。

DATABRICKS_USERNAME=$(databricks current-user me | jq -r .userName)
databricks sync . "/Users/$DATABRICKS_USERNAME/mcp-<app-name>"
databricks apps deploy mcp-<app_name> --source-code-path "/Workspace/Users/$DATABRICKS_USERNAME/mcp-custom-server"

检查你的工作区以确认应用名称和已同步的文件。

该应用程序与一个服务主体(Service Principal)关联,请确保其具有读取机密的权限。

第 4 步 - 将机密传递给 App

进入 Compute(计算)→ Apps(应用程序)选项卡。

检查日志以确保应用程序运行正常且 Python 需求已安装,然后点击 Edit(截图)。

Logs & Edit

在第三个选项卡中,按如下方式将你的机密链接到唯一的资源键。

Secrets & Resource Keys

然后向下滚动以保存配置并重启应用程序。

测试与使用

Playground

Playground 中,从 Tools → Add Tool → MCP Servers (Tab) 选择自定义的 MCP 服务器,添加如下系统提示词,然后开始提问:What are the competitors of BigFix?(BigFix 的竞争对手是谁?)

Purpose: Assist users in getting companies/organizations info.

Limitations:
- Focus on companies.
- Be conversational but do not answer any unrelated queries that are not related to companies.
- Handle queries for multiple companies.
- If there is no company information, do not attempt to retrieve otherwise – inform the user with an appropriate error message.

Parameters:
- Company name
- Max results

Data Sources:
- Use the find_competitors API tool when requested with questions about company's competitors.

Actions:
1. Retrieve company info
2. Retrieve competitors

Error Handling:
- Provide clear error messages if Neo4j Connection calls fail.

Sample Questions:
- "What are the competitors of 'BigFix'?"

LLM 将使用 MCP 服务器从 Neo4j 检索信息,并以自然语言进行回复。如果模型表示无法使用该 MCP 服务器,请尝试切换到其他模型,如 Claude Opus 4.6。

Playground Results

注意:可以同时使用来自不同来源的多种工具(外部 MCP、UC 函数等),从而使你能够创建更复杂的代理。

现在我们已经测试了代理功能,可以正式使用它了。

Databricks App 的外部使用

在 Compute → Apps 中,你可以找到与你的应用程序关联的公共 URL;或者,选择你的应用程序,在状态信息中也能找到该公共 URL。

现在,要将该应用程序集成到你的代码项目中,或与团队共享,你需要一个 Databricks 令牌和一个客户端(例如 Claude)。

databricks auth token -p <your-profile>

这里是一个使用工作区身份验证的简单 Python 客户端示例。

client.py

from databricks.sdk import WorkspaceClient
from mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession
import asyncio

client = WorkspaceClient()

async def main():
    headers = client.config.authenticate()
    app_url = "https://your.app.url.databricksapps.com/mcp/"
    async with streamable_http_client(app_url,  headers=headers) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tool_result = await session.call_tool("find_competitors", {"company_name": "BigFix", "limit": 5})
            print(tool_result)

if __name__ == "__main__":
    asyncio.run(main())

你还可以将你的 App 发布到 Databricks Marketplace。

© . This site is unofficial and not affiliated with Neo4j, Inc.