LogStash - raghusumanth/elk-repo GitHub Wiki
Welcome to the LogStash wiki!
Logstash is an open source data collection engine with real-time pipelining capabilities. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. Logstash accelerates your insights by harnessing a greater volume and variety of data.
- Horizontally scalable data processing pipeline with strong Elasticsearch and Kibana synergy.
- Pluggable pipeline architecture: mix, match, and orchestrate different inputs, filters, and outputs to play in pipeline harmony.
- Over 200 plugins available, plus the flexibility of creating and contributing your own.
- Easily ingest a multitude of web logs like Apache, and application logs like log4j for Java.
- Capture many other log formats like syslog, networking and firewall logs, and more.
- Enjoy complementary secure log forwarding capabilities with Filebeat.
- Collect metrics from Ganglia, collectd, NetFlow, JMX, and many other infrastructure and application platforms over TCP and UDP.
- Transform HTTP requests into events.
- Consume from web service firehoses like Twitter for social sentiment analysis.
- Webhook support for GitHub, HipChat, JIRA, and countless other applications.
- Enables many Watcher alerting use cases.
- Create events by polling HTTP endpoints on demand.
- Universally capture health, performance, metrics, and other types of data from web application interfaces.
- Perfect for scenarios where the control of polling is preferred over receiving.
- Better understand your data from any relational database or NoSQL store with a JDBC interface.
- Unify diverse data streams from messaging queues like Apache Kafka, RabbitMQ, and Amazon SQS.
Logstash is the common event collection backbone for ingestion of data shipped from mobile devices to intelligent homes, connected vehicles, healthcare sensors, and many other industry specific applications.
Clean and transform your data during ingestion to gain near real-time insights immediately at index or output time. Logstash comes out-of-box with many aggregations and mutations along with pattern matching, geo mapping, and dynamic lookup capabilities.
Grok is the bread and butter of Logstash filters and is used ubiquitously to derive structure out of unstructured data. Enjoy a wealth of integrated patterns aimed to help quickly resolve web, systems, networking, and other types of event formats. Expand your horizons by deciphering geo coordinates from IP addresses, normalizing date complexity, simplifying key-value pairs and CSV data, fingerprinting (anonymizing) sensitive information, and further enriching your data with local lookups or Elasticsearch queries. Codecs are often used to ease the processing of common event structures like JSON and multiline events.
Choose Your Stash
Route your data where it matters most. Unlock various downstream analytical and operational use cases by storing, analyzing, and taking action on your data.
- Analysis: Elasticsearch; data stores such as MongoDB and Riak
- Archiving: HDFS, S3
- Monitoring: Nagios, Ganglia, Zabbix, Graphite, Datadog, CloudWatch
- Alerting: Watcher with Elasticsearch, Email, Pagerduty, IRC, SNS
Stashing Your First Event
First, let’s test your Logstash installation by running the most basic Logstash pipeline.
A Logstash pipeline has two required elements, input and output, and one optional element, filter. The input plugins consume data from a source, the filter plugins modify the data as you specify, and the output plugins write the data to a destination.
```
bin/logstash -e 'input { stdin { } } output { stdout {} }'
```
The -e flag enables you to specify a configuration directly from the command line. Specifying configurations at the command line lets you quickly test configurations without having to edit a file between iterations. The pipeline in the example takes input from the standard input, stdin, and moves that input to the standard output, stdout, in a structured format. Logstash adds timestamp and IP address information to the message.
The Filebeat client is a lightweight, resource-friendly tool that collects logs from files on the server and forwards these logs to your Logstash instance for processing. Filebeat is designed for reliability and low latency. Filebeat has a light resource footprint on the host machine, and the Beats input plugin minimizes the resource demands on the Logstash instance.
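As a minimal sketch of the Logstash side of this setup, the pipeline below listens for Filebeat connections with the Beats input plugin and prints each event. Port 5044 is an assumption (it is the port commonly used for Beats); Filebeat's own output settings would need to point at the same host and port.

```
# Minimal pipeline that receives events from Filebeat.
# Port 5044 is an assumed value; match it to Filebeat's output configuration.
input {
  beats {
    port => 5044
  }
}

output {
  stdout { codec => rubydebug }
}
```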
To list the plugins installed with Logstash:
```
bin/logstash-plugin list
```
To check a configuration file for valid syntax and then exit:
```
bin/logstash -f logstashconfig.conf --config.test_and_exit
```
To enable automatic config reloading:
```
bin/logstash -f logstashconfig.conf --config.reload.automatic
```
The geoip plugin configuration requires you to specify the name of the source field that contains the IP address to look up. In this example, the clientip field contains the IP address.
Since filters are evaluated in sequence, make sure that the geoip section is after the grok section of the configuration file and that both the grok and geoip sections are nested within the filter section.
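The ordering described above might look like the following sketch, assuming Apache combined-format log lines arrive on stdin. It also assumes grok produces a top-level clientip field, which COMBINEDAPACHELOG does when ECS compatibility is disabled; with ECS enabled the address is stored in a different field, so source would need adjusting.

```
input { stdin { } }

filter {
  # grok runs first and extracts clientip from the raw line
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # geoip runs second, so clientip already exists when it is looked up
  geoip {
    source => "clientip"
  }
}

output {
  stdout { codec => rubydebug }
}
```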
The Logstash pipeline can index the data into an Elasticsearch cluster.
output { elasticsearch { hosts => [ "localhost:9200" ] } }
The Logstash event processing pipeline has three stages: inputs → filters → outputs. Inputs generate events, filters modify them, and outputs ship them elsewhere. Inputs and outputs support codecs that enable you to encode or decode the data as it enters or exits the pipeline without having to use a separate filter.
Inputs: file, syslog, redis, beats.

Filters: grok, mutate, drop, clone, geoip, etc.
- grok: parse and structure arbitrary text. Grok is currently the best way in Logstash to parse unstructured log data into something structured and queryable. With 120 patterns built-in to Logstash, it’s more than likely you’ll find one that meets your needs!
- mutate: perform general transformations on event fields. You can rename, remove, replace, and modify fields in your events.
- drop: drop an event completely, for example, debug events.
- clone: make a copy of an event, possibly adding or removing fields.
- geoip: add information about the geographical location of IP addresses (also displays amazing charts in Kibana!).

Outputs: elasticsearch, file, graphite, statsd.

Codecs: basically stream filters that can operate as part of an input or output. Codecs enable you to easily separate the transport of your messages from the serialization process. Popular codecs include json, msgpack, and plain (text).
- json: encode or decode data in the JSON format.
- multiline: merge multiple-line text events such as Java exception and stacktrace messages into a single event.
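As an illustration of an input codec, the sketch below attaches the multiline codec to a file input so that indented continuation lines (such as the frames of a Java stack trace) are merged into the preceding event. The file path is hypothetical, and the pattern assumes continuation lines start with whitespace.

```
input {
  file {
    path => "/var/log/myapp/app.log"   # hypothetical path
    codec => multiline {
      # Any line that begins with whitespace...
      pattern => "^\s"
      # ...is appended to the event built from the previous line.
      what => "previous"
    }
  }
}
```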
The Logstash event processing pipeline coordinates the execution of inputs, filters, and outputs.
Each input stage in the Logstash pipeline runs in its own thread. Inputs write events to a central queue that is either in memory (default) or on disk. Each pipeline worker thread takes a batch of events off this queue, runs the batch of events through the configured filters, and then runs the filtered events through any outputs. The size of the batch and number of pipeline worker threads are configurable (see Tuning and Profiling Logstash Performance).
By default, Logstash uses in-memory bounded queues between pipeline stages (input → filter and filter → output) to buffer events. If Logstash terminates unsafely, any events that are stored in memory will be lost. To help prevent data loss, you can enable Logstash to persist in-flight events to disk. See Persistent Queues for more information.
Event ordering
By design and by default, Logstash does not guarantee event order. Reordering can occur in two places:
- Events within a batch can be reordered during filter processing.
- In-flight batches can be reordered when one or more batches are processed faster than others.

When maintaining event order is important, use a single worker (`pipeline.workers: 1`) and set `pipeline.ordered: true` in logstash.yml. This approach ensures that batches are computed one after the other and that events maintain their order within the batch.
pipeline.ordered setting
The `pipeline.ordered` setting in logstash.yml gives you more control over event ordering for single-worker pipelines.
- `auto` automatically enables ordering if the `pipeline.workers` setting is also set to 1.
- `true` enforces ordering on the pipeline and prevents Logstash from starting if there are multiple workers.
- `false` disables the processing required to preserve order. Ordering is not guaranteed, but you save the processing cost required to preserve it.
Java pipeline initialization time
The Java pipeline initialization time appears in the startup logs at INFO level. Initialization time is the time it takes to compile the pipeline config and instantiate the compiled execution for all workers.
Reserved fields in Logstash events
Some fields in Logstash events are reserved, or are required to adhere to a certain shape. Using these fields can cause runtime exceptions when the event API or plugins encounter incompatible values.
- @metadata: a key/value map. Java-based Plugin API: value is an org.logstash.ConvertedMap. In serialized form (such as JSON): a key/value map where the keys must be strings and the values are not constrained to a particular type.
- @timestamp: an object holding a representation of a specific moment in time. Java-based Plugin API: value is a java.time.Instant. In serialized form (such as JSON) or when setting with Event#set: an ISO8601-compliant String value is acceptable.
- @version: a string holding an integer value.
- tags: an array of distinct strings.
Logstash Configuration Files: Logstash has two types of configuration files: pipeline configuration files, which define the Logstash processing pipeline, and settings files, which specify options that control Logstash startup and execution.
Settings Files
The settings files are already defined in the Logstash installation. Logstash includes the following settings files: logstash.yml, pipelines.yml, jvm.options, log4j2.properties, and startup.options.
Logging APIs
To retrieve the list of logging configurations:
```
curl -XGET 'localhost:9600/_node/logging?pretty'
```
To update logging levels dynamically, without restarting Logstash:
```
curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' -d'
{
    "logger.logstash.outputs.elasticsearch" : "DEBUG"
}
'
```
To reset logging levels that were changed dynamically through this API, call the reset endpoint. All logging levels revert to the values specified in the log4j2.properties file:
```
curl -XPUT 'localhost:9600/_node/logging/reset?pretty'
```
Slowlog: logs when a specific event takes an abnormal amount of time to make its way through the pipeline.
What happens during a controlled shutdown:
Logstash performs several steps before it can safely shut down. It must:
- Stop all input, filter and output plugins.
- Process all in-flight events.
- Terminate the Logstash process.

The following conditions can affect the shutdown process:
- An input plugin receiving data at a slow pace.
- A slow filter, like a Ruby filter executing sleep(10000) or an Elasticsearch filter that is executing a very heavy query.
- A disconnected output plugin that is waiting to reconnect to flush in-flight events.
Logstash has a stall detection mechanism that analyzes the behavior of the pipeline and plugins during shutdown. This mechanism produces periodic information about the count of inflight events in internal queues and a list of busy worker threads.
To enable Logstash to forcibly terminate in the case of a stalled shutdown, use the --pipeline.unsafe_shutdown flag when you start Logstash.
Unsafe shutdowns, force-kills of the Logstash process, or crashes of the Logstash process for any other reason may result in data loss (unless you’ve enabled Logstash to use persistent queues). Shut down Logstash safely whenever possible.
The persistent queue directory is self-contained and can be read by a new Logstash instance running the same pipeline. You can safely shut down the original Logstash instance, spin up a new instance, and set path.queue in the logstash.yml settings file to point to the original queue directory. Keep in mind that only one Logstash instance can write to path.queue. You cannot have the original instance and the new instance writing to the queue at the same time.
Configuring Logstash
To configure Logstash, you create a config file that specifies which plugins you want to use and the settings for each plugin. You can reference event fields in a configuration and use conditionals to process events when they meet certain criteria. When you run Logstash, you use the -f flag to specify your config file.
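For instance, a minimal config file might look like the following. The file name logstash-simple.conf is only an example; you would start it with bin/logstash -f logstash-simple.conf. It reads from stdin and sends each event to both Elasticsearch (assumed to be running on localhost:9200) and stdout.

```
# logstash-simple.conf (example file name)
input { stdin { } }

output {
  # Assumes Elasticsearch is reachable on the default port
  elasticsearch { hosts => ["localhost:9200"] }
  # Also print each event in a readable form
  stdout { codec => rubydebug }
}
```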
Logstash reads the specified configuration file and outputs to both Elasticsearch and stdout. If you see a message in stdout that reads "Elasticsearch Unreachable", make sure Elasticsearch is installed, running, and reachable on port 9200. Before we move on to some more complex examples, let’s take a closer look at what’s in a config file.
Structure of a Config File
A Logstash config file has a separate section for each type of plugin you want to add to the event processing pipeline. For example:
```
# This is a comment. You should use comments to describe
# parts of your configuration.
input {
  ...
}

filter {
  ...
}

output {
  ...
}
```
Each section contains the configuration options for one or more plugins. If you specify multiple filters, they are applied in the order of their appearance in the configuration file.
The configuration of a plugin consists of the plugin name followed by a block of settings for that plugin. For example, this input section configures two file inputs:
```
input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}
```
Value Types
A plugin can require that the value for a setting be a certain type, such as boolean, list, or hash. The following value types are supported.
Array
This type is now mostly deprecated in favor of using a standard type like string with the plugin defining the :list => true property for better type checking. It is still needed to handle lists of hashes or mixed types where type checking is not desired.
Example:
```
users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
```
Lists
Not a type in and of itself, but a property types can have. This makes it possible to type check multiple values. Plugin authors can enable list checking by specifying :list => true when declaring an argument.
Example:
```
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]
```
This example configures path, which is a string, to be a list that contains an element for each of the two strings. It also configures the uris parameter to be a list of URIs, failing if any of the URIs provided are not valid.
Boolean
A boolean must be either true or false. Note that the true and false keywords are not enclosed in quotes.
Example:
```
ssl_enable => true
```
Bytes
A bytes field is a string field that represents a valid unit of bytes. It is a convenient way to declare specific sizes in your plugin options. Both SI (k M G T P E Z Y) and Binary (Ki Mi Gi Ti Pi Ei Zi Yi) units are supported. Binary units are in base-1024 and SI units are in base-1000. This field is case-insensitive and accepts space between the value and the unit. If no unit is specified, the integer string represents the number of bytes.
Examples:
```
my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
```
Codec
A codec is the name of the Logstash codec used to represent the data. Codecs can be used in both inputs and outputs.
Input codecs provide a convenient way to decode your data before it enters the input. Output codecs provide a convenient way to encode your data before it leaves the output. Using an input or output codec eliminates the need for a separate filter in your Logstash pipeline.
A list of available codecs can be found at the Codec Plugins page.
Example:
```
codec => "json"
```
Hash
A hash is a collection of key value pairs specified in the format "field1" => "value1". Note that multiple key value entries are separated by spaces rather than commas.
Example:
```
match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
# or on a single line (still no commas between entries):
match => { "field1" => "value1" "field2" => "value2" }
```
Number
Numbers must be valid numeric values (floating point or integer).
Example:
```
port => 33
```
Password
A password is a string with a single value that is not logged or printed.
Example:
```
my_password => "password"
```
URI
A URI can be anything from a full URL like http://elastic.co/ to a simple identifier like foobar. If the URI contains a password, such as http://user:pass@example.net, the password portion of the URI will not be logged or printed.
Example:
```
my_uri => "http://foo:bar@example.net"
```
Path
A path is a string that represents a valid operating system path.
Example:
```
my_path => "/tmp/logstash"
```
String
A string must be a single character sequence. Note that string values are enclosed in quotes, either double or single.
Escape Sequences
By default, escape sequences are not enabled. If you wish to use escape sequences in quoted strings, you will need to set `config.support_escapes: true` in your logstash.yml. When true, quoted strings (double and single) will have this transformation:
- `\r` becomes a carriage return
- `\n` becomes a new line
- `\t` becomes a tab
- `\\` becomes a backslash
- `\"` becomes a double quote
- `\'` becomes a single quote
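To make the effect concrete, here is a small sketch; the field name two_lines is hypothetical. With config.support_escapes: true in logstash.yml, the \n in the value below becomes a real newline. With the default setting, the field keeps the two literal characters backslash and n.

```
filter {
  mutate {
    # \n is turned into a newline only when config.support_escapes is true
    add_field => { "two_lines" => "first line\nsecond line" }
  }
}
```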
Comments
Comments are the same as in Perl, Ruby, and Python. A comment starts with a # character, and does not need to be at the beginning of a line.
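A minimal sketch showing both placements, a comment on its own line and one at the end of a line:

```
# This comment sits on its own line
input {
  stdin { } # this comment follows a plugin block on the same line
}
```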
Accessing Event Data and Fields in the Configuration
The Logstash agent is a processing pipeline with 3 stages: inputs → filters → outputs. Inputs generate events, filters modify them, outputs ship them elsewhere.
All events have properties. For example, an apache access log would have things like status code (200, 404), request path ("/", "index.html"), HTTP verb (GET, POST), client IP address, etc. Logstash calls these properties "fields."
Some of the configuration options in Logstash require the existence of fields in order to function. Because inputs generate events, there are no fields to evaluate within the input block—they do not exist yet!
Because of their dependency on events and fields, the following configuration options will only work within filter and output blocks.
Field References: The basic syntax to access a field is [fieldname]. If you are referring to a top-level field, you can omit the [] and simply use fieldname. To refer to a nested field, you specify the full path to that field: [top-level field][nested field].
For example, the following event has five top-level fields (agent, ip, request, response, ua) and three nested fields (status, bytes, os).
```
{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html",
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}
```
To reference the os field, you specify [ua][os]. To reference a top-level field such as request, you can simply specify the field name.
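As a sketch of field references in practice, the filter below assumes events shaped like the JSON above. It tests the nested [response][status] field and moves the nested [ua][os] value to a new top-level field. The target name os_name is hypothetical.

```
filter {
  # Nested field: give the full path from the top level
  if [response][status] == 200 {
    mutate {
      # Move the nested os value into a top-level field called os_name
      rename => { "[ua][os]" => "os_name" }
    }
  }
}
```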
sprintf format
The field reference format is also used in what Logstash calls sprintf format. This format enables you to refer to field values from within other strings. For example, the statsd output has an increment setting that enables you to keep a count of apache logs by status code:
output { statsd { increment => "apache.%{[response][status]}" } }
Similarly, you can convert the timestamp in the @timestamp field into a string. Instead of specifying a field name inside the curly braces, use the +FORMAT syntax where FORMAT is a time format.
For example, if you want to use the file output to write to logs based on the event’s date and hour and the type field:
output { file { path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}" } }
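The same +FORMAT syntax is often used to build time-based index names. In the sketch below, the index prefix weblogs- is an assumption; each event is written to an index named after the date in its own @timestamp rather than the time Logstash processed it.

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # %{+yyyy.MM.dd} formats the event's @timestamp, not the current time
    index => "weblogs-%{+yyyy.MM.dd}"
  }
}
```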
Conditionals
Sometimes you only want to filter or output an event under certain conditions. For that, you can use a conditional.
Conditionals in Logstash look and act the same way they do in programming languages. Conditionals support if, else if and else statements and can be nested.
if EXPRESSION { ... } else if EXPRESSION { ... } else { ... }
You can use the following comparison operators:
equality: ==, !=, <, >, <=, >=
regexp: =~, !~ (checks a pattern on the right against a string value on the left)
inclusion: in, not in
The supported boolean operators are:
and, or, nand, xor
The supported unary operators are:
!
Expressions can be long and complex. Expressions can contain other expressions, you can negate expressions with !, and you can group them with parentheses (...).
For example, the following conditional uses the mutate filter to remove the field secret if the field action has a value of login:
```
filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}
```
You can specify multiple expressions in a single condition:
```
output {
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
      ...
    }
  }
}
```
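Building on the operators listed above, the sketch below combines parentheses, negation with !, and a regular expression match in a single condition. The field names loglevel and message and the tag name timeout are hypothetical.

```
filter {
  # Tag events that mention a timeout, unless they are DEBUG or TRACE noise
  if !([loglevel] == "DEBUG" or [loglevel] == "TRACE") and [message] =~ /timeout/ {
    mutate { add_tag => "timeout" }
  }
}
```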
You can use the in operator to test whether a field contains a specific string, key, or list element. Note that the semantic meaning of in can vary, based on the target type. For example, when applied to a string, in means "is a substring of". When applied to a collection type, in means "collection contains the exact value".
```
filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}
```
You use the not in conditional the same way. For example, you could use not in to only route events to Elasticsearch when grok is successful:
```
output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}
```
You can check for the existence of a specific field, but there’s currently no way to differentiate between a field that doesn’t exist versus a field that’s simply false. The expression if [foo] returns false when:
- [foo] doesn’t exist in the event,
- [foo] exists in the event, but is false, or
- [foo] exists in the event, but is null.
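Because if [foo] treats a missing, false, or null field the same way, a negated check like the sketch below can only tell you that one of those three cases applies, not which one. The field name foo and the tag name are hypothetical.

```
filter {
  if ![foo] {
    # Reached when [foo] is missing, false, or null
    mutate { add_tag => "foo_missing_false_or_null" }
  }
}
```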
The @metadata field
In Logstash 1.5 and later, there is a special field called @metadata. The contents of @metadata will not be part of any of your events at output time, which makes it great to use for conditionals, or extending and building event fields with field reference and sprintf formatting.
The following configuration file will yield events from STDIN. Whatever is typed will become the message field in the event. The mutate filters in the filter block add a few fields, some nested in the @metadata field.
```
input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}
```
The rubydebug codec allows you to reveal the contents of the @metadata field if you add a config flag, metadata => true: stdout { codec => rubydebug { metadata => true } }
Make use of the @metadata field any time you need a temporary field but do not want it to be in the final output.
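One pattern this enables, shown as a sketch: compute a value in a temporary @metadata field during filtering and use it in an output setting, without that value being stored in the document. The field name [@metadata][target_index] and the app- prefix are hypothetical.

```
filter {
  mutate {
    # Temporary value: usable by outputs, but not written to Elasticsearch
    add_field => { "[@metadata][target_index]" => "app-%{+yyyy.MM}" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][target_index]}"
  }
}
```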
Perhaps one of the most common use cases for this new field is with the date filter and having a temporary timestamp.
This configuration file has been simplified, but uses the timestamp format common to Apache and Nginx web servers. In the past, you’d have to delete the timestamp field yourself, after using it to overwrite the @timestamp field. With the @metadata field, this is no longer necessary:
```
input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}
```
Notice that this configuration puts the extracted date into the [@metadata][timestamp] field in the grok filter. If you feed this configuration a sample date string, the output contains no extra fields, and the config file is cleaner because you do not have to delete a "timestamp" field after conversion in the date filter.
Another use case is the CouchDB Changes input plugin.
This plugin automatically captures CouchDB document field metadata into the @metadata field within the input plugin itself. When the events pass through to be indexed by Elasticsearch, the Elasticsearch output plugin allows you to specify the action (delete, update, insert, etc.) and the document_id, like this:
```
output {
  elasticsearch {
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][_id]}"
    hosts => ["example.com"]
    index => "index_name"
  }
}
```
Using Environment Variables in the Configuration:
You can set environment variable references in the configuration for Logstash plugins by using ${var}. For example:
```
input {
  tcp {
    port => "${TCP_PORT}"
  }
}
```
If the TCP_PORT environment variable is not set, Logstash returns a configuration error.

You can fix this problem by specifying a default value:
```
input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}
```
Setting the Value of a Tag
Here’s an example that uses an environment variable to set the value of a tag:
```
filter {
  mutate {
    add_tag => [ "tag1", "${ENV_TAG}" ]
  }
}
```
Setting a File Path
Here’s an example that uses an environment variable to set the path to a log file:
```
filter {
  mutate {
    add_field => {
      "my_path" => "${HOME}/file.log"
    }
  }
}
```
Logstash Configuration Examples: Configure Logstash to filter events, process Apache logs and syslog messages, and use conditionals to control what events are processed by a filter or output.
Filters are an in-line processing mechanism that provide the flexibility to slice and dice your data to fit your needs.
```
input { stdin { } }

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}
```
Input message:
```
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
```
As you can see, Logstash (with help from the grok filter) was able to parse the log line (which happens to be in Apache "combined log" format) and break it up into many different discrete bits of information. This is extremely useful once you start querying and analyzing your log data. For example, you’ll be able to easily run reports on HTTP response codes, IP addresses, referrers, and so on. There are quite a few grok patterns included with Logstash out-of-the-box, so if you need to parse a common log format, it’s quite likely someone has already done the work for you.
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
The other filter used in this example is the date filter. This filter parses out a timestamp and uses it as the timestamp for the event (regardless of when you’re ingesting the log data). You’ll notice that the @timestamp field in this example is set to December 11, 2013, even though Logstash is ingesting the event at some point afterwards. This is handy when backfilling logs. It gives you the ability to tell Logstash "use this value as the timestamp for this event".
Processing Apache Logs:
```
input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}
```
Input events:
```
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
```
Now you should see your Apache log data in Elasticsearch! Logstash opened and read the specified input file, processing each event it encountered. Any additional lines logged to this file will also be captured, processed by Logstash as events, and stored in Elasticsearch. As an added bonus, they are stashed with the field "type" set to "apache_access" (this is done by the mutate filter's replace => { "type" => "apache_access" } setting in the filter section).
In this configuration, Logstash is only watching the apache access_log, but it’s easy enough to watch both the access_log and the error_log (actually, any file matching *log), by changing one line in the above configuration:
```
input {
  file {
    path => "/tmp/*_log"
...
```
When you restart Logstash, it will process both the error and access logs. However, if you inspect your data (using elasticsearch-kopf, perhaps), you’ll see that the access_log is broken up into discrete fields, but the error_log isn’t. That’s because we used a grok filter to match the standard combined apache log format and automatically split the data into separate fields. Wouldn’t it be nice if we could control how a line was parsed, based on its format? Well, we can…
Note that Logstash did not reprocess the events that were already seen in the access_log file. When reading from a file, Logstash saves its position and only processes new lines as they are added. Neat!
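The file input keeps this saved position in a sincedb file. While experimenting, you may want Logstash to reread the file from the start on every run; a common approach, sketched below, is to point sincedb_path at /dev/null (on Linux or macOS) so no position is persisted. You probably would not want this in production, because every restart would reprocess the whole file.

```
input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
    # Do not persist the read position between runs (testing only)
    sincedb_path => "/dev/null"
  }
}
```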
Using Conditionals
You use conditionals to control what events are processed by a filter or output. For example, you could label each event according to which file it appeared in (access_log, error_log, and other random files that end with "log").
```
input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}
```
This example labels all events using the type field, but doesn’t actually parse the error or random files. There are so many types of error logs that how they should be labeled really depends on what logs you’re working with.
Similarly, you can use conditionals to direct events to particular outputs. For example, you could:
- alert nagios of any apache events with status 5xx
- record any 4xx status to Elasticsearch
- record all status code hits via statsd
To tell nagios about any HTTP event that has a 5xx status code, you first need to check the value of the type field. If it’s apache, then you can check to see if the status field contains a 5xx error. If it is, send it to nagios. If it isn’t a 5xx error, check to see if the status field contains a 4xx error. If so, send it to Elasticsearch. Finally, send all apache status codes to statsd no matter what the status field contains:
```
output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ... }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}
```
Processing Syslog Messages: Syslog is one of the most common use cases for Logstash, and one it handles exceedingly well (as long as the log lines conform roughly to RFC3164). Syslog is the de facto UNIX networked logging standard, sending messages from client machines to a local file, or to a centralized log server via rsyslog. For this example, you won’t need a functioning syslog instance; we’ll fake it from the command line so you can get a feel for what happens.
First, let’s make a simple configuration file for Logstash + syslog, called logstash-syslog.conf.
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}
Normally, a client machine would connect to the Logstash instance on port 5000 and send its message. For this example, we’ll just telnet to Logstash and enter a log line (similar to how we entered log lines into STDIN earlier). Open another shell window to interact with the Logstash syslog input and enter the following command:
telnet localhost 5000
Then enter the following sample log lines:

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
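The following Ruby regular expression roughly approximates the grok pattern in logstash-syslog.conf. It shows which part of each sample line lands in which field. It is an approximation, not the real grok definitions: SYSLOGHOST is reduced to "a run of non-space characters", and DATA is modeled as a lazy match. The constant SYSLOG_LINE and the method parse_syslog are hypothetical names.

```ruby
# Approximation of:
#   %{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA}(?:\[%{POSINT}\])?: %{GREEDYDATA}
# The lazy program match stops at the first "[pid]:" or ":" that lets the rest match.
SYSLOG_LINE = /\A
  (?<syslog_timestamp>[A-Z][a-z]{2}\s+\d{1,2}\s\d{2}:\d{2}:\d{2})\s
  (?<syslog_hostname>\S+)\s
  (?<syslog_program>.*?)
  (?:\[(?<syslog_pid>\d+)\])?
  :\s(?<syslog_message>.*)
\z/x

# Returns a Hash of captured fields (pid is nil when absent), or nil if the line does not match.
def parse_syslog(line)
  match = SYSLOG_LINE.match(line)
  match && match.named_captures
end

sample = "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)"
p parse_syslog(sample)
```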
The Logstash Elasticsearch plugins (output, input, filter and monitoring) support authentication and encryption over HTTP.
To use Logstash with a secured cluster, you need to configure authentication credentials for Logstash. Logstash throws an exception and the processing pipeline is halted if authentication fails.
If encryption is enabled on the cluster, you also need to enable TLS/SSL in the Logstash configuration. In addition to configuring authentication credentials for Logstash, you need to grant authorized users permission to access the Logstash indices.
Configuring Logstash to use Basic Authentication: Logstash needs to be able to manage index templates, create indices, and write and delete documents in the indices it creates.
To set up authentication credentials for Logstash:
Use the Management > Roles UI in Kibana or the role API to create a logstash_writer role. For cluster privileges, add manage_index_templates and monitor. For indices privileges, add write, create, delete, and create_index.
If you plan to use index lifecycle management, also add manage_ilm for cluster and manage and manage_ilm for indices.
POST _xpack/security/role/logstash_writer
{
  "cluster": ["manage_index_templates", "monitor", "manage_ilm"],
  "indices": [
    {
      "names": [ "logstash-*" ],
      "privileges": ["write","create","delete","create_index","manage","manage_ilm"]
    }
  ]
}
Create a logstash_internal user and assign it the logstash_writer role. You can create users from the Management > Users UI in Kibana or through the user API:
POST _xpack/security/user/logstash_internal
{
  "password" : "x-pack-test-password",
  "roles" : [ "logstash_writer"],
  "full_name" : "Internal Logstash User"
}
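If you prefer to script these two API calls instead of using the Kibana UI, the sketch below builds the equivalent requests with Ruby's standard Net::HTTP library. The request bodies and paths are copied from the text above. ES_URL, ADMIN_USER and ADMIN_PASSWORD are placeholders for your own cluster and for an account allowed to manage security. Newer Elasticsearch releases expose the same APIs under `_security` instead of `_xpack/security`. The code only constructs the requests; sending them is shown as a comment.

```ruby
require "json"
require "net/http"
require "uri"

# Placeholders: replace with your cluster URL and a user allowed to manage roles and users.
ES_URL = "http://localhost:9200"
ADMIN_USER = "elastic"
ADMIN_PASSWORD = "changeme"

# Builds an authenticated JSON POST request for a security API path.
def security_request(path, body)
  req = Net::HTTP::Post.new(URI.join(ES_URL, path))
  req.basic_auth(ADMIN_USER, ADMIN_PASSWORD)
  req["Content-Type"] = "application/json"
  req.body = JSON.generate(body)
  req
end

role_req = security_request("/_xpack/security/role/logstash_writer", {
  "cluster" => ["manage_index_templates", "monitor", "manage_ilm"],
  "indices" => [{
    "names" => ["logstash-*"],
    "privileges" => ["write", "create", "delete", "create_index", "manage", "manage_ilm"]
  }]
})

user_req = security_request("/_xpack/security/user/logstash_internal", {
  "password" => "x-pack-test-password",
  "roles" => ["logstash_writer"],
  "full_name" => "Internal Logstash User"
})

# To send them (requires a reachable cluster):
# Net::HTTP.start(role_req.uri.host, role_req.uri.port) do |http|
#   [role_req, user_req].each { |r| puts http.request(r).code }
# end
```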
Configure Logstash to authenticate as the logstash_internal user you just created. You configure credentials separately for each of the Elasticsearch plugins in your Logstash .conf file. For example:
input {
  elasticsearch {
    ...
    user => "logstash_internal"
    password => "x-pack-test-password"
  }
}
filter {
  elasticsearch {
    ...
    user => "logstash_internal"
    password => "x-pack-test-password"
  }
}
output {
  elasticsearch {
    ...
    user => "logstash_internal"
    password => "x-pack-test-password"
  }
}

Granting Users Access to the Logstash Indices
Advanced Logstash Configurations: Logstash can handle more advanced requirements, such as multiple pipelines, communication between Logstash pipelines, and multiline events.
Multiple Pipelines: If you need to run more than one pipeline in the same process, Logstash provides a way to do this through a configuration file called pipelines.yml. This file must be placed in the path.settings folder and follows this structure:
- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted
The example shows two different pipelines described by their IDs and configuration paths. For the first pipeline, the value of pipeline.workers is set to 3, while in the other, the persistent queue feature is enabled. The value of a setting that is not explicitly set in the pipelines.yml file will fall back to the default specified in the logstash.yml settings file.
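The fallback rule can be illustrated with a small Ruby sketch. It loads the pipelines.yml example with the standard YAML library and merges each entry over a table of defaults. The DEFAULTS hash is a hypothetical stand-in for logstash.yml, written as if on a 4-core host. In real Logstash, pipeline.workers defaults to the number of CPU cores and queue.type defaults to memory. The method effective_settings is also hypothetical.

```ruby
require "yaml"

# Hypothetical defaults standing in for logstash.yml on a 4-core machine.
DEFAULTS = {
  "pipeline.workers" => 4,
  "queue.type" => "memory"
}.freeze

PIPELINES_YML = <<~YAML
  - pipeline.id: my-pipeline_1
    path.config: "/etc/path/to/p1.config"
    pipeline.workers: 3
  - pipeline.id: my-other-pipeline
    path.config: "/etc/different/path/p2.cfg"
    queue.type: persisted
YAML

# Explicit per-pipeline settings win; anything not set falls back to the defaults.
def effective_settings(yaml_text, defaults)
  YAML.safe_load(yaml_text).to_h do |pipeline|
    [pipeline["pipeline.id"], defaults.merge(pipeline)]
  end
end

effective_settings(PIPELINES_YML, DEFAULTS).each do |id, settings|
  puts "#{id}: workers=#{settings['pipeline.workers']} queue=#{settings['queue.type']}"
end
```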
When you start Logstash without arguments, it will read the pipelines.yml file and instantiate all pipelines specified in the file. On the other hand, when you use -e or -f, Logstash ignores the pipelines.yml file and logs a warning about it.
Using multiple pipelines is especially useful if your current configuration has event flows that don’t share the same inputs/filters and outputs and are being separated from each other using tags and conditionals.
Having multiple pipelines in a single instance also allows these event flows to have different performance and durability parameters (for example, different settings for pipeline workers and persistent queues). This separation means that a blocked output in one pipeline won’t exert backpressure in the other.
That said, it’s important to take into account resource competition between the pipelines, given that the default values are tuned for a single pipeline. So, for example, consider reducing the number of pipeline workers used by each pipeline, because each pipeline will use 1 worker per CPU core by default.
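The arithmetic behind this advice is simple. With defaults, N pipelines start N times as many worker threads as there are cores. The Ruby sketch below makes that concrete. The even-split rule in even_worker_share is only a hypothetical starting point for tuning, not a Logstash recommendation.

```ruby
require "etc"

# With default settings, each pipeline starts one worker per CPU core,
# so the total grows linearly with the number of pipelines.
def default_worker_total(pipeline_count, cores)
  pipeline_count * cores
end

# Hypothetical rule of thumb: split the cores evenly, never going below 1 worker.
def even_worker_share(pipeline_count, cores)
  [cores / pipeline_count, 1].max
end

cores = Etc.nprocessors
puts "cores: #{cores}"
puts "default workers for 3 pipelines: #{default_worker_total(3, cores)}"
puts "even share per pipeline: #{even_worker_share(3, cores)}"
```

On an 8-core host, three pipelines would start 24 workers by default. An even split gives 2 workers each, 6 in total.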
Persistent queues and dead letter queues are isolated per pipeline, with their locations namespaced by the pipeline.id value.
Pipeline to Pipeline Communication: When using the multiple pipeline feature of Logstash, you may want to connect multiple pipelines within the same Logstash instance. This configuration can be useful to isolate the execution of these pipelines, as well as to help break up the logic of complex pipelines. The pipeline input/output enables a number of advanced architectural patterns discussed later in this document.
If you need to set up communication between Logstash instances, use either Logstash-to-Logstash communications, or an intermediary queue, such as Kafka or Redis.
Configuration overview: Use the pipeline input and pipeline output to connect two pipelines running within the same Logstash instance. These plugins use a client-server approach, where the pipeline input registers a virtual address that a pipeline output can connect to.
Create a downstream pipeline that listens for events on a virtual address. Create an upstream pipeline that produces events, sending them through a pipeline output to one or more virtual addresses.
Example:
- pipeline.id: upstream
  config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
  config.string: input { pipeline { address => myVirtualAddress } }
The pipeline input acts as a virtual server listening on a single virtual address in the local process. Only pipeline outputs running on the same local Logstash can send events to this address. Pipeline outputs can send events to a list of virtual addresses. A pipeline output will be blocked if the downstream pipeline is blocked or unavailable.
When events are sent across pipelines, their data is fully copied. Modifications to an event in a downstream pipeline do not affect that event in any upstream pipelines.
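The address-and-copy behavior can be pictured with a toy Ruby model. A pipeline input registers a virtual address, and a pipeline output hands each listed address its own deep copy of the event. This is an illustration only, not Logstash's implementation. The class VirtualBus and its methods are hypothetical. The toy also raises an error for an unknown address, whereas a real pipeline output blocks.

```ruby
# Toy model of pipeline-to-pipeline delivery inside one process.
class VirtualBus
  def initialize
    @inboxes = {}
  end

  # A "pipeline input" registers a virtual address; only one listener per address.
  def listen(address)
    raise ArgumentError, "address #{address} already in use" if @inboxes.key?(address)
    @inboxes[address] = []
  end

  # A "pipeline output" sends an independent deep copy to every address in send_to.
  def send_to(addresses, event)
    addresses.each do |address|
      inbox = @inboxes.fetch(address) { raise KeyError, "no pipeline listening on #{address}" }
      inbox << Marshal.load(Marshal.dump(event)) # full copy, nested values included
    end
  end

  def received(address)
    @inboxes.fetch(address)
  end
end

bus = VirtualBus.new
bus.listen("myVirtualAddress")
upstream_event = { "message" => "hello", "tags" => [] }
bus.send_to(["myVirtualAddress"], upstream_event)
bus.received("myVirtualAddress").first["tags"] << "modified_downstream"
p upstream_event["tags"] # => []
```

The deep copy is what keeps a downstream modification from leaking back upstream. It is also why each additional downstream pipeline costs another full copy of the event on the heap.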
The pipeline plugin may be the most efficient way to communicate between pipelines, but it still incurs a performance cost. Logstash must duplicate each event in full on the Java heap for each downstream pipeline. Using this feature may affect the heap memory utilization of Logstash.
Delivery Guarantees: In its standard configuration the pipeline input/output has at-least-once delivery guarantees. The output will be blocked if the address is blocked or unavailable.
By default, the ensure_delivery option on the pipeline output is set to true. If you change the ensure_delivery flag to false, an unavailable downstream pipeline causes the sent message to be discarded. Note that a pipeline is considered unavailable only when it is starting up or reloading, not when any of the plugins it may contain are blocked. A blocked downstream pipeline blocks the sending output/pipeline regardless of the value of the ensure_delivery flag. Use ensure_delivery => false when you want the ability to temporarily disable a downstream pipeline without blocking any upstream pipelines sending to it.
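The ensure_delivery rules reduce to a small decision table, sketched here in Ruby. The state labels :running, :blocked and :unavailable, and the method send_outcome, are hypothetical names for the situations described above.

```ruby
# Outcome for a pipeline output sending to one downstream address.
#   :running     - accepting events
#   :blocked     - running, but a plugin inside it is blocked
#   :unavailable - starting up or reloading
def send_outcome(downstream, ensure_delivery: true)
  case downstream
  when :running     then :delivered
  when :blocked     then :blocked # ensure_delivery does not change this case
  when :unavailable then ensure_delivery ? :blocked : :discarded
  else raise ArgumentError, "unknown downstream state: #{downstream.inspect}"
  end
end

[true, false].each do |flag|
  %i[running blocked unavailable].each do |state|
    puts "ensure_delivery=#{flag} #{state}: #{send_outcome(state, ensure_delivery: flag)}"
  end
end
```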
These delivery guarantees also inform the shutdown behavior of this feature. When performing a pipeline reload, changes will be made immediately as the user requests, even if that means removing a downstream pipeline receiving events from an upstream pipeline. This will cause the upstream pipeline to block. You must restore the downstream pipeline to cleanly shut down Logstash. You may issue a force kill, but inflight events may be lost unless the persistent queue is enabled for that pipeline.
Avoid Cycles: When you connect pipelines, keep the data flowing in one direction. Looping data or connecting the pipelines into a cyclic graph can cause problems. Logstash waits for each pipeline’s work to complete before shutting down. Pipeline loops can prevent Logstash from shutting down cleanly.
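A simple way to catch accidental loops is to model the pipelines as a directed graph and look for strongly connected components. Each node is a pipeline id, and each edge points to a pipeline that receives its events. The Ruby sketch below does this with the standard TSort module. The class PipelineGraph and the edge-map format are hypothetical; you would derive the edges yourself from send_to and address settings.

```ruby
require "tsort"

# Directed graph of pipelines; a cycle is any strongly connected component with
# more than one pipeline, or a pipeline that sends to itself.
class PipelineGraph
  include TSort

  def initialize(edges)
    @edges = edges # { "pipeline-id" => ["downstream-pipeline-id", ...] }
  end

  def tsort_each_node(&block)
    (@edges.keys + @edges.values.flatten).uniq.each(&block)
  end

  def tsort_each_child(node, &block)
    @edges.fetch(node, []).each(&block)
  end

  def cycles
    strongly_connected_components.select do |component|
      component.size > 1 || @edges.fetch(component.first, []).include?(component.first)
    end
  end
end

distributor = PipelineGraph.new("beats-server" => %w[weblog-processing syslog-processing fallback-processing])
p distributor.cycles # => []

looped = PipelineGraph.new("upstream" => ["downstream"], "downstream" => ["upstream"])
puts "loop detected" unless looped.cycles.empty?
```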
Architectural Patterns: You can use the pipeline input and output to better organize code, streamline control flow, and isolate the performance of complex configurations. There are infinite ways to connect pipelines. The ones presented here offer some ideas.
- The distributor pattern
- The output isolator pattern
- The forked path pattern
- The collector pattern
Distributor Pattern: You can use the distributor pattern in situations where there are multiple types of data coming through a single input, each with its own complex set of processing rules. With the distributor pattern one pipeline is used to route data to other pipelines based on type. Each type is routed to a pipeline with only the logic for handling that type. In this way each type’s logic can be isolated.
As an example, in many organizations a single beats input may be used to receive traffic from a variety of sources, each with its own processing logic. A common way to deal with this type of data is to have a number of if conditions separating the traffic and processing each type differently. This approach can quickly get messy when configs are long and complex.
Here is an example distributor pattern configuration.
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
Output Isolator Pattern: You can use the output isolator pattern to prevent Logstash from becoming blocked if one of multiple outputs experiences a temporary failure. By default, Logstash is blocked when any single output is down. This behavior is important in guaranteeing at-least-once delivery of data.
For example, a server might be configured to send log data to both Elasticsearch and an HTTP endpoint. The HTTP endpoint might be frequently unavailable due to regular service or other reasons. In this scenario, sending to Elasticsearch would pause any time the HTTP endpoint is down.
Using the output isolator pattern and persistent queues, we can continue sending to Elasticsearch, even when one output is down.
Here is an example of this scenario using the output isolator pattern.
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

In this architecture, each stage has its own queue with its own tuning and settings. Note that this approach uses up to three times as much disk space and incurs three times as much serialization/deserialization cost as a single pipeline.
If any of the persistent queues of the downstream pipelines (in the example above, buffered-es and buffered-http) become full, both outputs will stop.
Forked Path Pattern: You can use the forked path pattern for situations where a single event must be processed more than once according to different sets of rules. Before the pipeline input and output were available, this need was commonly addressed through creative use of the clone filter and if/else rules.
Let’s imagine a use case where we receive data and index the full event in our own systems, but publish a redacted version of the data to a partner’s S3 bucket. We might use the output isolator pattern described above to decouple our writes to either system. The distinguishing feature of the forked path pattern is the existence of additional rules in the downstream pipelines.
Here is an example of the forked path configuration.
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

Collector Pattern: You can use the collector pattern when you want to define a common set of outputs and pre-output filters that many disparate pipelines might use. This pattern is the opposite of the distributor pattern. In this pattern many pipelines flow into a single pipeline where they share outputs and processing. This pattern simplifies configuration at the cost of reducing isolation, since all data is sent through a single pipeline.
Here is an example of the collector pattern.
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
Reloading the config file: You can set Logstash to detect and reload configuration changes automatically.
To enable automatic config reloading, start Logstash with the --config.reload.automatic (or -r) command-line option specified. For example:
bin/logstash -f apache.config --config.reload.automatic

The --config.reload.automatic option is not available when you specify the -e flag to pass in configuration settings from the command-line.
By default, Logstash checks for configuration changes every 3 seconds. To change this interval, use the --config.reload.interval <interval> option, where interval specifies how often Logstash checks the config files for changes (in seconds).
Note that the unit qualifier (s) is required, for example --config.reload.interval 5s.
Force Reloading the Config File: If Logstash is already running without auto-reload enabled, you can force Logstash to reload the config file and restart the pipeline. Do this by sending a SIGHUP (signal hangup) to the process running Logstash. For example:
kill -SIGHUP 14175
How automatic config reloading works: When Logstash detects a change in a config file, it stops the current pipeline by stopping all inputs, and it attempts to create a new pipeline that uses the updated configuration. After validating the syntax of the new configuration, Logstash verifies that all inputs and outputs can be initialized (for example, that all required ports are open). If the checks are successful, Logstash swaps the existing pipeline with the new pipeline. If the checks fail, the old pipeline continues to function, and the errors are propagated to the console.
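The "validate first, swap only on success" rule can be sketched as a toy Ruby model. This is not Logstash's reload code. PipelineConfig, Pipeline, build_pipeline and reload are hypothetical. The "can inputs and outputs be initialized" check is reduced to a single port_free flag.

```ruby
# Hypothetical stand-ins for a parsed config and a running pipeline.
PipelineConfig = Struct.new(:name, :syntax_ok, :port_free, keyword_init: true)
Pipeline = Struct.new(:config)

# Mirrors the checks described above: syntax first, then initialization of inputs/outputs.
def build_pipeline(config)
  raise ArgumentError, "invalid syntax in #{config.name}" unless config.syntax_ok
  raise IOError, "cannot bind input for #{config.name}" unless config.port_free
  Pipeline.new(config)
end

# Returns the pipeline that should be running after the reload attempt.
def reload(current, new_config, log: $stderr)
  build_pipeline(new_config) # success: the new pipeline replaces the old one
rescue ArgumentError, IOError => e
  log.puts("reload failed, old pipeline keeps running: #{e.message}")
  current
end

old = Pipeline.new(PipelineConfig.new(name: "v1", syntax_ok: true, port_free: true))
p reload(old, PipelineConfig.new(name: "v2", syntax_ok: true, port_free: true)).config.name  # => "v2"
p reload(old, PipelineConfig.new(name: "v3", syntax_ok: false, port_free: true)).config.name # => "v1"
```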
During automatic config reloading, the JVM is not restarted. Creating and swapping pipelines all happen within the same process.
Changes to grok pattern files are also reloaded, but only when a change in the config file triggers a reload (or the pipeline is restarted).
In general, Logstash does not watch or monitor any configuration files used or referenced by inputs, filters, or outputs.
Plugins That Prevent Automatic Reloading: Input and output plugins usually interact with OS resources. In some circumstances those resources can’t be released without a restart. For this reason, some plugins can’t simply be updated, and this prevents pipeline reload.
The stdin input plugin, for example, prevents reloading for these reasons.
Managing Multiline Events:
Several use cases generate events that span multiple lines of text. In order to correctly handle these multiline events, Logstash needs to know how to tell which lines are part of a single event.
Multiline event processing is complex and relies on proper event ordering. The best way to guarantee ordered log processing is to implement the processing as early in the pipeline as possible.
The multiline codec is the preferred tool for handling multiline events in the Logstash pipeline. The multiline codec merges lines from a single input using a simple set of rules.
If you are using a Logstash input plugin that supports multiple hosts, such as the beats input plugin, you should not use the multiline codec to handle multiline events. Doing so may result in the mixing of streams and corrupted event data. In this situation, you need to handle multiline events before sending the event data to Logstash.
The most important aspects of configuring the multiline codec are the following:
- The pattern option specifies a regular expression. Lines that match the specified regular expression are considered either continuations of a previous line or the start of a new multiline event. You can use grok regular expression templates with this configuration option.
- The what option takes two values: previous or next. The previous value specifies that lines that match the value in the pattern option are part of the previous line. The next value specifies that lines that match the value in the pattern option are part of the following line.
- The negate option applies the multiline codec to lines that do not match the regular expression specified in the pattern option.

See the full documentation for the multiline codec plugin for more information on configuration options.
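The following Ruby sketch shows how pattern, negate and what interact. It is a simplified model of the grouping rules described above, not the codec's source. The method name merge_multiline is hypothetical. One simplification: the sketch flushes the last event when input ends, whereas the real codec holds it until more input arrives or its auto_flush_interval elapses.

```ruby
# Simplified model of the multiline codec's grouping rules.
#   pattern - Regexp tested against each line
#   negate  - when true, a line counts as "matched" only if it does NOT match pattern
#   what    - :previous (matched line joins the event before it) or
#             :next (matched line joins the line after it)
def merge_multiline(lines, pattern:, what:, negate: false)
  events = []
  buffer = []
  lines.each do |line|
    matched = pattern.match?(line) ^ negate
    case what
    when :previous
      if matched
        buffer << line # continuation of the current event
      else
        events << buffer.join("\n") unless buffer.empty?
        buffer = [line] # this line starts a new event
      end
    when :next
      buffer << line
      unless matched # nothing continues past this line, so the event is complete
        events << buffer.join("\n")
        buffer = []
      end
    else
      raise ArgumentError, "what must be :previous or :next"
    end
  end
  events << buffer.join("\n") unless buffer.empty? # flush whatever is left
  events
end

trace = [
  'Exception in thread "main" java.lang.NullPointerException',
  "        at com.example.myproject.Book.getTitle(Book.java:16)",
  "        at com.example.myproject.Author.getBookTitles(Author.java:25)",
  "        at com.example.myproject.Bootstrap.main(Bootstrap.java:14)"
]
p merge_multiline(trace, pattern: /^\s/, what: :previous).size # => 1
```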
Examples of Multiline Codec Configuration: The examples in this section cover the following use cases:
- Combining a Java stack trace into a single event
- Combining C-style line continuations into a single event
- Combining multiple lines from time-stamped events

Java Stack Traces: Java stack traces consist of multiple lines, with each line after the initial line beginning with whitespace, as in this example:
Exception in thread "main" java.lang.NullPointerException
        at com.example.myproject.Book.getTitle(Book.java:16)
        at com.example.myproject.Author.getBookTitles(Author.java:25)
        at com.example.myproject.Bootstrap.main(Bootstrap.java:14)

To consolidate these lines into a single event in Logstash, use the following configuration for the multiline codec:
input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}

This configuration merges any line that begins with whitespace up to the previous line.
Line Continuations: Several programming languages use the \ character at the end of a line to denote that the line continues, as in this example:
printf ("%10.10ld \t %10.10ld \t %s\
%f", w, x, y, z );
To consolidate these lines into a single event in Logstash, use the following configuration for the multiline codec:
input {
  stdin {
    codec => multiline {
      pattern => "\\$"
      what => "next"
    }
  }
}

This configuration merges any line that ends with the \ character with the following line.
Timestamps: Activity logs from services such as Elasticsearch typically begin with a timestamp, followed by information on the specific activity, as in this example:
[2015-08-24 11:49:14,389][INFO ][env ] [Letha] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [34.5gb], net total_space [118.9gb], types [hfs]

To consolidate these lines into a single event in Logstash, use the following configuration for the multiline codec:
input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      pattern => "^\[%{TIMESTAMP_ISO8601}\]"
      negate => true
      what => "previous"
    }
  }
}

This configuration uses the negate option to specify that any line that does not begin with a bracketed timestamp belongs to the previous line.
Glob Pattern Support: Logstash supports the following patterns wherever glob patterns are allowed:
*
Match any file. You can also use an * to restrict other values in the glob. For example, *conf matches all files that end in conf. *apache* matches any files with apache in the name. This pattern does not match hidden files (dot files) on Unix-like operating systems. To match dot files, use a pattern like {*,.*}.
**
Match directories recursively.
?
Match any one character.
[set]
Match any one character in a set. For example, [a-z]. Also supports set negation ([^a-z]).
{p,q}
Match either literal p or literal q. The matching literal can be more than one character, and you can specify more than two literals. This pattern is the equivalent to using alternation with the vertical bar in regular expressions (foo|bar).
\
Escape the next metacharacter. This means that you cannot use a backslash in Windows as part of a glob. The pattern c:\foo* will not work, so use foo* instead.
Example Patterns:
Here are some common examples of glob patterns:
"/path/to/*.conf"
Matches config files ending in .conf in the specified path.
"/var/log/*.log"
Matches log files ending in .log in the specified path.
"/var/log/**/*.log"
Matches log files ending in .log in subdirectories under the specified path.
"/path/to/logs/{app1,app2,app3}/data.log"
Matches app log files in the app1, app2, and app3 subdirectories under the specified path.
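Because Logstash runs on JRuby, its glob syntax closely follows Ruby's. To check a pattern against path strings without touching the filesystem, you can use Ruby's File.fnmatch? as an approximation. This is an assumption, not necessarily the exact code path a given plugin uses. FNM_PATHNAME keeps * from crossing / and lets **/ span directories. FNM_EXTGLOB enables the {p,q} alternation. The helper glob_match? is hypothetical.

```ruby
# Approximates the glob rules above with Ruby's File.fnmatch?.
FLAGS = File::FNM_PATHNAME | File::FNM_EXTGLOB

def glob_match?(pattern, path)
  File.fnmatch?(pattern, path, FLAGS)
end

p glob_match?("/path/to/*.conf", "/path/to/app.conf")           # => true
p glob_match?("/path/to/*.conf", "/path/to/.hidden.conf")       # => false (dot file)
p glob_match?("/var/log/**/*.log", "/var/log/nginx/access.log") # => true
p glob_match?("/path/to/logs/{app1,app2,app3}/data.log", "/path/to/logs/app2/data.log") # => true
p glob_match?("/path/to/logs/{app1,app2,app3}/data.log", "/path/to/logs/app4/data.log") # => false
```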
Converting Ingest Node Pipeline: After implementing ingest pipelines to parse your data, you might decide that you want to take advantage of the richer transformation capabilities in Logstash. For example, you may need to use Logstash instead of ingest pipelines if you want to:
- Ingest from more inputs. Logstash can natively ingest data from many other sources like TCP, UDP, syslog, and relational databases.
- Use multiple outputs. Ingest node was designed to only support Elasticsearch as an output, but you may want to use more than one output. For example, you may want to archive your incoming data to S3 as well as index it in Elasticsearch.
- Take advantage of the richer transformation capabilities in Logstash, such as external lookups.
- Use the persistent queue feature to handle spikes when ingesting data (from Beats and other sources).

To make it easier for you to migrate your configurations, Logstash provides an ingest pipeline conversion tool. The conversion tool takes the ingest pipeline definition as input and, when possible, creates the equivalent Logstash configuration as output.
To run the conversion tool, use the following command: bin/ingest-convert.sh --input INPUT_FILE_URI --output OUTPUT_FILE_URI [--append-stdio]
Where:
- INPUT_FILE_URI is a file URI that specifies the full path to the JSON file that defines the ingest node pipeline.
- OUTPUT_FILE_URI is the file URI of the Logstash DSL file that will be generated by the tool.
- --append-stdio is an optional flag that adds stdin and stdout sections to the config instead of adding the default Elasticsearch output.

This command expects a file URI, so make sure you use forward slashes and specify the full path to the file.
For example: bin/ingest-convert.sh --input file:///tmp/ingest/apache.json --output file:///tmp/ingest/apache.conf
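The tool expects file URIs rather than plain paths, so a small helper can build them and assemble the command line. The Ruby sketch below does that. file_uri and convert_command are hypothetical names. The sketch does not percent-encode spaces or other reserved characters, which a real URI for such paths would need.

```ruby
require "shellwords"

# Turns a local path into a file URI: absolute path, forward slashes.
def file_uri(path)
  absolute = File.expand_path(path).tr("\\", "/")
  absolute = "/#{absolute}" unless absolute.start_with?("/") # e.g. C:/... on Windows
  "file://#{absolute}"
end

# Builds the argument list for bin/ingest-convert.sh.
def convert_command(input_path, output_path, append_stdio: false)
  cmd = ["bin/ingest-convert.sh", "--input", file_uri(input_path), "--output", file_uri(output_path)]
  cmd << "--append-stdio" if append_stdio
  cmd
end

puts convert_command("/tmp/ingest/apache.json", "/tmp/ingest/apache.conf").shelljoin
# bin/ingest-convert.sh --input file:///tmp/ingest/apache.json --output file:///tmp/ingest/apache.conf
```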
Limitations:
- Painless script conversion is not supported.
- Only a subset of available processors is supported for conversion. For processors that are not supported, the tool produces a warning and continues with a best-effort conversion.

Supported Processors: The following ingest node processors are currently supported for conversion by the tool:
Append, Convert, Date, GeoIP, Grok, Gsub, Json, Lowercase, Rename, Set
You can set up communication between two Logstash machines by connecting the Lumberjack output to the Beats input. You may need this configuration if the data path crosses network or firewall boundaries, for example. If you don’t have a compelling need for Logstash-to-Logstash communication, then don’t implement it.
If you are looking for information on connecting multiple pipelines within one Logstash instance, see Pipeline-to-Pipeline Communication.
Configuration Overview: Use the Lumberjack protocol to connect two Logstash machines.
1. Generate a trusted SSL certificate (required by the lumberjack protocol).
2. Copy the SSL certificate to the upstream Logstash machine.
3. Copy the SSL certificate and key to the downstream Logstash machine.
4. Set the upstream Logstash machine to use the Lumberjack output to send data.
5. Set the downstream Logstash machine to listen for incoming Lumberjack connections through the Beats input.
6. Test it.

Generate a self-signed SSL certificate and key: Use the openssl req command to generate a self-signed certificate and key. The openssl req command is available with some operating systems. You may need to install the openssl command line program for others.
Run the following command:
openssl req -x509 -batch -nodes -newkey rsa:2048 -keyout lumberjack.key -out lumberjack.cert -subj /CN=localhost

where:
- lumberjack.key is the name of the SSL key to be created
- lumberjack.cert is the name of the SSL certificate to be created
- localhost is the name of the upstream Logstash computer
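If the openssl command-line program is not installed but Ruby is, Ruby's bundled openssl library can produce a similar pair. The sketch creates an unencrypted 2048-bit RSA key and a certificate whose subject and issuer are both /CN=localhost. Three details are arbitrary choices for the sketch: the 365-day validity, the serial number, and the SHA-256 signature digest. The sketch also adds none of the X.509 extensions that openssl req typically includes. If Logstash rejects the generated pair, fall back to the openssl command above. The method names are hypothetical.

```ruby
require "openssl"

# Builds an unencrypted RSA key and a self-signed certificate for /CN=<common_name>.
def self_signed_cert(common_name, days: 365)
  key = OpenSSL::PKey::RSA.new(2048)
  name = OpenSSL::X509::Name.parse("/CN=#{common_name}")

  cert = OpenSSL::X509::Certificate.new
  cert.version = 2 # X.509 v3
  cert.serial = 1  # arbitrary for a one-off self-signed certificate
  cert.subject = name
  cert.issuer = name # self-signed: issuer equals subject
  cert.public_key = key.public_key
  cert.not_before = Time.now
  cert.not_after = Time.now + days * 24 * 60 * 60
  cert.sign(key, OpenSSL::Digest.new("SHA256"))
  [key, cert]
end

# Writes the pair under the file names used in the text; the key file is owner-readable only.
def write_lumberjack_files(dir = ".", common_name: "localhost")
  key, cert = self_signed_cert(common_name)
  File.open(File.join(dir, "lumberjack.key"), "w", 0o600) { |f| f.write(key.to_pem) }
  File.write(File.join(dir, "lumberjack.cert"), cert.to_pem)
end
```

Calling write_lumberjack_files from the directory where you run Logstash creates lumberjack.key and lumberjack.cert there.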
Copy the SSL Certificate and Key: Copy the SSL certificate to the upstream Logstash machine.
Copy the SSL certificate and key to the downstream Logstash machine.
Start the upstream Logstash instance: Start Logstash and generate test events:
bin/logstash -e 'input { generator { count => 5 } } output { lumberjack { codec => json hosts => "mydownstreamhost" ssl_certificate => "lumberjack.cert" port => 5000 } }'

This sample command sends five events to mydownstreamhost:5000 using the SSL certificate provided.
Start the downstream Logstash instance: Start the downstream instance of Logstash:
bin/logstash -e 'input { beats { codec => json port => 5000 ssl => true ssl_certificate => "lumberjack.cert" ssl_key => "lumberjack.key"} }'

This sample command sets port 5000 to listen for incoming Beats input.
Verify the communication: Watch the downstream Logstash machine for the incoming events. You should see five incrementing events.
Configuring Centralized Pipeline Management: This feature is part of X-Pack.
Field References Deep Dive: It is often useful to be able to refer to a field or collection of fields by name. To do this, you can use the Logstash field reference syntax.
The syntax to access a field specifies the entire path to the field, with each fragment wrapped in square brackets.
Field References can be expressed literally within Conditional statements in your pipeline configurations, as string arguments to your pipeline plugins, or within sprintf statements that will be used by your pipeline plugins:
filter {
  # [@metadata][date] and [@metadata][time] appear as field reference literals in the conditional
  if [@metadata][date] and [@metadata][time] {
    mutate {
      add_field => {
        # Key: a field reference passed as a string argument.
        # Value: a sprintf format string that embeds two field references.
        "[@metadata][timestamp]" => "%{[@metadata][date]} %{[@metadata][time]}"
      }
    }
  }
}
Field Reference Literal: A Field Reference Literal is a sequence of one or more Path Fragments that can be used directly in Logstash pipeline conditionals without any additional quoting (e.g. [request], [response][status]).
Field References (Event API): The Event API’s methods for manipulating the fields of an event or using the sprintf syntax are more flexible than the pipeline grammar in what they accept as a Field Reference. Top-level fields can be referenced directly by their Field Name without the square brackets, and there is some support for Composite Field References, simplifying use of programmatically-generated Field References.
A Field Reference for use with the Event API is therefore one of:
- a single Field Reference Literal; OR
- a single Field Name (referencing a top-level field); OR
- a single Composite Field Reference.
Path Fragment: A Path Fragment is a Field Name wrapped in square brackets (e.g., [request]).

pathFragment
  : '[' fieldName ']'
  ;

Field Name: A Field Name is a sequence of characters that are not square brackets ([ or ]).

fieldName
  : ( ~( '[' | ']' ) )+
  ;
Composite Field Reference: In some cases, it may be necessary to programmatically compose a Field Reference from one or more Field References, such as when manipulating fields in a plugin or while using the Ruby Filter plugin and the Event API.
fieldReference = "[path][to][deep nested field]"
compositeFieldReference = "[@metadata][#{fieldReference}][size]"
# => "[@metadata][[path][to][deep nested field]][size]"

Canonical Representations of Composite Field References:

Acceptable Composite Field Reference | Canonical Field Reference Representation
[[deep][nesting]][field] | [deep][nesting][field]
[foo]bar[bingo] | [foo][bar][bingo]
ok | [ok]
A Composite Field Reference is a sequence of one or more Path Fragments or Embedded Field References.
compositeFieldReference
  : ( pathFragment | embeddedFieldReference )+
  ;

Composite Field References are supported by the Event API, but are not supported as literals in the Pipeline Configuration.
Embedded Field Reference: An Embedded Field Reference is a Field Reference that is itself wrapped in square brackets ([ and ]), and can be a component of a Composite Field Reference.

embeddedFieldReference
  : '[' fieldReference ']'
  ;
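The canonical forms in the table above follow a simple rule: every run of characters that are not square brackets is a Field Name, and the canonical form wraps each name in its own brackets. The Ruby sketch below implements only that rule. It is not Logstash's parser and does not validate bracket balance. The method name canonical_field_reference is hypothetical.

```ruby
# Simplified canonicalizer for Composite Field References.
def canonical_field_reference(reference)
  names = reference.scan(/[^\[\]]+/) # each bracket-free run is a Field Name
  raise ArgumentError, "no field names in #{reference.inspect}" if names.empty?
  names.map { |name| "[#{name}]" }.join
end

field_reference = "[path][to][deep nested field]"
composite = "[@metadata][#{field_reference}][size]"
puts canonical_field_reference(composite)                  # [@metadata][path][to][deep nested field][size]
puts canonical_field_reference("[[deep][nesting]][field]") # [deep][nesting][field]
puts canonical_field_reference("[foo]bar[bingo]")          # [foo][bar][bingo]
puts canonical_field_reference("ok")                       # [ok]
```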
Managing Logstash: Logstash provides configuration management features to make it easier for you to manage updates to your configuration over time.
The topics in this section describe Logstash configuration management features only. For information about other config management tools, such as Puppet and Chef, see the documentation for those projects. Also take a look at the Logstash Puppet module documentation.
Centralized Pipeline Management: The pipeline management feature centralizes the creation and management of Logstash configuration pipelines in Kibana. This is an X-Pack feature. You can control multiple Logstash instances from the pipeline management UI in Kibana. You can add, edit, and delete pipeline configurations. On the Logstash side, you simply need to enable configuration management and register Logstash to use the centrally managed pipeline configurations.
Pipeline Behaviour: The pipeline configurations and metadata are stored in Elasticsearch. Any changes that you make to a pipeline definition are picked up, loaded, and applied immediately by all Logstash instances registered to use the pipeline. You do not have to restart Logstash to pick up the changes. The pipeline runs on all Logstash instances that are registered to use it.
Kibana saves the new configuration, and Logstash will attempt to load it. There is no validation at the UI level, so check the local Logstash logs for configuration errors. If you’re using the Logstash monitoring feature in X-Pack, use the Monitoring tab to check the status of your Logstash nodes.
You can specify multiple pipeline configurations that run in parallel on the same Logstash node. If you edit and save a pipeline configuration, Logstash reloads the configuration in the background and continues processing events.
If you delete a running pipeline (for example, apache) in Kibana, Logstash will attempt to stop the pipeline and waits until all events have been fully processed by it. Before you delete a pipeline, make sure you understand your data sources. Stopping a pipeline may lead to data loss.
Logstash modules provide a quick, end-to-end solution for ingesting data and visualizing it with purpose-built dashboards.
These modules are available:
Elastic Cloud
ArcSight Module
Netflow Module (deprecated)
Microsoft Azure Module (deprecated)
Each module comes pre-packaged with Logstash configurations, Kibana dashboards, and other meta files that make it easier for you to set up the Elastic Stack for specific use cases or data sources.
You can think of modules as providing three essential functions that make it easier for you to get started. When you run a module, it will:
Create the Elasticsearch index.
Set up the Kibana dashboards, including the index pattern, searches, and visualizations required to visualize your data in Kibana.
Run the Logstash pipeline with the configurations required to read and parse the data.
Running Modules: To run a module and set up dashboards, you specify the following options:
bin/logstash --modules MODULE_NAME --setup [-M "CONFIG_SETTING=VALUE"]
Where:
--modules runs the Logstash module specified by MODULE_NAME.
-M "CONFIG_SETTING=VALUE" is optional and overrides the specified configuration setting. You can specify multiple overrides. Each override must start with -M. See Specify module settings at the command line for more info.
--setup creates an index pattern in Elasticsearch and imports Kibana dashboards and visualizations. Running --setup is a one-time setup step. Omit this option for subsequent runs of the module to avoid overwriting existing Kibana dashboards.
For example, the following command runs the Netflow module with the default settings, and sets up the netflow index pattern and dashboards:
bin/logstash --modules netflow --setup
The following command runs the Netflow module and overrides the Elasticsearch host setting. Here it’s assumed that you’ve already run the setup step.
bin/logstash --modules netflow -M "netflow.var.elasticsearch.host=es.mycloud.com"
Configuring modules: To configure a module, you can either specify configuration settings in the logstash.yml settings file, or use command-line overrides to specify settings at the command line.
Specify module settings in logstash.yml: To specify module settings in the logstash.yml settings file, you add a module definition to the modules array. Each module definition begins with a dash (-) and is followed by name: module_name, then a series of name/value pairs that specify module settings. For example:
modules:
  - name: netflow
    var.elasticsearch.hosts: "es.mycloud.com"
    var.elasticsearch.username: "foo"
    var.elasticsearch.password: "password"
    var.kibana.host: "kb.mycloud.com"
    var.kibana.username: "foo"
    var.kibana.password: "password"
    var.input.tcp.port: 5606
Any settings defined in the command line are ephemeral and will not persist across subsequent runs of Logstash. If you want to persist a configuration, you need to set it in the logstash.yml settings file.
Settings that you specify at the command line are merged with any settings specified in the logstash.yml file. If an option is set in both places, the value specified at the command line takes precedence.
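The merge rule can be pictured with a small Ruby sketch. It combines module settings read from logstash.yml with -M "MODULE.SETTING=VALUE" overrides, and the command-line value wins.

merge_module_settings is a hypothetical helper, not part of Logstash, and the override host es.other.example is a made-up value. The helper returns a new array instead of modifying the file-based settings, which mirrors the fact that command-line settings do not persist.

```ruby
# Hypothetical helper (not part of Logstash): merge module settings from
# logstash.yml with -M "MODULE.SETTING=VALUE" overrides; the command line wins.
def merge_module_settings(yml_modules, cli_overrides)
  merged = yml_modules.map(&:dup) # copies, so the file-based settings stay untouched
  cli_overrides.each do |override|
    key, value = override.split("=", 2)
    module_name, setting = key.split(".", 2)
    if value.nil? || setting.nil?
      raise ArgumentError, "expected MODULE.SETTING=VALUE, got #{override.inspect}"
    end

    entry = merged.find { |m| m["name"] == module_name }
    if entry.nil?
      entry = { "name" => module_name }
      merged << entry
    end
    entry[setting] = value # command-line value takes precedence
  end
  merged
end

from_yml = [{ "name" => "netflow",
              "var.elasticsearch.hosts" => "es.mycloud.com",
              "var.input.tcp.port" => 5606 }]
overrides = ["netflow.var.elasticsearch.hosts=es.other.example"]
p merge_module_settings(from_yml, overrides).first
```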
Logstash ArcSight Module: The Logstash ArcSight module understands CEF (Common Event Format), and can accept, enrich, and index these events for analysis on the Elastic Stack. The ArcSight Data Platform (ADP) contains two core data collection components for data streaming:
The Smart Connectors (SC) are edge log collectors that parse and normalize data to CEF prior to publishing to the Logstash receiver.
The Event Broker is the central hub for incoming data and is based on open source Apache Kafka. The Logstash ArcSight module can consume directly from Event Broker topics.
See documentation for more details.
Logstash Netflow Module:
Azure Module:
Working with Filebeat Modules: Filebeat comes packaged with pre-built modules that contain the configurations needed to collect, parse, enrich, and visualize data from various log file formats. Each Filebeat module consists of one or more filesets that contain ingest node pipelines, Elasticsearch templates, Filebeat input configurations, and Kibana dashboards.
Use ingest pipelines for parsing: When you use Filebeat modules with Logstash, you can use the ingest pipelines provided by Filebeat to parse the data. You need to load the pipelines into Elasticsearch and configure Logstash to use them.
On the system where Filebeat is installed, run the setup command with the --pipelines option specified to load ingest pipelines for specific modules. For example, the following command loads ingest pipelines for the system and nginx modules:
filebeat setup --pipelines --modules nginx,system
A connection to Elasticsearch is required for this setup step because Filebeat needs to load the ingest pipelines into Elasticsearch. If necessary, you can temporarily disable your configured output and enable the Elasticsearch output before running the command.
To configure Logstash to use the pipelines:
On the system where Logstash is installed, create a Logstash pipeline configuration that reads from a Logstash input, such as Beats or Kafka, and sends events to an Elasticsearch output. Set the pipeline option in the Elasticsearch output to %{[@metadata][pipeline]} to use the ingest pipelines that you loaded previously.
Here’s an example configuration that reads data from the Beats input and uses Filebeat ingest pipelines to parse data collected by modules:
input {
  beats {
    port => 5044
  }
}
output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      pipeline => "%{[@metadata][pipeline]}"
      user => "elastic"
      password => "secret"
    }
  } else {
    elasticsearch {
      hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "secret"
    }
  }
}
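To see what the index template "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" resolves to for one event, here is a simplified Ruby sketch of the substitution.

The sketch assumes that only the Joda tokens YYYY, MM, dd and HH need translating, and that %{+...} dates come from @timestamp in UTC. The sample metadata values (filebeat, 7.10.0) and the helper name sprintf_sketch are hypothetical. This is not Logstash's own sprintf implementation.

```ruby
# Simplified, hypothetical sketch of Logstash-style %{...} substitution.
# Assumptions: only YYYY, MM, dd and HH are translated for %{+...}, and the
# date is taken from @timestamp in UTC.
JODA_TO_STRFTIME = { "YYYY" => "%Y", "MM" => "%m", "dd" => "%d", "HH" => "%H" }.freeze

def sprintf_sketch(template, event)
  template.gsub(/%\{[^}]+\}/) do |whole|
    ref = whole[2...-1] # text between "%{" and "}"
    if ref.start_with?("+")
      format = ref[1..-1].gsub(/YYYY|MM|dd|HH/) { |token| JODA_TO_STRFTIME[token] }
      event.fetch("@timestamp").getutc.strftime(format)
    else
      value = event.dig(*ref.scan(/[^\[\]]+/))
      value.nil? ? whole : value.to_s # unresolved references are left as-is here
    end
  end
end

event = {
  "@timestamp" => Time.utc(2021, 3, 4, 10, 15, 0),
  "@metadata" => { "beat" => "filebeat", "version" => "7.10.0" }
}
puts sprintf_sketch("%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}", event)
# prints filebeat-7.10.0-2021.03.04
```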
Use Logstash Pipelines for Parsing: The examples in this section show how to build Logstash pipeline configurations that replace the ingest pipelines provided with Filebeat modules. The pipelines take the data collected by Filebeat modules, parse it into fields expected by the Filebeat index, and send the fields to Elasticsearch so that you can visualize the data in the pre-built dashboards provided by Filebeat.
This approach is more time consuming than using the existing ingest pipelines to parse the data, but it gives you more control over how the data is processed. By writing your own pipeline configurations, you can do additional processing, such as dropping fields, after the fields are extracted, or you can move your load from Elasticsearch ingest nodes to Logstash nodes.
Before deciding to replace the ingest pipelines with Logstash configurations, read Use ingest pipelines for parsing.
Here are some examples that show how to implement Logstash configurations to replace ingest pipelines:
Apache 2 Logs
MySQL Logs
Nginx Logs
System Logs
Example: Set up Filebeat modules to work with Kafka and Logstash: See documentation.
Data Resiliency: As data flows through the event processing pipeline, Logstash may encounter situations that prevent it from delivering events to the configured output. For example, the data might contain unexpected data types, or Logstash might terminate abnormally.
To guard against data loss and ensure that events flow through the pipeline without interruption, Logstash provides the following data resiliency features.
Persistent Queues protect against data loss by storing events in an internal queue on disk.
Dead Letter Queues (DLQ) provide on-disk storage for events that Logstash is unable to process. You can easily reprocess events in the dead letter queue by using the dead_letter_queue input plugin.
These resiliency features are disabled by default. To turn on these features, you must explicitly enable them in the Logstash settings file.
Persistent Queues: By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. The size of these in-memory queues is fixed and not configurable. If Logstash experiences a temporary machine failure, the contents of the in-memory queue will be lost. Temporary machine failures are scenarios where Logstash or its host machine is terminated abnormally but can be restarted.
In order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which will store the message queue on disk. Persistent queues provide durability of data within Logstash.
Persistent queues are also useful for Logstash deployments that need large buffers. Instead of deploying and managing a message broker, such as Redis, RabbitMQ, or Apache Kafka, to facilitate a buffered publish-subscribe model, you can enable persistent queues to buffer events on disk and remove the message broker.
In summary, the benefits of enabling persistent queues are as follows:
Absorbs bursts of events without needing an external buffering mechanism like Redis or Apache Kafka.
Provides an at-least-once delivery guarantee against message loss during a normal shutdown as well as when Logstash is terminated abnormally. If Logstash is restarted while events are in-flight, Logstash will attempt to deliver messages stored in the persistent queue until delivery succeeds at least once.
You must set queue.checkpoint.writes: 1 explicitly to guarantee maximum durability for all input events. See Controlling Durability.
Limitations of Persistent Queues: The following are problems not solved by the persistent queue feature:
Input plugins that do not use a request-response protocol cannot be protected from data loss. For example: tcp, udp, zeromq push+pull, and many other inputs do not have a mechanism to acknowledge receipt to the sender. Plugins such as beats and http, which do have an acknowledgement capability, are well protected by this queue.
It does not handle permanent machine failures such as disk corruption, disk failure, and machine loss. The data persisted to disk is not replicated.
How Persistent Queues Work: The queue sits between the input and filter stages in the same process:
input → queue → filter + output
When an input has events ready to process, it writes them to the queue. When the write to the queue is successful, the input can send an acknowledgement to its data source.
When processing events from the queue, Logstash acknowledges events as completed, within the queue, only after filters and outputs have completed. The queue keeps a record of events that have been processed by the pipeline. An event is recorded as processed (in this document, called "acknowledged" or "ACKed") if, and only if, the event has been processed completely by the Logstash pipeline.
What does acknowledged mean? This means the event has been handled by all configured filters and outputs. For example, if you have only one output, Elasticsearch, an event is ACKed when the Elasticsearch output has successfully sent this event to Elasticsearch.
During a normal shutdown (CTRL+C or SIGTERM), Logstash will stop reading from the queue and will finish processing the in-flight events being processed by the filters and outputs. Upon restart, Logstash will resume processing the events in the persistent queue as well as accepting new events from inputs.
If Logstash is abnormally terminated, any in-flight events will not have been ACKed and will be reprocessed by filters and outputs when Logstash is restarted. Logstash processes events in batches, so it is possible that for any given batch, some of that batch may have been successfully completed, but not recorded as ACKed, when an abnormal termination occurs.
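The ACK rule and its at-least-once consequence can be illustrated with a toy Ruby model. ToyAckQueue is a hypothetical class and is not how Logstash implements its queue.

In the model, a batch reaches the output, but the process dies before the batch is ACKed. After the restart, those events are still unACKed, so they are delivered a second time.

```ruby
# Toy model of persistent-queue acknowledgement (not Logstash's implementation).
class ToyAckQueue
  def initialize
    @entries = [] # [sequence number, payload] in write order
    @acked = {}   # sequence numbers confirmed by filters + outputs
  end

  def write(payload)
    @entries << [@entries.size, payload]
  end

  # Events that must be (re)processed, e.g. after a restart.
  def unacked
    @entries.reject { |seq, _| @acked[seq] }
  end

  def ack(batch)
    batch.each { |seq, _| @acked[seq] = true }
  end
end

queue = ToyAckQueue.new
%w[a b c d e].each { |payload| queue.write(payload) }

delivered = []
first_batch = queue.unacked.first(3)
first_batch.each { |_, payload| delivered << payload } # outputs succeed...
# ...but Logstash dies before the batch is ACKed, so nothing is recorded.

# After restart the same events are read again: at-least-once delivery.
queue.unacked.each_slice(3) do |batch|
  batch.each { |_, payload| delivered << payload }
  queue.ack(batch)
end

p delivered     # => ["a", "b", "c", "a", "b", "c", "d", "e"]
p queue.unacked # => []
```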
For more details about specific behaviors of queue writes and acknowledgement, see Controlling Durability.
Configuring Persistent Queues: To configure persistent queues, you can specify the following options in the Logstash settings file:
queue.type: Specify persisted to enable persistent queues. By default, persistent queues are disabled (default: queue.type: memory).
path.queue: The directory path where the data files will be stored. By default, the files are stored in path.data/queue.
queue.page_capacity: The maximum size of a queue page in bytes. The queue data consists of append-only files called "pages". The default size is 64mb. Changing this value is unlikely to have performance benefits.
queue.drain: Specify true if you want Logstash to wait until the persistent queue is drained before shutting down. The amount of time it takes to drain the queue depends on the number of events that have accumulated in the queue. Therefore, you should avoid using this setting unless the queue, even when full, is relatively small and can be drained quickly.
queue.max_events: The maximum number of events that are allowed in the queue. The default is 0 (unlimited).
queue.max_bytes: The total capacity of the queue in number of bytes. The default is 1024mb (1gb). Make sure the capacity of your disk drive is greater than the value you specify here.
If you are using persistent queues to protect against data loss, but don’t require much buffering, you can set queue.max_bytes to a smaller value, such as 10mb, to produce smaller queues and improve queue performance.
If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criterion is reached first. See Handling Back Pressure for behavior when these queue limits are reached.
You can also control when the checkpoint file gets updated by setting queue.checkpoint.writes. See Controlling Durability.
Example configuration:
queue.type: persisted
queue.max_bytes: 4gb
Handling Back Pressure: When the queue is full, Logstash puts back pressure on the inputs to stall data flowing into Logstash. This mechanism helps Logstash control the rate of data flow at the input stage without overwhelming outputs like Elasticsearch.
Use the queue.max_bytes setting to configure the total capacity of the queue on disk. The following example sets the total capacity of the queue to 8gb:
queue.type: persisted
queue.max_bytes: 8gb
With these settings specified, Logstash will buffer events on disk until the size of the queue reaches 8gb. When the queue is full of unACKed events, and the size limit has been reached, Logstash will no longer accept new events.
Each input handles back pressure independently. For example, when the beats input encounters back pressure, it no longer accepts new connections and waits until the persistent queue has space to accept more events. After the filter and output stages finish processing existing events in the queue and ACK them, Logstash automatically starts accepting new events.
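The interplay of the two limits and back pressure can be sketched as a toy bounded queue in Ruby. The class BoundedQueue and its methods are hypothetical.

In the sketch, the queue counts as full as soon as either the event limit or the byte limit is reached, with 0 meaning "unlimited" for events, as queue.max_events does. While the queue is full, writes are refused until an event is ACKed and removed.

```ruby
# Toy bounded queue (not Logstash code) showing back pressure on an input.
class BoundedQueue
  def initialize(max_events:, max_bytes:)
    @max_events = max_events # 0 means unlimited, like queue.max_events
    @max_bytes = max_bytes
    @events = []
  end

  def bytes
    @events.sum(&:bytesize)
  end

  # Full when whichever limit is reached first has been hit.
  def full?
    (@max_events.positive? && @events.size >= @max_events) || bytes >= @max_bytes
  end

  # Returns false instead of accepting: the input has to wait and retry.
  def try_write(event)
    return false if full?

    @events << event
    true
  end

  # Filters and outputs finished the oldest event; it is ACKed and removed.
  def ack_oldest
    @events.shift
  end
end

q = BoundedQueue.new(max_events: 0, max_bytes: 10)
p q.try_write("12345") # => true
p q.try_write("67890") # => true  (queue now holds 10 bytes)
p q.try_write("x")     # => false (full: back pressure on the input)
q.ack_oldest
p q.try_write("x")     # => true
```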
Controlling Durability: Durability is a property of storage writes that ensures data will be available after it’s written.
When the persistent queue feature is enabled, Logstash will store events on disk. Logstash commits to disk in a mechanism called checkpointing.
To discuss durability, we need to introduce a few details about how the persistent queue is implemented.
First, the queue itself is a set of pages. There are two kinds of pages: head pages and tail pages. The head page is where new events are written. There is only one head page. When the head page is of a certain size (see queue.page_capacity), it becomes a tail page, and a new head page is created. Tail pages are immutable, and the head page is append-only. Second, the queue records details about itself (pages, acknowledgements, etc) in a separate file called a checkpoint file.
When recording a checkpoint, Logstash will:
Call fsync on the head page.
Atomically write to disk the current state of the queue.
The process of checkpointing is atomic, which means any update to the file is saved if successful.
If Logstash is terminated, or if there is a hardware-level failure, any data that is buffered in the persistent queue, but not yet checkpointed, is lost.
You can force Logstash to checkpoint more frequently by setting queue.checkpoint.writes. This setting specifies the maximum number of events that may be written to disk before forcing a checkpoint. The default is 1024. To ensure maximum durability and avoid losing data in the persistent queue, you can set queue.checkpoint.writes: 1 to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to 1 can severely impact performance.
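To see what queue.checkpoint.writes trades off, here is a toy Ruby model that counts how many written events are still unprotected when the process dies. ToyHeadPage is a hypothetical class.

The model simplifies on purpose. It ignores other reasons Logstash may checkpoint, such as a head page filling up, so treat the numbers as an illustration only.

```ruby
# Toy model (not Logstash code): events written since the last checkpoint are
# lost on a crash. Other checkpoint triggers (e.g. page rollover) are ignored.
class ToyHeadPage
  attr_reader :written, :checkpointed

  def initialize(checkpoint_writes)
    @checkpoint_writes = checkpoint_writes
    @written = 0      # events appended to the head page
    @checkpointed = 0 # events covered by the last checkpoint
  end

  def write_event
    @written += 1
    checkpoint if @written - @checkpointed >= @checkpoint_writes
  end

  # Stands in for "fsync the head page, then atomically write the checkpoint file".
  def checkpoint
    @checkpointed = @written
  end

  def lost_on_crash
    @written - @checkpointed
  end
end

[1024, 1].each do |setting|
  head = ToyHeadPage.new(setting)
  2500.times { head.write_event }
  puts "queue.checkpoint.writes=#{setting}: #{head.lost_on_crash} events at risk"
end
# prints:
# queue.checkpoint.writes=1024: 452 events at risk
# queue.checkpoint.writes=1: 0 events at risk
```

With the default of 1024, 2500 writes leave 2500 - 2 × 1024 = 452 events uncheckpointed. A setting of 1 leaves none, at the cost of one checkpoint per event.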
Disk Garbage Collection: On disk, the queue is stored as a set of pages where each page is one file. Each page can be at most queue.page_capacity in size. Pages are deleted (garbage collected) after all events in that page have been ACKed. If an older page has at least one event that is not yet ACKed, that entire page will remain on disk until all events in that page are successfully processed. Each page containing unprocessed events will count against the queue.max_bytes byte size.
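Here is a toy Ruby model of the page-deletion rule. The Page struct and collectable_pages helper are hypothetical.

Each page lists the sequence numbers of its events, and a page is collectable only when every one of them is ACKed. As a simplifying assumption of the sketch, the last page is treated as the head page and is never collected.

```ruby
require "set"

# Toy model of disk garbage collection (not Logstash code).
Page = Struct.new(:number, :seqs)

def collectable_pages(pages, acked)
  tail_pages = pages[0...-1] # assumption: the last page is the head page
  tail_pages.select { |page| page.seqs.all? { |seq| acked.include?(seq) } }.map(&:number)
end

pages = [Page.new(1, [0, 1, 2]), Page.new(2, [3, 4, 5]), Page.new(3, [6])]
acked = Set[0, 1, 2, 3, 5, 6]
p collectable_pages(pages, acked) # => [1]  (unACKed event 4 keeps page 2 on disk)
```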
Elasticsearch processing and the dead letter queue:
HTTP request failure. If the HTTP request fails (because Elasticsearch is unreachable or because it returned an HTTP error code), the Elasticsearch output retries the entire request indefinitely. In these scenarios, the dead letter queue has no opportunity to intercept.
HTTP request success. The Elasticsearch Bulk API can perform multiple actions using the same request. If the Bulk API request is successful, it returns 200 OK, even if some documents in the batch have failed. In this situation, the errors flag for the request will be true.
The response body can include metadata indicating that one or more specific actions in the bulk request could not be performed, along with an HTTP-style status code per entry to indicate why the action could not be performed. If the DLQ is configured, individual indexing failures are routed there.
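Here is a Ruby sketch of how a successful (HTTP 200) bulk response with errors set to true can be split per item. The JSON shape (an items array of {"index": {...}} entries with a per-item status) follows the Elasticsearch Bulk API.

The helper name and the sample body are hypothetical. The sketch assumes, as the Elasticsearch output does by default, that only item failures with status 400 or 404 go to the DLQ. Other failures are handled differently by the plugin (for example, retried), so here they are simply collected.

```ruby
require "json"

# Assumption: only 400 and 404 item failures are DLQ-eligible by default.
DLQ_STATUS_CODES = [400, 404].freeze

def triage_bulk_response(body)
  response = JSON.parse(body)
  result = { ok: [], dlq: [], other: [] }
  response.fetch("items").each do |item|
    _action, details = item.first # each item looks like {"index" => {...}}
    status = details.fetch("status")
    bucket =
      if status < 300 then :ok
      elsif DLQ_STATUS_CODES.include?(status) then :dlq
      else :other
      end
    result[bucket] << details
  end
  result
end

sample = '{"took":3,"errors":true,"items":[' \
         '{"index":{"_id":"1","status":201}},' \
         '{"index":{"_id":"2","status":400,"error":{"type":"mapper_parsing_exception"}}},' \
         '{"index":{"_id":"3","status":429}}]}'
result = triage_bulk_response(sample)
p result[:dlq].map { |d| d["_id"] } # => ["2"]
```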
Configuring Logstash to use dead letter queues: Dead letter queues are disabled by default. To enable dead letter queues, set the dead_letter_queue.enable option in the logstash.yml settings file:
dead_letter_queue.enable: true
Dead letter queues are stored as files in the local directory of the Logstash instance. By default, the dead letter queue files are stored in path.data/dead_letter_queue. Each pipeline has a separate queue. For example, the dead letter queue for the main pipeline is stored in LOGSTASH_HOME/data/dead_letter_queue/main by default. The queue files are numbered sequentially: 1.log, 2.log, and so on.
You can set path.dead_letter_queue in the logstash.yml file to specify a different path for the files:
path.dead_letter_queue: "path/to/data/dead_letter_queue"
You may not use the same dead_letter_queue path for two different Logstash instances.
File rotation: Dead letter queues have a built-in file rotation policy that manages the file size of the queue. When the file size reaches a preconfigured threshold, a new file is created automatically.
By default, the maximum size of each dead letter queue is set to 1024mb. To change this setting, use the dead_letter_queue.max_bytes option. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting.
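The two size rules (rotation and the overall cap) can be modelled with a toy Ruby class. ToyDeadLetterQueue is hypothetical and is not the DLQ writer.

segment_bytes is a made-up stand-in for the preconfigured rotation threshold, and max_bytes plays the role of dead_letter_queue.max_bytes. An entry that would push the whole queue past max_bytes is dropped.

```ruby
# Toy model (not Logstash code) of DLQ file rotation and the size cap.
class ToyDeadLetterQueue
  attr_reader :segments, :dropped

  def initialize(max_bytes:, segment_bytes:)
    @max_bytes = max_bytes         # like dead_letter_queue.max_bytes
    @segment_bytes = segment_bytes # hypothetical rotation threshold
    @segments = [[]]               # each inner array stands in for 1.log, 2.log, ...
    @dropped = 0
  end

  def total_bytes
    @segments.sum { |segment| segment.sum(&:bytesize) }
  end

  def write(entry)
    if total_bytes + entry.bytesize > @max_bytes
      @dropped += 1 # the entry is dropped rather than growing the queue
      return false
    end
    current = @segments.last
    # Rotate: start a new segment when this entry would overflow the current one.
    @segments << [] if !current.empty? && current.sum(&:bytesize) + entry.bytesize > @segment_bytes
    @segments.last << entry
    true
  end
end

dlq = ToyDeadLetterQueue.new(max_bytes: 12, segment_bytes: 5)
%w[abcd efgh ijkl m].each { |entry| dlq.write(entry) }
p dlq.segments # => [["abcd"], ["efgh"], ["ijkl"]]
p dlq.dropped  # => 1
```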
Processing events in the dead letter queue: When you are ready to process events in the dead letter queue, you create a pipeline that uses the dead_letter_queue input plugin to read from the dead letter queue. The pipeline configuration that you use depends, of course, on what you need to do. For example, if the dead letter queue contains events that resulted from a mapping error in Elasticsearch, you can create a pipeline that reads the "dead" events, removes the field that caused the mapping issue, and re-indexes the clean events into Elasticsearch.
The following example shows a simple pipeline that reads events from the dead letter queue and writes the events, including metadata, to standard output:
input { dead_letter_queue { path => "/path/to/data/dead_letter_queue" commit_offsets => true pipeline_id => "main" } }
output { stdout { codec => rubydebug { metadata => true } } }
path: The path to the top-level directory containing the dead letter queue. This directory contains a separate folder for each pipeline that writes to the dead letter queue. To find the path to this directory, look at the logstash.yml settings file. By default, Logstash creates the dead_letter_queue directory under the location used for persistent storage (path.data), for example, LOGSTASH_HOME/data/dead_letter_queue. However, if path.dead_letter_queue is set, it uses that location instead.
commit_offsets: When true, saves the offset. When the pipeline restarts, it will continue reading from the position where it left off rather than reprocessing all the items in the queue. You can set commit_offsets to false when you are exploring events in the dead letter queue and want to iterate over the events multiple times.
pipeline_id: The ID of the pipeline that’s writing to the dead letter queue. The default is "main".
When the pipeline has finished processing all the events in the dead letter queue, it will continue to run and process new events as they stream into the queue. This means that you do not need to stop your production system to handle events in the dead letter queue.
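The effect of commit_offsets on a restarted reader can be shown with a toy Ruby reader. ToyDlqReader is hypothetical and is not the dead_letter_queue input plugin.

A plain hash stands in for the persisted offset. With commit_offsets enabled, a second reader picks up only entries that arrived after the first read. With it disabled, every read starts from the beginning.

```ruby
# Toy reader (not the dead_letter_queue plugin) illustrating commit_offsets.
class ToyDlqReader
  def initialize(entries, commit_offsets:, offset_store:)
    @entries = entries
    @commit_offsets = commit_offsets
    @offset_store = offset_store # hash standing in for the persisted offset
  end

  def read_available
    start = @commit_offsets ? @offset_store.fetch(:offset, 0) : 0
    batch = @entries.drop(start)
    @offset_store[:offset] = @entries.size if @commit_offsets
    batch
  end
end

dead_events = %w[event-1 event-2]
store = {}
p ToyDlqReader.new(dead_events, commit_offsets: true, offset_store: store).read_available
# => ["event-1", "event-2"]
dead_events << "event-3" # a new failure lands in the DLQ
p ToyDlqReader.new(dead_events, commit_offsets: true, offset_store: store).read_available
# => ["event-3"]
p ToyDlqReader.new(dead_events, commit_offsets: false, offset_store: {}).read_available
# => ["event-1", "event-2", "event-3"]
```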
Events emitted from the dead_letter_queue input plugin will not be resubmitted to the dead letter queue if they cannot be processed correctly.
Reading from a timestamp: When you read from the dead letter queue, you might not want to process all the events in the queue, especially if there are a lot of old events in the queue. You can start processing events at a specific point in the queue by using the start_timestamp option. This option configures the pipeline to start processing events based on the timestamp of when they entered the queue:
input { dead_letter_queue { path => "/path/to/data/dead_letter_queue" start_timestamp => "2017-06-06T23:40:37" pipeline_id => "main" } }
For this example, the pipeline starts reading all events that were delivered to the dead letter queue on or after June 6, 2017, at 23:40:37.
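Here is a small Ruby sketch of the start_timestamp idea: keep only entries that entered the queue at or after the given time. It is not the plugin's code.

The sketch assumes the timestamp string is interpreted as UTC. The helper names and sample entries are hypothetical.

```ruby
# Sketch only: filter queue entries by their entry time (UTC assumed).
def parse_utc(timestamp)
  Time.utc(*timestamp.scan(/\d+/).map(&:to_i)) # "2017-06-06T23:40:37" -> 2017, 6, 6, 23, 40, 37
end

def entries_since(entries, start_timestamp)
  start = parse_utc(start_timestamp)
  entries.select { |entry| entry[:entered_at] >= start }
end

entries = [
  { id: 1, entered_at: Time.utc(2017, 6, 6, 23, 40, 36) },
  { id: 2, entered_at: Time.utc(2017, 6, 6, 23, 40, 37) },
  { id: 3, entered_at: Time.utc(2017, 6, 7, 9, 0, 0) }
]
p entries_since(entries, "2017-06-06T23:40:37").map { |e| e[:id] } # => [2, 3]
```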
Processing Data that has mapping errors: In this example, the user attempts to index a document that includes geo_ip data, but the data cannot be processed because it contains a mapping error:
{"geoip":{"location":"home"}}
Indexing fails because the Logstash output plugin expects a geo_point object in the location field, but the value is a string. The failed event is written to the dead letter queue, along with metadata about the error that caused the failure.
To process the failed event, you create the following pipeline that reads from the dead letter queue and removes the mapping problem:
input { dead_letter_queue { path => "/path/to/data/dead_letter_queue/" } } filter { mutate { remove_field => "[geoip][location]" } } output { elasticsearch{ hosts => [ "localhost:9200" ] } }
The dead_letter_queue input reads from the dead letter queue. The mutate filter removes the problem field called location. The clean event is sent to Elasticsearch, where it can be indexed because the mapping issue is resolved.
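The mutate step above, remove_field => "[geoip][location]", can be sketched on a plain Ruby hash. remove_field_at is a hypothetical helper, not the mutate filter's implementation.

The helper walks the bracketed path to the parent hash and deletes the last key. A path that does not exist leaves the event unchanged.

```ruby
# Sketch of remove_field on a plain hash (not a Logstash event).
def remove_field_at(event, field_reference)
  *parents, leaf = field_reference.scan(/[^\[\]]+/)
  container = parents.empty? ? event : event.dig(*parents)
  container.delete(leaf) if container.is_a?(Hash)
  event
end

failed = { "geoip" => { "location" => "home" } }
p remove_field_at(failed, "[geoip][location]") # geoip no longer has a location key
```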
Transforming Data: With over 200 plugins in the Logstash plugin ecosystem, it’s sometimes challenging to choose the best plugin to meet your data processing needs. In this section, we’ve collected a list of popular plugins and organized them according to their processing capabilities:
Performing Core Operations
Deserializing Data
Extracting Fields and Wrangling Data
Enriching Data with Lookups
Also see Filter plugins and Codec plugins for the full list of available data processing plugins.
Performing Core Operations: The plugins described in this section are useful for core operations, such as mutating and dropping events.
date filter: Parses dates from fields to use as Logstash timestamps for events.
The following config parses a field called logdate to set the Logstash timestamp:
filter { date { match => [ "logdate", "MMM dd yyyy HH:mm:ss" ] } }
drop filter: Drops events. This filter is typically used in combination with conditionals.
The following config drops debug level log messages:
filter { if [loglevel] == "debug" { drop { } } }
fingerprint filter: Fingerprints fields by applying a consistent hash.
The following config fingerprints the IP, @timestamp, and message fields and adds the hash to a metadata field called generated_id:
filter { fingerprint { source => ["IP", "@timestamp", "message"] concatenate_sources => true method => "SHA1" key => "0123" target => "[@metadata][generated_id]" } }
mutate filter: Performs general mutations on fields. You can rename, remove, replace, and modify fields in your events.
The following config renames the HOSTORIP field to client_ip:
filter { mutate { rename => { "HOSTORIP" => "client_ip" } } }
The following config strips leading and trailing whitespace from the specified fields:
filter { mutate { strip => ["field1", "field2"] } }
ruby filter: Executes Ruby code.
The following config executes Ruby code that cancels 90% of the events:
filter { ruby { code => "event.cancel if rand <= 0.90" } }
Deserializing Data: The plugins described in this section are useful for deserializing data into Logstash events.
avro codec: Reads serialized Avro records as Logstash events. This plugin deserializes individual Avro records. It is not for reading Avro files. Avro files have a unique format that must be handled upon input.
The following config deserializes input from Kafka:
input { kafka { codec => avro { schema_uri => "/tmp/schema.avsc" } } } ...
csv filter: Parses comma-separated value data into individual fields. By default, the filter autogenerates field names (column1, column2, and so on), or you can specify a list of names. You can also change the column separator.
The following config parses CSV data into the field names specified in the columns field:
filter { csv { separator => "," columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ] } }
fluent codec: Reads the Fluentd msgpack schema.
The following config decodes logs received from fluent-logger-ruby:
input { tcp { codec => fluent port => 4000 } }
json codec: Decodes (via inputs) and encodes (via outputs) JSON formatted content, creating one event per element in a JSON array.
The following config decodes the JSON formatted content in a file:
input { file { path => "/path/to/myfile.json" codec => "json" } }
protobuf codec: Reads protobuf encoded messages and converts them to Logstash events. Requires the protobuf definitions to be compiled as Ruby files. You can compile them by using the ruby-protoc compiler.
The following config decodes events from a Kafka stream:
input { kafka { zk_connect => "127.0.0.1" topic_id => "your_topic_goes_here" codec => protobuf { class_name => "Animal::Unicorn" include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb'] } } }
xml filter: Parses XML into fields.
The following config parses the whole XML document stored in the message field:
filter { xml { source => "message" } }
Extracting Fields and Wrangling Data: The plugins described in this section are useful for extracting fields and parsing unstructured data into fields.
dissect filter: Extracts unstructured event data into fields by using delimiters. The dissect filter does not use regular expressions and is very fast. However, if the structure of the data varies from line to line, the grok filter is more suitable.
For example, let’s say you have a log that contains the following message:
Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...
The following config dissects the message:
filter { dissect { mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" } } }
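Here is a minimal Ruby sketch of delimiter-based parsing in the spirit of dissect, applied to the sample line above. mini_dissect is a hypothetical helper, not the dissect filter's code.

The sketch assumes that only %{key} and %{+key} are supported and that %{+key} appends with a single space. It also assumes every literal delimiter must match exactly, with no padding handling.

```ruby
# Sketch only: walk the text, cutting each field at the next literal delimiter.
def mini_dissect(pattern, text)
  # Split the mapping into field tokens and the literal delimiters between them.
  parts = pattern.split(/(%\{[^}]*\})/).reject(&:empty?)
  fields = {}
  pos = 0
  parts.each_with_index do |part, i|
    if (field = part.match(/\A%\{(\+?)([^}]*)\}\z/))
      append, key = field[1], field[2]
      delimiter = parts[i + 1]
      stop = delimiter ? text.index(delimiter, pos) : text.length
      raise ArgumentError, "delimiter #{delimiter.inspect} not found" if stop.nil?

      value = text[pos...stop]
      fields[key] = append == "+" && fields.key?(key) ? "#{fields[key]} #{value}" : value
      pos = stop
    else
      raise ArgumentError, "expected #{part.inspect} at offset #{pos}" unless text[pos, part.length] == part

      pos += part.length
    end
  end
  fields
end

line = "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool..."
mapping = "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
mini_dissect(mapping, line).each { |key, value| puts "#{key}: #{value}" }
# prints:
# ts: Apr 26 12:20:02
# src: localhost
# prog: systemd
# pid: 1
# msg: Starting system activity accounting tool...
```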
kv filter: Parses key-value pairs.
For example, let’s say you have a log message that contains the following key-value pairs:
ip=1.2.3.4 error=REFUSED
The following config parses the key-value pairs into fields:
filter { kv { } }
After the filter is applied, the event in the example will have these fields:
ip: 1.2.3.4
error: REFUSED
grok filter: Parses unstructured event data into fields. This tool is perfect for syslog logs, Apache and other webserver logs, MySQL logs, and in general, any log format that is generally written for humans and not computer consumption. Grok works by combining text patterns into something that matches your logs.
For example, let’s say you have an HTTP request log that contains the following message:
55.3.244.1 GET /index.html 15824 0.043
The following config parses the message into fields:
filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } }
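A rough plain-Ruby analogue of the grok expression is a regular expression with one named capture per %{PATTERN:name} field. The pattern definitions below are simplified assumptions (the real grok patterns such as IP and URIPATHPARAM are more permissive), and parse_http_log is a hypothetical helper, not part of Logstash.

```ruby
# Simplified stand-ins for the grok patterns used above (assumptions:
# the real definitions are broader, e.g. IP also matches IPv6).
IP           = /(?:\d{1,3}\.){3}\d{1,3}/
WORD         = /\w+/
URIPATHPARAM = /\S+/
NUMBER       = /\d+(?:\.\d+)?/

# Each %{PATTERN:name} becomes a named capture group.
HTTP_LOG = /\A(?<client>#{IP}) (?<method>#{WORD}) (?<request>#{URIPATHPARAM}) (?<bytes>#{NUMBER}) (?<duration>#{NUMBER})\z/

# Returns a Hash of field name => captured string, or nil if the line does not match.
def parse_http_log(line)
  HTTP_LOG.match(line)&.named_captures
end

puts parse_http_log("55.3.244.1 GET /index.html 15824 0.043")["request"] # prints /index.html
```

As with grok, the captured values are strings; converting bytes and duration to numbers would be a separate step.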
Enriching Data with Lookups: These plugins can help you enrich data with additional info, such as GeoIP and user agent info:
dns filter, elasticsearch filter, geoip filter, http filter, jdbc_static filter, jdbc_streaming filter, memcached filter, translate filter, useragent filter
Lookup plugins
dns filter
The dns filter plugin performs a standard or reverse DNS lookup.
The following config performs a reverse lookup on the address in the source_host field and replaces it with the domain name:
filter { dns { reverse => [ "source_host" ] action => "replace" } }
elasticsearch filter
The elasticsearch filter copies fields from previous log events in Elasticsearch to current events.
The following config shows a complete example of how this filter might be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch filter to find the matching "start" event based on some operation identifier. Then it copies the @timestamp field from the "start" event into a new field on the "end" event. Finally, using a combination of the date filter and the ruby filter, the code in the example calculates the time duration in hours between the two events.
filter {
  if [type] == "end" {
    elasticsearch {
      hosts => ["es-server"]
      query => "type:start AND operation:%{[opid]}"
      fields => { "@timestamp" => "started" }
    }
    date {
      match => ["[started]", "ISO8601"]
      target => "[started]"
    }
    ruby {
      code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
    }
  }
}
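The ruby block subtracts two timestamps and divides by 3600 to get hours. The same arithmetic can be checked outside Logstash with Ruby's standard Time class. This sketch assumes both values arrive as ISO8601 strings, and duration_hrs is a hypothetical helper.

```ruby
require "time"

# Hypothetical helper mirroring the ruby filter's arithmetic:
# (end - start) in seconds, divided by 3600 to get hours.
def duration_hrs(started, ended)
  (Time.iso8601(ended) - Time.iso8601(started)) / 3600
rescue ArgumentError
  nil # like `rescue nil` in the filter: unparsable input yields no value
end

puts duration_hrs("2020-01-01T00:00:00Z", "2020-01-01T03:30:00Z") # prints 3.5
```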
geoip filter
The geoip filter adds geographical information about the location of IP addresses. For example:
filter { geoip { source => "clientip" } }
After the geoip filter is applied, the event is enriched with geoip fields, such as the country, city, and coordinates associated with the address in clientip.
http filter
The http filter integrates with external web services and REST APIs, and enables lookup enrichment against any HTTP service or endpoint. This plugin is well suited for many enrichment use cases, such as social APIs, sentiment APIs, security feed APIs, and business service APIs.
jdbc_static filter
The jdbc_static filter enriches events with data pre-loaded from a remote database.
A typical configuration fetches data from a remote database, caches it in a local database, and uses lookups to enrich events with the data cached in the local database. Its main parts play these roles:
loaders: Queries an external database to fetch the dataset that will be cached locally.
local_db_objects: Defines the columns, types, and indexes used to build the local database structure. The column names and types should match the external database.
local_lookups: Performs lookup queries on the local database to enrich the events.
target (within each lookup): Specifies the event field that will store the looked-up data. If the lookup returns multiple columns, the data is stored as a JSON object within the field.
Finally, values can be copied from the JSON object into top-level event fields (for example, with add_field) for easier analysis in Kibana.
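The pattern behind jdbc_static (load a reference dataset once, then answer every event's lookup locally) can be sketched in plain Ruby. In this hypothetical sketch an in-memory Hash stands in for the local database, a lambda stands in for the query against the remote database, and the class LocalLookup and the field names ip, server, descr, and server_name are all illustrative assumptions.

```ruby
# Hypothetical sketch of "preload, then look up locally".
class LocalLookup
  # loader: a callable returning rows (Hashes) from the "remote" source.
  def initialize(loader, key:)
    @key = key
    reload(loader)
  end

  # Rebuild the local cache from the remote rows, indexed by the key column.
  def reload(loader)
    @cache = loader.call.each_with_object({}) { |row, acc| acc[row.fetch(@key)] = row }
  end

  # Enrich one event (a Hash) by storing the matching row under target.
  def enrich(event, source:, target:)
    row = @cache[event[source]]
    event[target] = row.dup if row
    event
  end
end

remote_rows = -> { [{ "ip" => "10.2.3.40", "descr" => "Payroll server" }] }
lookup = LocalLookup.new(remote_rows, key: "ip")
event = lookup.enrich({ "ip" => "10.2.3.40" }, source: "ip", target: "server")
# Copy a nested value up to a top-level field, like the final step above.
event["server_name"] = event.dig("server", "descr")
puts event["server_name"] # prints Payroll server
```

The point of the design is that the remote database is queried only when the cache is (re)loaded, not once per event.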
jdbc_streaming filter
The jdbc_streaming filter enriches events with database data. For example, it can execute a SQL query and store the result set in a field called country_details.
memcached filter
The memcached filter enables key/value lookup enrichment against a Memcached object caching system. It supports both read (GET) and write (SET) operations. It is a notable addition for security analytics use cases.
translate filter
The translate filter replaces field contents based on replacement values specified in a hash or file. It currently supports these file types: YAML, JSON, and CSV.
For example, the filter can take the value of the response_code field, translate it to a description based on the values specified in a dictionary, and then remove the response_code field from the event.
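The translate behavior described above boils down to a dictionary lookup. The following plain-Ruby sketch models it on a Hash-based event; the dictionary contents, the http_response target field, the fallback parameter, and the translate helper are illustrative assumptions rather than the plugin's own code. In this sketch the original field is removed only when a translation is found.

```ruby
# Illustrative dictionary (assumption): response code => description.
DICTIONARY = {
  "200" => "OK",
  "403" => "Forbidden",
  "404" => "Not Found",
  "408" => "Request Timeout"
}.freeze

# Hypothetical helper: look up event[field], store the description under
# target, and drop the original field when a translation was found.
def translate(event, field:, target:, dictionary: DICTIONARY, fallback: nil)
  translated = dictionary.fetch(event[field].to_s, fallback)
  return event if translated.nil? # no match and no fallback: leave the event alone

  event[target] = translated
  event.delete(field)
  event
end

result = translate({ "response_code" => 404 }, field: "response_code", target: "http_response")
puts result["http_response"] # prints Not Found
```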
useragent filter
The useragent filter parses user agent strings into fields.
For example, the filter can take the user agent string in the agent field, parse it into user agent fields, add those fields to a new field called user_agent, and remove the original agent field. After the filter is applied, the event is enriched with user agent fields such as the browser name, version, and operating system.
Deploying and Scaling Logstash: The Elastic Stack is used for tons of use cases, from operational log and metrics analytics to enterprise and application search. Making sure your data is transported to Elasticsearch scalably, durably, and securely is extremely important, especially in mission-critical environments.
The goal of this document is to highlight the most common architecture patterns for Logstash and how to scale effectively as your deployment grows. The focus is on the operational log, metrics, and security analytics use cases because they tend to require larger-scale deployments. The deploying and scaling recommendations provided here may vary based on your own requirements.
Getting Started
For first-time users, if you simply want to tail a log file to grasp the power of the Elastic Stack, we recommend trying Filebeat Modules. Filebeat Modules enable you to quickly collect, parse, and index popular log types and view pre-built Kibana dashboards within minutes. Metricbeat Modules provide a similar experience, but with metrics data. In this context, Beats ship data directly to Elasticsearch, where Ingest Nodes process and index your data.
Introduction to Logstash: What are the main benefits for integrating Logstash into your architecture?
Scale through ingestion spikes: Logstash has an adaptive disk-based buffering system that absorbs incoming throughput, mitigating backpressure.
Ingest from other data sources like databases, S3, or messaging queues.
Emit data to multiple destinations like S3, HDFS, or a file.
Compose more sophisticated processing pipelines with conditional dataflow logic.
Scaling Ingest
Beats and Logstash make ingest awesome. Together, they provide a comprehensive solution that is scalable and resilient. What can you expect?
Horizontal scalability, high availability, and variable load handling
Message durability with at-least-once delivery guarantees
End-to-end secure transport with authentication and wire encryption
Beats and Logstash
Beats run across thousands of edge host servers, collecting, tailing, and shipping logs to Logstash. Logstash serves as the centralized streaming engine for data unification and enrichment. The Beats input plugin exposes a secure, acknowledgement-based endpoint for Beats to send data to Logstash.
Enabling persistent queues is strongly recommended, and these architecture characteristics assume that they are enabled. We encourage you to review the Persistent Queues documentation for feature benefits and more details on resiliency.
Scalability
Logstash is horizontally scalable and can form groups of nodes running the same pipeline. Logstash’s adaptive buffering capabilities will facilitate smooth streaming even through variable throughput loads. If the Logstash layer becomes an ingestion bottleneck, simply add more nodes to scale out. Here are a few general recommendations:
Beats should load balance across a group of Logstash nodes.
A minimum of two Logstash nodes is recommended for high availability.
It’s common to deploy just one Beats input per Logstash node, but multiple Beats inputs can also be deployed per Logstash node to expose independent endpoints for different data sources.
Resiliency
When using Filebeat or Winlogbeat for log collection within this ingest flow, at-least-once delivery is guaranteed. Both communication protocols, from Filebeat or Winlogbeat to Logstash and from Logstash to Elasticsearch, are synchronous and support acknowledgements. The other Beats don’t yet have support for acknowledgements.
Logstash persistent queues provide protection across node failures. For disk-level resiliency in Logstash, it’s important to ensure disk redundancy. For on-premises deployments, it’s recommended that you configure RAID. When running in the cloud or a containerized environment, it’s recommended that you use persistent disks with replication strategies that reflect your data SLAs.
Make sure queue.checkpoint.writes: 1 is set for at-least-once guarantees. For more details, see the persistent queue durability documentation.
Processing
Logstash will commonly extract fields with grok or dissect, augment geographical info, and can further enrich events with file, database, or Elasticsearch lookup datasets. Be aware that processing complexity can affect overall throughput and CPU utilization. Make sure to check out the other available filter plugins.
Secure Transport
Enterprise-grade security is available across the entire delivery chain.
Wire encryption is recommended for both the transport from Beats to Logstash and from Logstash to Elasticsearch. There’s a wealth of security options when communicating with Elasticsearch including basic authentication, TLS, PKI, LDAP, AD, and other custom realms. To enable Elasticsearch security, see Secure a cluster.
Monitoring
When running Logstash 5.2 or greater, the Monitoring UI provides deep visibility into your deployment metrics, helping observe performance and alleviate bottlenecks as you scale. Monitoring is an X-Pack feature under the Basic License and is therefore free to use. To get started, see Monitoring Logstash.
If external monitoring is preferred, there are Monitoring APIs that return point-in-time metrics snapshots.
Adding Other Popular Sources
Users may have other mechanisms of collecting logging data, and it’s easy to integrate and centralize them into the Elastic Stack. Let’s walk through a few scenarios:
TCP, UDP and HTTP Protocols: The TCP, UDP, and HTTP protocols are common ways to feed data into Logstash. Logstash can expose endpoint listeners with the respective TCP, UDP, and HTTP input plugins. The data sources enumerated below are typically ingested through one of these three protocols.
The TCP and UDP protocols do not support application-level acknowledgements, so connectivity issues may result in data loss.
For high availability scenarios, a third-party hardware or software load balancer, like HAProxy, should be added to fan out traffic to a group of Logstash nodes.
Network and Security Data
Although Beats may already satisfy your data ingest use case, network and security datasets come in a variety of forms. Let’s touch on a few other ingestion points.
Network wire data: collect and analyze network traffic with Packetbeat.
Netflow v5/v9/v10: Logstash understands data from Netflow/IPFIX exporters with the Netflow codec.
Nmap: Logstash accepts and parses Nmap XML data with the Nmap codec.
SNMP trap: Logstash has a native SNMP trap input.
CEF: Logstash accepts and parses CEF data from systems like ArcSight SmartConnectors with the CEF codec. See this blog series for more details.
Centralized Syslog Servers
Existing syslog server technologies like rsyslog and syslog-ng generally send syslog over to Logstash TCP or UDP endpoints for extraction, processing, and persistence. If the data format conforms to RFC3164, it can be fed directly to the Logstash syslog input.
Infrastructure & Application Data and IoT
Infrastructure and application metrics can be collected with Metricbeat, but applications can also send webhooks to a Logstash HTTP input or have metrics polled from an HTTP endpoint with the HTTP poller input plugin.
For applications that log with log4j2, it’s recommended to use the SocketAppender to send JSON to the Logstash TCP input. Alternatively, log4j2 can also log to a file for collection with Filebeat. Usage of the log4j1 SocketAppender is not recommended.
IoT devices like Raspberry Pis, smartphones, and connected vehicles often send telemetry data through one of these protocols.
Integrating with Messaging Queues
If you are leveraging message queuing technologies as part of your existing infrastructure, getting that data into the Elastic Stack is easy. For existing users who are utilizing an external queuing layer like Redis or RabbitMQ just for data buffering with Logstash, it’s recommended to use Logstash persistent queues instead of an external queuing layer. This will help with overall ease of management by removing an unnecessary layer of complexity in your ingest architecture.
For users who want to integrate data from existing Kafka deployments or require the underlying usage of ephemeral storage, Kafka can serve as a data hub where Beats can persist to and Logstash nodes can consume from. The other TCP, UDP, and HTTP sources can persist to Kafka with Logstash as a conduit to achieve high availability in lieu of a load balancer. A group of Logstash nodes can then consume from topics with the Kafka input to further transform and enrich the data in transit.
Resiliency and Recovery
When Logstash consumes from Kafka, persistent queues should be enabled and will add transport resiliency to mitigate the need for reprocessing during Logstash node failures. In this context, it’s recommended to use the default persistent queue disk allocation size queue.max_bytes: 1GB.
If Kafka is configured to retain data for an extended period of time, data can be reprocessed from Kafka in the case of disaster recovery and reconciliation.
Other Messaging Queue Integrations
Although an additional queuing layer is not required, Logstash can consume from a myriad of other message queuing technologies like RabbitMQ and Redis. It also supports ingestion from hosted queuing services like Pub/Sub, Kinesis, and SQS.
Performance Tuning:
Performance Troubleshooting:
Performance Checklist:
Check the performance of input sources and output destinations.
Check system statistics: CPU, memory, and I/O utilization.
Check the JVM heap.
Tune Logstash worker settings.
Tuning and Profiling Logstash Performance
The Logstash defaults are chosen to provide fast, safe performance for most users. However, if you notice performance issues, you may need to modify some of the defaults. Logstash provides the following configurable options for tuning pipeline performance: pipeline.workers, pipeline.batch.size, and pipeline.batch.delay. For more information about setting these options, see logstash.yml.
Make sure you’ve read the Performance Troubleshooting before modifying these options.
The pipeline.workers setting determines how many threads to run for filter and output processing. If you find that events are backing up, or that the CPU is not saturated, consider increasing the value of this parameter to make better use of available processing power. You may even get good results by increasing this number past the number of available processors, because these threads may spend significant time in an I/O wait state when writing to external systems. Valid values for this parameter are positive integers.
The pipeline.batch.size setting defines the maximum number of events an individual worker thread collects before attempting to execute filters and outputs. Larger batch sizes are generally more efficient, but increase memory overhead. Some hardware configurations require you to increase JVM heap space in the jvm.options config file to avoid performance degradation. (See Logstash Configuration Files for more info.) Values in excess of the optimum range cause performance degradation due to frequent garbage collection or JVM crashes related to out-of-memory exceptions. Output plugins can process each batch as a logical unit. The Elasticsearch output, for example, issues bulk requests for each batch received. Tuning the pipeline.batch.size setting adjusts the size of bulk requests sent to Elasticsearch.
The pipeline.batch.delay setting rarely needs to be tuned. This setting adjusts the latency of the Logstash pipeline. Pipeline batch delay is the maximum amount of time in milliseconds that Logstash waits for new messages after receiving an event in the current pipeline worker thread. After this time elapses, Logstash begins to execute filters and outputs. The maximum time that Logstash waits between receiving an event and processing that event in a filter is the product of the pipeline.batch.delay and pipeline.batch.size settings.
Notes on Pipeline Configuration and Performance
If you plan to modify the default pipeline settings, take into account the following suggestions:
The total number of inflight events is determined by the product of the pipeline.workers and pipeline.batch.size settings. This product is referred to as the inflight count. Keep the value of the inflight count in mind as you adjust the pipeline.workers and pipeline.batch.size settings. Pipelines that intermittently receive large events at irregular intervals require sufficient memory to handle these spikes. Set the JVM heap space accordingly in the jvm.options config file. (See Logstash Configuration Files for more info.)
Measure each change to make sure it increases, rather than decreases, performance.
Ensure that you leave enough memory available to cope with a sudden increase in event size, for example, from an application that generates exceptions that are represented as large blobs of text.
The number of workers may be set higher than the number of CPU cores since outputs often spend idle time in I/O wait conditions.
Threads in Java have names, and you can use the jstack, top, and VisualVM graphical tools to figure out which resources a given thread uses.
On Linux platforms, Logstash labels all the threads it can with something descriptive. For example, inputs show up as [base]<inputname, and pipeline workers show up as [base]>workerN, where N is an integer. Where possible, other threads are also labeled to help you identify their purpose.
Profiling the Heap
When tuning Logstash you may have to adjust the heap size. You can use the VisualVM tool to profile the heap. The Monitor pane in particular is useful for checking whether your heap allocation is sufficient for the current workload. The screenshots below show sample Monitor panes. The first pane examines a Logstash instance configured with too many inflight events. The second pane examines a Logstash instance configured with an appropriate amount of inflight events.
Note that the specific batch sizes used here are most likely not applicable to your specific workload, as the memory demands of Logstash vary in large part based on the type of messages you are sending.
As long as the GC pattern is acceptable, heap sizes that occasionally increase to the maximum are acceptable. Such heap size spikes happen in response to a burst of large events passing through the pipeline. In general practice, maintain a gap between the used amount of heap memory and the maximum. This document is not a comprehensive guide to JVM GC tuning.
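The inflight count from the pipeline notes above is simple arithmetic, and multiplying it by an average event size gives a rough feel for heap pressure. The sketch assumes the defaults pipeline.batch.size = 125 and pipeline.workers = number of CPU cores; the helper names and the average event size are hypothetical inputs, and the byte figure ignores JVM object overhead, so treat it as an order-of-magnitude estimate only.

```ruby
require "etc"

# Hypothetical helper: inflight count = pipeline.workers x pipeline.batch.size.
# Defaults assumed: workers = CPU cores, batch size = 125.
def inflight_count(workers: Etc.nprocessors, batch_size: 125)
  raise ArgumentError, "workers must be a positive integer" unless workers.is_a?(Integer) && workers.positive?
  raise ArgumentError, "batch_size must be a positive integer" unless batch_size.is_a?(Integer) && batch_size.positive?

  workers * batch_size
end

# Rough payload estimate: inflight events times an assumed average event size.
def inflight_bytes(avg_event_bytes, **settings)
  inflight_count(**settings) * avg_event_bytes
end

puts inflight_count(workers: 8, batch_size: 125)        # prints 1000
puts inflight_bytes(2_048, workers: 8, batch_size: 125) # prints 2048000
```

Doubling either setting doubles the inflight count, which is why the notes suggest measuring each change and leaving heap headroom for bursts of large events.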
Monitoring Logstash: Use the Elastic Stack monitoring features to gain insight into the health of Logstash instances running in your environment.
Configuring Monitoring for Logstash: Make sure monitoring is enabled on your Elasticsearch cluster. Then configure one of these methods to collect Logstash metrics:
Metricbeat collection: Metricbeat collects monitoring data from your Logstash instance and sends it directly to your monitoring cluster. The benefit of Metricbeat collection is that the monitoring agent remains active even if the Logstash instance does not.
Legacy collection (deprecated): Legacy collectors send monitoring data to your production cluster.
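Listing plugins:
bin/logstash-plugin list   # all installed plugins
bin/logstash-plugin list --verbose   # installed plugins with version information
bin/logstash-plugin list 'namefragment'   # installed plugins whose names contain the fragment
bin/logstash-plugin list --group output   # installed plugins of one group (input, filter, codec, or output)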
Adding a plugin:
bin/logstash-plugin install logstash-input-github
Updating a plugin:
Plugins have their own release cycles and are often released independently of Logstash’s core release cycle. Using the update subcommand, you can get the latest version of the plugin:
bin/logstash-plugin update   # updates all installed plugins
bin/logstash-plugin update logstash-input-github   # updates only the named plugin
Removing a plugin: bin/logstash-plugin remove logstash-input-github
Advanced: Adding a locally built plugin
In some cases, you may want to install plugins that are not yet released and not hosted on RubyGems.org. Logstash provides the option to install a locally built plugin that is packaged as a Ruby gem. Using a file location:
bin/logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem
Advanced: Using --path.plugins
Using the Logstash --path.plugins flag, you can load plugin source code located on your file system. Typically this is used by developers who are iterating on a custom plugin and want to test it before creating a Ruby gem.
The path needs to follow a specific directory hierarchy: PATH/logstash/TYPE/NAME.rb, where TYPE is inputs, filters, outputs, or codecs and NAME is the name of the plugin.
bin/logstash --path.plugins /opt/shared/lib
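The directory convention above can be expressed as a small Ruby helper that builds the expected file path and rejects unknown plugin types. The helper name plugin_source_path and the plugin name myfilter are hypothetical.

```ruby
# Valid TYPE directories for --path.plugins, as listed above.
PLUGIN_TYPES = %w[inputs filters outputs codecs].freeze

# Hypothetical helper: builds PATH/logstash/TYPE/NAME.rb.
def plugin_source_path(base, type, name)
  unless PLUGIN_TYPES.include?(type)
    raise ArgumentError, "type must be one of #{PLUGIN_TYPES.join(', ')}"
  end

  File.join(base, "logstash", type, "#{name}.rb")
end

puts plugin_source_path("/opt/shared/lib", "filters", "myfilter")
# prints /opt/shared/lib/logstash/filters/myfilter.rb
```

Note that TYPE is the plural directory name (filters), not the singular group name (filter) used by bin/logstash-plugin list --group.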
You can now create your own Logstash plugin in seconds! The generate subcommand of bin/logstash-plugin creates the foundation for a new Logstash plugin with templatized files. It creates the correct directory structure, gemspec files, and dependencies so you can start adding custom code to process data with Logstash.
Example Usage
bin/logstash-plugin generate --type input --name xkcd --path ~/ws/elastic/plugins
--type: Type of plugin: input, filter, output, or codec.
--name: Name for the new plugin.
--path: Directory path where the new plugin structure will be created. If not specified, it will be created in the current directory.
Offline Plugin Management: The Logstash plugin manager provides support for preparing offline plugin packs that you can use to install Logstash plugins on systems that don’t have Internet access.
This procedure requires a staging machine running Logstash that has access to a public or private Rubygems server. The staging machine downloads and packages all the files and dependencies required for offline installation.
Building Offline Plugin Packsedit An offline plugin pack is a compressed file that contains all the plugins your offline Logstash installation requires, along with the dependencies for those plugins.
To build an offline plugin pack:
Make sure all the plugins that you want to package are installed on the staging server and that the staging server can access the Internet.
Run the bin/logstash-plugin prepare-offline-pack subcommand to package the plugins and dependencies:
bin/logstash-plugin prepare-offline-pack --output OUTPUT --overwrite [PLUGINS] where:
OUTPUT specifies the zip file where the compressed plugin pack will be written. The default file is /LOGSTASH_HOME/logstash-offline-plugins-7.9.2.zip. If you are using 5.2.x and 5.3.0, this location should be a zip file whose contents will be overwritten.
[PLUGINS] specifies one or more plugins that you want to include in the pack.
--overwrite specifies that you want to overwrite an existing file at the location.
Examples:
bin/logstash-plugin prepare-offline-pack logstash-input-beats
Packages the Beats input plugin and any dependencies.
bin/logstash-plugin prepare-offline-pack logstash-filter-*
Uses a wildcard to package all filter plugins and any dependencies.
bin/logstash-plugin prepare-offline-pack logstash-filter-* logstash-input-beats
Packages all filter plugins, the Beats input plugin, and any dependencies.
Private Gem Repository: The Logstash plugin manager connects to a Ruby gems repository to install and update Logstash plugins. By default, this repository is http://rubygems.org.
Some use cases are unable to use the default repository, as in the following examples:
A firewall blocks access to the default repository.
You are developing your own plugins locally.
Airgap requirements on the local system.
When you use a custom gem repository, be sure to make plugin dependencies available.
Several open source projects enable you to run your own plugin server, among them:
Geminabox, Gemirro, Gemfury, Artifactory
Editing the Gemfile
The Gemfile is a configuration file that specifies information required for plugin management. Each Gemfile has a source line that specifies a location for plugin content.
By default, the Gemfile’s source line reads:
source "https://rubygems.org"
To change the source, edit the source line to contain your preferred source, as in the following example:
source "https://my.private.repository"
After saving the new version of the Gemfile, use plugin management commands normally.
The following links contain further material on setting up some commonly used repositories:
Geminabox
Artifactory
Running a rubygems mirror
Event API: This section is targeted at plugin developers and users of Logstash’s Ruby filter. Note that data flow in Logstash’s config files, which uses field references (see Accessing Event Data and Fields in the Configuration), is not affected by this change and will continue to use the existing syntax.
Event Object: Event is the main object that encapsulates data flow internally in Logstash and provides an API for plugin developers to interact with the event’s content. Typically, this API is used in plugins and in a Ruby filter to retrieve data and use it for transformations. The Event object contains the original data sent to Logstash and any additional fields created during Logstash’s filter stages.
In 5.0, we’ve re-implemented the Event class and its supporting classes in pure Java. Since Event is a critical component in data processing, a rewrite in Java improves performance and provides efficient serialization when storing data on disk. For the most part, this change aims at keeping backward compatibility and is transparent to the users. To this end, we’ve updated and published most of the plugins in Logstash’s ecosystem to adhere to the new API changes. However, if you are maintaining a custom plugin, or have a Ruby filter, this change will affect you. The aim of this guide is to describe the new API and provide examples to migrate to the new changes.
Event API
Prior to version 5.0, developers could access and manipulate event data by directly using Ruby hash syntax. For example, event[field] = foo. While this is powerful, our goal is to abstract the internal implementation details and provide well-defined getter and setter APIs.
Get API
The getter is a read-only access of field-based data in an Event.
Syntax: event.get(field)
Returns: Value for this field, or nil if the field does not exist. Returned values can be scalars (a string, numeric, or timestamp value) or, for fields holding collections, arrays and hashes, as the examples below show.
field is a structured field sent to Logstash or created after the transformation process. field can also be a nested field reference such as [field][bar].
Examples:
event.get("foo") # => "baz"
event.get("[foo]") # => "zab"
event.get("[foo][bar]") # => 1
event.get("[foo][bar]") # => 1.0
event.get("[foo][bar]") # => [1, 2, 3]
event.get("[foo][bar]") # => {"a" => 1, "b" => 2}
event.get("[foo][bar]") # => {"a" => 1, "b" => 2, "c" => [1, 2]}
Accessing @metadata
event.get("[@metadata][foo]") # => "baz"
Set API
This API can be used to mutate data in an Event.
Syntax: event.set(field, value)
Returns: The current Event after the mutation, which can be used for chainable calls.
Examples:
event.set("foo", "baz")
event.set("[foo]", "zab")
event.set("[foo][bar]", 1)
event.set("[foo][bar]", 1.0)
event.set("[foo][bar]", [1, 2, 3])
event.set("[foo][bar]", {"a" => 1, "b" => 2})
event.set("[foo][bar]", {"a" => 1, "b" => 2, "c" => [1, 2]})
event.set("[@metadata][foo]", "baz")
Mutating a collection after setting it in the Event has undefined behaviour and is not allowed.
h = {"a" => 1, "b" => 2, "c" => [1, 2]}
event.set("[foo][bar]", h)
h["c"] = [3, 4]
event.get("[foo][bar][c]") # => undefined
Suggested way of mutating collections:
h = {"a" => 1, "b" => 2, "c" => [1, 2]}
event.set("[foo][bar]", h)
h["c"] = [3, 4]
event.set("[foo][bar]", h)
# Alternatively,
event.set("[foo][bar][c]", [3, 4])
Ruby Filter
The Ruby Filter can be used to execute any Ruby code and manipulate event data using the API described above. For example, using the new API:
filter { ruby { code => 'event.set("lowercase_field", event.get("message").downcase)' } }
This filter lowercases the value of the message field and stores the result in a new field called lowercase_field.
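To experiment with the get/set semantics and the ruby filter example without a Logstash install, the sketch below uses a toy class, ToyEvent. It is hypothetical and much simpler than the real LogStash::Event: it resolves field references like [foo][bar] against a nested Hash and deep-copies values on set. Deep-copying is one way to model why mutating a collection after setting it should not be relied on; in real Logstash that behaviour is undefined, so the suggested pattern of calling set again applies either way.

```ruby
# Toy stand-in for LogStash::Event (hypothetical, for illustration only).
class ToyEvent
  def initialize(data = {})
    @data = deep_copy(data)
  end

  # Resolve "foo" or "[foo][bar]" against the nested Hash; nil if absent.
  def get(ref)
    parse(ref).reduce(@data) { |node, key| node.is_a?(Hash) ? node[key] : nil }
  end

  # Store a deep copy, creating intermediate Hashes as needed.
  def set(ref, value)
    *parents, leaf = parse(ref)
    node = parents.reduce(@data) do |acc, key|
      acc[key] = {} unless acc[key].is_a?(Hash)
      acc[key]
    end
    node[leaf] = deep_copy(value)
    self # chainable, like the set API described above
  end

  private

  # "[foo][bar]" -> ["foo", "bar"]; "foo" -> ["foo"]
  def parse(ref)
    ref.start_with?("[") ? ref.scan(/\[([^\]]+)\]/).flatten : [ref]
  end

  def deep_copy(value)
    Marshal.load(Marshal.dump(value))
  end
end

event = ToyEvent.new({ "message" => "Hello World" })
event.set("lowercase_field", event.get("message").downcase) # the ruby filter's logic
puts event.get("lowercase_field") # prints hello world

h = { "a" => 1, "b" => 2, "c" => [1, 2] }
event.set("[foo][bar]", h)
h["c"] = [3, 4]
p event.get("[foo][bar][c]") # prints [1, 2] in this toy (undefined in real Logstash)
event.set("[foo][bar]", h)   # suggested pattern: set again after mutating
p event.get("[foo][bar][c]") # prints [3, 4]
```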
Integration Plugins: jdbc, kafka, rabbitmq
Integration plugins combine related plugins—inputs, outputs, and sometimes filters and codecs—into one package.
Input Plugins: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-s3.html