ElasticSearch, Kibana, Logstash, Filebeat, etc.

Elasticsearch

Download Elasticsearch

Install and run it.

  • Follow the steps at

https://www.elastic.co/start

(Versions earlier than 6.0 are for 32-bit Linux, if you need that; here we use the 64-bit 7.12.0 build.)

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.12.0-linux-x86_64.tar.gz

tar xvf elasticsearch-7.12.0-linux-x86_64.tar.gz

  • Rename the folder

mv elasticsearch-7.12.0 elasticsearch

Run and test Elasticsearch.

cd elasticsearch

bin/elasticsearch

or run it as a daemon

bin/elasticsearch -d -p pid
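
To stop the daemon later, kill the process ID recorded in the pid file (as described in the Elasticsearch docs):

pkill -F pid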

or run it as a service.

  • Run as a service.

Create a service file, where hadoop is the user name and group name:

sudo vi /usr/lib/systemd/system/elastic.service

[Unit]
Description=Elastic

[Service]
WorkingDirectory=/home/hadoop/elasticsearch
Environment=ES_HOME=/home/hadoop/elasticsearch
Environment=ES_PATH_CONF=/home/hadoop/elasticsearch/config
Environment=ES_HEAP_SIZE=512M
Environment=ES_JAVA_OPTS=-Xmx2g -Xms2g
Type=simple
User=hadoop
Group=hadoop
#User=root
#Group=root
ExecStart=/home/hadoop/elasticsearch/bin/elasticsearch
Restart=always

[Install]
WantedBy=multi-user.target

Save it.

Reload the systemd unit files, enable the service at boot, start it, and check its status:

sudo systemctl daemon-reload

sudo systemctl enable elastic.service

sudo systemctl start elastic.service

sudo systemctl status elastic.service

If the service does not start, check the problem with:


sudo journalctl -u elastic.service -b

Then

curl -X GET "http://localhost:9200/?pretty"

{   "name" : "bigdata",   "cluster_name" : "elasticsearch",   "cluster_uuid" : "L82KiFJTSO6f4dvvI6Oq_g",   "version" : {     "number" : "7.12.0",     "build_flavor" : "default",     "build_type" : "tar",     "build_hash" : "78722783c38caa25a70982b5b042074cde5d3b3a",     "build_date" : "2021-03-18T06:17:15.410153305Z",     "build_snapshot" : false,     "lucene_version" : "8.8.0",     "minimum_wire_compatibility_version" : "6.8.0",     "minimum_index_compatibility_version" : "6.0.0-beta1"   },   "tagline" : "You Know, for Search" }

If the process gets killed, try increasing the JVM heap size.

In the elasticsearch/config directory, edit the file jvm.options:

-Xmx1g

(source: https://www.elastic.co/guide/en/elasticsearch/reference/current/important-settings.html#heap-size-settings)

(https://www.elastic.co/guide/en/cloud-enterprise/2.0/ece-heap.html)
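
For example, a minimal heap override, assuming the tarball layout above. Elasticsearch 7.x also reads any custom .options file placed under config/jvm.options.d/, so a file such as config/jvm.options.d/heap.options (hypothetical name) could contain:

# Set minimum and maximum heap to the same value, as the docs recommend.
-Xms1g
-Xmx1g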

Kibana

It must be the same version as the installed Elasticsearch (https://www.elastic.co/downloads/kibana).

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.12.0-linux-x86_64.tar.gz
tar xvf kibana-7.12.0-linux-x86_64.tar.gz

mv kibana-7.12.0-linux-x86_64 kibana
cd kibana

Then edit config

vi config/kibana.yml

Uncomment the following lines to link Kibana with Elasticsearch:

elasticsearch.hosts: ["http://localhost:9200"]

server.ssl.enabled: false

Then create a service file, where hadoop is the user name and group name:

sudo vi /usr/lib/systemd/system/kibana.service
[Unit]
Description=Kibana

[Service]
Type=simple
User=hadoop
Group=hadoop
#User=root
#Group=root
# ExecStart points at the location of your Kibana install
ExecStart=/home/hadoop/kibana/bin/kibana -c /home/hadoop/kibana/config/kibana.yml
Restart=always

[Install]
WantedBy=multi-user.target

Save it. Then reload the systemd unit files, enable the service at boot, start it, and check its status:

sudo systemctl daemon-reload
sudo systemctl enable kibana.service
sudo systemctl start kibana.service
sudo systemctl status kibana.service


If the service does not start, check the problem with:

sudo journalctl -u kibana.service -b

Then, after it starts successfully, allow the port:

sudo ufw allow 5601/tcp

Then, from your machine, tunnel local port 5601 through the university firewall:

ssh -N -L 5601:localhost:5601 [email protected]   -vv

Navigate to the following URL to see the sample dashboard:

http://localhost:5601

Try Elasticsearch using the Kibana Dev Tools console UI:

http://localhost:5601/app/kibana#/dev_tools/console?_g=()

A request to Elasticsearch consists of the same parts as any HTTP request:

curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'

(https://logz.io/blog/elasticsearch-tutorial/)

curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>'

e.g.

curl -X PUT 'localhost:9200/app/users/4' -H 'Content-Type: application/json' -d '
{
  "id": 4,
  "username": "john",
  "last_login": "2018-01-25 12:34:56"
}'
curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index                           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   app                             2MbJ72MfTvW9_9TUwFQ3Tw   1   1          1            0      4.8kb          4.8kb
green  open   .kibana_task_manager_7.12.0_001 s6lgWLf9TIqI65enuCdZZQ   1   0          9          640      185kb          185kb
green  open   .apm-custom-link                tpcWGCaQSfmnaQtrROrF7w   1   0          0            0       208b           208b
green  open   .apm-agent-configuration        3KfE94rkQ8CoJtr5oxPFxQ   1   0          0            0       208b           208b
green  open   kibana_sample_data_logs         9oRBaJHhRq2aPs-pwQ8Fgg   1   0      14074            0     10.5mb         10.5mb
green  open   .async-search                   -MT7MgLIQ_u8Vr5RK6IHDA   1   0         22            0    239.8kb        239.8kb
green  open   .kibana_7.12.0_001              NkWmq-cdSG2rVnOvbxuThQ   1   0         79           36      2.1mb          2.1mb
green  open   .kibana-event-log-7.12.0-000001 L-7sExOHSuSkofWOyBQYxQ   1   0          1            0      5.6kb          5.6kb
yellow open   logs                            5O18D74zQ4CKy_x3v7T9Zw   1   1          2            0     10.2kb         10.2kb
curl -XGET 'localhost:9200/app/users/4?pretty'
{
  "_index" : "app",
  "_type" : "users",
  "_id" : "4",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 4,
    "username" : "john",
    "last_login" : "2018-01-25 12:34:56"
  }
}


curl -XGET 'localhost:9200/_search?q=logged'
{"took":11,"timed_out":false,"_shards":{"total":9,"successful":9,"skipped":0,"failed":0},"hits":{"total":{"value":2,"relation":"eq"},"max_score":0.18232156,"hits":[{"_index":"logs","_type":"my_app","_id":"1VoY6ngBbN132dKkN_OX","_score":0.18232156,"_source":
{
"timestamp": "2018-01-24 12:34:56",
"message": "User logged in",
"user_id": 4,
"admin": false
}
},{"_index":"logs","_type":"my_app","_id":"1loY6ngBbN132dKkwPOk","_score":0.18232156,"_source":
{
"timestamp": "2018-01-24 12:34:56",
"message": "User logged in",
"user_id": 4,
"admin": false
}
}]}

Read more:

  1. https://logz.io/blog/elasticsearch-tutorial/
  2. https://stackify.com/elasticsearch-tutorial/

An index is like a 'database' in a relational database. It has a mapping which defines multiple types (note that mapping types are removed in Elasticsearch 7.x, so an index effectively holds a single type of document). We can think of an index as a type of data organization mechanism, allowing the user to partition data a certain way. Analogy:

(https://www.elastic.co/guide/en/elasticsearch/reference/7.12/documents-indices.html)

MySQL => Databases => Tables => Columns/Rows
Elasticsearch => Indices => Types => Documents with Properties

Other key concepts of Elasticsearch are replicas and shards, the mechanism Elasticsearch uses to distribute data around the cluster. Elasticsearch implements a clustered architecture that uses sharding to distribute data across multiple nodes, and replication to provide high availability.

The index is a logical namespace which maps to one or more primary shards and can have zero or more replica shards. A shard is a Lucene index, and an Elasticsearch index is a collection of shards. Our application talks to an index, and Elasticsearch routes our requests to the appropriate shards.

The smallest index we can have is one with a single shard. This setup may be small, but it may serve our current needs and is cheap to run. Suppose that our cluster consists of one node, and in our cluster we have one index, which has only one shard: an index with one primary shard and zero replica shards.
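
On a single-node cluster like this, an index that has replicas will show yellow health because the replica shards cannot be allocated anywhere. You can check this with the cluster health API (a quick check, not part of the original tutorial):

curl -X GET "localhost:9200/_cluster/health?pretty"
# "status" : "green"  -> all primary and replica shards are allocated
# "status" : "yellow" -> primaries allocated, some replicas unassigned (typical for one node)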

Follow the instructions in https://stackify.com/elasticsearch-tutorial/ to:

  1. create an index
  2. read the index
  3. update it
  4. search it

See also:
https://www.elastic.co/blog/a-practical-introduction-to-elasticsearch
https://www.elastic.co/guide/en/kibana/current/getting-started.html

For example, to create an index with one primary shard and no replicas:
PUT /my_index
{
  "settings": {
    "number_of_shards":   1, 
    "number_of_replicas": 0
  }
}
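
A minimal sketch of the remaining steps (read, update, search), using the Dev Tools console syntax and the my_index index created above; the document id 1 and the field values are hypothetical:

# read the index settings and mappings
GET /my_index

# index a document (id 1; the field and value are made up for illustration)
PUT /my_index/_doc/1
{
  "title": "hello elasticsearch"
}

# update the same document (partial update)
POST /my_index/_update/1
{
  "doc": { "title": "hello again" }
}

# search the index
GET /my_index/_search
{
  "query": { "match": { "title": "hello" } }
}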

https://www.elastic.co/guide/en/elasticsearch/guide/current/_talking_to_elasticsearch.html

In Elasticsearch, we index, search, sort, and filter documents. Elasticsearch uses JavaScript Object Notation (JSON) as the serialization format for documents.

Elasticsearch, Kibana

Explore sample data from http://localhost:5601/app/home#/tutorial_directory/sampleData

load "Sample eCommerce orders"

Then go back to Discover menu

Select index pattern "kibana_sample_data_ecommerce"

See: https://www.elastic.co/guide/en/kibana/current/discover.html

Then, follow this tutorial.

https://www.elastic.co/guide/en/kibana/6.8/tutorial-load-dataset.html

get the data

wget   https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/elasticsearch/data/shakespeare_6.0.json
 
wget  https://raw.githubusercontent.com/cchantra/bigdata.github.io/refs/heads/master/elasticsearch/data/logs_mod.jsonl
wget https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip

unzip accounts.zip

Required operations: store a document (index) using PUT, retrieve a document using GET, and try a complicated search, full-text search, phrase search, search with highlighting, and analytics (aggregation).

(For analytics, you may need to enable fielddata = true. See https://www.elastic.co/guide/en/elasticsearch/reference/5.0/fielddata.html)

Create the mapping as follows. (Note: remove the 'doc' type level for version 7.x.)

use devtool http://localhost:5601/app/dev_tools#/console?_g=()

copy each one and run it one by one

PUT /shakespeare
{
  
 "mappings": {
   "properties": {
    "speaker": {"type": "keyword"},
    "play_name": {"type": "keyword"},
    "line_id": {"type": "integer"},
    "speech_number": {"type": "integer"}
   }
  }
 
}
PUT /logstash-2015.05.18
{
  "mappings": {
     
      "properties": {
        "geo": {
          "properties": {
            "coordinates": {
              "type": "geo_point"
            }
          }
        }
      }
    
  }
}
PUT /logstash-2015.05.19
{
  "mappings": {
    
      "properties": {
        "geo": {
          "properties": {
            "coordinates": {
              "type": "geo_point"
            }
          }
        }
      }
  
  }
}

PUT /logstash-2015.05.20
{
  "mappings": {
 
      "properties": {
        "geo": {
          "properties": {
            "coordinates": {
              "type": "geo_point"
            }
          }
        }
      }
    
  }
}

Then upload the data (don't forget to unzip accounts.zip; if you downloaded logs.jsonl.gz instead of logs_mod.jsonl, run gzip -d logs.jsonl.gz first):


curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/bank/account/_bulk?pretty' --data-binary @accounts.json

curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/shakespeare/_bulk?pretty' --data-binary @shakespeare_6.0.json
curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/_bulk?pretty' --data-binary @logs_mod.jsonl

Next, define an index pattern (https://www.elastic.co/guide/en/kibana/6.8/tutorial-define-index.html). Go to

http://localhost:5601/app/management/kibana/indexPatterns

Define the index pattern "shakes*" and click "Next step".

Try:

POST /shakespeare/_search
POST  /shakespeare/scene/_search/
{
    "query":{
        "match" : {
            "play_name" : "Antony"
        }
    }
}

To see your created index

GET /_cat/indices?v

(See https://www.elastic.co/guide/en/elasticsearch/guide/current/_analytics.html for analytics, e.g. grouping by interests.)
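
As a sketch of the phrase search, highlighting, and aggregation operations listed above, you can run requests like these in Dev Tools against the shakespeare index (the field names text_entry and speaker come from the sample dataset; the exact phrase is just an example):

# phrase search with highlighting on the line text
GET /shakespeare/_search
{
  "query": { "match_phrase": { "text_entry": "to be or not to be" } },
  "highlight": { "fields": { "text_entry": {} } }
}

# aggregation: count lines per speaker (speaker is mapped as keyword above)
GET /shakespeare/_search
{
  "size": 0,
  "aggs": {
    "by_speaker": { "terms": { "field": "speaker" } }
  }
}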

Future reading: https://www.elastic.co/guide/en/elasticsearch/guide/current/_distributed_nature.html

Testing ElasticSearch https://www.bogotobogo.com/Hadoop/ELK/ELK_ElasticSearch_Logstash_Kibana4.php

curl http://localhost:9200

Logstash

Ref. https://www.elastic.co/guide/en/logstash/7.12/index.html https://www.elastic.co/guide/en/logstash/7.12/installing-logstash.html

wget  https://artifacts.elastic.co/downloads/logstash/logstash-7.12.0-linux-x86_64.tar.gz
wget  https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.0-linux-x86_64.tar.gz

Untar them and rename the folders to logstash and filebeat, for example:
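
A sketch of those steps (the extracted directory names assume the 7.12.0 tarballs above; adjust if yours differ):

tar xvf logstash-7.12.0-linux-x86_64.tar.gz
tar xvf filebeat-7.12.0-linux-x86_64.tar.gz
mv logstash-7.12.0 logstash
mv filebeat-7.12.0-linux-x86_64 filebeat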

Testing logstash: pipeline

cd logstash
bin/logstash -e 'input { stdin { } } output { stdout {} }'

Flag -e enables you to specify a configuration directly from the command line.

After starting Logstash, wait until you see "Pipeline main started" or

"Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}"

and then enter "hello world":

hello world

you get back:

{
      "@version" => "1",
       "message" => "hello world",
          "host" => "bigdata",
    "@timestamp" => 2021-11-22T12:09:28.186Z
}

Exit Logstash by issuing a CTRL-D command in the shell where Logstash is running.

Create a Logstash pipeline that uses Filebeat to take Apache web logs as input, parses those logs to create specific, named fields, and writes the parsed data to an Elasticsearch cluster.

  1. Install and configure Filebeat, which collects logs from files on the server and forwards them to your Logstash instance for processing. (See https://www.elastic.co/guide/en/beats/filebeat/7.12/filebeat-getting-started.html)

Download logstash-tutorial.log from https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz and unzip using

gzip -d  logstash-tutorial.log.gz

After installing Filebeat, configure it. Open the filebeat.yml file located in your Filebeat installation directory, and replace the contents with the following lines.

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - ./logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5044"]
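
Before starting it, you can optionally sanity-check the configuration and the Logstash connection (these subcommands exist in Filebeat 7.x; the output test only succeeds once Logstash is listening on port 5044):

./filebeat test config -c filebeat.yml
./filebeat test output -c filebeat.yml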


Then run

./filebeat -e -c filebeat.yml -d "publish"

Then configure logstash to use filebeat as input.

cd /home/hadoop/logstash/

Create file first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    stdout { codec => rubydebug }
}

Verify the configuration and exit:

bin/logstash -f first-pipeline.conf --config.test_and_exit

The --config.test_and_exit option parses your configuration file and reports any errors. Now run it.

bin/logstash -f first-pipeline.conf --config.reload.automatic

The grok filter plugin is one of several plugins that are available by default in Logstash.

Because the grok filter plugin looks for patterns in the incoming log data, configuring the plugin requires you to make decisions about how to identify the patterns that are of interest.

Edit first-pipeline.conf to add the filter:

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

Use the %{COMBINEDAPACHELOG} grok pattern, which structures lines from the Apache log using the following schema:

Information           Field Name
IP Address            clientip
User ID               ident
User Authentication   auth
timestamp             timestamp
HTTP Verb             verb
Request body          request
HTTP Version          httpversion
HTTP Status Code      response
Bytes served          bytes
Referrer URL          referrer
User agent            agent

Delete the Filebeat registry so Filebeat reads the log file again from the beginning:

sudo rm -rf data/registry

Rerun filebeat

./filebeat -e -c filebeat.yml -d  "publish"

The geoip plugin looks up IP addresses, derives geographic location information from the addresses, and adds that location information to the logs. Add it to the filter block of first-pipeline.conf:

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
....
{"system_info": {"host": {"architecture":"x86_64","boot_time":"2019-01-04T16:44:52.377982+07:00","name":"Admins-MacBook-Air-3.local","ip":["127.0.0.1/8","::1/128","fe80::1/64","192.168.100.2/24","fe80::3caa:d6ff:feaf:b110/64","fe80::f0de:744b:4be3:1db8/64"],"kernel_version":"17.2.0","mac":["80:e6:50:19:f8:fc","02:e6:50:19:f8:fc","3e:aa:d6:af:b1:10","72:00:05:bc:06:b0","72:00:05:bc:06:b1","72:00:05:bc:06:b0"],"os":{"family":"darwin","platform":"darwin","name":"Mac OS X","version":"10.13.1","major":10,"minor":13,"patch":1,"build":"17B1003"},"timezone":"+07","timezone_offset_sec":25200}}}
2019-01-09T20:23:07.072+0700 INFO [beat] instance/beat.go:870 Process info {"system_info": {"process": {"cwd": "/Users/admin/filebeat-6.5.4-darwin-x86_64", "exe": "./filebeat", "name": "filebeat", "pid": 89677, "ppid": 531, "start_time": "2019-01-09T20:23:06.885+0700"}}}....

To index the data into Elasticsearch, change the output section of first-pipeline.conf to send events to Elasticsearch:

output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

Try a test query to Elasticsearch based on the fields created by the grok filter plugin. First, check available indexes.

 curl 'localhost:9200/_cat/indices?v'

health status index                           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   app                             2MbJ72MfTvW9_9TUwFQ3Tw   1   1          1            0      4.8kb          4.8kb
yellow open   logstash-2021.04.19-000001      kfg84NcsScmvnT6b-P14GA   1   1          0            0       208b           208b
green  open   .apm-agent-configuration        3KfE94rkQ8CoJtr5oxPFxQ   1   0          0            0       208b           208b
green  open   .kibana_7.12.0_001              NkWmq-cdSG2rVnOvbxuThQ   1   0        180          176        3mb            3mb
green  open   .kibana-event-log-7.12.0-000001 L-7sExOHSuSkofWOyBQYxQ   1   0          2            0       11kb           11kb
green  open   .tasks                          2aYEJzSVTKG08ACUgcyqvg   1   0          2            0     13.8kb         13.8kb
yellow open   bank                            N0OP4zzjQCqjGJSC2_2Yyw   1   1       1000            0    379.3kb        379.3kb
green  open   .kibana_task_manager_7.12.0_001 s6lgWLf9TIqI65enuCdZZQ   1   0          9         3111    393.2kb        393.2kb
green  open   .apm-custom-link                tpcWGCaQSfmnaQtrROrF7w   1   0          0            0       208b           208b
green  open   kibana_sample_data_ecommerce    7gY4HjxOT_KjMhia-Mf_fw   1   0       4675            0      3.9mb          3.9mb
yellow open   shakespeare                     OBwjmoa5T0KJPVWswO-nmQ   1   1     111396            0     18.1mb         18.1mb
green  open   kibana_sample_data_logs         9oRBaJHhRq2aPs-pwQ8Fgg   1   0      14074            0     10.5mb         10.5mb
green  open   .async-search                   -MT7MgLIQ_u8Vr5RK6IHDA   1   0         21            0    239.5kb        239.5kb
yellow open   logs                            5O18D74zQ4CKy_x3v7T9Zw   1   1          2            0     10.2kb         10.2kb

Then, for my indexes:
curl -XGET 'localhost:9200/logstash-2015.05.19/_search?pretty&q=response=200'
...
 "took" : 31,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 4233,
    "max_score" : 7.486204,
    "hits" : [
      {
        "_index" : "logstash-2015.05.19",
        "_type" : "log",
        "_id" : "1vSzHWgBdDA_uAwh8YVg",
......

Try a search that filters on city == 'Buffalo':

curl -XGET 'localhost:9200/logstash-2015.05.19/_search?pretty&q=geoip.city_name=Buffalo'

Try using Kibana to view the Filebeat data. See: https://www.elastic.co/guide/en/beats/filebeat/current/config-filebeat-logstash.html

https://www.elastic.co/guide/en/beats/filebeat/6.8/filebeat-getting-started.html

see: https://medium.com/@saurabhpresent/getting-started-on-elasticsearch-logstash-kibana-filebeat-for-nginx-logs-analysis-d567999d7846

https://blog.dbi-services.com/elasticsearch-kibana-logstash-and-filebeat-centralize-all-your-database-logs-and-even-more/

See : youtube: https://www.youtube.com/watch?v=d3ZzFHaRljg

Logstash with Filebeat and Twitter: multiple inputs and outputs.

https://www.elastic.co/guide/en/logstash/7.12/multiple-input-output-plugins.html

Create a Logstash pipeline that takes input from a Twitter feed and the Filebeat client, then sends the information to an Elasticsearch cluster as well as writing the information directly to a file.

https://www.elastic.co/guide/en/logstash/current/multiple-input-output-plugins.html

Create second-pipeline.conf, which reads input from Twitter and Filebeat and outputs to Elasticsearch and a file.

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

Modify filebeat.yml to send its output to Logstash:

filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log
  fields:
    type: syslog
output.logstash:
  hosts: ["localhost:5044"]

Run Logstash again.

bin/logstash -f second-pipeline.conf

Inspect the target file:

cat /path/to/target/file

Search the index:

  curl -XGET 'http://localhost:9200/logstash-2019.01.14/_search?pretty&q=client:iphone'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 64,
    "max_score" : 2.8225634,
    "hits" : [
      {
        "_index" : "logstash-2019.01.14",
        "_type" : "doc",
        "_id" : "q_T8S2gBdDA_uAwhH6Iq",
        "_score" : 2.8225634,
        "_source" : {
          "retweeted" : false,
          "@version" : "1",
          "user" : "_BarneyStins0n_",
          "user_mentions" : [
            {
              "id_str" : "20474834",
              "name" : "marc rees",
              "screen_name" : "reesmarc",
              "id" : 20474834,
              "indices" : [
                3,
                12
              ]
            }
          ],
          "source" : "http://twitter.com/_BarneyStins0n_/status/1084764671603851264",
          "@timestamp" : "2019-01-14T10:50:26.000Z",
...

Look at your data:

curl -XGET 'localhost:9200/logstash-2019.01.14/_search?pretty'
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 442,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-2019.01.14",
        "_type" : "doc",
        "_id" : "sfT8S2gBdDA_uAwhSqLl",
        "_score" : 1.0,
        "_source" : {
          "retweeted" : false,
          "@version" : "1",
          "message" : "RT @kimparkdaily: Here we have Jimin asking if the cloud was touching his hair in the softest voice possible. ☁️ https://t.co/7V7XkAqlQ3",
          "user" : "lu_grey",
          "hashtags" : [ ],
          "user_mentions" : [
            {
              "id_str" : "65515596",
              "name" : "Joon's World \uD83D\uDC25\uD83D\uDC28",
              "screen_name" : "kimparkdaily",
              "id" : 65515596,
              "indices" : [
                3,
                16
              ]
            }
.....

Filebeat, Kafka, Logstash, Elasticsearch


When to use Kafka?

Scenario 1: Event Spike

The app you deployed has a bad bug where information is logged excessively, flooding your logging infrastructure. This spike or burst of data is fairly common in other multi-tenant use cases as well, for example, in the gaming and e-commerce industries. A message broker like Kafka is used in this scenario to protect Logstash and Elasticsearch from this surge. Use the following architecture:

The processing is split into two separate stages: the Shipper and Indexer stages. The Logstash instance that receives data from different data sources is called a Shipper, as it doesn't do much processing. Its responsibility is to immediately persist the data it receives to a Kafka topic, and hence it is a producer. On the other side, another Logstash instance will consume data, at its own throttled speed, while performing expensive transformations like grok, DNS lookup and indexing into Elasticsearch. This instance is called the Indexer.

Scenario 2: Elasticsearch not reachable

Consider a situation where Elasticsearch is down for longer than you expected. If there are a number of data sources streaming into Elasticsearch and we cannot afford to stop the original data sources, a message broker like Kafka is useful. With the Logstash shipper and indexer architecture and Kafka, we continue to stream data from the edge nodes and hold it temporarily in Kafka. When Elasticsearch comes back up, Logstash continues where it left off and helps catch up on the backlog of data.

Pitfall: When not to use Kafka

If we can tolerate a relaxed search latency, we can completely skip the use of Kafka. Filebeat, which follows and ships file content from edge nodes, is resilient to log rotations. This means if the application is emitting more logs than Logstash/Elasticsearch can ingest at real time, logs can be rotated — using Log4j or logrotate, for example — across files, but they will still be indexed. Note, this brings in a separate requirement of having sufficient disk space to house these logs on your server machines. In other words, in this scenario, your local filesystem will become the temporary buffer.

The setup: https://sematext.com/blog/kafka-connect-elasticsearch-how-to/


Use Kafka Connect (https://github.com/confluentinc/kafka-connect-elasticsearch). Example simple setup: we already have a logs topic created in Kafka, and we would like to send data to an index called logs_index in Elasticsearch. To simplify our test, we will use the Kafka console producer to ingest data into Kafka, for example as shown below.
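
A sketch of feeding the logs topic with the console producer (assumes a Kafka installation in the current directory and a broker on localhost:9092; newer Kafka releases use --bootstrap-server instead of --broker-list):

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logs
# then type one JSON document per line, e.g.
# {"message": "test log line", "level": "INFO"}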


The setup will be as follows (https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part1, https://facingissuesonit.com/integrate-filebeat-kafka-logstash-elasticsearch-and-kibana/).


Requirement: enable the kafka module in filebeat.yml by adding the following (or see filebeat.reference.yml):

filebeat.modules:

#--------------------------------- Kafka Module ---------------------------------
- module: kafka
  # All logs
  log:
    enabled: true

    # Set custom paths for Kafka. If left empty,
    # Filebeat will look under /opt.
    var.kafka_home: /opt/kafka

    # Set custom paths for the log files. If left empty,
    # Filebeat will choose the paths depending on your OS.
    #var.paths:

    # Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
    #var.convert_timezone: false

- Create the module directory:

  mkdir /usr/share/filebeat/bin/module

- Enable the module:

 sudo filebeat modules enable kafka

Steps for this pipeline:

  1. start zookeeper

  2. start kafka

  3. run kafka create topic

# traverse to the extracted directory of Apache Kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties   # start ZooKeeper first
./bin/kafka-server-start.sh config/server.properties          # start Kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-log-test   # create a topic
  4. Push logs to Kafka using Filebeat by configuring the Filebeat output to Kafka: modify filebeat.yml to change the output section.
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["localhost:9092"]

  # message topic selection + partitioning
  topic: "devglan-log-test"
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
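
To confirm that Filebeat events are reaching Kafka, you can optionally run a console consumer on the topic (a quick check; assumes the same Kafka directory as above):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic devglan-log-test --from-beginning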
  5. Run the Elasticsearch daemon.

  6. Consume messages from Kafka with Logstash and send them to Elasticsearch: create the config file logstash-kafka.conf

input {
    kafka {
            bootstrap_servers => "localhost:9092"
            topics => ["devglan-log-test"]
    }
}

output {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "devglan-log-test"
      workers => 1
    }
}

run

logstash -f /home/path/logstash-kafka.conf #absolute path is needed
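
Before moving on to Kibana, you can verify that documents are arriving in the devglan-log-test index (a quick check using the index name configured above):

curl -XGET 'localhost:9200/devglan-log-test/_search?pretty&size=1'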
  7. Run Kibana. Create the index pattern: devglan-log-test

Elasticsearch & Grafana & Twitter

(http://docs.grafana.org/features/datasources/elasticsearch/) Requires a Grafana install: http://docs.grafana.org/installation/

Configuration location: /usr/local/etc/grafana/grafana.ini

Plugin location: /usr/local/var/lib/grafana/plugins

Database location: /usr/local/var/lib/grafana

-Start Filebeat

-Start Logstash with second-pipeline.conf

-Start Elasticsearch

find your elasticsearch index name:

 curl -XGET 'localhost:9200/_cat/indices?v&pretty'

-Add elasticsearch data source to grafana

https://qbox.io/blog/how-to-use-grafana-to-pull-data-from-elasticsearch

-Create grafana dashboard from the above data source

-Add graph and edit metric

Compare this in Kibana: search for the message keyword "Nike".


References:

https://www.elastic.co/guide/en/elasticsearch/reference/master/starting-elasticsearch.html
