Spark Streaming consuming TBDS Kafka: a code walkthrough
Kafka development guide
Preparation before developing against Kafka
- Obtain authentication credentials: fetch the secure id and secure key from the TBDS portal UI.
- Create the authentication configuration file. Build the Kafka auth file as follows (a sketch showing how to load this file from client code appears after this list):

  ```sh
  echo "security.protocol=SASL_TBDS" >> /opt/client_tbds.properties
  echo "sasl.mechanism=TBDS" >> /opt/client_tbds.properties
  echo "sasl.tbds.secure.id=dfksxxxxxxxxxx1k5" >> /opt/client_tbds.properties
  echo "sasl.tbds.secure.key=eU3xxxxxxvxxxxx" >> /opt/client_tbds.properties
  echo "group.id=testgroup" >> /opt/client_tbds.properties
  ```
- Create the topic:

  ```sh
  cd /data/bigdata/tbds/usr/hdp/2.2.0.0-2041/kafka/bin
  ./kafka-topics.sh --create --zookeeper ..**:2181 --replication-factor 3 --partitions 2 --topic web_click_topic
  ./kafka-topics.sh --describe --topic web_click_topic --zookeeper tbds-10-3-0-13:2181
  ```
- Modify the topic's access permissions as needed.
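
With the preparation done, client programs can load /opt/client_tbds.properties instead of hard-coding credentials. Below is a minimal sketch of that, assuming the file was created as above; the `TbdsClientProps` name is a hypothetical helper, not part of TBDS.

```scala
import java.io.FileInputStream
import java.util.Properties

// Minimal sketch: load the TBDS auth file created above so that
// credentials are not hard-coded in client programs.
object TbdsClientProps {
  def load(path: String = "/opt/client_tbds.properties"): Properties = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    props
  }

  def main(args: Array[String]): Unit = {
    val props = load()
    // Sanity check: the TBDS entries and group.id should be present
    println(props.getProperty("security.protocol")) // SASL_TBDS
    println(props.getProperty("sasl.mechanism"))    // TBDS
    println(props.getProperty("group.id"))          // testgroup
  }
}
```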
Key coding points
- Consuming directly in code via the Kafka client (Java API)
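
  The wiki lists this step without code, so here is a minimal sketch of direct consumption, written in Scala against the Kafka client API (the same API Java code would call). The broker address, the masked credentials, and the `DirectTbdsConsumer` name are placeholders or assumptions; the topic is the one created during preparation. The `poll(long)` signature matches the 0.10.x client shipped with this TBDS release.

  ```scala
  import java.util.Properties

  import scala.collection.JavaConverters._

  import org.apache.kafka.clients.consumer.KafkaConsumer

  // Sketch of direct consumption with the Kafka client API,
  // assuming a placeholder broker address and masked credentials.
  object DirectTbdsConsumer {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "172.x.x.x:6667") // placeholder broker
      props.put("group.id", "testgroup")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      // TBDS authentication, the same four settings as client_tbds.properties
      props.put("security.protocol", "SASL_TBDS")
      props.put("sasl.mechanism", "TBDS")
      props.put("sasl.tbds.secure.id", "dfksxxxxxxxxxx1k5") // placeholder
      props.put("sasl.tbds.secure.key", "eU3xxxxxxvxxxxx")  // placeholder

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(List("web_click_topic").asJava)
      try {
        while (true) {
          val records = consumer.poll(1000L)
          for (record <- records.asScala) {
            println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
          }
        }
      } finally {
        consumer.close()
      }
    }
  }
  ```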
- Consuming Kafka messages with Spark Streaming:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Demo of Spark Streaming integration with Kafka.
 */
object SparkAndKafkaDemo {

  /**
   * Main entry point.
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Name under which the application runs
    val appName = "sparkKafkaDemo"
    val groupId = "test_group"
    val brokers = "172.xxxxxx:6667"

    // Initialize the Spark configuration; the master is supplied at submit time
    val sparkConf = new SparkConf().setAppName(appName)
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Topics to subscribe to
    val topics = Set("test01")

    // Kafka consumer parameters, including the TBDS authentication settings
    val kafkaParams = collection.Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "security.protocol" -> "SASL_TBDS",
      "sasl.mechanism" -> "TBDS",
      "sasl.tbds.secure.id" -> "cxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
      "sasl.tbds.secure.key" -> "27GOxxxxxxxxxxxxxxxxxxxxxxxx"
    )

    // Consumer strategy; offset handling is left to Kafka (auto-commit is on)
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)

    // Read the stream through KafkaUtils
    val lines = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)

    // Split each message value into words on spaces, then pair each word with a count of 1
    val words = lines.flatMap(_.value().split(" ")).map(x => (x, 1))

    // Group by word, sum the counts, and print the result
    words.reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
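
To feed the demo some input, a small producer like the following can write space-separated lines to the test01 topic. This is a sketch assuming the same placeholder broker address and masked TBDS credentials as the demo; it is not taken from the original code.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch of a test producer for the word-count demo above,
// assuming a placeholder broker address and masked credentials.
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "172.xxxxxx:6667") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("security.protocol", "SASL_TBDS")
    props.put("sasl.mechanism", "TBDS")
    props.put("sasl.tbds.secure.id", "cxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") // placeholder
    props.put("sasl.tbds.secure.key", "27GOxxxxxxxxxxxxxxxxxxxxxxxx")   // placeholder

    val producer = new KafkaProducer[String, String](props)
    try {
      // Each line becomes one message; the demo splits values on spaces
      producer.send(new ProducerRecord[String, String]("test01", "hello world hello tbds"))
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```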