Logstash
Docker
Home directory is /usr/share/logstash. The default pipeline configuration is located at pipeline/logstash.conf; a different file can be specified with the -f parameter. All files in the pipeline directory are read and merged. The default settings configuration is located in the config/logstash.yml file; when executing the Logstash binary manually, a different location can be specified with the path.settings parameter.
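For reference, a minimal pipeline/logstash.conf could look like the sketch below (the stdin input and rubydebug stdout output are placeholders, not image defaults):
input {
    # read events from standard input
    stdin {}
}
output {
    # print events in a human-readable form
    stdout {
        codec => rubydebug
    }
}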
Configuration
Settings
To enable the dead letter queue, add the following lines to the settings file:
dead_letter_queue.enable: true
path.dead_letter_queue: path/to/directory
Pipeline
input
Input configuration is enclosed in the input root section:
input {
...
}
local file
Parses a local file and continuously looks for new lines. By default, Logstash watches the end of the file, expecting new lines to appear. The sincedb file is used to record where Logstash left off; setting it to /dev/null ensures that the specified file is read from the beginning on each run.
file {
path => "path_to_file"
start_position => "beginning"
sincedb_path => "/dev/null"
}
Some log entries span multiple lines; the multiline codec lets Logstash identify such a group and assemble it into a single event.
file {
codec => multiline {
pattern => "pattern"
negate => true
what => "previous"
}
}
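For example, continuation lines that do not start with a timestamp (a common layout for stack traces) can be appended to the preceding event; the ISO8601 pattern below is an assumption about the log format:
file {
    path => "path_to_file"
    codec => multiline {
        # any line that does not start with an ISO8601 timestamp
        # belongs to the previous event
        pattern => "^%{TIMESTAMP_ISO8601}"
        negate => true
        what => "previous"
    }
}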
heartbeat
heartbeat {
message => "ok"
interval => 5
type => "heartbeat"
}
- message can be any text, but has the following special values:
    - ok - default
    - epoch - emits a clock field (instead of message) with the value of the epoch time; can be used to find out the time delay between event generation and log ingestion
    - sequence - increments the value in the clock field, starting from one
- interval is specified in seconds
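For example, to measure ingestion delay the input can emit the epoch time, which can later be compared with the event's @timestamp (a sketch):
heartbeat {
    # emits a clock field with the current epoch time instead of message
    message => "epoch"
    interval => 10
    type => "heartbeat"
}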
generator
generator {
    lines => [
        '{"field": "value"}',
        '{"other_field": "value"}'
    ]
    count => 0
    codec => "json"
}
- count - if set to 0, generates events indefinitely, otherwise generates n times
- codec - tells Logstash about the data format
dlq
dead_letter_queue {
path => "path/to/file"
}
Events that could not be delivered (for example, rejected by the Elasticsearch output) end up in the dead letter queue, which must be enabled and pointed to a directory in the Logstash settings (YAML) file. Processed events are not removed from the queue automatically; with commit_offsets set to true, Logstash records its position so that already processed events are not read again.
http poller
http_poller {
urls => {
es_health_status => {
method => get
url => "http://search:9200/_cluster/health"
headers => {
Accept => "application/json"
}
}
}
tags => "es_health"
request_timeout => 60
schedule => {
cron => "* * * * * UTC"
}
codec => "json"
metadata_target => "http_poller_metadata"
}
- schedule can also be "every" => "5s"
- metadata_target - specifies the field name where response metadata, such as headers, is stored
tcp
tcp {
port => port_number
type => syslog
}
Configures Logstash to listen for incoming connections on the specified port. The type can be any value.
MySQL
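A MySQL database is commonly read with the jdbc input plugin. A minimal sketch, assuming the MySQL JDBC driver jar is available locally (connection details, table and schedule are placeholders):
jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "user"
    jdbc_password => "password"
    # location of the MySQL JDBC driver and its driver class
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # query to run and how often to run it (cron syntax)
    statement => "SELECT * FROM table_name"
    schedule => "* * * * *"
}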
S3
s3 {
bucket => "bucket_name"
access_key_id => "..."
secret_access_key => "..."
}
Kafka
kafka {
bootstrap_servers => "host_name:port"
topics => ["name"]
}
Specifies the Kafka server and the name of the topic (channel) to listen to.
filter
Filter configuration is enclosed in the filter root section:
filter {
...
}
The date filter parses a field (here timestamp, in the given format) and uses the result as the event's @timestamp:
filter {
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}
csv
Skip the first line, as it contains the headers; specify the column names in the columns parameter.
csv {
separator => ","
skip_header => "true"
columns => ["id","timestamp","paymentType","name","gender","ip_address","purpose","country","age"]
}
By default, Logstash also inserts other fields, such as host, @timestamp, @version and so on. To remove these fields, add a mutate configuration to the filter section. Filters are applied in the order they are specified in the section, so put csv first, then mutate.
mutate {
convert => {
age => "integer"
}
remove_field => ["message","@timestamp","path","host","@version"]
}
json
message is the name of the field the JSON data is pulled from (this is where Logstash stores the raw event data).
json {
    source => "message"
}
To drop entries where a field contains a certain value, use an if block (in the filter section):
if [field_name] == "value" {
drop {}
}
grok
- documentation
- debugger - choose the Named Captures Only option
Generic grok syntax is %{PATTERN:identifier}. The identifier is chosen by the user and is used as the field name for the matched value. For example, %{TIMESTAMP_ISO8601:time} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage} would match 2020-03-29T15:42:39.44+03:00 INFO This is a sample log.
grok {
match => {"message" => ['%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:legLevel} %{GREEDYDATA:logMessage"]}
}
Lines that don't match the grok filter produce events with a _grokparsefailure item in the tags field and without the fields specified in the filter. Add the following if block at the end of the filter section to drop those failed events.
if "_grokparsefailure" in [tags] { drop {} }
It is also possible to specify multiple patterns - if a line doesn't match the first pattern, the next one is checked, and so on.
grok {
match => {"message" => [
'%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}',
'%{IP:clientIP} %{WORD:httpMethod} %{URIPATH:url}'
]
}
}
Nginx:
grok {
match => { "message" => ["%{IPORHOST:remote_ip} - %{DATA:user_name} \[%{HTTPDATE:access_time}\] \"%{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER: response_code} %{NUMBER:body_sent_bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\""] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
date {
    match => [ "access_time", "dd/MMM/YYYY:H:m:s Z" ]
    remove_field => "access_time"
}
MongoDB:
grok {
match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp}\s+%{NOTSPACE:severity}\s+%{NOTSPACE:component}\s+(?:\[%{DATA:context}\])?\s+%{GREEDYDATA:log_message}" ] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
AWS ELB:
grok {
match => { "message" => ["%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:loadbalancer} %{IP:client_ip}:%{NUMBER:client_port} (?:%{IP:backend_ip}:%{NUMBER:backend_port}|-) %{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} %{NUMBER:response_processing_time} (?:%{NUMBER:elb_status_code}|-) (?:%{NUMBER:backend_status_code}|-) %{NUMBER:received_bytes} %{NUMBER:sent_bytes} \"(?:%{WORD:verb}|-) (?:%{GREEDYDATA:request}|-) (?:HTTP/%{NUMBER:httpversion}|-( )?)\" \"%{DATA:userAgent}\"( %{NOTSPACE: ssl_cipher} %{NOTSPACE:ssl_protocol})?"] }
remove_field => "message"
}
AWS ALB:
grok {
match => { "message" => ["%{NOTSPACE:request_type} %{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE:alb-name} %{NOTSPACE:client}:%{NUMBER:client_port} (?:%{IP:backend_ip}: %{NUMBER:backend_port}|-) %{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} %{NOTSPACE:response_processing_time:float} %{NOTSPACE:elb_status_code} %{NOTSPACE: target_status_code} %{NOTSPACE:received_bytes:float} %{NOTSPACE:sent_bytes:float} %{QUOTEDSTRING:request} %{QUOTEDSTRING:user_agent} %{NOTSPACE:ssl_cipher} %{NOTSPACE: ssl_protocol} %{NOTSPACE:target_group_arn} %{QUOTEDSTRING:trace_id}"] }
remove_field => "message"
}
AWS CloudFront:
mutate {
gsub => [
"message", "\t", " ",
"message", "\n", " "
]
}
grok {
match => { "message" => [
"%{DATE:date}[ \t]%{TIME:time}[ \t]%{DATA:x_edge_location}[ \t](?:%{NUMBER:sc_bytes}|-)[ \t]%{IP:c_ip}[ \t]%{WORD:cs_method}[ \t]%{HOSTNAME:cs_host}[ \t]%{NOTSPACE:cs_uri_stem}[ \t]%{NUMBER:sc_status}[ \t]%{GREEDYDATA:referrer}[ \t]%{NOTSPACE:user_agent}[ \t]%{GREEDYDATA:cs_uri_query}[ \t]%{NOTSPACE:cookie}[ \t]%{WORD:x_edge_result_type}[ \t]%{NOTSPACE:x_edge_request_id}[ \t]%{HOSTNAME:x_host_header}[ \t]%{URIPROTO:cs_protocol}[ \t]%{INT:cs_bytes}[ \t]%{NUMBER:time_taken}[ \t]%{NOTSPACE:x_forwarded_for}[ \t]%{NOTSPACE:ssl_protocol}[ \t]%{NOTSPACE:ssl_cipher}[ \t]%{NOTSPACE:x_edge_response_result_type}[ \t]%{NOTSPACE:cs_protocol_version}[ \t]%{NOTSPACE:fle_status}[ \t]%{NOTSPACE: fle_encrypted_fields}[ \t]%{NOTSPACE:c_port}[ \t]%{NOTSPACE:time_to_first_byte}[ \t]%{NOTSPACE:x_edge_detailed_result_type}[ \t]%{NOTSPACE:sc_content_type}[ \t]%{NOTSPACE: sc_content_len}[ \t]%{NOTSPACE:sc_range_start}[ \t]%{NOTSPACE:sc_range_end}"
] }
}
mutate {
remove_field => "message"
}
syslog:
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA: syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "syslog_timestamp"
}
useragent
Takes the agent info from the log and transforms it into more human-readable info.
useragent {
source => "agent"
target => "agent"
}
geoip
Identifies approximate geographical location based on IP.
geoip {
source => "clientIP"
target => "geoip"
}
output
Output configuration is enclosed in the output root section:
output {
...
}
An if block can be used to route events to multiple targets:
if "some_value" in [field_name] {
# specify target info
}
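For instance, events tagged by the http_poller input above could be sent to a dedicated index (a sketch; host and index name are placeholders):
if "es_health" in [tags] {
    elasticsearch {
        hosts => ["host:port"]
        index => "es-health-status"
    }
}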
Elasticsearch
If no index name is specified, a logstash-{date} index is created.
elasticsearch {
hosts => ["host:port"]
index => "index-name"
}
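The index name can also include a sprintf date reference to create daily indices; for example (the weblogs name is just a placeholder):
elasticsearch {
    hosts => ["host:port"]
    index => "weblogs-%{+YYYY.MM.dd}"
}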
stdout
Pretty-print events with the rubydebug codec:
stdout {
    codec => rubydebug
}
or output one JSON document per line:
stdout {
    codec => json_lines
}
An empty configuration displays status output and log information:
stdout {}