Kafka Data Ingestion (Java and Spark Streaming)

Kafka Development Guide

Preparation for Kafka Development

  • Obtain authentication credentials (secure id/key) from the TBDS portal UI

(Screenshot: authentication credentials page in the TBDS portal)

  • Create the authentication configuration file

    Build the Kafka authentication file:

    echo "security.protocol=SASL_TBDS" >> /opt/client_tbds.properties

    echo "sasl.mechanism=TBDS" >> /opt/client_tbds.properties

    echo "sasl.tbds.secure.id=dfksPsVdEarSjAmrQ8CoPibZgr5wLeylk1k5" >> /opt/client_tbds.properties

    echo "sasl.tbds.secure.key=eU3HwVAi9Vv2s98LyYBqGLwxe3bzNwKl" >> /opt/client_tbds.properties

    echo "group.id=testgroup" >> /opt/client_tbds.properties

  • Create a topic:

cd /data/bigdata/tbds/usr/hdp/2.2.0.0-2041/kafka/bin
./kafka-topics.sh --create --zookeeper tbds-10-3-0-13:2181,tbds-10-3-0-17:2181,tbds-10-3-0-7:2181 --replication-factor 3 --partitions 2 --topic web_click_topic
./kafka-topics.sh --describe --topic web_click_topic --zookeeper tbds-10-3-0-13:2181

  • Modify topic permissions in the portal (required before the console smoke test below will succeed)

    (Screenshot: permission settings in the TBDS portal)
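
Once the topic exists and permissions are granted, the console tools in the same bin directory can smoke-test the authentication file. A minimal sketch, assuming the brokers listen on 172.16.16.9:6667 (the address used in the Spark demo below); --producer.config and --consumer.config point at the file created earlier:

./kafka-console-producer.sh --broker-list 172.16.16.9:6667 --topic web_click_topic --producer.config /opt/client_tbds.properties

./kafka-console-consumer.sh --bootstrap-server 172.16.16.9:6667 --topic web_click_topic --from-beginning --consumer.config /opt/client_tbds.properties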

Coding Key Points


  • Consuming Kafka messages from Spark

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Spark and Kafka integration demo.
      */
    object SparkAndKafkaDemo {
    
      /**
        * Main entry point.
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Application name shown in the Spark UI
        val appName = "sparkKafkaDemo"
        val groupId = "test_group"
        val brokers = "172.16.16.9:6667"
        // Initialize the Spark conf and a streaming context with 5-second batches
        val sparkConf = new SparkConf().setAppName(appName)
        val sc = new StreamingContext(sparkConf, Seconds(5))
        // Topics to subscribe to
        val topics = Set("test01")
        // Kafka consumer parameters, including the TBDS authentication settings
        val kafkaParams = collection.Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupId,
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (true: java.lang.Boolean),
          "security.protocol" -> "SASL_TBDS",
          "sasl.mechanism" -> "TBDS",
          "sasl.tbds.secure.id" -> "cRE5hQa2mUiKBrZ3V2tj2T8ab4owReLRdB5p",
          "sasl.tbds.secure.key" -> "27GOsG4sPPxhdgv5t1wgdqnuCW4bNUmz"
        )
    
        // Define the consumer strategy (offsets are left to auto commit here)
        val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    
        // Read the stream through KafkaUtils
        val lines = KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent, consumerStrategy)
    
        // Split each message value on spaces and pair every word with a count of 1
        val words = lines.flatMap(_.value().split(" ")).map(x => (x, 1))
        // Group by word, sum the counts, and print each batch
        words.reduceByKey(_ + _).print()
        sc.start()
        sc.awaitTermination()
      }
    }
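
Building the demo requires the Spark Streaming Kafka 0-10 integration on the classpath. A minimal build.sbt sketch; the Spark and Scala versions here are assumptions to be matched to the cluster, and the SASL_TBDS/TBDS settings additionally need the TBDS-patched Kafka client jar (shipped under the Kafka installation directory above) in place of the stock kafka-clients pulled in transitively:

    // build.sbt -- versions are assumptions; match them to your TBDS cluster
    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      // Spark streaming API, provided by the cluster at runtime
      "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
      // Brings in KafkaUtils, ConsumerStrategies, and LocationStrategies
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    )

Package the job and launch it with spark-submit, pointing --class at SparkAndKafkaDemo.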
    

Code Demos

Download the demo: consuming Kafka from Java (a minimal sketch of the consumer loop appears below)

Download the demo: consuming Kafka from Spark Streaming
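
The Java demo amounts to the standard Kafka consumer loop driven by the authentication file from the preparation steps. Below is a minimal sketch in Scala against the same Java client API; the file path, broker address, and topic come from the examples above, and the 0.10-era poll(Long) call matches the HDP 2.2 Kafka that TBDS ships:

    import java.io.FileInputStream
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.collection.JavaConverters._
    
    object PlainKafkaConsumerDemo {
      def main(args: Array[String]): Unit = {
        // Start from the TBDS authentication file built earlier
        val props = new Properties()
        props.load(new FileInputStream("/opt/client_tbds.properties"))
        // Broker address and deserializers still have to be set explicitly
        props.put("bootstrap.servers", "172.16.16.9:6667")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    
        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(Collections.singletonList("web_click_topic"))
        try {
          while (true) {
            // poll(Long) is the 0.10-era API; newer clients take a Duration
            val records = consumer.poll(1000L)
            for (r <- records.asScala)
              println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
          }
        } finally {
          consumer.close()
        }
      }
    }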