ELK日志收集实例(七)
#前面记录了一大波,这里记录一些小例子捋一下。
一、再以收集nginx为例
#前面记录了在Logstash端通过grok将日志转换成json形式,这里记录一种直接在客户端就将nginx转换成json形式。
1.1 Filebeat客户端的操作
nginx端的配置
# cat /usr/local/nginx/conf/nginx.conf
log_format main '$remote_addr - $remote_user [$time_local] "$request" ' #一份正常的 '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; log_format json '{ "@timestamp": "$time_local", ' #一份json格式的 '"@fields": { ' '"remote_addr": "$remote_addr", ' '"remote_user": "$remote_user", ' '"body_bytes_sent": "$body_bytes_sent", ' '"request_time": "$request_time", ' '"status": "$status", ' '"request": "$request", ' '"request_method": "$request_method", ' '"http_referrer": "$http_referer", ' '"body_bytes_sent":"$body_bytes_sent", ' '"http_x_forwarded_for": "$http_x_forwarded_for", ' '"http_user_agent": "$http_user_agent" } }'; access_log elklogs/access_json.log json; #日志写两份 access_log logs/access.log main;
#mkdir /usr/log/nginx/elklogs -p
# /usr/local/nginx/sbin/nginx -t #测试没问题
nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful
# tail -f /usr/local/nginx/logs/access.log #客户端访问测试一下
192.168.14.50 - - [10/Dec/2017:20:10:26 +0800] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" "-" 192.168.14.50 - - [10/Dec/2017:20:10:27 +0800] "GET /favicon.ico HTTP/1.1" 404 571 "http://192.168.14.49/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" "-"
# tail -f /usr/local/nginx/elklogs/access_json.log
{ "@timestamp": "10/Dec/2017:20:10:26 +0800", "@fields": { "remote_addr": "192.168.14.50", "remote_user": "-", "body_bytes_sent": "612", "request_time": "0.000", "status": "200", "request": "GET / HTTP/1.1", "request_method": "GET", "http_referrer": "-", "body_bytes_sent":"612", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" } } { "@timestamp": "10/Dec/2017:20:10:27 +0800", "@fields": { "remote_addr": "192.168.14.50", "remote_user": "-", "body_bytes_sent": "571", "request_time": "0.000", "status": "404", "request": "GET /favicon.ico HTTP/1.1", "request_method": "GET", "http_referrer": "http://192.168.14.49/", "body_bytes_sent":"571", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" } }
#当然产生的新的日志呢可以用https://www.bejson.com/ #这种网站检验一下自己那个字段有问题,因为logstash是不做转换的,所以这个日志不是json信息就不好玩了。
Filebeat端的配置
# su - elk
$ mkdir beats #单独创建一个目录用来存放filebeat的文件
$ cat /home/elk/beats/filebeat.yaml
filebeat.prospectors: - input_type: log paths: - /usr/local/nginx/elklogs/access_json.log tags: ["nginx-access","test.json.51niux.com"] output.logstash: hosts: ["192.168.14.65:5066"] loadbalance: true compression_level: 6 name: "192.168.14.49"
$ cat /home/elk/beats/setup.py
#! /usr/bin/python import sys import subprocess import datetime if __name__ =='__main__': args = sys.argv[1:] print 'servie start at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') while True: subprocess.Popen(args).wait() print 'servie restart at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
$ cat /home/elk/beats/start.sh
nohup ./setup.py /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml >/dev/null 2>&1 &
启动服务:
$ chmod go-w /home/elk/beats/filebeat.yaml
$ chmod +x setup.py
$ sh /home/elk/beats/start.sh
$ ps -ef|grep filebeat
elk 22302 1 0 20:45 pts/0 00:00:00 /usr/bin/python ./setup.py /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml elk 22303 22302 0 20:45 pts/0 00:00:00 /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml elk 22327 20930 0 20:46 pts/0 00:00:00 grep --color=auto filebeat
$ cat /home/elk/filebeat/data/registry
[{"source":"/usr/local/nginx/elklogs/access_json.log","offset":347601,"timestamp":"2017-12-10T21:13:36.480657975+08:00","ttl":-1,"type":"log","FileStateOS":{"inode":796457,"device":64770}}]
#这个文件相当于监控了文件以及偏移量有变化就会触发事件发送
博文来自:www.51niux.com
1.2 Logstash端的操作
$ cat /home/elk/logstash/conf/logstash-nginx.conf #input那里不用改没啥好filter的
output { if "nginx-access" in [tags]{ if "test.51niux.com" in [tags]{ elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}" } } else if "test.json.51niux.com" in [tags] { #如果tags匹配到test.json.51niux.com elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "logstash-nginx-access-test-json-51niux-com-%{+YYYY.MM}" #就创建这个索引 } } } }
$ cat /home/elk/logstash/conf/setup.py
#! /usr/bin/python import sys import subprocess import datetime if __name__ =='__main__': args = sys.argv[1:] print 'servie start at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') while True: subprocess.Popen(args).wait() print 'servie restart at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
$ cat /home/elk/logstash/conf/logstash_start.sh
#export LS_HEAP_SIZE=8g #旧版本的用法现在已经不需要了 cd /home/elk/logstash/conf/ nohup ./setup.py /home/elk/logstash/bin/logstash -w 4 -b 10000 -f ./logstash-nginx.conf >/dev/null 2>&1 &
$ vim /home/elk/logstash/config/jvm.options
-Xms8g -Xmx8g
启动服务并验证:
$ chmod +x /home/elk/logstash/conf/setup.py
$ chmod +x /home/elk/logstash/conf/logstash_start.sh
$ /home/elk/logstash/conf/logstash_start.sh
$ ps -ef|grep logstash
elk 1527 1 0 21:27 pts/0 00:00:00 /usr/bin/python ./setup.py /home/elk/logstash/bin/logstash -w 4 -b 10000 -f ./logstash-nginx.conf elk 1528 1527 89 21:27 pts/0 00:01:59 /usr/java/jdk/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby.compile.invokedynamic=true -XX:+HeapDumpOnOutOfMemoryError -Djava.security.egd=file:/dev/urandom -Xmx8g -Xms8g -Xss2048k -Djffi.boot.library.path=/home/elk/logstash-6.0.1/vendor/jruby/lib/jni -Xbootclasspath/a:/home/elk/logstash-6.0.1/vendor/jruby/lib/jruby.jar -classpath :/usr/java/jdk/lib/*.jar:/usr/java/jdk/jre/lib/*.jar -Djruby.home=/home/elk/logstash-6.0.1/vendor/jruby -Djruby.lib=/home/elk/logstash-6.0.1/vendor/jruby/lib -Djruby.script=jruby -Djruby.shell=/bin/sh org.jruby.Main /home/elk/logstash/lib/bootstrap/environment.rb logstash/runner.rb -w 4 -b 10000 -f ./logstash-nginx.conf
#-Xmx8g -Xms8g已生效。
1.3 随便写点日志查看一下filebeat是否上传信息上来
1.4 访问下Elasticsearch节点查看一下一下新的索引是否产生了
$ curl http://192.168.14.60:9200/_cat/indices?v #从结果看可以看到新的索引创建了
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open .kibana L3u2SfDjSQGpQFR7fOI5DA 1 1 2 0 21.5kb 10.7kb green open logstash-nginx-access-test-51niux-com-2017.12 vXMhx2DERDOkP6DQAeny5g 5 1 20012 0 3.7mb 1.8mb green open logstash-nginx-access-test-json-51niux-com-2017.12 YmXRlohtR1OIEtFXIPxiGQ 5 1 1000 0 492.3kb 274.5kb
$ curl http://192.168.14.60:9200/_cat/health?v #查看下集群的健康状态
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent 1512913003 21:36:43 elasticsearch_51niux green 6 5 22 11 0 0 0 0 - 100.0%
二、ELK以系统用户登录为例实现rsylog日志收集
#首先先展示一下客户端还是log的type形式的收集
$ cat /home/elk/beats/filebeat.yaml
filebeat.prospectors: - input_type: log paths: - /usr/local/nginx/elklogs/access_json.log tags: ["nginx-access","test.json.51niux.com"] - input_type: log paths: - /var/log/secure tags: ["ssh-login","test.51niux.com"] output.logstash: hosts: ["192.168.14.65:5066"] loadbalance: true compression_level: 6 name: "192.168.14.49"
#这种形式呢,因为/var/log/secure权限比较低我们还得授权成elk用户有权限,这个日志还是轮询滚动的。
2.1 先跟着官网走一波syslog的input插件解释
系统日志输入配置选项:
#设置 #输入类型 facility_labels array host string locale string port number proxy_protocol boolean severity_labels array timezone string use_labels boolean
facility_labels:
值类型是数组。默认值是:["kernel", "user-level", "mail", "system", "security/authorization", "syslogd", "line printer", "network news", "UUCP", "clock", "security/authorization", "FTP", "NTP", "log audit", "log alert", "clock", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"]
host:
值类型是字符串。默认值是“0.0.0.0”监听的地址
locale:
值类型是字符串。这个设置没有默认值。使用IETF-BCP47或POSIX语言标记指定用于日期分析的区域设置。 简单的例子是en,美国的BCP47或者en_US的POSIX。 如果未指定,则将使用平台默认值。大多数情况下需要设置语言环境来解析月份名称(MMM模式)和星期几名称(EEE模式)。
port:
值类型是数字。默认值是514。端口监听。请记住,小于1024(特权端口)的端口可能需要使用root权限。
proxy_protocol:
值类型是布尔值。默认值是false。代理协议支持,目前只支持v1版http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
severity_labels:
值类型是数组。默认值是["Emergency", "Alert", "Critical", "Error", "Warning", "Notice", "Informational", "Debug"]。
timezone:
值类型是字符串。这个设置没有默认值。指定要用于日期分析的时区标准ID。 有效的ID在[Joda.org可用时区页面](http://joda-time.sourceforge.net/timezones.html)上列出。 如果时区无法从数值中提取,那么这非常有用,并且不是平台默认值。 如果没有指定,将使用平台默认值。 Canonical ID是很好的,因为它会为您节省夏令时。例如,America/Los_Angeles或Europe / France是有效的ID。
use_labels:
值类型是布尔值。默认值为true。使用标签分析严重性和设施级别。
通用选项:
所有输入插件都支持以下配置选项:
#设置 #输入类型 add_field hash codec codec enable_metric boolean id string tags array type string
add_field:
值类型是hash。默认值是{}。添加一个字段到一个事件
codec:
值类型是编解码器。默认值是“plain”。用于输入数据的编解码器。 输入编解码器是在数据进入输入之前解码数据的一种便捷方法,无需在Logstash管道中使用单独的过滤器。
enable_metric:
值类型是布尔值。默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。
id:
值类型是string。这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 如果您有两个或多个相同类型的插件,例如,如果您有两个系统日志输入,则此功能特别有用。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。
input { syslog { id => "my_plugin_id" } }
tags:
值类型是数组。这个设置没有默认值。添加任意数量的任意标签到您的事件
type:
值类型是字符串。这个设置没有默认值。将类型字段添加到由此输入处理的所有事件。类型主要用于过滤器激活。
正则:https://github.com/kkos/oniguruma/blob/master/doc/RE
======logstash已经自带了不少的正则,如果想偷懒的话,可以在内置正则里借用下。===============
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} INT (?:[+-]?(?:[0-9]+)) BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) NUMBER (?:%{BASE10NUM}) BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)) BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b POSINT \b(?:[1-9][0-9]*)\b NONNEGINT \b(?:[0-9]+)\b WORD \b\w+\b NOTSPACE \S+ SPACE \s* DATA .*? GREEDYDATA .* QUOTEDSTRING (?>(?<!\\)(?>”(?>\\.|[^\\"]+)+”|”"|(?>’(?>\\.|[^\\']+)+’)|”|(?>(?>\\.|[^\]+)+)|`)) UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} # Networking MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}) CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2}) IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9]) IP (?:%{IPV6}|%{IPV4}) HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b) HOST %{HOSTNAME} IPORHOST (?:%{HOSTNAME}|%{IP}) HOSTPORT (?:%{IPORHOST=~/\./}:%{POSINT}) # paths PATH (?:%{UNIXPATH}|%{WINPATH}) UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+ TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+)) WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+ URIPROTO [A-Za-z]+(\+[A-Za-z+]+)? URIHOST %{IPORHOST}(?::%{POSINT:port})? # uripath comes loosely from RFC1738, but mostly from what Firefox # doesn’t turn into %XX URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+ #URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? URIPARAM \?[A-Za-z0-9$.+!*’|(){},~@#%&/=:;_?\-\[\]]* URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? # Months: January, Feb, 3, 03, 12, December MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b MONTHNUM (?:0?[1-9]|1[0-2]) MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) # Days: Monday, Tue, Thu, etc… DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) # Years? YEAR (?>\d\d){1,2} HOUR (?:2[0123]|[01]?[0-9]) MINUTE (?:[0-5][0-9]) # ’60′ is a leap second in most time standards and thus is valid. SECOND (?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?) TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? DATE %{DATE_US}|%{DATE_EU} DATESTAMP %{DATE}[- ]%{TIME} TZ (?:[PMCE][SD]T|UTC) DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} # Syslog Dates: Month Day HH:MM:SS SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} PROG (?:[\w._/%-]+) SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])? SYSLOGHOST %{IPORHOST} SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} # Shortcuts QS %{QUOTEDSTRING} # Log formats SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] “(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})” %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} # Log Levels LOGLEVEL ([A-a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
===========================================================
看一个官网处理Syslog消息的例子
Syslog是Logstash最常见的用例之一,它处理得非常好(只要日志行大致符合RFC3164)。 Syslog是事实上的UNIX联网日志记录标准,通过rsyslog将消息从客户端机器发送到本地文件或发送到集中式日志服务器。 对于这个例子,你将不需要一个正常工作的系统日志实例; 我们将从命令行中伪造它,以便你了解发生的情况。首先,我们为Logstash + syslog创建一个简单的配置文件,名为logstash-syslog.conf。
input { tcp { port => 5000 type => syslog } udp { port => 5000 type => syslog } } filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
用这个新配置运行Logstash:
bin/logstash -f logstash-syslog.conf
通常,客户端机器将连接到端口5000上的Logstash实例并发送其消息。 在这个例子中,我们将telnet到Logstash并输入一个日志行(类似于我们之前在日志中输入日志行的方式)。 打开另一个shell窗口与Logstash syslog输入交互并输入以下命令:
telnet localhost 5000
复制并粘贴以下行作为示例。 (随意尝试一些你自己的,但请记住,如果grok过滤器不正确的数据,他们可能不会解析)。
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
现在,应该在原始shell中看到Logstash的输出,因为它处理和分析消息!
{ "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "@timestamp" => "2013-12-23T22:30:01.000Z", "@version" => "1", "type" => "syslog", "host" => "0:0:0:0:0:0:0:1:52617", "syslog_timestamp" => "Dec 23 14:30:01", "syslog_hostname" => "louis", "syslog_program" => "CRON", "syslog_pid" => "619", "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "received_at" => "2013-12-23 22:49:22 UTC", "received_from" => "0:0:0:0:0:0:0:1:52617", "syslog_severity_code" => 5, "syslog_facility_code" => 1, "syslog_facility" => "user-level", "syslog_severity" => "notice" }
博文来自:www.51niux.com
2.2 rsyslog客户端的操作
#syslog发送日志可以查看:https://blog.51niux.com/?id=108
# vim /etc/rsyslog.conf #除了本地记录一份以外,还给远端的logstash服务器发送一份登录信息
authpriv.* /var/log/secure authpriv.* @192.168.14.65:1514 #因为1024端口以内的root用户权限,Logstash服务端是普通用户
# service rsyslog restart
2.3 Logstash服务端的配置
$ cat /home/elk/logstash/patterns/ssh
input { syslog{ type => "system-syslog" port => 1514 } } output { stdout { codec => rubydebug } }
#可以先测试一下能够收到信息。
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/ssh.conf #找台机器登录下syslog客户端。
[2017-12-11T00:20:37,452][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"} [2017-12-11T00:20:37,460][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:1514"} [2017-12-11T00:20:37,460][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:1514"} [2017-12-11T00:20:37,468][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]} { "severity" => 6, "pid" => "27976", "program" => "sshd", "message" => "Accepted password for root from 192.168.14.51 port 52470 ssh2", "type" => "system-syslog", "priority" => 86, "logsource" => "master", "@timestamp" => 2017-12-10T16:20:57.000Z, "@version" => "1", "host" => "192.168.14.49", "facility" => 10, "severity_label" => "Informational", "timestamp" => "Dec 11 00:20:57", "facility_label" => "security/authorization" } { "severity" => 6, "pid" => "27976", "program" => "sshd", "message" => "pam_unix(sshd:session): session opened for user root by (uid=0)", "type" => "system-syslog", "priority" => 86, "logsource" => "master", "@timestamp" => 2017-12-10T16:20:57.000Z, "@version" => "1", "host" => "192.168.14.49", "facility" => 10, "severity_label" => "Informational", "timestamp" => "Dec 11 00:20:57", "facility_label" => "security/authorization" }
#这个已经可以插了,如下图,偷偷的插了一下:
#好的忘记上面那张图,现在我们已经测试可以收到客户端通过1514端口发过来的rsyslog的ssh登录日志信息了。正规的走一下。
$ cat /home/elk/logstash/patterns/ssh #下面是啥,通俗来说下面就是一个变量值跟着一串正则表达式跟来源的信息能匹配上
SECURELOG %{WORD:status} password for ?(invalid user)? %{WORD:FromUSER} from %{DATA:FromIP} port SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})? SYSLOGBASE2 (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
$ vim /home/elk/logstash/conf/ssh.conf
input { syslog{ type => "system-syslog" port => 1514 } } filter { if [type] == "system-syslog" { grok { match => { "message" => "%{SYSLOGPAMSESSION}" } match => { "message" => "%{SECURELOG}" } match => { "message" => "%{SYSLOGBASE2}" } } if ([status] == "Accepted") { mutate { add_tag => ["Success"] } } else if ([status] == "Failed") { mutate { add_tag => ["Failed"] } } } } output { #stdout { codec => rubydebug } #可以有多个output的,如果这里取消注释也会打印到屏幕一份信息 elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "ssh-%{+YYYY.MM}" } }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/ssh.conf #我这里就前台启动了,为了查看有什么问题。
[2017-12-11T01:17:18,362][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"} [2017-12-11T01:17:18,379][INFO ][logstash.inputs.syslog ] Starting syslog udp listener {:address=>"0.0.0.0:1514"} [2017-12-11T01:17:18,379][INFO ][logstash.inputs.syslog ] Starting syslog tcp listener {:address=>"0.0.0.0:1514"} [2017-12-11T01:17:18,379][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]} { "severity" => 0, "message" => "<86>Dec 11 01:17:19 master sshd[29602]: Accepted password for root from 192.168.14.54 port 37966 ssh2", "type" => "system-syslog", "priority" => 0, "tags" => [ [0] "_grokparsefailure_sysloginput", [1] "Success" #测试了一个登陆,因为匹配上了Acceptd,所以状态是Success。 ], "@timestamp" => 2017-12-10T17:17:18.740Z, "@version" => "1", "host" => "192.168.14.49", "FromIP" => "192.168.14.54", #来源IP也抓出来了 "facility" => 0, "severity_label" => "Emergency", "facility_label" => "kernel", "status" => "Accepted", "FromUSER" => "root" #来源用户也抓出来了。 }
2.4 kibana的操作
#在kibana上面加上这个索引,kinaba不会自动加载索引的,需要我们手工添加。当然如果你的日志信息比较单一比如都是web信息,可以直接添加一个*的索引。
#kibana上面效果也出现了。
#点击左边的选中一个字段可以看到字段里面内容的比例。
三、ELK收集TCP日志
#是否可以通过tcp端口将日志传输过来呢,是可以的。
3.1 先把Tcp input plugin翻译一波
#官网链接:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html
描述:
通过TCP套接字读取事件。像标准输入和文件输入一样,每个事件都被假定为一行文本。可以接受来自客户端的连接或连接到服务器,具体取决于模式。
##接受log4j2日志
Log4j2可以通过套接字发送JSON,我们可以使用它与我们的tcp输入结合来接受日志。首先,我们需要配置你的应用程序来通过套接字发送JSON日志。 以下的log4j2.xml完成这个任务。请注意,您需要更改此配置中的主机和端口设置以符合您的需求。
<Configuration> <Appenders> <Socket name="Socket" host="localhost" port="12345"> <JsonLayout compact="true" eventEol="true" /> </Socket> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="Socket"/> </Root> </Loggers> </Configuration>
要在Logstash中接受这个,你需要tcp输入和日期过滤器:
input { tcp { port => 12345 codec => json } }
并添加一个日期过滤器来获取log4j2的timeMillis字段,并将其用作事件时间戳
filter { date { match => [ "timeMillis", "UNIX_MS" ] } }
Tcp输入配置选项:
该插件支持以下配置选项和稍后介绍的通用选项。
host #值类型是字符串,默认值是“0.0.0.0”.当模式是服务器时,要监听的地址。 当模式是客户端时,连接的地址。 mode #值可以是任何:server, client,默认值是“server”。服务器侦听客户端连接,客户端连接服务器。 port #这是一个必需的设置。值类型是数字。这个设置没有默认值。当模式是服务器,端口监听。 当模式是客户端时,连接到的端口。 proxy protocol #值类型是布尔值,默认值是false,代理协议支持,目前仅支持v1。 ssl_cert #值类型是路径,这个设置没有默认值。SSL证书路径 ssl_enable #值类型是布尔值,默认值是false。启用SSL(必须设置为其他ssl_选项才能生效)。 ssl_extra_chain_certs #值类型是数组,默认值是[]。要添加到证书链的额外X509证书的数组。 在系统商店中不需要CA链时很有用。 ssl_key #值类型是路径,这个设置没有默认值。SSL关键路径 ssl_key_passphrase #值类型是密码,默认值是零。SSL密钥密码 ssl_verify #值类型是布尔值,默认值为true。验证SSL连接的另一端对CA的身份。 对于输入,将字段sslsubject设置为客户端证书的字段。
通用选项:
所有输入插件都支持以下配置选项:
细节:
add_field #值类型是hash.默认值是{}.添加一个字段到一个事件 codec #值类型是codec,默认值是“plain”。用于输入数据的编解码器。 输入编解码器是在数据进入输入之前解码数据的一种便捷方法,无需在Logstash管道中使用单独的筛选器。 enable_metric #值类型是布尔值。默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。 id #值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有2个tcp输入。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。 tags #值类型是数组,这个设置没有默认值。添加任意数量的任意标签到您的事件。 type #值类型是字符串,这个设置没有默认值。将类型字段添加到由此输入处理的所有事件。
3.2 Logstash端的配置
$ cat /home/elk/logstash/conf/tcp.conf
input { tcp { type => "tcp" port => "6666" mode => "server" } } filter { } output { stdout { codec => rubydebug } }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/tcp.conf #启动一下。
客户端写入测试:
现在客户端就不需要安装filebeat或者logstash了,直接往IP:端口发送日志事件就行了。
#yum install nc -y
# tail -f /var/log/messages |nc 192.168.14.13 6666 #这种就相当于实时的监控一个日志文件,当然也可以变成cat文件
{ "@timestamp" => 2017-12-16T07:52:56.353Z, "port" => 39150, "@version" => "1", "host" => "192.168.14.13", "@metdata" => { "ip_address" => "192.168.14.13" }, "message" => "Dec 16 15:52:56 controller systemd: Stopping OpenSSH server daemon...", "type" => "tcp" } { "@timestamp" => 2017-12-16T07:52:56.355Z, "port" => 39150, "@version" => "1", "host" => "192.168.14.13", "@metdata" => { "ip_address" => "192.168.14.13" }, "message" => "Dec 16 15:52:56 controller systemd: Starting OpenSSH server daemon...", "type" => "tcp" } { "@timestamp" => 2017-12-16T07:52:56.363Z, "port" => 39150, "@version" => "1", "host" => "192.168.14.13", "@metdata" => { "ip_address" => "192.168.14.13" }, "message" => "Dec 16 15:52:56 controller systemd: Started OpenSSH server daemon.", "type" => "tcp" }
#手工重启下sshd服务,上面是logstash的前台输出。
# nc 192.168.14.13 6666 </var/log/secure #这是第二种方式,将一个文件的内容通过192.168.14.13:6666把日志传输。
# echo "Hello,wo am is 51niux.com" > /dev/tcp/192.168.14.13/6666 #这是伪终端的方式。
{ "@timestamp" => 2017-12-16T07:58:08.813Z, "port" => 40730, "@version" => "1", "host" => "network02", "@metdata" => { "ip_address" => "192.168.14.9" }, "message" => "Hello,wo am is 51niux.com", "type" => "tcp" }
#这是Logstash的输出,可以看到来源IP是非本机,然后message消息正好是我们echo的那句话。
3.3 Redis output plugin(先跟着官网捋一下)
#该输出将使用RPUSH将事件发送到Redis队列。建议使用2.6.0+版本。更多的信息了解redis官网:http://redis.io/
Redis输出配置选项:
该插件支持以下配置选项和稍后介绍的通用选项。
#Setting #Input type #Required batch boolean No batch_events number No batch_timeout number No congestion_interval number No congestion_threshold number No data_type string, one of ["list", "channel"] No db number No host array No key string No password password No port number No reconnect_interval number No shuffle_hosts boolean No timeout number No
选项解释:
batch #值类型是布尔值,默认值是false。如果希望Redis对值进行批处理并发送1个RPUSH命令(而不是每个值的一个命令),则设置为true。 请注意,这只适用于data_type =“list”模式。 #如果为true,则每发送一次“batch_events”事件或“batch_timeout”秒(以先到者为准),发送RPUSH。 只有data_type支持“list”。 batch_events #值类型是数字,默认值是50。如果batch设置为true,则我们排队等待RPUSH的事件数量。 batch_timeout #值类型是数字,默认值是5.如果batch设置为true,则在有未决事件刷新时RPUSH命令之间的最大时间量。 congestion_interval #值类型是数字,默认值是1。多久检查一次拥堵。 默认是一秒钟。 零意味着检查每一个事件。 congestion_threshold #值类型是数字,默认值是0。如果Redis的data_type是列表并且拥有多个@congestion_threshold项,则阻塞直到有人使用它们并减少拥塞, #否则,如果没有使用者,则除非使用OOM保护配置,否则Redis将耗尽内存。 但即使使用OOM保护,单个Redis列表也可以阻止Redis的所有其他用户,直到Redis CPU消耗达到允许的最大RAM大小。 默认值为0意味着此限制被禁用。 仅支持列表Redis data_type。 data_type #值可以是以下任何一种:list, channel,这个设置没有默认值。无论是列表还是频道。 如果data_type是list,那么我们将RPUSH设置为key。 如果data_type是频道,那么我们将PUBLISH键。 db #值类型是数字,默认值是0.Redis数据库编号。 host #值类型是数组,默认值是[“127.0.0.1”].您的Redis服务器的主机名。 可以在任何主机名上指定端口,这将覆盖全局端口配置。 如果主机列表是一个数组,Logstash将选择一个随机主机来连接,如果主机断开连接,它将选择另一个主机。 key #值类型是字符串,这个设置没有默认值。Redis列表或频道的名称。 动态名称在这里有效,例如logstash-%{type}。 password #值类型是密码,这个设置没有默认值。密码进行身份验证。 没有默认的身份验证。 port #值类型是数字,默认值是6379。连接的默认端口。 可以在任何主机名上覆盖。 reconnect_interval #值类型是数字,默认值是1。重新连接到失败的Redis连接的时间间隔 shuffle_hosts #值类型是布尔值,默认值为true。在Logstash启动过程中随机清除主机列表。 timeout #值类型是数字,默认值是5。Redis的初始连接超时秒数。
通用选项
所有输出插件都支持以下配置选项:
codec #默认是非必需的。值类型是codec。默认值是“plain”。用于输出数据的编解码器。 输出编解码器是在离开输出之前对数据进行编码的一种便捷方法,无需在Logstash管道中使用单独的筛选器。 enable_metric #值类型是布尔值,默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。 id #值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。如果没有指定ID,Logstash将会生成一个。 强烈建议在配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的, #例如,如果你有两个redis输出。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。如:output { redis { id => "my_plugin_id" }}
3.4 将tcp收集到的信息以频道的形式写入redis
Logstash配置:
$ cat /home/elk/logstash/conf/tcp_redis.conf
input { tcp { type => "tcp" port => "6666" mode => "server" } } filter { } output { #stdout { # codec => rubydebug #} redis { host => "192.168.14.13" port => "6379" db => "1" #这里是redis数据的编号,redis一般是0-15的数据库编号也就是16个数据库 key => "logstash_list_0" #这里就是发布通道的名称 data_type => channel #主要是这里啊,两种形式,这里设置的是频道模式,那么redis就相当于一个消息队列服务了,就是发布/订阅 #password => "51niux.com" #如果redis设置了密码这里还要输入密码 } }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/tcp_redis.conf #启动服务
客户端测试:
# echo "one.Hello,wo am is 51niux.com" |nc 192.168.14.13 6666
# echo "two.OK I know you are 51niux.com" |nc 192.168.14.13 6666
redis服务器上面查看:
# /usr/local/redis/src/redis-cli -p 6379 -h 192.168.14.13
192.168.14.13:6379> SELECT 1 #因为我们是选的编号是1的数据库所以要使用编号是1的数据库 OK
192.168.14.13:6379[1]> SUBSCRIBE logstash_list_0 #实时的读取日志信息
Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "logstash_list_0" 3) (integer) 1 1) "message" 2) "logstash_list_0" 3) "{\"@timestamp\":\"2017-12-16T09:33:10.007Z\",\"port\":40766,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"one.Hello,wo am is 51niux.com\",\"type\":\"tcp\"}" 1) "message" 2) "logstash_list_0" 3) "{\"@timestamp\":\"2017-12-16T09:33:29.027Z\",\"port\":40768,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"two.OK I know you are 51niux.com\",\"type\":\"tcp\"}"
#从上如可以看到消息发送到redis是OK的。
#当然上面模式就是一个生产消费模式,redis本身并不会把这些消息存储下来也就是说消费了这些消息,消息也就没有了。
3.5 将TCP收到的信息以列表的形式写入到Redis中
#如果要将消息保存下来呢,就要使用data_type => list这种模式,下面一个简单的小例子,就写output配置了:
output { stdout { codec => rubydebug } redis { host => "192.168.14.13" port => "6379" db => "1" key => "logstash_%{+YYYY.MM.dd}" #类似于这里key这里可以搞一个变量的形式,比如按天 data_type => list #这里改为list,列表类型 #password => "51niux.com" } }
# echo "two.OK I know you are 51niux.com" |nc 192.168.14.13 6666 #客户端再写两条测试语句
# echo "lai lai lai hai yi bo" |nc 192.168.14.13 6666
192.168.14.13:6379[1]> KEYS * #可以看到过了一个后缀是时间格式的键,下面是输出 1) "logstash_2017.12.16" 192.168.14.13:6379[1]> get logstash_2017.12.16 #不能这样搞哦,因为这不是一个简单的键值对的形式,下面是摆错输出 (error) WRONGTYPE Operation against a key holding the wrong kind of value 192.168.14.13:6379[1]> type logstash_2017.12.16 #查看一下键的类型可以看到它是一个列表的形式,下面是输出 list 192.168.14.13:6379[1]> LRANGE logstash_2017.12.16 0 -1 #我们就来获取下这个列表的所有信息吧,下面是输出结果 1) "{\"@timestamp\":\"2017-12-16T09:43:06.817Z\",\"port\":40770,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"two.OK I know you are 51niux.com\",\"type\":\"tcp\"}" 2) "{\"@timestamp\":\"2017-12-16T09:43:34.499Z\",\"port\":40772,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"lai lai lai hai yi bo\",\"type\":\"tcp\"}"