Overview
Apache Kafka is a distributed streaming platform that lets you:
- Publish and subscribe to streams of records, like a message queue.
- Store streams of records.
- Process streams of records.
This article will focus only on the first use case: using Kafka as a message queue. We will go through producing and consuming messages. You can go to Apache Kafka’s introduction page for more details.
Using Spring for Apache Kafka
Spring for Apache Kafka applies core Spring concepts to develop Kafka-based messaging solutions. It provides us with an abstraction over Kafka API’s so that we can focus on producing and consuming messages.
There are other Spring-based Kafka solutions such as Spring Integration and Spring Cloud Stream, but we won’t need those for now.
Installation and Setup
Kafka
This article assumes that you have installed Kafka and know how to start the server. If not, you can follow Kafka’s installation guide. If you are on macOS and using Homebrew, you can just run brew install kafka to install it and then brew services start kafka to start the server.
Once you have the server running, create a topic (e.g. person). You can follow Kafka’s topic creation guide.
Project Setup
Since we will be using Spring, we can get started by initialising a Spring Boot project with Spring Initializr, adding Kafka messaging as a dependency.
In some cases we need to make sure that spring-kafka, kafka-clients and Kafka installation (broker) versions are compatible. Please use the compatibility table in the spring-kafka project page if you encounter any problems.
Sending Messages
There are several ways you can produce messages using spring-kafka. The easiest is to use KafkaTemplate. Spring Boot auto-configures a default KafkaTemplate bean which uses String for the message key and String for the message itself. This may change depending on the spring-kafka version used, so please refer to the appropriate reference doc.
These are the bare minimum configurations we need to put into our application.properties or application.yml in order to use KafkaTemplate:
spring.kafka.bootstrap-servers=localhost:9092
kafka.topics.person=person
spring.kafka.bootstrap-servers is a spring-kafka property which configures our Kafka broker address. By default this is set to localhost:9092.
kafka.topics.person is our custom property for the topic we created earlier. We will use it to tell KafkaTemplate which topic to send our messages to.
We can test our application by making our main class implement org.springframework.boot.CommandLineRunner and using KafkaTemplate there.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name resolved from our custom kafka.topics.person property
    @Value("${kafka.topics.person}")
    private String topic;

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        String message = "{\"name\": \"John Doe\", \"age\": 99}";
        kafkaTemplate.send(topic, message);
    }
}
When we run the application, we should see a log of the producer configuration values. A shortened version looks like this:
INFO --- [main] o.a.k.clients.producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
If we want to set the message key ourselves, we can use the overload which takes a message key: send(String topic, String key, String data). The send method is overloaded several ways, so please refer to the appropriate API doc as needed.
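For example, inside the run method above we could use the person’s name as the key (the key value here is just an illustration; any String will do). A sketch:

```java
// Records with the same key always go to the same partition,
// which preserves ordering per key.
String key = "john.doe";
String message = "{\"name\": \"John Doe\", \"age\": 99}";
kafkaTemplate.send(topic, key, message);
```
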
Consuming Messages
There are also several ways we can consume messages. The easiest is to use the @KafkaListener annotation on a method. For more information on how you can use @KafkaListener, please see the appropriate reference.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PersonEvents {

    private static final Logger logger = LoggerFactory.getLogger(PersonEvents.class);

    // Topic resolved from our custom kafka.topics.person property
    @KafkaListener(topics = "${kafka.topics.person}")
    public void handle(String message) {
        logger.info("Received message: {}", message);
    }
}
The bare minimum information we need to give @KafkaListener is the topic it should subscribe to. The topics attribute can be a topic name, a property-placeholder key or an expression. Our example uses a property-placeholder key.
Strictly speaking, topics is not all that our @KafkaListener needs to work. Not seen in our example is the groupId. The groupId uniquely identifies the group of consumer processes to which our consumer belongs. Visit Kafka’s introduction for more information.
We don’t see the groupId attribute here because we’ve set a default one for all of the listeners in our application in application.properties or application.yml:
spring.kafka.consumer.group-id=bobsantosjr
spring.kafka.consumer.auto-offset-reset=earliest
If you want to override the default value set by spring.kafka.consumer.group-id, you can set the groupId attribute of @KafkaListener.
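For instance, if we wanted this listener to join its own consumer group (the group name below is just an illustration), a sketch would look like:

```java
// Overrides spring.kafka.consumer.group-id for this listener only
@KafkaListener(topics = "${kafka.topics.person}", groupId = "person-audit")
public void handle(String message) {
    logger.info("Received message: {}", message);
}
```
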
You might be wondering what spring.kafka.consumer.auto-offset-reset is. By default, a consumer with no committed offset starts reading from the latest offset. This means that our consumer won’t be able to read messages that were sent to the topic before the consumer first connected.
In our example, we are sending the message using the KafkaTemplate, and we are not sure if our consumer is ready when our message is sent. Setting the offset reset to earliest makes sure that our consumer will be able to read the message our KafkaTemplate produced. Just remember, you might not want to do this for your consumer group in a production environment.
If there are no errors, you should see the log message we have in our listener:
INFO --- [ntainer#0-0-C-1] c.bobsantosjr.springkafka.PersonEvents : Received message: {"name": "John Doe", "age": 99}
Converting Messages to POJOs
For non-trivial applications, you will want to convert the messages you consume into objects. As always, spring-kafka provides several ways to do this easily.
One of them is to use a deserializer called JsonDeserializer. However, this deserializer usually depends on message headers to infer the target type, or you have to create a separate consumer factory and container factory for each type. For more details on serialization and deserialization, please see the reference.
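As a sketch, JsonDeserializer can be configured purely through properties (the package and class names below are assumptions based on this article’s example; adjust them to your project):

```properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.bobsantosjr.springkafka
spring.kafka.consumer.properties.spring.json.value.default.type=com.bobsantosjr.springkafka.Person
```

The default type is only used when no type headers are present on the record.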
An easier way is to create a KafkaListenerContainerFactory and set StringJsonMessageConverter as its message converter.
But first, we need to add the ever-reliable Jackson library as one of our dependencies for JSON processing:
implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
Let’s create a configuration class where we will override the default KafkaListenerContainerFactory bean:
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@Configuration
public class KafkaConfig {

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        // Convert the JSON payload to the listener method's parameter type
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        // Reuse the consumer settings from application.properties
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
}
If you don’t want to override the default KafkaListenerContainerFactory bean, you can create a separate bean and then set the containerFactory attribute in your @KafkaListener.
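For example, if we registered the factory above under a different bean name, say jsonContainerFactory (a hypothetical name), the listener could point to it explicitly:

```java
// Uses the custom factory instead of the default kafkaListenerContainerFactory bean
@KafkaListener(topics = "${kafka.topics.person}", containerFactory = "jsonContainerFactory")
public void handle(Person person) {
    logger.info("Received message: {}", person);
}
```
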
We need to create our POJO for the message:
public class Person {
private String name;
private int age;
// getters and setters
@Override
public String toString() {
return "Person{" + "name='" + name + '\'' + ", age=" + age + '}';
}
}
Then we can change our handle method’s parameter type from String to Person:
@KafkaListener(topics = "${kafka.topics.person}")
public void handle(Person person) {
logger.info("Received message: {}", person);
}
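Under the hood, StringJsonMessageConverter uses Jackson to bind the JSON payload to the listener method’s parameter type. A minimal, standalone sketch of that conversion (assuming Person has the usual getters, setters and the toString shown above):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonConversionSketch {

    // Mirrors the Person POJO; Jackson needs a no-arg constructor and setters to bind
    public static class Person {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        @Override
        public String toString() { return "Person{" + "name='" + name + '\'' + ", age=" + age + '}'; }
    }

    public static void main(String[] args) throws Exception {
        String message = "{\"name\": \"John Doe\", \"age\": 99}";
        // This is essentially what the converter does with the record value
        Person person = new ObjectMapper().readValue(message, Person.class);
        System.out.println(person);
    }
}
```
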
Running our application will show us a log of the message like this:
INFO --- [ntainer#0-0-C-1] c.bobsantosjr.springkafka.PersonEvents: Received message: Person{name='John Doe', age=99}
For more details on custom message conversion, you can check the reference.
Conclusion
There are a lot of ways to use Kafka in Java. Fortunately, spring-kafka gives us a very easy way to do messaging over Kafka. One of its main advantages is its very good reference documentation. You can find the list of references and documentation here.
You can also easily customise spring-kafka by adding or changing configuration through properties. If you find yourself searching for the correct property, just search for spring.kafka in this very helpful appendix.
The code sample is available on my GitHub. The master branch has the simple string producer and consumer. The custom message conversion is in the json-converter branch.