通过logstash搜集日志
这里搜集日志可以使用ELK的一个插件filebeat对日志进行处理,并传输到后端的程序
在这里有一个不好的地方, 如果想要直接使用filebeat将日志发送到elasticsearch的话, 它并不能对任何字段进行替换等处理
比较明显的问题就是, 一般我们需要将@timestamp替换成日志里面的时间而不是程序对日志的处理时间, 这一点它无法做到
还有一点, 使用filebeat对多行日志进行处理时似乎会发生日志收集错乱的现象, 这个问题有待测试, 因为filebeat程序是自带处理多行日志的
当然好处也是有点, 可以比较省资源
1 | input { |
上面是一个logstash的配置文件,处理的日志格式大概是这样的
1 | [ERROR 2017-05-04 10:12:24,281 ./connect_info.py:336 @ 8299] - socket send and recieve Error: Traceback (most recent call last): |
这里分为三个段落
input段:
采用文件的形式, path可以采用*来匹配任意字符(匹配单个字符待测试),
add_field 可以增加字段, 可以很好的区分开日志
codec => multiline 采用多行的模式 如果不是以[
开头的将后面的行算作第一行
filter段:
这里采用的是 grok 匹配前面的无规则(非json格式)内容, 其后的json格式内容统一存到 result 字段, 并移除message字段
再通过 if 判断, 提取需要处理的日志 使用 mutate 对日志进行切分, 标准的json格式日志将保存在 field2 字段 之后通过 json 进行格式化该字段
最好将格式化好的字段中的时间 替换默认的 @timestamp 字段
output字段:
elasticsearch 将日志输出到elasticsearch 中
stdout 将日志输出到屏幕终端
通过elasticsearch对日志进行检索
先通过results.req.searchFunc字段过滤出包含 searchmusic的内容, 再判断 results.response.data.total 是否大于 1 排除搜索无结果的内容
最后使用 aggregations 对 results.req.text.keyword 字段结果进行聚合 统计出该字段的每个内容的个数, size控制显示多少个内容
aggregations 上面的size控制不显示其他搜索内容, 只关注aggregations 统计结果
1 | GET /logstash-name-2017.06*/_search |