Switharoo Source Demo Idea (Flink SQL CLI)

Prepare

Store at most 10 MB of data in Kafka with log retention enabled:

cleanup.policy=delete
log.retention.check.interval.ms=10000  (10 s, default is 5 minutes)
log.retention.bytes=10485760  (10 MB)

https://www.allprogrammingtutorials.com/tutorials/configuring-messages-retention-time-in-kafka.php

Sync/Backup data from Kafka to HDFS periodically

We use the Kafka message timestamp to compute the partition name, so each partition represents the Kafka data for that range of time. The partition name can then be used to derive the handover offset; other criteria could also be used in the future.

CREATE TABLE kafka (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    -- the above fields come from the Kafka value in JSON format
    kafka_ts TIMESTAMP(3)  -- this field comes from the Kafka message timestamp
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.zookeeper.connect' = 'localhost:2181', 
    'properties.bootstrap.servers' = 'localhost:9092', 
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'timestamp.field' = 'kafka_ts'
);
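
Before starting the sync job, you can sanity-check the source table in the SQL CLI with a quick query that also shows the Kafka message timestamp column:

SELECT user_id, behavior, ts, kafka_ts FROM kafka;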

CREATE TABLE hdfs (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3), 
    dt STRING
) PARTITIONED BY (dt) 
WITH (
    'connector' = 'filesystem',
    'path' = 'file:///Users/wuchong/hybrid-demo/hdfs/',
    'sink.rolling-policy.rollover-interval' = '2s',
    'sink.rolling-policy.check-interval' = '2s',
    'format' = 'csv',
    'csv.disable-quote-character' = 'true',
    'sink.partition-commit.delay' = '10s',
    'sink.partition-commit.policy.kind' = 'success-file'
);
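
With the 10-second bucketing used in the INSERT below and the success-file commit policy, each bucket should show up as its own partition directory under the sink path once it is committed. A rough sketch of the expected layout (file names are illustrative):

hdfs/
    dt=2021-05-20 12:34:40/
        part-<uid>-0-0.csv
        _SUCCESS
    dt=2021-05-20 12:34:50/
        part-<uid>-0-0.csv
        _SUCCESS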

-- start a long-running streaming job to synchronize data from Kafka into HDFS partitions
-- alternatively, you can periodically submit a batch job every day to synchronize that day's data
INSERT INTO hdfs 
SELECT user_id, item_id, category_id, behavior, ts, SUBSTR(DATE_FORMAT(kafka_ts, 'yyyy-MM-dd HH:mm:ss'), 1, 18) || '0'
FROM kafka;
-- you can of course apply additional filtering/deduplication
-- while synchronizing the data, according to your business needs
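
For reference, the SUBSTR(...) || '0' expression above truncates the seconds to their tens digit, so every row lands in a 10-second partition. A minimal illustration (the timestamp literal is just an example):

-- e.g. a record with kafka_ts = 2021-05-20 12:34:56 gets the partition value '2021-05-20 12:34:50'
SELECT SUBSTR(DATE_FORMAT(TIMESTAMP '2021-05-20 12:34:56', 'yyyy-MM-dd HH:mm:ss'), 1, 18) || '0';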

Run the Hybrid source to read data from HDFS + Kafka

CREATE TABLE hybrid (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE
) WITH (
    'connector' = 'hybrid',
    'historical.connector' = 'filesystem',
    'historical.filesystem.path' = 'file:///Users/wuchong/hybrid-demo/hdfs/',
    'historical.filesystem.format' = 'csv',
    'historical.filesystem.directory.filter.enable' = 'true', -- only read partitions that contain a success file
    'incremental.connector' = 'kafka',
    'incremental.kafka.topic' = 'user_behavior',
    'incremental.kafka.properties.zookeeper.connect' = 'localhost:2181', 
    'incremental.kafka.properties.bootstrap.servers' = 'localhost:9092', 
    'incremental.kafka.format' = 'json'
);

-- should equal the total row count ever written to Kafka, as if log retention had not deleted anything
SELECT COUNT(*) FROM hybrid; 
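
For comparison, counting the kafka table alone (assuming it is still registered from the sync step) should return fewer rows once retention has deleted old segments:

SELECT COUNT(*) FROM kafka;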

-- apply a tumbling window on the event time
SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*) pv, COUNT(DISTINCT user_id) uv
FROM hybrid
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND);