Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
A integração do Spring Boot com o Apache Kafka é uma abordagem muito utilizada para construção de aplicações modernas que usam mensagens em tempo real e processamento de eventos assíncronos. O Apache Kafka é uma plataforma de streaming de eventos distribuída, enquanto o Spring Boot simplifica o desenvolvimento de aplicativos Java. Nesse artigo, exploraremos como criar uma conexão sólida entre o Spring Boot e o Kafka, permitindo que você construa aplicativos escaláveis e resilientes.
Para que possamos testar nossa aplicação vamos precisar do ambiente Kafka, para isso vamos configurar uma versão minimalista do ecossistema do Kafka. Para essa abordagem vamos utilizar o Docker que vai possibilitar que executemos todos os componentes necessários na nossa própria maquina, e de maneira fácil a remover depois.
Se você não possui o Docker devidamente configurado na sua maquina pode seguir esse tutorial passo a passo de como fazer a instalação.
O Kafka precisa basicamente de dois componentes para executar corretamente que é a própria aplicação do Kafka e o Zookeeper, para facilitar nós criamos um docker compose que com um único script vai subir toda a aplicação, e também adicionamos o Kafdrop, ele vai nos auxiliar a visualizar as mensagem que chega no nosso broker.
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
- broker-kafka
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
networks:
- broker-kafka
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafdrop:
image: obsidiandynamics/kafdrop:latest
networks:
- broker-kafka
depends_on:
- kafka
ports:
- 9000:9000
environment:
KAFKA_BROKERCONNECT: kafka:29092
networks:
broker-kafka:
driver: bridge
docker-compose.ymlPara subir esse ambiente, basta copiar o código acima, em um arquivo com nome de docker-compose.yml, executar o terminal de comando na pasta raiz onde está localizado o arquivo, e usar o comando abaixo:
docker-compose up -d
TerminalEsse processo deve demorar um pouco, porque será necessário baixar todas as imagens necessárias para executar os contêineres, caso queira validar se os contêineres estão em execução, e não conhece muito de Docker confira esse artigo com os principais comandos.
Também é possível validar a execução acessando a aplicação do Kafdrop, é possível ver a interface, através da url https://localhost:9000.
Para que a comunicação com o Kafka aconteça vamos precisar adicionar duas dependências, e para isso vamos modificar o arquivo pom.xml acrescentando as 2 dependências abaixo.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
XMLPara que a nossa aplicação consiga se conectar ao nosso broker, vamos precisa indicar para ela o endereço onde está sendo executado o cluster do Kafka, para isso precisamos adicionar duas configurações no arquivo application.properties da aplicação
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=app-1
application.propertiesA propriedade bootstrap-servers, indica o endereço do cluster, e a propriedade consumer.group identifica o nome do grupo da nossa aplicação, essa configuração é importante quando se vai usar aplicações escaláveis, pois cada mensagem só é distribuída para apenas um membro do mesmo grupo, pois é possível termos mais de um grupo escutando o mesmo tópico.
Agora que já fizemos todas as configurações necessárias, vamos codificar o nosso consumidor ele que será o responsável por receber as mensagens de um tópico especifico.
Como estamos usando as dependências do próprio pacote do Spring essa parte fica bem simples, precisamos apenas usar a anotação @KafkaListener
e indicar os tópicos que vamos escultar. Para o correto funcionamento dessa anotação é necessário que a classe seja um bean Spring gerenciado, então vamos anotar a classe com @Service
.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "topic1")
public void listen(final String message) {
System.out.println("Received message: " + message);
}
}
JavaAgora que já temos um consumidor, precisamos produzir as mensagens para o tópico que ele esculta, em uma aplicação real é comum que o produtor fique em outra aplicação, mas como estamos fazendo apenas um exemplo prático vamos, codificá-lo aqui mesmo.
Sua implementação é bem simples basta usar a classe que a biblioteca já nos entrega KafkaTemplate
, e o próprio Spring se encarregará de injetar a implementação dela. Nessa caso poderíamos utilizar a anotação @Autowired
, mas vamos usar um conceito diferente, nas versões mais recentes do Spring, ele faz a injeção de dependência automaticamente se a nossa propriedade for final e a classe tiver um construtor para receber esse objeto, além disso no nosso projeto estamos utilizando o Lombok, que vai fazer esse construtor para nós apenas com uma anotação @AllArgsConstructor
, caso você não esteja utilizando o Lombok pode fazer um construtor de forma tradicional.
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.AllArgsConstructor;
@Service
@AllArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String TOPIC = "topic1";
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
JavaNote também que estamos definindo o tópico para onde vamos enviar a mensagem em uma propriedade privada e estática, mas isso não é obrigatório, ele pode ser passado como uma string no método send.
Pronto agora temos as duas pontas necessárias para uma comunicação, podemos testar nossa aplicação e ver essa comunicação acontecendo de fato.
Para facilitar o entendimento e testar de forma prática, vamos criar uma classe de controller que vai ser a responsável por receber um requisição e chamar o nosso produtor, quando isso acontecer, a mensagem será enviada ao Kafka, e instantaneamente será lida pelo nosso consumidor, que irá exibir no console a mensagem enviada.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private KafkaProducer producer;
@PostMapping
public void endpoint(@RequestBody String message) {
producer.sendMessage(message);
}
}
JavaAgora ao executar a aplicação note que ela já conecta ao broker do Kafka, e fica aguardando o recebimento de mensagens, para fazer o envio vamos fazer uma requisição para este endpoint de teste.
curl --request POST \
--url http://localhost:8080/test \
--header 'Content-Type: text/plain' \
--data 'Aprendi isso no Artefato X'
Terminalwget --quiet \
--method POST \
--header 'Content-Type: text/plain' \
--body-data 'Aprendi isso no Artefato X' \
--output-document \
- http://localhost:8080/test
TerminalO resultado esperado é uma saída no terminal semelhante a essa:
2023-10-19T16:50:22.658-03:00 INFO 91443 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-10-19T16:50:22.692-03:00 INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.4.1
2023-10-19T16:50:22.693-03:00 INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8a516edc2755df89
2023-10-19T16:50:22.693-03:00 INFO 91443 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1697745022692
2023-10-19T16:50:22.709-03:00 INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition topic1-0 to 0 since the associated topicId changed from null to hl8y1lO1TQCGmlWeNVp53w
2023-10-19T16:50:22.710-03:00 INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: nlx33ZQqQROkqJ85jFzWVw
2023-10-19T16:50:22.769-03:00 INFO 91443 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
Received message: Aprendi isso no Artefato X
2023-10-19T16:58:18.565-03:00 INFO 91443 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-app-1-1, groupId=app-1] Node -1 disconnected.
2023-10-19T16:59:22.966-03:00 INFO 91443 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
TerminalA integração do Spring Boot com o Apache Kafka oferece a capacidade de construir aplicações altamente responsivas, escaláveis e capazes de lidar com fluxos de eventos em tempo real. Essa é uma ferramenta muito poderosa e muito utiliza em aplicações modernas que se apoiam em fluxos assíncronos e contribui para desacoplar partes de software e dividir regras de negócio em componentes específicos, aproveite para usar esse conhecimento e aplicar em seus projetos.