GoLand 2025.2 Help

Kafka

Kafka 插件可让您监视 Kafka 事件流处理过程、创建消费者、生产者和主题。 此外,还可连接到架构注册表,并创建和更新架构。

安装 Kafka 插件

此功能依赖于 Kafka 插件,您需要安装并启用该插件。

  1. 按下 Ctrl+Alt+S 以打开设置,然后选择 插件

  2. 打开 Marketplace 选项卡,找到 Kafka 插件并点击 安装 (如果提示则重启 IDE)。

使用 Kafka 插件,您可以:

  1. 连接到:

  2. 生成并消费数据

  3. 管理主题

  4. 使用架构注册表

如果安装并启用了 Kafka 插件,您可以使用 Kafka 工具窗口(视图 | 工具窗口 | Kafka )连接到 Kafka 并进行操作。 或者,如果安装并启用了 远程文件系统Zeppelin 插件,也可以使用 大数据工具 工具窗口(视图 | 工具窗口 | 大数据工具 )访问 Kafka 连接。

连接到 Kafka

使用云服务商连接到 Kafka

连接到 Confluent 集群

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分。

  4. 配置来源 列表中选择 云端 ,然后在 提供程序 列表中选择 Confluent

  5. 前往 https://confluent.cloud/home。 在页面右侧点击设置菜单,选择 环境 ,然后选择您的集群,再点击 客户端 | Java

    为您的客户端复制配置片段 区块中提供 Kafka API 密钥并点击 复制

  6. 返回 IDE,并将复制的属性粘贴到 配置 字段。

  7. 填写设置之后,点击 测试连接 以确保所有配置参数均正确。 然后单击 确定

Kafka Confluent

您也可以选择设置:

  • 启用连接 :如果希望禁用此连接,请取消选中该复选框。 默认情况下,新建连接为启用状态。

  • 按项目 :选择此项可使这些连接设置仅对当前项目生效。 如果希望该连接在其他项目中可见,请取消选中该复选框。

连接到 AWS MSK 集群

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分开。

  4. 配置来源 列表中选择 云端 ,然后在 提供程序 列表中选择 AWS MSK

  5. 引导服务器 字段中输入 Kafka broker 的 URL,或是以逗号分隔的 URL 列表。

  6. AWS 身份验证 列表中选择身份验证方法。

    • 默认凭证提供者链 :使用默认提供程序链中的凭据。 有关该链的更多信息,请参见 使用默认凭据提供程序链

    • 从凭证文件中的配置文件 :从您的 凭证 文件中选择配置文件。

    • 显式访问密钥和密钥 :手动输入您的凭据。

  7. 您也可以选择 连接到 Schema Registry

  8. 如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,并在 SSH 配置 列表中选择 SSH 配置,或新建一个配置。

  9. 填写设置后,请单击 测试连接 以确保所有配置参数正确。 然后单击 确定

Kafka AWS MSK

您也可以选择设置:

  • 启用连接 :如果希望禁用此连接,请取消选中该复选框。 默认情况下,新建连接为启用状态。

  • 按项目 :选择此项可使这些连接设置仅对当前项目生效。 如果希望该连接在其他项目中可见,请取消选中该复选框。

连接到自定义 Kafka 服务器

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分开。

  4. 配置来源 列表中选择 自定义

  5. 引导服务器 字段中输入 Kafka broker 的 URL,或是以逗号分隔的 URL 列表。

  6. 身份验证 下选择身份验证方法:

    • :连接时不使用身份验证。

    • SASL :选择一个 SASL 机制(Plain、SCRAM-SHA-256、SCRAM-SHA-512 或 Kerberos ),并输入用户名和密码。

    • SSL

      • 如果想验证 broker 主机名是否与 broker 证书中的主机名匹配,请选择 验证服务器主机名。 取消选中该复选框等效于添加 ssl.endpoint.identification.algorithm= 属性。

      • Truststore 位置 中提供 SSL truststore 位置的路径(ssl.truststore.location 属性)。

      • Truststore 密码 中提供 SSL truststore 密码的路径(ssl.truststore.password 属性)。

      • 选择 使用 Keystore 客户端身份验证 ,并为 Keystore 位置ssl.keystore.location)、 Keystore 密码ssl.keystore.password )、以及 密钥密码ssl.key.password )提供对应值。

    • AWS IAM :使用 AWS IAM 访问 Amazon MSK。 在 AWS 身份验证 列表中选择以下选项之一:

      • 默认凭证提供者链 :使用默认提供程序链中的凭据。 有关该链的更多信息,请参见 使用默认凭据提供程序链

      • 从凭证文件中的配置文件 :从您的 凭证 文件中选择配置文件。

      • 显式访问密钥和密钥 :手动输入您的凭据。

  7. 您也可以选择 连接到 Schema Registry

  8. 如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,并在 SSH 配置 列表中选择 SSH 配置,或新建一个配置。

  9. 填写设置后,请单击 测试连接 以确保所有配置参数正确。 然后单击 确定

Kafka 自定义连接

您也可以选择设置:

  • 启用连接 :如果希望禁用此连接,请取消选中该复选框。 默认情况下,新建连接为启用状态。

  • 按项目 :选择此项可使这些连接设置仅对当前项目生效。 如果希望该连接在其他项目中可见,请取消选中该复选框。

使用属性连接到 Kafka

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分开。

  4. 配置来源 列表中选择 属性

  5. Bootstrap 服务器 字段中输入 Kafka broker 的 URL,或是以逗号分隔的 URL 列表。

  6. 选择提供 Kafka Broker 配置属性的方式:

    • 隐式 :粘贴配置属性。 您也可以使用 GoLand 提供的代码补全与文档功能手动输入属性内容。

    • 来自文件 :选择属性文件。

  7. 您可以选择 连接到 Schema Registry

  8. 如果您希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,然后在 SSH 配置 列表中选择一个 SSH 配置,或创建一个新配置。

  9. 填写设置后,点击 测试连接 以确保所有配置参数均正确。 然后点击 确定

您还可以选择设置:

  • 启用连接 :如果您希望禁用该连接,请取消选中该复选框。 默认情况下,新建的连接处于启用状态。

  • 按项目 :选中此项以仅对当前项目启用这些连接设置。 如果希望在其他项目中也可见此连接,请取消选中该复选框。

与 Kafka 服务器建立连接后,会在 Kafka 工具窗口中显示一个新标签页。 您可以使用它来 生产消费数据, 创建和删除主题。 如果您 连接到了 Schema Registry ,还可以查看、创建和更新 schema。

Kafka 工具窗口的任意标签页中点击 连接设置 可重命名、删除、禁用或刷新连接,或修改其设置。

Kafka 连接:主题

所有集群主题显示在 主题 区域。 您可以点击 显示收藏 显示收藏主题,或点击 显示内部内容 显示或隐藏内部主题。 点击任意主题可获取其详细信息,例如分区、配置和 schema 的信息。

创建主题

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 选择 主题 并点击 添加主题 (或按 Alt+Insert)。

  4. 为新主题命名,指定分区数和复制因子,然后点击 确定

从主题中删除记录

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 主题 下方,右键点击一个主题并选择 清除主题 (或点击其左侧的 清除主题)。 点击 确定 以确认删除。

生成并消费数据

生成数据

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 选择一个 Kafka 连接并点击 生产者

    这将在新的编辑器标签页中打开一个 producer。

  4. 主题 列表中,选择将消息写入的主题。

  5. 下方选择消息的 key 和 value。

    如果您 连接到了 Schema Registry ,可以选择 架构注册表 以根据所选 schema 检查发送的数据。

    您可以生成随机值:

    • 点击 生成随机数据 可基于所选类型生成随机值。 这包括基于所选 Schema Registry 生成整个 JSON 对象。

    • 如需更灵活地生成随机值,请使用 ${random...} 变量。 编辑 JSON、Avro 或 Protobuf 文件时,在值字段中开始输入 random 以查看可用随机值的自动补全选项。 例如,您可以使用 "${random.integer(1,10)}" 来生成介于 1 到 10 之间的随机整数。

      Kafka 随机关键字
  6. 下方提供自定义 header。 如果您有 JSON 或 CSV 格式的 header,可以将其粘贴到此区域。

  7. 流程 下方,您可以控制记录流:

    • 每次记录数 中输入一个数字,以便同时发送多条记录。

    • 如果希望记录数据为随机生成,请选择 生成随机键生成随机值

    • 设置每条记录发送间隔的 间隔 (单位:毫秒)。

    • 如需在达到指定记录数量或经过指定时间后停止发送消息,请提供 停止条件

  8. 选项 下方提供其他选项:

    • 分区 :指定必须发送记录的主题分区。 如果未指定,将使用默认逻辑:producer 会取 key 的哈希值对分区数取模。

    • 压缩 :选择由 producer 生成数据的压缩类型: GzipSnappyLz4Zstd

    • 幂等性 :选择此项以确保在流中每条消息只写入一份副本。

    • ACK :选择 主节点 表示由 leader 将记录写入其本地日志后立即响应,无需等待所有 follower 的完全确认。 选择 全部 ,以使主节点等待所有同步副本确认该记录。 保留 给生产者,以避免等待来自服务器的任何确认。

  9. 单击 生产

在 Kafka 中生成消息

然后,您可以单击 数据 选项卡中的任何记录以查看其详细信息。 您还可以单击 显示统计信息 启用统计信息。

消费数据

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 选择一个 Kafka 连接并单击 消费者

    这将在新的编辑器选项卡中打开一个 consumer。

  4. 主题 列表中,选择要订阅的主题。

  5. 下,选择将要消费的记录键和值的数据类型。

  6. 使用 范围与过滤器 缩小消费数据范围:

    • 起点 列表中,选择要开始消费数据的时间段或偏移量。 选择 从开头 可获取该主题中的所有记录。

    • 限制 列表中,选择停止接收数据的时机,例如,当主题中达到某个记录数量时。

    • 使用 过滤器 根据键、值或头中的子字符串筛选记录。

  7. 其他 下配置其他参数:

    • 分区 框中,输入分区 ID 或以逗号分隔的 ID 列表,仅从指定分区获取记录。

    • 消费者组 列表中选择一个 consumer group(如果希望将新 consumer 添加到该组中)。

  8. 单击 开始消费

在 Kafka 中消费消息

然后,您可以单击 数据 选项卡中的任何记录以查看其详细信息。 您还可以单击 显示统计信息 启用统计信息。

导出数据

您可以将已生成或已消费的数据下载为 CSV、TSV 或 JSON 格式。

  1. 开始 生成消费数据。

  2. 数据 表格的右上角,单击 下载图标 并选择 CSVTSVJSON

  3. 选择输出文件位置并单击 保存

保存生产者或消费者预设

如果您经常使用相同的键、值、头或其他参数生成或消费数据,您可以将其保存为预设。 之后,您可以重用这些预设快速创建 producer 或 consumer。

  1. Kafka 工具窗口中,单击 生产者消费者

  2. 指定所需参数,然后在创建 producer 或 consumer 的界面顶部,单击 收藏图标保存预设)。

这些参数将保存为预设,并显示在 预设 选项卡中。 单击一个预设以应用它。

使用架构注册表

producer 和 consumer 可使用 schema 对其记录键和值进行验证并确保一致性。 Kafka 插件集成了 Schema Registry 并支持 Avro、Protobuf 和 JSON schema。 它使您能够:

  • 连接到 Schema Registry

  • 创建、更新、删除和克隆 schema

  • 以原始格式或树形视图预览 schema

  • 比较 schema 版本

  • 删除 schema 版本

连接到架构注册表

  1. 使用 云服务提供商自定义服务器属性创建与 Kafka Broker 的连接。

  2. 如果使用 Confluent ,您可以在 配置 字段中粘贴 Broker 和 Schema Registry 的属性。

    否则,请展开 架构注册表 部分并选择提供商: ConfluentGlue

    • URL :输入 Schema Registry 的 URL。

    • 配置来源 :选择提供连接参数的方式:

      • 自定义 :选择身份验证方式并提供凭据。

        如果希望使用与 Kafka Broker 不同的 SSL 设置,请取消选中 使用代理 SSL 设置 复选框,并提供 truststore 的路径。

      • 属性 :粘贴提供的配置属性。 或者,您可以使用 GoLand 提供的代码补全和快速文档手动输入属性。

    • 区域 :选择 Schema Registry 的区域。

    • AWS 身份验证 :选择身份验证方式:

      • 默认凭据提供程序链 :使用来自默认提供程序链的凭据。 有关该链的更多信息,请参阅 使用默认凭证提供程序链

      • 来自凭据文件的配置文件 :从您的 credentials 文件中选择一个配置。

      • 显式访问密钥和私密密钥 :手动输入您的凭据。

    • 注册表名称 :输入要连接的 Schema Registry 的名称,或单击 显示架构注册表 从列表中选择。

  3. 填写设置后,单击 测试连接 以确保所有配置参数正确。 然后单击 确定

创建架构

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接).

  3. 选择 架构注册表 并单击 创建架构 (或按 Alt+Insert).

  4. 格式 列表中,选择架构格式:Avro、Protobuf 或 JSON。

  5. 策略 列表中,选择 命名策略 ,并根据所选策略设置名称后缀或选择主题。 或者,选择 自定义名称 并输入任意名称。

创建架构

您可以在树状视图和原始视图中预览架构。

树状视图
原始视图

比较架构版本

  1. 连接到 Schema Registry 后,在 架构注册表 下选择一个架构。

  2. 切换到 原始视图 并单击 比较。 如果某个架构具有多个版本,则此按钮可用。

比较版本

删除架构版本

如果某个架构具有多个版本,您可以删除特定版本。 Schema Registry 支持 两种删除类型 :软删除(删除版本后不会从注册表中移除架构元数据和 ID)和硬删除(会移除所有元数据,包括架构 ID)。 可选择的能力取决于您使用的是 Confluent 还是 AWS Glue Schema Registry:

  • 在 Confluent Schema Registry 中,默认使用软删除。 您可以通过选中 永久删除 复选框选择使用硬删除。

  • AWS Glue Schema Registry 始终使用硬删除。

  1. 架构注册表 下选择一个架构。

  2. 在其右侧,单击 更多 并选择 删除版本

最后修改日期: 2025年 9月 26日