Kafka
Kafka 插件允许您监控您的 Kafka 事件流处理过程,创建消费者、生产者和主题。 它还允许您连接到 Schema Registry,创建和更新 schemas。
安装 Kafka 插件
此功能依赖于 Kafka 插件,您需要安装并启用。
按 Ctrl+Alt+S 打开设置,然后选择
。打开 Marketplace 选项卡,找到 Kafka插件,然后点击 安装 (如果有提示,请重启 IDE)。
借助 Kafka 插件,您可以:
连接到:
如果安装并启用了 Kafka 插件,您可以使用 Kafka 工具窗口 ( ) 来连接到 Kafka 并进行操作。 或者,如果 远程文件系统 或 Zeppelin 插件已安装并启用,您也可以使用 Big Data Tools 工具窗口( )访问 Kafka 连接。
连接到 Kafka
使用云服务提供商连接到 Kafka
连接到 Confluent 集群
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中,输入连接的名称,以便于与其他连接区分开。
在 配置源 列表中,选择 云 ,然后在 Provider 列表中,选择 Confluent。
请前往 https://confluent.cloud/home。 在页面右侧,点击设置菜单,选择 环境 ,选择您的集群,然后选择 。
在 为您的客户复制配置片段 块中,提供 Kafka API 密钥并点击 复制。
请回到您的 IDE 并将复制的属性粘贴到 配置 字段中。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 OK。

此外,您可以设置:
启用连接 :如果希望禁用此连接,请取消选中复选框。 默认情况下,新建的连接是启用的。
按项目 :选择仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
连接到 AWS MSK 集群
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中,输入连接的名称,以便于与其他连接区分开。
在 配置源 列表中,选择 云 ,然后在 Provider 列表中,选择 AWS MSK。
在 引导服务器 字段中,输入 Kafka broker 的 URL 或逗号分隔的 URL 列表。
在 AWS 身份验证 列表中,选择认证方法。
默认凭据提供程序链 :使用默认提供程序链中的凭证。 有关此链的更多信息,请参阅 使用默认凭证提供程序链。
来自凭据文件的配置文件 :从您的 文件中选择一个配置文件。
显式访问密钥和密钥 :手动输入您的凭据。
您可以选择 连接到 Schema Registry。
如果您希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 并在 SSH 配置 列表中选择 SSH 配置或创建一个新的配置。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 OK。

此外,您可以设置:
启用连接 :如果希望禁用此连接,请取消选中复选框。 默认情况下,新建的连接是启用的。
按项目 :选择仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
连接自定义 Kafka 服务器
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中,输入连接的名称,以便于与其他连接区分开。
在 配置源 列表中,选择 自定义。
在 引导服务器 字段中,输入 Kafka broker 的 URL 或逗号分隔的 URL 列表。
在 身份验证 下,选择一种认证方法:
无 :无需认证连接。
SASL :选择一个 SASL 机制(Plain、SCRAM-SHA-256、SCRAM-SHA-512 或 Kerberos )并提供您的用户名和密码。
SSL
选择 验证服务器主机名 以便验证代理主机名是否与代理证书中的主机名匹配。 清除此复选框相当于添加
ssl.endpoint.identification.algorithm=
属性。在 信任库位置 中,提供 SSL truststore 位置的路径(
ssl.truststore.location
属性)。在 信任库密码 中,提供 SSL truststore 密码的路径 (
ssl.truststore.password
属性)。选择 使用 密钥存储 客户端身份验证 并为 密钥库位置 (
ssl.keystore.location
)、 密钥存储密码 (ssl.keystore.password
)和 密钥密码 (ssl.key.password
)提供值。
AWS IAM :使用 AWS IAM 进行 Amazon MSK。 在 AWS 身份验证 列表中,选择以下选项之一:
默认凭据提供程序链 :使用默认提供程序链中的凭证。 有关此链的更多信息,请参阅 使用默认凭证提供程序链。
来自凭据文件的配置文件 :从您的 文件中选择一个配置文件。
显式访问密钥和密钥 :手动输入您的凭据。
您可以选择 连接到 Schema Registry。
如果您希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 并在 SSH 配置 列表中选择 SSH 配置或创建一个新的配置。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 OK。

此外,您可以设置:
启用连接 :如果希望禁用此连接,请取消选中复选框。 默认情况下,新建的连接是启用的。
按项目 :选择仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
使用属性连接 Kafka
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中,输入连接的名称,以便于与其他连接区分开。
在 配置源 列表中,选择 属性。
在 引导服务器 字段中,输入 Kafka broker 的 URL 或逗号分隔的 URL 列表。
请选择提供 Kafka Broker 配置属性的方法:
隐式 :粘贴提供的配置属性。 或者,您可以使用 IntelliJ IDEA 提供的代码补全和快速文档手动输入它们。
来自文件 :选择 properties 文件。
您可以选择 连接到 Schema Registry。
如果您希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 并在 SSH 配置 列表中选择 SSH 配置或创建一个新的配置。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 OK。
此外,您可以设置:
启用连接 :如果希望禁用此连接,请取消选中复选框。 默认情况下,新建的连接是启用的。
按项目 :选择仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
在 Spring 项目中连接 Kafka
如果您在 Spring 项目中使用 Kafka,您可以根据应用程序属性文件中的配置属性,快速连接到 Kafka 集群(或打开现有连接)。
打开您的 application.properties 或 application.yml 文件,确保至少定义了
bootstrap-servers
属性。在装订区域栏中,点击
并选择 创建 Kafka 连接。 如果您已经配置了 Kafka 连接,您也可以在此列表中选择。

此外,如果您有一个带有 @KafkaListener
注解的方法,您可以点击旁边的 ,从而快速生成指定主题的消息或从中消费数据。
一旦您与 Kafka 服务器建立连接,与此连接相关的新选项卡会出现在 Kafka 工具窗口中。 您可以用它来 生产和 消费数据, 创建和删除主题。 如果您 连接到 Schema Registry ,您还可以查看、创建和更新模式。
请点击 在 Kafka 工具窗口的任何选项卡中重命名、删除、禁用或刷新连接,或修改其设置。

所有的集群主题都显示在 主题 部分。 您可以点击 仅显示收藏的主题,或者点击
显示或隐藏内部主题。 点击任意主题以获取更多详细信息,例如有关分区、配置和架构的信息。
创建主题
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择 主题 并点击
(或按 Alt+Insert)。
为新主题命名,指定分区数量和复制因子,然后点击 OK。
从主题中删除记录
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 主题 下,右键点击一个主题并选择 清除主题 (或点击其左侧的
)。 点击 OK 以确认删除。
产生和消费数据
生成数据
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择一个 Kafka 连接,然后点击 生产者。
这将在新的编辑器标签中打开一个生产者。
在 主题 列表中,请选择一个主题发布消息。
在 键 和 值 之下,选择 message 键和值。
如果您 连接到 Schema Registry ,可以选择 架构注册表 来根据所选模式检查发送的数据。
您可以生成随机值:
点击
以根据选定类型生成随机值。 这包括根据选定的 Schema Registry 生成整个 JSON 对象。
为了更灵活地生成随机值,请使用
${random...}
变量。 在编辑 JSON、Avro 或 Protobuf 文件时,在值字段中开始键入random
以查看可能的随机值的自动完成选项。 例如,您可以使用"${random.integer(1,10)}"
生成一个介于 1 到 10 之间的随机整数。
在 头 下,提供任何自定义头文件。 如果您以 JSON 或 CSV 格式拥有它们,您可以将它们粘贴到此部分。
在 流程 下,您可以控制记录流:
在 一次记录数 中输入一个数字,如果您想同时发送多条记录。
请选择 生成随机键 和 生成随机值 ,如果您希望随机生成记录数据。
将记录发送之间的 间隔 设置为毫秒。
如果您希望生产者在达到指定的记录数量或经过指定的时间后停止发送消息,请提供 停止条件。
在 选项 下,提供其他选项:
分区 :指定必须发送记录的主题分区。 如果未指定,将使用默认逻辑:producer 将键的哈希值对分区数取模。
压缩 :选择由生产者生成的数据的压缩类型: 无、 Gzip、 Snappy、 Lz4 或 Zstd。
幂等性 :选择此项,如果您希望确保每条消息的确切副本在流中被写入。
ACK :如果您希望领导者将记录写入本地日志,并在不等待所有从属节点完全确认的情况下进行响应,请选择 负责人。 选择 所有 让领导等待整个同步副本集确认记录。 为了不等待服务器的确认,请保持 无 给生产者。
点击 生成。

您可以点击 数据 选项卡中的任何记录以显示其详细信息。 您也可以点击 启用统计信息。
使用数据
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择一个 Kafka 连接,然后点击 使用者。
这将在新的编辑器选项卡中打开一个 consumer。
在 主题 列表中,选择您要订阅的主题。
在 键 和 值 下,选择您将要消费的记录的键和值的数据类型。
请使用 范围和筛选器 来缩小数据范围:
在 起点 列表中,选择一个您想要消费数据的周期或偏移量。 请选择 从开头 以获取该主题的所有记录。
在 限制 列表中,选择何时停止接收数据,例如,当主题中的记录数达到一定数量时。
请使用 筛选 过滤键、值或标头中包含子字符串的记录。
在 其他 下配置其他参数:
在 分区 框中,输入分区 ID 或以逗号分隔的 ID 列表,以仅从特定分区获取记录。
在 使用者组 列表中,选择一个消费者组,如果您希望将新消费者添加到其中。
点击 开始使用。

您可以点击 数据 选项卡中的任何记录以显示其详细信息。 您也可以点击 启用统计信息。
导出数据
您可以以 CSV、TSV 或 JSON 格式下载已生成或已消费的数据。
保存 生产者或消费者预设
如果您经常使用相同的键、值、标头或其他参数生成或消费数据,您可以将它们保存为预设。 然后,您可以重用预设来快速创建生产者或消费者。
在 Kafka 工具窗口中,点击 生产者 或 使用者。
指定所需参数,然后在生产者或消费者创建表单顶部,点击
(保存预设)。
参数会保存为预设,可在 预设 选项卡中使用。 点击预设以应用它。
使用 Schema Registry
生产者和消费者可以使用 schema 验证并确保其记录键和值的一致性。 Kafka 插件与 Schema Registry 集成,并支持 Avro、Protobuf 和 JSON 架构。 它使您能够:
连接到 Schema Registry
创建、更改、删除和克隆 schemas
以原始格式或树状视图预览架构
比较 schema 版本
删除架构版本
连接 Schema Registry
如果您使用 Confluent ,可以将 Broker 和 Schema Registry 属性粘贴到 配置 字段中。
否则,请展开 架构注册表 部分并选择提供者: Confluent 或 Glue。
URL :请输入 Schema Registry URL。
配置源 :请选择提供连接参数的方式:
自定义 :选择认证方法并提供凭证。
如果您想使用与 Kafka Broker 不同的 SSL 设置,请清除 使用代理 SSL 设置 复选框并提供 truststore 的路径。
属性 :粘贴提供的配置属性。 或者,您可以使用 IntelliJ IDEA 提供的代码补全和快速文档手动输入属性。
区域 :选择 Schema Registry 区域。
AWS 身份验证 :请选择身份验证方式:
默认凭据提供程序链 :使用默认提供程序链中的凭证。 有关此链的更多信息,请参阅 使用默认凭证提供程序链。
来自凭据文件的配置文件 :从您的 文件中选择一个配置文件。
显式访问密钥和密钥 :手动输入您的凭据。
注册表名称 :输入您要连接的 Schema Registry 名称,或点击
从列表中选择。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 OK。
创建架构
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择 架构注册表 并点击
(或按 Alt+Insert)。
在 设置格式 列表中,选择架构格式:Avro、Protobuf 或 JSON。
在 策略 列表中,选择 命名策略 ,并根据所选策略设置名称后缀或选择主题。 或者,选择 自定义名称 并输入任何名称。

您可以在树状视图和原始视图中预览模式。


比较 schema 版本
连接到 Schema Registry 时,请在 架构注册表 下选择一个模式。
切换到 原始视图 并点击 比较。 如果架构有多个版本,则该按钮可用。

删除架构版本
如果架构有多个版本,您可以删除特定版本。 Schema Registry 支持 两种类型的删除 :软删除(在版本删除后,架构元数据和 ID 不会从注册表中移除)和硬删除(会删除所有元数据,包括架构 ID)。 选择能力取决于您是使用 Confluent 还是 AWS Glue Schema Registry:
在 Confluent Schema Registry 中,默认使用软删除。 您可以通过选择 永久删除 复选框来选择使用永久删除。
AWS Glue Schema Registry 总是使用硬删除。
在 架构注册表 下选择一个架构。
在其右侧,点击
并选择 删除版本。