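Apache Kafka brokers support client authentication through SASL. SASL authentication can be enabled together with SSL encryption (in which case SSL client authentication is disabled).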

Supported SASL mechanisms:

  • GSSAPI(Kerberos)
  • OAUTHBEARER
  • SCRAM
  • PLAIN
  • Delegation Tokens
  • LDAP

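JAAS configuration

Before the hands-on part, here are the configuration basics. The docker-compose.yml used in the exercise is shown later in this article.
Kafka uses the Java Authentication and Authorization Service (JAAS) for SASL configuration.

Broker configuration

Using a JAAS file

A JAAS configuration file can be passed to the JVM through the java.security.auth.login.config system property, for example: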

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

kafka_server_jaas.conf looks like this:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice";
};
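
As a rough illustration of the file layout above, the following Python sketch renders a KafkaServer section from a dictionary of users. The helper names render_jaas_section and plain_server_options are made up for this example and are not part of Kafka. With PlainLoginModule, username and password are the credentials the broker itself uses for inter-broker connections, and each user_<name> entry defines the password of one account that may log in.

```python
PLAIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def plain_server_options(broker_user, broker_password, users):
    """Build the option map for a broker-side PlainLoginModule entry."""
    options = {"username": broker_user, "password": broker_password}
    for name, password in users.items():
        # One user_<name> option per account that is allowed to log in.
        options[f"user_{name}"] = password
    return options


def render_jaas_section(section, login_module, options):
    """Render one JAAS section; the last option line carries the ';'."""
    body = [f"{login_module} required"]
    body += [f'{key}="{value}"' for key, value in options.items()]
    body[-1] += ";"
    lines = [f"{section} {{"] + ["    " + line for line in body] + ["};"]
    return "\n".join(lines)


jaas_text = render_jaas_section(
    "KafkaServer",
    PLAIN_MODULE,
    plain_server_options("admin", "admin-secret",
                         {"admin": "admin-secret", "alice": "alice"}),
)
print(jaas_text)
```

The printed text has the same shape as the kafka_server_jaas.conf shown above, which makes it easy to generate the file from a user list instead of editing it by hand.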

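In the JAAS file, the broker reads the section named KafkaServer. If several listeners are configured to use SASL, the section name can be prefixed with the lowercase listener name followed by a period, for example sasl_ssl.KafkaServer.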

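Using a broker property

Besides a JAAS file, SASL can also be configured through the broker property sasl.jaas.config. The property name must be prefixed with the listener name and the SASL mechanism, both in lowercase:

listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config

Example: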

listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret" \
  user_admin="admin-secret" \
  user_alice="alice-secret";
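
The naming rule can be sketched in Python as follows. jaas_property_key and jaas_property_value are hypothetical helpers written for this illustration; only the resulting property name and value format come from the text above.

```python
def jaas_property_key(listener_name, sasl_mechanism):
    """Build the per-listener, per-mechanism property name (both parts lowercased)."""
    return (f"listener.name.{listener_name.lower()}."
            f"{sasl_mechanism.lower()}.sasl.jaas.config")


def jaas_property_value(login_module, options):
    """Build the single-line value: module, control flag, options, trailing ';'."""
    parts = [f"{login_module} required"]
    parts += [f'{key}="{value}"' for key, value in options.items()]
    return " ".join(parts) + ";"


key = jaas_property_key("SASL_SSL", "SCRAM-SHA-256")
value = jaas_property_value(
    "org.apache.kafka.common.security.scram.ScramLoginModule",
    {"username": "admin", "password": "admin-secret"},
)
print(f"{key}={value}")
```

The value is written on one line here; the backslash continuations in the example above are only a readability aid in the properties file.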

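The official documentation recommends configuring JAAS this way.

Order of precedence

If JAAS is configured in more than one way, the broker applies the following order of precedence:

  1. The broker property listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
  2. The {listenerName}.KafkaServer section of the static JAAS configuration
  3. The KafkaServer section of the static JAAS configuration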
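
The lookup order above can be modelled with a few lines of Python. This is only a sketch of the rule as described here, not Kafka's actual implementation; resolve_broker_jaas and its arguments are names invented for the example.

```python
def resolve_broker_jaas(listener, mechanism, broker_props, jaas_sections):
    """Return (source, name) of the JAAS configuration that would win, or None.

    broker_props:  dict of broker properties (server.properties)
    jaas_sections: set of section names found in the static JAAS file
    """
    listener = listener.lower()
    key = f"listener.name.{listener}.{mechanism.lower()}.sasl.jaas.config"
    if key in broker_props:                       # 1. broker property
        return ("broker property", key)
    prefixed = f"{listener}.KafkaServer"
    if prefixed in jaas_sections:                 # 2. listener-prefixed section
        return ("JAAS file", prefixed)
    if "KafkaServer" in jaas_sections:            # 3. plain KafkaServer section
        return ("JAAS file", "KafkaServer")
    return None


props = {"listener.name.sasl_ssl.plain.sasl.jaas.config": "..."}
sections = {"KafkaServer", "sasl_ssl.KafkaServer"}

print(resolve_broker_jaas("SASL_SSL", "PLAIN", props, sections))
print(resolve_broker_jaas("SASL_SSL", "PLAIN", {}, sections))
print(resolve_broker_jaas("SASL_SSL", "PLAIN", {}, {"KafkaServer"}))
```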

Client configuration

Using a JAAS file

A client JAAS configuration file can be passed to the JVM through java.security.auth.login.config, for example:

-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf

kafka_client_jaas.conf looks like this:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};

Using a client property

Set the sasl.jaas.config client property, for example:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="confluent" \
   password="confluent-secret";

Order of precedence

If both the JAAS file and the property are specified, the property takes precedence, as on the broker.
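
For a Java client, the property-based variant usually ends up in a small properties file. The sketch below builds such a file for SASL/PLAIN; security.protocol, sasl.mechanism and sasl.jaas.config are standard Kafka client settings, while plain_client_properties and to_properties_text are helper names assumed for this example.

```python
def plain_client_properties(username, password, protocol="SASL_SSL"):
    """Return client settings for SASL/PLAIN with an inline JAAS entry."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "security.protocol": protocol,
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


def to_properties_text(props):
    """Serialize the dict as key=value lines, one property per line."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


client_props = plain_client_properties("alice", "alice")
print(to_properties_text(client_props), end="")
```

Because the JAAS entry travels inside the properties, no -Djava.security.auth.login.config flag is needed for such a client.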

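SASL configuration

Configuring SASL/PLAIN

PLAIN, or SASL/PLAIN, is a simple username/password authentication mechanism. It is typically combined with SSL encryption so that the password is not sent over the network in clear text.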
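
To see why encryption matters, it helps to look at what a PLAIN client sends. RFC 4616 defines the message as an optional authorization identity, the username and the password, separated by NUL bytes. The Python sketch below only illustrates that wire format; encode_plain and decode_plain are names made up for this example and are not taken from any Kafka client.

```python
def encode_plain(username, password, authzid=""):
    """Build a SASL PLAIN message: authzid NUL username NUL password."""
    fields = (authzid, username, password)
    return b"\x00".join(field.encode("utf-8") for field in fields)


def decode_plain(message):
    """Split a SASL PLAIN message back into its three fields."""
    authzid, username, password = message.split(b"\x00")
    return authzid.decode("utf-8"), username.decode("utf-8"), password.decode("utf-8")


message = encode_plain("alice", "alice-secret")
print(message)                # b'\x00alice\x00alice-secret'
print(decode_plain(message))  # ('', 'alice', 'alice-secret')
```

Anyone who can capture this message on an unencrypted SASL_PLAINTEXT connection can read the password directly, which is the reason PLAIN is normally used over SASL_SSL.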

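Broker configuration

  1. Enable SASL/PLAIN in server.properties on every broker
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN

# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=PLAIN
  2. Configure the listeners
listeners=SASL_PLAINTEXT://kafka1:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
# Inter-broker traffic needs a protocol that has a listener; the default is PLAINTEXT
security.inter.broker.protocol=SASL_PLAINTEXT
  3. Configure the JAAS property (a backslash must be the last character on its line)
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";
  4. Configure authorization
# SimpleAclAuthorizer is deprecated since Kafka 2.4 and was removed in 3.0;
# ZooKeeper-based brokers from 2.4 on use kafka.security.authorizer.AclAuthorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin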
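
The settings in these steps depend on each other, so a quick consistency check can catch mistakes before a broker is started. The following Python sketch is one possible check under two assumptions: listener names equal their security protocol (as in this article), and unset properties fall back to the Kafka defaults GSSAPI and PLAINTEXT. check_sasl_settings is a hypothetical helper, not a Kafka tool.

```python
def listener_protocols(props):
    """Return the protocol part of every `listeners` entry, e.g. ['SASL_PLAINTEXT']."""
    entries = [entry.strip() for entry in props.get("listeners", "").split(",")]
    return [entry.split("://", 1)[0] for entry in entries if entry]


def check_sasl_settings(props):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    enabled = [m.strip() for m in
               props.get("sasl.enabled.mechanisms", "GSSAPI").split(",") if m.strip()]
    inter_mechanism = props.get("sasl.mechanism.inter.broker.protocol", "GSSAPI")
    inter_protocol = props.get("security.inter.broker.protocol", "PLAINTEXT")
    listeners = listener_protocols(props)

    if inter_protocol not in listeners:
        problems.append(
            f"no listener for security.inter.broker.protocol={inter_protocol}")
    if inter_protocol.startswith("SASL") and inter_mechanism not in enabled:
        problems.append(
            f"inter-broker mechanism {inter_mechanism} is not in sasl.enabled.mechanisms")
    for listener in listeners:
        if not listener.startswith("SASL"):
            continue
        for mechanism in enabled:
            key = f"listener.name.{listener.lower()}.{mechanism.lower()}.sasl.jaas.config"
            if key not in props:
                problems.append(
                    f"{key} is not set; a static JAAS file must provide the login module")
    return problems


example = {
    "sasl.enabled.mechanisms": "PLAIN",
    "sasl.mechanism.inter.broker.protocol": "PLAIN",
    "security.inter.broker.protocol": "SASL_PLAINTEXT",
    "listeners": "SASL_PLAINTEXT://kafka1:9093",
    "listener.name.sasl_plaintext.plain.sasl.jaas.config": "...",
}
print(check_sasl_settings(example))
```

The last check is only a hint: a missing property is fine as long as the KafkaServer section of a JAAS file supplies the login module instead.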

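Configuring SASL_SSL (SASL/PLAIN + SSL)

  1. Create the certificates

SASL_SSL requires SSL certificates and keys. I wrapped the steps for creating them in a script and pushed it to GitHub:

git clone https://github.com/korimas/kafka-certs-create.git
cd kafka-certs-create
sh create.sh

The generated certificates are placed in the target directory.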

  2. Configure server.properties (same as for SASL/PLAIN)
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN
# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=PLAIN
  3. Configure JAAS (same as for SASL/PLAIN, but with the sasl_ssl listener prefix)
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";
  4. Configure the inter-broker protocol
security.inter.broker.protocol=SASL_SSL
  5. Configure the listeners
listeners=SASL_SSL://kafka1:9093
advertised.listeners=SASL_SSL://localhost:9093
  6. Configure the SSL certificates; copy them to the corresponding directory beforehand
ssl.key.password=hillstone
ssl.keystore.location=/etc/kafka/secrets/server.keystore.jks
ssl.keystore.password=hillstone
ssl.truststore.location=/etc/kafka/secrets/server.truststore.jks
ssl.truststore.password=hillstone
# Disable hostname verification of the certificate
ssl.endpoint.identification.algorithm=
  7. Configure authorization
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

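Configuring multiple SASL mechanisms

This is controlled by the following option in the Kafka server.properties:

sasl.enabled.mechanisms=GSSAPI,PLAIN

The corresponding JAAS configuration must then contain a login module entry for each enabled mechanism: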

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   storeKey=true
   keyTab="/etc/security/keytabs/kafka_server.keytab"
   principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_alice="alice-secret";
};

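Hands-on with Docker containers

In this exercise I set up a three-node cluster that uses SASL/PLAIN + SSL.
The certificates can be created as described above.
Here is the docker-compose.yml: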

version: '2'
services:
  zookeeper:
      container_name: sasl_ssl_zookeeper
      image: confluentinc/cp-zookeeper:5.1.2
      hostname: sasl_ssl_zookeeper
      restart: always
      ports:
          - 22182:2182
      environment:
        ZOOKEEPER_CLIENT_PORT: 2182
        ZOOKEEPER_TICK_TIME: 2000
        ZOOKEEPER_MAXCLIENTCNXNS: 0
        ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
        ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
        ZOOKEEPER_JAASLOGINRENEW: 3600000
        KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
      volumes:
          - ./secrets:/etc/kafka/secrets
  kafka01:
      image: wurstmeister/kafka:latest
      hostname: sasl_ssl_kafka01
      container_name: sasl_ssl_kafka01
      depends_on:
          - zookeeper
      ports:
          - 29093:9093
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        KAFKA_LISTENERS: SASL_SSL://:9093
        KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:29093
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
        KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
        KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
        KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
        KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_KEY_PASSWORD: "hillstone"
        KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
        KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
        KAFKA_SSL_CLIENT_AUTH: none
        KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
        KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
        KAFKA_SUPER_USERS: User:admin
      volumes:
          - ./secrets:/etc/kafka/secrets
  kafka02:
      image: wurstmeister/kafka:latest
      hostname: sasl_ssl_kafka02
      container_name: sasl_ssl_kafka02
      depends_on:
          - zookeeper
      ports:
          - 39093:9093
      environment:
        KAFKA_BROKER_ID: 2
        KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        KAFKA_LISTENERS: SASL_SSL://:9093
        KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:39093
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
        KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
        KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
        KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
        KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_KEY_PASSWORD: "hillstone"
        KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
        KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
        KAFKA_SSL_CLIENT_AUTH: none
        KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
        KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
        KAFKA_SUPER_USERS: User:admin
      volumes:
          - ./secrets:/etc/kafka/secrets
  kafka03:
      image: wurstmeister/kafka:latest
      hostname: sasl_ssl_kafka03
      container_name: sasl_ssl_kafka03
      depends_on:
          - zookeeper
      ports:
          - 49093:9093
      environment:
        KAFKA_BROKER_ID: 3
        KAFKA_ZOOKEEPER_CONNECT: 'sasl_ssl_zookeeper:2182'
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        KAFKA_LISTENERS: SASL_SSL://:9093
        KAFKA_ADVERTISED_LISTENERS: SASL_SSL://10.182.51.86:49093
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
        KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
        KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
        KAFKA_SSL_KEYSTORE_LOCATION: "/etc/kafka/secrets/server.keystore.jks"
        KAFKA_SSL_KEYSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_KEY_PASSWORD: "hillstone"
        KAFKA_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/server.truststore.jks"
        KAFKA_SSL_TRUSTSTORE_PASSWORD: "hillstone"
        KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
        KAFKA_SSL_CLIENT_AUTH: none
        KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
        KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
        KAFKA_SUPER_USERS: User:admin
      volumes:
          - ./secrets:/etc/kafka/secrets
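
The environment variables in this file mirror the broker properties from the previous sections: as far as I can tell, the image takes each KAFKA_* variable, drops the prefix, lowercases the rest and replaces underscores with dots. The Python sketch below shows that mapping in the reverse direction, plus the only values that differ between the three brokers. to_env_name and broker_overrides are helper names assumed for this illustration.

```python
def to_env_name(property_name):
    """Map a broker property to the matching KAFKA_* environment variable."""
    return "KAFKA_" + property_name.upper().replace(".", "_")


def broker_overrides(broker_id, advertised_host, host_port):
    """Return the settings that are specific to one broker in the compose file."""
    return {
        "KAFKA_BROKER_ID": broker_id,
        "KAFKA_ADVERTISED_LISTENERS": f"SASL_SSL://{advertised_host}:{host_port}",
    }


print(to_env_name("sasl.mechanism.inter.broker.protocol"))
for broker_id, port in enumerate((29093, 39093, 49093), start=1):
    print(f"kafka{broker_id:02d}", broker_overrides(broker_id, "10.182.51.86", port))
```

Everything else in the three kafka services is identical, so a mistake in one block is easy to spot by comparing it with the other two.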

The kafka_server_jaas.conf it uses:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};

And zk_server_jaas.conf:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

To start the containers, run:

docker-compose up -d

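Note that the docker-compose.yml above combines the confluentinc ZooKeeper image with the wurstmeister Kafka image. Each vendor ships its own ZooKeeper and Kafka images, but in my tests I never got the cluster working with both images from the same vendor. If you have managed to, feel free to share your docker-compose.yml.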

Access from Python

From Python, Kafka can be accessed with the third-party kafka-python library.

consumer

from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = '10.182.51.86:29093'
TOPIC = 'topic'

consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    security_protocol='SASL_SSL',
    # CA certificate used to verify the broker certificate
    ssl_cafile='./certs/target/cacert.pem',
    # Hostname verification is off, matching the broker setup above
    ssl_check_hostname=False,
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
    auto_offset_reset='earliest',
    api_version=(0, 10),
    receive_buffer_bytes=1024,
    # Must be the boolean False; the string 'False' is truthy
    enable_auto_commit=False,
    # consumer_timeout_ms=1000,
)

# Print every record as it arrives
for msg in consumer:
    print(msg)

producer

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="10.182.51.86:29093",
    security_protocol="SASL_SSL",
    ssl_cafile='./certs/target/cacert.pem',
    ssl_check_hostname=False,
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
)

# send() is asynchronous; get() blocks until the broker acknowledges the record
future = producer.send("topic", value="hello world".encode())
result = future.get(timeout=10)
print(result)
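
The consumer and the producer share the same connection settings, so one option is to build them once and pass them to both. This is just a sketch: sasl_ssl_client_kwargs is a helper name assumed for this example, while the keyword names are the kafka-python parameters already used above.

```python
def sasl_ssl_client_kwargs(bootstrap_servers, username, password, cafile):
    """Return the keyword arguments shared by KafkaConsumer and KafkaProducer."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",
        "ssl_cafile": cafile,
        # Hostname verification is off because the broker disables it as well.
        "ssl_check_hostname": False,
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }


kwargs = sasl_ssl_client_kwargs(
    "10.182.51.86:29093", "admin", "admin-secret", "./certs/target/cacert.pem"
)
print(sorted(kwargs))

# With kafka-python installed and the cluster running, the dict is used like this:
#   from kafka import KafkaConsumer, KafkaProducer
#   consumer = KafkaConsumer("topic", auto_offset_reset="earliest", **kwargs)
#   producer = KafkaProducer(**kwargs)
```

Keeping the credentials in one place also makes it easier to switch from the admin account to a less privileged user such as alice.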