VerticaSink, SinkLog, SinkOptions
If you are not familiar with the basic characteristics of a sink, please refer to the sinks documentation.
Usage
The Vertica Sink is a sink connector that exports the contents of a Kafka topic to Vertica. VerticaSink utilizes the VerticaCopyStream of the Vertica JDBC SDK to write data to Vertica. These connectors meet the criteria for best-practice code development, schema registry integration, and security.
VerticaMetrics
VerticaMetrics provides the histograms and counters that track details such as the Kafka producers (ProducerHistogram), the total number of rows copied (RowsLoadedCounter), and so on. It is built with the options from VerticaSinkOptions, which in turn carries the PSTL-specific configuration and the options for Vertica and Kafka.
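As a rough sketch, a metrics holder of this shape could be built on Dropwizard metrics; the registry, metric names, and class below are illustrative assumptions, not the actual PSTL implementation.

```scala
import com.codahale.metrics.{Counter, Histogram, MetricRegistry, Timer}

// Hypothetical sketch of a metrics holder; names mirror the description above, not the real class.
class VerticaMetricsSketch(registry: MetricRegistry) {
  val producerHistogram: Histogram = registry.histogram("vertica.sink.producer")  // Kafka producer stats
  val rowsLoadedCounter: Counter   = registry.counter("vertica.sink.rowsLoaded")  // total rows copied
  val loadTimer: Timer             = registry.timer("vertica.sink.loadTime")      // time spent loading a batch
}
```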
consumer
consumer holds the KafkaConsumer, the Kafka client that consumes records from the Kafka cluster. It takes in the driverConsumerProperties, which are used to set the consumerConfig with the necessary properties, as in the sample below.
driverConsumerProperties = ModuleConfig[Object]("VerticaSink#driverConsumerProperties", defaultConsumerProperties)
  .set(ConsumerConfig.getName)
For more information on the consumer, see the KafkaConsumer documentation.
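For illustration, the driver consumer properties typically resolve to standard Kafka consumer settings like the following; the bootstrap servers, group id, and deserializers are placeholder values.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Placeholder configuration; the real values come from driverConsumerProperties.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "vertica-sink-driver")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

// The driver-side Kafka client that consumes records from the Kafka cluster.
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
```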
KafkaExecutionContext
An execution context is used to execute a program asynchronously, typically on a thread pool, but not necessarily. For more information, check out Execution Context.
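For example, an execution context backed by a dedicated thread pool can be created as follows; the pool size is illustrative.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

// A dedicated pool so Kafka work does not compete with other thread pools; size is illustrative.
val kafkaExecutionContext: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// Shut the pool down when the sink is closed:
// kafkaExecutionContext.shutdown()
```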
DriverManager
Our Java application must connect to HP Vertica before the two can interact, and connecting to HP Vertica over JDBC is similar to connecting to most databases. After importing the SQL packages, you open a connection by calling DriverManager.getConnection().
The driver manager obtains the connection from the JDBC URL and JDBC properties, and the connection is then set to auto-commit false (auto-commit must be disabled). The driver also requires the HP Vertica JDBC driver to be loaded with Class.forName before the connection is opened.
For more information, go to DriverManager.
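A minimal sketch of that connection setup; the host, database, and credentials below are placeholders.

```scala
import java.sql.{Connection, DriverManager}
import java.util.Properties

// Placeholder JDBC URL and credentials; substitute your Vertica host, database, and user.
val jdbcUrl = "jdbc:vertica://vertica-host:5433/analytics"
val jdbcProps = new Properties()
jdbcProps.put("user", "dbadmin")
jdbcProps.put("password", "secret")

// Load the HP Vertica JDBC driver, open the connection, and disable auto-commit.
Class.forName("com.vertica.jdbc.Driver")
val connection: Connection = DriverManager.getConnection(jdbcUrl, jdbcProps)
connection.setAutoCommit(false)
```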
VerticaCopyManager
VerticaCopyManager takes care of the COPY command and its operations in Vertica. It takes in the Vertica connection, with its JDBC URL and properties, along with the VerticaSinkOptions.
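As a rough sketch of the VerticaCopyStream usage mentioned in the Usage section, a COPY ... FROM STDIN statement can be streamed like this; the table, columns, and COPY options are assumptions, and the real copy manager builds its statement from the sink options.

```scala
import java.io.ByteArrayInputStream
import com.vertica.jdbc.{VerticaConnection, VerticaCopyStream}

// Assumes `connection` was opened as in the DriverManager sketch above (auto-commit off).
// Table name, columns, and COPY options here are illustrative.
val copySql = "COPY public.events (id, payload) FROM STDIN DELIMITER ',' ABORT ON ERROR NO COMMIT"
val verticaConn = connection.unwrap(classOf[VerticaConnection])

val copyStream = new VerticaCopyStream(verticaConn, copySql)
copyStream.start()
copyStream.addStream(new ByteArrayInputStream("1,hello\n2,world\n".getBytes("UTF-8")))
copyStream.execute()                  // push the queued stream(s) through the COPY
val rowsLoaded = copyStream.finish()  // number of rows accepted by Vertica
connection.commit()                   // auto-commit is disabled, so commit explicitly
```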
verticaLog
This creates a new VerticaSinkLog, which takes in parameters such as the Vertica connection, the options, and the metrics.
doAddBatch
This method takes in the batch ID and the DataFrame. The batch ID comes from the verticaLog and is verified to be the latest batch ID. The schema and offsets are then loaded, and the timer from the metrics is started for the data load. The metrics are updated with the new row count, and the context is updated with the new offsets and schema.
context = CopyContext(newOffsets, context.schema, loaded)
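The shape of CopyContext implied by this snippet might look roughly like the following; the field names and types are guesses for illustration, not the actual definition.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical sketch only: offsets, schema, and the loaded data for the current batch.
case class CopyContext(
  offsets: Map[TopicPartition, Long], // latest Kafka offsets for the batch
  schema: StructType,                 // schema of the data being copied
  loaded: DataFrame                   // state of the data loaded for this batch (could be a count in the real code)
)
```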
copyThreadSignal
copyThreadSignal is an AtomicBoolean, initialized to false and used later when the add-batch method is initiated. If copyThreadSignal can be set from neither false nor true, an IllegalStateException is thrown stating that the state of copyThreadSignal is broken or indicates a bug. Once copyThreadSignal is set, the VerticaCopyManager loads an incremental copy of the CopyContext, which holds the data schema, the state of the data, and the offset information.
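A rough sketch of the signalling pattern described here; the control flow and messages are illustrative, not the actual implementation.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Signal flag shared between the add-batch path and the copy thread.
val copyThreadSignal = new AtomicBoolean(false)

def signalCopy(): Unit = {
  if (copyThreadSignal.compareAndSet(false, true)) {
    // newly signalled: the copy thread runs an incremental copy of the current CopyContext
  } else if (copyThreadSignal.get()) {
    // already signalled: nothing further to do for this batch
  } else {
    // neither transition applies: the flag is in a broken state, i.e. a bug
    throw new IllegalStateException("copyThreadSignal state is broken; this indicates a bug")
  }
}
```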
kafkaWriteStatus
This takes in the data and the consumer and producer properties, along with the partitions. It writes the previously added batch, with all of its details, to Kafka so that the batch can be committed along with its metadata. kafkaFetchLatestOffsets is used to fetch the latest offsets from the topic partitions and assign them to the kafkaConsumer so it can position itself on those partitions.
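An illustrative version of the offset fetch and positioning; the helper name mirrors the text, and the real implementation may differ.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Sketch: find all partitions of the topic, seek to their ends, and report the latest offsets.
// Assumes `props` carries bootstrap servers and byte-array deserializers.
def fetchLatestOffsets(props: Properties, topic: String): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition))
    consumer.assign(partitions.asJava)
    consumer.seekToEnd(partitions.asJava)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  } finally {
    consumer.close()
  }
}
```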
validateLoadedRecords
Based on the produced and consumed records, this method validates the records that were loaded. It throws an IllegalStateException if the number of records produced does not equal the number consumed.
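In its simplest form, the check described above amounts to something like this sketch (not the actual method signature):

```scala
// Sketch: fail fast if the produced and consumed record counts diverge.
def validateLoadedRecords(produced: Long, consumed: Long): Unit = {
  if (produced != consumed) {
    throw new IllegalStateException(
      s"produced $produced records but consumed $consumed; the counts must match")
  }
}
```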
VerticaSinkLog
The VerticaSinkLog has methods for adding a query and for getting a query by job ID, sink ID, etc. from the transaction table it was added to. It also supports getting the latest query added, and, if the query limit is exceeded or the table grows unbounded over time, it has a method to purge old records based on the query ID. All of these operations are carried out with respect to the VerticaSinkOptions and metrics.
Purge = batchId - options.compactInterval
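As a quick illustration of the formula (numbers are made up; whether the boundary batch itself is kept is an implementation detail):

```scala
val batchId = 100L
val compactInterval = 10L
val purgeThreshold = batchId - compactInterval  // = 90: log entries older than this are purged
```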
VerticaSinkOptions
This class primarily takes in the parameters and values for the PSTL-specific configuration as well as the Vertica options. It holds all the options for the different methods used in VerticaSink and VerticaSinkLog, along with the executor properties.
The core purging logic is that once the batches exceed the compactInterval, the excess batches are purged. The compact interval is set by commitLogCompactionInterval and can be changed by users depending on how many batches they need purged. A Timer is also used to report the amount of time taken by the purge process. KafkaOptions contains the required information about the topic, group ID, and the default consumer and producer properties.
Bootstrap servers specify the servers to be used when connecting to Kafka. These hostnames will be used by the driver (consumer), the executors (producer), and Vertica (consumer). If you have non-trivial networking requirements for upstream vs. downstream, you can provide driver- and executor-specific bootstrap servers in spark.sql.streaming.vertica.consumer.bootstrap.servers and spark.sql.streaming.vertica.producer.bootstrap.servers respectively. In this case, Vertica consumers will use spark.sql.streaming.vertica.kafka.bootstrap.servers.
val bootstrapservers = buildConf("spark.sql.streaming.vertica.kafka.bootstrap.servers")
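For example, role-specific bootstrap servers could be supplied like this, assuming a SparkSession named spark is in scope; the hostnames are placeholders.

```scala
// Placeholder hostnames; the config keys are the ones described above.
spark.conf.set("spark.sql.streaming.vertica.consumer.bootstrap.servers", "driver-kafka:9092")   // driver consumer
spark.conf.set("spark.sql.streaming.vertica.producer.bootstrap.servers", "executor-kafka:9092") // executor producers
spark.conf.set("spark.sql.streaming.vertica.kafka.bootstrap.servers", "vertica-kafka:9092")     // Vertica's consumer
```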
ProducerParallelism
Dictates the amount of parallelism to allocate on the Spark cluster when producing processed data to Kafka for Vertica to load. Parallelism is determined by the number of partitions present in the intermediate Kafka topic. If the intermediate Kafka topic contains 4 partitions, then 4 tasks are allocated by default. If the producer parallelism is increased to 2, then 8 tasks are allocated. Increasing the producer parallelism typically requires a shuffle operation, so take care when changing the default parallelism of 1.
val producerParallelism = buildConf("spark.sql.streaming.vertica.kafka.producerParallelism")
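The task-count arithmetic from the paragraph above, as a quick worked example:

```scala
// 4 partitions in the intermediate Kafka topic with the default producerParallelism of 1 => 4 tasks;
// raising producerParallelism to 2 => 8 tasks (and typically an extra shuffle).
val topicPartitions = 4
val producerParallelismFactor = 2
val producerTasks = topicPartitions * producerParallelismFactor  // 8
```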