Kafka - rlip/java GitHub Wiki


  1. Wiadomości w partycjach są zapisywane w kolejności w jakiej zostały wysłane
  2. Consumery czytają wiadomości w kolejności w jakiej zostały zapisane w partycji
  3. Jeśli replication factor wynosi N to consumery i producery mogą tolerować n - 1 zepsutych brokerów, więc dobrze jakby wynosił 3
  4. Dopóki jest stała liczba partycji to ten sam klucz będzie szedł na tą samą partycję

Komendy:

//nowy topic: 
kafka-topics --zookeeper localhost:2181 --topic second_topic 
             --create --partitions 3 --replication-factor 1

//lista topiców:
kafka-topics --zookeeper localhost:2181 --list

//szczegóły topicu:
kafka-topics --zookeeper localhost:2181 --topic first_topic --describe

//dodanie wiadomości do kolejki
kafka-console-producer --broker-list localhost:9092 --topic first_topic
//można dodać jakiś paramter dodając --producer-property acks=all


//czytanie 
kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic  
//domyślnie czyta tylko nowe, a dodając --from-begining to od początku. 
//przypisujemy consumera do grupy dodając mu parametr --group my-first-application
//Wtedy zawsze czyta tylko nieprzeczytane przez grupę

//lista consumer groups
kafka-consumer-groups --bootstrap-server localhost:9092 --list

//szczegóły danej grupy
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-first-application

kafkadiag

Topic

temat jak np. gps_signal każdy topic może mieć ileśtam partycji w każdej partycji są wiadomości mające swój topic wiadomości są tam określony czas, np 2 tygodnie, nie można ich modyfikować

Bokers

to serwery, mają id - integer topic rozkłada się na brokery w taki sposób że partycje rozdzielone są na różne borkerach (chyba że jest za mało serwerów)


Replication factor

ustawia się na ile borkerów mają się backupować partycje jest tylko 1 brocker, który jest liderem i może otrzymywać i przekazywać dane dla jednej partycji, a wszystkie pozostałe ISR (in-sync replica) tylko się synchronizują Jak Leader się zepsuje to jeden z ISR zostaje nowym leaderem, a jak poprzedni leader wstanie to próbuje zostać liderem z powrotem

Producers

zapisują dane do topiców, automatycznie wiedzą do jakiego brokera i partycji zapisać, nie trzeba tego ustawiać Są 3 możliwości zapisywania danych przez producerów:

  • acks=0 - wysyła bez potwierdzenie - możliwa jest utrata danych
  • acks=1 - wysyła z potwierdzeniem - domyślne (ograniczona, ale możliwa utraty danych)
  • acks=all - leader i ISR - wysyłają potwierdzenie (bez utraty danych)

Producery mogą wysyłać wiadomości z key. Kafka gwarantuje, że wszystkie wiadomości z takim samym key trafiają na ta samą partycję np. truck_id - jeśli potrzebujemy kolejności. Jeśli nie będzie klucza, to producer wyśle wiadomość do każdego brokera


Consumers

czytają określony topic identyfikowany przez name. Dane w ramach jednej partycji czytane są w określonej kolejności.

Mogą być w grupach. Każda consumer z grupy czyta z min. jednej partycji, a jeśli jest więcej consumerów w grupie niż partycji, to niektóre będą nieaktywne

Consumer Offsets

Kafka zapisuje offesty consumergroups w topicu: __customer_offsets Kiedy consumer z grupy przetwarza dane otrzymane od Kafki powinien zapisać offesty tych danych w tym topicu - robione automatyczne. Są 3 możliwości zapisywania tych offsetów:

  • at most once - consumer committuje offesty od razu jak otrzyma wiadomość - jak coś ujdzie potem nie tak to ta wiadomość jest utracona
  • at least once - commituje tylko po tym jak wiadomość zostanie przetworzona (zwykle preferowana) - jak coś pójdzie nie tak to wiadomość zostanie przetworzona ponownie. Trzeba się upewnić, że możliwe ponowne przetworzenie wiadomości nie będzie groźne do systemu
  • exact once - tylko pomiędzy kafkami

Broker discovery

wystarczy że klient połączy się z dowolnym borkerem, a otrzyma w metadanych adresy wszystkich borkerów


Zookeeper

  • zarządza brokerami,
  • pomaga w wyznaczeniu lidera, jak poprzedni padnie,
  • wysyła notyfikacje
  • kafka nie może pracować bez zookeepera
  • defaultowo na produkcji jest 3,5, albo 7 zookeeperów
  • też jeden jest liderem - do niego kafka zapisuje, a z pozostałych tylko odczytuje,
  • producery i consumery nie zapisują ani nie czytają z zookeperów
  • od wersji 0.10 nie przechowuję offsetów consumerów

Kafka z dockera

https://medium.com/@TimvanBaarsen/apache-kafka-cli-commands-cheat-sheet-a6f06eac01b#5745

docker-compose up -d
docker ps
docker exec -it idKontenera bash
ls $KAFKA_HOME/bin/
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group listener-subscription

Przykłady java

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
-----------------------------
Producer
-----------------------------
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "first_topic",
                    "id_" + i,  //klucz niepotrzebny, ale gwarantuje partycje
                    "hello world " + i);
//        producer.send(record);  // tak bez callbacka
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        logger.info("Producer metadata callback: " +
                                "\nTopic: " + recordMetadata.topic() +
                                "\nPartition: " + recordMetadata.partition() +
                                "\nOffset: " + recordMetadata.offset() +
                                "\nTimestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            }); //.get(); - tak można zmienić na synchroniczne
        }
        producer.flush();
        producer.close(); // flush + close
    }
-----------------------------
ConsumerDemo
-----------------------------
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("first_topic"));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord<String, String> record : records){
                logger.info("key: " + record.key() +", value: " + record.value());
                logger.info("partition: " + record.partition() +", offset: " + record.offset());
            }
        }
-----------------------------
ConsumerDemoAssignSeek
-----------------------------
        TopicPartition partitionToReadFrom = new TopicPartition("first_topic", 0);
        long offsetToReadFrom = 15L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        //seek
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        while (keepOnReading){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for(ConsumerRecord<String, String> record : records){
                numberOfMessagesReadSoFar += 1;

                logger.info("key: " + record.key() +", value: " + record.value());
                logger.info("partition: " + record.partition() +", offset: " + record.offset());

                if(numberOfMessagesReadSoFar >= numberOfMessagesToRead){
                    keepOnReading = false; //to exit while loop
                    break; //to exit for loop
                }
            }
        }
-----------------------------
ConsumerWithThread
-----------------------------
 public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private void run() {
        Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());
        CountDownLatch latch = new CountDownLatch(1);

        logger.info("Creating the consumer thread");
        ConsumerRunnable myConsumerRunnable = new ConsumerRunnable(
                "127.0.0.1:9092",
                "my-sixth-application",
                "first_topic",
                latch
        );

        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Caught shutdown hook");
            myConsumerRunnable.shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited");
        }));

        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application is interrupted", e);
        } finally {
            logger.info("Application is closing");
        }
    }

    public static class ConsumerRunnable implements Runnable {
        private Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

        private CountDownLatch latch;
        private KafkaConsumer<String, String> consumer;

        public ConsumerRunnable(
                String bootstrapServers,
                String groupId,
                String topic,
                CountDownLatch latch) {
            this.latch = latch;

            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("key: {}, value: {}", record.key(), record.partition());
                        logger.info("partition: {}, offset: {}", record.partition(), record.offset());
                    }
                }
            } catch (WakeupException e) {
                logger.info("Received shutdown signal");
            } finally {
                consumer.close();
                latch.countDown();
            }
        }

        public void shutdown() {
            //the wakeup method is a special method to interrupt consumer.poll()
            //it will throw the exception WakeupException
            consumer.wakeup();
        }
    }
⚠️ **GitHub.com Fallback** ⚠️