How to Secure Confluent Kafka with SSL and SASL/SCRAM


Overview

When I decided to include Apache Kafka in our technology stack, I never imagined the demand would be this huge. At first, I had only two requirements: something faster than ActiveMQ for our MQTT needs, and a foundation for a future event-driven approach. It started well in late 2017 and has remained one of our most important technologies ever since.

As a consequence, the requirements for using Kafka and storing messages inside it have grown bigger and more complex. One of these requirements is security. Even though we put Kafka inside our private network and behind a firewall, that alone is not sufficient. People started to ask how secure Kafka is, whether the connections are encrypted, and how authentication and authorization work for each Kafka topic.

Therefore this article starts by securing the Apache Kafka systems themselves. In a follow-up, I will write about client connections to the Kafka brokers based on a role-based access mechanism.

This article is a summary of the security tutorial on the Confluent documentation page. There are a lot of security methods described there, and I chose the SASL/SCRAM method for my use case.

Since my VM installation uses Confluent version 5.1.2, this article is based on that version.

SSL

First of all, I'll secure the connections using the SSL protocol. Each machine in the cluster has a public-private key pair and a certificate as its identity. Because the machines are located inside a private network, I decided to create a self-signed certificate for each machine. Therefore I have to register all of the self-signed certificates in each machine's trust store (JKS).

SASL/SCRAM and JAAS

Salted Challenge Response Authentication Mechanism (SCRAM) is a family of modern, password-based challenge-response mechanisms providing authentication of a user to a server. It can be used for password-based login to services¹. Apache Kafka itself supports SCRAM-SHA-256 and SCRAM-SHA-512.

JAAS (Java Authentication and Authorization Service) is the Java implementation of the Pluggable Authentication Module (PAM) framework. We are going to use JAAS for inter-broker authentication and zookeeper-broker authentication.

Use Case

The use case for this article is upgrading the existing Kafka cluster installed in a past article by adding a security layer (SASL/SCRAM) on top of it, using the SCRAM-SHA-256 mechanism.

As a pre-existing setup, we already have three servers, whose respective IPs are 172.20.20.10 (node-1), 172.20.20.20 (node-2), and 172.20.20.30 (node-3). Each node runs both a zookeeper instance and a broker.

Certificate

Because of host-naming issues in my Kafka environment (Vagrant), I disabled host-name verification for this tutorial. However, you can always enable it and put the FQDN into the CN or SAN fields while generating the certificates.

One more thing: the certificate validity is set to 10 years because I am too lazy to generate a new one each year (not a good habit).

For each node, we will generate one key store and two trust stores. The key store holds the private key for the broker, and the trust stores hold the public certificates for the broker and the client (spring-kafka, for instance). Just as a reminder, we need the trust stores because these are self-signed certificates (not issued by a trusted authority from the JVM's point of view).
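As an illustration, here is a minimal sketch of generating the stores with keytool on node-1. The keystore and truststore file names match the ones referenced later in server.properties, while the alias, the client truststore name, and the passwords are my own placeholders, so adjust them for your environment.

# generate a self-signed key pair (10-year validity) inside the broker keystore
keytool -genkeypair -keystore kafka.server.keystore.jks -alias node-1 \
  -dname "CN=node-1" -keyalg RSA -validity 3650 \
  -storepass password -keypass password

# export the certificate so the other machines and clients can trust it
keytool -exportcert -keystore kafka.server.keystore.jks -alias node-1 \
  -storepass password -file node-1.crt

# import the certificate of every node into the broker and client trust stores
keytool -importcert -keystore kafka.server.truststore.jks -alias node-1 \
  -storepass password -file node-1.crt -noprompt
keytool -importcert -keystore kafka.client.truststore.jks -alias node-1 \
  -storepass password -file node-1.crt -noprompt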

Zookeeper

For zookeeper, authentication uses DIGEST-MD5 combined with JAAS as the implementation.

Configure JAAS for zookeeper under /etc/kafka/zookeeper_jaas.conf.
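Here is a minimal sketch of /etc/kafka/zookeeper_jaas.conf, assuming the broker logs in to zookeeper as the user kafka (the same user referenced by the Client section of the broker JAAS file later); the password is a placeholder.

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_kafka="kafka-secret";
};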

Set a runtime configuration file to hold the KAFKA_OPTS environment variable, because we are using systemd to start zookeeper. Then update the systemd unit for zookeeper.
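A sketch under the assumption that the environment file lives at /etc/kafka/zookeeper.env and that the systemd unit is named confluent-zookeeper; adjust both names to your installation.

# /etc/kafka/zookeeper.env
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf

# systemd drop-in, e.g. /etc/systemd/system/confluent-zookeeper.service.d/override.conf
[Service]
EnvironmentFile=/etc/kafka/zookeeper.env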

Broker

I divided the broker configuration into four steps: SSL configuration, JAAS configuration, SASL configuration, and listener configuration.

SSL

First, configure the keystore, the truststore, and inter-broker security for each broker in /etc/kafka/server.properties.
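Based on the final properties file at the end of this article, the SSL-related lines look like this (the passwords are placeholders):

# empty value disables host-name verification (self-signed certificates without FQDN)
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.client.auth=required
security.inter.broker.protocol=SASL_SSL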

Note: please be careful if your certificate does not contain an FQDN. In that case you have to disable host-name verification by setting ssl.endpoint.identification.algorithm to an empty value.

JAAS

Now we configure authentication for the brokers. This configuration covers communication between zookeeper and the brokers, as well as between the brokers themselves. Put the configuration in a new file, /etc/kafka/kafka_server_jaas.conf.
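A minimal sketch of /etc/kafka/kafka_server_jaas.conf: the KafkaServer section carries the SCRAM credentials for inter-broker traffic, and the Client section is what the broker uses to log in to zookeeper (passwords are placeholders).

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka-secret";
};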

Note: in the Client section, the username and password must match the user_kafka entry in the zookeeper JAAS config. The admin user will be created later in the SASL step.

Set a runtime configuration file to hold the KAFKA_OPTS environment variable, because we are using systemd to start the broker. Then update the systemd configuration for the broker.
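The same pattern as for zookeeper, assuming an environment file at /etc/kafka/kafka.env and a systemd unit named confluent-kafka:

# /etc/kafka/kafka.env
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

# systemd drop-in, e.g. /etc/systemd/system/confluent-kafka.service.d/override.conf
[Service]
EnvironmentFile=/etc/kafka/kafka.env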

SASL Authentication

Before we configure SASL/SCRAM authentication on the brokers, we will create the admin user first. We can execute this command on any broker.
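A sketch of the command using the kafka-configs tool that ships with Confluent; the password is a placeholder and must match the KafkaServer section of the broker JAAS file.

kafka-configs --zookeeper 172.20.20.10:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=admin-secret]' \
  --entity-type users --entity-name admin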

Configure /etc/kafka/server.properties.
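These are the SASL-related lines, taken from the final properties file shown at the end of this article (security.inter.broker.protocol was already set to SASL_SSL in the SSL step):

sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# ACL support, with admin as the super user
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin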

Listener

Because we are going to do a rolling update and want to minimize the impact on existing topics, we will open three listeners with different protocols and ports on each broker.
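Taken from the final properties file, the listener lines for node-2 (172.20.20.20) look like this:

listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094
advertised.listeners=PLAINTEXT://172.20.20.20:9092,SSL://172.20.20.20:9093,SASL_SSL://172.20.20.20:9094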

Note: change the IP address accordingly for every broker.

Restart Zookeeper and Kafka

After all the configuration is finished, it is time to restart the zookeepers and brokers. First we need to reload systemd because we changed its unit configuration.
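On every node:

sudo systemctl daemon-reload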
Now restart zookeeper node by node, then do the same for the brokers. You should see three open ports, ready to receive client connections: port 9092 for PLAINTEXT, 9093 for SSL, and 9094 for SASL_SSL (of course you need to provide credentials to access port 9094). Later I will show how to connect to Kafka with SCRAM authentication using Spring Boot.
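A sketch of the rolling restart and a quick port check, again assuming the confluent-zookeeper and confluent-kafka unit names:

# restart zookeeper first, one node at a time, then the brokers
sudo systemctl restart confluent-zookeeper
sudo systemctl restart confluent-kafka

# verify that the three listener ports are open
ss -ltn | grep -E '9092|9093|9094'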

And here are the final properties files, server.properties followed by zookeeper.properties.


# /etc/kafka/server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.host.name=172.20.20.20
advertised.listeners=PLAINTEXT://172.20.20.20:9092,SSL://172.20.20.20:9093,SASL_SSL://172.20.20.20:9094
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/var/lib/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
security.inter.broker.protocol=SASL_SSL
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
ssl.client.auth=required
# for ACL (RBAC)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin


# /etc/kafka/zookeeper.properties
# The basic time unit in milliseconds used by ZooKeeper.
# It is used to do heartbeats and the minimum session timeout will be twice the tickTime.
tickTime=2000
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
# maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=172.20.20.10:2888:3888
server.2=172.20.20.20:2888:3888
server.3=172.20.20.30:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

Conclusion

This is only the first part, which secures Zookeeper and Kafka. The next article will talk about how to connect to Kafka using the SASL/SCRAM mechanism. If I missed any configuration, please provide feedback. For now, try it out and enjoy playing with the server configuration.

References

Author: ru rocker

I have been a professional software developer since 2004. Java, Python, NodeJS, and Go are my favorite programming languages. I also have an interest in DevOps. I hold professional certifications: SCJP, SCWCD, PSM 1, AWS Solution Architect Associate, and AWS Solution Architect Professional.

