WebStorm 2025.2 Help

Kafka

Kafka 插件允许您监控 Kafka 事件流处理流程、创建消费者、生产者和主题。 它还支持连接到 Schema Registry,以及创建与更新架构。

安装 Kafka 插件

此功能依赖于 Kafka 插件,您需要安装并启用该插件。

  1. 按下 Ctrl+Alt+S 以打开设置,然后选择 远程文件系统

  2. 打开 Marketplace 选项卡,找到 Kafka 插件并单击 安装 (如提示,请重新启动 IDE)。

使用 Kafka 插件,您可以:

  1. 连接到:

  2. 生产与消费数据

  3. 管理主题

  4. 使用 Schema Registry

如果安装并启用了 Kafka 插件,您可以使用 Kafka 工具窗口(视图 | 工具窗口 | Kafka )连接并使用 Kafka。 或者,如果安装并启用了 远程文件系统Zeppelin 插件,您也可以通过 Big Data Tools 工具窗口(视图 | 工具窗口 | Big Data Tools )访问 Kafka 连接。

连接到 Kafka

使用云服务连接到 Kafka

连接到 Confluent 集群

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 名称 字段中输入连接的名称,以便区别于其他连接。

  4. 配置来源 列表中选择 ,然后在 提供程序 列表中选择 Confluent

  5. 转到 https://confluent.cloud/home。 在页面右侧点击设置菜单,选择 环境 ,选择您的集群,然后选择 客户端 | Java

    复制用于客户端的配置片段 区块中提供 Kafka API 密钥,并点击 复制

  6. 返回到 IDE,并将复制的属性粘贴到 配置 字段中。

  7. 填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 确定

Kafka Confluent

(可选)您可以设置:

  • 启用连接 :如果您希望禁用此连接,请取消选中该复选框。 默认情况下,新建的连接是启用的。

  • 按项目 :选择此项可仅对当前项目启用这些连接设置。 如果您希望在其他项目中显示此连接,请取消选中该复选框。

连接到 AWS MSK 集群

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 名称 字段中输入连接名称,以便区分其他连接。

  4. 配置来源 列表中选择 ,然后在 提供程序 列表中选择 AWS MSK

  5. Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。

  6. AWS 身份验证 列表中选择身份验证方法。

    • 默认凭证提供程序链 :使用默认提供程序链中的凭据。 关于该链的更多信息,请参见 Using the Default Credential Provider Chain

    • 来自凭证文件的配置文件 :从您的 credentials 文件中选择一个配置文件。

    • 显式访问密钥和私密密钥 :手动输入您的凭据。

  7. (可选)您可以 连接到 Schema Registry

  8. 如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,然后在 SSH 配置 列表中选择一个 SSH 配置或创建新的配置。

  9. 填写完设置后,点击 测试连接 确保所有配置参数均正确。 然后点击 确定

Kafka AWS MSK

您可以选择设置:

  • 启用连接 :如果想要禁用此连接,请取消选中复选框。 默认情况下,新建连接处于启用状态。

  • 按项目 :选中后,仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。

连接到自定义 Kafka 服务器

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分。

  4. 配置来源 列表中选择 自定义

  5. Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。

  6. 身份验证 下选择一种认证方式:

    • :无需认证直接连接。

    • SASL :选择一种 SASL 机制(Plain、SCRAM-SHA-256、SCRAM-SHA-512 或 Kerberos ),并提供您的用户名和密码。

    • SSL

      • 如果希望验证 broker 主机名是否与证书中的主机名匹配,请选择 验证服务器主机名。 取消选中复选框等同于添加 ssl.endpoint.identification.algorithm= 属性。

      • 信任库位置 中提供 SSL truststore 位置的路径(ssl.truststore.location 属性)。

      • 信任库密码 中提供 SSL truststore 密码的路径(ssl.truststore.password 属性)。

      • 选择 使用密钥库客户端身份验证 ,并提供 密钥库位置ssl.keystore.location)、 密钥库密码ssl.keystore.password )和 密钥密码ssl.key.password )的值。

    • AWS IAM :对 Amazon MSK 使用 AWS IAM。 在 AWS 身份验证 列表中选择以下之一:

      • 默认凭证提供程序链 :使用默认提供程序链中的凭据。 有关提供程序链的更多信息,请参阅 Using the Default Credential Provider Chain

      • 来自凭证文件的配置文件 :从 credentials 文件中选择一个配置文件。

      • 显式访问密钥和私密密钥 :手动输入您的凭据。

  7. 您可以选择 连接到 Schema Registry

  8. 如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,并在 SSH 配置 列表中选择一个 SSH 配置或新建一个配置。

  9. 填写完设置后,点击 测试连接 确保所有配置参数均正确。 然后点击 确定

Kafka 自定义连接

您可以选择设置:

  • 启用连接 :如果想要禁用此连接,请取消选中复选框。 默认情况下,新建连接处于启用状态。

  • 按项目 :选中后,仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。

使用属性连接到 Kafka

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 名称 字段中输入连接名称,以便与其他连接区分。

  4. 配置来源 列表中选择 属性

  5. Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。

  6. 选择提供 Kafka Broker 配置属性的方式:

    • 隐式 :粘贴提供的配置属性。 或者,您可以使用 WebStorm 提供的代码补全和快速文档功能手动输入配置属性。

    • 来自文件 :选择属性文件。

  7. 您还可以选择 连接到 Schema Registry

  8. 如果希望通过 SSH 隧道连接到 Kafka,请选择 启用隧道 ,然后在 SSH 配置 列表中选择一个 SSH 配置,或新建一个。

  9. 填写完设置后,点击 测试连接 以确保所有配置参数均正确。 然后点击 确定

如有需要,您可以设置以下内容:

  • 启用连接 :若要禁用此连接,请取消选中此复选框。 默认情况下,新建的连接是启用状态。

  • 按项目 :选中以仅为当前项目启用这些连接设置。 如需在其他项目中显示此连接,请取消选中此复选框。

连接到 Kafka 服务器后,会在 Kafka 工具窗口中显示一个包含该连接的新选项卡。 您可以使用它来 生产消费 数据、 创建 和删除主题。 如果已 连接到 Schema Registry ,还可以查看、创建和更新 schema。

Kafka 工具窗口的任一选项卡中点击 连接设置 ,即可重命名、删除、禁用或刷新连接,或修改其设置。

Kafka 连接:主题

所有集群主题显示在 主题 部分。 您可以点击 显示收藏 只显示收藏主题,或点击 显示内部项 显示或隐藏内部主题。 点击任意主题可查看更多信息,例如分区、配置和 schema 的详细内容。

创建主题

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 选择 主题 并点击 添加主题 (或按 Alt+Insert)。

  4. 命名新主题,指定分区数和副本因子,然后点击 确定

从主题中删除记录

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 主题 下,右键点击一个主题并选择 清除主题 (或点击其左侧的 清除主题)。 点击 确定 以确认删除。

生产与消费数据

生产数据

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 选择一个 Kafka 连接并点击 生产者

    这将在新的编辑器选项卡中打开一个生产者。

  4. 主题 列表中选择要写入消息的主题。

  5. 下选择消息的键和值。

    如果已 连接到 Schema Registry ,可以选择 Schema Registry 来根据所选 schema 验证发送的数据。

    您可以生成随机值:

    • 点击 生成随机数据 以根据所选类型生成随机值。 包括根据所选的 Schema Registry 生成整个 JSON 对象。

    • 若需更灵活地生成随机值,请使用 ${random...} 变量。 当编辑 JSON、Avro 或 Protobuf 文件时,在值字段输入 random 可查看可能的随机值自动补全选项。 例如,您可以使用 "${random.integer(1,10)}" 生成 1 到 10 之间的随机整数。

      Kafka 随机关键词
  6. 下添加任意自定义 header。 如果以 JSON 或 CSV 格式提供 header,可以粘贴到此部分。

  7. 流程 下,您可以控制记录流:

    • 每次记录数量 中输入一个数字,以同时发送多个记录。

    • 如果希望随机生成记录数据,请选择 生成随机键生成随机值

    • 设置 间隔 ,用于控制两条记录发送之间的间隔(毫秒)。

    • 如需在达到指定记录数量或经过特定时间后停止发送消息,请提供 停止条件

  8. 选项 下,您可以配置其他选项:

    • 分区 :指定记录应发送至的主题分区。 如果未指定,将使用默认逻辑:生产者会对键取哈希值并对分区数取模。

    • 压缩 :为生产者生成的数据选择压缩类型: GzipSnappyLz4Zstd

    • 幂等性 :选中以确保每条消息在流中仅写入一次。

    • ACK :如果您希望主节点将记录写入其本地日志并在不等待所有 follower 完全确认的情况下进行响应,请选择 主节点。 请选择 全部 ,以使主节点等待所有同步副本完全确认该记录。 为使 producer 不等待来自服务器的任何确认,请保留

  9. 单击 生成

在 Kafka 中生产消息

然后,您可以在 数据 选项卡中单击任意记录以显示其详细信息。 您还可以单击 显示统计信息 启用统计信息。

消费数据

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 单击 新建连接)。

  3. 选择一个 Kafka 连接并单击 消费者

    这将在新的编辑器选项卡中打开一个 consumer。

  4. 主题 列表中,选择您希望订阅的主题。

  5. 下,选择您将要消费的记录键和值的数据类型。

  6. 使用 范围和筛选器 缩小待消费数据的范围:

    • 起点 列表中,选择您希望开始消费数据的时间段或偏移量。 选择 从开头 以获取该主题中的所有记录。

    • 限制 列表中,选择何时停止接收数据,例如当主题中达到特定记录数量时。

    • 使用 筛选器 通过其键、值或头中的子串过滤记录。

  7. 其他 下配置附加参数:

    • 分区 框中,输入一个分区 ID 或逗号分隔的多个 ID,仅获取特定分区的记录。

    • 消费者组 列表中,选择一个 consumer 组以将新的 consumer 添加到其中。

  8. 单击 开始消费

在 Kafka 中消费消息

然后,您可以在 数据 选项卡中单击任意记录以显示其详细信息。 您还可以单击 显示统计信息 启用统计信息。

导出数据

您可以以 CSV、TSV 或 JSON 格式下载生产或消费的数据。

  1. 开始 生产消费数据。

  2. 数据 表格的右上部分,单击 下载图标 并选择 CSVTSVJSON

  3. 选择输出文件的位置,并单击 保存

保存生产者或消费者预设

如果您经常使用相同的键、值、头或其他参数生产或消费数据,您可以将其保存为预设。 然后,您可以重用预设快速创建 producer 或 consumer。

  1. Kafka 工具窗口中,单击 生产者消费者

  2. 指定所需参数后,在 producer 或 consumer 创建表单的顶部单击 收藏图标保存预设)。

参数将作为预设保存,可在 预设 选项卡中使用。 单击预设以应用。

使用 Schema Registry

producer 和 consumer 可使用 schema 验证其记录的键和值,并确保一致性。 Kafka 插件可与 Schema Registry 集成,并支持 Avro、Protobuf 和 JSON schema。 此插件可帮助您:

  • 连接到 Schema Registry

  • 创建、更新、删除和克隆 schema

  • 以原始格式或树视图预览 schema

  • 比较 schema 版本

  • 删除 schema 版本

连接到 Schema Registry

  1. 使用 云服务提供商自定义服务器属性连接到 Kafka Broker。

  2. 如果您使用 Confluent ,可以将 Broker 和 Schema Registry 属性粘贴到 配置 字段中。

    否则,展开 Schema Registry 部分并选择提供方: ConfluentGlue

    • URL :输入 Schema Registry URL。

    • 配置来源 :选择提供连接参数的方式:

      • 自定义 :选择身份验证方式并提供凭据。

        如果您希望使用与 Kafka Broker 不同的 SSL 设置,请取消选中 使用代理 SSL 设置 复选框,并提供 truststore 路径。

      • 属性 :粘贴提供的配置属性。 或者,您可以使用代码补全和 WebStorm 提供的快速文档手动输入属性。

    • 区域 :选择 Schema Registry 区域。

    • AWS 身份验证 :选择身份验证方式:

      • 默认凭据提供程序链 :使用默认提供方链中的凭据。 有关该链的详细信息,请参阅 使用默认凭据提供链

      • 来自凭据文件的配置文件 :从您的 凭据 文件中选择一个配置文件。

      • 显式访问密钥和私密密钥 :手动输入您的凭据。

    • 注册表名称 :输入您要连接的 Schema Registry 名称,或单击 显示 Schema Registry 从列表中进行选择。

  3. 填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 确定

创建架构

  1. 打开 Kafka 工具窗口: 视图 | 工具窗口 | Kafka

  2. 点击 新建连接)。

  3. 选择 Schema Registry 并点击 创建架构 (或按 Alt+Insert)。

  4. 格式 列表中,选择架构格式:Avro、Protobuf 或 JSON。

  5. 策略 列表中,选择 命名策略 ,并根据所选策略设置名称后缀或选择主题。 或者,选择 自定义名称 并输入任意名称。

创建架构

您可以在树状视图和原始视图中预览架构。

树状视图
原始视图

比较架构版本

  1. 连接到 Schema Registry 后,在 Schema Registry 下选择架构。

  2. 切换到 原始视图 并点击 比较。 如果架构包含多个版本,则该按钮可用。

比较版本

删除架构版本

如果一个架构包含多个版本,您可以删除特定版本。 Schema Registry 支持 两种删除方式 :软删除(删除版本后不会从注册表中移除架构元数据和 ID)和硬删除(会移除所有元数据及架构 ID)。 可选择的权限取决于您是否使用 Confluent 或 AWS Glue Schema Registry:

  • 在 Confluent Schema Registry 中,默认使用软删除。 您可以通过选中 永久删除 复选框来选择使用硬删除。

  • AWS Glue Schema Registry 始终使用硬删除。

  1. Schema Registry 下选择一个架构。

  2. 在其右侧点击 更多 并选择 删除版本

最后修改日期: 2025年 9月 26日