Kafka
Kafka 插件允许您监控 Kafka 事件流处理流程、创建消费者、生产者和主题。 它还支持连接到 Schema Registry,以及创建与更新架构。
安装 Kafka 插件
此功能依赖于 Kafka 插件,您需要安装并启用该插件。
按下 Ctrl+Alt+S 以打开设置,然后选择 。
打开 Marketplace 选项卡,找到 Kafka 插件并单击 安装 (如提示,请重新启动 IDE)。
使用 Kafka 插件,您可以:
连接到:
如果安装并启用了 Kafka 插件,您可以使用 Kafka 工具窗口( )连接并使用 Kafka。 或者,如果安装并启用了 远程文件系统 或 Zeppelin 插件,您也可以通过 Big Data Tools 工具窗口( )访问 Kafka 连接。
连接到 Kafka
使用云服务连接到 Kafka
连接到 Confluent 集群
打开 Kafka 工具窗口: 。
单击
(新建连接)。
在 名称 字段中输入连接的名称,以便区别于其他连接。
在 配置来源 列表中选择 云 ,然后在 提供程序 列表中选择 Confluent。
转到 https://confluent.cloud/home。 在页面右侧点击设置菜单,选择 环境 ,选择您的集群,然后选择 。
在 复制用于客户端的配置片段 区块中提供 Kafka API 密钥,并点击 复制。
返回到 IDE,并将复制的属性粘贴到 配置 字段中。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 确定。

(可选)您可以设置:
启用连接 :如果您希望禁用此连接,请取消选中该复选框。 默认情况下,新建的连接是启用的。
按项目 :选择此项可仅对当前项目启用这些连接设置。 如果您希望在其他项目中显示此连接,请取消选中该复选框。
连接到 AWS MSK 集群
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中输入连接名称,以便区分其他连接。
在 配置来源 列表中选择 云 ,然后在 提供程序 列表中选择 AWS MSK。
在 Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。
在 AWS 身份验证 列表中选择身份验证方法。
默认凭证提供程序链 :使用默认提供程序链中的凭据。 关于该链的更多信息,请参见 Using the Default Credential Provider Chain。
来自凭证文件的配置文件 :从您的 文件中选择一个配置文件。
显式访问密钥和私密密钥 :手动输入您的凭据。
(可选)您可以 连接到 Schema Registry。
如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,然后在 SSH 配置 列表中选择一个 SSH 配置或创建新的配置。
填写完设置后,点击 测试连接 确保所有配置参数均正确。 然后点击 确定。

您可以选择设置:
启用连接 :如果想要禁用此连接,请取消选中复选框。 默认情况下,新建连接处于启用状态。
按项目 :选中后,仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
连接到自定义 Kafka 服务器
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中输入连接名称,以便与其他连接区分。
在 配置来源 列表中选择 自定义。
在 Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。
在 身份验证 下选择一种认证方式:
无 :无需认证直接连接。
SASL :选择一种 SASL 机制(Plain、SCRAM-SHA-256、SCRAM-SHA-512 或 Kerberos ),并提供您的用户名和密码。
SSL
如果希望验证 broker 主机名是否与证书中的主机名匹配,请选择 验证服务器主机名。 取消选中复选框等同于添加
ssl.endpoint.identification.algorithm=属性。在 信任库位置 中提供 SSL truststore 位置的路径(
ssl.truststore.location属性)。在 信任库密码 中提供 SSL truststore 密码的路径(
ssl.truststore.password属性)。选择 使用密钥库客户端身份验证 ,并提供 密钥库位置 (
ssl.keystore.location)、 密钥库密码 (ssl.keystore.password)和 密钥密码 (ssl.key.password)的值。
AWS IAM :对 Amazon MSK 使用 AWS IAM。 在 AWS 身份验证 列表中选择以下之一:
默认凭证提供程序链 :使用默认提供程序链中的凭据。 有关提供程序链的更多信息,请参阅 Using the Default Credential Provider Chain。
来自凭证文件的配置文件 :从 文件中选择一个配置文件。
显式访问密钥和私密密钥 :手动输入您的凭据。
您可以选择 连接到 Schema Registry。
如果希望在连接 Kafka 时使用 SSH 隧道,请选择 启用隧道 ,并在 SSH 配置 列表中选择一个 SSH 配置或新建一个配置。
填写完设置后,点击 测试连接 确保所有配置参数均正确。 然后点击 确定。

您可以选择设置:
启用连接 :如果想要禁用此连接,请取消选中复选框。 默认情况下,新建连接处于启用状态。
按项目 :选中后,仅对当前项目启用这些连接设置。 如果希望此连接在其他项目中可见,请取消选中复选框。
使用属性连接到 Kafka
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 名称 字段中输入连接名称,以便与其他连接区分。
在 配置来源 列表中选择 属性。
在 Bootstrap servers 字段中输入 Kafka broker 的 URL 或以逗号分隔的 URL 列表。
选择提供 Kafka Broker 配置属性的方式:
隐式 :粘贴提供的配置属性。 或者,您可以使用 WebStorm 提供的代码补全和快速文档功能手动输入配置属性。
来自文件 :选择属性文件。
您还可以选择 连接到 Schema Registry。
如果希望通过 SSH 隧道连接到 Kafka,请选择 启用隧道 ,然后在 SSH 配置 列表中选择一个 SSH 配置,或新建一个。
填写完设置后,点击 测试连接 以确保所有配置参数均正确。 然后点击 确定。
如有需要,您可以设置以下内容:
启用连接 :若要禁用此连接,请取消选中此复选框。 默认情况下,新建的连接是启用状态。
按项目 :选中以仅为当前项目启用这些连接设置。 如需在其他项目中显示此连接,请取消选中此复选框。
连接到 Kafka 服务器后,会在 Kafka 工具窗口中显示一个包含该连接的新选项卡。 您可以使用它来 生产 和 消费 数据、 创建 和删除主题。 如果已 连接到 Schema Registry ,还可以查看、创建和更新 schema。
在 Kafka 工具窗口的任一选项卡中点击 ,即可重命名、删除、禁用或刷新连接,或修改其设置。

所有集群主题显示在 主题 部分。 您可以点击 只显示收藏主题,或点击
显示或隐藏内部主题。 点击任意主题可查看更多信息,例如分区、配置和 schema 的详细内容。
创建主题
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择 主题 并点击
(或按 Alt+Insert)。
命名新主题,指定分区数和副本因子,然后点击 确定。
从主题中删除记录
打开 Kafka 工具窗口: 。
点击
(新建连接)。
在 主题 下,右键点击一个主题并选择 清除主题 (或点击其左侧的
)。 点击 确定 以确认删除。
生产与消费数据
生产数据
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择一个 Kafka 连接并点击 生产者。
这将在新的编辑器选项卡中打开一个生产者。
在 主题 列表中选择要写入消息的主题。
在 键 和 值 下选择消息的键和值。
如果已 连接到 Schema Registry ,可以选择 Schema Registry 来根据所选 schema 验证发送的数据。
您可以生成随机值:
点击
以根据所选类型生成随机值。 包括根据所选的 Schema Registry 生成整个 JSON 对象。
若需更灵活地生成随机值,请使用
${random...}变量。 当编辑 JSON、Avro 或 Protobuf 文件时,在值字段输入random可查看可能的随机值自动补全选项。 例如,您可以使用"${random.integer(1,10)}"生成 1 到 10 之间的随机整数。
在 头 下添加任意自定义 header。 如果以 JSON 或 CSV 格式提供 header,可以粘贴到此部分。
在 流程 下,您可以控制记录流:
在 每次记录数量 中输入一个数字,以同时发送多个记录。
如果希望随机生成记录数据,请选择 生成随机键 和 生成随机值。
设置 间隔 ,用于控制两条记录发送之间的间隔(毫秒)。
如需在达到指定记录数量或经过特定时间后停止发送消息,请提供 停止条件。
在 选项 下,您可以配置其他选项:
分区 :指定记录应发送至的主题分区。 如果未指定,将使用默认逻辑:生产者会对键取哈希值并对分区数取模。
压缩 :为生产者生成的数据选择压缩类型: 无、 Gzip、 Snappy、 Lz4 或 Zstd。
幂等性 :选中以确保每条消息在流中仅写入一次。
ACK :如果您希望主节点将记录写入其本地日志并在不等待所有 follower 完全确认的情况下进行响应,请选择 主节点。 请选择 全部 ,以使主节点等待所有同步副本完全确认该记录。 为使 producer 不等待来自服务器的任何确认,请保留 无。
单击 生成。

然后,您可以在 数据 选项卡中单击任意记录以显示其详细信息。 您还可以单击 启用统计信息。
消费数据
打开 Kafka 工具窗口: 。
单击
(新建连接)。
选择一个 Kafka 连接并单击 消费者。
这将在新的编辑器选项卡中打开一个 consumer。
在 主题 列表中,选择您希望订阅的主题。
在 键 和 值 下,选择您将要消费的记录键和值的数据类型。
使用 范围和筛选器 缩小待消费数据的范围:
在 起点 列表中,选择您希望开始消费数据的时间段或偏移量。 选择 从开头 以获取该主题中的所有记录。
在 限制 列表中,选择何时停止接收数据,例如当主题中达到特定记录数量时。
使用 筛选器 通过其键、值或头中的子串过滤记录。
在 其他 下配置附加参数:
在 分区 框中,输入一个分区 ID 或逗号分隔的多个 ID,仅获取特定分区的记录。
在 消费者组 列表中,选择一个 consumer 组以将新的 consumer 添加到其中。
单击 开始消费。

然后,您可以在 数据 选项卡中单击任意记录以显示其详细信息。 您还可以单击 启用统计信息。
导出数据
您可以以 CSV、TSV 或 JSON 格式下载生产或消费的数据。
保存生产者或消费者预设
如果您经常使用相同的键、值、头或其他参数生产或消费数据,您可以将其保存为预设。 然后,您可以重用预设快速创建 producer 或 consumer。
在 Kafka 工具窗口中,单击 生产者 或 消费者。
指定所需参数后,在 producer 或 consumer 创建表单的顶部单击
(保存预设)。
参数将作为预设保存,可在 预设 选项卡中使用。 单击预设以应用。
使用 Schema Registry
producer 和 consumer 可使用 schema 验证其记录的键和值,并确保一致性。 Kafka 插件可与 Schema Registry 集成,并支持 Avro、Protobuf 和 JSON schema。 此插件可帮助您:
连接到 Schema Registry
创建、更新、删除和克隆 schema
以原始格式或树视图预览 schema
比较 schema 版本
删除 schema 版本
连接到 Schema Registry
如果您使用 Confluent ,可以将 Broker 和 Schema Registry 属性粘贴到 配置 字段中。
否则,展开 Schema Registry 部分并选择提供方: Confluent 或 Glue。
URL :输入 Schema Registry URL。
配置来源 :选择提供连接参数的方式:
自定义 :选择身份验证方式并提供凭据。
如果您希望使用与 Kafka Broker 不同的 SSL 设置,请取消选中 使用代理 SSL 设置 复选框,并提供 truststore 路径。
属性 :粘贴提供的配置属性。 或者,您可以使用代码补全和 WebStorm 提供的快速文档手动输入属性。
区域 :选择 Schema Registry 区域。
AWS 身份验证 :选择身份验证方式:
默认凭据提供程序链 :使用默认提供方链中的凭据。 有关该链的详细信息,请参阅 使用默认凭据提供链。
来自凭据文件的配置文件 :从您的 文件中选择一个配置文件。
显式访问密钥和私密密钥 :手动输入您的凭据。
注册表名称 :输入您要连接的 Schema Registry 名称,或单击
从列表中进行选择。
填写设置后,点击 测试连接 以确保所有配置参数正确。 然后点击 确定。
创建架构
打开 Kafka 工具窗口: 。
点击
(新建连接)。
选择 Schema Registry 并点击
(或按 Alt+Insert)。
在 格式 列表中,选择架构格式:Avro、Protobuf 或 JSON。
在 策略 列表中,选择 命名策略 ,并根据所选策略设置名称后缀或选择主题。 或者,选择 自定义名称 并输入任意名称。

您可以在树状视图和原始视图中预览架构。


比较架构版本
连接到 Schema Registry 后,在 Schema Registry 下选择架构。
切换到 原始视图 并点击 比较。 如果架构包含多个版本,则该按钮可用。

删除架构版本
如果一个架构包含多个版本,您可以删除特定版本。 Schema Registry 支持 两种删除方式 :软删除(删除版本后不会从注册表中移除架构元数据和 ID)和硬删除(会移除所有元数据及架构 ID)。 可选择的权限取决于您是否使用 Confluent 或 AWS Glue Schema Registry:
在 Confluent Schema Registry 中,默认使用软删除。 您可以通过选中 永久删除 复选框来选择使用硬删除。
AWS Glue Schema Registry 始终使用硬删除。
在 Schema Registry 下选择一个架构。
在其右侧点击
并选择 删除版本。