返回 MCP 目录
public公开dns本地运行

kafka-mcp-server

Kafka MCP Server是一个基于Go语言实现的Apache Kafka模型上下文协议(MCP)服务器,通过标准化接口让LLM模型能够执行Kafka操作。它提供了生产/消费消息、管理主题和消费者组、监控集群健康等功能,支持多种安全认证方式,并可与Cursor、Claude等AI应用集成。

article

README

🚀 Kafka 分布式计算平台 (MCP) 用户指南

Kafka 分布式计算平台(简称 Kafka MCP)是一套强大的工具集,能简化与 Apache Kafka 集群的交互。它提供了多种命令行工具和 REST API,可用于消费组描述、主题偏移量管理以及生产者/消费者日志记录等操作。

🚀 快速开始

Kafka 分布式计算平台(Kafka MCP)是一个强大的工具集,可帮助用户轻松与 Apache Kafka 集群进行交互。下面为你介绍如何快速开启使用之旅。

环境要求

  • Java 8 或更高版本
  • Kafka 版本 2.13+

安装步骤

  1. 下载并解压 Kafka MCP 发行包:
wget https://example.com/kafka-mcp.tar.gz
tar -xzvf kafka-mcp.tar.gz
cd kafka-mcp
  1. 配置环境变量(可选):
export PATH=$PATH:/path/to/kafka-mcp/bin
  1. 启动 MCP 服务(如果使用 REST API 版本):
./start_mcp_server.sh

✨ 主要特性

消费组管理

  • 消费组描述:可获取指定消费组的消费者 ID 及其所属客户端 ID。
  • 偏移量查询:能查看特定主题和分区在消费组中的当前消费位置。
  • 重置偏移量:支持手动调整消费组的偏移量,实现精确消费控制。

主题管理

  • 主题创建:可通过定义分区数、复制因子等参数快速创建新主题。
  • 主题删除:能安全删除不再需要的主题,释放集群资源。
  • ISR 监控:可实时查看主题分区的 _isr(In-Sync Replicas)状态,确保数据一致性。

消息跟踪

  • 消息消费:可实时消费指定主题的消息,并支持按时间范围、批次大小等方式灵活获取。
  • 生产日志生成:能方便地向指定主题发送测试或调试用的消息。
  • 偏移量调整:必要时可手动调整消费者偏移量,实现精准的流处理。

📦 安装指南

环境要求

| 属性 | 详情 | |------|------| | 运行环境 | Java 8 或更高版本 | | Kafka 版本 | 2.13+ |

安装步骤

  1. 下载并解压 Kafka MCP 发行包:
wget https://example.com/kafka-mcp.tar.gz
tar -xzvf kafka-mcp.tar.gz
cd kafka-mcp
  1. 配置环境变量(可选):
export PATH=$PATH:/path/to/kafka-mcp/bin
  1. 启动 MCP 服务(如果使用 REST API 版本):
./start_mcp_server.sh

💻 使用示例

基础用法

Kafka MCP 提供了多种命令行工具,以下是一些基础用法示例:

# 获取所有Kafka消费组列表
list_consumer_groups --bootstrap-server <broker_address>

# 列出Kafka集群中的所有主题
list_topics --bootstrap-server <broker_address>

高级用法

消费组与主题详细信息查看

# 查看指定消费组的详细信息,包括消费者ID和偏移量
describe_consumer_group --group-id <consumer_group_id> \
                       --bootstrap-server <broker_address> \
                       --include-protocol-info \
                       --include-consumer-info \
                       --include-offsets

# 查看指定主题的分区分配、副本和ISR状态
describe_topic --topic-name <topic_name> \
               --bootstrap-server <broker_address>

主题偏移量管理

# 消费特定主题的所有分区的偏移量
consume_offsets --group-id <consumer_group_id> \
               --topic-name <topic_name> \
               --bootstrap-server <broker_address>

# 提供指定分区的当前偏移量
seek_consumer_partition --group-id <consumer_group_id> \
                       --topic-name <topic_name> \
                       --partition <partition_number> \
                       --offset <target_offset> \
                       --bootstrap-server <broker_address>

生产者和消费者日志监控

# 消费指定主题的最新消息
consume_logs --topic-name <topic_name> \
             --bootstrap-server <broker_address>

# 生成模拟生产者日志
produce_logs --topic-name <topic_name> \
             --message "your_message_here" \
             --bootstrap-server <broker_address>

📚 详细文档

配置参考

系统配置

  • Kafka 代理地址--bootstrap-server):指定 Kafka 服务的 IP 和端口,格式为 host:port
  • 消费者组 ID--group-id):用于标识一组消费者实例。
  • 主题名称--topic-name):操作的具体主题。

环境变量

# 设置默认代理地址
export KAFKA_BOOTSTRAP_SERVER=localhost:9092

使用案例

案例一:消费组偏移量查询

# 查看指定消费组的偏移量
consume_offsets --group-id my_consumer_group \
               --bootstrap-server localhost:9092

案例二:主题描述

# 获取特定主题的详细信息
describe_topic --topic-name user_activity \
               --bootstrap-server localhost:9092

🔧 技术细节

安全注意事项

  • 认证授权:在生产环境中,建议启用 SASL/PLAINTEXT 或 TLS 加密通信,确保 API 调用的安全性。
  • 访问控制:通过 Kafka 的 ACL(Access Control List)机制限制不同用户或应用的权限范围。
  • 日志监控:定期审查操作日志,识别异常行为并及时响应。

📄 许可证

文档中未提及相关许可证信息。

🆘 帮助与支持

如需更多信息,请参考 Kafka 官方文档 或联系技术支持团队。

help

运行方式说明

cloud

托管运行

托管运行通常表示这个 MCP Server 由服务方环境承载,用户一般按页面提供的连接方式或授权流程接入,不需要在本地长期启动一个 MCP 进程

  1. 打开服务方连接页
  2. 完成授权或复制端点
  3. 在 MCP 客户端中连接
terminal

本地运行 / 其它方式

本地运行通常需要用户在自己的电脑或服务器上安装依赖,把 server_config 复制到 MCP 客户端,并按 env_schema 补齐环境变量、密钥或其它配置

  1. 复制 server_config
  2. 安装所需依赖
  3. 补齐环境变量后重启客户端