柴少鹏的官方网站 技术在分享中进步,水平在学习中升华

ELK日志收集实例(七)

#前面记录了一大波,这里记录一些小例子捋一下。

一、再以收集nginx为例

#前面记录了在Logstash端通过grok将日志转换成json形式,这里记录一种直接在客户端就将nginx转换成json形式。

1.1 Filebeat客户端的操作

nginx端的配置

# cat /usr/local/nginx/conf/nginx.conf

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '   #一份正常的
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
    log_format json '{ "@timestamp": "$time_local", '    #一份json格式的
         '"@fields": { '
         '"remote_addr": "$remote_addr", '
         '"remote_user": "$remote_user", '
         '"body_bytes_sent": "$body_bytes_sent", '
         '"request_time": "$request_time", '
         '"status": "$status", '
         '"request": "$request", '
         '"request_method": "$request_method", '
         '"http_referrer": "$http_referer", '
         '"body_bytes_sent":"$body_bytes_sent", '
         '"http_x_forwarded_for": "$http_x_forwarded_for", '
         '"http_user_agent": "$http_user_agent" } }';
    access_log  elklogs/access_json.log  json;   #日志写两份
    access_log  logs/access.log  main;

#mkdir /usr/log/nginx/elklogs -p

# /usr/local/nginx/sbin/nginx  -t   #测试没问题

nginx: the configuration file /usr/local/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /usr/local/nginx/conf/nginx.conf test is successful

# tail -f /usr/local/nginx/logs/access.log   #客户端访问测试一下

192.168.14.50 - - [10/Dec/2017:20:10:26 +0800] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" "-"
192.168.14.50 - - [10/Dec/2017:20:10:27 +0800] "GET /favicon.ico HTTP/1.1" 404 571 "http://192.168.14.49/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" "-"

# tail -f /usr/local/nginx/elklogs/access_json.log 

{ "@timestamp": "10/Dec/2017:20:10:26 +0800", "@fields": { "remote_addr": "192.168.14.50", "remote_user": "-", "body_bytes_sent": "612", "request_time": "0.000", "status": "200", "request": "GET / HTTP/1.1", "request_method": "GET", "http_referrer": "-", "body_bytes_sent":"612", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" } }
{ "@timestamp": "10/Dec/2017:20:10:27 +0800", "@fields": { "remote_addr": "192.168.14.50", "remote_user": "-", "body_bytes_sent": "571", "request_time": "0.000", "status": "404", "request": "GET /favicon.ico HTTP/1.1", "request_method": "GET", "http_referrer": "http://192.168.14.49/", "body_bytes_sent":"571", "http_x_forwarded_for": "-", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36" } }

#当然产生的新的日志呢可以用https://www.bejson.com/  #这种网站检验一下自己那个字段有问题,因为logstash是不做转换的,所以这个日志不是json信息就不好玩了。

Filebeat端的配置

# su - elk

$ mkdir beats   #单独创建一个目录用来存放filebeat的文件

$ cat /home/elk/beats/filebeat.yaml

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/nginx/elklogs/access_json.log
  tags: ["nginx-access","test.json.51niux.com"]

output.logstash:
  hosts: ["192.168.14.65:5066"]
  loadbalance: true
  compression_level: 6
  
name: "192.168.14.49"

$ cat /home/elk/beats/setup.py 

#! /usr/bin/python

import sys
import subprocess
import datetime

if __name__ =='__main__':
    args = sys.argv[1:]
    print 'servie start at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    while True:
        subprocess.Popen(args).wait()
        print 'servie restart at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

$ cat /home/elk/beats/start.sh 

nohup ./setup.py  /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml >/dev/null 2>&1 &

启动服务:

$ chmod go-w /home/elk/beats/filebeat.yaml

$ chmod +x setup.py 

$ sh /home/elk/beats/start.sh 

$ ps -ef|grep filebeat

elk      22302     1  0 20:45 pts/0    00:00:00 /usr/bin/python ./setup.py /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml
elk      22303 22302  0 20:45 pts/0    00:00:00 /home/elk/filebeat/filebeat -c /home/elk/beats/filebeat.yaml
elk      22327 20930  0 20:46 pts/0    00:00:00 grep --color=auto filebeat

$ cat /home/elk/filebeat/data/registry 

[{"source":"/usr/local/nginx/elklogs/access_json.log","offset":347601,"timestamp":"2017-12-10T21:13:36.480657975+08:00","ttl":-1,"type":"log","FileStateOS":{"inode":796457,"device":64770}}]

#这个文件相当于监控了文件以及偏移量有变化就会触发事件发送

博文来自:www.51niux.com

1.2 Logstash端的操作

$ cat /home/elk/logstash/conf/logstash-nginx.conf   #input那里不用改没啥好filter的

output {
    if "nginx-access" in [tags]{
        if "test.51niux.com" in [tags]{
            elasticsearch {  
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
                index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}"
            }         
         }
         else if "test.json.51niux.com" in [tags] {   #如果tags匹配到test.json.51niux.com
            elasticsearch {  
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
                index => "logstash-nginx-access-test-json-51niux-com-%{+YYYY.MM}"   #就创建这个索引
            }   
         }
 }
}

$ cat /home/elk/logstash/conf/setup.py 

#! /usr/bin/python

import sys
import subprocess
import datetime

if __name__ =='__main__':
    args = sys.argv[1:]
    print 'servie start at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    while True:
        subprocess.Popen(args).wait()
        print 'servie restart at %s' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

$ cat /home/elk/logstash/conf/logstash_start.sh

#export LS_HEAP_SIZE=8g   #旧版本的用法现在已经不需要了
cd  /home/elk/logstash/conf/
nohup ./setup.py /home/elk/logstash/bin/logstash -w 4 -b 10000 -f ./logstash-nginx.conf >/dev/null 2>&1 &

$ vim /home/elk/logstash/config/jvm.options 

-Xms8g
-Xmx8g

启动服务并验证:

$ chmod +x /home/elk/logstash/conf/setup.py 

$ chmod +x /home/elk/logstash/conf/logstash_start.sh

$ /home/elk/logstash/conf/logstash_start.sh 

$ ps -ef|grep logstash

elk       1527     1  0 21:27 pts/0    00:00:00 /usr/bin/python ./setup.py /home/elk/logstash/bin/logstash -w 4 -b 10000 -f ./logstash-nginx.conf
elk       1528  1527 89 21:27 pts/0    00:01:59 /usr/java/jdk/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby.compile.invokedynamic=true -XX:+HeapDumpOnOutOfMemoryError -Djava.security.egd=file:/dev/urandom -Xmx8g -Xms8g -Xss2048k -Djffi.boot.library.path=/home/elk/logstash-6.0.1/vendor/jruby/lib/jni -Xbootclasspath/a:/home/elk/logstash-6.0.1/vendor/jruby/lib/jruby.jar -classpath :/usr/java/jdk/lib/*.jar:/usr/java/jdk/jre/lib/*.jar -Djruby.home=/home/elk/logstash-6.0.1/vendor/jruby -Djruby.lib=/home/elk/logstash-6.0.1/vendor/jruby/lib -Djruby.script=jruby -Djruby.shell=/bin/sh org.jruby.Main /home/elk/logstash/lib/bootstrap/environment.rb logstash/runner.rb -w 4 -b 10000 -f ./logstash-nginx.conf

#-Xmx8g -Xms8g已生效。

1.3 随便写点日志查看一下filebeat是否上传信息上来

image.png

1.4 访问下Elasticsearch节点查看一下一下新的索引是否产生了

$ curl http://192.168.14.60:9200/_cat/indices?v    #从结果看可以看到新的索引创建了

health status index                                              uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana                                            L3u2SfDjSQGpQFR7fOI5DA   1   1          2            0     21.5kb         10.7kb
green  open   logstash-nginx-access-test-51niux-com-2017.12      vXMhx2DERDOkP6DQAeny5g   5   1      20012            0      3.7mb          1.8mb
green  open   logstash-nginx-access-test-json-51niux-com-2017.12 YmXRlohtR1OIEtFXIPxiGQ   5   1       1000            0    492.3kb        274.5kb

$ curl http://192.168.14.60:9200/_cat/health?v   #查看下集群的健康状态

epoch      timestamp cluster              status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1512913003 21:36:43  elasticsearch_51niux green           6         5     22  11    0    0        0             0                  -                100.0%

二、ELK以系统用户登录为例实现rsylog日志收集

#首先先展示一下客户端还是log的type形式的收集

$ cat /home/elk/beats/filebeat.yaml 

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/nginx/elklogs/access_json.log
  tags: ["nginx-access","test.json.51niux.com"]
- input_type: log
  paths:
    - /var/log/secure
  tags: ["ssh-login","test.51niux.com"]

output.logstash:
  hosts: ["192.168.14.65:5066"]
  loadbalance: true
  compression_level: 6
  
name: "192.168.14.49"

image.png

#这种形式呢,因为/var/log/secure权限比较低我们还得授权成elk用户有权限,这个日志还是轮询滚动的。

2.1 先跟着官网走一波syslog的input插件解释

系统日志输入配置选项:

#设置              #输入类型
facility_labels    array  
host               string   
locale             string   
port               number    
proxy_protocol     boolean   
severity_labels    array    
timezone           string   
use_labels         boolean

facility_labels:

值类型是数组。默认值是:["kernel", "user-level", "mail", "system", "security/authorization", "syslogd", "line printer", "network news", "UUCP", "clock", "security/authorization", "FTP", "NTP", "log audit", "log alert", "clock", "local0", "local1", "local2", "local3", "local4", "local5", "local6", "local7"]

host:

值类型是字符串。默认值是“0.0.0.0”监听的地址

locale:

值类型是字符串。这个设置没有默认值。使用IETF-BCP47或POSIX语言标记指定用于日期分析的区域设置。 简单的例子是en,美国的BCP47或者en_US的POSIX。 如果未指定,则将使用平台默认值。大多数情况下需要设置语言环境来解析月份名称(MMM模式)和星期几名称(EEE模式)。

port:

值类型是数字。默认值是514。端口监听。请记住,小于1024(特权端口)的端口可能需要使用root权限。

proxy_protocol:

值类型是布尔值。默认值是false。代理协议支持,目前只支持v1版http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt

severity_labels:

值类型是数组。默认值是["Emergency", "Alert", "Critical", "Error", "Warning", "Notice", "Informational", "Debug"]。

timezone:

值类型是字符串。这个设置没有默认值。指定要用于日期分析的时区标准ID。 有效的ID在[Joda.org可用时区页面](http://joda-time.sourceforge.net/timezones.html)上列出。 如果时区无法从数值中提取,那么这非常有用,并且不是平台默认值。 如果没有指定,将使用平台默认值。 Canonical ID是很好的,因为它会为您节省夏令时。例如,America/Los_Angeles或Europe / France是有效的ID。

use_labels:

值类型是布尔值。默认值为true。使用标签分析严重性和设施级别。

通用选项:

所有输入插件都支持以下配置选项:

#设置       #输入类型
add_field    hash  
codec        codec   
enable_metric  boolean    
id           string   
tags        array   
type        string

add_field:

值类型是hash。默认值是{}。添加一个字段到一个事件

codec:

值类型是编解码器。默认值是“plain”。用于输入数据的编解码器。 输入编解码器是在数据进入输入之前解码数据的一种便捷方法,无需在Logstash管道中使用单独的过滤器。

enable_metric:

值类型是布尔值。默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。

id:

值类型是string。这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 如果您有两个或多个相同类型的插件,例如,如果您有两个系统日志输入,则此功能特别有用。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。

input {
  syslog {
    id => "my_plugin_id"
  }
}

tags:

值类型是数组。这个设置没有默认值。添加任意数量的任意标签到您的事件

type:

值类型是字符串。这个设置没有默认值。将类型字段添加到由此输入处理的所有事件。类型主要用于过滤器激活。

正则:https://github.com/kkos/oniguruma/blob/master/doc/RE

======logstash已经自带了不少的正则,如果想偷懒的话,可以在内置正则里借用下。===============

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>”(?>\\.|[^\\"]+)+”|”"|(?>’(?>\\.|[^\\']+)+’)|”|(?>(?>\\.|[^\]+)+)|`))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT (?:%{IPORHOST=~/\./}:%{POSINT})

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn’t turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*’|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc…
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# ’60′ is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] “(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})” %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([A-a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

===========================================================

博文来自:www.51niux.com

2.2 rsyslog客户端的操作

#syslog发送日志可以查看:http://blog.51niux.com/?id=108

# vim /etc/rsyslog.conf   #除了本地记录一份以外,还给远端的logstash服务器发送一份登录信息

authpriv.*                                              /var/log/secure
authpriv.*                                              @192.168.14.65:1514   #因为1024端口以内的root用户权限,Logstash服务端是普通用户

# service rsyslog restart

2.3 Logstash服务端的配置

$ cat /home/elk/logstash/patterns/ssh 

input {
    syslog{
    type => "system-syslog"
    port => 1514
    }
}
output {
    stdout { codec => rubydebug }
}

#可以先测试一下能够收到信息。

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/ssh.conf   #找台机器登录下syslog客户端。

[2017-12-11T00:20:37,452][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-11T00:20:37,460][INFO ][logstash.inputs.syslog   ] Starting syslog tcp listener {:address=>"0.0.0.0:1514"}
[2017-12-11T00:20:37,460][INFO ][logstash.inputs.syslog   ] Starting syslog udp listener {:address=>"0.0.0.0:1514"}
[2017-12-11T00:20:37,468][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
          "severity" => 6,
               "pid" => "27976",
           "program" => "sshd",
           "message" => "Accepted password for root from 192.168.14.51 port 52470 ssh2",
              "type" => "system-syslog",
          "priority" => 86,
         "logsource" => "master",
        "@timestamp" => 2017-12-10T16:20:57.000Z,
          "@version" => "1",
              "host" => "192.168.14.49",
          "facility" => 10,
    "severity_label" => "Informational",
         "timestamp" => "Dec 11 00:20:57",
    "facility_label" => "security/authorization"
}
{
          "severity" => 6,
               "pid" => "27976",
           "program" => "sshd",
           "message" => "pam_unix(sshd:session): session opened for user root by (uid=0)",
              "type" => "system-syslog",
          "priority" => 86,
         "logsource" => "master",
        "@timestamp" => 2017-12-10T16:20:57.000Z,
          "@version" => "1",
              "host" => "192.168.14.49",
          "facility" => 10,
    "severity_label" => "Informational",
         "timestamp" => "Dec 11 00:20:57",
    "facility_label" => "security/authorization"
}

#这个已经可以插了,如下图,偷偷的插了一下:

image.png

#好的忘记上面那张图,现在我们已经测试可以收到客户端通过1514端口发过来的rsyslog的ssh登录日志信息了。正规的走一下。

$ cat /home/elk/logstash/patterns/ssh   #下面是啥,通俗来说下面就是一个变量值跟着一串正则表达式跟来源的信息能匹配上

SECURELOG %{WORD:status} password for ?(invalid user)? %{WORD:FromUSER} from %{DATA:FromIP} port

SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})?

SYSLOGBASE2  (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

$ vim /home/elk/logstash/conf/ssh.conf

input {
    syslog{
    type => "system-syslog"
    port => 1514
    }
}
filter {
 if [type] == "system-syslog" {
   grok {
        match => { "message" => "%{SYSLOGPAMSESSION}" }
        match => { "message" => "%{SECURELOG}" }
        match => { "message" => "%{SYSLOGBASE2}" }
   }
   if ([status] == "Accepted") {
        mutate {
        add_tag => ["Success"]
        }
    }
   else if ([status] == "Failed") {
        mutate {
        add_tag => ["Failed"]
        }
    } 
 }
}
output {
    #stdout { codec => rubydebug }  #可以有多个output的,如果这里取消注释也会打印到屏幕一份信息
    elasticsearch {
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
                index => "ssh-%{+YYYY.MM}"
            }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/ssh.conf   #我这里就前台启动了,为了查看有什么问题。

[2017-12-11T01:17:18,362][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-11T01:17:18,379][INFO ][logstash.inputs.syslog   ] Starting syslog udp listener {:address=>"0.0.0.0:1514"}
[2017-12-11T01:17:18,379][INFO ][logstash.inputs.syslog   ] Starting syslog tcp listener {:address=>"0.0.0.0:1514"}
[2017-12-11T01:17:18,379][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
          "severity" => 0,
           "message" => "<86>Dec 11 01:17:19 master sshd[29602]: Accepted password for root from 192.168.14.54 port 37966 ssh2",
              "type" => "system-syslog",
          "priority" => 0,
              "tags" => [
        [0] "_grokparsefailure_sysloginput",
        [1] "Success"    #测试了一个登陆,因为匹配上了Acceptd,所以状态是Success。
    ],
        "@timestamp" => 2017-12-10T17:17:18.740Z,
          "@version" => "1",
              "host" => "192.168.14.49",
            "FromIP" => "192.168.14.54",   #来源IP也抓出来了
          "facility" => 0,
    "severity_label" => "Emergency",
    "facility_label" => "kernel",
            "status" => "Accepted",
          "FromUSER" => "root"     #来源用户也抓出来了。
}

2.4 kibana的操作

image.png

#在kibana上面加上这个索引,kinaba不会自动加载索引的,需要我们手工添加。当然如果你的日志信息比较单一比如都是web信息,可以直接添加一个*的索引。

image.png

#kibana上面效果也出现了。

image.png

#点击左边的选中一个字段可以看到字段里面内容的比例。

三、ELK收集TCP日志

#是否可以通过tcp端口将日志传输过来呢,是可以的。

3.1 先把Tcp input plugin翻译一波

#官网链接:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html

描述:

     通过TCP套接字读取事件。像标准输入和文件输入一样,每个事件都被假定为一行文本。可以接受来自客户端的连接或连接到服务器,具体取决于模式。

##接受log4j2日志

      Log4j2可以通过套接字发送JSON,我们可以使用它与我们的tcp输入结合来接受日志。首先,我们需要配置你的应用程序来通过套接字发送JSON日志。 以下的log4j2.xml完成这个任务。请注意,您需要更改此配置中的主机和端口设置以符合您的需求。

<Configuration>
  <Appenders>
     <Socket name="Socket" host="localhost" port="12345">
       <JsonLayout compact="true" eventEol="true" />
    </Socket>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Socket"/>
    </Root>
  </Loggers>
</Configuration>

     要在Logstash中接受这个,你需要tcp输入和日期过滤器:

input {
  tcp {
    port => 12345
    codec => json
  }
}

     并添加一个日期过滤器来获取log4j2的timeMillis字段,并将其用作事件时间戳

filter {
  date {
    match => [ "timeMillis", "UNIX_MS" ]
  }
}

Tcp输入配置选项:

该插件支持以下配置选项和稍后介绍的通用选项。

image.png

host  #值类型是字符串,默认值是“0.0.0.0”.当模式是服务器时,要监听的地址。 当模式是客户端时,连接的地址。
mode  #值可以是任何:server, client,默认值是“server”。服务器侦听客户端连接,客户端连接服务器。
port  #这是一个必需的设置。值类型是数字。这个设置没有默认值。当模式是服务器,端口监听。 当模式是客户端时,连接到的端口。
proxy protocol  #值类型是布尔值,默认值是false,代理协议支持,目前仅支持v1。
ssl_cert  #值类型是路径,这个设置没有默认值。SSL证书路径
ssl_enable  #值类型是布尔值,默认值是false。启用SSL(必须设置为其他ssl_选项才能生效)。
ssl_extra_chain_certs #值类型是数组,默认值是[]。要添加到证书链的额外X509证书的数组。 在系统商店中不需要CA链时很有用。
ssl_key  #值类型是路径,这个设置没有默认值。SSL关键路径
ssl_key_passphrase #值类型是密码,默认值是零。SSL密钥密码
ssl_verify  #值类型是布尔值,默认值为true。验证SSL连接的另一端对CA的身份。 对于输入,将字段sslsubject设置为客户端证书的字段。

通用选项:

所有输入插件都支持以下配置选项:

image.png

细节:

add_field  #值类型是hash.默认值是{}.添加一个字段到一个事件
codec  #值类型是codec,默认值是“plain”。用于输入数据的编解码器。 输入编解码器是在数据进入输入之前解码数据的一种便捷方法,无需在Logstash管道中使用单独的筛选器。
enable_metric #值类型是布尔值。默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。
id  #值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有2个tcp输入。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。
tags  #值类型是数组,这个设置没有默认值。添加任意数量的任意标签到您的事件。
type  #值类型是字符串,这个设置没有默认值。将类型字段添加到由此输入处理的所有事件。

3.2 Logstash端的配置

$ cat /home/elk/logstash/conf/tcp.conf    

input {
    tcp {
        type => "tcp"
        port => "6666"
        mode => "server"
    }
}
filter {
}
output {
    stdout {
        codec => rubydebug
    }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/tcp.conf    #启动一下。

客户端写入测试:

现在客户端就不需要安装filebeat或者logstash了,直接往IP:端口发送日志事件就行了。

#yum install nc -y

# tail -f /var/log/messages |nc 192.168.14.13 6666   #这种就相当于实时的监控一个日志文件,当然也可以变成cat文件

{
    "@timestamp" => 2017-12-16T07:52:56.353Z,
          "port" => 39150,
      "@version" => "1",
          "host" => "192.168.14.13",
      "@metdata" => {
        "ip_address" => "192.168.14.13"
    },
       "message" => "Dec 16 15:52:56 controller systemd: Stopping OpenSSH server daemon...",
          "type" => "tcp"
}
{
    "@timestamp" => 2017-12-16T07:52:56.355Z,
          "port" => 39150,
      "@version" => "1",
          "host" => "192.168.14.13",
      "@metdata" => {
        "ip_address" => "192.168.14.13"
    },
       "message" => "Dec 16 15:52:56 controller systemd: Starting OpenSSH server daemon...",
          "type" => "tcp"
}
{
    "@timestamp" => 2017-12-16T07:52:56.363Z,
          "port" => 39150,
      "@version" => "1",
          "host" => "192.168.14.13",
      "@metdata" => {
        "ip_address" => "192.168.14.13"
    },
       "message" => "Dec 16 15:52:56 controller systemd: Started OpenSSH server daemon.",
          "type" => "tcp"
}

#手工重启下sshd服务,上面是logstash的前台输出。

# nc 192.168.14.13 6666 </var/log/secure   #这是第二种方式,将一个文件的内容通过192.168.14.13:6666把日志传输。

#  echo "Hello,wo am is 51niux.com" > /dev/tcp/192.168.14.13/6666  #这是伪终端的方式。

{
    "@timestamp" => 2017-12-16T07:58:08.813Z,
          "port" => 40730,
      "@version" => "1",
          "host" => "network02",
      "@metdata" => {
        "ip_address" => "192.168.14.9"
    },
       "message" => "Hello,wo am is 51niux.com",
          "type" => "tcp"
}

#这是Logstash的输出,可以看到来源IP是非本机,然后message消息正好是我们echo的那句话。

3.3 Redis output plugin(先跟着官网捋一下)

#该输出将使用RPUSH将事件发送到Redis队列。建议使用2.6.0+版本。更多的信息了解redis官网:http://redis.io/

Redis输出配置选项:

该插件支持以下配置选项和稍后介绍的通用选项。

#Setting    #Input type    #Required
batch        boolean        No    
batch_events    number      No    
batch_timeout   number      No    
congestion_interval   number  No    
congestion_threshold  number  No    
data_type      string, one of ["list", "channel"]    No    
db             number        No    
host          array        No    
key           string    No    
password      password    No    
port          number    No    
reconnect_interval  number    No    
shuffle_hosts  boolean    No    
timeout       number    No

选项解释:

batch  #值类型是布尔值,默认值是false。如果希望Redis对值进行批处理并发送1个RPUSH命令(而不是每个值的一个命令),则设置为true。 请注意,这只适用于data_type =“list”模式。
#如果为true,则每发送一次“batch_events”事件或“batch_timeout”秒(以先到者为准),发送RPUSH。 只有data_type支持“list”。
batch_events  #值类型是数字,默认值是50。如果batch设置为true,则我们排队等待RPUSH的事件数量。
batch_timeout  #值类型是数字,默认值是5.如果batch设置为true,则在有未决事件刷新时RPUSH命令之间的最大时间量。
congestion_interval  #值类型是数字,默认值是1。多久检查一次拥堵。 默认是一秒钟。 零意味着检查每一个事件。
congestion_threshold  #值类型是数字,默认值是0。如果Redis的data_type是列表并且拥有多个@congestion_threshold项,则阻塞直到有人使用它们并减少拥塞,
#否则,如果没有使用者,则除非使用OOM保护配置,否则Redis将耗尽内存。 但即使使用OOM保护,单个Redis列表也可以阻止Redis的所有其他用户,直到Redis CPU消耗达到允许的最大RAM大小。 默认值为0意味着此限制被禁用。 仅支持列表Redis data_type。
data_type  #值可以是以下任何一种:list, channel,这个设置没有默认值。无论是列表还是频道。 如果data_type是list,那么我们将RPUSH设置为key。 如果data_type是频道,那么我们将PUBLISH键。
db   #值类型是数字,默认值是0.Redis数据库编号。
host  #值类型是数组,默认值是[“127.0.0.1”].您的Redis服务器的主机名。 可以在任何主机名上指定端口,这将覆盖全局端口配置。 如果主机列表是一个数组,Logstash将选择一个随机主机来连接,如果主机断开连接,它将选择另一个主机。
key  #值类型是字符串,这个设置没有默认值。Redis列表或频道的名称。 动态名称在这里有效,例如logstash-%{type}。
password #值类型是密码,这个设置没有默认值。密码进行身份验证。 没有默认的身份验证。
port  #值类型是数字,默认值是6379。连接的默认端口。 可以在任何主机名上覆盖。
reconnect_interval  #值类型是数字,默认值是1。重新连接到失败的Redis连接的时间间隔
shuffle_hosts  #值类型是布尔值,默认值为true。在Logstash启动过程中随机清除主机列表。
timeout   #值类型是数字,默认值是5。Redis的初始连接超时秒数。

通用选项

所有输出插件都支持以下配置选项:

codec  #默认是非必需的。值类型是codec。默认值是“plain”。用于输出数据的编解码器。 输出编解码器是在离开输出之前对数据进行编码的一种便捷方法,无需在Logstash管道中使用单独的筛选器。
enable_metric  #值类型是布尔值,默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。
id  #值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。如果没有指定ID,Logstash将会生成一个。 强烈建议在配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,
#例如,如果你有两个redis输出。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。如:output {  redis {    id => "my_plugin_id"  }}

3.4 将tcp收集到的信息以频道的形式写入redis

Logstash配置:

$ cat /home/elk/logstash/conf/tcp_redis.conf

input {
    tcp {
        type => "tcp"
        port => "6666"
        mode => "server"
    }
}
filter {
}
output {
  #stdout {
  #      codec => rubydebug
  #}
  redis {
        host => "192.168.14.13"
        port => "6379"
        db => "1"   #这里是redis数据的编号,redis一般是0-15的数据库编号也就是16个数据库
        key => "logstash_list_0"   #这里就是发布通道的名称
        data_type => channel   #主要是这里啊,两种形式,这里设置的是频道模式,那么redis就相当于一个消息队列服务了,就是发布/订阅
        #password => "51niux.com"  #如果redis设置了密码这里还要输入密码
        }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/tcp_redis.conf    #启动服务

客户端测试:

#  echo "one.Hello,wo am is 51niux.com" |nc 192.168.14.13 6666

#  echo "two.OK I know you are 51niux.com" |nc 192.168.14.13 6666  

redis服务器上面查看:

# /usr/local/redis/src/redis-cli -p 6379 -h 192.168.14.13

192.168.14.13:6379> SELECT 1   #因为我们是选的编号是1的数据库所以要使用编号是1的数据库
OK

192.168.14.13:6379[1]> SUBSCRIBE logstash_list_0   #实时的读取日志信息

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "logstash_list_0"
3) (integer) 1
1) "message"
2) "logstash_list_0"
3) "{\"@timestamp\":\"2017-12-16T09:33:10.007Z\",\"port\":40766,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"one.Hello,wo am is 51niux.com\",\"type\":\"tcp\"}"
1) "message"
2) "logstash_list_0"
3) "{\"@timestamp\":\"2017-12-16T09:33:29.027Z\",\"port\":40768,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"two.OK I know you are 51niux.com\",\"type\":\"tcp\"}"

#从上如可以看到消息发送到redis是OK的。

#当然上面模式就是一个生产消费模式,redis本身并不会把这些消息存储下来也就是说消费了这些消息,消息也就没有了。

3.5 将TCP收到的信息以列表的形式写入到Redis中

#如果要将消息保存下来呢,就要使用data_type => list这种模式,下面一个简单的小例子,就写output配置了:

output {
  stdout {
        codec => rubydebug
  }
  redis {
        host => "192.168.14.13"
        port => "6379"
        db => "1"
        key => "logstash_%{+YYYY.MM.dd}"   #类似于这里key这里可以搞一个变量的形式,比如按天
        data_type => list   #这里改为list,列表类型
        #password => "51niux.com"
        }
}

#  echo "two.OK I know you are 51niux.com" |nc 192.168.14.13 6666    #客户端再写两条测试语句

#  echo "lai lai lai  hai  yi bo" |nc 192.168.14.13 6666   

192.168.14.13:6379[1]> KEYS *   #可以看到过了一个后缀是时间格式的键,下面是输出
1) "logstash_2017.12.16"
192.168.14.13:6379[1]> get logstash_2017.12.16   #不能这样搞哦,因为这不是一个简单的键值对的形式,下面是摆错输出
(error) WRONGTYPE Operation against a key holding the wrong kind of value
192.168.14.13:6379[1]> type logstash_2017.12.16   #查看一下键的类型可以看到它是一个列表的形式,下面是输出
list
192.168.14.13:6379[1]> LRANGE logstash_2017.12.16 0 -1   #我们就来获取下这个列表的所有信息吧,下面是输出结果
1) "{\"@timestamp\":\"2017-12-16T09:43:06.817Z\",\"port\":40770,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"two.OK I know you are 51niux.com\",\"type\":\"tcp\"}"
2) "{\"@timestamp\":\"2017-12-16T09:43:34.499Z\",\"port\":40772,\"@version\":\"1\",\"host\":\"network02\",\"@metdata\":{\"ip_address\":\"192.168.14.9\"},\"message\":\"lai lai lai  hai  yi bo\",\"type\":\"tcp\"}"


作者:忙碌的柴少 分类:ELK 浏览:3153 评论:13
留言列表
你好
你好 博主厉害啊 ,看了你的文章很喜欢。都是精华,能否加好友,方便交流。博主你QQ多少啊  回复
忙碌的柴少
忙碌的柴少 可以啊,你邮箱多少?  回复
访客
访客 你有群吗,我可以加你的群多交流  回复
忙碌的柴少
忙碌的柴少 我没群,群里一般都是扯淡用的,我都是各个群跑骚,我给你发邮件我们可以QQ加好友方便交流  回复
大闲君
大闲君 邮箱balbeleet2012@163.com
顺便加我个呗  回复
忙碌的柴少
忙碌的柴少 好的没问题给你发邮箱  回复
访客
访客 厉害柴少,给发下邮件,加好友  回复
访客
访客 wdc1006@126.com,我的邮箱  回复
Specter
Specter 看完这个系列非常感动和感谢!同求交流  回复
忙碌的柴少
忙碌的柴少 好的也没啥前两天看了看ELK权威文档发现才写了几十页的内容,还是得多看书嘿嘿。  回复
海口老男人
海口老男人 有个问题想问下,比如我有两台tomcat服务器跑的同样的web应用,tomcat1、tomcat2的日志在logstash怎么区分呢?  回复
访客
访客 通过name, beat.name这个标签  回复
lijun
lijun 博主厉害啊, 能加下你QQ吗,方便交流 我的邮箱是bokan@sohu.com  回复
发表评论
来宾的头像