这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Kafka学习笔记

1 - 介绍

Kafka的介绍

1.1 - 资料收集

收集 kafka 的各种资料

网站

文档

文章

2 - 安装

Kafka的安装

2.1 - 在ubuntu上安装kafka

在ubuntu实体机器上安装Kafka

安装

下载

从 kafka 官网的下载页面 http://kafka.apache.org/downloads 下载最新版本。

解压缩

解压缩下载下来的文件(如 kafka_2.13-3.3.1.tgz)到合适的位置,然后修改 /etc/profile 或者 ~/.zshrc,加入下列内容:

# kafka
export KAFKA_HOME=/home/sky/work/soft/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin

通过 source /etc/profile 或者 ~/.zshr 命令载入。

最简启动

参考 Apache Kafka Quick Start, 以最简单的方式启动 kafka 并启动 producer 和 consumer:

cd /home/sky/work/soft/kafka/kafak

# start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start kafka server
bin/kafka-server-start.sh config/server.properties

# create topic
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

# write events
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

# read events
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

3 - 安全

Kafka的安全配置

3.1 - 概述

安全概述

https://kafka.apache.org/documentation/#security_overview

在0.9.0.0版本中,Kafka社区增加了一些功能,这些功能单独或一起使用,可以增加Kafka集群的安全性。目前支持以下安全措施:

  1. 使用 SSL 或 SASL 对来自客户端(生产者和消费者)、其他broker和工具的连接进行认证。Kafka支持以下SASL (Simple Authentication and Security Layer/简单认证与安全层)机制:
    • SASL/GSSAPI (Kerberos) - 从版本 0.9.0.0 开始
    • SASL/PLAIN - 从版本 0.10.0.0 开始
    • SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 - 从版本 0.10.2.0 开始
    • SASL/OAUTHBEARER - 从版本 2.0 开始
  2. 验证从 broker 到 ZooKeeper 的连接
  3. 使用 SSL 对 broker 和客户端之间、broker之间或 broker 和工具之间传输的数据进行加密(注意,启用 SSL 后会有性能下降,其程度取决于 CPU 类型和 JVM 实现)。
  4. 对客户的读/写操作进行授权
  5. 授权是可插拔的,支持与外部授权服务的集成

值得注意的是,安全是可选的–支持非安全的集群,以及认证的、非认证的、加密的和非加密的客户端的混合。下面的指南解释了如何配置和使用客户端和 broker 的安全功能。

3.2 - Listener

监听器配置

https://kafka.apache.org/documentation/#listener_configuration

为了保证 Kafka 集群的安全,有必要保证用于与服务器通信的通道的安全。每个服务器必须定义一组监听器,用来接收来自客户端以及其他服务器的请求。每个监听器可以被配置为使用各种机制来验证客户端,并确保服务器和客户端之间的通信是安全的。本节提供了一个配置监听器的入门知识。

Kafka 服务器支持监听多个端口的连接。这是通过服务器配置中的 listeners 属性来配置的,它接受一个用逗号分隔的监听器列表来启用。每个服务器上必须至少定义一个监听器。 listeners 中定义的每个监听器的格式如下:

{LISTENER_NAME}://{hostname}:{port}

LISTENER_NAME通常是一个描述性的名字,定义了监听器的用途。例如,许多配置为客户端流量使用单独的监听器,所以他们可能在配置中把相应的监听器称为`CLIENT':

listeners=CLIENT://localhost:9092

每个监听器的安全协议在一个单独的配置中定义:listener.security.protocol.map。该值是一个逗号分隔的列表,列出了映射到其安全协议的每个监听器。例如,下面值配置指定 CLIENT 监听器将使用 SSL,而BROKER 监听器将使用明文(plaintext)。

listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT

下面给出了安全协议的可能选项:

  1. PLAINTEXT
  2. SSL
  3. SASL_PLAINTEXT
  4. SASL_SSL

明文(PLAINTEXT)协议不提供安全性,不需要任何额外的配置。在下面的章节中,本文将介绍如何配置其余的协议。

如果每个需要的监听器使用单独的安全协议,也可以在监听器中使用安全协议名称作为监听器名称。使用上面的例子,我们可以使用下面的定义跳过 CLIENT 和 BROKER 监听器的定义:

listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093

然而,我们建议用户为监听器提供明确的名称,因为它使每个监听器的预期用途更加清晰。

在这个列表中的监听器中,可以通过将 inter.broker.listener.name 配置设置为监听器的名称,来声明监听器用于 broker 间的通信。broker 间监听器的主要目的是分区复制。如果没有定义,那么 broker 间的监听器由 security.inter.broker.protocol 定义的安全协议决定,该协议默认为PLAINTEXT

对于依赖 Zookeeper 存储集群元数据的传统集群,可以声明一个单独的监听器,用于从活动控制器到 broker 的元数据传播。这是由 control.plane.listener.name 定义的。当活动控制器需要向集群中的 broker 推送元数据更新时,它将使用这个监听器。使用控制平面监听器的好处是,它使用一个单独的处理线程,这使得应用程序流量不太可能阻碍元数据变化的及时传播(如分区领导和ISR更新)。注意,默认值是 null,这意味着控制器将使用由 inter.broker.listener 定义的相同监听器。

控制器接收来自其他控制器和 broker 的请求。由于这个原因,即使一个服务器没有启用控制器角色(即它只是一个 broker),它仍然必须定义控制器监听器以及配置它所需的任何安全属性。例如,我们可以在一个独立的 broker 上使用以下配置:

process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

在这个例子中,控制器监听器仍然被配置为使用 SASL_SSL 安全协议,但它不包括在 listeners 中,因为 broker 没有暴露控制器监听器本身。在这种情况下,将使用的端口来自 controller.quorum.voters 配置,它定义了完整的控制器列表。

对于同时启用了 broker 和控制器角色的 KRaft 服务器,配置是类似的。唯一的区别是,控制器监听器必须包含在监听器中:

process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

在 controller.quorum.voters 中定义的端口必须与暴露的控制器监听器之一完全匹配。例如,这里的 CONTROLLER 监听器被绑定到端口9093。由 controller.quorum.voters 定义的连接字符串也必须使用端口 9093,就像这里一样。

控制器将接受由 controller.listener.names 定义的所有监听器的请求。通常情况下,只有一个控制器监听器,但也可以有更多。例如,这提供了一种方法,通过集群的滚动,将活动的监听器从一个端口或安全协议改为另一个(一个滚动暴露新的监听器,一个滚动删除旧的监听器)。当定义了多个控制器监听器时,列表中的第一个监听器将被用于出站请求。

在 Kafka 中,传统的做法是为客户端使用单独的监听器。这使得集群间的监听器可以在网络层面上被隔离。在 KRaft 的控制器监听器的情况下,监听器应该是隔离的,因为客户端无论如何都不会与它一起工作。客户端应该连接到配置在代理上的任何其他监听器。任何与控制器绑定的请求将被转发,如下所述。

在下一节中,本文将介绍如何在监听器上启用 SSL 以进行加密和验证。随后的部分将介绍使用 SASL 的额外认证机制。

3.3 - SSL

使用SSL进行加密和认证

https://kafka.apache.org/documentation/#security_ssl

Apache Kafka 允许客户端使用 SSL 对流量进行加密以及验证。默认情况下,SSL 是禁用的,但如果需要,可以打开。下面几段详细解释了如何设置你自己的 PKI 基础设施,用它来创建证书并配置 Kafka 来使用这些证书。

1. 为每个Kafka broker 生成SSL密钥和证书

部署一个或多个支持 SSL 的 broker 的第一步是为每个服务器生成一个公钥/私钥对。由于 Kafka 希望所有的密钥和证书都存储在密钥库中,我们将使用 Java 的 keytool 命令来完成这项任务。该工具支持两种不同的钥匙库格式,一种是 Java 特有的 jks 格式,现在已经废弃了,另一种是 PKCS12。PKCS12 是 Java 9 版本的默认格式,为了确保这个格式被使用,无论使用的是什么 Java 版本,以下所有命令都明确指定了 PKCS12 格式。

keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12

你需要在上述命令中指定两个参数:

  1. keystorefile: 存储该 broker 的密钥(以及后来的证书)的keystore文件。keystore文件包含了这个 broker 的私钥和公钥,因此它需要被妥善保存。理想情况下,这一步是在密钥将被用于的 Kafka broker 上运行,因为这个密钥不应该被传输/离开它所要使用的服务器。
  2. validity: 密钥的有效时间,以天为单位。请注意,这与证书的有效期不同,后者将在 签署证书 中确定。你可以用同一把密钥申请多个证书:如果你的密钥有效期为10年,但你的CA只签署有效期为一年的证书,你可以在一段时间内用同一把密钥签署10个证书。

为了获得可与刚刚创建的私钥一起使用的证书,需要创建一个证书签署请求。该签名请求由受信任的 CA 签署后会产生实际的证书,然后可以安装在钥匙库中并用于验证目的。

为了生成证书签署请求,对迄今为止创建的所有服务器密钥库运行以下命令:

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}

该命令假定你想在证书中添加主机名信息,如果不是这样,你可以省略扩展参数 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}。请参阅下文以了解更多相关信息。

主机名称验证

启用主机名验证时,是将你正在连接的服务器所出示的证书的属性与该服务器的实际主机名或IP地址进行核对的过程,以确保你确实正在连接到正确的服务器。

这种检查的主要原因是为了防止中间人攻击。对于Kafka来说,这种检查在很长一段时间内都是默认禁用的,但从Kafka 2.0.0开始,服务器的主机名验证在客户端连接和代理间连接中都是默认启用的。

可以通过设置 ssl.endpoint.identification.algorithm 为空字符串来禁用服务器主机名验证。

对于动态配置的 broker 监听器,可以使用 kafka-configs.sh 禁用主机名验证:

> bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

注意:

通常情况下,没有任何好的理由禁用主机名验证,除非是 “只是让它工作” 的最快方法,并承诺 “以后有更多时间时再修复它”!

在正确的时间进行主机名验证并不难,但一旦集群开始运行,就会变得更加困难–帮你自己一个忙,现在就去做吧!。

如果启用了主机名验证,客户将根据以下两个字段之一验证服务器的完全合格域名(FQDN)或ip地址。

  1. Common Name (CN)
  2. Subject Alternative Name (SAN)

虽然 Kafka 检查这两个字段,但自2000年以来,使用通用名称(common name/CN)字段进行主机名验证已经废弃,应尽可能避免使用。此外,SAN字段更加灵活,允许在一个证书中声明多个 DNS 和 IP 条目。

另一个好处是,如果 SAN 字段被用于主机名验证,为了授权目的,可以将通用名称设置为一个更有意义的值。由于我们需要 SAN 字段包含在已签署的证书中,它将在生成签署请求时被指定。它也可以在生成密钥对时指定,但这不会自动被复制到签名请求中。

要添加一个SAN字段,请在 keytool 命令中添加以下参数 -ext SAN=DNS:{FQDN},IP:{IPADDRESS}

2. 创建自己的CA

在这一步之后,集群中的每台机器都有一个公钥/私钥对,已经可以用来加密流量,还有一个证书签署请求,这是创建证书的基础。为了增加认证功能,这个签名请求需要由一个受信任的机构签署,这个机构将在这一步中创建。

证书颁发机构(CA)负责签署证书。CA的工作就像一个签发护照的政府–政府在每本护照上盖章(签名),这样护照就很难伪造了。其他政府会验证这些印章以确保护照的真实性。同样,CA签署证书,密码学保证签署的证书在计算上很难被伪造。因此,只要CA是一个真实可信的机构,客户就有强大的保证,他们正在连接到真实的机器。

在本指南中,我们将成为我们自己的证书颁发机构。当在企业环境中建立一个生产集群时,这些证书通常会由整个公司信任的企业CA签署。请参阅生产中的常见陷阱了解这种情况下需要考虑的一些问题。

由于OpenSSL的一个bug,x509模块不会将CSR中要求的扩展字段复制到最终证书中。由于我们希望SAN扩展在我们的证书中存在,以实现主机名验证,我们将使用ca模块来代替。这需要在我们生成CA密钥对之前进行一些额外的配置。

将以下列表保存到一个名为 openssl-ca.cnf 的文件中,并根据需要调整有效性和共同属性的值:

HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # The default ca section

[ CA_default ]

base_dir      = .
certificate   = $base_dir/cacert.pem   # The CA certifcate
private_key   = $base_dir/cakey.pem    # The CA private key
new_certs_dir = $base_dir              # Location for new certs after signing
database      = $base_dir/index.txt    # Database index file
serial        = $base_dir/serial.txt   # The current serial number

default_days     = 1000         # How long to certify for
default_crl_days = 30           # How long before next CRL
default_md       = sha256       # Use public key default MD
preserve         = no           # Keep passed DN ordering

x509_extensions = ca_extensions # The extensions to add to the cert

email_in_dn     = no            # Don't concat the email in the DN
copy_extensions = copy          # Required to copy SANs from CSR to cert

####################################################################
[ req ]
default_bits       = 4096
default_keyfile    = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions    = ca_extensions
string_mask        = utf8only

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)
countryName_default = DE

stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = Test Province

localityName                = Locality Name (eg, city)
localityName_default        = Test Town

organizationName            = Organization Name (eg, company)
organizationName_default    = Test Company

organizationalUnitName         = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit

commonName         = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name

emailAddress         = Email Address
emailAddress_default = test@test.com

####################################################################
[ ca_extensions ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints       = critical, CA:true
keyUsage               = keyCertSign, cRLSign

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment

然后创建一个数据库和序列号文件,这些将被用来跟踪哪些证书是用这个CA签署的。这两个文件都是简单的文本文件,与你的CA密钥存放在同一个目录下。

> echo 01 > serial.txt
> touch index.txt

完成这些步骤后,你现在就可以生成你的CA了,以后将用于签署证书。

> openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM

CA只是一个公共/私人密钥对和证书,它是由自己签署的,而且只用于签署其他证书。

这个密钥对应该非常安全,如果有人获得了这个密钥对,他们可以创建和签署将被你的基础设施所信任的证书,这意味着他们在连接到任何信任这个CA的服务时可以冒充任何人。

下一步是将生成的CA添加到客户的信任库(truststore),这样客户就可以信任这个CA。

> keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be “requested” or “required” in the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients' keys were signed by.

**注意:**如果你通过在 Kafka brokers config 中设置 ssl.client.auth 为 “request “或 “required”,将 Kafka brokers 配置为需要客户端认证,那么你也必须为 Kafka brokers 提供一个信任仓库,它应该有所有客户端的密钥由CA证书签署。

> keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

与步骤1中存储每台机器自身身份的钥匙库不同,客户的信任库(truststore)存储了客户应该信任的所有证书。将一个证书导入自己的信任库(truststore)也意味着信任由该证书签署的所有证书。正如上面的类比,信任政府(CA)也意味着信任它所签发的所有护照(证书)。这个属性被称为信任链(chain of trust),在大型 Kafka 集群上部署 SSL 时,它特别有用。你可以用 CA 来签署集群中的所有证书,并让所有机器共享同一个信任 CA 的信任库。这样一来,所有的机器都可以验证其他所有的机器。

3. 签署证书

然后与 CA 一起签署:

> openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}

最后,你需要把CA的证书和签名的证书都导入 keystore:

> keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
> keytool -keystore {keystore} -alias localhost -import -file cert-signed

参数的定义如下:

  1. keystore: keystore 的位置
  2. CA certificate: CA的证书
  3. certificate signing request: 用服务器密钥创建的csr
  4. server certificate: 将服务器的签名证书写入该文件。

这将给你留下一个名为 truststore.jks 的 truststore – 这对所有客户端和 broker 来说都是一样的,不包含任何敏感信息,所以没有必要保护它。

此外,你将在每个节点上有一个server.keystore.jks文件,其中包含该节点的密钥、证书和你的CAs证书,关于如何使用这些文件的信息,请参考 配置Kafka Brokers 和 配置Kafka Client。

关于这个话题的一些工具帮助,请查 [easyRSA 项目,它有大量的脚本来帮助完成这些步骤。

PEM格式的SSL密钥和证书

从2.7.0开始,可以在配置中直接为 Kafka broker 和客户端配置 PEM 格式的 SSL 密钥和信任存储。这避免了在文件系统上存储单独的文件,并受益于 Kafka 配置的密码保护功能。除了 JKS 和 PKCS12 之外,PEM也可以作为基于文件的密钥和信任存储的存储类型。要在代理或客户端配置中直接配置PEM密钥存储,应在 ssl.keystore.key 中提供PEM格式的私钥,在 ssl.keystore.certificate.chain 中提供 PEM 格式的证书链。为了配置信任存储,应在 ssl.truststore.certificates 中提供信任证书,如CA的公共证书。由于 PEM 通常以多行 base-64 字符串的形式存储,配置值可以作为多行字符串包含在 Kafka 配置中,每行以反斜杠 ('') 为结尾进行续行。

存储密码配置 ssl.keystore.passwordssl.truststore.password 不用于PEM。如果私钥是用密码加密的,必须在 ssl.key.password 中提供密钥密码。私钥可以以未加密的形式提供,无需密码。在生产部署中,在这种情况下,配置应该被加密或使用 Kafka 中的密码保护功能进行外部化。注意,当使用 OpenSSL 等外部工具进行加密时,默认的 SSL 引擎工厂对加密的私钥解密能力有限。像 BouncyCastle 这样的第三方库可以与自定义的SslEngineFactory集成,以支持更广泛的加密私钥。

4. 生产中的常见陷阱

上述段落展示了创建自己的CA并使用它为集群签署证书的过程。虽然对sandbox、dev、测试和类似的系统非常有用,但这通常不是为企业环境中的生产集群创建证书的正确过程。企业通常会操作他们自己的CA,用户可以发送 CSR 到这个 CA 上签名,这样做的好处是用户不需要负责保持 CA 的安全,同时也是一个大家可以信任的中央机构。然而,它也从用户那里拿走了对签署证书过程的很多控制权。很多时候,操作企业 CA 的人都会对证书施加严格的限制,当试图在 Kafka上 使用这些证书时,就会产生问题。

  1. 扩展密钥的使用

    证书可能包含一个扩展字段,控制证书的使用目的。如果该字段为空,则对用途没有限制,但如果在这里指定了任何用途,有效的SSL实现必须执行这些用途。

    Kafka的相关用途是:

    • 客户端认证
    • 服务器认证

    Kafka broker 需要允许这两种用法,因为对于集群内的通信,每个 broker 都会对其他 broker 表现得既是客户端又是服务器。企业CA有一个Web服务器的签名配置文件,并将其用于Kafka,这并不罕见,它将只包含serverAuth使用值,并导致SSL握手失败。

  2. 中级证书 为了安全起见,企业根CA经常保持离线状态。为了保证日常使用,我们创建了所谓的中间CA,然后用它们来签署最终的证书。当把由中间CA签署的证书导入钥匙库时,有必要提供整个信任链,直到根CA。这可以通过简单的cat将证书文件合并成一个证书文件,然后用keytool导入来完成。

  3. 未能复制扩展字段

    CA运营商通常不愿意从CSR中复制和要求的扩展字段,而倾向于自己指定这些字段,因为这使得恶意的一方更难获得具有潜在的误导性或欺诈性价值的证书。建议仔细检查已签署的证书,这些证书是否包含所有要求的SAN字段,以实现正确的主机名验证。下面的命令可以用来将证书细节打印到控制台,并与最初要求的内容进行比较。

    > openssl x509 -in certificate.crt -text -noout
    

5. 配置 Kafka Brokers

如果没有为 broker 之间的通信启用SSL(如何启用见下文),那么PLAINTEXT和SSL端口都是必要的。

listeners=PLAINTEXT://host.name:port,SSL://host.name:port

在 broker 方面,需要进行以下SSL配置:

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password 在技术上是可选的,但强烈推荐。如果不设置密码,仍然可以访问 truststore,但完整性检查被禁用。值得考虑的可选设置:

  1. ssl.client.auth=none (“required” => 需要客户认证, “requested” => 要求客户认证,没有证书的客户仍然可以连接。我们不鼓励使用 “requested”,因为它提供了一种错误的安全感,配置错误的客户仍然会成功连接。)
  2. ssl.cipher.suites (Optional). 一个密码套件是认证、加密、MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。(默认是一个空列表)
  3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (列出你要接受的客户的SSL协议。请注意,SSL已被弃用,取而代之的是TLS,不建议在生产中使用SSL)
  4. ssl.keystore.type=JKS
  5. ssl.truststore.type=JKS
  6. ssl.secure.random.implementation=SHA1PRNG

如果你想为 broker 之间的通信启用SSL,请在 server.properties 文件中添加以下内容(它默认为PLAINTEXT)。

security.inter.broker.protocol=SSL

由于一些国家的进口法规,Oracle的实现限制了默认情况下可用的加密算法的强度。如果需要更强的算法(例如,256位密钥的AES),必须在JDK/JRE中获得并安装JCE无限强度管辖政策文件。更多信息请参见JCA供应商文档。

JRE/JDK将有一个默认的伪随机数发生器(PRNG),用于加密操作,所以不需要配置与ssl.secure.random.implement一起使用的实现。然而,一些实现存在性能问题(特别是Linux系统上选择的默认实现NativePRNG,利用了全局锁)。在SSL连接的性能成为问题的情况下,考虑明确设置要使用的实现。SHA1PRNG的实现是无阻塞的,并在重载下显示出非常好的性能特征(50MB/秒的生产消息,加上复制流量,每个经纪人)。

一旦你启动 broker,你应该能够在server.log中看到:

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

要快速检查服务器钥匙库和信任库是否设置正确,你可以运行以下命令

> openssl s_client -debug -connect localhost:9093 -tls1

(注意:TLSv1应该列在ssl.enabled.protocols下)

在这个命令的输出中,你应该看到服务器的证书:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

如果证书没有显示出来,或者有任何其他错误信息,那么你的钥匙库并没有正确设置。

6. 配置 Kafka Clients

只有新的 Kafka 生产者和消费者支持 SSL,旧的 API 不支持。对于生产者和消费者来说,SSL 的配置将是相同的。

如果 broker 中不需要客户端认证,那么下面是一个最小的配置例子:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password 在技术上是可选的,但强烈推荐。如果不设置密码,仍然可以访问truststore,但完整性检查被禁用。如果需要客户端认证,那么必须像第1步那样创建一个密钥库,同时还必须配置以下内容。

ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

根据我们的要求和 broker 的配置,可能还需要其他的配置设置:

  1. ssl.provider (Optional). 用于SSL连接的安全提供者的名称。默认值是JVM的默认安全提供者。
  2. ssl.cipher.suites (Optional). 密码套件是认证、加密、MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。
  3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. 它应该至少列出在 broker 一方配置的一个协议
  4. ssl.truststore.type=JKS
  5. ssl.keystore.type=JKS

使用控制台-生产者和控制台-消费者的例子:

> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

3.4 - SASL

使用SASL进行认证

https://kafka.apache.org/documentation/#security_sasl

1. JAAS 配置

Kafka使用 Java认证和授权服务(Java Authentication and Authorization Service / JAAS)进行SASL配置。

用于Kafka broker的JAAS配置

KafkaServer 是每个 KafkaServer/Broker 使用的 JAAS 文件中的部分名称。该部分为 broker 提供 SASL 配置选项,包括 broker 为 broker 之间的通信进行的任何SASL客户端连接。如果多个监听器被配置为使用SASL,该部分的名称可以以小写的监听器名称为前缀,后面跟一个句号,例如:sasl_ssl.KafkaServer

client 部分用于验证与 zookeeper 的 SASL 连接。它还允许 broker 在 zookeeper 节点上设置 SASL ACL,锁定这些节点,以便只有 broker 可以修改它。有必要在所有的 broker 中使用相同的 principal 名称。如果你想使用Client以外的部分名称,请将系统属性 zookeeper.sasl.clientconfig 设置为合适的名称(-Dzookeeper.sasl.clientconfig=ZkClient)。

ZooKeeper默认使用 “zookeeper” 作为服务名称。如果你想改变这一点,请将系统属性 zookeeper.sasl.client.username设置为适当的名称(例如-Dzookeeper.sasl.client.username=zk)。

broker 也可以使用 broker 配置属性 sasl.jaas.config 来配置 JAAS。该属性名称必须以包括 SASL 机制的监听器前缀,即listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config。配置值中只能指定一个登录模块。如果在一个监听器上配置了多个机制,必须使用监听器和机制前缀为每个机制提供配置。例如:

listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";

如果JAAS配置是在不同层次上定义的,所使用的优先顺序是:

  • broker 配置属性 listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
  • 静态JAAS配置的{listenerName}.KafkaServer 部分
  • 静态JAAS配置的KafkaServer 部分

请注意,ZooKeeper JAAS配置只能使用静态JAAS配置进行配置。

关于代理配置的例子,请参见GSSAPI (Kerberos)PLAINSCRAMOAUTHBEARER

为Kafka客户端配置JAAS

客户端可以使用客户端配置属性sasl.jaas.config或使用类似于经纪人的静态JAAS配置文件来配置JAAS:

  1. 使用客户端配置属性进行JAAS配置

    客户端可以将 JAAS 配置指定为生产者或消费者属性,而不需要创建一个物理配置文件。这种模式也使同一JVM内的不同生产者和消费者能够通过为每个客户指定不同的属性来使用不同的凭证。如果同时指定静态JAAS配置系统性java.security.auth.login.config和客户端属性sasl.jaas.config,将使用客户端属性。

    关于配置的例子,请参见GSSAPI (Kerberos)PLAINSCRAMOAUTHBEARER

  2. 使用静态配置文件进行JAAS配置

    使用静态JAAS配置文件在客户端配置SASL认证。

    1. 添加 JAAS 配置文件,其中有名为 KafkaClient 的客户端登录部分。按照设置GSSAPI(Kerberos)、PLAIN、SCRAM或OAUTHBEARER的例子所述,在 KafkaClient 中为所选机制配置登录模块。例如,GSSAPI 凭证可以配置为:

    KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal=“kafka-client-1@EXAMPLE.COM”; };

    
    2. 将JAAS配置文件的位置作为JVM参数传递给每个客户JVM。比如说:
    
    ```bash
    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    

2. SASL 配置

SASL 可以使用 PLAINTEXT 或 SSL 作为传输层,分别使用安全协议 SASL_PLAINTEXT 或 SASL_SSL。如果使用 SASL_SSL,那么还必须配置 SSL。

SASL 机制

Kafka支持以下 SASL 机制:

Kafka broker 的 SASL 配置

  1. 在 server.properties 中配置 SASL 端口,在 listeners 参数中至少添加 SASL_PLAINTEXT 或 SASL_SSL 之一,该参数包含一个或多个逗号分隔的值。

    listeners=SASL_PLAINTEXT://host.name:port
    

    如果你只配置一个 SASL 端口(或者你想让 Kafka broker 使用 SASL 互相认证),那么请确保你为 broker 之间的通信设置相同的SASL协议。

    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
    
  2. 选择一个或多个 支持的机制 在 broker 中启用,并按照步骤为该机制配置SASL。要在代理中启用多个机制,请按照步骤这里

Kafka 客户端的 SASL 配置

SASL 认证只支持新的 Java Kafka 生产者和消费者,旧的API不被支持。

要在客户端配置 SASL 认证,请选择 broker 中为客户端认证启用的 SASL机制,并按照步骤为所选机制配置SASL。

3. 使用SASL/Kerberos进行认证

先决条件

  1. Kerberos

    如果你的组织已经在使用Kerberos服务器(例如,通过使用Active Directory),就没有必要为 Kafka 安装一个新的服务器。否则你需要安装一个,你的Linux供应商可能有 Kerberos 的软件包,以及如何安装和配置的简短指南(Ubuntu, Redhat)。注意,如果你使用的是Oracle Java,你将需要为你的Java版本下载JCE策略文件,并将它们复制到$JAVA_HOME/jre/lib/security。

  2. 创建Kerberos原则

    如果你使用的是组织的Kerberos或Active Directory服务器,请向你的Kerberos管理员要一个principal,用于集群中的每个Kafka broker 和每个将用Kerberos认证(通过客户端和工具)访问Kafka的操作系统用户。

    如果你已经安装了你自己的Kerberos,你将需要使用以下命令自己创建这些 principals。

    > sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
    > sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
    
  3. 确保所有的主机都可以用主机名到达–这是Kerberos的要求,你的所有主机都可以用它们的FQDNs来解析。

配置 kafka broker

  1. 在每个 Kafka broker 的配置目录下添加一个与下面类似的经过适当修改的 JAAS 文件,在这个例子中我们称之为kafka_server_jaas.conf(注意,每个 broker 都应该有自己的keytab)。

     KafkaServer {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       storeKey=true
       keyTab="/etc/security/keytabs/kafka_server.keytab"
       principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
     };
    
    // Zookeeper client authentication
    Client {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       storeKey=true
       keyTab="/etc/security/keytabs/kafka_server.keytab"
       principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    };
    

    JAAS 文件中的 KafkaServer 部分告诉 broker 要使用哪个 prinsipal ,以及存储该 principal 的 keytab 的位置。它允许 broker 使用本节中指定的 keytab 登录。关于Zookeeper SASL配置的更多细节,请参见注释。

  2. 将 JAAS 和可选的 krb5 文件位置作为 JVM 参数传递给每 个Kafka broker(更多细节见这里)。

    -Djava.security.krb5.conf=/etc/kafka/krb5.conf
    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 确保在 JAAS 文件中配置的 keytabs 是可以被启动 kafka broker 的操作系统用户读取的。

  4. 在 server.properties 中配置SASL端口和SASL机制,如这里所述. 例如:

    listeners=SASL_PLAINTEXT://host.name:port
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    

    我们还必须在 server.properties 中配置服务名称,它应该与 kafka broker 的 principal 名称相匹配。在上面的例子中,principal 是 “kafka/kafka1.hostname.com@EXAMPLE.com”,所以:

    sasl.kerberos.service.name=kafka
    

配置 Kafka Clients

要在客户端配置SASL认证:

  1. 客户端(生产者、消费者、连接工作者等)将用自己的 broker(通常与运行客户端的用户同名)对集群进行认证,因此要根据需要获得或创建这些 principal。然后为每个客户端配置 JAAS 配置属性。一个 JVM 内的不同客户端可以通过指定不同的 principal,作为不同的用户运行。producer.properties 或 consumer.properties中 的属性 sasl.jaas.config 描述了生产者和消费者等客户端如何连接到 Kafka Broker。下面是一个使用keytab的客户端的配置示例(推荐用于长期运行的进程)。

    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true \
        storeKey=true  \
        keyTab="/etc/security/keytabs/kafka_client.keytab" \
        principal="kafka-client-1@EXAMPLE.COM";
    

    对于 kafka-console-consumer 或 kafka-console-producer 这样的命令行工具,kinit 可以和 “useTicketCache=true” 一起使用,如:

    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useTicketCache=true;
    

    客户端的 JAAS 配置也可以指定为 JVM 参数,类似于这里所说的 broker。客户端使用名为 KafkaClient 的登录部分。这个选项只允许一个JVM的所有客户端连接的用户。

  2. 确保在 JAAS 配置中配置的 keytabs 是可以被启动 kafka 客户端的操作系统用户读取的。

  3. 可以选择将krb5文件的位置作为JVM参数传递给每个客户端JVM(更多细节见这里)。

    1. -Djava.security.krb5.conf=/etc/kafka/krb5.conf
      
  4. 在 producer.properties 或 consumer.properties 中配置以下属性:

    security.protocol=SASL_PLAINTEXT (or SASL_SSL)
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    

4. 使用 SASL/PLAIN 进行认证

SASL/PLAIN 是一种简单的用户名/密码认证机制,通常与 TLS 一起用于加密以实现安全认证。Kafka 支持 SASL/PLAIN 的默认实现,它可以被扩展为生产使用,如这里所描述的。

在 principal.builder.class 的默认实现下,username 被用作配置 ACL 等的认证 Principal。

配置 Kafka Brokers

  1. 在每个Kafka broker 的配置目录下添加一个与下面类似的经过适当修改的JAAS文件,本例中我们称之为 kafka_server_jaas.conf:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-secret"
        user_admin="admin-secret"
        user_alice="alice-secret";
    };
    

    这个配置定义了两个用户(admin和alice)。KafkaServer 部分的属性 username 和 password 被 broker 用来启动与其他 broker 的连接。在这个例子中,admin 是用于 broker 之间通信的用户。一组属性 user_userName 定义了所有连接到 broker 的用户的密码, broker 使用这些属性验证所有的客户端连接,包括来自其他 broker 的连接。

  2. 将 JAAS 配置文件的位置作为 JVM 参数传递给每个 Kafka broker

    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 在 server.properties 中配置 SASL 端口和 SASL机制,如上所述。比如说。

    listeners=SASL_SSL://host.name:port
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    

配置 Kafka 客户端

要在客户端配置SASL认证:

  1. 在 producer.properties 或 consumer.properties 中为每个客户端配置 JAAS 配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。下面是一个为 PLAIN 机制的客户端配置的例子。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret";
    

    选项 usernamepassword 被客户端用来配置客户端连接的用户。在这个例子中,客户端以用户 alice 的身份连接到 broker。JVM中的不同客户端可以通过在 sasl.jaas.config 中指定不同的用户名和密码,以不同的用户身份连接。

    客户端的 JAAS 配置也可以指定为 JVM 参数,类似于这里描述的brokers。客户端使用名为 “KafkaClient “的登录部分。这个选项只允许一个JVM的所有客户端连接使用一个用户。

  2. 在 producer.properties 或 consumer.properties 中配置以下属性:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    
  3. 在生产中使用 SASL/PLAIN

    • SASL/PLAIN 应该只与 SSL 作为传输层一起使用,以确保在没有加密的情况下不会在网上传输明确的密码。
    • Kafka中 SASL/PLAIN 的默认实现在 JAAS 配置文件中指定了用户名和密码,如这里所示。从Kafka 2.0版本开始,你可以通过配置自己的回调处理程序来避免在磁盘上存储清晰的密码,这些处理程序使用配置选项从外部来源获得用户名和密码 sasl.server.callback.handler.class and sasl.client.callback.handler.class.
    • 在生产系统中,外部认证服务器可以实现密码验证。从Kafka 2.0版本开始,你可以通过配置 sasl.server.callback.handler.class 插入你自己的回调处理程序,使用外部认证服务器进行密码验证。

5. 使用 SASL/SCRAM 进行认证

Salted Challenge Response Authentication Mechanism / SCRAM 是SASL机制的一个系列,解决了传统机制的安全问题,这些机制执行用户名/密码认证,如 PLAIN 和 DIGEST-MD5。该机制在 RFC 5802 中定义。Kafka支持 SCRAM-SHA-256 和 SCRAM-SHA-512,可以与TLS 一起使用,以执行安全认证。在 principal.builder.class 的默认实现下,用户名被用作认证的 Principal,用于配置ACL等。Kafka 的默认 SCRAM 实现在 Zookeeper 中存储 SCRAM 凭证,适合用于 Zookeeper 在私有网络上的 Kafka 安装。更多细节请参考安全考虑

创建 Creating SCRAM 证书

Kafka 中的SCRAM 实现使用 Zookeeper 作为凭证存储。可以使用 kafka-configs.sh 在 Zookeeper 中创建凭证。对于每个启用的SCRAM机制,必须通过添加机制名称的配置来创建凭证。在Kafka broker 启动之前,必须创建 broker 之间通信的凭证。客户端凭证可以动态地创建和更新,更新的凭证将用于验证新的连接。

为用户 *alice *创建 SCRAM 凭证,密码为 alice-secret

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

如果没有指定迭代次数,则使用默认的 4096 迭代次数。一个随机 salt 被创建,由salt、iterations、StoredKey 和 ServerKey 组成的SCRAM 身份被存储在 Zookeeper中。关于 SCRAM 身份和各个字段的详细信息,请参见RFC 5802

下面的例子还需要一个用户 admin,用于 broker 之间的通信,可以用以下方式创建。

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

现有的证书可以用 --describe 选项列出:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice

可以使用 --alter --delete-config 选项删除一个或多个 SCRAM 机制的凭证:

> bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice

配置 Kafka Brokers

  1. 在每个 Kafka broker 的配置目录下添加一个与下面类似的经过适当修改的 JAAS 文件,本例中我们称之为 kafka_server_jaas.conf:

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="admin-secret";
    };
    

    KafkaServer部分的属性 username password 被 broker 用来启动与其他 broker 的连接。在这个例子中,admin是用于 broker 之间通信的用户。

  2. Pass the JAAS config file location as JVM parameter to each Kafka broker:

    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 在 server.properties 中配置 SASL 端口和 SASL 机制,如上所述。比如:

    listeners=SASL_SSL://host.name:port
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
    sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
    

配置 Kafka Clients

要在客户端配置SASL认证:

  1. 在 producer.properties 或 consumer.properties 中为每个客户端配置JAAS配置属性。登录模块描述了生产者和消费者等客户端如何连接到 Kafka Broker。下面是一个为 SCRAM 机制配置客户端的例子。

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="alice" \
        password="alice-secret";
    

    选项usernamepassword被客户端用来配置客户端连接的用户。在这个例子中,客户端以用户alice的身份连接到 broker。JVM中的不同客户端可以通过在sasl.jaas.config中指定不同的用户名和密码,以不同的用户身份连接。

    客户端的 JAAS 配置也可以指定为 JVM 参数,类似于这里描述的brokers。客户端使用名为 “KafkaClient” 的登录部分。这个选项只允许一个JVM的所有客户端连接使用一个用户。

  2. 在 producer.properties 或 consumer.properties 中配置以下属性:

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
    

SASL/SCRAM 的安全考虑因素

  • Kafka 中 SASL/SCRAM 的默认实现在 Zookeeper 中存储 SCRAM 凭证。这适合在 Zookeeper 是安全的并且在私有网络上的情况下生产使用。
  • Kafka 只支持强散列函数 SHA-256 和 SHA-512,最小迭代次数为4096。如果 Zookeeper 的安全性受到影响,强散列函数与强密码和高迭代次数相结合,可以防止暴力攻击。
  • SCRAM 应该只与 TLS 加密一起使用,以防止截获 SCRAM 的交换。这可以防止字典或暴力攻击,以及在 Zookeeper 被破坏的情况下防止冒充。
  • 从 Kafka 2.0 版本开始,在 Zookeeper 不安全的情况下,可以通过配置 sasl.server.callback.handler.class,使用自定义回调处理程序覆盖默认的 SASL/SCRAM 凭证存储。
  • 关于安全考虑的更多细节,请参考RFC 5802

6. 使用 SASL/OAUTHBEARER 进行认证

OAuth 2授权框架 “使第三方应用程序能够获得对HTTP服务的有限访问,可以通过协调资源所有者和HTTP服务之间的批准互动来代表资源所有者,或者允许第三方应用程序以自己的名义获得访问。” SASL OAUTHBEARER 机制能够在 SASL(即非HTTP)上下文中使用该框架;它在 RFC 7628 中定义。Kafka 中默认的 OAUTHBEARER 实现创建和验证不安全的JSON Web令牌,只适合在非生产性Kafka安装中使用。更多细节请参考安全注意事项

principal.builder.class 的默认实现下,OAuthBearerToken 的 principalName 被用作配置ACL等的认证 Principal

配置 Kafka Brokers

  1. 在每个Kafka broker 的配置目录下添加一个与下面类似的经过适当修改的JAAS文件,本例中我们称之为 kafka_server_jaas.conf:

    KafkaServer {
        org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
        unsecuredLoginStringClaim_sub="admin";
    };
    

    KafkaServer 部分的属性 unsecuredLoginStringClaim_sub 是由经纪人在启动与其他经纪人的连接时使用。在这个例子中,admin 将出现在 subject (sub) claim 中,并将成为 broker 之间通信的用户。

  2. 将 JAAS 配置文件的位置作为 JVM 参数传递给每个 Kafka broker:

    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    
  3. 在 server.properties 中配置 SASL 端口和 SAS L机制,如上所述。比如:

    listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
    security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    sasl.enabled.mechanisms=OAUTHBEARER
    

配置 Kafka Clients

要在客户端配置SASL认证:

  1. 在producer.properties或consumer.properties中为每个客户端配置JAAS配置属性。登录模块描述了生产者和消费者等客户端如何连接到Kafka Broker。下面是一个OAUTHBEARER机制的客户端的配置例子:

    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        unsecuredLoginStringClaim_sub="alice";
    

    选项 unsecuredLoginStringClaim_sub 被客户用来配置 subject (sub) claim,它决定了客户端连接的用户。在这个例子中,客户端以用户 alice 的身份连接到 broker 。一个JVM中的不同客户可以通过在sasl.jaas.config中指定不同的 subject (sub) claim,以不同的用户身份连接。

    客户端的JAAS配置也可以指定为JVM参数,类似于这里描述的 brokers。客户端使用名为 “KafkaClient” 的登录部分。这个选项只允许一个JVM的所有客户端连接使用一个用户。

  2. 在 producer.properties 或 consumer.properties 中配置以下属性:

    security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
    sasl.mechanism=OAUTHBEARER
    
  3. SASL/OAUTHBEARER 的默认实现依赖于 jackson-databind 库。由于它是一个可选的依赖关系,用户必须通过他们的构建工具将其配置为依赖关系。

SASL/OAUTHBEARER 的无担保令牌创建选项

  • Kafka 中 SASL/OAUTHBEARER 的默认实现可以创建和验证不安全的 JSON Web 令牌。虽然只适合于非生产性使用,但它确实提供了在DEV或TEST环境下创建任意令牌的灵活性。

  • 下面是客户端支持的各种 JAAS 模块选项(如果 OAUTHBEARER 是 broker 之间的协议,则在 broker 一侧也支持)。

    创建无担保令牌的 JAAS 模块选项 文档
    unsecuredLoginStringClaim_<claimname>="value" 创建一个具有给定名称和值的String claim。除了’iat’和’exp'(这些是自动生成的),任何有效的 claim 名称都可以被指定。
    unsecuredLoginNumberClaim_<claimname>="value" 创建一个具有给定名称和值的 Number claim。除了’iat’和’exp'(这些是自动生成的),任何有效的 claim 名称都可以被指定。
    unsecuredLoginListClaim_<claimname>="value" 创建一个 String List claim ,具有给定的名称和从给定值中解析出来的值,其中第一个字符被当作分隔符。例如: `unsecuredLoginListClaim_fubar=”
    unsecuredLoginExtension_<extensionname>="value" 创建一个具有给定名称和值的String扩展。例如:unsecuredLoginExtension_traceId="123"。一个有效的扩展名是任何小写或大写的字母序列。此外,“auth “扩展名被保留。一个有效的扩展值是ASCII码1-127的任何字符组合。
    unsecuredLoginPrincipalClaimName 如果你希望持有 principal 名称的 “String” claim 要求的名称不是 “sub”,则设置为自定义 claim 要求名称。
    unsecuredLoginLifetimeSeconds 如果要将令牌过期时间设置为默认值3600秒(即1小时)之外的其他值,则设置为一个整数。exp claim 将被设置为反映过期时间。
    unsecuredLoginScopeClaimName 如果你希望持有任何标记范围的 “String” 或 “String List” claim 的名称不是 “scope”,则设置为自定义 claim 名称。

SASL/OAUTHBEARER 的无担保令牌验证选项

  • 以下是经纪人方面支持的各种JAAS模块选项,用于无担保的JSON网络令牌验证:

    JAAS Module Option for Unsecured Token Validation Documentation
    unsecuredValidatorPrincipalClaimName="value" Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the ‘sub’ claim.
    unsecuredValidatorScopeClaimName="value" Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than ‘scope’.
    unsecuredValidatorRequiredScope="value" Set to a space-delimited list of scope values if you wish the String/String List claim holding the token scope to be checked to make sure it contains certain values.
    unsecuredValidatorAllowableClockSkewMs="value" Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0).
  • 默认的不安全的 SASL/OAUTHBEARER 实现可以使用自定义的登录和SASL服务器回调处理程序进行重写(在生产环境中必须重写)。

  • 关于安全考虑的更多细节,请参考RFC 6749, Section 10

为 SASL/OAUTHBEARER 刷新令牌

Kafka 会在任何令牌过期前定期刷新,这样客户端就可以继续与 broker 建立连接。影响刷新算法操作方式的参数被指定为生产者/消费者/ broker 配置的一部分,具体如下。详细情况见其他地方的这些属性的文档。默认值通常是合理的,在这种情况下,这些配置参数就不需要明确设置。

Producer/Consumer/Broker 配置属性
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.login.refresh.min.period.seconds
sasl.login.refresh.min.buffer.seconds

SASL/OAUTHBEARER 的安全/生产使用

生产用例将需要编写 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现,该实现可以处理org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback 的实例,并通过 sasl.login.callback.handler.class 配置选项来处理非 broker 客户端,或者通过 Listener.name.ssl.oauthbearer.sasl.login.callback.handler.class 配置选项来处理 broker(当 SASL/OAUTHBEARER 为 broker 间协议时)。

生产用例还需要编写 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 的实现,该实现可以处理org.apache.kafka.common.security.oauthBearerValidatorCallback 的实例,并通过 listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker 配置选项声明它。

SASL/OAUTHBEARER 的安全考虑因素

  • Kafka 中 SASL/OAUTHBEARER 的默认实现创建并验证无担保的JSON Web令牌。这只适合于非生产使用。
  • 在生产环境中,OAUTHBEARER 应该只使用 TLS 加密,以防止截获令牌。
  • 默认的不安全的 SASL/OAUTHBEARER 实现可以使用上述的自定义登录和SASL服务器回调处理程序进行覆盖(在生产环境中必须进行覆盖)。
  • 关于OAuth 2一般安全考虑的更多细节,请参考RFC 6749,第10节

7. 在 broker 中启用多个SASL机制

  1. 在 JAAS 配置文件的 KafkaServer 部分指定所有启用机制的登录模块的配置。比如说:

    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/etc/security/keytabs/kafka_server.keytab"
        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
    
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-secret"
        user_admin="admin-secret"
        user_alice="alice-secret";
    };
    
  2. 在 server.properties 中启用 SASL 机制:

    sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
    
  3. 如果需要,在 server.properties 中指定 SASL 安全协议和 broker 之间的通信机制:

    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
    sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
    
  4. 按照 GSSAPI (Kerberos)PLAINSCRAM OAUTHBEARER 中特定机制的步骤,为启用的机制配置SASL。

8. 在运行中的集群中修改 SASL 机制

在运行中的集群中,可以用以下顺序修改SASL机制:

  1. 启用新的 SASL 机制,将该机制添加到每个代理的 server.properties 中的 sasl.enabled. mechanisms。按照这里的描述,更新JAAS配置文件以包括两种机制。逐步跳转集群节点。
  2. 使用新机制重新启动客户。
  3. 要改变 broker 之间的通信机制(如果需要的话),将 server.properties 中的sasl.mechanism.inter.broker.protocol设置为新的机制,并再次递增地弹出集群。
  4. 要删除旧机制(如果需要的话),从 server.properties 的 sasl.enabled. mechanisms 中删除旧机制,并从JAAS配置文件中删除旧机制的条目。再次递增弹出集群。

9. 使用授权令牌进行认证

基于委托令牌的认证是一种轻量级的认证机制,以补充现有的 SASL/SSL 方法。委托令牌是 kafka broker 和客户端之间的共享秘密。委托令牌将帮助处理框架在安全环境下将工作负载分配给可用的工作者,而不需要在使用2-way SSL时分配Kerberos TGT/keytabs或密钥存储的额外成本。参见KIP-48了解更多细节。

在 principal.builder.class 的默认实现下,授权令牌的所有者被用作配置 ACL 等的认证 Principal。

使用授权令牌的典型步骤是:

  1. 用户通过SASL或SSL与Kafka集群进行认证,并获得一个授权令牌。这可以通过Admin APIs或使用kafka-delegation-tokens.sh脚本完成。
  2. 用户安全地将授权令牌传递给Kafka客户端,以便与Kafka集群进行认证。
  3. 代币所有者/更新者可以更新/终止委托代币。

Token Management

A secret is used to generate and verify delegation tokens. This is supplied using config option delegation.token.secret.key. The same secret key must be configured across all the brokers. If the secret is not set or set to empty string, brokers will disable the delegation token authentication.

In the current implementation, token details are stored in Zookeeper and is suitable for use in Kafka installations where Zookeeper is on a private network. Also currently, this secret is stored as plain text in the server.properties config file. We intend to make these configurable in a future Kafka release.

A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.

Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.

Creating Delegation Tokens

Tokens can be created by using Admin APIs or using kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens can not be requests if the initial authentication is done through delegation token. A token can be created by the user for that user or others as well by specifying the --owner-principal parameter. Owner/Renewers can renew or expire tokens. Owner/renewers can always describe their own tokens. To describe other tokens, a DESCRIBE_TOKEN permission needs to be added on the User resource representing the owner of the token. kafka-delegation-tokens.sh script examples are given below.

Create a delegation token:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

Create a delegation token for a different owner:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1

Renew a delegation token:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

Expire a delegation token:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

Existing tokens can be described using the –describe option:

> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1

Token Authentication

Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable SASL/SCRAM mechanism on Kafka cluster as described in here.

Configuring Kafka Clients:

  1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the token authentication:

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="tokenID123" \
        password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
        tokenauth="true";
    

    The options username and password are used by clients to configure the token id and token HMAC. And the option tokenauth is used to indicate the server about token authentication. In this example, clients connect to the broker using token id: tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

    JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

Procedure to manually rotate the secret

We require a re-deployment when the secret needs to be rotated. During this process, already connected clients will continue to work. But any new connection requests and renew/expire requests with old tokens can fail. Steps are given below.

  1. Expire all existing tokens.
  2. Rotate the secret by rolling upgrade, and
  3. Generate new tokens

We intend to automate this in a future Kafka release.

3.5 - 授权

授权和ACL

https://kafka.apache.org/documentation/#security_authz

Kafka 提供了一个可插拔的授权框架,它是通过服务器配置中的 authorizer.class.name 属性配置的。配置的实现必须扩展 org.apache.kafka.server.authorizer.Authorizer 。Kafka 提供了默认的实现,将ACL存储在集群元数据中(Zookeeper或KRaft元数据日志)。对于基于Zookeeper的集群,所提供的实现是这样配置的。

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

Kafka acls 的一般格式是 “Principal P是 [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”。你可以在 KIP-11 中阅读更多关于acl结构的信息,在KIP-290中阅读资源模式。为了添加、删除或列出acls,你可以使用Kafka authorizer CLI。默认情况下,如果没有ResourcePatterns匹配特定的资源R,那么R就没有相关的acls,因此除了超级用户之外,其他任何人都不允许访问R。如果你想改变这种行为,你可以在 server.properties 中包含以下内容。

allow.everyone.if.no.acl.found=true

也可以像下面这样在 server.properties 中添加超级用户(注意分界符是分号,因为SSL用户名可能包含逗号)。默认的PrincipalType字符串 “User” 是区分大小写的。

super.users=User:Bob;User:Alice

TBD