/etc/logstash/config/logstash.yml:主要用于控制logstash运行时的状态 /etc/logstash/config/startup.options:logstash 运行相关参数
/opt/log_samele : 日志资源文件
/use/share/logstash/ : logstash安装目录
日志格式模板 看起来就像是这样
127.0.0.1 - - [25/Oct/2019:11:17:45 +0800] "GET / HTTP/1.1" 200 11418 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36" 127.0.0.1 - - [25/Oct/2019:11:17:45 +0800] "GET /favicon.ico HTTP/1.1" 200 21630 "http://127.0.0.1:8081/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"配置文件部分解释
1.输入文件
input { file { path => "xxx/xxx/xxx*.txt" // 日志文件路径 add_field => {"xxx"=>"xxx"} // 添加一个自定义字段 tags => ["xxx"] // 添加一个自定义的tags用来判断或者是当变量使用 type => "system" // 可自定义的type值也可以作为判断条件 } }2.处理部分
filter { if[type] == "system"{ //根据type判断 如果为true则进入该grok grok { match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)" // 解析上面指定格式的日志 } remove_field => "message" // 删除message字段节省es内存 } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] //将日志时间替换为@timestamp } } }3.输出部分
output { elasticsearch { // 输出到es hosts => ["127.0.0.1:9200"] // es的访问地址 index => "%{type}-%{tags}-%{+YYYY-MM}" // 索引名称 } stdout { codec => "json_lines" } // 以json形式打印到控制台 }1.首先查看日志文件的字符集 [logStash@hadoop3 gamelogs]$ file game.txt game.txt: UTF-8 Unicode (with BOM) text, with CRLF line terminators 2.如果是UTF-8编码,则在LogStash配置文件设置charset为UTF-8
input{ file{ path=>"/home/logStash/gamelogs/*.txt" start_position=>"beginning" stat_interval=>10 type=>"gameLogs" codec=>plain{ charset=>"UTF-8" } } } output{ kafka{ topic_id => "gameLogTopic" codec => plain { format => "%{message}" charset => "UTF-8" } bootstrap_servers=>"192.168.25.100:9092,192.168.25.102:9092" } }3.如果不是UTF-8,则统一设置为GB2312
input{ file{ path=>"/home/logStash/gamelogs/*.txt" start_position=>"beginning" stat_interval=>10 type=>"gameLogs" codec=>plain{ charset=>"GB2312" } } } output{ kafka{ topic_id => "gameLogTopic" codec => plain { format => "%{message}" charset => "GB2312" } bootstrap_servers=>"192.168.25.100:9092,192.168.25.102:9092" } }转载于:https://blog.csdn.net/zhou373986278/article/details/82191898
我们知道在启动logstash的时候,只要加上-f /you_path_to_config_file就可以加载配置文件了,如果我们需要加载多个配置文件,只需要-f /you_path_to_config_directory就可以了。简单说,就是在-f后面加上目录就可以。 注意:目录后面不能加 * 号,否则只会读取一个文件,但是在读取日志文件时,可以匹配所有,比如sys.log可以匹配所有以sys.log开头的日志文件,如sys.log1,sys.log2等。
示例如下:
//比如 /home/husen/config/目录下有 //in1.conf、in2.conf、filter1.conf、filter2.conf、out.conf这5个文件 //我们使用 /logstash-5.5.1/bin/logstash -f /home/husen/config启动logtstash //logstash会自动加载这个5个配置文件,并合并成1个整体的配置文件比如:
## in1.conf内容如下: input{ file{ path=>[ "/home/husen/log/sys.log" ] } } ## in2.conf内容如下: input{ file{ path=>[ "/home/husen/log/error.log" ] } } ## out1.conf如下 elasticsearch { action => "index" hosts => "localhost:9200" index => "from_sys_log" codec => "json" } ## out2.conf如下 elasticsearch { action => "index" hosts => "localhost:9200" index => "from_error_log" codec => "json" } //这几个配置文件的目的是: //想把in1.conf读进来的sys.log的索引建立为from_sys_log //把in.conf读进来的error.log的索引建立为femo_error_log //logstash-5.5.1/bin/logstash -f /home/husen/config //启动之后,会发现in1.conf的日志被输出了两次,in2.conf读进来的日志也被输出了两次 //结论:logstash读取多个配置文件只是简单的将所有配置文件整合到了一起! //如果要彼此独立,需要自己加字段,然后判断一下 //比如读取来不同不同服务器的同样格式的日志,那么filter是可以共用的 //但是输出的索引需要分别建立,以提高辨识度如果要在配置文件中,独立一些部分,又要共用一些部分,比如我上门提高同样的日志来自不同的服务器,需要用同样的filter,但是建立不同的索引的问题,该怎么办? 建议使用tags或者type这两个特殊字段,即在读取文件的时候,添加标识符在tags中或者定义type变量。
示例如下:
## in1.conf内容如下: input{ file{ path=>[ "/home/husen/log/sys.log" ] type => "from_sys" #tags => ["from_sys"] } } ## in2.conf内容如下: input{ file{ path=>[ "/home/husen/log/error.log" ] type => "from_error" #tags => ["from_sys"] } } ## out1.conf如下 if [type] == "from_sys"{ #if "from_sys" in [tags] elasticsearch { action => "index" hosts => "localhost:9200" index => "from_sys_log" codec => "json" } } ## out2.conf如下 if [type] == "from_error"{ #if "from_error" in [tags] elasticsearch { action => "index" hosts => "localhost:9200" index => "from_error_log" codec => "json" } } #特别地,如果要针对不同的类型日志用不同filter来grok解析, #也可以通过类似的方法判断