柴少的官方网站 技术在学习中进步,水平在分享中升华

ELK加入Redis队列(十一)

#如果消息量比较大的话怕ES顶不住,然后再其前端加了一层消息队列减轻压力,将logstash采集到的数据写入redis然后再通过logstash从redis把数据拿出来写入到ES中。当然redis是内存级缓存,而且是持久化存储,如果量比较大的话需要内存就打、太奢侈。我这里线上也没用这种方式,这里也就是简单的记录一下,主要是一种缓冲思路嘛。

一、架构简单介绍

1.1 架构介绍

image.png

#此图来自于:http://blog.csdn.net/u010636606/article/details/71237248

多个独立的agent(Shipper)负责收集不同来源的数据。
一个中心agent(indexer)负责汇总和分析数据,在中心agent前的Broker(使用redis实现)作为缓冲区,中心agent后的ElasticSearch用于存储和搜索数据,前端的Kibana提供丰富的图表展示。
Shipper表示日志收集,使用LogStash收集各种来源的日志数据;
Broker作为远程agent与中心agent之间的缓冲区,使用redis实现,一是可以提高系统的性能,二是可以提高系统的可靠性,当中心agent提取数据失败时,数据保存在redis中,而不至于丢失;
中心agent也是LogStash,从Broker中提取数据,可以执行相关的分析和处理(Filter);
ElasticSearch用于存储最终的数据,并提供搜索功能;
Kibana提供一个简单、丰富的web界面,数据来自于ElasticSearch,支持各种查询、统计和展示;

1.2 架构规划

#ElasticSearch集群还是192.168.14.60-64

#Kibana还是原来的192.168.14.66,这样在其上面在其一个Logstash,负责从redis集群里面拿数据然后写入到ES集群中。

#Redis集群选择三台机器做成哨兵模式吧,192.168.14.11-13

#192.168.14.13上面的Logstash负责采集本地的慢查询日志到redis中。

#192.168.14.67上面的Logstash负责接收本地filebeat发送过来的nginx日志然后写入到redis中。

博文来自:www.51niux.com

二、环境部署

2.1 redis主备外加哨兵模式

参考链接:https://blog.51niux.com/?id=132

192.168.14.11:6379> info replication
# Replication
role:master
connected_slaves:2
slave0:ip=192.168.14.12,port=6379,state=online,offset=17925,lag=1
slave1:ip=192.168.14.13,port=6379,state=online,offset=18066,lag=0
master_repl_offset:18066
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:18065

#从上面redis的数据结果可以看出已经主备模式已经配置完毕了。

2.2 192.168.14.13上面部署mysql慢查询日志写入到redis

logstash端的配置:

$ cat /home/elk/logstash/conf/mysqltest.conf   #主要是修改output到redis

input {
  file { 
    path => "/var/lib/mysql/slow.log"
    type => "slow-log"
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => previous
    }
   tags => ["slow-mysql","mysqllog"]   #在传输的时候打上标签。
  }
}
output {
  redis {
        host => ["192.168.14.11","192.168.14.12","192.168.14.13"]
        #port => "6379"
        db => "1"
        key => "slow_mysql_%{+YYYY.MM.dd}"
        data_type => list 
  }
}

#但是redis列表它这里有一个问题,因为redis集群只有主才能写入,其他的是不能写的,但是这个选择又是随机的......

$   /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/mysqltest.conf    #启动logstah,我直接截取报错信息了

[2017-12-16T21:30:54,216][WARN ][logstash.outputs.redis   ] Failed to send event to Redis {:event=>#<LogStash::Event:0x3e3eeef5>, :identity=>"redis://@192.168.14.12:6379/1 list:slow_mysql_%{+YYYY.MM.dd}", :exception=>#<Redis::CommandError: READONLY You can't write against a read only slave.>, 
......
[2017-12-16T21:30:55,227][WARN ][logstash.outputs.redis   ] Failed to send event to Redis {:event=>#<LogStash::Event:0x3e3eeef5>, :identity=>"redis://@192.168.14.13:6379/1 list:slow_mysql_%{+YYYY.MM.dd}", :exception=>#<Redis::CommandError: READONLY You can't write against a read only slave.>,

#有上面两条信息,但是是WARN不是ERROR,程序运行时没问题的,也就是随机选嘛,先选择192.168.14.12发现不能写,然后又选择192.168.14.13又发现不能写,然后又发现192.168.14.11可以写,然后就没再报错了。要是顺序写就好了,可惜是随机的,但是你要是不搞列表的形式的话,redis万一出问题了你就写不过去了,不过也没问题信息不发送也不丢失嘛。

然后redis上面查看一下:

192.168.14.11:6379> SELECT 1
OK
192.168.14.11:6379[1]> KEYS *
1) "slow_mysql_2017.12.16"
192.168.14.11:6379[1]> LRANGE slow_mysql_2017.12.16 0 -1   #查看已经写入信息了
 1) "{\"rows_examined\":1,\"client-domain\":\"controller\",\"query\":\"SELECT services.created_at AS services_created_at, services.updated_at AS services_updated_at, servic\\nes.deleted_at AS services_delet\\ned_at, services.deleted AS services_deleted, services.id AS services_id, services.host AS services_ho\\nst, services.`binary` AS servic\\nes_binary, services.topic AS services_topic, services.report_count AS services_report_count, services\\n.disabled AS services_disabled,\\n services.disabled_reason AS services_disab

2.3 192.168.14.67上面部署nginx日志写入到redis

Filebeat的主要配置:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
     - /usr/local/nginx/logs/access.log
  tags: ["nginx-access","test.51niux.com"]
output.logstash:
  hosts: ["192.168.14.67:5066"]
  loadbalance: true
  compression_level: 6

  $ /home/elk/filebeat/filebeat -c /home/elk/filebeat/filebeat.yml 

Logstash的配置:

$ cat /home/elk/logstash/patterns/51niux 

TEST_HTTP_ACCESS %{IP:fromip} - - \[%{HTTPDATE:nginx.access.time}\] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}"

$ vim /home/elk/logstash/conf/logstash-nginx.conf

input {
    beats {
        host => "192.168.14.67"
        port => 5066
    }
}
filter {
    if "nginx-access" in [tags]{
        if "test.51niux.com" in [tags]{
            grok {
                 patterns_dir => "/home/elk/logstash/patterns"
                 match => {
                          "message" => "%{TEST_HTTP_ACCESS}"
                 }
            }
          }
  }
}
output {

  redis {
        host => ["192.168.14.11","192.168.14.12","192.168.14.13"]
        #port => "6379"
        db => "2"
        key => "access_nginx_%{+YYYY.MM.dd}"
        data_type => list
  }
}

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/logstash-nginx.conf 

redis服务器上面的查看:

192.168.14.11:6379[1]> SELECT 2
OK
192.168.14.11:6379[2]> KEYS *
1) "access_nginx_2017.12.16"
192.168.14.11:6379[2]> LRANGE access_nginx_2017.12.16 0 -1  #查看也有信息写入了。
1) "{\"nginx.access.http_version\":\"1.1\",\"offset\":2706801,\"nginx.access.response_code\":\"200\",\"prospector\":{\"type\":\"log\"},\"nginx.access.referrer\":\"-\",\"nginx.access.agent\":\"curl/7.29.0\",\"source\":\"/usr/local/nginx/logs/access.log\",\"message\":\"192.168.14.66 - - [16/Dec/2017:23:56:39 +0800] \\\"GET / HTTP/1.1\\\" 200 612 \\\"-\\\" \\\"curl/7.29.0\\\"\",
\"nginx.access.url\":\"/\",\"tags\":[\"nginx-access\",\"test.51niux.com\",\"beats_input_codec_plain_applied\"],\"@timestamp\":\"2017-12-16T15:56:43.850Z\",\"nginx.access.method\":\"GET\",\"nginx.access.time\":\"16/Dec/2017:23:56:39 +0800\",\"@version\":\"1\",\"beat\":{\"name\":\"192.168.14.67\",\"hostname\":\"localhost.localdomain\",\"version\":\"6.0.1\"},
\"host\":\"localhost.localdomain\",\"fromip\":\"192.168.14.66\",\"nginx.access.body_sent.bytes\":\"612\"}"

#注意最后标签发过来了:\"tags\":[\"nginx-access\",\"test.51niux.com\"

博文来自:www.51niux.com

2.4 redis做input插件,先跟着官网走一波

官网链接:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html

batch_count注意:如果使用batch_count设置,则必须使用Redis版本2.6.0或更高版本。 以前的任何东西都不支持批处理所使用的操作。

Redis输入配置选项

该插件支持以下配置选项和稍后介绍的通用选项。

image.png

下面是介绍:

batch_count  #值类型是数字,默认值是125。使用EVAL从Redis返回的事件数量。
data_type   #这是一个必需的设置。值可以是以下任何一个:list,channel,pattern_channel。这个设置没有默认值。指定列表或频道。 如果redis\_type is list,那么我们将BLPOP的关键。  If redis\_type ischannel,那么我们将SUBSCRIBE键。 If redis\_type is pattern_channel,那么我们将把PSUBSCRIBE改为key。
db    #值类型是数字,默认值是0。Redis数据库编号。
host  #值类型是字符串,默认值是“127.0.0.1”。Redis服务器的主机名。
key  #这是一个必需的设置。值类型是字符串,这个设置没有默认值。Redis列表或频道的名称。
password  #值类型是密码,这个设置没有默认值。密码进行身份验证。 没有默认的身份验证。
port  #值类型是数字,默认值是6379。要连接的端口。
threads  #值类型是数字,默认值是1
timeout  #值类型是数字,默认值是5。初始连接超时秒数

 通用选项

add_field  #值类型是hash。默认值是{}。添加一个字段到一个事件
codec   #值类型是codec。默认值是“plain”。用于输入数据的编解码器。 输入编解码器是在数据进入输入之前解码数据的一种便捷方法,无需在Logstash管道中使用单独的筛选器。
enable_metric  #值类型是布尔值,默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。
id  #值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。如果没有指定ID,Logstash将会生成一个。强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有两个redis输入。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。
tags  #值类型是数组,这个设置没有默认值。添加任意数量的任意标签到您的事件。
type  #值类型是字符串.这个设置没有默认值。将类型字段添加到由此输入处理的所有事件。

2.5 192.168.14.66上面Logstash上面的设置

logstash配置文件设置:

$ vim /home/elk/logstash/conf/redis_es.conf 

input {
  redis {
    host =>  "192.168.14.11"   #这里也不能设置redis列表,还得设置主redis。
    port => "6379"
    #key =>  "access_nginx_%{+YYYY.MM.dd}"   #这里为什么注释呢,因为发现这里不能设置变量,所以上面设置的变量白瞎了
    key => "access_nginx_2017.12.16"
    db => 2
    data_type => "list"
  }
  redis {
    host =>  "192.168.14.11"
    port => "6379"
    key => "slow_mysql_2017.12.16"
    db => 1
    data_type => "list"
    #tags => ["slow_mysql"]    #给这个slow_mysql_2017.12.16里的消息打一个标签slow_mysql。可以这样干如果客户端没有传递标签
  }
}
filter{
 if "slow-mysql" in [tags]{
  grok {
    match => { "message" => "# User@Host: (?<user>[a-zA-Z0-9._-]*)\[(?:.*)\]\s+@\s+(?<client-domain>\S*)\s+\[%{IP:client-ip}*\]\s.*#
 Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined:
 %{NUMBER:rows_examined:int}\s.*(?<db>use \S*)?.*SET timestamp=%{NUMBER:ts};\s+(?<query>(?<action>\w+)\s+.*)\s+#?\s*.*" }
  }
 }
}
output {
   if "nginx-access" in [tags]{
       if "test.51niux.com" in [tags]{
           elasticsearch {  
               hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
               index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}"
           }         
        }  
   }    
   else if "slow-mysql" in [tags]  {
           elasticsearch {
               hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"]
               index => "slow-mysql-%{+YYYY.MM}"
           }   
        }  
  }

$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/redis_es.conf  #只截取部分结果

[2017-12-16T23:21:52,179][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//192.168.14.60", "//192.168.14.61", "//192.168.14.62", "//192.168.14.63", "//192.168.14.64"]}
[2017-12-16T23:21:52,188][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x609a89ed@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"}
[2017-12-16T23:21:52,201][INFO ][logstash.inputs.redis    ] Registering Redis {:identity=>"redis://@192.168.14.11:6379/2 list:access_nginx_2017.12.16"}
[2017-12-16T23:21:52,203][INFO ][logstash.inputs.redis    ] Registering Redis {:identity=>"redis://@192.168.14.11:6379/1 list:slow_mysql_2017.12.16"}
[2017-12-16T23:21:52,204][INFO ][logstash.pipeline        ] Pipeline started {"pipeline.id"=>"main"}
[2017-12-16T23:21:52,239][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

先是nginx日志测试:

image.png

#用一个机器打了3000此访问都捕捉到了一次没有丢失。

192.168.14.11:6379[2]> KEYS *
(empty list or set)

#然后就是redis这里,抓走消息之后就会清掉,所以只有当数据没被抓走的时候你才会看到数据,如果数据被拿走这里就没了。

#然后还有一个问题,比如现在是2017年12月17号了,我们客户端Logstash那里是按日志变量创建的key嘛。现在我们将中心Logstash关闭一下,然后客户端的Logstash重启一下。然后再次访问下nginx日志。

192.168.14.11:6379[2]> KEYS *
(empty list or set)
192.168.14.11:6379[2]> KEYS *
1) "access_nginx_2017.12.16"
192.168.14.11:6379[2]> LRANGE access_nginx_2017.12.16 0 -1
1) "{\"nginx.access.http_version\":\"1.1\",\"offset\":3067431,\"nginx.access.response_code\":\"200\",\"prospector\":{\"type\":\"log\"},\"nginx.access.referrer\":\"-\",\"nginx.access.agent\":\"curl/7.29.0\",\"source\":\"/usr/local/nginx/logs/access.log\",\"message\":\"192.168.14.66 - - [17/Dec/2017:00:54:39 +0800] \\\"GET / HTTP/1.1\\\" 200 612 \\\"-\\\" \\\"curl/7.29.0\\\"\",\"nginx.access.url\":\"/\",\"tags\":[\"nginx-access\",\"test.51niux.com\",\"beats_input_codec_plain_applied\"],\"@timestamp\":\"2017-12-16T16:54:44.012Z\",\"nginx.access.method\":\"GET\",\"nginx.access.time\":\"17/Dec/2017:00:54:39 +0800\",\"@version\":\"1\",\"beat\":{\"name\":\"192.168.14.67\",\"hostname\":\"localhost.localdomain\",\"version\":\"6.0.1\"},\"host\":\"localhost.localdomain\",\"fromip\":\"192.168.14.66\",\"nginx.access.body_sent.bytes\":612}"

#从上面结果可以看出key是2017.12.16号的,然后里面的内容的日志事件是2017.12.17,我擦时间变量%{+YYYY.MM.dd}怎么不生效了?恩filebeat也重启也不行。恩看来这里还是要设置个固定值......还有这个坑。

博文来自:www.51niux.com

然后是mysql的慢查询测试:

image.png

#mysql慢查询的测试结果也是可以的。

作者:忙碌的柴少 分类:ELK 浏览:10931 评论:3
留言列表
DonaldCen
DonaldCen logstash 不支持 redis哨兵模式的呀  回复
pppkq
pppkq 请问线上用的是哪种模式?我们这边也遇到量大es写入不了数据的情况  回复
访客
访客 kafka做缓冲式模式,我在ELK系列(16)里面有写以后会开单章  回复
发表评论
来宾的头像