Conectando o Spring Boot ao Apache Kafka: Como Implementar Mensagens em Tempo Real

A integração do Spring Boot com o Apache Kafka é uma abordagem muito utilizada para construção de aplicações modernas que usam mensagens em tempo real e processamento de eventos assíncronos. O Apache Kafka é uma plataforma de streaming de eventos distribuída, enquanto o Spring Boot simplifica o desenvolvimento de aplicativos Java. Nesse artigo, exploraremos como criar uma conexão sólida entre o Spring Boot e o Kafka, permitindo que você construa aplicativos escaláveis e resilientes.

Configuração do ambiente Apache Kafka

Para que possamos testar nossa aplicação vamos precisar do ambiente Kafka, para isso vamos configurar uma versão minimalista do ecossistema do Kafka. Para essa abordagem vamos utilizar o Docker que vai possibilitar que executemos todos os componentes necessários na nossa própria maquina, e de maneira fácil a remover depois.

Se você não possui o Docker devidamente configurado na sua maquina pode seguir esse tutorial passo a passo de como fazer a instalação.

O Kafka precisa basicamente de dois componentes para executar corretamente que é a própria aplicação do Kafka e o Zookeeper, para facilitar nós criamos um docker compose que com um único script vai subir toda a aplicação, e também adicionamos o Kafdrop, ele vai nos auxiliar a visualizar as mensagem que chega no nosso broker.

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks:
      - broker-kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    networks:
      - broker-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    networks:
      - broker-kafka
    depends_on:
      - kafka
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092

networks:
  broker-kafka:
    driver: bridge
docker-compose.yml

Para subir esse ambiente, basta copiar o código acima, em um arquivo com nome de docker-compose.yml, executar o terminal de comando na pasta raiz onde está localizado o arquivo, e usar o comando abaixo:

docker-compose up -d
Terminal

Esse processo deve demorar um pouco, porque será necessário baixar todas as imagens necessárias para executar os contêineres, caso queira validar se os contêineres estão em execução, e não conhece muito de Docker confira esse artigo com os principais comandos.

Também é possível validar a execução acessando a aplicação do Kafdrop, é possível ver a interface, através da url https://localhost:9000.

Configurando o projeto Spring Boot

Para que a comunicação com o Kafka aconteça vamos precisar adicionar duas dependências, e para isso vamos modificar o arquivo pom.xml acrescentando as 2 dependências abaixo.

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>
XML

Configurando as propriedades de conexão

Para que a nossa aplicação consiga se conectar ao nosso broker, vamos precisa indicar para ela o endereço onde está sendo executado o cluster do Kafka, para isso precisamos adicionar duas configurações no arquivo application.properties da aplicação

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=app-1
application.properties

A propriedade bootstrap-servers, indica o endereço do cluster, e a propriedade consumer.group identifica o nome do grupo da nossa aplicação, essa configuração é importante quando se vai usar aplicações escaláveis, pois cada mensagem só é distribuída para apenas um membro do mesmo grupo, pois é possível termos mais de um grupo escutando o mesmo tópico.

Criando um consumidor

Agora que já fizemos todas as configurações necessárias, vamos codificar o nosso consumidor ele que será o responsável por receber as mensagens de um tópico especifico.

Como estamos usando as dependências do próprio pacote do Spring essa parte fica bem simples, precisamos apenas usar a anotação @KafkaListener e indicar os tópicos que vamos escultar. Para o correto funcionamento dessa anotação é necessário que a classe seja um bean Spring gerenciado, então vamos anotar a classe com @Service.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

	@KafkaListener(topics = "topic1")
	public void listen(final String message) {
		System.out.println("Received message: " + message);
	}
}
Java

Criando um produtor

Agora que já temos um consumidor, precisamos produzir as mensagens para o tópico que ele esculta, em uma aplicação real é comum que o produtor fique em outra aplicação, mas como estamos fazendo apenas um exemplo prático vamos, codificá-lo aqui mesmo.

Sua implementação é bem simples basta usar a classe que a biblioteca já nos entrega KafkaTemplate, e o próprio Spring se encarregará de injetar a implementação dela. Nessa caso poderíamos utilizar a anotação @Autowired, mas vamos usar um conceito diferente, nas versões mais recentes do Spring, ele faz a injeção de dependência automaticamente se a nossa propriedade for final e a classe tiver um construtor para receber esse objeto, além disso no nosso projeto estamos utilizando o Lombok, que vai fazer esse construtor para nós apenas com uma anotação @AllArgsConstructor, caso você não esteja utilizando o Lombok pode fazer um construtor de forma tradicional.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.AllArgsConstructor;

@Service
@AllArgsConstructor
public class KafkaProducer {

	private final KafkaTemplate<String, String> kafkaTemplate;
	private final String TOPIC = "topic1";
	
	public void sendMessage(String message) {
		kafkaTemplate.send(TOPIC, message);
	}
}
Java

Note também que estamos definindo o tópico para onde vamos enviar a mensagem em uma propriedade privada e estática, mas isso não é obrigatório, ele pode ser passado como uma string no método send.

Pronto agora temos as duas pontas necessárias para uma comunicação, podemos testar nossa aplicação e ver essa comunicação acontecendo de fato.

Testando a aplicação

Para facilitar o entendimento e testar de forma prática, vamos criar uma classe de controller que vai ser a responsável por receber um requisição e chamar o nosso produtor, quando isso acontecer, a mensagem será enviada ao Kafka, e instantaneamente será lida pelo nosso consumidor, que irá exibir no console a mensagem enviada.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

	@Autowired
	private KafkaProducer producer;
	
	@PostMapping
	public void endpoint(@RequestBody String message) {
		producer.sendMessage(message);
	}
}
Java

Agora ao executar a aplicação note que ela já conecta ao broker do Kafka, e fica aguardando o recebimento de mensagens, para fazer o envio vamos fazer uma requisição para este endpoint de teste.

Requisição usando Curl

curl --request POST \
  --url http://localhost:8080/test \
  --header 'Content-Type: text/plain' \
  --data 'Aprendi isso no Artefato X'
Terminal

Requisição usando Wget

wget --quiet \
  --method POST \
  --header 'Content-Type: text/plain' \
  --body-data 'Aprendi isso no Artefato X' \
  --output-document \
  - http://localhost:8080/test
Terminal

O resultado esperado é uma saída no terminal semelhante a essa:

2023-10-19T16:50:22.658-03:00  INFO 91443 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-10-19T16:50:22.692-03:00  INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.4.1
2023-10-19T16:50:22.693-03:00  INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8a516edc2755df89
2023-10-19T16:50:22.693-03:00  INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1697745022692
2023-10-19T16:50:22.709-03:00  INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition topic1-0 to 0 since the associated topicId changed from null to hl8y1lO1TQCGmlWeNVp53w
2023-10-19T16:50:22.710-03:00  INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: nlx33ZQqQROkqJ85jFzWVw
2023-10-19T16:50:22.769-03:00  INFO 91443 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
Received message: Aprendi isso no Artefato X
2023-10-19T16:58:18.565-03:00  INFO 91443 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-app-1-1, groupId=app-1] Node -1 disconnected.
2023-10-19T16:59:22.966-03:00  INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
Terminal

Conclusão

A integração do Spring Boot com o Apache Kafka oferece a capacidade de construir aplicações altamente responsivas, escaláveis e capazes de lidar com fluxos de eventos em tempo real. Essa é uma ferramenta muito poderosa e muito utiliza em aplicações modernas que se apoiam em fluxos assíncronos e contribui para desacoplar partes de software e dividir regras de negócio em componentes específicos, aproveite para usar esse conhecimento e aplicar em seus projetos.

Mauricio Lima
Mauricio Lima

Bacharel em Ciência da Computação, profissional dedicado ao desenvolvimento de software e entusiasta da tecnologia.

Artigos: 65