Сбор логов сетевых устройств - ESGuardian/LittleBeat GitHub Wiki
Вы можете собирать логи с сетевых устройств на Logstash. Можно использовать два способа отправки логов: сразу в Logstash или сначала на rsyslog. Рассмотрим это на примере сбора логов с коммутатора Cisco ASA. Наша целевая картинка будет вот такая:
Некоторые поля на картинке замазаны специально, поскольку она из реальной системы. И это только фрагмент реальной дашборд.
Первый способ. Отправляем лог сразу в Logstash
Сначала мы должны выбрать на какой порт Logstash будет принимать пакеты и по какому протоколу. В этом примере мы выбираем порт 10514, протокол UDP. Важно помнить, что Logstash не может использовать стандартные порты (0-1024), если ваше сетевое устройство не умеет отправлять лог на нестандартный порт, то вам надо воспользоваться вторым способом - приемом логов через rsyslog, он мало чем отличается и будет описан ниже. Теперь мы создадим конфиг для Logstash в котором опишем процесс приема, парсинга и вывода лога и поместим этот конфиг в /etc/logstash/conf.d/
образец такого конфига есть здесь 04-cisco.conf. Мы рассмотрим его по секциям.
Первая и самая простая секция Input
, в ней мы описываем прием лога.
input {
udp {
port => 10514
tags => ["cisco-asa"]
}
}
Обратите внимание на tags => ["cisco-asa"]
. Мы помечаем источник. Делать это обязательно. Дело в том, что Logstash склеивает все конфиги в один, а мы хотим по разному обрабатывать разные источники данных. Чтобы не залезть на "чужой" источник мы "прикрываем" свои процедуры обработки операторами if
. Как видно в следующей секции конфига:
filter {
if "cisco-asa" in [tags] {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => [
"message", "%{ESGCISCOFWUNKNOWN}"
]
}
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => [
"log_msg", "%{ESGCISCOFW106001}",
"log_msg", "%{ESGCISCOFW106006_106007_106010}",
"log_msg", "%{ESGCISCOFW106014}",
"log_msg", "%{ESGCISCOFW106015}",
"log_msg", "%{ESGCISCOFW106021}",
"log_msg", "%{ESGCISCOFW106023}",
"log_msg", "%{ESGCISCOFW106100}",
"log_msg", "%{ESGCISCOFW110002}",
"log_msg", "%{ESGCISCOFW302010}",
"log_msg", "%{ESGCISCOFW302013_302014_302015_302016}",
"log_msg", "%{ESGCISCOFW302020_302021}",
"log_msg", "%{ESGCISCOFW305011}",
"log_msg", "%{ESGCISCOFW313001_313004_313008}",
"log_msg", "%{ESGCISCOFW402117}",
"log_msg", "%{ESGCISCOFW402119}",
"log_msg", "%{ESGCISCOFW419001}",
"log_msg", "%{ESGCISCOFW419002}",
"log_msg", "%{ESGCISCOFW500004}",
"log_msg", "%{ESGCISCOFW602303_602304}",
"log_msg", "%{ESGCISCOFW710001_710002_710003_710005_710006}",
"log_msg", "%{ESGCISCOFW713172}",
"log_msg", "%{ESGCISCOFW722051}",
"log_msg", "%{ESGCISCOFW722037}",
"log_msg", "%{ESGCISCOFW113019}",
"log_msg", "%{ESGCISCOFW7500_03_12}",
"log_msg", "%{ESGCISCOFW113005}",
"log_msg", "%{ESGCISCOFW733100}"
]
}
# Parse the syslog severity and facility
syslog_pri { }
geoip {
source => "src_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][geohash]", "%{[geoip][coordinates]}" ]
}
mutate {
remove_field => [ "message" ]
gsub => ["event-code","4-106023","Reject"]
gsub => ["event-code","4-419002","Duplicate TCP SYN"]
gsub => ["event-code","3-710003","Reject"]
gsub => ["event-code","2-106001","Reject"]
gsub => ["event-code","4-313005","ICMP Reject"]
gsub => ["event-code","3-313001","ICMP Reject"]
gsub => ["event-code","3-210007","LU allocate xlate failed"]
gsub => ["event-code","2-106017","Land Atack"]
gsub => ["event-code","4-722051","Remconn address assigned"]
gsub => ["event-code","4-113019","Remconn session disconnected"]
gsub => ["event-code","4-722037","Remconn closing connection"]
gsub => ["event-code","4-722041","Remconn IPv6 not available"]
gsub => ["event-code","3-713194","IKE delete"]
gsub => ["event-code","4-750003","IKEv2 Error"]
gsub => ["event-code","4-750012","IKEv2 Error"]
gsub => ["event-code","4-405001","ARP collision"]
gsub => ["event-code","4-113005","AAA user authentication Rejected"]
}
}
}
Это секция обработки лога. Здесь мы используем несколько плагинов Logstash предназначенных для обработки данных (filter plugins): grok, mutate, syslog_pri, geoip. Это стандартные "встроенные" плагины, они не требуют отдельной установки.
Плагин grok предназначен для разбора строк по регулярным выражениям, он использует паттерны для поиска совпадений. В Logstash есть свои стандартные паттерны для обработки логов оборудования Cisco, но они довольно примитивны и менее детальны, чем я хочу. Поэтому я использую свой собственный набор паттернов. Путь к нему я указываю в параметре вызова плагина patterns_dir => ["/etc/logstash/patterns"]
здесь, как понятно, можно указать список путей (правильнее сказать "массив путей"). Там лежит вот такой набор паттернов esguardian-ASA. Собственно плагин grok делает основную работу - разрезает лог на поля документа JSON.
Дальше плагин syslog_pri обрабатывает стандартный заголовок syslog, чтобы получить из него стандартные поля, например, timestamp.
Далее мы применяем плагин geoip, чтобы получить данные геолокации для src_ip. Хотим отмечать источники на карте в Kibana.
И наконец используем плагин mutate для замены кодов событий Cisco на более приятный нам текст.
Всё. с обработкой покончили. Теперь надо отправить результат в elasticsearch, чтобы он его проиндексировал и сохранил. Это делается в последней секции output
:
output {
if "cisco-asa" in [tags] {
#stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "cisco-asa-%{+YYYY.MM.dd}"
document_type => "cisco-asa"
template => "/etc/logstash/templates/elastic-cisco-asa-template.json"
template_name => "cisco-asa"
template_overwrite => true
}
}
}
Заметьте, что здесь мы тоже делаем "обертку" из оператора if
. Причина всё та же, мы не хотим отправить в наш индекс чужие данные.
Здесь мы говорим, что хотим создавать новый индекс ежедневно и задаем шаблон его идентификатора index => "cisco-asa-%{+YYYY.MM.dd}"
. Далее мы говорим, что следует использовать специальный шаблон индекса, он задает так называемый mapping - соответствие полей типам принятым в elasticsearch и еще кое-какие параметры, например, количество шардов на индекс. Мой шаблон индекса лежит здесь elastic-cisco-asa-template.json.
Теперь мы можем рестартовать Logstash из SSH-консоли LittleBeat и включить на нашей Cisco-ASA отправку логов на порт 10514 сервера littlebeat по протоколу UDP.
Чтобы увидеть данные в интерфейсе Kibana нужно сначала создать в веб-интерфейсе LittleBeat index-pattern и собрать дашборд, как вам нравится.
Второй способ. Используем rsyslog.
Разница не большая. Настраиваем источник на отправку сообщений на стандартный порт syslog 514 на сервер LittleBeat. Затем конфигурируем rsyslog для "вырезки" логов в отдельный файл. Например, помещаем в /etc/rsyslog.d/
файл cisco-asa.conf
вот такого короткого содержания:
if ($fromhost-ip == '192.168.102.17') then /var/log/cisco-asa.log
Кстати, не забываем настроить logrotate, чтобы не забивать диск бесконечным логом. Например, помещаем в /etc/logrotate.d/
файл cisco-asa-rotate.conf
вот такого содержания:
/var/log/cisco-asa.log {
daily
rotate 5
compress
missingok
notifempty
copytruncate
create 640 logstash logstash
}
Обратите внимание вот на это: create 640 logstash logstash
. Юзер logstash
должен иметь права на чтение лога, лучше сразу сделать его владельцем файла.
Остальные действия не отличаются от вышеописанных, только секцию input
в 04-cisco.conf
надо написать так:
input {
file {
path => "/var/log/cisco-asa.log"
codec => "plain"
tags => ["cisco-asa"]
}
}
Вот и вся разница. Только не забудьте перед рестартом Logstash проверить права на файл /var/log/cisco-asa.log
. Юзер logstash
должен иметь права чтения, можно назначить права командой chown logstash:logstash /var/log/cisco-asa.log
выйдя в shell на SSH-консоли LittleBeat.