ELK利用GeoIP映射用户地理位置(十二)
IP地址位置用于确定IP地址的物理位置,因为ELK已经收集了web日志,如果通过ELK分析出来用户来源地址的比例,那么多网站机构的调整都是非常有帮助的。
一、跟着官网了解一波Geoip
GeoIP过滤器根据来自Maxmind GeoLite2数据库的数据添加有关IP地址的地理位置的信息。
1.1 支持的数据库
这个插件与开箱即用的GeoLite2(https://dev.maxmind.com/geoip/geoip2/geolite2/)城市数据库捆绑在一起。 从Maxmind的描述 ----GeoLite2数据库是免费的IP地理位置数据库,可与MaxMind的GeoIP2数据库相媲美,但不太准确”。 有关更多详细信息,请参阅GeoIP Lite2许可证。Maxmind的商业数据库(https://www.maxmind.com/en/geoip2-databases)也支持这个插件。简单来说就是两个数据库一个免费的一个商业的,免费的不如商业的地理位置精准。
如果您需要使用捆绑的GeoLite2城市以外的数据库,则可以直接从Maxmind网站下载数据库,并使用数据库选项指定其位置。 GeoLite2数据库可以从这里下载(https://dev.maxmind.com/geoip/geoip2/geolite2/)。
如果您想获得自治系统编号(ASN)信息,您可以使用geolite2 - ASN数据库。
1.2 细节
如果GeoIP查找返回经度和纬度,则会创建[geoip][location]字段。该字段以GeoJSON(http://geojson.org/geojson-spec.html)格式存储。此外,还提供了默认的Elasticsearch模板elasticsearch输出(https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html)映射这[geoip] [location]字段添加到Elasticsearch geo_point。
由于这个字段是一个geo_point,它仍然是有效的GeoJSON,您将获得Elasticsearch的地理空间查询,构面和过滤器功能以及为所有其他应用程序(如Kibana的地图可视化)提供GeoJSON的灵活性。
本产品包含由MaxMind创建的GeoLite2数据,可从中获取http://www.maxmind.com. 这个数据库是根据授权知识共享署名 - 相同方式共享4.0国际许可(https://creativecommons.org/licenses/by-sa/4.0/)。
GeoIP过滤器的版本4.0.0和更高版本使用MaxMind GeoLite2数据库并支持IPv4和IPv6查找。 4.0.0之前的版本使用传统的MaxMind GeoLite数据库,仅支持IPv4查找。
1.3 Geoip筛选器配置选项
该插件支持以下配置选项和稍后介绍的通用选项。
cache_size:
值类型是数字,默认值是1000。
GeoIP查询的成本非常高。 该过滤器使用缓存来利用IP代理通常在日志文件中彼此相邻的情况,并且很少具有随机分布。 设置的越高,项目在缓存中的可能性就越大,并且此过滤器运行得越快。 但是,如果将此设置得太高,则可能会使用比所需更多的内存。 由于Geoip API升级到v2,目前没有任何驱逐策略,如果缓存已满,则不能添加更多记录。 尝试使用此选项的不同值来查找数据集的最佳性能。
这必须设置为大于0的值。真的没有理由不想要这种行为,开销很小,速度增益很大。
注意这个配置值对于geoip_type是全局的,这一点很重要。 也就是说,相同geoip_type的geoip过滤器的所有实例共享相同的高速缓存。 最后声明的缓存大小将获胜。 这样做的原因是,在流水线中的不同点上为不同的实例分配多个缓存是没有好处的,这只会增加缓存未命中次数和浪费内存。
database:
值类型是path,这个设置没有默认值。
Logstash应该使用的Maxmind数据库文件的路径。 默认数据库是GeoLite2-City。 GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind支持的免费数据库。 GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。
如果未指定,则默认为Logstash附带的GeoLite2城市数据库。
default_database_type:
这个插件现在包括GeoLite2-City和GeoLite2-ASN数据库。如果未设置数据库和default_database_type,则将选择GeoLite2-City数据库。要使用包含的GeoLite2-ASN数据库,请将default_database_type设置为ASN。
值类型是字符串,默认值是city。唯一可接受的值是City和ASN。
fields:
值类型是array,这个设置没有默认值。
要包含在事件中的geoip字段数组。可能的字段取决于数据库类型。 默认情况下,所有geoip字段都包含在事件中。
对于内置的GeoLite2城市数据库,可以使用以下内容:city_name, continent_code, country_code2, country_code3, country_name, dma_code, ip, latitude, longitude, postal_code, region_name 和 timezone.
source:
这是一个必需的设置。值类型是字符串,这个设置没有默认值。
包含要通过geoip映射的IP地址或主机名的字段。 如果这个字段是一个数组,则只会使用第一个值。
tag_on_failure:
值类型是数组,默认值是[“_geoip_lookup_failure”]。这个设置没有默认值。
将事件标记为未能查找地理信息。 这可以在以后的分析中使用。
target:
值类型是字符串,默认值是“geoip”。
指定Logstash应该存储geoip数据的字段。 例如,如果您有src_ip和dst_ip字段,并希望两个IP的GeoIP信息,这可能是有用的。
如果将数据保存到geoip以外的目标字段,并希望在Elasticsearch中使用与geo_point相关的函数,则需要更改Elasticsearch输出提供的模板,并将输出配置为使用新模板。
即使您不使用geo_point映射,[target] [location]字段仍然是有效的GeoJSON。
1.4 通用选项
#这个在前面几节都翻译了,这里就简单记录下了。
所有过滤器插件都支持以下配置选项:
add_field:
值类型是hash,默认值是{}
如果此过滤器成功,请将任意字段添加到此事件。 字段名称可以是动态的,并使用%{field}来包含事件的一部分。
add_tag:
值类型是数组。默认值是[]。如果此过滤器成功,请向该事件添加任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。
enable_metric:
值类型是布尔值,默认值为true。为特定的插件实例禁用或启用度量标准日志记录,我们默认记录所有的度量标准,但是您可以禁用特定插件的度量标准收集。
id:
值类型是字符串,这个设置没有默认值。为插件配置添加一个唯一的ID。 如果没有指定ID,Logstash将会生成一个。 强烈建议在您的配置中设置此ID。 当你有两个或多个相同类型的插件时,这是特别有用的,例如,如果你有两个geoip过滤器。 在这种情况下添加一个命名的ID将有助于在使用监视API时监视Logstash。
periodic_flush:
值类型是布尔值,默认值是false。定期调用过滤器刷新方法。 可选的。
remove_field:
值类型是数组,默认值是[]。如果此过滤器成功,请从此事件中删除任意字段。
remove_tag:
值类型是数组,默认值是[]。如果此过滤器成功,请从该事件中移除任意标签。 标签可以是动态的,并使用%{field}语法包含事件的一部分。
博文来自:www.51niux.com
二、环境部署
2.1 Logstash日志格式匹配编写:
223.11.97.79 - - [14/Mar/2017:00:00:04 +0800] "GET /00/11/00114827.mp3 HTTP/1.1" 206 941328 "http://d.51niux.com/search/searchKsc.htm?wd=%E4%AA%8E%E5%B0%8F%E5%8D%96%E8%92%AA&fieldStr=&encode=encode" "Player3.1.0.4" "-" 0.199 - - HIT
#类似于这样一种nginx proxy cache的日志格式,先对其进行一下日志的转变
$ cat /home/elk/logstash/patterns/51niux
PROXY_HTTP_ACCESS %{IP:fromip} - - \[%{HTTPDATE:nginx.access.time}\] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}" "-" %{NUMBER:request_time}[\s-]+(?<cache_status>[a-zA-Z]+)
$ vim /home/elk/logstash/conf/nginx-cache.conf
input { beats { host => "192.168.14.67" port => 5066 } } filter { grok { patterns_dir => "/home/elk/logstash/patterns" match => { "message" => "%{PROXY_HTTP_ACCESS}" } } } output { stdout { codec => rubydebug } #先输出测试一下看看格式对不对哈 }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf
"cache_status" => "HIT", nginx.access.url" => "/00/11/00114827.mp3", "fromip" => "223.11.97.79", "nginx.access.body_sent.bytes" => "941328"
#就粘贴了一小点输出哈,可以看出我们定义的匹配规则已经可以了,已经对日志进行了切分。
2.2 Geoip的配置
下载城市数据库:
$cd /home/elk/logstash/conf
$wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
$tar -zxvf GeoLite2-City.tar.gz
$cd GeoLite2-City_20171205/
$ ls -lh
总用量 61M -rw-r--r-- 1 2000 2000 55 12月 7 04:04 COPYRIGHT.txt -rw-r--r-- 1 2000 2000 61M 12月 7 04:04 GeoLite2-City.mmdb -rw-r--r-- 1 2000 2000 433 12月 7 04:04 LICENSE.txt -rw-r--r-- 1 2000 2000 116 12月 7 04:04 README.txt
#GeoLite数据库的官网说自己每月的第一个星期二的MaxMind更新。 因此,如果想要始终拥有最新的数据库,则应该设置一个每月一次下载数据库的cron作业。
$ cp GeoLite2-City.mmdb /home/elk/logstash/conf/
=====================================旧版的方式=============================================================
$gunzip GeoLiteCity.dat.gz
# ls -lh /home/elk/logstash/conf/GeoLiteCity.dat
-rw-r--r-- 1 elk elk 17M 12月 7 03:22 /home/elk/logstash/conf/GeoLiteCity.dat
#旧版本Logstash2.x系统使用的是这种二进制文件,但是在新版本使用不了会有报错的。
[2017-12-18T10:14:53,867][ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main", :plugin=>"#<LogStash::FilterDelegator:0x2db88668 @metric_events_out=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - namespace: [stats, pipelines, main, plugins, filters, cfcf4d91f00a2e40c80b84aa6ba0d21b2f502729498fc50ad8bb3682d0c7bde3, events] key: out value:0, @metric_events_in=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - namespace: ...... [2017-12-18T10:14:53,963][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>java.lang.IllegalArgumentException: The database provided is invalid or corrupted., :backtrace=>["org.logstash.filters.GeoIPFilter.<init>(org/lo
#大概意思就是库文件可能有损坏也就是读取不了不支持这种形式.......
===========================================================================================================
2.3 Logstash配置文件中加入Geoip过滤并启动测试
$ vim /home/elk/logstash/conf/nginx-cache.conf
input { beats { host => "192.168.14.67" port => 5066 } } filter { grok { patterns_dir => "/home/elk/logstash/patterns" match => { "message" => "%{PROXY_HTTP_ACCESS}" } } geoip { source => "fromip" #指定哪个字段过滤成物理地址肯定是来源IP啊 database => "/home/elk/logstash/conf/GeoLite2-City.mmdb" #指定库的位置 } } output { stdout { codec => rubydebug } #还是先打印到屏幕先看看效果出来没有 }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf #启动服务这次没有报错了。从下面的启动过程中已经可以看到已经加载了地址库并启动了程序
Sending Logstash's logs to /home/elk/logstash/logs which is now configured via log4j2.properties [2017-12-18T14:24:11,308][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/elk/logstash/modules/fb_apache/configuration"} [2017-12-18T14:24:11,314][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/elk/logstash/modules/netflow/configuration"} [2017-12-18T14:24:11,728][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2017-12-18T14:24:11,993][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} [2017-12-18T14:24:16,042][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/home/elk/logstash/conf/GeoLite2-City.mmdb"} [2017-12-18T14:24:16,108][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x104c045a@/home/elk/logstash/logstash-core/lib/logstash/pipeline.rb:290 run>"} [2017-12-18T14:24:16,934][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"192.168.14.67:5066"} [2017-12-18T14:24:17,029][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"} [2017-12-18T14:24:17,059][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]} [2017-12-18T14:24:17,077][INFO ][org.logstash.beats.Server] Starting server on port: 5066
2.4 web日志写入测试并调整
# cat /opt/lzotest1.log |head -1 >>/usr/local/nginx/logs/access.log #搞了一份web缓存的日志,先把第一行日志打到测试环境的的nginx日志文件中。要公网IP地址哦。
查看下Logstash的屏幕输出:
{ "nginx.access.http_version" => "1.1", "geoip" => { #因为我们走的默认,默认target标签就是geoip "city_name" => "Hebei", #城市地址有的时候精确到市有的时候就是省地址 "timezone" => "Asia/Shanghai", #时区是上海时区 "ip" => "123.183.135.141", #IP地址 "latitude" => 39.8897, #维度 "country_name" => "China", #国家的名字 "country_code2" => "CN", #国家简写 "continent_code" => "AS", #哪个大洲 "country_code3" => "CN", "region_name" => "Hebei", #地区名称 "location" => { #位置 "lon" => 115.275, #经度 "lat" => 39.8897 #维度 },
#默认就是会输出好多信息,看来还是需要精简一下输出field。
#通过IP查询,IP地址也是正确的确实是河北省。
#如果是内网IP地址呢,输入就是下面的结果:
"geoip" => {}, #geoip里面是空的不包含任何信息,因为内网解析不出来。 #if [fromip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." { #当然可以加下if判断吧geoip过滤包一下,这样内网的就不过滤了也就不会出现geoip空字段了,不过如果你不自己测试着玩一般也没内网IP geoip { source => "fromip"
2.5 修改Logstash并写入ES集群
$ vim /home/elk/logstash/conf/nginx-cache.conf
input { beats { host => "192.168.14.67" port => 5066 } } filter { grok { patterns_dir => "/home/elk/logstash/patterns" match => { "message" => "%{PROXY_HTTP_ACCESS}" } } geoip { source => "fromip" target => "geoip" #默认就是这个target,就是过滤的内容都写到这个字段下面 database => "/home/elk/logstash/conf/GeoLite2-City.mmdb" fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] #指定自己所需的字段 remove_field => ["[geoip][latitude]", "[geoip][longitude]"] #删除多余的字段 #fields => ["city_name", "country_code2", "country_name", "region_name"] 直接这样一句话也是可以达到上面两句话的效果的。 } } output { elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "nginx-cache-51niux-com-%{+YYYY.MM}" } }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf #再次启动
# cat /opt/lzotest1.log |head -100 >>/usr/local/nginx/logs/access.log #然后写点测试数据进去,然后查看下Kibana界面。
#通过Kibana界面查看最新的一条,可以看到在Kibana上面效果已经出来了。
博文来自:www.51niux.com
三、Kibana的设置
3.1 加一个缓存命中率的图形
饼图命中率图标:
查看命中率图标:
3.2 设置地图映射用户位置
发现问题:
No Compatible Fields: The "nginx*" index pattern does not contain any of the following field types: geo_point
#加地图的时候出现这个问题。
#原因链接:http://www.tonitech.com/2517.html 也好多地方又说。
#http://blog.csdn.net/yanggd1987/article/details/50469113 #你可以用head插件看模板索引格式,当然有点基础可以用Kibana自带的Dev Tools来查看。
#在Elasticsearch中,所有的数据都有一个类型,什么样的类型,就可以在其上做一些对应类型的特殊操作。geo信息中的location字段是经纬度,我们需要使用经纬度来定位地理位置;在Elasticsearch中,对于经纬度来说,要想使用Elasticsearch提供的地理位置查询相关的功能,就需要构造一个结构,并且将其类型属性设置为geo_point,此错误明显是由于我们的geo的location字段类型不是geo_point。
#关键点就是如果你不想改索引的话,就logstash这个索引引用的模板里面有geo_point这个类型。
往回导解决问题:
$vim /home/elk/logstash/conf/nginx-cache.conf
input { beats { host => "192.168.14.67" port => 5066 } } filter { grok { patterns_dir => "/home/elk/logstash/patterns" match => { "message" => "%{PROXY_HTTP_ACCESS}" } } geoip { source => "fromip" target => "geoip" database => "/home/elk/logstash/conf/GeoLite2-City.mmdb" #fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"] #fields => ["city_name", "country_code2", "country_name", "region_name"] #remove_field => ["[geoip][latitude]", "[geoip][longitude]"] add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] #这里就不能像上面那样把经纬度给删掉了,因为ES的map图形得依据经纬度来定位位置。 add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } output { elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "logstash-nginx-cache-51niux-com-%{+YYYY.MM.dd}" #主要是这里索引改掉,改成logstash开头的,我觉得主要是这个地方。 } }
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/nginx-cache.conf #重新启动一下
# curl -XDELETE 'http://192.168.14.64:9200/nginx-*'
# curl -XDELETE 'http://192.168.14.64:9200/logstash*' #索引都删除掉,测试环境随便搞嘿嘿。
# tail -100000 /opt/lzotest1.log >>/usr/local/nginx/logs/access.log #再来点数据重新测试一下。
博文来自:www.51niux.com
Kibana再次来创建一下:
#好了效果出来了,怎么保存这个图像就不说了右上角有save哈。
使用高德地图将地图地名显示成中文:
$# vim /home/elk/kibana/config/kibana.yml #在文件底部加上下面一句话
tilemap.url: 'http://webrd02.is.autonavi.com/appmaptile?lang=zh_cn&size=1&scale=1&style=7&x={x}&y={y}&z={z}'
$/home/elk/kibana/bin/kibana #kibana重新启动一下
#出图上面已经可以了,可以看到颜色越深说明这个地点用户越多。
#geoip.location里面包的经纬度。
四、使用自带的geoip库
4.1 filebeat采集数据到kafka
# ------------------------------ Kafka Output ------------------------------- output.kafka: hosts: ["192.168.1.164:9092","192.168.1.165:9092","192.168.1.166:9092"] topic: 'nginx_wan' enabled: true metadata: retry.max: 3 retry.backoff: 250ms refresh_frequency: 10m full: true worker: 4 max_retries: 3 bulk_max_size: 2048 timeout: 30s broker_timeout: 10s channel_buffer_size: 256 keep_alive: 3 compression: gzip compression_level: 4 max_message_bytes: 1000000 required_acks: 1 client_id: beats codec.format: #这种模式要注意,插入kafka是纯message模式,不会带有其他的tag标签,path啊之类的东西 string: '%{[message]}'
4.2 logstash的配置
input { kafka { bootstrap_servers => "192.168.1.164:9092,192.168.1.165:9092,192.168.1.166:9092" topics => ["nginx_wan"] consumer_threads => 4 group_id => "nginx_wan_01" auto_offset_reset => "latest" } } filter { grok { patterns_dir => "/tmp/logstash/patterns/nginx_wan" match => { "message" => "%{WAN_HTTP_ACCESS}" } } date { match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"] } #假如我们access.log和error.log全采集了,因为是纯message消息只能通过截取的字段判断切割成功不成功,来判断此消息是不是access.log if [http_type] { geoip { source => "remote_addr" target => "geoip" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { add_tag => [ "access.log" ] convert => { "upstream_response_time" => "float"} convert => { "request_time" => "float"} convert => { "bytes_sent" => "integer"} } useragent { target => "agent" source => "http_user_agent" } }else{ mutate { add_tag => [ "error.log" ] } } } output { if "access.log" in [tags]{ #stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.1.164:9200","192.168.1.165:9200","192.168.1.166:9200"] index => "nginx-wan-%{+YYYY.MM.dd}" } } }
4.3 kibana的配置
#上面就是最后的来源IP的TOP20的城市排名做出来的饼图