• 售前

  • 售后

热门帖子
入门百科

nginx日记导入elasticsearch的方法示例

[复制链接]
风来时狂放 显示全部楼层 发表于 2021-10-26 13:27:17 |阅读模式 打印 上一主题 下一主题
将nginx日记通过filebeat收集后传入logstash,颠末logstash处置惩罚后写入elasticsearch。filebeat只负责收集工作,logstash完成日记的格式化,数据的更换,拆分 ,以及将日记写入elasticsearch后的索引的创建。
1、设置nginx日记格式
  1. log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
  2.             '$status $body_bytes_sent $http_referer '
  3.             '"$http_user_agent" '
  4.             '"$connection" '
  5.             '"$http_cookie" '
  6.             '$request_time '
  7.             '$upstream_response_time';
复制代码
2、安装设置filebeat,启用nginx module
  1. tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
  2. cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
  3. cd /usr/local/filebeat
复制代码
启用nginx模块
  1. ./filebeat modules enable nginx
复制代码
查看模块
  1. ./filebeat modules list
复制代码
创建设置文件
  1. vim /usr/local/filebeat/blog_module_logstash.yml
  2. filebeat.modules:
  3. - module: nginx
  4. access:
  5.   enabled: true
  6.   var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
  7. #error:
  8. # enabled: true
  9. # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]
  10. output.logstash:
  11. hosts: ["192.168.15.91:5044"]
复制代码
启动filebeat
  1. ./filebeat -c blog_module_logstash.yml -e
复制代码
3、设置logstash
  1. tar -zxvf logstash-6.2.4.tar.gz /usr/local
  2. cd /usr/local;ln -s logstash-6.2.4 logstash
  3. 创建一个nginx日志的pipline文件
  4. cd /usr/local/logstash
复制代码
logstash内置的模板目次
  1. vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
复制代码
编辑 grok-patterns 添加一个支持多ip的正则
  1. FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}
复制代码
官方grok
http://grokdebug.herokuapp.com/patterns#
创建logstash pipline设置文件
  1. #input {
  2. # stdin {}
  3. #}
  4. # 从filebeat接受数据
  5. input {
  6. beats {
  7. port => 5044
  8. host => "0.0.0.0"
  9. }
  10. }
  11. filter {
  12. # 添加一个调试的开关
  13. mutate{add_field => {"[@metadata][debug]"=>true}}
  14. grok {
  15. # 过滤nginx日志
  16. #match => { "message" => "%{NGINXACCESS_TEST2}" }
  17. #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
  18. #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  19.     match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  20. }
  21. # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
  22. ruby {
  23. #code => "event.set('@read_timestamp',event.get('@timestamp'))"
  24. #将时区改为东8区
  25. code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
  26. }
  27. # 将nginx的日志记录时间格式化
  28. # 格式化时间 20/May/2015:21:05:56 +0000
  29. date {
  30. locale => "en"
  31. match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
  32. }
  33. # 将bytes字段由字符串转换为数字
  34. mutate {
  35. convert => {"bytes" => "integer"}
  36. }
  37. # 将cookie字段解析成一个json
  38. #mutate {
  39. # gsub => ["cookies",'\;',',']
  40. #}
  41. # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
  42. if[http_x_forwarded_for] =~ ", "{
  43.      ruby {
  44.          code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
  45.         }
  46.     }
  47. # 解析ip,获得ip的地理位置
  48. geoip {
  49. source => "http_x_forwarded_for"
  50. # # 只获取ip的经纬度、国家、城市、时区
  51. fields => ["location","country_name","city_name","region_name"]
  52. }
  53. # 将agent字段解析,获得浏览器、系统版本等具体信息
  54. useragent {
  55. source => "agent"
  56. target => "useragent"
  57. }
  58. #指定要删除的数据
  59. #mutate{remove_field=>["message"]}
  60. # 根据日志名设置索引名的前缀
  61. ruby {
  62. code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
  63. }
  64. # 将@timestamp 格式化为2019.04.23
  65. ruby {
  66. code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
  67. }
  68. # 设置输出的默认索引名
  69. mutate {
  70. add_field => {
  71.   #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
  72.   "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
  73. }
  74. }
  75. # 将cookies字段解析成json
  76. # mutate {
  77. # gsub => [
  78. #  "cookies", ";", ",",
  79. #  "cookies", "=", ":"
  80. # ]
  81. # #split => {"cookies" => ","}
  82. # }
  83. # json_encode {
  84. # source => "cookies"
  85. # target => "cookies_json"
  86. # }
  87. # mutate {
  88. # gsub => [
  89. #  "cookies_json", ',', '","',
  90. #  "cookies_json", ':', '":"'
  91. # ]
  92. # }
  93. # json {
  94. # source => "cookies_json"
  95. # target => "cookies2"
  96. # }
  97. # 如果grok解析存在错误,将错误独立写入一个索引
  98. if "_grokparsefailure" in [tags] {
  99. #if "_dateparsefailure" in [tags] {
  100. mutate {
  101.   replace => {
  102.   #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
  103.   "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  104.   }
  105. }
  106. # 如果不存在错误就删除message
  107. }else{
  108. mutate{remove_field=>["message"]}
  109. }
  110. }
  111. output {
  112. if [@metadata][debug]{
  113. # 输出到rubydebuyg并输出metadata
  114. stdout{codec => rubydebug{metadata => true}}
  115. }else{
  116. # 将输出内容转换成 "."
  117. stdout{codec => dots}
  118. # 将输出到指定的es
  119. elasticsearch {
  120.   hosts => ["192.168.15.160:9200"]
  121.   index => "%{[@metadata][index]}"
  122.   document_type => "doc"
  123. }
  124. }
  125. }
复制代码
启动logstash
  1. nohup bin/logstash -f test_pipline2.conf &
复制代码
以上就是本文的全部内容,希望对各人的学习有所资助,也希望各人多多支持脚本之家。

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作