LogStash JDBC Plugin - chaitanyavangalapudi/devops-scripts GitHub Wiki
Replicating MySQL data into Elasticsearch using the Logstash JDBC Plugin on Linux/Unix
The Logstash JDBC input plugin ingests data from any relational database with a JDBC interface into Logstash. Logstash also supports many other input plugins, such as kafka, file, jms, stomp, and kinesis, and output plugins such as elasticsearch, file, kafka, and mongodb.
In this article we use Elastic Stack version 6.5.2 for all our experiments and assume we are replicating data from MySQL 8.x to Elasticsearch 6.5.2. Follow the steps in the Logstash manual to install Logstash.
Step 1: Install required Logstash Plugins
We need to install the JDBC input plugin, Aggregate/Mutate filter plugin and Elasticsearch output plugin using the following commands:
[root@localhost logstash]# which logstash-plugin
/usr/share/logstash/bin/logstash-plugin
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-filter-aggregate
bin/logstash-plugin install logstash-filter-mutate
bin/logstash-plugin install logstash-output-elasticsearch
- JDBC Input plugin (logstash-input-jdbc) – Used for reading the data from SQL DB using JDBC
- Aggregate Filter plugin (logstash-filter-aggregate) – Used for aggregating the rows from SQL DB into nested objects. Aggregates information from several events originating with a single task
- Mutate filter plugin (logstash-filter-mutate) - The mutate filter allows you to perform general mutations on fields. You can rename, remove, replace, and modify fields in your events.
- Elasticsearch output plugin (logstash-output-elasticsearch) - Stores logs in Elasticsearch
Step 2: Download the MySQL library for 8.x and put it in a path accessible by Logstash
Download MySQL JDBC driver for 8.x for your operating system from https://dev.mysql.com/downloads/connector/j/
and place it in /usr/share/logstash/lib/mysql-connector-java-8.0.13.jar
The Logstash JDBC plugin does not bundle any JDBC drivers, so we need to specify the location of the driver jar with the jdbc_driver_library option.
Step 3: Define Logstash Configuration
Let’s create a configuration file named logstash-config.conf. The configuration file must contain an input section and an output section, and may contain an optional filter section. Below is a skeleton Logstash configuration file:
input {
  jdbc {
    ## Input Configs
  }
}
filter {
  mutate {
    ## Filter Configs
  }
}
output {
  elasticsearch {
    ## Output Configs
  }
}
Step 3.1 Define Input section
Since we are using MySQL as input, we need to use JDBC Input Plugin. Sample Input section using JDBC Input plugin is as shown below
input {
  jdbc {
    id => "student_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    # Driver class for Connector/J 8.x
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/student.sql"
  }
}
We will discuss the id field later, under Common Configurations.
- jdbc_connection_string: The JDBC URL of the MySQL database server, including host, port, and database name. Provide the username and password through jdbc_user and jdbc_password if they are not specified in the connection string.
- jdbc_driver_library: The location of the JDBC driver library on the Linux box. In our case it is "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
- jdbc_driver_class: Class name of the JDBC driver. For Connector/J 8.x use "Java::com.mysql.cj.jdbc.Driver"; the older "Java::com.mysql.jdbc.Driver" is deprecated.
- statement_filepath: File containing the SQL statement that fetches data from MySQL. In our case it is "/etc/logstash/conf.d/student.sql"
[root@localhost conf.d]# cat /etc/logstash/conf.d/student.sql
select id, student_name, student_code, student_category, type, status, grade, last_updated_date from student;
We can also specify the SQL statement inline with the statement option:
statement => "select id, student_name, student_code, type, status, grade, last_updated_date from student"
Step 3.2 Define Filter section
In the filter section we use the Mutate filter installed earlier. We want to remove the underscores from the MySQL column names and use camelCase for those fields in Elasticsearch. We do this with the rename option of the Mutate filter. This filter provides many other operations; see the mutate filter documentation listed in the References for more information.
filter {
  mutate {
    rename => { "student_code" => "studentCode" }
    rename => { "student_name" => "studentName" }
    rename => { "student_category" => "studentCategory" }
    rename => { "last_updated_date" => "lastUpdatedDate" }
  }
}
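The rename entries above follow a mechanical snake_case to camelCase rule. As a minimal sketch of that rule (this is not part of Logstash; the helper names to_camel_case and mutate_rename_lines are hypothetical), the following Python snippet derives the rename lines from the column list of the student table:

```python
def to_camel_case(column):
    """Convert a lowercase snake_case column name to camelCase."""
    head, *tail = column.split("_")
    return head + "".join(part.capitalize() for part in tail)


def mutate_rename_lines(columns):
    """Build one mutate rename line per column whose name actually changes."""
    lines = []
    for column in columns:
        camel = to_camel_case(column)
        if camel != column:
            lines.append(f'rename => {{ "{column}" => "{camel}" }}')
    return lines


# Columns selected in student.sql
columns = ["id", "student_name", "student_code", "student_category",
           "type", "status", "grade", "last_updated_date"]
for line in mutate_rename_lines(columns):
    print(line)
```

Columns without underscores, such as id or grade, are left untouched, so no rename line is generated for them.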
Step 3.3 Define Output section
We want to ingest data into Elasticsearch, so we use the logstash-output-elasticsearch plugin. Let’s assume Elasticsearch is running locally on port 9200. A sample output configuration using this plugin is shown below:
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "student"
    index => "student-index"
    # codec => "json"
    hosts => ["localhost:9200"]
  }
  stdout { codec => json_lines }
}
- stdout is an output plugin in Logstash. It prints to the stdout of the shell running Logstash. We can use different codecs inside stdout, such as rubydebug and json_lines.
- rubydebug - Outputs event data using the awesome_print library. It is used to visualise the structure of the data. We can also use the json codec to print event data in JSON format.
- json_lines - This codec decodes streamed JSON that is newline delimited. Encoding emits a single JSON string ending in a @delimiter. NOTE: Do not use this codec if your source input is line-oriented JSON, for example, redis or file inputs. Rather, use the json codec.
Step 4: Putting it all together
Let’s say you want to push all the rows in the student table to Elasticsearch. The student table contains the columns id (auto-increment primary key), student_name, student_code, student_category, type, status, grade, and last_updated_date. The full configuration is as follows.
input {
  jdbc {
    id => "student_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    # Driver class for Connector/J 8.x
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/student.sql"
  }
}
filter {
  mutate {
    rename => { "student_code" => "studentCode" }
    rename => { "student_name" => "studentName" }
    rename => { "student_category" => "studentCategory" }
    rename => { "last_updated_date" => "lastUpdatedDate" }
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "student"
    index => "student-index"
    # codec => "json"
    hosts => ["localhost:9200"]
  }
  stdout { codec => json_lines }
}
Step 5: Running the logstash process via command line
Place the configuration file in Logstash’s config directory, then run the pipeline with the following command:
$ logstash -w 1 -f /etc/logstash/conf.d/logstash-config.conf
To check whether student-index has been populated, open http://localhost:9200/student-index/student/IND in a browser (replace IND with the document id you want to look up)
OR run
curl -XGET 'http://localhost:9200/student-index/student/IND'
curl -XGET 'http://localhost:9200/student-index?pretty=true'
Scheduling LogStash jobs:
With our earlier configurations, only the rows present at the time of the run are saved in Elasticsearch. To also capture data that is created or modified after that run, we use the scheduling capability of Logstash. In general, it is not advisable to use the schedule feature on its own; combine it with a tracking column.
Before going further on scheduling, let us discuss some common and predefined Logstash configuration options that are very useful for processing the data.
Common Configurations:
There are some common configuration options like id, tags, type, codec that are supported by all input plugins.
- id field - Add a unique ID to the plugin configuration. If no ID is specified, Logstash will generate one. It is strongly recommended to set this ID in your configuration. This is particularly useful when you have two or more plugins of the same type, for example, if you have 2 jdbc inputs. Adding a named ID in this case will help in monitoring Logstash when using the monitoring APIs.
input {
  jdbc {
    id => "my_plugin_id"
  }
}
- tags - Add any number of arbitrary tags to your event. This can be used for processing later.
- type - Add a type field to all events handled by this input. Types are used mainly for filter activation. The type is stored as part of the event itself, so you can also use the type to search for it in Kibana. If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
Predefined Parameters:
Some parameters are built-in and can be used from within your queries. Here is the list:
sql_last_value
The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run.
Example:
input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    # ... other configuration bits
  }
}
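To make the behaviour of :sql_last_value with a numeric tracking column concrete, here is a small Python simulation using an in-memory SQLite database. It is only a sketch of the idea, not the plugin's implementation: the fetch_new_rows helper, the sample rows, and the assumption that the tracked value is taken from the last row of an id-ordered result are all illustrative.

```python
import sqlite3


def fetch_new_rows(conn, sql_last_value):
    """Run the tracked query and return (rows, updated sql_last_value)."""
    rows = conn.execute(
        "SELECT id, student_name FROM student "
        "WHERE id > :sql_last_value ORDER BY id",
        {"sql_last_value": sql_last_value},
    ).fetchall()
    if rows:
        # Assumption: remember the tracking column of the last row returned.
        sql_last_value = rows[-1][0]
    return rows, sql_last_value


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE student (id INTEGER PRIMARY KEY, student_name TEXT)")
conn.executemany("INSERT INTO student (student_name) VALUES (?)",
                 [("Asha",), ("Ravi",)])  # made-up sample rows

sql_last_value = 0  # initial value for a numeric tracking column
first_run, sql_last_value = fetch_new_rows(conn, sql_last_value)

conn.execute("INSERT INTO student (student_name) VALUES ('Meena')")
second_run, sql_last_value = fetch_new_rows(conn, sql_last_value)

print(first_run)
print(second_run)
print(sql_last_value)
```

The first run starts from 0 and returns both existing rows; the second run returns only the row inserted in between, because the remembered id filters out everything already fetched.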
Using scheduler/cron jobs:
To pull data from MySQL repeatedly, based on a timestamp or another trackable column, and ingest it into Elasticsearch, use the tracking column and schedule parameters in the jdbc input section. The parameters use_column_value, tracking_column, last_run_metadata_path, and schedule influence this behavior.
A sample configuration is shown below:
input {
  jdbc {
    # Processor for Student data
    id => "student_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    # Driver class for Connector/J 8.x
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    # The query; :sql_last_value must be written without a space after the colon
    statement => "select id, student_name, student_code, type, status, grade, last_updated_date from student where last_updated_date > :sql_last_value"
    use_column_value => true
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_student"
    schedule => "* * * * * *"
  }
}
- document_id (from the output section) - Elasticsearch assigns a unique id to every document. Because the pipeline runs again and again, the same rows would otherwise be indexed repeatedly as new documents. To prevent this, we set the document_id option to the id field, the primary key of the student table. Elasticsearch does not create a second document with the same document id; it replaces the existing document with the newest version, so the index contains both the newly added and the changed records.
- schedule - Input from this plugin can be scheduled to run periodically according to a specific schedule. Use any valid cron expression to run the Logstash configuration repeatedly.
- tracking_column - The column whose value is to be tracked if use_column_value is set to true. If only schedule is used, without a tracking column, Logstash reruns the same query on every scheduled run and the whole table is sent repeatedly, not just the changed data.
- tracking_column_type - Type of the tracking column. Currently only "numeric" and "timestamp" are supported.
- last_run_metadata_path - Path to the file that stores the last run value. If Logstash stops because of a system failure, it resumes from the value stored in this file when restarted instead of starting from the beginning. By default the value is saved to .logstash_jdbc_last_run; use this option to specify a custom file where :sql_last_value is saved.
- use_column_value - When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.
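The effect of document_id described in the list above can be illustrated with a plain Python dictionary standing in for an index. This is a simplified model under stated assumptions, not how Elasticsearch is implemented: the index_event helper and the sample rows are hypothetical, and a random UUID stands in for an automatically generated document id.

```python
import uuid


def index_event(index, event, document_id=None):
    """Store an event; without document_id a fresh random id is generated."""
    key = document_id if document_id is not None else uuid.uuid4().hex
    index[key] = event  # an existing entry under the same key is replaced
    return key


row_v1 = {"id": 7, "grade": "B"}
row_v2 = {"id": 7, "grade": "A"}  # the same student row after an update

# Without document_id: every run adds another copy of the row.
auto_ids = {}
index_event(auto_ids, row_v1)
index_event(auto_ids, row_v2)

# With document_id taken from the primary key: the newer version replaces the older.
fixed_ids = {}
index_event(fixed_ids, row_v1, document_id=row_v1["id"])
index_event(fixed_ids, row_v2, document_id=row_v2["id"])

print(len(auto_ids), len(fixed_ids), fixed_ids[7]["grade"])
```

With generated ids the same row ends up stored twice, while keying on the primary key keeps a single entry holding the latest values.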
The schedule, tracking_column, and use_column_value options can be combined in different ways, as the following configurations show.
Configuration #1: schedule and tracking_column
To push data to Elasticsearch incrementally, we need a column in the MySQL table to use as a reference. In our case we use the last_updated_date column of the student table, a timestamp that is updated whenever a row is created or updated.
The following is a sample configuration for this scenario.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    statement => "select id, student_name, student_code, type, status, grade, last_updated_date from student where last_updated_date > :sql_last_value"
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    schedule => "* * * * * *"
  }
}
The purpose of sql_last_value is described in the "Predefined Parameters" section above: it holds the reference value used to select the rows to retrieve. Because use_column_value is not set in this configuration, sql_last_value reflects the last time the query was executed, and the tracking_column settings have no effect. With our schedule the query runs every second, so sql_last_value advances to the latest run time on each execution. This value is saved to the file .logstash_jdbc_last_run:
[root@localhost conf.d]# ls -al /usr/share/logstash/
total 2956
drwxr-xr-x. 11 logstash logstash 4096 Dec 15 20:43 .
drwxr-xr-x. 254 root root 8192 Jan 28 21:20 ..
drwxr-xr-x. 2 logstash logstash 4096 Dec 15 20:41 bin
-rw-r--r--. 1 logstash logstash 2276 Nov 30 06:17 CONTRIBUTORS
drwxrwxr-x. 4 logstash logstash 65 Dec 15 20:29 data
drwxr-xr-x. 6 logstash logstash 4096 Dec 15 20:41 lib
-rw-r--r--. 1 logstash logstash 13675 Nov 30 06:17 LICENSE.txt
drwxr-xr-x. 4 logstash logstash 86 Dec 7 16:09 logstash-core
drwxr-xr-x. 3 logstash logstash 55 Dec 7 16:09 logstash-core-plugin-api
-rw-r--r--. 1 logstash logstash 41 Jan 10 23:14 .logstash_jdbc_last_run
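The schedule value used in these configurations, "* * * * * *", has six fields rather than the five of a classic cron expression. The Python sketch below labels the fields under the assumption, consistent with the "runs every second" behaviour described in this article, that a six-field expression starts with a seconds field. The label_schedule helper is hypothetical, does not validate field values, and does not handle timezone suffixes.

```python
FIVE_FIELD_NAMES = ["minute", "hour", "day of month", "month", "day of week"]


def label_schedule(expression):
    """Pair each field of a cron-style schedule with its assumed meaning."""
    fields = expression.split()
    if len(fields) == 5:
        names = FIVE_FIELD_NAMES
    elif len(fields) == 6:
        # Assumption: a sixth, leading field is the seconds field.
        names = ["second"] + FIVE_FIELD_NAMES
    else:
        raise ValueError(f"expected 5 or 6 fields, got {len(fields)}")
    return dict(zip(names, fields))


every_second = label_schedule("* * * * * *")
every_five_minutes = label_schedule("*/5 * * * *")
print(every_second)
print(every_five_minutes)
```

A five-field value such as "*/5 * * * *" polls the database every five minutes, which is usually a gentler choice than polling every second.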
Configuration #2: schedule, tracking_column and use_column_value on a timestamp field
Instead of updating sql_last_value from the scheduler's run time, we can configure Logstash to take it from a column in the table. With this configuration, Logstash fetches only the rows whose last_updated_date is greater than the last_updated_date of the last record processed in the previous run.
Below is a sample Logstash configuration that uses use_column_value:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    statement => "select id, student_name, student_code, type, status, grade, last_updated_date from student where last_updated_date > :sql_last_value"
    use_column_value => true
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_student"
    schedule => "* * * * * *"
  }
}
Configuration #3: schedule, tracking_column and use_column_value on an integer field
This configuration can be used to push only the newly added rows rather than the updated ones. Instead of the last_updated_date field, we can use id column which is an integer.
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "test"
    jdbc_password => "rxCv3rPw#"
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    statement => "select id, student_name, student_code, type, status, grade, last_updated_date from student where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_student"
    schedule => "* * * * * *"
  }
}
In this case, :sql_last_value is automatically set to 0 at first (Since tracking column is in integer type).
Advanced configurations with multiple JDBC inputs
If you want to pull data from multiple MySQL tables and push it to different indexes in Elasticsearch, parameters such as id, type, and tags help to distinguish the JDBC inputs.
- id - As stated above, the id option of the jdbc input adds a unique ID to the plugin configuration. Give each JDBC input its own id.
- type - The type option of the input can be used to apply different filters to the different JDBC inputs/tables.
A sample input section with multiple JDBC inputs is as follows:
Configuration #1 Multiple Inputs: using id, type
input {
  jdbc {
    # Processor for Student data
    id => "student_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "logstash"
    jdbc_password => "rxv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/student.sql"
    use_column_value => true
    type => "student"
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_student"
    schedule => "* * * * * *"
  }
  jdbc {
    # Processor for List of Values data
    id => "lovdata_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "logstash"
    jdbc_password => "rxv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/lov_data.sql"
    type => "lovdata"
    use_column_value => true
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_lovdata"
    schedule => "* * * * * *"
  }
}
You can repeat the Filter and Output section for two entities (student and lovdata) using the type parameter like below.
filter {
  if [type] == "student" {
    mutate {
      rename => { "student_code" => "studentCode" }
      rename => { "student_name" => "studentName" }
      rename => { "student_category" => "studentCategory" }
      rename => { "last_updated_date" => "lastUpdatedDate" }
    }
  }
  else if [type] == "lovdata" {
    mutate {
      rename => { "lov_type" => "type" }
      rename => { "lov_display_value" => "lovDisplayValue" }
      rename => { "lov_db_value" => "lovDbValue" }
    }
  }
}
output {
  if [type] == "student" {
    elasticsearch {
      document_id => "%{id}"
      document_type => "student"
      index => "student-index"
      # codec => "json"
      hosts => ["localhost:9200"]
    }
  }
  else if [type] == "lovdata" {
    elasticsearch {
      document_id => "%{id}"
      document_type => "lovdata"
      index => "lovdata-index"
      # codec => "json"
      hosts => ["localhost:9200"]
    }
  }
  stdout { codec => json_lines }
}
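If your event (MySQL row) already has a field named type, the JDBC plugin does not override that value, so the type-based conditionals no longer route the event correctly and incorrect data is pushed into Elasticsearch. This applies to the student table, which has a type column. Renaming another field to type in a filter, as the lovdata filter above does with lov_type, changes the value that the output conditionals test in the same way.
In this scenario we can use the tags option instead, assigning tags of our choice in the input section.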
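The difference between routing on type and routing on tags can be sketched in a few lines of Python. This is a simplified model of the rule quoted earlier (an input does not override an existing type), not Logstash code; the helper functions and the sample row, including its type value "regular", are made up for illustration.

```python
def apply_type(event, configured_type):
    """Set type only if the event does not already carry one."""
    event.setdefault("type", configured_type)
    return event


def apply_tags(event, configured_tags):
    """Append the configured tags to the event's tags list."""
    event.setdefault("tags", []).extend(configured_tags)
    return event


def is_student_by_type(event):
    return event.get("type") == "student"


def is_student_by_tags(event):
    return "student" in event.get("tags", [])


# Made-up row from a table that has its own type column.
row = {"id": 1, "student_name": "Asha", "type": "regular"}

typed = apply_type(dict(row), "student")
tagged = apply_tags(dict(row), ["student"])

print(typed["type"], is_student_by_type(typed), is_student_by_tags(tagged))
```

The row keeps the type value that came from the database, so the type-based check fails, while the tag-based check succeeds because tags live in a separate field.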
Configuration #2 Multiple Inputs: using id, tags
The complete configuration for handling student and lov_data using tags is as follows:
input {
  jdbc {
    # Processor for Student data
    id => "student_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "logstash"
    jdbc_password => "rxv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/student.sql"
    use_column_value => true
    tags => ["student"]
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_student"
    schedule => "* * * * * *"
  }
  jdbc {
    # Processor for List of Values data
    id => "lovdata_processor"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/schooldb"
    jdbc_user => "logstash"
    jdbc_password => "rxv3rPw#"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "Java::com.mysql.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "/etc/logstash/conf.d/lov_data.sql"
    tags => ["lovdata"]
    use_column_value => true
    tracking_column => "last_updated_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_lastrun_lovdata"
    schedule => "* * * * * *"
  }
}
filter {
  if "student" in [tags] {
    mutate {
      rename => { "student_code" => "studentCode" }
      rename => { "student_name" => "studentName" }
      rename => { "student_category" => "studentCategory" }
      rename => { "last_updated_date" => "lastUpdatedDate" }
    }
  }
  else if "lovdata" in [tags] {
    mutate {
      rename => { "lov_type" => "type" }
      rename => { "lov_display_value" => "lovDisplayValue" }
      rename => { "lov_db_value" => "lovDbValue" }
    }
  }
}
output {
  if "student" in [tags] {
    elasticsearch {
      document_id => "%{id}"
      document_type => "student"
      index => "student-index"
      # codec => "json"
      hosts => ["localhost:9200"]
    }
  }
  else if "lovdata" in [tags] {
    elasticsearch {
      document_id => "%{id}"
      document_type => "lovdata"
      index => "lovdata-index"
      # codec => "json"
      hosts => ["localhost:9200"]
    }
  }
  stdout { codec => json_lines }
}
Note: If any of the statements use the sql_last_value parameter (e.g. for ingesting only data changed since last run), each input should define its own last_run_metadata_path parameter. Failure to do so will result in undesired behaviour, as all inputs will store their state to the same (default) metadata file, effectively overwriting each other’s sql_last_value.
Here both inputs are JDBC and the output is Elasticsearch, with a mutate filter to rename the fields. More complex configurations can combine tags, type, and different input, filter, and output plugins, and can use regular expressions in comparisons, nested if/else conditions, and multiple outputs for different entities.
References:
- https://www.elastic.co/guide/en/logstash/current/logstash-6-5-2.html
- https://www.elastic.co/guide/en/logstash/current/input-plugins.html
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
- https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
- https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
- https://www.elastic.co/guide/en/logstash/current/output-plugins.html
- https://www.elastic.co/guide/en/logstash/current/config-examples.html
- https://discuss.elastic.co/c/logstash