logstash kafka es - youngperson/study-100 GitHub Wiki

让我们回想一下之前的架构,数据量较少的时候我们是直接logstash把日志数据给到es、或者是logstash把日志数据给Redis,Redis在给到es,最终再由kinbana展示出来。前者es一旦挂掉,可能就会丢失数据了。后者redis的性能没有kafka的好。
于是,我们考虑利用 Kafka 作为缓冲区,第一步就是让 Logstash 把日志发送到 Kafka,这里 Logstash 相当于 producer。
在日志服务器上把kafka启动好,创建好topic。

kafka配置

# 在内网ELK机器上 server.properties 中 配置后重启zookeeper kafaka
# 内网IP10.111.11.120那么
listeners=PLAINTEXT://10.111.11.120:9092
# 对外暴露的IP:端口
advertised.listeners=PLAINTEXT://10.111.11.120:9092

# 对kafka的log目录进行创建并给权限
mkdir /data/kafka-data/kafka-logs
chmod -R 777 /data/kafka-data/kafka-logs

logstash->kafka

进入需要收集日志的服务器,cd /etc/logstash/conf.d/
vi log_to_kafka.conf
input {
	#test
	file {
		type => "test_log"
		path => "/data/wwwlogs/test.log"
	}
}

output {
	if [type] == "test_log" {
		# 输出到 Kafka,topic 名称为 test,地址为默认的端口号
		kafka {
			bootstrap_servers => "10.111.11.120:9092"
			topic_id => "test"
		}

		# 写入到redis一份,和上面对比
		redis {
			host => "10.111.11.120"
			port=> 6379
			data_type => "list"
			key => "test_log"
		}
	}
}

kafka->es

vi input_kafka_output_es.conf

input {
	kafka {
		zk_connect => "10.111.11.120:2181"
		type => "test_log"
		topic_id => test
	}
}

output {
	if [type] == "test_log" {
		 elasticsearch {
				hosts => ["10.111.11.120:9200"]
				index => "test_error_log_%{+YYYY.MM.dd}"
		 }
	}
}

测试

先把数据打入kafka,然后用kafka自带的消费命令去查看有无数据进入
查看kinbana看kafka的数据有没有进入到es