Kafka Event Driven Architecture - vidyasekaran/current_learning GitHub Wiki

Use Stephen M course and E:\techbooks\kafka

Designing Event-Driven Systems_ Concepts and Patterns for Streaming Services with Apache Kafka ( PDFDrive )

and

confluent-kafka-definitive-guide-complete.pdf

Install Kafka in local refer

https://dzone.com/articles/running-apache-kafka-on-windows-os

Install kafka in linux

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

Starting kafka and zookeper in linux

https://kafka.apache.org/quickstart https://www.digitalocean.com/community/tutorials/how-to-install-and-configure-an-apache-zookeeper-cluster-on-ubuntu-18-04

I started zookeeper in linux like

[root@ip-172-31-39-218 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /home/ec2-user/software/apache-zookeeper-3.6.2-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED

I started kafka in linux like

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-server-start.sh config/server.properties

Created topic in linux like below

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic router --bootstrap-server localhost:9092 Created topic router.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic adapter --bootstrap-server localhost:9092 Created topic adapter.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic processor --bootstrap-server localhost:9092 Created topic processor.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]#

Reference : https://kafka.apache.org/quickstart

List the kafka topics

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list adapter processor router

To Start Zookeeper run -- zkserver

Install zookeeper

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

Modify below in D:\softwares\apache-zookeeper-3.5.8-bin.tar\apache-zookeeper-3.5.8-bin\conf\zoo.cfg zookeeper.connect=192.168.56.1:2181

then start zookeeper C:\WINDOWS\system32>zkserver

Start kafka (Any issue with logs change log directory in .\config\server.properties)

am using latest version of kafka - D:\softwares\kafka_2.13-2.7.0\bin\windows>.\bin\windows\kafka-server-start.bat .\config\server.properties

D:\softwares\kafka_2.12-2.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

Creating topic

D:\softwares\kafka_2.12-2.6.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2

List the topics

D:\softwares\kafka_2.13-2.7.0\bin\windows>kafka-topics.bat --zookeeper localhost:2181 --list __consumer_offsets reliable-logs test

Delete a topic

D:\softwares\kafka_2.13-2.7.0\bin\windows>kafka-topics.bat --zookeeper localhost:2181 --topic test --delete

Starting kafka consumer

D:\softwares\kafka_2.12-2.6.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

Starting kafka Producer

D:\softwares\kafka_2.12-2.6.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test

You can modify pom to include kafka and include kafka server details and use below mentioned consuemr and producer spring stream microservices

D:\Udemy_springms_ddd\micro-2020-april\micro-2020-april\microws2020\01-message-consumer-start

D:\Udemy_springms_ddd\micro-2020-april\micro-2020-april\microws2020\01-message-producer-start

**The Spring for Apache Kafka (spring-kafka) ** project applies core Spring concepts to the development of Kafka-based messaging solutions.

It provides a "template" as a high-level abstraction for sending messages. It also provides support for Message-driven POJOs with @KafkaListener

annotations and a "listener container". These libraries promote the use of dependency injection and declarative. In all of these cases, you will see

similarities to the JMS support in the Spring Framework and RabbitMQ support in Spring AMQP.

https://spring.io/projects/spring-kafka

Features

KafkaTemplate KafkaMessageListenerContainer @KafkaListener KafkaTransactionManager spring-kafka-test jar with embedded kafka server

Kafka producer consumer

https://dzone.com/articles/spring-cloud-stream-with-kafka

https://www.sohamkamani.com/blog/2017/11/22/how-to-install-and-run-kafka/

Refer this for

https://github.com/ivangfr/spring-cloud-stream-kafka-elasticsearch/blob/master/producer-api/src/main/resources/application.yml

Spring Cloud Stream to build highly scalable event-driven applications connected with shared messaging systems;

https://github.com/ivangfr/spring-cloud-stream-kafka-elasticsearch

  1. I created all topics mentioned such as producer.news,categorizer.news etc..
  2. Now modified application.xml to port 9092 and starting one by one 1st producer-api brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}

Error while starting the kafka broker

-deleting all logs and restarting didnot work

https://stackoverflow.com/questions/39759071/error-while-starting-kafka-broker

Changing D:\softwares\kafka_2.13-2.7.0\config\server.properties value for broker.id was set to 1 i.e broker.id=1

Kafka cluster setup, some kafka command and spark

https://sparkbyexamples.com/kafka/kafka-delete-topic/

Creating a topic in kafka and set producer and consumer

  1. Define Interface with INPUT,OUTPUT and Annotate @INPUT(INPUT - "greetings-in" & OUTPUT = "greetings-out" and Create a topic in kafka "greeting")

public interface GreetingsStreams {

String INPUT = "greetings-in";
String OUTPUT = "greetings-out";

@Input(INPUT)
SubscribableChannel inboundGreetings();

@Output(OUTPUT)
MessageChannel outboundGreetings();

}

spring: cloud: stream: kafka: binder: brokers: localhost:9092 bindings: greetings-in: destination: greetings contentType: application/json greetings-out: destination: greetings contentType: application/json

  1. Start zookeeper and kafka and then create a topic in kafka "greeting"

D:\softwares\kafka_2.13-2.7.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic greetings Created topic greetings.

  1. Start service and hit

http://localhost:8888/greetings?message=%22JAI%20Guru%22

  1. You may define many listens to receive message

@Component public class GreetingsListener2 {

private final static Logger log = Logger.getLogger(GreetingsListener2.class.getName());

@StreamListener(GreetingsStreams.INPUT)
public void handleGreetings(@Payload Greetings greetings) {
   log.info("Received greetings - 2 : {},"+ greetings.getMessage());
}

}


@Component public class GreetingsListener1 {

private final static Logger log = Logger.getLogger(GreetingsListener2.class.getName());

@StreamListener(GreetingsStreams.INPUT)
public void handleGreetings(@Payload Greetings greetings) {
   log.info("Received greetings - 1 : {},"+ greetings.getMessage());
}

}

  1. Use a controller to invoke service

@RestController public class GreetingsController { private final GreetingsService greetingsService;

public GreetingsController(GreetingsService greetingsService) {
    this.greetingsService = greetingsService;
}

@GetMapping("/greetings")
@ResponseStatus(HttpStatus.ACCEPTED)
public void greetings(@RequestParam("message") String message) {
    Greetings greetings = new Greetings();
    greetings.setMessage(message);
    greetings.setTimestamp(System.currentTimeMillis());
    

    greetingsService.sendGreeting(greetings);
}

}

  1. Write a service to publish a message

@Service

public class GreetingsService {

private final static Logger log = Logger.getLogger(GreetingsService.class.getName());


private final GreetingsStreams greetingsStreams;

public GreetingsService(GreetingsStreams greetingsStreams) {
    this.greetingsStreams = greetingsStreams;
}

public void sendGreeting(final Greetings greetings) {
    log.info("Sending greetings {},"+ greetings.getMessage());

    MessageChannel messageChannel = greetingsStreams.outboundGreetings();
    messageChannel.send(MessageBuilder
            .withPayload(greetings)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .build());
}

}

3 MicroServices are created namely

3 topics are created namely router, adapter, processor

http://localhost:8888/route?payment=%22pay%2010000%20to%20IOB%22

spring-cloud-streams-kafka-payment-router - REST Controller - puts payment to **adapter ** topic

spring-cloud-streams-kafka-payment-adapter - Receives payment from ** adapter ** topic and publishes it to ** processor **

spring-cloud-streams-kafka-payment-processor - Receives payment from ** processor ** topic.

Tasks to be done

Dockerize and publish artifacts to docker hub refer - https://github.com/vidyasekaran/current_learning/wiki/dockerize-and-run-multi-springboot-ms-using-docker-compose

https://github.com/vidyasekaran/current_learning/wiki/Develop-Event-driven-microservices

deploy in kubertes refer kalyan course - section 5 and git code https://github.com/stacksimplify/kubernetes-fundamentals/tree/master/07-PODs-with-YAML/kube-manifests

all in aws docker machine -

Set up spring boot project - /home/ec2-user/openshift/spring-cloud-streams-kafka-payment-router

  • to build and run commands

cd /home/ec2-user/openshift/spring-cloud-streams-kafka-payment-router

mvn clean install

docker build -t router5 .

docker run -p 8100:8100 router5 --> Throws error need to fix.

Got no binders available i had to add below dependency in pom.xml to fix. I had to replace localhost with public ip of Ec2 instance in application.yaml

https://stackoverflow.com/questions/55875428/a-default-binder-has-been-requested-but-there-are-no-binders-available-for-org

I have now installed zookeeper and kafka & created topics in kafka - for install refer above

started zookeeper in linux like [root@ip-172-31-39-218 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh start

I started kafka in linux like

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-server-start.sh config/server.properties

Created topic in linux like below

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic router --bootstrap-server localhost:9092 Created topic router.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic adapter --bootstrap-server localhost:9092 Created topic adapter.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]# bin/kafka-topics.sh --create --topic processor --bootstrap-server localhost:9092 Created topic processor.

[root@ip-172-31-39-218 kafka_2.13-2.7.0]#

Reference : https://kafka.apache.org/quickstart