package xxx;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
public class Kafka2HiverSpeakerDataLoader {
private static final Logger log = LoggerFactory.getLogger(Kafka2HiverSpeakerDataLoader.class);
public static void main(String... ss) throws InterruptedException {
Config config = ConfigFactory.load("speaker");
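/*
 * ConfigFactory.load("speaker") resolves speaker.conf (or .properties / .json) from the classpath.
 * A minimal sketch of the layout this class expects, with purely illustrative values (the real
 * values are deployment-specific and are not taken from this code):
 *
 * spark {
 *   master = "local[2]"
 *   app-name = "Kafka2HiveSpeakerDataLoader"
 *   warehouse-dir = "/user/hive/warehouse"
 *   blockInterval = "200ms"
 *   partitions = "1000"
 *   streaming-duration = 10
 *   hive-table = "iot.speaker_online"
 * }
 * kafka {
 *   topic = "speaker_topic"
 *   bootstrap-servers = "host1:9092,host2:9092"
 *   group-id = "speaker-loader"
 *   auto-offset-reset = "latest"
 *   enable-auto-commit = false
 *   timestamp-length = 13
 * }
 */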
// log.info("config:\t " + config.toString());
Config sparkConfig = config.getConfig("spark");
Config kafkaConfig = config.getConfig("kafka");
log.info("sparkConfig:\t {}", sparkConfig.toString());
log.info("kafkaConfig:\t {}", kafkaConfig);
log.info("{}\t{}", sparkConfig.getString("warehouse-dir"), sparkConfig.getString("hive-table"));
// Create the StreamingContext; master, app name and batch interval come from the configuration
SparkConf conf = new SparkConf().setMaster(sparkConfig.getString("master")).setAppName(sparkConfig.getString("app-name"))
.set("spark.sql.warehouse.dir", sparkConfig.getString("warehouse-dir"))
.set("spark.streaming.blockInterval", sparkConfig.getString("blockInterval"))
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("hive.exec.max.dynamic.partitions", sparkConfig.getString("partitions"))
.set("spark.sql.sources.partitionOverwriteMode", "dynamic");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(sparkConfig.getInt("streaming-duration")));
// streamingContext.checkpoint("hdfs://**101:8020/tmp/hive/iot/speaker_online");
Collection<String> topics = Arrays.asList(kafkaConfig.getString("topic"));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaConfig.getString("bootstrap-servers"));
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", kafkaConfig.getString("group-id"));
kafkaParams.put("auto.offset.reset", kafkaConfig.getString("auto-offset-reset"));
kafkaParams.put("enable.auto.commit", (Boolean) (kafkaConfig.getBoolean("enable-auto-commit")));
// createDirectStream actually returns a JavaInputDStream<ConsumerRecord<String, String>>; JavaDStream is its supertype
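// PreferConsistent distributes the topic's partitions evenly across the available executors;
// Subscribe consumes the fixed collection of topics with the Kafka parameters built above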
JavaDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaDStream<String> jsonLines = messages.map(tuple -> tuple.value());
// System.out.println(new java.util.Date() + "\t jsonLines print ==> ");
jsonLines.print(3);
StructType schema = DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("action", StringType, true),
DataTypes.createStructField("action_vod", StringType, true),
DataTypes.createStructField("bluetooth_status", StringType, true),
DataTypes.createStructField("mic_status", StringType, true),
DataTypes.createStructField("network_status", StringType, true),
DataTypes.createStructField("speaker_device_id", StringType, true),
DataTypes.createStructField("timestamp", StringType, true),
DataTypes.createStructField("volume", StringType, true),
DataTypes.createStructField("volume_status", StringType, true)
});
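// For every micro-batch: parse the JSON lines with the explicit schema, derive the partition
// column, and append the result to the partitioned Hive table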
jsonLines.foreachRDD((rdd, time) -> {
log.info("\t curr rdd count: \t ", rdd.count());
// System.out.println(new java.util.Date() + "\t curr rdd count: \t " + rdd.count());
// Get the singleton instance of SparkSession
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
// Parse each JSON line of this batch into a Row using the explicit schema
Dataset<Row> df = spark.read().schema(schema).json(rdd);
// Keep only rows whose raw timestamp string is no longer than the configured length (epoch milliseconds)
Dataset<Row> df1 = df.filter(functions.length(functions.col("timestamp")).$less$eq(kafkaConfig.getInt("timestamp-length")));
// Derive the partition column part_dt (yyyy-MM-dd) from the epoch-millis timestamp and keep the raw value as dtm_str
Dataset<Row> speakerDateFrame = df1.withColumn("part_dt", functions.from_unixtime(functions.col("timestamp").cast(LongType).divide(1000), "yyyy-MM-dd"))
.withColumnRenamed("timestamp", "dtm_str");
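// Append mode plus partitionBy("part_dt") adds each batch to the matching Hive partition;
// saveAsTable with format("hive") requires a SparkSession with Hive support enabled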
speakerDateFrame.write().mode(SaveMode.Append).partitionBy("part_dt").format("hive").saveAsTable(sparkConfig.getString("hive-table"));
// speakerDateFrame.write().partitionBy("part_dt").insertInto(sparkConfig.getString("hive-table"));
});
streamingContext.start();
streamingContext.awaitTermination();
}
}
/** Singleton: lazily instantiated instance of SparkSession. */
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession
.builder()
.config(sparkConf)
// Hive support is required for format("hive") / saveAsTable against the Hive metastore
.enableHiveSupport()
.getOrCreate();
}
return instance;
}
}
package xxx;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
public class Socket2HiveDataLoader {
private static final Logger log = LoggerFactory.getLogger(Socket2HiveDataLoader.class);
public static void main(String... ss) throws InterruptedException {
Config config = ConfigFactory.load("speaker");
Config sparkConfig = config.getConfig("spark");
log.info("sparkConfig:\t {}", sparkConfig.toString());
Config kafkaConfig = config.getConfig("kafka");
log.info("kafkaConfig:\t {}", kafkaConfig);
log.info("{}\t{}", sparkConfig.getString("warehouse-dir"), sparkConfig.getString("hive-table"));
// Create a local StreamingContext with two worker threads and a batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkJsonLoader")
.set("spark.sql.warehouse.dir", sparkConfig.getString("warehouse-dir"))
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.sql.sources.partitionOverwriteMode", "dynamic");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream that will connect to hostname:port, like localhost:5555
JavaReceiverInputDStream<String> jsonLines = jssc.socketTextStream("10.16.5.35", 5555);
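// For a quick local test, newline-delimited JSON can be fed into this socket with netcat,
// e.g. `nc -lk 5555` on the target host (assumes netcat is available; adjust host/port to your setup)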
StructType schema = DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("action", StringType, true),
DataTypes.createStructField("action_vod", StringType, true),
DataTypes.createStructField("bluetooth_status", StringType, true),
DataTypes.createStructField("mic_status", StringType, true),
DataTypes.createStructField("network_status", StringType, true),
DataTypes.createStructField("speaker_device_id", StringType, true),
DataTypes.createStructField("timestamp", StringType, true),
DataTypes.createStructField("volume", StringType, true),
DataTypes.createStructField("volume_status", StringType, true)
});
jsonLines.foreachRDD((rdd, time) -> {
log.info("\t curr rdd count: \t ", rdd.count());
// System.out.println(new java.util.Date() + "\t curr rdd count: \t " + rdd.count());
// Get the singleton instance of SparkSession
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
// Parse each JSON line of this batch into a Row using the explicit schema
Dataset<Row> df = spark.read().schema(schema).json(rdd);
Dataset<Row> df1 = df.filter(functions.length(functions.col("timestamp")).$less$eq(kafkaConfig.getInt("timestamp-length")) );
Dataset<Row> speakerDateFrame = df1.withColumn("part_dt", functions.from_unixtime(functions.col("timestamp").cast(LongType).divide(1000), "yyyy-MM-dd"))
.withColumnRenamed("timestamp", "dtm_str");
speakerDateFrame.write().mode(SaveMode.Append).partitionBy("part_dt").format("hive").saveAsTable(sparkConfig.getString("hive-table"));
// speakerDateFrame.write().partitionBy("part_dt").insertInto(sparkConfig.getString("hive-table"));
});
jssc.start();
jssc.awaitTermination();
}
}
package com.xq.spring.boot;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import java.util.List;
import java.util.stream.Collectors;
@Configuration
public class HbaseConfiguration {
Logger log = LoggerFactory.getLogger(HbaseConfiguration.class);
@Value("${spring.data.hbase.zk-quorum}")
private String zkQuorum;
@Value("${spring.data.hbase.zk-base-path}")
private String zkBasePath;
@Value("${spring.data.hbase.root-dir}")
private String rootDir;
@Value("${spring.data.hbase.rpc-timeout}")
private Long rpcTimeout;
@Value("${spring.data.hbase.client-scanner-caching}")
private Long clientScannerCaching;
// @Bean
public org.apache.hadoop.conf.Configuration configuration() {
org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", zkQuorum);
configuration.set("hbase.rootdir", rootDir);
configuration.set("zookeeper.znode.parent", zkBasePath);
configuration.setLong("hbase.rpc.timeout", rpcTimeout);
configuration.setLong("hbase.client.scanner.caching", clientScannerCaching);
return configuration;
}
@Bean
public HbaseTemplate hbaseTemplate() {
org.apache.hadoop.conf.Configuration configuration = configuration();
return new HbaseTemplate(configuration);
}
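// Scans the row-key range [startRowkey, stopRowkey) and, when provided, narrows the result to a
// single column family and/or qualifier via a MUST_PASS_ALL FilterList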
public List<Result> getRowKeyAndColumn(String tableName, String startRowkey, String stopRowkey, String column, String qualifier) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if (StringUtils.isNotBlank(column)) {
log.debug("{}", column);
filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(column))));
}
if (StringUtils.isNotBlank(qualifier)) {
log.debug("{}", qualifier);
filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier))));
}
Scan scan = new Scan();
if (filterList.getFilters().size() > 0) {
scan.setFilter(filterList);
}
scan.setStartRow(Bytes.toBytes(startRowkey));
scan.setStopRow(Bytes.toBytes(stopRowkey));
return hbaseTemplate().find(tableName, scan, (result, rowNum) -> result);
}
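// Issues one Get per row key; the family and column arguments, when present, narrow each Get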
public List<Result> getListRowkeyData(String tableName, List<String> rowKeys, String familyColumn, String column) {
return rowKeys.stream().map(rk -> {
if (StringUtils.isNotBlank(familyColumn)) {
if (StringUtils.isNotBlank(column)) {
return hbaseTemplate().get(tableName, rk, familyColumn, column, (result, rowNum) -> result);
} else {
return hbaseTemplate().get(tableName, rk, familyColumn, (result, rowNum) -> result);
}
}
return hbaseTemplate().get(tableName, rk, (result, rowNum) -> result);
}).collect(Collectors.toList());
}
}
package com.xq.spring.boot.kafka;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.fs.HdfsResourceLoader;
import org.springframework.data.hadoop.fs.FsShell;
import java.net.URI;
@Configuration
public class HadoopHdfsConfiguration {
@Value("${spring.data.hadoop.fs-uri}")
private URI hdfsUri;
@Value("${spring.data.hadoop.user}")
private String user;
@Bean
public org.apache.hadoop.conf.Configuration hadoopConfiguration() {
return new org.apache.hadoop.conf.Configuration();
}
@Bean
public HdfsResourceLoader hdfsResourceLoader() {
return new HdfsResourceLoader(hadoopConfiguration(), hdfsUri, user);
}
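// FsShell exposes HDFS shell-style operations (ls, mkdir, copyFromLocal, ...) as Java calls.
// A usage sketch, assuming the bean is injected elsewhere (method names per spring-data-hadoop):
//   fsShell.mkdir("/tmp/speaker");
//   fsShell.copyFromLocal("local.csv", "/tmp/speaker/local.csv");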
@Bean(name = "fsSehll")
public FsShell fsSehll() {
return new FsShell(hadoopConfiguration(), hdfsResourceLoader().getFileSystem());
}
}
package com.xq.spring.boot.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.context.support.SimpleThreadScope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.ObjectUtils;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static final ObjectMapper mapper = new ObjectMapper();
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
// SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
@Autowired
private ConsumerFactory consumerFactory;
private final CountDownLatch latch = new CountDownLatch(3);
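// run() only pushes a few test messages to topic sf01; the latch is counted down solely by the
// commented-out @KafkaListener below, so the 3-second await simply elapses before "All received" is logged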
@Override
public void run(String... args) throws Exception {
this.template.send("sf01", "foo1");
this.template.send("sf01", "foo2");
this.template.send("sf01", "foo3");
this.template.send("sf01", "foo4");
this.template.send("sf01", "foo5");
this.template.send("sf01", "bar1");
this.template.send("sf01", "bar2");
this.template.send("sf01", "bar3");
this.template.send("sf01", "bar4");
this.template.send("sf01", "bar5");
this.template.send("sf01", "bar6");
latch.await(3, TimeUnit.SECONDS);
logger.info("All received");
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.DAY_OF_MONTH, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
// consumeHistory(calendar.getTimeInMillis());
}
// @KafkaListener(topics = "myTopic")
// public void listen(ConsumerRecord<?, ?> cr) throws Exception {
// logger.info("receive message from myTopic: \t{}", cr.toString());
// latch.countDown();
// }
public void consumeHistory(Long timestamp) {
KafkaConsumer consumer = (KafkaConsumer) consumerFactory.createConsumer();
// Map<String, List<PartitionInfo>> map = consumer.listTopics();
// PartitionInfo partitionInfo = map.get("datarev_0x1c_state_prod_iot").get(0);
// TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
//
// consumer.assign(Arrays.asList(topicPartition));
//
// Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
// timestampsToSearch.put(topicPartition, timestamp);
//
// Map<TopicPartition, OffsetAndTimestamp> map1 = consumer.offsetsForTimes(timestampsToSearch);
//
//
// consumer.seek(topicPartition, map1.get(topicPartition).offset());
// consumer.seek(topicPartition, 0L);
consumer.subscribe(Arrays.asList("testlxciot"));
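// Poll until an empty batch is returned, log every record, then close the consumer; no offsets
// are committed here, so commit behaviour depends entirely on the ConsumerFactory's enable.auto.commit setting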
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) {
break;
}
logger.info("total records = {}", records.count());
int msgNo = 0;
for (ConsumerRecord<String, Object> record : records) {
logger.info("{}\t === {}", msgNo, record.toString());
// try {
// JsonNode jsonNode = mapper.readTree(record.value().toString());
// logger.info("jsonNode:\t{}", jsonNode);
// } catch (JsonProcessingException e) {
// e.printStackTrace();
// }
msgNo++;
}
}
consumer.close();
}
/*
No; each listener container can have only one message listener instance, which must be thread-safe when using increased concurrency.
You would have to declare multiple @Bean instances of MessageConsumer to get a container for each instance.
*/
// @Bean
// public ApplicationRunner runner(KafkaTemplate<String, String> template) {
// return args -> {
// Thread.sleep(10_000);
// template.send("kgh627", 0, null, "foo");
// template.send("kgh627", 1, null, "bar");
// template.send("kgh627", 0, null, "baz");
// template.send("kgh627", 1, null, "qux");
// };
// }
//
// @Bean
// @Scope(scopeName = "thread", proxyMode = ScopedProxyMode.TARGET_CLASS) // every request for this bean from the same thread returns the same instance
// public ThreadScopedListener tsl() {
// return new ThreadScopedListener();
// }
//
// @Bean
// public Listener listener() {
// return new Listener(tsl());
// }
//
// @Bean
// public NewTopic topic() {
// return new NewTopic("kgh627", 2, (short) 1);
// }
//
// @Bean
// public static CustomScopeConfigurer scoper() {
// CustomScopeConfigurer configurer = new CustomScopeConfigurer();
// configurer.addScope("thread", new SimpleThreadScope());
// return configurer;
// }
//
// public static class Listener {
//
// private final ThreadScopedListener listener;
//
// public Listener(ThreadScopedListener listener) {
// this.listener = listener;
// }
//
// @KafkaListener(id = "myListener", topics = "kgh627")
// public void listen(String in) {
// this.listener.listen(in);
// }
//
// }
//
// public static class ThreadScopedListener {
//
// private static final AtomicInteger instances = new AtomicInteger();
//
// private final int instance = instances.incrementAndGet();
//
// public void listen(String in) {
// System.out.println("Received: " + in + " on Thread " + Thread.currentThread().getName()
// + " in instance " + this.instance + " (" + ObjectUtils.getIdentityHexString(this) + ")");
// }
//
// }
}
package com.xq.spring.boot;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
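// Springfox/Swagger is registered unless swagger.enable=false is set in the application properties
// (the conditional expression below defaults to true when the property is absent)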
@Configuration
@EnableSwagger2
@ConditionalOnExpression("${swagger.enable:true}")
public class SwaggerConfig {
@Bean
Docket docket() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.xq.spring.boot.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("API文档")
.description("语音数据迭代Dataset接口文档")
.termsOfServiceUrl("")
.version("0.1-snapshot")
.contact(new Contact("aipdc",
"aipdc",
""))
.build();
}
}
package com.xq.spring.boot;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.StopWatch;
import javax.sql.DataSource;
import java.io.IOException;
import java.text.SimpleDateFormat;
//@EnableCaching
//@SpringBootApplication
//public class MyBootApplication implements CommandLineRunner {
//
//// @Value("${dataset.export.hdfsDir}")
//// private String hdfsDir;
//
// @Autowired
// HbaseTemplate hbaseTemplate;
//
// public static final ObjectMapper mapper = new ObjectMapper();
//// public static final JsonFactory jsonFactory = new JsonFactory();
//
// public static final SimpleDateFormat SDF = (ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"))).get();
//
// private static final Logger logger = LoggerFactory.getLogger(MyBootApplication.class);
//
// public static void main(String[] args) {
// ConfigurableApplicationContext context = SpringApplication.run(MyBootApplication.class, args);
// }
//
// @Override
// public void run(String... args) throws IOException {
// mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
//// mapper.setDateFormat(new StdDateFormat());
// mapper.setDateFormat(SDF);
//
// // scan "DeviceInfo", { LIMIT => 10 }
//
//// Scan scan1 = new Scan();
//// scan1.setMaxResultSize(20);
////
//// List list = hbaseTemplate.find("DeviceInfo", scan1, (result, rowNum) -> {
//// System.out.println("scan1 result size:\t " + result.size());
////// System.out.println("result:\t " + result);
//// return result; });
////// list.forEach(System.out::print);
//// list.forEach( e ->
//// System.out.println("scan1 list:\t " + e)
//// );
//
// rowCountByCoprocessor("DeviceInfo");
//
//// Scan scan2 = new Scan();
//// scan2.setFilter(new PageFilter(20));
//// List list = hbaseTemplate.find("DeviceInfo", scan2, (result, rowNum) -> {
//// System.out.println("scan2 result size:\t " + result.size());
//// return result; });
//// System.out.println("scan2 list size:\t " + list.size());
//// list.forEach( e ->
//// System.out.println("scan2 list:\t " + e)
//// );
//
//
//// Scan scan3 = new Scan();
//// scan3.next(10);
//// list = hbaseTemplate.find("DeviceInfo", scan3, (result, rowNum) -> {
//// System.out.println("scan3 result size:\t " + result.size());
//// return result; });
//// list.forEach( e ->
//// System.out.println("scan3 list:\t " + e)
//// );
//
// }
//
// public void rowCountByCoprocessor(String tablename){
// try {
// Connection connection = ConnectionFactory.createConnection(hbaseTemplate.getConfiguration());
// // create the Connection and Configuration up front
// Admin admin = connection.getAdmin();
// TableName name=TableName.valueOf(tablename);
// // disable the table first, add the coprocessor, then re-enable the table
// admin.disableTable(name);
// HTableDescriptor descriptor = admin.getTableDescriptor(name);
// String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
// if (! descriptor.hasCoprocessor(coprocessorClass)) {
// descriptor.addCoprocessor(coprocessorClass);
// }
// admin.modifyTable(name, descriptor);
// admin.enableTable(name);
//
// // time the row count
// StopWatch stopWatch = new StopWatch();
// stopWatch.start();
//
// Scan scan = new Scan();
// AggregationClient aggregationClient = new AggregationClient(hbaseTemplate.getConfiguration());
//
// System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan));
// stopWatch.stop();
// System.out.println("统计耗时:" +stopWatch.getTotalTimeMillis());
// } catch (Throwable e) {
// e.printStackTrace();
// }
// }
//
// /**
// * Usually, Spring boot creates automatically a datasource and a jdbcTemplate when the jdbc-starter is part of the dependencies.
// * Behind the scene, it checks in classpath for libraries brought by the starters, based on the presence of certain libraries it will autoconfigure beans.
// * Spring boot is said as opinionated. Also, it is worth mentioning that Spring Boot reads the application properties to auto-configure the beans.
// * This is how Spring Boot can create a datasource, but you have to respect the right names for the properties.
// *
// * In the case of multiple datasources, Spring Boot can't guess that you actually want multiple datasources.
// * Fortunately, it's possible to override Spring Boot's behaviour and define these beans ourselves.
// */
// @Bean(name = "parksqlDataSource")
// @Qualifier("parksqlDataSource")
// @Primary
// @ConfigurationProperties(prefix = "spring.datasource.sparksql")
// public DataSource parksqlDataSource() {
// return DataSourceBuilder.create().build();
// }
//
// @Bean(name = "hiveDataSource")
// @Qualifier("hiveDataSource")
// @ConfigurationProperties(prefix = "spring.datasource.hive")
// public DataSource hiveDataSource() {
// return DataSourceBuilder.create().build();
// }
//
// @Bean(name = "sparksqlJdbcTemplate")
// public JdbcTemplate sparksqlJdbcTemplate(@Qualifier("parksqlDataSource") DataSource dataSource) {
// return new JdbcTemplate(dataSource);
// }
//
// @Bean(name = "hiveJdbcTemplate")
// public JdbcTemplate hiveJdbcTemplate(@Qualifier("hiveDataSource") DataSource dataSource) {
// return new JdbcTemplate(dataSource);
// }
//
//}
//////
import com.fasterxml.jackson.databind.JsonNode;
import kafka.coordinator.group.*;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
//@RunWith(SpringRunner.class)
//@SpringBootTest(classes = {MyBootApplication.class})
public class RandomExportTest {
@Value("${dataset.export.callback.rest}")
private String callbackRestUrl;
@Test
public void testConsumer() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "mvxl58434:9092,mvxl65161:9092,mvxl74089:9092");
props.setProperty("group.id", "test01");
props.setProperty("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("max.poll.records", "50");
props.put("session.timeout.ms", "30000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("datarev_0x1c_state_prod_iot"));
// consumer.subscribe(Arrays.asList("1sftest"));
System.out.println("Subscribed to topic " + "datarev_0x1c_state_prod_iot");
// Map<String, List<PartitionInfo>> pInfos = consumer.listTopics(Duration.ofSeconds(3));
// Set<String> topics = consumer.subscription();
// for (String topic : topics) {
// System.out.println("**************topic*************\t " + topic);
// Set<TopicPartition> tps = consumer.assignment();
// for (TopicPartition tp: tps) {
// System.out.println(tp);
// }
//
// Map<TopicPartition, Long> bOffsets = consumer.beginningOffsets(tps);
// for (Map.Entry<TopicPartition, Long> e : bOffsets.entrySet()) {
// System.out.println(String.format("topicpartition:[%s]\t beginnigoffset:[%d]\t", e.getKey(), e.getValue()));
// }
//
// }
Calendar cal = Calendar.getInstance();
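// Poll in batches of at most max.poll.records (50); stop on an empty poll or once a full batch of
// 50 records has been printed. Offsets are never committed (enable.auto.commit=false, no commitSync).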
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
if (records.isEmpty()) {
break;
}
int msgNo = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%d => topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n",
++msgNo, record.topic(), record.partition(), record.offset(), record.timestamp(), record.key(), record.value());
}
// for (TopicPartition topicPartition : records.partitions()) {
// System.out.println("**************topicPartition*************\t " + topicPartition);
// for (ConsumerRecord<String, String> record : records.records(topicPartition)) {
// System.out.printf("committed: [%s]\t position: [%d]\n" , consumer.committed(topicPartition) , consumer.position(topicPartition));
// System.out.printf("==%d== \t offset = %d, key = %s, value = %s\n", msgNo++, record.offset(), record.key(), record.value());
// }
// }
// consumer.commitSync();
if (msgNo >= 50)
break;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// @Test
public void testConsumer2() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "mvxl58434:9092,mvxl65161:9092,mvxl74089:9092");
props.setProperty("group.id", "test02");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("max.poll.records", "10");
props.setProperty("session.timeout.ms", "30000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String currTopic = "1sftest";
// consumer.subscribe(Arrays.asList("1sftest"));
// Manually assign TopicPartitions to this consumer; assign() must not be combined with subscribe() on the same consumer
TopicPartition topicPartition = new TopicPartition(currTopic,1);
TopicPartition topicPartition1 = new TopicPartition(currTopic,0);
consumer.assign(Arrays.asList(new TopicPartition [] {topicPartition}));
// Print the partition info for currTopic (1sftest)
System.out.println(consumer.partitionsFor(currTopic));
System.out.println(consumer.assignment());
System.out.println("开始偏移: " + consumer.beginningOffsets(consumer.assignment()));
OffsetAndMetadata committedOffset = consumer.committed(topicPartition);
if (null == committedOffset) {
System.out.println("committedOffset is NULL.");
// Reset this consumer's starting position for the partition
consumer.seek(topicPartition, 0L);
}
Long endOffset = consumer.endOffsets(Arrays.asList(new TopicPartition [] {topicPartition})).get(topicPartition);
System.out.println(String.format("已保存的偏移量: %s, 最新偏移量: %d", committedOffset, endOffset));
// Map<String, List<PartitionInfo>> pInfos = consumer.listTopics(Duration.ofSeconds(3));
// for (Map.Entry<String, List<PartitionInfo>> entry : pInfos.entrySet()) {
// String topic = entry.getKey();
// System.out.println("**************topic*************\t " + topic);
// if (!topic.equals("1sftest"))
// continue;
//
// List<PartitionInfo> pinfoList = entry.getValue();
// for (PartitionInfo pinfo : pinfoList) {
// System.out.println("partition\t " + pinfo.toString());
// TopicPartition topicPartition = new TopicPartition(entry.getKey(), pinfo.partition());
// System.out.printf("committed: [%s]\t\n" , consumer.committed(topicPartition));
//
//// System.out.printf("committed: [%s]\t position: [%d]\n" , consumer.committed(topicPartition) , consumer.position(topicPartition));
// }
// }
// committed_offset=self.consumer.committed(self.topic_partition)
// if committed_offset==None:
// ## reset this consumer's starting position
// self.consumer.seek(partition=self.topic_partition, offset=0)
// end_offset = self.consumer.end_offsets([self.topic_partition])[self.topic_partition]
// print('committed offset:', committed_offset, 'end offset:', end_offset)
}
@Test
public void givenTwoDatesBeforeJava8_whenDifferentiating_thenWeGetSix() throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy", Locale.ENGLISH);
Date firstDate = sdf.parse("06/24/2017");
Date secondDate = sdf.parse("06/30/2017");
long diffInMillies = Math.abs(secondDate.getTime() - firstDate.getTime());
long diff = TimeUnit.DAYS.convert(diffInMillies, TimeUnit.MILLISECONDS);
Assert.assertEquals(6, diff);
}
@Test
public void testTmp() throws ParseException, IOException {
UUID exportDatasetRunId = UUID.randomUUID();
// System.out.println(exportDatasetRunId.fromString())
System.out.println(exportDatasetRunId.variant());
System.out.println(exportDatasetRunId.toString());
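// The byte array below is a UTF-8 encoded JSON error payload ("code":400, "data":null, "msg":..., "timestamp":...);
// decoding it explicitly as UTF-8 keeps the embedded non-ASCII text intact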
byte [] bytes = {123, 10, 9, 34, 99, 111, 100, 101, 34, 58, 52, 48, 48, 44, 10, 9, 34, 100, 97, 116, 97, 34, 58, 110, 117, 108, 108, 44, 10, 9, 34, 109, 115, 103, 34, 58, 34, 101, 120, 112, 111, 114, 116, 95, 114, 101, 113, 117, 101, 115, 116, 95, 105, 100, 61, 32, 53, 57, 52, 50, 51, 50, 57, 51, 45, 56, 54, 55, 55, 45, 52, 53, 49, 54, 45, 56, 51, 51, 49, 45, 57, 55, 101, 53, 52, 99, 50, 57, 56, 99, 98, 101, 32, -25, -102, -124, -24, -81, -73, -26, -79, -126, -28, -72, -115, -27, -83, -104, -27, -100, -88, 34, 44, 10, 9, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 49, 53, 57, 52, 51, 55, 50, 51, 50, 52, 57, 52, 56, 34, 10, 125};
String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println("s: \t" + s);
// Write a small sample CSV (UTF-8, CRLF line endings); try-with-resources closes the stream
try (FileOutputStream fos = new FileOutputStream("test.csv")) {
//2020-06-15 23:15:37.594,00001C11100001021301990542160000,00001C11100001021301990542160000,请放孙宇晨的歌
IOUtils.write("time,sn,client_id,asr_text", fos, StandardCharsets.UTF_8);
IOUtils.write("\r\n", fos, StandardCharsets.UTF_8);
IOUtils.write("2020-06-08 16:09:41.325,00001C11100001021301962603020000,00001C11100001021301962603020000,我想听邓丽君的我只在乎你", fos, StandardCharsets.UTF_8);
IOUtils.write("\r\n", fos, StandardCharsets.UTF_8);
IOUtils.write("2020-06-15 23:15:37.594,00001C11100001021301990542160000,00001C11100001021301990542160000,请放孙宇晨的歌", fos, StandardCharsets.UTF_8);
}
}
// @Test
public void testAccumulate() throws IOException {
String jsonParam = "{ \"period\": {\"ordinal\": 1, \"start\": 20200129}}";
WebClient client = WebClient.create("http://10.133.229.233:8080/device/accumulate"); //WebClient.create(callbackRestUrl);
JsonNode res = client.post()
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(jsonParam))
.retrieve()
.bodyToMono(JsonNode.class).block();
Assert.assertNotNull(res.get("detailed"));
System.out.println("res:\t" + res);
}
}