logstash学习笔记 解决令你头疼的问题 采集access日志 处理中文乱码 读取多个配置文件

mac2024-05-15  34

LogStash笔记

文章目录

LogStash笔记配置参数说明logstash.ymlstartup.options 使用logstash采集tomcat access访问日志tomcat访问日志输出格式logstash配置文件 logstash处理中文乱码问题 logstash如何读取多个配置文件logstash多个配置文件里的input、filter、output是否相互独立logstash读取多个配置文件建议的配置方法

配置参数说明

/etc/logstash/config/logstash.yml:主要用于控制logstash运行时的状态 /etc/logstash/config/startup.options:logstash 运行相关参数

/opt/log_samele : 日志资源文件

/use/share/logstash/ : logstash安装目录

logstash.yml
参数用途默认值node.name节点名称主机名称path.data/数据存储路径LOGSTASH_HOME/data/pipeline.workers输出通道的工作workers数据量(提升输出效率)cpu核数pipeline.output.workers每个输出插件的工作wokers数量1pipeline.batch.size每次input数量125path.config过滤配置文件目录config.reload.automatic自动重新加载被修改配置false or trueconfig.reload.interval配置文件检查时间path.logs日志输出路径http.host绑定主机地址,用户指标收集“127.0.0.1”log.level日志输出级别,如果config.debug开启,这里一定要是debug日志infolog.format日志格式* plain*path.plugins自定义插件目录
startup.options
参数用途JAVACMD=/usr/bin/java本地jdkLS_HOME=/opt/logstashlogstash所在目录LS_SETTINGS_DIR="${LS_HOME}/config"默认logstash配置文件目录LS_OPTS="–path.settings ${LS_SETTINGS_DIR}"logstash启动命令参数 指定配置文件目录LS_JAVA_OPTS=""指定jdk目录LS_PIDFILE=/var/run/logstash.pidlogstash.pid所在目录LS_USER=logstashlogstash启动用户LS_GROUP=logstashlogstash启动组LS_GC_LOG_FILE=/var/log/logstash/gc.loglogstash jvm gc日志路径LS_OPEN_FILES=65534logstash最多打开监控文件数量

使用logstash采集tomcat access访问日志

tomcat访问日志输出格式

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs" prefix="localhost_access_log." suffix=".txt" pattern="combined" />

日志格式模板 看起来就像是这样

127.0.0.1 - - [25/Oct/2019:11:17:45 +0800] "GET / HTTP/1.1" 200 11418 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36" 127.0.0.1 - - [25/Oct/2019:11:17:45 +0800] "GET /favicon.ico HTTP/1.1" 200 21630 "http://127.0.0.1:8081/" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"

logstash配置文件

input { file { path => "xxx/xxx/xxx*.txt" add_field => {"fromsystem"=>"xxx"} tags => ["xxx"] type => "system" } } filter { if[type] == "system"{ grok { match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)" } remove_field => "message" } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] } } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "%{type}-%{tags}-%{+YYYY-MM}" } stdout { codec => "json_lines" } }

配置文件部分解释

1.输入文件

input { file { path => "xxx/xxx/xxx*.txt" // 日志文件路径 add_field => {"xxx"=>"xxx"} // 添加一个自定义字段 tags => ["xxx"] // 添加一个自定义的tags用来判断或者是当变量使用 type => "system" // 可自定义的type值也可以作为判断条件 } }

2.处理部分

filter { if[type] == "system"{ //根据type判断 如果为true则进入该grok grok { match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-)" // 解析上面指定格式的日志 } remove_field => "message" // 删除message字段节省es内存 } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] //将日志时间替换为@timestamp } } }

3.输出部分

output { elasticsearch { // 输出到es hosts => ["127.0.0.1:9200"] // es的访问地址 index => "%{type}-%{tags}-%{+YYYY-MM}" // 索引名称 } stdout { codec => "json_lines" } // 以json形式打印到控制台 }

logstash处理中文乱码问题

1.首先查看日志文件的字符集 [logStash@hadoop3 gamelogs]$ file game.txt game.txt: UTF-8 Unicode (with BOM) text, with CRLF line terminators 2.如果是UTF-8编码,则在LogStash配置文件设置charset为UTF-8

input{ file{ path=>"/home/logStash/gamelogs/*.txt" start_position=>"beginning" stat_interval=>10 type=>"gameLogs" codec=>plain{ charset=>"UTF-8" } } } output{ kafka{ topic_id => "gameLogTopic" codec => plain { format => "%{message}" charset => "UTF-8" } bootstrap_servers=>"192.168.25.100:9092,192.168.25.102:9092" } }

3.如果不是UTF-8,则统一设置为GB2312

input{ file{ path=>"/home/logStash/gamelogs/*.txt" start_position=>"beginning" stat_interval=>10 type=>"gameLogs" codec=>plain{ charset=>"GB2312" } } } output{ kafka{ topic_id => "gameLogTopic" codec => plain { format => "%{message}" charset => "GB2312" } bootstrap_servers=>"192.168.25.100:9092,192.168.25.102:9092" } }

转载于:https://blog.csdn.net/zhou373986278/article/details/82191898

logstash如何读取多个配置文件

我们知道在启动logstash的时候,只要加上-f /you_path_to_config_file就可以加载配置文件了,如果我们需要加载多个配置文件,只需要-f /you_path_to_config_directory就可以了。简单说,就是在-f后面加上目录就可以。 注意:目录后面不能加 * 号,否则只会读取一个文件,但是在读取日志文件时,可以匹配所有,比如sys.log可以匹配所有以sys.log开头的日志文件,如sys.log1,sys.log2等。

示例如下:

//比如 /home/husen/config/目录下有 //in1.conf、in2.conf、filter1.conf、filter2.conf、out.conf这5个文件 //我们使用 /logstash-5.5.1/bin/logstash -f /home/husen/config启动logtstash //logstash会自动加载这个5个配置文件,并合并成1个整体的配置文件

logstash多个配置文件里的input、filter、output是否相互独立

比如:

## in1.conf内容如下: input{ file{ path=>[ "/home/husen/log/sys.log" ] } } ## in2.conf内容如下: input{ file{ path=>[ "/home/husen/log/error.log" ] } } ## out1.conf如下 elasticsearch { action => "index" hosts => "localhost:9200" index => "from_sys_log" codec => "json" } ## out2.conf如下 elasticsearch { action => "index" hosts => "localhost:9200" index => "from_error_log" codec => "json" } //这几个配置文件的目的是: //想把in1.conf读进来的sys.log的索引建立为from_sys_log //把in.conf读进来的error.log的索引建立为femo_error_log //logstash-5.5.1/bin/logstash -f /home/husen/config //启动之后,会发现in1.conf的日志被输出了两次,in2.conf读进来的日志也被输出了两次 //结论:logstash读取多个配置文件只是简单的将所有配置文件整合到了一起! //如果要彼此独立,需要自己加字段,然后判断一下 //比如读取来不同不同服务器的同样格式的日志,那么filter是可以共用的 //但是输出的索引需要分别建立,以提高辨识度

logstash读取多个配置文件建议的配置方法

如果要在配置文件中,独立一些部分,又要共用一些部分,比如我上门提高同样的日志来自不同的服务器,需要用同样的filter,但是建立不同的索引的问题,该怎么办? 建议使用tags或者type这两个特殊字段,即在读取文件的时候,添加标识符在tags中或者定义type变量。

示例如下:

## in1.conf内容如下: input{ file{ path=>[ "/home/husen/log/sys.log" ] type => "from_sys" #tags => ["from_sys"] } } ## in2.conf内容如下: input{ file{ path=>[ "/home/husen/log/error.log" ] type => "from_error" #tags => ["from_sys"] } } ## out1.conf如下 if [type] == "from_sys"{ #if "from_sys" in [tags] elasticsearch { action => "index" hosts => "localhost:9200" index => "from_sys_log" codec => "json" } } ## out2.conf如下 if [type] == "from_error"{ #if "from_error" in [tags] elasticsearch { action => "index" hosts => "localhost:9200" index => "from_error_log" codec => "json" } } #特别地,如果要针对不同的类型日志用不同filter来grok解析, #也可以通过类似的方法判断
最新回复(0)