搭建ELK日志监控平台那些事

ELK 是 elasticsearch / logstash / kibana 这三个英文单词的缩写,也即表明,这个技术栈中包含这三个组件,都是一家公司出品(www.elastic.co)。

kibana 负责展示,数据从elasticsearch中获取 
logstash 负责收集并解析日志数据,然后输出指定格式到elasticsearch
elasticsearch 负责保存数据,并提供检索功能。

这些组件可以在具体场景中按需组合使用,也可以单独使用。其中最有名气的当属 elasticsearch,是基于 Lucene 实现的高可用,高性能的搜索服务。现在,众多中大型企业级搜索就是使用的它,值得我们去好好学习一下。

中文文档:Elasticsearch权威指南
英文文档:Elasticsearch Reference

我这里使用的各个组件都是6.1.2版本,各个组件的不同版本在使用上可能有些差异,具体可以参考官方文档:Elastic Stack and Product Documentation。 记得参照对应版本的文档,不然可能找不到你期望的配置项说明!!!

在我们的技术栈中又加入了 filebeat / kafka ,因此整个架构是这样的:
filebeat => kafka => logstash => elasticsearch => kibana
为什么要使用 filebeat 这个组件?
1. Nginx 默认情况下,日志是写本地文件,因此需要把文件同步给logstash
2. 另外,我们也不期望 Nginx 写日志到一个远程机器,虽说Nginx使用 epoll 方式来处理各种事件,性能非常牛,加入此项可能在一定程度上不会增加请求响应时间。但是既然这两者之前有了关联,各种不确定的因素组合起来就会使后续排查故障变得非常复杂。我们期望的是两者之前没有耦合,不能因为收集日志的服务故障而影响到业务系统,因此这种隔离还是有必要的。

为什么要使用 kafka 这个组件?
1. 我们的 Nginx 日志每天产生的日志量非常大,因此我们会把日志定时切割(每2分切割一次),如果我们的日志在2分钟之内无法正常传输到收集日志的服务中,可能就会面临日志丢失的问题。比如,直接把日志推给logstash,logstash 的处理速度受限于elasticsearch的处理速度,在日志量非常大时,就会阻塞,导致filebeat 无法实时推送对应日志数据。
2. 可以重复多次使用日志数据。比如,测试环境、大数据等等情况。

我们的环境是:
CentOS release 6.6 (Final) x86_64 GNU/Linux
Filebeat 5.1.2
Kafka 1.0.0
Logstash 6.1.2
Elasticsearch 6.1.2
Kibana 6.1.2

准备阶段

总共使用6台服务器(下方括号中的数字为服务器编号)
Kafka 3台服务器(4/5/6)
Logstash/Elasticsearch 3台服务器(1/2/3)
Kibana 1台服务器(1)

由于在之前已经搭建过一套ELK监控系统,各个应用服务器上已经部署了 filebeat 5.1.2,所以也就没升级。
简单起见,使用 yum 安装,其他方式请参考官方文档,这里不详细列出。
添加yum源:

下载并安装公共签名密钥:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
添加RPM 配置 /etc/yum.repos.d/elastic.repo
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
注意:
1. 使用yum安装只能安装对应的最新版本,可选项为 5.x 、 6.x 。如果要安装 5.x 版本需要把repo文件中的 6.x 替换为对应的5.x,但是不能把 6.x 替换为 6.1 。目前,在官方的 yum 只支持2个大版本,并且是每个版本中的最新版本。比如,我刚开始安装的时候是 6.1.2 ,过2月之后使用相同的方式安装得到的是6.3.0版本。如果要安装指定版本也可以,需要使用 rpm 包进行安装,而不能直接使用 yum 方式安装。rpm 包下载地址可以在上面提到的文档中有说明。
2. 这几个组件安装次序没有严格的次序限制,统一安装完之后再进行配置各项参数。

在各个应用服务器上安装 filebeat
sudo yum install filebeat

在ELK集群服务器上安装 ELK
sudo yum install kibana
sudo yum install logstash
sudo yum install elasticsearch

安装成功之后,安装文件分别会放置到:
/usr/share/filebeat/
/usr/share/kibana/
/usr/share/logstash/
/usr/share/elasticsearch/

配置文件分别会放置到:
/etc/filebeat
/etc/kibana
/etc/logstash
/etc/elasticsearch

启动、重启、停止命令分别是
/etc/init.d/filebeat {start|stop|status|restart|condrestart}
/etc/init.d/kibana {start|force-start|stop|force-start|force-stop|status|restart}
initctl {start|restart|reload|status|stop} logstash
/etc/init.d/elasticsearch {start|stop|status|restart|condrestart|try-restart|reload|force-reload}

注意:
1. logstash / elasticsearch 是java语言开发,基于JVM运行环境,所以启动前需要安装好JAVA运行环境,最低版本必须要是 Java 8. 
2. logstash 安装完之后没有自带启动脚本(指 initctl  或者 init.d 或者 services ),我们提供一个 initctl 配置文件,放到指定目录下就可以了,当然也可以自己制作启动脚本。请参看文章最末尾说明!!
 
接下来我们开始完善各组件配置文件,由于 kibana / logstash 依赖 elasticsearch,所以我们先来配置 elasticsearch ,使其先启动服务。

先配置elasticsearch

1. elasticsearch.yml
这个文件是用来配置elasticsearch相关的选项,主要关注这些配置
#集群名称,每个集群内的所有节点需要配置为相同的名称。如果是多个集群,每个集群需要从名称上做区分,也即各集群名称不能相同。
cluster.name: le-elasticsearch
#节点名称,如果不配置,默认是一个随机数。为了方便区分、管理,我这里配置为主机名,通过 ${HOSTNAME} 这个变量即可。
node.name: ${HOSTNAME}
#数据目录,elasticsearch 是一个开源搜索引擎,重启后数据不丢失,所以必然有数据存储的位置,就是这里了,注意磁盘空间大小。
path.data: /data/elasticsearch/data
#日志目录
path.logs: /letv/logs/elasticsearch
#对外服务的ip,这里也使用了一些特殊值。指 127.0.0.1 与 内网ip 可访问
network.host: ["_local_", "_site_"]
#对外服务的端口
http.port: 9200
#集群节点ip列表,可以不用写集群所有ip,但是为了统一,还是写所有的。
discovery.zen.ping.unicast.hosts: ["10.183.97.33", "10.183.97.34", "10.183.97.51"]
#主节点选举成功,最少需要多少个节点的投票支持
discovery.zen.minimum_master_nodes: 2
#xpack 插件的授权类型,basic是免费的,还有其他收费版本
xpack.license.self_generated.type: basic

2. jvm.options 
这个文件是用来配置JVM相关的选项,主要关注下面2项配置,这两项可以根据实际运行情况调整,后续会提到。
-Xms2g 初始堆空间,我这里配置的是 2G
-Xmx2g 最大的堆空间,我这里配置的是 2G

注意:
1. 上面的部分配置,比如 network.host / node.name 可以直接指定,为什么要使用变量?为了集群每个节点配置文件相同,方便统一管理、维护。
2. network.host 支持一些特殊值,这里有详细说明:Network Settings
3. discovery.zen 相关配置可以参考 Zen Discovery 
4. discovery.zen.ping.unicast.hosts 为了加入集群,一个节点需要知道集群中至少一些其他节点的主机名或者IP地址
5. x-pack 插件是集好多功能的官方插件,包括的功能:安全验证模块、告警模块,监控模块,报告模块、图形模块、机器学习模块等等。
功能介绍:https://www.elastic.co/cn/products/x-pack
配置参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-xpack.html
6. xpack.license.self_generated.type:basic  是x-pack插件的配置项,如果没有安装该插件则不需要配置。我这里安装了,并且已经过了一个月的试用期,所以这里需要配置为 basic。记得当时一个月试用期到了之后,kibana无法登陆。后来通过elasticsearch调整了一个关于验证的配置,但是也发现时不时会出现访问异常的问题。所以,还是建议到期之后直接添加上这个配置项。毕竟xpack还有一些不错的可以免费使用的功能,所以还是建议大家安装。安装过程后面介绍。

配置完这些,就可以启动elasticsearch服务了,注意关注日志输出。祝你成功!
现在我们机器的IP列表是 10.183.97.33, 10.183.97.34, 10.183.97.51,端口是9200。这些信息我们下面会用到。

接着来配置logstash

1. logstash.yml 这个是logstash的主配置文件,其中包含配置主pipeline选项
#这个是是节点的名称,在有多个logstash节点的情况下,可以在kibana监控中通过这个名称区分开。为了所有节点配置相同,建议不用配置,默认是主机名
#node.name: test 
#数据目录,logstash自身不保存数据,因此这里是为一些插件中可能存在持久化需求考虑的,默认值是 LOGSTASH_HOME/data
path.data: /data/logstash
#这个是主pipeline工作线程数,默认是当前cpu核数,所以也不需要配置,保持默认值就好
#pipeline.workers: 2
#指定主pipeline包含的配置文件(input/filter/output 这些模块的配置)
path.config: /etc/logstash/conf.d/*.conf
#日志目录
path.logs: /letv/logs/logstash
#xpack管理相关配置,在收费版本中可以使用,我这里使用免费版本,所以注释掉,不使用。如果开启成功,可以直接在kibana中在线修改 pipeline 的配置
#xpack.management.enabled: true
#xpack.management.elasticsearch.url: "http://10.183.97.33:9200/"
#xpack.management.elasticsearch.username: logstash_system
#xpack.management.elasticsearch.password: logstash_system
#xpack.management.logstash.poll_interval: 5s
#xpack.management.pipeline.id: ["apache", "cloudwatch_logs"]
#xpack监控相关配置,在免费版本中也可以使用,加上这个配置,可以在kibana=>Monitor页面中查看到关于该logstash的相关数据及图表展示,非常推荐开启。
xpack.monitoring.elasticsearch.url: ["http://10.183.97.33:9200", "http://10.183.97.34:9200", "http://10.183.97.51:9200"]
#xpack.monitoring.elasticsearch.username: "logstash_system" 
#xpack.monitoring.elasticsearch.password: "logstash_system"

2. pipelines.yml 如果要定义多个pipeline,需要用到这个配置。在业务日志类型不是很复杂的情况下也用不到。不过,使用多个pipeline 也有一些好处,如可以分别配置每个pipeline所使用的线程数,不同的pipeline 使用不同的内置队列机制,因此其中一个pipieline事件处理延迟,不会影响到另外的pipeline。具体配置参数与logstash.yml中看到的主pipieline是相同的,在pipelines.yml 文件中已列表的形式配置多个pipeline。具体可以参考 Multiple Pipelines 

3. jvm.options 关于JVM的配置,与elasticsearch类似
-Xms2g
-Xmx2g

4. conf.d/ 这个目录是主pipeline相关的一些配置项,包括 input/filter/output 这些模块的配置。pipeline 支持配置多个input,多个filter、多个output,如果所有这些配置都放置在同一个文件中,一定是依次执行,符合常理。多个input是并列的,多个output也是并列的,多个filter是串行的。这里并行指相互独立,没有直接依赖性,没有严格的先后顺序,多个input进来的数据都会进入filter阶段,然后分别进入多个output。多个filter是串行执行,当把多个filter放入不同的文件中时,会是什么状况呢?其实,当真正的ELK为生产环境使用时,我们一定是有各种各样的日志格式,我们的filter规则非常复杂,这样只filter部分就占了非常大的篇幅,后续如果要修改某个日志格式,如同海底捞针。我们一定是希望可以按照不同的模块、不同的业务把这些filter分开存放,这样结构非常清晰,维护起来也非常方便。另外,可能不同的业务,日志格式一样,都是TAB分割字段,但是在不同业务中各字段的含义是不同的,此时filter中解析出来的字段名该如何定义呢?我们必须要通过在每个日志进入filter阶段的时候有个标记,然后把每个grok按照if条件解析。我们是通过这么几部实现的,a> 保证各自业务内部日志格式唯一,b> 在不同业务的filebeat配置中添加不同的tag c> 在grok外层添加if条件过滤,条件包括 tag 与 source 的组合。通过这些方面的分析,filter部分一定是整个配置中最为复杂的,所以我们希望可以按照不同的业务分开存放filter,共用input/output。logstash 按照文件名ascii码正序依次载入这个目录下的所有 .conf,有了这个规律,我们的filter执行顺序就确定了,再集合if语句,可以实现的足够达到你的预期。

注意:
1. logstash pipieline 支持的条件语句(if)可以参考文档 https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals
参考示例 https://www.elastic.co/guide/en/logstash/current/config-examples.html#using-conditionals
2. input模块参考文档 https://www.elastic.co/guide/en/logstash/current/input-plugins.html
3. filter模块参考文档 https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
4. output模块参考文档 https://www.elastic.co/guide/en/logstash/current/output-plugins.html
5. 我们的配置,将放在本文最下方,在使用中可以针对性的参考。

如果配置好了 input / filter / output 这时就可以启动 logstash 了。

接着来配置kibana

kibana 配置是最简单的,在使用中我们只配置了一个节点,如果kibana服务宕机了,直接重启就好了,没有数据丢失的影响。kibana除了复杂展示数据,有些kibana插件也提供了告警功能,如 sentinl 
#对外服务端口,默认5601,可以不用修改
#server.port: 5601
#对外服务IP,默认是127.0.0.1。为了内外其他用户可访问,我们需要修改此项配置。如上,为了配置文件的统一化,我们使用 0.0.0.0 ,而不使用本机ip地址,当然两者从实现上都可以达到效果。
server.host: "0.0.0.0"
#elasticsearch接口地址,通过这个接口来获取日志数据,然后生成各种图表。
elasticsearch.url: "http://10.183.97.33:9200"
#日志路径
logging.dest: /letv/logs/kibana/kibana.log

接着来配置filebeat

filebeat是部署在应用服务器上,而不是ELK集群机器上。我这里使用的是 filebeat 5.1.2,新版本中部分参数可能有所不同。
#输入日志的类型 log 代表文件,还有其他支持的方式。input_type 可以定义多个。
#input_type:log
#日志路径,可以使用* 匹配对应目录下的所有文件,但是无法递归匹配。可以同时配置多个日志路径
paths:
#自定义字段,可以在这里给日志数据加上一些特殊标记,方便后续的logstash条件处理,或者在监控中使用这些字段。可以在不同的input_type下定义,也可以在顶层层级下定义。
fields:
#自定义标签,可以在这里给日志数据加上一些特殊标记,方便后续logstash条件处理,或者在监控中使用这些标签。可以在不通的input_type 下定义,也可以在顶级层级下定义。fields / tags 功能类似,数据结构不同,可以根据不同情况使用。
tags:
#kafka相关配置,我们这里日志全部输出到kafka,所以这里配置kafka host/topic
output.kafka:
#日志级别
logging.level: error

注意:
1. fileds/tags 在不同的层级下定义是不同的,在顶层层级定义,代表全局生效。
2. input_type 可以定义多个,其下的所有配置跟随 input_type 也可以分别配置。在我们的实践中,我们把 nginx log 与 tracing log 分别输出到了不同的topic中,其中使用到了多个input_type 下定义不同的filed实现。具体可查看文章结尾的配置文件。

x-pack 插件

x-pack 插件有 kibana x-pack插件,logstash x-pack插件,elasticsearch x-pack插件,部分功能需要这3个组件都安装x-pack插件,而部分功能应该没这个要求,单独安装也可以。不过还是建议大家在这3个组件上全部安装 x-pack,可以避免很多问题。
已logstash 安装为例,kibana /elasticsearch 安装过程相同。
1. 切换到安装目录 /usr/share/logstash/
2. 执行安装命令
bin/logstash-plugin install x-pack
有时可能因为网络问题,默认方式无法正常安装,这时需要先从远程地址下载 x-pack包,如果提前下载完成,可以直接使用本地 x-pack 包进行安装。
bin/logstash-plugin install file:///path/to/file/x-pack-6.2.4.zip
3. 配置的话,这里不再展开了,上面穿插着把一些关键配置已经说明了。
4. 如果过了一个月试用期,记得在elasticsearch配置文件elaticsearch.yml 中追加 x-pack 相关授权类型为 basic 。 否则会因为授权限制导致各种问题出现,比如登陆验证等等。如果是 basic 类型,是没有登陆验证功能的,如果需要进行登陆验证,只能靠前端部署一个Nginx来弥补。

sentinl 插件

sentinl 是 kibana 的一款插件,用来提供告警功能,跟 x-pack 中的 watcher 模块类似。参考:官方文档  / github源码
这个需要安装对应版本,我这里kibana是6.1.2,所以安装过程是这样的:
cd /usr/share/kibana/bin/

./kibana-plugin install https://github.com/sirensolutions/sentinl/releases/download/tag-6.2.3-2/sentinl-v6.1.2.zip
一个配置分为这么几部分, input / condition / transform / actions 。input 指定从 elasticsearch 中查找什么样的数据,这里需要掌握 elaticsearch的查询语法及索引相关知识。condition 就是触发告警的条件,也比较简单。transform 这个用来数据转换,没有实际使用过。action 就是最终触发的各种行为了,包括 web-hook / email / slack 等等。基本满足要求。
另外,sentinl 关于发邮件账号的信息需要kibana的配置文件kibana.yml中配置,详情可参考下方 kibana配置。
对 input 相关的配置需要注意,在不需要对查询结构的文档进行相关性得分字段时,选择使用过滤(filter)要比查询(query)性能更好。
在有多个范围查询时,写法也比较特别,需要嵌套好多层,这个琢磨了好几天,看到文档中的一个相似的例子才得以解脱,真是值得好好记下来。
刚才说到的这些相关知识多数都在这个文档中有说明,建议认真看完,请求体查询。如果有兴趣,可以把整个文档读一遍,相信你以后一定能用到的。

整体回顾

在回顾一下,整个历程,发现其中遇到的好多问题都解决了,并且现在也想不起来具体时间啥问题了,我这里罗列一些我还有印象的几个问题:
1. 日志目录、数据目录权限问题
kibana / logstash / elasticsearch 实际运行时都不是已root方式运行,所以各个目录权限一定要给对应账户权限

2. logstash 内置的一些模式。logstash-patterns-core 
3. x-pack 过了试用期,导致的授权问题。
4. logstash 性能问题
多个节点都配置 相同的kafka集群,发现只有一个几点在处理数据,暂时还没确定是啥原因。不过单节点的性能也可以到 2w/s 。如果需要可以参考官方文档,进行调优。
Performance Tuning
5. elasticsearch 性能问题
如果需要可以参考官方文档,进行调优:
Tune for indexing speed
Tune for search speed
6. logstash 安装完,不能使用 initctl start logstash 启动,因为缺少启动脚本。把 logstash.conf 放置到 /etc/init/ 目录即可。这里有压缩包,下载后解压即可获得。
logstash.conf.tar.gz

至此 ELK+filebeat 已经介绍完了,kafka的配置,不在这里详细展开了。
下面是我们的完整配置文件,部分敏感信息做了调整,请针对性参考,希望对你有用。
filebeat.yml
filebeat.prospectors:
- input_type: log
  paths:
    - /letv/logs/odp.my.le.com.access.log
    - /letv/logs/benefit.my.le.com.access.log
    - /letv/logs/phpmessages
  fields:
    topic: odp_log
- input_type: log
  paths:
    - /letv/logs/phpzipkin/*
  fields:
    topic: lefan_tracing_log
tags: ["service-lefan", "web-tier"]
output.kafka:
  enabled: true
  hosts: ["10.183.97.52:9092", "10.183.97.62:9092", "10.183.97.63:9092"]
  topic: "%{[fields][topic]}"
logging.level: error
kibana.yml
server.host: "0.0.0.0"
elasticsearch.url: "http://10.183.97.33:9200"
logging.dest: /letv/logs/kibana/kibana.log
sentinl:
  settings:
    email:
      active: true
      user: "hello@abc.com"
      password: "666"
      host: "smtp.qq.com"
      port: 25
elasticsearch.yml
cluster.name: le-elasticsearch
node.name: ${HOSTNAME}
path.data: /data/elasticsearch/data
path.logs: /letv/logs/elasticsearch
bootstrap.system_call_filter: false
network.host: ["_local_", "_site_"]
discovery.zen.ping.unicast.hosts: ["10.183.97.33", "10.183.97.34", "10.183.97.51"]
discovery.zen.minimum_master_nodes: 2 
xpack.license.self_generated.type: basic
logstash.yml
path.data: /data/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /letv/logs/logstash
xpack.monitoring.elasticsearch.url: ["http://10.183.97.33:9200", "http://10.183.97.34:9200", "http://10.183.97.51:9200"]
/etc/logstash/conf.d/ 目录结构
[liuweifeng@dfs-zb20-node1 ~]$ ll /etc/logstash/conf.d/
total 20
-rw-r--r-- 1 root root 2074 May 18 15:59 filter_bbs.conf
-rw-r--r-- 1 root root   12 May 18 15:59 filter.conf
-rw-r--r-- 1 root root 1656 May 18 15:59 filter_lefan.conf
-rw-r--r-- 1 root root    0 May 18 15:59 filter_sso.conf
-rw-r--r-- 1 root root  193 May 18 15:59 input.conf
-rw-r--r-- 1 root root 1077 May 18 15:59 output.conf
/etc/logstash/conf.d/input.conf
input {

  kafka {
    bootstrap_servers => "10.183.97.52:9092,10.183.97.62:9092,10.183.97.63:9092"
    topics => ["bbs_log", "lefan_log", "growth_log", "odp_log"]

    codec => "json"
  }

}
/etc/logstash/conf.d/output.conf
filter {
  date {
    match => [ "timestamp", "E MMM d HH:mm:ss.SSS yyyy", "E MMM  d HH:mm:ss.SSS yyyy", "dd/MMM/yyyy:HH:mm:ss Z" ]
    timezone => "Asia/Shanghai"
    remove_field => [ "timestamp" ]
  }
  if [@metadata][index] {
      mutate {
        rename => { "[beat][hostname]" => "hostname"}
        remove_field => [ "input_type", "source" , "type", "beat", "tags", "offset", "fields"]
      }
  } else {
      mutate { add_field => { "[@metadata][index]" => "logstash-test" } }
  }
  if [loglevel] in ["debug", "info"] {
      drop { }
  }
}
output {
    elasticsearch {
      hosts => ["http://10.183.97.33:9200", "http://10.183.97.34:9200", "http://10.183.97.51:9200"]
      user => "elastic"
      password => "elastic"
      manage_template => false
      index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
    }
}
/etc/logstash/conf.d/filter.conf
filter {
}
/etc/logstash/conf.d/filter_bbs.conf
filter {
  if "service-bbs" in [tags] {
    if [source] =~ "nginx" {
      grok {
        match => { "message" => "(?:%{IPORHOST:clientip}|-), %{HTTPDUSER:remote_user} \[%{HTTPDATE:timestamp}\] %{NUMBER:request_time:float} %{WORD:request_method} \"http://%{URIHOST:request_host}%{URIPATH:request_path}%{URIPARAM:request_param}?\" %{NOTSPACE:protocol} %{NUMBER:status:int} (?:%{NOTSPACE:content_type}|-)(; ?charset=%{NOTSPACE:content_charset})? (?:%{NUMBER:bytes:int}|-) (?:%{NUMBER:content_length:int}|-) (?:%{NUMBER:upstream_status:int}|-) (?:%{NUMBER:upstream_response_time:float}|-) %{QS:referrer} %{QS:user_agent} %{QS:x_forwarded_for} \"(?:%{INT:ssouid:int}|)\""}    
        remove_field => [ "message", "clientip", "remote_user", "request_method", "request_param", "protocol", "content_type", "content_charset", "content_length", "upstream_status", "upstream_response_time", "referrer", "user_agent", "x_forwarded_for", "ssouid" ]    
        tag_on_failure => []
      }
      mutate { add_field => { "[@metadata][index]" => "logstash-bbs-nginx-access" } }
    } else if [source] =~ "sphinx" {
      grok {
        match => { "message" => "\[(?<timestamp>%{DAY} %{MONTH} \s?%{MONTHDAY} %{TIME} %{YEAR})\] %{NUMBER:time_1:float} %{WORD:time_1_unit} %{NUMBER:time_2:float} %{WORD:time_2_unit} \[%{NOTSPACE} %{NUMBER:results:int} %{DATA}\] \[%{WORD:index}\] %{GREEDYDATA:words}" }
        remove_field => [ "message" ]
        tag_on_failure => []
      }
      mutate { add_field => { "[@metadata][index]" => "logstash-bbs-sphinx-search" } }
    } else if [source] =~ "locallog" {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{IP:clientip}%{SPACE}%{INT:uid}%{SPACE}%{URIPATHPARAM:request}%{SPACE}code:%{INT:ret:int}, message:%{GREEDYDATA:msg}"}
        remove_field => [ "message" ]
        tag_on_failure => []
      }
      mutate { add_field => { "[@metadata][index]" => "logstash-bbs-extra" } }
    }
  }
}
/etc/logstash/conf.d/filter_lefan.conf
filter {
  if "service-lefan" in [tags] {
    if [source] =~ "access.log" {
      grok {
        match => { 
          "message" => "(?:%{IPORHOST}|-) - %{HTTPDUSER:remote_user} \[%{HTTPDATE:timestamp}\] %{URIHOST:request_host} \"%{WORD:request_method} %{URIPATH:request_path}%{URIPARAM:request_param}?\ %{NOTSPACE:protocol}\" %{NUMBER:status:int} (?:%{NUMBER:bytes:int}|-) %{QS:referrer} %{QS:user_agent} \"%{IPORHOST:clientip}\" %{NUMBER:request_length:int} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)"
        }
        remove_field => [ "message", "remote_user", "request_method", "request_param", "protocol", "referrer", "user_agent", "clientip", "request_length", "upstream_response_time"]
        tag_on_failure => []
      }
      mutate { add_field => { "[@metadata][index]" => "logstash-lefan-nginx-access" } }
    } else if [source] =~ "error.log" {
      mutate { add_field => { "[@metadata][index]" => "logstash-lefan-nginx-error" } }
    } else if [source] =~ "phpmessages" {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}(?:%{IP:clientip}|)%{SPACE}%{HOSTNAME:project}%{SPACE}%{WORD:module}%{SPACE}%{WORD:controller}%{SPACE}%{WORD:action}%{SPACE}%{WORD:runid}%{SPACE}%{WORD:loglevel}%{SPACE}%{GREEDYDATA:error}"}
        remove_field => [ "message" ]
        tag_on_failure => []
      }
      mutate { add_field => { "[@metadata][index]" => "logstash-lefan-phpmessages" } }
    }
  }
}
Sentinl:
{
  "actions": {
    "email_admin": {
      "throttle_period": "0h0m5s",
      "email": {
        "to": "liuweifeng@le.com;",
        "from": "Kibana Alarm <noreply@521-wf.com>",
        "subject": "Server Exception(lefan 5xx)",
        "priority": "high",
        "body": "2min 5xx:\nFound {{payload.hits.total}} Events"
      }
    }
  },
  "input": {
    "search": {
      "request": {
        "index": [
          "<logstash-lefan-nginx-access-{now/d}>"
        ],
        "body": {
          "query": {
            "bool": {
              "filter": {
                "bool": {
                  "must": [
                    {
                      "range": {
                        "status": {
                          "gte": 500,
                          "lte": 599
                        }
                      }
                    },
                    {
                      "range": {
                        "@timestamp": {
                          "gte": "now-2m",
                          "lte": "now"
                        }
                      }
                    }
                  ]
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "script": "payload.hits.total > 10"
    }
  },
  "transform": {},
  "trigger": {
    "schedule": {
      "later": "every 1 minutes"
    }
  },
  "disable": false,
  "report": false,
  "title": "lefan-nginx-5xx-wather"
}

标签: ELK

23
May 2018
AUTHOR WiFeng
CATEGORY Asset
COMMENTS No Comments

添加新评论 »

   点击刷新验证码