Exercise 1

Flume agent spooling from the file system using Kafka Channel and Kafka Sink

The goal of this exercise is to configure a Flume agent that reads files from the local file system (a Flume spooldir source), uses a Kafka topic for channel persistence (spooling_agent_channel), and produces events to a Kafka topic as the sink target (data_in).

Look at the spoolingAgent.properties configuration file in the configs directory of the tutorial repo.
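
You can dump the file from a shell to follow along (the path below assumes the repo is checked out under /tmp/BigDataTechCon, as it is throughout this exercise):

cat /tmp/BigDataTechCon/configs/spoolingAgent.properties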

# /tmp/BigDataTechCon/configs/spoolingAgent.properties
# Define components of the spooling agent - really just giving names to the components configured below
spoolingAgent.sources = s1
spoolingAgent.channels = kafka-channel-1
spoolingAgent.sinks = kafka-sink-1

# A spooldir source polls a directory for files to flume
spoolingAgent.sources.s1.type = spooldir
# Where you drop files for the spooldir source to ingest
spoolingAgent.sources.s1.spoolDir = /tmp/BigDataTechCon/spool
# Which channel to persist the ingested data to
spoolingAgent.sources.s1.channels = kafka-channel-1

# Configure the "kafka-channel-1" channel 
# Use KafkaChannel for persisting to assure delivery
# Data persisted here is stored as a protobuf
spoolingAgent.channels.kafka-channel-1.type = org.apache.flume.channel.kafka.KafkaChannel
# The Kafka channel must be configured with at least 1 kafka broker, typically multiple for high
# availability.  For this tutorial we only started a single broker on localhost:9092
# This is a comma separated list of <host>:<port>
spoolingAgent.channels.kafka-channel-1.brokerList = localhost:9092
# Use the spooling_agent_channel kafka topic to persist "transient" data passing through the agent.  
spoolingAgent.channels.kafka-channel-1.topic = spooling_agent_channel
# The Kafka channel also requires at least one zookeeper instance; again, for this tutorial we only
# started a single zookeeper.  This is a comma separated list of <host>:<port>
spoolingAgent.channels.kafka-channel-1.zookeeperConnect = localhost:2181

# Configure the "kafka-sink-1" sink as a KafkaSink
spoolingAgent.sinks.kafka-sink-1.type = org.apache.flume.sink.kafka.KafkaSink
# Configure the "kafka-sink-1" sink to consume from the "kafka-channel-1" channel tied to the
# spooling_agent_channel topic above.
spoolingAgent.sinks.kafka-sink-1.channel = kafka-channel-1
# Number of events the sink takes from the channel per transaction (batch)
spoolingAgent.sinks.kafka-sink-1.batchSize = 100
# For failover purposes you would normally list at least 3 brokers; for demo purposes just 1
spoolingAgent.sinks.kafka-sink-1.brokerList = localhost:9092
# Configure the "kafka-sink-1" sink to be a producer to the "data_in" topic
spoolingAgent.sinks.kafka-sink-1.topic = data_in

Note the spoolDir property is /tmp/BigDataTechCon/spool; this is the directory the Flume agent watches for dropped files. When a file is dropped, the spooldir source reads it, parses each line into a Flume event, and puts the event on the channel. Be sure this directory exists, or modify the property to point to a directory of your choice.

Additionally, notice the channel is configured to persist to the spooling_agent_channel topic and the sink is configured to produce to the data_in topic. Open a shell window and create both topics.

cd /tmp/kafka_2.10-0.8.2.2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spooling_agent_channel
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data_in

# Verify the topics were created 
bin/kafka-topics.sh --list --zookeeper localhost:2181
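
Optionally, describe the topics to confirm the partition count and replication factor match what was requested (kafka-topics.sh also supports --describe):

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic spooling_agent_channel
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic data_in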

Extract the data file in the data directory of your repository

cd /tmp/BigDataTechCon/data/
tar -xzvf crime_data.tar.gz
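
The archive should extract to crime_data_1.csv, which gets copied into the spool directory later in this exercise. To sanity-check the line count referenced below (50,000 lines):

ls -l /tmp/BigDataTechCon/data/
wc -l /tmp/BigDataTechCon/data/crime_data_1.csv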

Running the spoolingAgent Flume agent in your IDE

Create a run configuration for the spoolingAgent. Main Class:

  • org.apache.flume.node.Application

VM Options:

  • -Dflume.monitoring.type=http
  • -Dflume.monitoring.port=12344
  • -DagentName=spoolingAgent
  • -Dlog4j.configuration=file:/tmp/BigDataTechCon/configs/log4j.properties

The -DagentName option pertains to logging only; agentName is substituted into the log file path in log4j.properties.

The flume.monitoring options start an embedded Jetty HTTP server that exposes agent metrics on port 12344.

Program Arguments:

  • -f /tmp/BigDataTechCon/configs/spoolingAgent.properties
  • -n spoolingAgent

A Flume configuration file can hold properties for multiple Flume agents; -n spoolingAgent selects which agent to run from spoolingAgent.properties.

The -f option specifies which properties file to use to configure the "spoolingAgent" Flume agent.
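
If you prefer not to run the agent from an IDE, the same options map onto the standard flume-ng launcher. This is a minimal sketch, assuming a Flume installation with bin/flume-ng on your PATH and the same file paths used above:

# Run the spoolingAgent from the command line instead of the IDE
flume-ng agent \
  --name spoolingAgent \
  --conf /tmp/BigDataTechCon/configs \
  --conf-file /tmp/BigDataTechCon/configs/spoolingAgent.properties \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=12344 \
  -DagentName=spoolingAgent \
  -Dlog4j.configuration=file:/tmp/BigDataTechCon/configs/log4j.properties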

The spooling directory must exist for the Flume agent to start, so create it now.

mkdir /tmp/BigDataTechCon/spool

Execute the spoolingAgent configuration

Open up a browser and view the metrics port configured in the VM options.
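
You can also check from the shell; Flume's HTTP monitoring serves the counters as JSON on the /metrics path (python -m json.tool is used here only for pretty-printing):

curl -s http://localhost:12344/metrics | python -m json.tool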

From a shell window copy data into the spool directory

cd /tmp/BigDataTechCon/
cp /tmp/BigDataTechCon/data/crime_data_1.csv /tmp/BigDataTechCon/spool/.

Refresh your metrics URL and confirm that the 50,000 lines in crime_data_1.csv were flumed as Flume events (the event counters increased).

Note the completed file was renamed with a .COMPLETED suffix. Flume does some bookkeeping on which files it has already sent in the .flumespool directory, so you cannot send the same file twice without an error.

ls -al /tmp/BigDataTechCon/spool/
total 10048
drwxrwxr-x 3 mkeane mkeane     4096 Nov 11 10:51 .
drwxrwxr-x 9 mkeane mkeane     4096 Nov 11 10:20 ..
-rw-rw-r-- 1 mkeane mkeane 10274369 Nov 11 10:51 crime_data_1.csv.COMPLETED
drwxrwxr-x 2 mkeane mkeane     4096 Nov 11 10:51 .flumespool

Verify data from /tmp/BigDataTechCon/data/crime_data_1.csv is now in the "data_in" topic by using the console consumer to view it. (The console consumer will hang waiting on new data; use Ctrl-C to break out.)

cd /tmp/kafka_2.10-0.8.2.2
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic data_in --from-beginning
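
For a rough count of the events in the topic instead of scrolling through them, one option is the consumer's --max-messages flag (it exits after reading that many messages; if fewer are present it will wait just like the plain consumer):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic data_in --from-beginning --max-messages 50000 | wc -l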

This completes exercise 1.

Exercise 2 - Add a second Flume agent with a Kafka source and chained interceptors
