Spark Notes

https://github.com/cloudera/spark/blob/spark2-2.4.0-cloudera1/pom.xml

Configuration

###begin: Kafka2HiverSpeakerDataLoader ###

export SPARK_KAFKA_VERSION=0.10

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn/

export SPARK_HOME=/opt/cloudera/parcels/SPARK2/lib/spark2

# Note: all options must precede the application jar; spark2-submit passes anything
# after the jar as arguments to the main class. Hive settings go through the
# spark.hadoop. prefix. yarn.nodemanager.* and mapreduce.* properties are
# cluster-side / MapReduce-only and do nothing for a Spark job. --files ships
# speaker.conf into each container's working directory, so the relative
# -Dconfig.file path resolves in cluster deploy mode.
nohup spark2-submit --class midea.ai.Kafka2HiverSpeakerDataLoader \
  --master yarn --deploy-mode cluster \
  --driver-memory 4G --conf spark.driver.memoryOverhead=512 \
  --executor-memory 12G --num-executors 6 \
  --files /cdhdata/aipdc/speaker.conf \
  --conf "spark.driver.extraJavaOptions=-Dconfig.file=speaker.conf" \
  --conf "spark.executor.extraJavaOptions=-Dconfig.file=speaker.conf" \
  --conf spark.hadoop.hive.exec.max.dynamic.partitions=10000 \
  --conf spark.yarn.am.memory=2g \
  --conf spark.task.maxFailures=1 \
  /cdhdata/aipdc/aip-rt-speaker-jar-with-dependencies.jar \
  >> logs/speaker.log 2>&1 &

###end: Kafka2HiverSpeakerDataLoader ###
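One subtlety with the -Dconfig.file flags above: Typesafe Config only honors config.file for the no-argument ConfigFactory.load(); ConfigFactory.load("speaker"), as used in the loaders below, always reads the classpath resource speaker.conf and ignores the override. A minimal loading helper that restores external-file-wins behavior (the class name and fallback chain are my own sketch, not from the original code):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.io.File;

public class SpeakerConfigLoader {
    // Prefer the file named by -Dconfig.file (shipped via --files), falling back
    // to the classpath resource speaker.conf bundled in the fat jar.
    public static Config load() {
        Config classpath = ConfigFactory.load("speaker");
        String external = System.getProperty("config.file");
        if (external != null && new File(external).isFile()) {
            return ConfigFactory.parseFile(new File(external))
                    .withFallback(classpath)
                    .resolve();
        }
        return classpath;
    }
}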

stream

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.midea.ai</groupId>
  <artifactId>aip-rtflow-speaker</artifactId>
  <version>1.0-SNAPSHOT</version>
<!-- spark-core_2.11-2.2.0.cloudera1.jar -->
  <dependencies>


    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!--
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11 </artifactId>
      <version>${spark.version}</version>
    </dependency> -->
    <!-- https://docs.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh5_maven_repo.html#topic_7 -->

    <!--
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.3.1</version>
      <scope>compile</scope>
    </dependency> 

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.12</version>
      <scope>provided</scope>
    </dependency>
-->
    <dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.2.0</version>
    </dependency>

    <!--
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
      </dependency>-->
  </dependencies>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <flink.version>1.11.1</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <build>
    <finalName>${project.artifactId}</finalName>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <tarLongFileMode>gnu</tarLongFileMode>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <compilerArgument>-Xlint:deprecation</compilerArgument>
        </configuration>
      </plugin>
    </plugins>

  </build>
</project>
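The POM above only declares Flink and Typesafe Config artifacts; the Spark dependencies the loaders below compile against are commented out or assumed to come from the cluster classpath. A hedged sketch of what would be needed at compile time (the cloudera version string is an assumption based on the spark2-2.4.0-cloudera1 branch linked at the top; check the Cloudera repository for the exact artifacts):

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.0.cloudera1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.4.0.cloudera1</version>
      <scope>provided</scope>
    </dependency>
    <!-- with SPARK_KAFKA_VERSION=0.10 exported, CDH can supply this at runtime;
         compile scope here means it also ships in the fat jar -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.0.cloudera1</version>
    </dependency>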

socket

// Socket2HiveDataLoader
package xx;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class Socket2HiveDataLoader {

    private static final Logger log = LoggerFactory.getLogger(Socket2HiveDataLoader.class);

    public static void main(String... ss) throws InterruptedException {
        Config config = ConfigFactory.load("speaker");
        Config sparkConfig = config.getConfig("spark");
        log.info("sparkConfig:\t {}", sparkConfig.toString());
        Config kafkaConfig = config.getConfig("kafka");
        log.info("kafkaConfig:\t {}", kafkaConfig);

        log.info("{}\t{}", sparkConfig.getString("warehouse-dir"), sparkConfig.getString("hive-table"));

        // Create a local StreamingContext with two worker threads and a 1-second batch interval
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkJsonLoader")
                .set("spark.sql.warehouse.dir", sparkConfig.getString("warehouse-dir"))
                .set("hive.exec.dynamic.partition", "true")
                .set("hive.exec.dynamic.partition.mode", "nonstrict")
                .set("spark.sql.sources.partitionOverwriteMode", "dynamic");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Create a DStream that will connect to hostname:port, like localhost:5555
        JavaReceiverInputDStream<String> jsonLines = jssc.socketTextStream("10.16.5.35", 5555);

        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("action", StringType, true),
                DataTypes.createStructField("action_vod", StringType, true),
                DataTypes.createStructField("bluetooth_status", StringType, true),
                DataTypes.createStructField("mic_status", StringType, true),
                DataTypes.createStructField("network_status", StringType, true),
                DataTypes.createStructField("speaker_device_id", StringType, true),
                DataTypes.createStructField("timestamp", StringType, true),
                DataTypes.createStructField("volume", StringType, true),
                DataTypes.createStructField("volume_status", StringType, true)
        });

        jsonLines.foreachRDD((rdd, time) -> {
            log.info("\t curr rdd count: \t {}", rdd.count());
//            System.out.println(new java.util.Date() + "\t curr rdd count: \t " + rdd.count());

            // Get the singleton instance of SparkSession
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

            // each element is already a raw JSON line, so the RDD feeds the reader directly
            Dataset<Row> df = spark.read().schema(schema).json(rdd);

            // keep rows whose epoch-millis timestamp has a plausible length
            Dataset<Row> df1 = df.filter(functions.length(functions.col("timestamp")).leq(kafkaConfig.getInt("timestamp-length")));

            Dataset<Row> speakerDataFrame = df1.withColumn("part_dt", functions.from_unixtime(functions.col("timestamp").cast(LongType).divide(1000), "yyyy-MM-dd"))
                    .withColumnRenamed("timestamp", "dtm_str");

            speakerDataFrame.write().mode(SaveMode.Append).partitionBy("part_dt").format("hive").saveAsTable(sparkConfig.getString("hive-table"));
//            speakerDateFrame.write().partitionBy("part_dt").insertInto(sparkConfig.getString("hive-table"));
        });

        jssc.start();
        jssc.awaitTermination();

    }

}
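socketTextStream connects out to the given host:port, so testing the loader needs something listening there, e.g. `nc -lk 5555`. A throwaway Java equivalent (entirely my own sketch; the field values are made up to match the schema above):

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Listens on 5555 and emits one schema-shaped JSON line per second once the
// Spark receiver connects.
public class JsonSocketFeeder {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(5555);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            while (true) {
                out.println("{\"action\":\"play\",\"speaker_device_id\":\"dev-001\","
                        + "\"timestamp\":\"" + System.currentTimeMillis() + "\","
                        + "\"volume\":\"30\",\"volume_status\":\"normal\"}");
                Thread.sleep(1000L);
            }
        }
    }
}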



kafka

speaker.conf
{
  spark {
    app-name = "Online Speaker Data Stream"
    master = "yarn"
    warehouse-dir = "hdfs://r***4:8020/user/hive/warehouse"
    hive-table = "aipdc.speaker_orionstar"
    partitions = 10000
    blockInterval = "500ms"
    streaming-duration = 3
    log-level = "INFO"
  }
  kafka {
    bootstrap-servers = "10.18.62.69:9092,10.18.62.70:9092,10.18.62.71:9092"
    group-id = "iot00_retail1"
    enable-auto-commit = false
    auto-offset-reset = "earliest"
    topic = "datarev_0x1c_state_prod_iot"
    timestamp-length = 13
  }
}

package xx;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class Kafka2HiverSpeakerDataLoader {
    private static final Logger log = LoggerFactory.getLogger(Kafka2HiverSpeakerDataLoader.class);

    public static void main(String... ss) throws InterruptedException {
        Config config = ConfigFactory.load("speaker");
//        log.info("config:\t " + config.toString());
        Config sparkConfig = config.getConfig("spark");

        Config kafkaConfig = config.getConfig("kafka");

        log.info("sparkConfig:\t {}", sparkConfig.toString());
        log.info("kafkaConfig:\t {}", kafkaConfig);

        log.info("{}\t{}", sparkConfig.getString("warehouse-dir"), sparkConfig.getString("hive-table"));

        // Build the streaming context: master, app name, and batch duration all come from speaker.conf
        SparkConf conf = new SparkConf().setMaster(sparkConfig.getString("master")).setAppName(sparkConfig.getString("app-name"))
                .set("spark.sql.warehouse.dir", sparkConfig.getString("warehouse-dir"))
                .set("spark.streaming.blockInterval", sparkConfig.getString("blockInterval"))
                .set("hive.exec.dynamic.partition", "true")
                .set("hive.exec.dynamic.partition.mode", "nonstrict")
                .set("hive.exec.max.dynamic.partitions", sparkConfig.getString("partitions"))
                .set("spark.sql.sources.partitionOverwriteMode", "dynamic");

        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(sparkConfig.getInt("streaming-duration")));


//        streamingContext.checkpoint("hdfs://mrpl101:8020/tmp/hive/iot/speaker_online");

        Collection<String> topics = Arrays.asList(kafkaConfig.getString("topic"));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", kafkaConfig.getString("bootstrap-servers"));
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", kafkaConfig.getString("group-id"));
        kafkaParams.put("auto.offset.reset", kafkaConfig.getString("auto-offset-reset"));
        kafkaParams.put("enable.auto.commit", (Boolean) (kafkaConfig.getBoolean("enable-auto-commit")));

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );


        JavaDStream<String> jsonLines = messages.map(ConsumerRecord::value);

//        System.out.println(new java.util.Date() + "\t jsonLines print ==> ");
        jsonLines.print(3);

        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("action", StringType, true),
                DataTypes.createStructField("action_vod", StringType, true),
                DataTypes.createStructField("bluetooth_status", StringType, true),
                DataTypes.createStructField("mic_status", StringType, true),
                DataTypes.createStructField("network_status", StringType, true),
                DataTypes.createStructField("speaker_device_id", StringType, true),
                DataTypes.createStructField("timestamp", StringType, true),
                DataTypes.createStructField("volume", StringType, true),
                DataTypes.createStructField("volume_status", StringType, true)
        });

        jsonLines.foreachRDD((rdd, time) -> {
            log.info("\t curr rdd count: \t {}", rdd.count());
//            System.out.println(new java.util.Date() + "\t curr rdd count: \t " + rdd.count());

            // Get the singleton instance of SparkSession
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

            // each element is already a raw JSON line, so the RDD feeds the reader directly
            Dataset<Row> df = spark.read().schema(schema).json(rdd);

            // keep rows whose epoch-millis timestamp has a plausible length
            Dataset<Row> df1 = df.filter(functions.length(functions.col("timestamp")).leq(kafkaConfig.getInt("timestamp-length")));

            Dataset<Row> speakerDataFrame = df1.withColumn("part_dt", functions.from_unixtime(functions.col("timestamp").cast(LongType).divide(1000), "yyyy-MM-dd"))
                    .withColumnRenamed("timestamp", "dtm_str");

            speakerDataFrame.write().mode(SaveMode.Append).partitionBy("part_dt").format("hive").saveAsTable(sparkConfig.getString("hive-table"));
//            speakerDateFrame.write().partitionBy("part_dt").insertInto(sparkConfig.getString("hive-table"));
        });

        streamingContext.start();
        streamingContext.awaitTermination();

    }
}
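Note that with enable-auto-commit = false nothing in the loader ever commits offsets, so a restart falls back to auto.offset.reset and can replay already-loaded data. The spark-streaming-kafka-0-10 pattern is to commit each batch's offset ranges back to Kafka after the output succeeds; a sketch (writeThenCommit is a made-up name, the casts are the documented API):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

class OffsetCommitSketch {
    // `messages` is the stream returned by KafkaUtils.createDirectStream above
    static void writeThenCommit(JavaInputDStream<ConsumerRecord<String, String>> messages) {
        messages.foreachRDD(rdd -> {
            // the direct stream's RDDs carry their Kafka offset ranges
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // ... parse the records and write the batch to Hive, as in the loader above ...
            // commit only after the write succeeded, so failures replay instead of losing data
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });
    }
}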

/** singleton:
 *  Lazily instantiated instance of SparkSession
 **/
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .enableHiveSupport()  // needed for format("hive")/saveAsTable against the metastore
                    .getOrCreate();
        }
        return instance;
    }
}
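Since the job is launched with nohup and killed externally, it may be worth enabling Spark Streaming's graceful shutdown so an in-flight batch finishes its Hive write before the contexts stop. This is a standard Spark conf, suggested here rather than something the original job sets; it would be added to the SparkConf chain in main():

// let SIGTERM (e.g. yarn application -kill) drain the current batch first
conf.set("spark.streaming.stopGracefullyOnShutdown", "true");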

