柴少鹏的官方网站 技术在分享中进步,水平在学习中升华

Logstash日志收集(三)

还是得先顺着官网了解一波:https://www.elastic.co/products/logstash

一、跟着官网学习下Logstash的基本概念

 集中,转换和隐藏。您的数据Logstash是一个开源的服务器端数据处理管道,可以同时从多个源中获取数据,并将其转换为您喜欢的“存储”(自然是Elasticsearch)。)

1.1 Logstash 6.0.0新增功能

用多条管道简化处理:

 Logstash 6.0引入了针对不同用例同时运行多个管道的能力。 管道在同一个实例中一起运行,但具有独立的输入,过滤器和输出,使用户能够隔离每个数据源的处理逻辑。 这使您的管道逻辑集中和简洁。 
 从历史上看,许多Logstash用户将多个用例组合成单个管道,这需要添加复杂的条件逻辑。 有了多个不再需要的管道, 现在,您可以更干净地组织您的配置,并通过使用每个用例的专用管道来更有效地执行您的管道。
 为了增加趣味性,每个管道还有自己的独立设置和生命周期,您可以调整它们以匹配该数据源所需的相应工作负载配置文件。 例如,您可能希望为高容量日志记录管道分配更多的管道工作线程,并为较低强度的本地度量标准管道节流资源。

多管道的官方文档:https://www.elastic.co/guide/en/logstash/6.0/multiple-pipelines.html

多管道的博客地址:https://www.elastic.co/blog/logstash-multiple-pipelines

用Elastic Stack集中管理管道:

在过去,管理pipeline配置或者是手动任务,或者使用像Puppet或Chef这样的配置管理工具来协助操作自动化。 在Logstash 6.0,集中管道管理功能现在使您能够通过Kibana单一窗格直接使用Elastic Stack管理和自动编排Logstash部署。
此功能为Kibana带来了管道管理UI,您可以使用它来创建,编辑和删除pipeline。 在这个UI下面,我们使用Elasticsearch来存储你的pipelines配置。 通过几个简单的设置,您的Logstash节点可以配置为监视这些管道上的变化,让您可以无缝地推出管道更改,而无需额外的运营基础架构。

#集中管道管理可作为X-Pack功能使用。

#文档连接:https://www.elastic.co/guide/en/logstash/6.0/logstash-centralized-pipeline-management.html

可视化管道逻辑和性能:

Pipeline Viewer,这是对Logstash Monitoring UI的一个补充。 有了这个新工具,您可以看到您的管道配置,并排除插件级别的性能瓶颈。 管道查看器将Logstash管道显示为DAG(定向非循环图)。 在这个DAG上重叠是个别输入,过滤器和输出的相关性能指标。 突出显示潜在的性能问题,以便快速确定管道的哪些部分可能是瓶颈。
用户应该注意,这是一个测试版功能,可能会有所变化。 一个已知的问题是缺乏清晰渲染非常大的管道的能力。 这是一个积极的工作领域,所以当我们进一步改进并将其转化为全面可用性时,期待对此功能进行重大改进。

从导入节点到Logstash的Smooth path:

一些用户在开始使用Elastic Stack时享受Elasticsearch摄取节点的便利。 但是,在一定程度的复杂性中,Ingest节点可能不具备解决您的问题所需的所有功能,并且可能需要迁移到Logstash。 为了简化这些转换,我们创建了一个随Logstash 6.0一起提供的Ingest管道转换工具。 转换器将一个接收节点管道作为输入,并将相应的Logstash管道 spits out。

使用新的JRuby 9k:

我们花了很多时间把Logstash项目转移到JRuby的最新版本:JRuby 9000,它支持现代的Ruby语法和增强的内部结构,更适合优化。 对于插件开发者来说,这意味着你的老插件将继续在除了最罕见的情况之外的所有工作。 此升级还意味着您可以继续使用新的Ruby功能,以及使用仅与JRuby 9k兼容的Ruby库。

1.2 跟着官网了解下logstash的基本流程

INPUTS

提取所有形式、大小、来源的数据:

       数据通常以多种格式散布或分布在许多系统中。 Logstash支持各种输入,可以同时从多个常见的源中获取事件。 轻松从日志,指标,Web应用程序,数据存储和各种AWS服务中进行采集,所有这些都以连续,流式的方式进行。

图片.png

FILTERS

即时分析和转换您的数据:

     当数据从源传输到存储时,Logstash过滤器解析每一个事件,识别命名的字段来构建结构,并将它们转换为一种通用格式,以方便、加速的分析和业务价值。

     Logstash动态转换和准备您的数据,无论其格式或复杂性如何:

通过grok从非结构化数据派生结构
从IP地址解读地理坐标
Anonymize PII数据,完全排除敏感字段
简化独立于数据源,格式或模式的整体处理

过滤器库:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

图片.png

OUTPUTS

选择你的Stash,传输你的数据:

      虽然Elasticsearch是我们的首选输出,开创了搜索和分析可能性的世界,但并不是唯一可用的。Logstash有多种输出,可以让您在需要的地方路由数据,从而可以灵活地释放大量的下游用例。

输出插件:https://www.elastic.co/guide/en/logstash/current/output-plugins.html

#大概流程就是流入==》过滤==》流出,下面还有点其他介绍。

可扩展性:

创建和配置自己的Pipeline:

         Logstash有一个可插拔的框架,拥有超过200个插件。 混合,匹配和编排不同的输入,滤波器和输出以实现在管道中和谐工作。

         Logstash plugins创建很容器构建,为插件开发和插件生成器通过了API,可以从自定义应用程序中获取数据(https://www.elastic.co/guide/en/logstash/current/contributing-to-logstash.html)。

耐用性和安全性:

内置数据传输的信任管道:

        如果Logstash节点发生故障,Logstash会保证至少一次为您的进行中的事件传递其持续队列。未成功处理的事件可以被分流到一个死信队列中进行自省和回放。通过吸收吞吐量的能力,Logstash通过吞吐峰值进行扩展,而无需使用外部排队层。

        无论您是运行10或1000个Logstash实例,我们都可以让您充分保护您的采集管道。 来自Beats(https://www.elastic.co/products/beats)的输入数据以及其他输入可以通过网络加密,并且与安全的Elasticsearch集群(https://www.elastic.co/products/x-pack/security)完全集成。

监测:

对部署具有完全可见性:

        Logstash管道经常是多功能的,并且可以变得复杂,使得对管道性能,可用性和瓶颈的深入理解是非常宝贵的。 借助X-Pack中的监视和管道查看器功能(https://www.elastic.co/products/x-pack/monitoring),您可以轻松观察和研究活动Logstash节点或完整部署。

图片.png

管理和编排:

使用单个UI集中管理部署:

      使用Pipeline Management UI来执行Logstash部署,这使得编排和管理管道变得轻而易举。 管理控制还可以与内置的X-Pack安全功能无缝集成,以防止任何意外重新连接。

图片.png

博文来自:www.51niux.com

二、Logstash的安装部署

#这里先用一台做下日志的收集工作。我们就选择192.168.14.65这台机器作为Logstash的机器。

#还是先要贴一下官网部署以及使用文档:https://www.elastic.co/guide/en/logstash/current/index.html

#Logstash和filebeat的几个示例:https://www.elastic.co/guide/en/logstash/current/logstash-config-for-filebeat-modules.html

#filebeat的正则:https://www.elastic.co/guide/en/beats/filebeat/6.0/regexp-support.html

2.1 安装Logstash

安装java

#Logstash需要Java 8。不支持Java 9。

# java -version

java version "1.8.0_74"
Java(TM) SE Runtime Environment (build 1.8.0_74-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.74-b02, mixed mode)

#在某些Linux系统上,在尝试安装之前,可能还需要导出JAVA_HOME环境,特别是如果您从tarball安装了Java。 这是因为Logstash在安装过程中使用Java来自动检测您的环境并安装正确的启动方法(SysV init脚本,Upstart或systemd)。 如果Logstash在软件包安装期间无法找到JAVA_HOME环境变量,则可能会收到错误消息,并且Logstash将无法正常启动。

下载安装Logstash

# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.1.tar.gz     #线上用的是2.3.4的版本,这里用最新版......

# tar zxf logstash-6.0.1.tar.gz

# useradd elk

# cp -r logstash-6.0.1 /home/elk/
# ln -s /home/elk/logstash-6.0.1 /home/elk/logstash
# chown -R elk:elk /home/elk/

测试Logstash

#首先,我们通过运行最基本的Logstash管道来测试Logstash安装。Logstash管道有两个必需的元素,即输入和输出,以及一个可选元素filter。 输入插件使用来自数据源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。

图片.png

#要测试Logstash安装,请运行最基本的Logstash管道。 例如:

# su - elk

$ ./logstash -e 'input { stdin { } } output { stdout {} }'   #定义了一个叫“stdin”的input还有一个“stdout”的output,无论输入什么字符,Logstash都会按照结构化格式将输入输出。

/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:108: warning: already initialized constant DEFAULT_MAX_POOL_SIZE
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:110: warning: already initialized constant DEFAULT_REQUEST_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:111: warning: already initialized constant DEFAULT_SOCKET_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:112: warning: already initialized constant DEFAULT_CONNECT_TIMEOUT
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:113: warning: already initialized constant DEFAULT_MAX_REDIRECTS
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:114: warning: already initialized constant DEFAULT_EXPECT_CONTINUE
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:115: warning: already initialized constant DEFAULT_STALE_CHECK
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:590: warning: already initialized constant ISO_8859_1
/home/elk/logstash/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.1-java/lib/manticore/client.rb:641: warning: already initialized constant KEY_EXTRACTION_REGEXP
Sending Logstash's logs to /home/elk/logstash/logs which is now configured via log4j2.properties
[2017-12-07T17:00:51,114][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/elk/logstash/modules/fb_apache/configuration"}
[2017-12-07T17:00:51,118][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/elk/logstash/modules/netflow/configuration"}
[2017-12-07T17:00:51,346][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2017-12-07T17:00:51,561][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-12-07T17:00:51,924][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x7c079096@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-07T17:00:51,957][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2017-12-07T17:00:51,971][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
hello world    #手工输入一个hello world
2017-12-07T09:03:32.065Z localhost.localdomain hello world   #可以返回一个hello world,测试成功了。
[2017-12-07T17:07:14,815][INFO ][logstash.pipeline        ] Pipeline terminated {"pipeline.id"=>"main"}   #CTRL+D退出

#Logstash将时间戳和IP地址信息添加到消息中。 通过在运行Logstash的shell中发出CTRL-D命令来退出Logstash。

博文来自:www.51niux.com

三、解析日志与Logstash

3.1 配置Filebeat发送日志到Logstash

 先跟着官网介绍一下:      

        在创建Logstash管道之前,将配置Filebeat以将日志行发送到Logstash。Filebeat客户端是一个轻量级的,资源友好的工具,它从服务器上的文件中收集日志,并将这些日志转发给Logstash实例进行处理。 Filebeat专为可靠性和低延迟而设计。 Filebeat在主机上占用的资源较少,Beats输入插件最大限度地减少了Logstash实例的资源需求。       

       默认的Logstash安装包括Beats输入插件。 Beats输入插件允许Logstash从Elastic Beats框架接收事件,这意味着任何Beat框架编写的Beat框架(如Packetbeat和Metricbeat)都可以将事件数据发送到Logstash。

客户端安装Filebeat:

       #正常生产场景下,哪台机器需要收集日志如web日志等会在其机器上安装Filebeat客户端。这里搞一个192.168.14.67来做此次web日志测试的客户机。

# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.0.1-linux-x86_64.tar.gz   #还是下载最新版,线上用的是5.1.1版的

# useradd elk
# tar zxf filebeat-6.0.1-linux-x86_64.tar.gz
# cp -r filebeat-6.0.1-linux-x86_64 /home/elk/filebeat-6.0.1
# ln -s /home/elk/filebeat-6.0.1 /home/elk/filebeat
# chown -R elk:elk /home/elk/filebeat-6.0.1

配置Filebeat:

#su - elk

$ vim /home/elk/filebeat/filebeat.yml   #只粘贴修改部分

#=========================== Filebeat prospectors =============================
- type: log
  paths:
    #- /var/log/*.log    #目前按照Go语言的glob函数处理。没有对配置目录做递归处理,比如配置的如果是:/var/log/* /*.log则只会去/var/log目录的所有子目录中寻找以”.log”结尾的文件,而不会寻找/var/log目录下以”.log”结尾的文件。
    #- c:\programdata\elasticsearch\logs\*
    - /usr/local/nginx/logs/access.log
  tags: ["nginx-access","test.51niux.com"]
#================================ General =====================================
name: "192.168.14.67"
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
#  hosts: ["localhost:9200"]
#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.14.65:5066"]
  loadbalance: true
  compression_level: 6

$ cat /home/elk/filebeat/filebeat.yml    #把样例配置都贴出来

#=========================== Filebeat prospectors =============================
filebeat.prospectors:
#每个 - 是一个prospectors。大多数选项可以在prospectors级别设置,因此您可以使用不同的prospectors进行各种配置。 以下是prospectors特定的配置。
- type: log
  enabled: false  ##更改为true以启用此prospectors配置。
  paths:     ##应该被抓取和抓取的路径。 基于Glob的路径。
    - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
  #exclude_lines: ['^DBG']    #排除行。 要匹配的正则表达式列表。 它删除与列表中任何正则表达式匹配的行。
  #include_lines: ['^ERR', '^WARN']   #包括行。 要匹配的正则表达式列表。 它从列表中导出与任何正则表达式匹配的行。
  #exclude_files: ['.gz$']   #排除文件。 要匹配的正则表达式列表。 Filebeat会从列表中删除与任何正则表达式匹配的文件。 默认情况下,没有文件被丢弃。
  #fields:   #可选的附加字段。 可以自由选择这些字段,以将其他信息添加到搜寻的日志文件中进行过滤
  #  level: debug
  #  review: 1
  
  ###多行选项 
  #Multiline可以用于跨越多行的日志消息。 这在Java Stack Traces或C-Line Continuation中很常见   
  #multiline.pattern: ^\[    #必须匹配的正则表达式模式。 示例模式匹配所有以[
  #multiline.negate: false   #定义模式下的模式是否应该被否定。 默认是false。
  #multiline.match: after    ##匹配可以设置为“after”或“before”。 它用于定义行是否应该附加到之前或之后(不匹配)的模式,或者只要模式不匹配,则基于否定。 注意:相当于之前和之前相当于Logstash中的下一个
  
#============================= Filebeat modules ===============================
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml   #配置加载的全局模式
  reload.enabled: false    #设置为true以启用配置重新加载
  #reload.period: 10s      #应检查路径下文件的更改时间间隔
#====================  Elasticsearch template setting ==========================  
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
#================================ General =====================================
#name: #发布网络数据的发送人的名称。 它可以用于将单个发货人在Web界面中发送的所有传输进行分组。   
#tags: ["service-X", "web-tier"]  #发送人的标签包含在他们自己的字段中,每次传输都会发布。
#fields:    ##可选字段,可以指定将其他信息添加到输出。
#  env: staging

#============================== Dashboards =====================================     
#setup.dashboards.enabled: false   #这些设置控制将示例仪表板加载到Kibana索引。 加载仪表板默认是禁用的,可以通过在这里设置选项来启用,或者使用`-setup` CLI标记或`setup`命令启用。
#setup.dashboards.url:   #从哪里下载仪表板归档的URL。 默认情况下,这个URL的值是根据节拍名称和版本计算的。 对于发布的版本,此URL指向artifacts.elastic.co网站上的仪表板存档。

#============================== Kibana =====================================
#从Beats版本6.0.0开始,仪表板通过Kibana API加载。这需要一个Kibana端点配置。
setup.kibana:
   #host: "localhost:5601"
#============================= Elastic Cloud ==================================
#这些设置简化了使用Elastic Cloud(https://cloud.elastic.co/)的filebeat。 
#cloud.id:    #cloud.id设置将覆盖`output.elasticsearch.hosts`和`setup.kibana.host`选项。 可以在Elastic Cloud Web UI中找到
#cloud.auth:  #cloud.auth设置将覆盖`output.elasticsearch.username`和`output.elasticsearch.password`设置。 格式是<user>:<pass>`。
#================================ Outputs =====================================
#配置发送beat收集的数据时使用的输出。
#-------------------------- Elasticsearch output ------------------------------
output.elasticsearch: 
  hosts: ["localhost:9200"]   #要连接的主机数组。 
  #protocol: "https"    ##可选协议和基本认证凭证。
  #username: "elastic"
  #password: "changeme"
#----------------------------- Logstash output --------------------------------
#output.logstash:
  #hosts: ["localhost:5044"]  #Logstash主机
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]   #可选的SSL。默认是关闭的。HTTPS服务器验证的根证书列表
  #ssl.certificate: "/etc/pki/client/cert.pem"   #SSL客户端认证证书
  #ssl.key: "/etc/pki/client/cert.key"   #客户证书密钥
#================================ Logging =====================================
#logging.level: debug   ##设置日志级别。 默认的日志级别是info。可用的日志级别是:critical, error, warning, info, debug
#logging.selectors: ["*"]  ##在调试级别,您可以选择性地启用仅针对某些组件的日志记录。 要启用所有选择器,请使用[“*”]。 其他选择器的例子是“beat”,“publish”,“service”。

#但是显然一个logstash是不合理的,如果一个logstash挂掉那整个集群就废掉了,所以最起码是两个啊,得下面这么配置:

output.logstash:
  hosts: ["192.168.14.65:5044","192.168.14.66:5044"]  #filebeat配置多个logstash服务端
  loadbalance: true
  compression_level: 6

 启动Filebeat:

$ ./filebeat -e -c filebeat.yml -d "publish"

2017/12/07 11:12:04.686924 beat.go:426: INFO Home path: [/home/elk/filebeat] Config path: [/home/elk/filebeat] Data path: [/home/elk/filebeat/data] Logs path: [/home/elk/filebeat/logs]
2017/12/07 11:12:04.687210 metrics.go:23: INFO Metrics logging every 30s
2017/12/07 11:12:04.687390 beat.go:433: INFO Beat UUID: d85b9ebe-c3c4-4ae3-91ed-0826a7bb8382
2017/12/07 11:12:04.687448 beat.go:192: INFO Setup Beat: filebeat; Version: 6.0.1
2017/12/07 11:12:04.689131 logger.go:18: DBG start pipeline event consumer
2017/12/07 11:12:04.689185 module.go:80: INFO Beat name: 192.168.14.67
2017/12/07 11:12:04.709937 beat.go:260: INFO filebeat start running.
2017/12/07 11:12:04.710061 registrar.go:71: INFO No registry file found under: /home/elk/filebeat/data/registry. Creating a new registry file.
2017/12/07 11:12:04.733832 registrar.go:108: INFO Loading registrar data from /home/elk/filebeat/data/registry
2017/12/07 11:12:04.733922 registrar.go:119: INFO States Loaded from registrar: 0
2017/12/07 11:12:04.735009 filebeat.go:260: WARN Filebeat is unable to load the Ingest Node pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the Ingest Node pipelines or are using Logstash pipelines, you can ignore this warning.
2017/12/07 11:12:04.735072 crawler.go:44: INFO Loading Prospectors: 1
2017/12/07 11:12:04.736376 registrar.go:150: INFO Starting Registrar
2017/12/07 11:12:04.737264 prospector.go:103: INFO Starting prospector of type: log; id: 16430255320212846879 
2017/12/07 11:12:04.737538 crawler.go:78: INFO Loading and starting Prospectors completed. Enabled prospectors: 1
2017/12/07 11:12:04.737692 reload.go:128: INFO Config reloader started
2017/12/07 11:12:04.750387 reload.go:220: INFO Loading of config files completed.
2017/12/07 11:12:04.752227 harvester.go:207: INFO Harvester started for file: /usr/local/nginx/logs/access.log
2017/12/07 11:12:34.689285 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3062112 beat.memstats.memory_total=3062112 filebeat.events.added=1 filebeat.events.done=1 filebeat.harvester.open_files=1 filebeat.harvester.running=1 filebeat.harvester.started=1 libbeat.config.module.running=0 libbeat.config.reloads=1 libbeat.output.type=logstash libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.filtered=1 libbeat.pipeline.events.total=1 registrar.states.current=1 registrar.states.update=1 registrar.writes=2
2017/12/07 11:13:04.688099 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3088000 beat.memstats.memory_total=3088000 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 registrar.states.current=1
2017/12/07 11:13:34.687919 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3107840 beat.memstats.memory_total=3107840 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 registrar.states.current=1

3.2Logstash端配置

$ mkdir conf

$ cd conf

$mkdir /home/elk/logstash/patterns   #我们要转换的格式文件就存放在此目录下

$ cat /home/elk/logstash/patterns/51niux

NGUSERNAME [a-zA-Z\.\@\-\+_%]+    #这些现在还用不到,以后就会用到先粘贴在这里了
NGUSER %{NGUSERNAME}
NOTQUOTE [^\"]*
SPACE \s*
NOTDOT [^,]*

TEST_HTTP_ACCESS %{HTTPDATE:timestamp}\|%{IP:remote_addr}\|%{IPORHOST:http_host}\|(?:%{DATA:http_x_forwarded_for}|-)\|%{DATA:request_method}\|%{DATA:request_uri}\|%{DATA:server_protocol}\|%{NUMBER:status}\|(?:%{NUMBER:body_bytes_sent}|-)\|(?:%{DATA:http_referer}|-)\|%{DATA:http_user_agent}\|(?:%{DATA:request_time}|-)\|"
#主要是这里这句话,定义了一个TEST_HTTP_ACCESS,这就相当于模板,后面是通过log_format来匹配对应的正则。message是每段读进来的日志,IPORHOST、USERNAME、HTTPDATE等都是patterns/grok-patterns中定义好的正则格式名称,对照日志进行编写。
#grok pattren的语法为:%{SYNTAX:semantic},":" 前面是grok-pattrens中定义的变量,后面可以自定义变量的名称。(?:%{SYNTAX:semantic}|-)这种形式是条件判断。如果有双引号""或者中括号[],需要加 \ 进行转义。

$ cat /home/elk/logstash/conf/logstash-nginx.conf  #定义被logstash引用的配置文件

input {   #定义客户端发送到哪个IP的哪个端口
    beats {
        host => "192.168.14.65"    
        port => 5066
    }
}
filter {
    if "nginx-access" in [tags]{    #如果tags里面是nginx-access匹配到了就进行下面的判断,if判断是用在收集多业务nginx日志的时候,这里先把框架写在这里,然后如果不是的话就可以在下面再定义else if
        if "test.51niux.com" in [tags]{    #如果tags匹配到是test.51niux.com的话
            grok {   #就进行我们指定的TEST_HTTP_ACCESS这个模板里面的格式进行grok的格式转换。
                 patterns_dir => "/home/elk/logstash/patterns"   
                 match => {
                          "message" => "%{TEST_HTTP_ACCESS}"
                 }
            }
            mutate {   #类型转换,可以转换类型包括:"integer","float" 和 "string"。示例如下:
                convert => { "nginx_request_time" => "float"}
                convert => { "nginx_body_bytes_sent" => "integer"}
            }
   }
    
  }
}
output {
    if "nginx-access" in [tags]{   #如果是带着nginx-access的tags的消息
        if "test.51niux.com" in [tags]{   #如果是带着tags是test.51niux.com的消息
            elasticsearch {   #写入到elasticsearch中
                hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]   #这里是elasticsearch集群
                index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}"   #这里是创建的索引的格式以logstash开头然后以年月结尾。
            }         
    }
 }
}

$ /home/elk/logstash/bin/logstash -t -f /home/elk/logstash/conf/logstash-nginx.conf   #上面文件if判断了{}有点多,可以用-t来验证一下配置文件而不启动,结尾出现下面就是可以了

Configuration OK
[2017-12-07T19:38:07,082][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

$ /home/elk/logstash/bin/logstash  -f /home/elk/logstash/conf/logstash-nginx.conf   #指定配置文件前台启动,我们先看看输出。

[2017-12-07T19:52:04,304][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.14.60:9200/, http://192.168.14.61:9200/, http://192.168.14.62:9200/, http://192.168.14.63:9200/, http://192.168.14.64:9200/]}}
[2017-12-07T19:52:04,307][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.60:9200/, :path=>"/"}
[2017-12-07T19:52:04,401][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.60:9200/"}
[2017-12-07T19:52:04,438][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2017-12-07T19:52:04,438][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2017-12-07T19:52:04,439][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.61:9200/, :path=>"/"}
[2017-12-07T19:52:04,449][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.61:9200/"}
[2017-12-07T19:52:04,455][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.62:9200/, :path=>"/"}
[2017-12-07T19:52:04,464][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.62:9200/"}
[2017-12-07T19:52:04,475][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.63:9200/, :path=>"/"}
[2017-12-07T19:52:04,486][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.63:9200/"}
[2017-12-07T19:52:04,494][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.14.64:9200/, :path=>"/"}
[2017-12-07T19:52:04,502][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://192.168.14.64:9200/"}
[2017-12-07T19:52:04,514][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-12-07T19:52:04,518][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-12-07T19:52:04,534][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//192.168.14.60", "//192.168.14.61", "//192.168.14.62", "//192.168.14.63", "//192.168.14.64"]}
[2017-12-07T19:52:04,627][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x139581ec@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-07T19:52:05,179][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"192.168.14.65:5066"}
[2017-12-07T19:52:05,238][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-07T19:52:05,251][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2017-12-07T19:52:05,260][INFO ][org.logstash.beats.Server] Starting server on port: 5066

#可以看到日志输出基本是没毛病的。

# netstat  -lntup|grep 5066  #查看端口已经有了

tcp6       0      0 192.168.14.65:5066      :::*                    LISTEN      1683/java

# lsof -i :5066

COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    1683  elk  165u  IPv6  18794      0t0  TCP 192.168.14.65:stanag-5066 (LISTEN)
java    1683  elk  166u  IPv6  26958      0t0  TCP 192.168.14.65:stanag-5066->192.168.14.67:56018 (ESTABLISHED)

#可以看到除了本地有一个5066监听端口外,还有一个客户端192.168.14.67的连接,注意了,也就是客户端发送日志的时候才会跟logstash的端口进行连接。

3.3 测试一下并配置Kibana

# for num in `seq 1 10000`;do  curl -I http://192.168.14.67;done   #随便找个 客户端,发它个10000次请求。

HTTP/1.1 200 OK
Server: nginx/1.12.2
Date: Thu, 07 Dec 2017 12:21:35 GMT
Content-Type: text/html
Content-Length: 612
Last-Modified: Thu, 07 Dec 2017 09:42:54 GMT
Connection: keep-alive
ETag: "5a290d1e-264"
Accept-Ranges: bytes

HTTP/1.1 200 OK
Server: nginx/1.12.2
Date: Thu, 07 Dec 2017 12:21:35 GMT
Content-Type: text/html
Content-Length: 612
Last-Modified: Thu, 07 Dec 2017 09:42:54 GMT
Connection: keep-alive
ETag: "5a290d1e-264"
Accept-Ranges: bytes

#然后我们看看192.168.14.67的beat的输出界面

2017/12/07 12:21:36.284411 processor.go:262: DBG Publish event: {
  "@timestamp": "2017-12-07T12:21:36.284Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "6.0.1"
  },
  "tags": [
    "nginx-access",
    "test.51niux.com"
  ],
  "prospector": {
    "type": "log"
  },
  "beat": {
    "name": "192.168.14.67",
    "hostname": "localhost.localdomain",
    "version": "6.0.1"
  },
  "source": "/usr/local/nginx/logs/access.log",
  "offset": 1792332,
  "message": "192.168.14.64 - - [07/Dec/2017:20:21:35 +0800] \"HEAD / HTTP/1.1\" 200 0 \"-\" \"curl/7.29.0\""
}
2017/12/07 12:21:49.206552 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4194304 beat.memstats.memory_alloc=1456736 beat.memstats.memory_total=332163248 filebeat.events.active=-128 filebeat.events.added=2201 filebeat.events.done=2329 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.output.events.acked=2329 libbeat.output.events.batches=18 libbeat.output.events.total=2329 libbeat.output.read.bytes=108 libbeat.output.write.bytes=45691 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.published=2201 libbeat.pipeline.events.total=2201 libbeat.pipeline.queue.acked=2329 registrar.states.current=1 registrar.states.update=2329 registrar.writes=18
2017/12/07 12:22:19.206515 metrics.go:39: INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=4194304 beat.memstats.memory_alloc=1485824 beat.memstats.memory_total=332192336 filebeat.harvester.open_files=1 filebeat.harvester.running=1 libbeat.config.module.running=0 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 registrar.states.current=1

#这是最后的窗口输出消息,可以看到它的发送过程。

Kibana界面的配置和查看:

图片.png

图片.png

图片.png

#按分钟来显示,然后将鼠标移动到左边的数据柱可以看到时间和Count次数。

3.4 跟着官网翻译一下grok过滤插件

官网链接:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

描述:

       解析任意文本并构造它。Grok是将非结构化日志数据解析为结构化和可查询的好方法。这个工具非常适合syslog日志,apache和其他web服务器日志,mysql日志,以及通常为人类而不是计算机消耗的任何日志格式。

       默认情况下,Logstash附带约120个模式。 你可以在这里找到它们:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

Grok基础:

      Grok通过将文本模式组合成与日志匹配的东西来工作。Grok模式的语法是%{SYNTAX:SEMANTIC}。语法是与您的文本相匹配的模式的名称。 例如,3.44将被NUMBER模式匹配,55.3.244.1将被IP模式匹配。 语法是你如何匹配。SEMANTIC是您为正在匹配的文本的标识符。 例如,3.44可能是一个事件的持续时间,所以你可以称之为持续时间。 此外,字符串55.3.244.1可以识别发出请求的客户端。对于上面的例子,你的Grok过滤器看起来像这样:

%{NUMBER:duration} %{IP:client}

    #或者,您可以将数据类型转换添加到您的grok模式。 默认情况下,所有的语义都保存为字符串。 如果您希望转换语义的数据类型,例如将字符串更改为整数,则将其后缀与目标数据类型。 例如%{NUMBER:num:int}将num语义从一个字符串转换为一个整数。 目前唯一支持的转换是int和float。

     例子:有了这个语法和语义的思想,我们可以从一个样本日志中提取有用的字段,就像这个虚构的http请求日志:

55.3.244.1 GET /index.html 15824 0.043

这种模式可能是:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

一个更现实的例子,让我们从文件中读取这些日志:

input {
  file {
    path => "/var/log/http.log"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

在grok过滤器之后,事件将会有一些额外的字段:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

常用正则表达:

Grok位于正则表达式之上,所以任何正则表达式在grok中都是有效的。 正则表达式库是Oniguruma,您可以在Oniguruma网站上看到完整支持的regexp语法:https://github.com/kkos/oniguruma/blob/master/doc/RE

下面翻译一下这个链接:

Oniguruma正式表达版本6.7.0 2017/12/08

语法:ONIG_SYNTAX_ONIGURUMA(默认)

语法元素:

\    #转义(启用或禁用元字符)
|    #轮流交替
(...)  #组
[...]  #字符类,这里一般代表一个范围

字符:

\t      #水平制表符(0x09)
\v      #垂直制表符(0x0B)
\n      #换行符(换行符)(0x0A)
\r      #回车(0x0D)
\b      #退格(0x08)
\f      #换页(0x0C)
\a      #bell (0x07)
\e      #转义(0x1B)
\nnn    #八进制字符(编码字节值)
\o{17777777777}  #宽八进制字符(字符代码点值)
\uHHHH    #宽十六进制字符(字符码值)
\xHH    #十六进制字符(编码字节值)
\x{7HHHHHHH}  #宽十六进制字符(字符码值)
\cx   #控制字符(字符码值)
\C-x  #控制字符(字符码值)
\M-x  #meta(x | 0x80)(字符码值)
\M-\C-x   #元控制字符(字符码点值)
#(* \b作为退格仅在字符类中有效)

字符类型:

.     #任何字符(除了换行符)
\w    #单词字符  不是Unicode:字母数字,“_”和多字节字符。
\W    #非字字符
\s    #空白字符  不是Unicode:\t,\n,\v,\f,\r,\x20
\S    #非空白字符
\d    #十进制数字字符 Unicode:General_Category - Decimal_Number
\D    #非十进制数字字符
\h    #十六进制数字字符[0-9a-fA-F]
\H    #非十六进制字符
\R    #通用换行符(*不能用于字符类)"\r\n" or \n,\v,\f,\r(*但不会从\r\n回溯到\r) Unicode情况下:"\r\n" or \n,\v,\f,\r or U+0085, U+2028, U+2029
\N    #负面的换行符(?-m :.)
\O    #true anychar(?m :.)(*原始函数)
\X    #扩展字形集群 (?>\O(?:\Y\O)*) \X不检查匹配的起始位置是否是边界。写为\y \X如果你想确保它。
#字符属性
* \p{property-name}    
* \p{^property-name}    (negative)    
* \P{property-name}     (negative)    
property-name:
+适用于所有编码  #Alnum, Alpha, Blank, Cntrl, Digit, Graph, Lower,Print, Punct, Space, Upper, XDigit, Word, ASCII 
+在EUC_JP,Shift_JIS上工作   # Hiragana, Katakana
+工作在UTF8,UTF16,UTF32    #请参阅doc / UNICODE_PROPERTIES。

量词:

#greedy
?    #1或0次
*     #0或更多次
+     #1次或更多次
{n,m} #至少n但不超过m次
{n,}   #至少n次
{,n}   #至少为0但不超过n次({0,n})
{n}     #n次
#reluctant
??    #1或0次
*?   #0次或更多次
+?   #1次或更多次
{N,M}? #至少n但不超过m次
{N,}? #至少n次
{,N}?  #至少为0但不超过n次(== {0,n}?)
#possessive (greedy and does not backtrack once match)
?+    #1或0次
*+    #0或更多次
++    #一次或多次
({n,m}+, {n,}+, {n}+   #仅在ONIG_SYNTAX_JAVA中占有)
# ex. /a*+/ === /(?>a*)/

Anchors:

^     #行的开头
$     #行结束
\b    #字边界
\B    #非单词边界
\y    #扩展的字形集群边界
\Y     #扩展的字形集群非边界
\A     #字符串的开始
\Z    #结尾的字符串,或换行符在结尾处
\z    #字符串的结尾
\G    #当前的搜索尝试开始
\K    #保留(保留结果字符串的起始位置)

字符类:

 ^...   #负面的类(最低优先)
 x-y    #范围从x到y
 [...]  #设置(字符类中的字符类)
 ..&&.. #相交(低优先级,只高于^)
 # ex. [a-w&&[^c-g]z] ==> ([a-w] AND ([^c-g] OR z)) ==> [abh-w]
 *如果您想使用'[',' - '或']'作为普通字符。在字符类中,你应该用'\'来转义它们。
 #POSIX括号([:xxxxx:],否定[:^ xxxxx:])
  #不是Unicode的情况下:
  alnum    #字母或数字字符
  alpha    #字母
  ascii    #代码值:[0 - 127]
  blank    #\t,\x20
  cntrl
  digit    #0-9
  graph    #包括所有多字节编码字符
  lower
  print        #包括所有多字节编码字符
  punct
  space        #\t, \n, \v, \f, \r, \x20
  upper
  xdigit      #0-9,a-f,A-F
  word     #单词字母数字,“_”和多字节字符
  #Unicode案例:
   alnum    #字母| 标记|十进制数
   alpha    #字母| 标记
   ascii    # 0000 - 007F
   blank    #Space_Separator | 0009
   cntrl    #Control | Format | Unassigned | Private_Use | Surrogate
   digit    #十进制数
   graph    # [[:^space:]] && ^Control && ^Unassigned && ^Surrogate
   lower    #Lowercase_Letter
   print    #[[:graph:]] | [[:space:]]
   punct    #Connector_Punctuation | Dash_Punctuation | Close_Punctuation |    Final_Punctuation | Initial_Punctuation | Other_Punctuation |    Open_Punctuation    
   space    #Space_Separator | Line_Separator | Paragraph_Separator |    U+0009 | U+000A | U+000B | U+000C | U+000D | U+0085   
   upper    # Uppercase_Letter
   xdigit   #U+0030 - U+0039 | U+0041 - U+0046 | U+0061 - U+0066    (0-9, a-f, A-F)    
   word     #Letter | Mark | Decimal_Number | Connector_Punctuation

#剩下的看链接吧......

自定义模式:

有时logstash没有你需要的模式。 为此,有几个选项。

首先,可以使用Oniguruma语法来命名捕获,这将使您匹配一段文本并将其保存为一个字段:

(?<field_name>the pattern here)

例如,后缀日志的队列ID是10或11个字符的十六进制值。 我可以很容易地捕捉到这一点:

(?<queue_id>[0-9A-F]{10,11})

或者,可以创建一个自定义模式文件。

创建一个名为patterns的目录,其中包含一个名为extra的文件(文件名无关紧要,但为自己命名)
在该文件中,将需要的模式写为模式名称,空格,然后是该模式的正则表达式。

例如,像上面那样做postfix队列id的例子:

# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}

然后使用此插件中的patterns_dir设置来告知logstash您的自定义模式目录。 以下是一个带有示例日志的完整示例:

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

过滤例子:

filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
  }
}

以上内容将匹配并导致以下字段:

timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

The timestamp, logsource, program, and pid fields come from the SYSLOGBASE pattern ,它本身是由其他模式定义的。

另一种选择是使用pattern_definitions在过滤器中内联定义模式。 这主要是为了方便,并允许用户定义可以在该过滤器中使用的模式。 pattern_definitions中新定义的模式在特定的Grok过滤器之外将不可用。

Grok过滤器配置选项:

该插件支持以下配置选项和稍后介绍的通用选项。

break_on_match   #值类型是布尔值,默认值为truegrok的第一个成功的匹配将导致过滤器被完成。如果你想grok尝试所有的模式(也许你是解析不同的东西),然后将其设置为false。
keep_empty_captures  #值类型是布尔值,默认值是false。如果为true,则将空捕获保留为事件字段。
match   #值类型是hash,默认值是{}。 field ⇒ value的匹配散列
named_captures_only  #值类型是布尔值,默认值为true。如果为true,则只存储grok的命名捕获。
overwrite   #值类型是数组,默认值是[]。要覆盖的字段。这使可以覆盖已存在的字段中的值。
pattern_definitions  #值类型是hash。默认值是{}。模式名称和模式元组的散列,用于定义当前过滤器要使用的自定义模式。 匹配现有名称的模式将覆盖预先存在的定义。 把这个想象成可用的内联模式就是grok的这个定义
patterns_dir  #值类型是数组,默认值是[]。Logstash默认带有一堆模式,所以你不一定需要自己定义这个,除非你添加额外的模式。 您可以使用此设置指向多个模式目录。 请注意,Grok将读取与patterns_files_glob匹配的目录中的所有文件,并假定它是一个模式文件(包括任何波形备份文件)。
patterns_files_glob  #值类型是字符串,默认值是“*”。Glob模式,用于在由patterns_dir指定的目录中选择模式文件
tag_on_failure  #值类型是数组,默认值是[“_grokparsefailure”]。没有成功匹配时,将值追加到标签字段
tag_on_timeout  #值类型是字符串,默认值是“_groktimeout”。如果grok正则表达式超时,则应用标记。
timeout_millis  #值类型是数字,默认值是30000。尝试在这段时间后终止正则表达式。 这适用于每个模式,如果应用多个模式这将永远不会提前超时,但可能需要一些时间超时。 实际的超时时间是基于250ms量化的近似值。 设置为0以禁用超时

下面是overwrite的例子:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}

#例如,如果在消息字段中有syslog行,则可以使用部分匹配覆盖消息字段,如下所示:在这种情况下,像一条lineMay 29 16:37:11 sadness logger: hello world 将被解析和hello world将覆盖原来的消息。

通用选项:

add_field:

值类型是hash。默认值是{}。如果此过滤器成功,请将任意字段添加到此事件。 字段名称可以是动态的,并使用%{field}来包含事件的一部分。

例:

filter {
  grok {
    add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
  }
}
# You can also add multiple fields at once:
filter {
  grok {
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
  }
}

#如果事件的字段为“somefield”==“hello”,那么这个过滤器在成功时将添加字段foo_hello(如果存在),并且上面的值和%{host}部分被来自事件的值替换。 第二个例子也会添加一个硬编码的字段。

add_tag:

值类型是array,默认值是[]。如果此过滤器成功,请向该事件添加任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。

例子:

filter {
  grok {
    add_tag => [ "foo_%{somefield}" ]
  }
}
# You can also add multiple tags at once:
filter {
  grok {
    add_tag => [ "foo_%{somefield}", "taggedy_tag"]
  }
}

如果事件的字段为“somefield”==“hello”,那么这个过滤器会成功添加一个标签foo_hello(而第二个例子当然会添加一个taggedy_tag标签)。

enable_metric:

值类型是布尔值,默认值为true.

为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。

id:

值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是非常有用的,例如,如果你有2个grok过滤器。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。

filter {
  grok {
    id => "ABC"
  }
}

periodic_flush:

值类型是布尔值,默认值是false。定期调用过滤器刷新方法。 可选的。

remove_field:

值类型是数组,默认值是[]。如果此过滤器成功,请从此事件中删除任意字段。 例:

filter {
  grok {
    remove_field => [ "foo_%{somefield}" ]
  }
}
# You can also remove multiple fields at once:
filter {
  grok {
    remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
  }
}

如果事件的字段为“somefield”==“hello”,则此过滤器在成功时将删除名称为foo_hello的字段(如果存在)。 第二个例子会删除一个额外的非动态字段。

remove_tag:

值类型是数组,默认值是[]。如果此过滤器成功,请从该事件中移除任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。

例子:

filter {
  grok {
    remove_tag => [ "foo_%{somefield}" ]
  }
}
# You can also remove multiple tags at once:
filter {
  grok {
    remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
  }
}

如果事件的字段为“somefield”==“hello”,那么这个过滤器在成功的时候会删除标签foo_hello(如果存在的话)。 第二个例子会删除一个sad_unwanted_tag的标签。

作者:忙碌的柴少 分类:ELK 浏览:18858 评论:18
留言列表
叮当
叮当 启动filebeat报错 filebeat2018/01/12 10:47:27.830964 beat.go:635: CRIT Exiting: error unpacking config data: more then one namespace configured accessing 'output' (source:'filebeat.yml')
Exiting: error unpacking config data: more then one namespace configured accessing 'output' (source:'filebeat.yml')
怎么解决?  回复
访客
访客 filebeat.yml文件中配置了多个output参数,配置一个即可  回复
叮当
叮当 坐等回复,卡在这儿了  回复
Leon.zhou
Leon.zhou 是不是你filebeat.yml没有配置输出管道
output.elasticsearch 或 output.logstash:  回复
叮当
叮当 已解决,原因是output.elasticsearch和output.logstash都打开了  回复
叮当
叮当 奇怪,我是filebeat跟logstash装在同一台机器上的,走到3.2最后一小步lsof查看5066端口的时候, filebeat的客户端并没有跟logstash端口进行连接啊  回复
忙碌的柴少
忙碌的柴少 你没注意看我下面写的那句话只有发数据的时候才会建立连接你用filebeat采集下数据试试  回复
忙碌的柴少
忙碌的柴少 谢谢帮忙回复  回复
叮当
叮当 kibana的discover显示不了收集的日志提示No results found  回复
忙碌的柴少
忙碌的柴少 那就是没数据没索引白。  回复
叮当
叮当 有索引,es里有数据,怎么回复贴不了图  回复
叮当
叮当 已解决,原来是kibana设置时不选择时间过滤器就可以显示日志了,还是要谢谢楼主  回复
阿dai
阿dai 请教楼主,我这边当前使用的是多实例配置,现在想将多实例集中到一个实例进行管理,是应该使用x-pack实现吗?  回复
忙碌的柴少
忙碌的柴少 加tags,通过判断tags做操作。  回复
1111
1111 请问一下filebeat如何后台启动运行?  回复
忙碌的柴少
忙碌的柴少 肯定得啊你仔细看博客哈  回复
小花
小花 filebeat配置多个logstash服务端,logstash多实例的话,每个logstash过滤的配置都需要一样的是吗?需要考虑filebeat收集的数据到每个logstash数据一致性问题吗?  回复
访客
访客 一致性倒是不考虑,你配置多实例就可以想成是一份数据切两半发两个地方提高并打,过滤肯定是一样的。但是一般也不这么干啊,你都两个Logstash了 直接顶kafka集群吧。  回复
发表评论
来宾的头像