Integrating Spring Boot with syslog + Logstash to collect logs into ES


1. Background

Logstash is a real-time data collection engine that can gather many kinds of data and analyze, filter, and consolidate it. You filter out the data that matches your own criteria and feed it into a visualization layer. It supports full or incremental transfer from a wide variety of data sources, normalization into a standard format, and formatted output, and is commonly used for log processing. Its workflow consists of three stages:

  1. input, the ingestion stage, which can receive data from sources such as Oracle, MySQL, PostgreSQL, and files;
  2. filter, the normalization stage, where data is filtered and formatted, for example parsing timestamps and strings;
  3. output, the output stage, which can send data to destinations such as Elasticsearch, MongoDB, and Kafka.

Architecture overview: the Spring Boot application emits syslog messages, the system's rsyslog service forwards them, and Logstash listens on the rsyslog forwarding port, filters the data, and sends it to ES for storage.
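
A rough sketch of this data flow:

Spring Boot (logback SyslogAppender) --> rsyslog (receive / forward) --> Logstash (syslog input + filter) --> Elasticsearch (logstash-* indices)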

2. Integrating syslog in Spring Boot

Maven dependencies:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.7</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.7</version>
</dependency>

logback.xml configuration

After the appenders are configured, they must be referenced inside the root tag for the configuration to take effect;

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
  <!-- Console output -->
  <appender name="consoleLogAppender" class="ch.qos.logback.core.ConsoleAppender">
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>INFO</level>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
  </appender>

  <appender name="infoFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <File>./logs/service.log</File>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>INFO</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>./logs/service-log-%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>15</maxHistory>
      <totalSizeCap>5GB</totalSizeCap>
    </rollingPolicy>
  </appender>
  <appender name="errorFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <File>./logs/service-error.log</File>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>ERROR</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>./logs/service-error.log.%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>15</maxHistory>
      <totalSizeCap>5GB</totalSizeCap>
    </rollingPolicy>
  </appender>

  <appender name="msgAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <File>./logs/service-msg.log</File>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>INFO</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>./logs/service-msg-%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>5</maxHistory>
      <totalSizeCap>5GB</totalSizeCap>
    </rollingPolicy>
  </appender>

  <appender name="taskAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <File>./logs/service-task.log</File>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>INFO</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>./logs/service-task-%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>5</maxHistory>
      <totalSizeCap>5GB</totalSizeCap>
    </rollingPolicy>
  </appender>

  <appender name="mybatisplus" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <File>./logs/service-mybatisplus.log</File>
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>DEBUG</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{20} - %cyan(%.-3072msg %n)</pattern>
    </encoder>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>./logs/service-mybatisplus-%d{yyyy-MM-dd}.log</fileNamePattern>
      <maxHistory>5</maxHistory>
      <totalSizeCap>5GB</totalSizeCap>
    </rollingPolicy>
  </appender>
  
  <!-- Define a SyslogAppender -->
  <appender name="SYSLOG" class="ch.qos.logback.classic.net.SyslogAppender">
    <syslogHost>localhost</syslogHost>
    <port>12525</port>
    <facility>LOCAL0</facility> <!-- Syslog facility: every log this service sends to the syslog server will be tagged as coming from LOCAL0 -->
    <filter class="ch.qos.logback.classic.filter.LevelFilter">
      <level>WARN</level>
      <onMatch>ACCEPT</onMatch>
      <onMismatch>DENY</onMismatch>
    </filter>
    <suffixPattern>
      [%d{yyyy-MM-dd HH:mm:ss.SSS}] - [%p] - [%X{app:-${app}}] - [%thread] - [%logger{36}.%M] - %msg%n
    </suffixPattern>
  </appender>

  <logger name="msgLogger" level="info"  additivity="false">
    <appender-ref ref="msgAppender" />
  </logger>

  <logger name="taskLogger" level="info"  additivity="false">
    <appender-ref ref="taskAppender" />
  </logger>

<!--  <logger name="com.zbnsec.opera.project.simulator.framework.task" level="DEBUG">
    <appender-ref ref="mybatisplus" />
  </logger>-->

  <root level="INFO" additivity="false">
    <appender-ref ref="consoleLogAppender"/>
    <appender-ref ref="infoFileAppender"/>
    <appender-ref ref="errorFileAppender"/>
    <appender-ref ref="SYSLOG"/>
  </root>
</configuration>

SyslogAppender is the syslog-specific configuration:
syslogHost: the hostname/IP address of the syslog server
port: the port the syslog server listens on; the default is 514 over UDP
facility: identifies the source of the message
suffixPattern: defines the format of the log message
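
For reference, here is a minimal sketch of how application code emits logs that flow through these appenders (the package, class, and method names are hypothetical; only the logger names such as msgLogger come from the logback.xml above). With this configuration, only WARN-level entries pass the SYSLOG appender's LevelFilter and are forwarded to rsyslog.

package com.example.demo;                          // hypothetical package

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OrderService {                        // hypothetical service class

    // Class logger: INFO goes to the console/info file appenders via root;
    // WARN additionally passes the SYSLOG appender's LevelFilter and is sent to rsyslog.
    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // Named logger bound to msgAppender in logback.xml (additivity="false",
    // so these entries are written only to service-msg.log).
    private static final Logger msgLog = LoggerFactory.getLogger("msgLogger");

    public void placeOrder(String orderId) {
        log.info("order {} received", orderId);                       // console + service.log
        msgLog.info("order {} message trace", orderId);               // service-msg.log only
        log.warn("order {} is taking longer than expected", orderId); // also forwarded to syslog
    }
}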

3. Receiving Spring Boot application logs with rsyslog

1. Install the rsyslog service on the server

apt install rsyslog        # install
systemctl start rsyslog    # start the service
systemctl status rsyslog   # check the service status
systemctl enable rsyslog   # start rsyslog automatically at boot

2. Configure rsyslog.conf

The rsyslog configuration file is located at /etc/rsyslog.conf

global(workDirectory="/var/lib/rsyslog")
module(load="builtin:omfile" Template="RSYSLOG_TraditionalFileFormat")
include(file="/etc/rsyslog.d/*.conf" mode="optional")

*.* @@localhost:12515

*.info;mail.none;authpriv.none;cron.none                /var/log/messages
authpriv.*                                              /var/log/secure
mail.*                                                  -/var/log/maillog
cron.*                                                  /var/log/cron
*.emerg                                                 :omusrmsg:*
uucp,news.crit                                          /var/log/spooler
local7.*                                                /var/log/boot.log

The *.* line above forwards every received syslog message to localhost:12515; @@ means the forwarding uses TCP (a single @ means UDP).
If you also need the operating system's own logs, add the configuration below; tail -500f /var/log/messages will then show system log entries being written continuously.

module(load="imuxsock"  SysSock.Use="off") 
module(load="imjournal"  StateFile="imjournal.state") 
module(load="imklog") 
module(load="immark") 
$imjournalRatelimitInterval 0

If the Spring Boot logs should also be stored in the messages file, add the configuration below.
Note: this makes rsyslog listen on port 12525, so when Logstash starts and also tries to listen on 12525, the port is already occupied and Logstash will not receive the Spring Boot log data;

# Listen on a UDP port
module(load="imudp")
input(type="imudp" port="12525")

# Listen on a TCP port
module(load="imtcp")
input(type="imtcp" port="12525")

After modifying the configuration, run systemctl restart rsyslog to restart the service.
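
Optionally, you can sanity-check that rsyslog is receiving and writing messages before wiring in Logstash. A quick test (assuming the system-log modules above are loaded) using the standard logger command:

logger -p local0.warning "rsyslog test message"
tail -5 /var/log/messages        # the test message should appear here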

4. Integrating Logstash

1. Pull the Logstash image

The Logstash version should match the ES version; otherwise other problems may occur.

docker pull docker.elastic.co/logstash/logstash:7.4.0

2. Configure Logstash

Apart from the configuration below, everything else uses the default configuration shipped in the Logstash container; you can start a throwaway container and copy those defaults (the config and pipeline directories) out of it, as sketched below.
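
One way to do that (a sketch; the host-side target directories are up to you, and the image tag should match the one you pulled):

docker run -d --name logstash-tmp docker.elastic.co/logstash/logstash:7.4.0
docker cp logstash-tmp:/usr/share/logstash/config   ./config
docker cp logstash-tmp:/usr/share/logstash/pipeline ./pipeline
docker rm -f logstash-tmp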
logstash.yml:

config.reload.automatic: true
config.reload.interval: 3s
http.host: "0.0.0.0"
path.logs: /usr/share/logstash/logs/

log4j2.properties (Logstash's own logging configuration):

status = error
name = LogstashPropertiesConfig
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.json_console.type = Console
appender.json_console.name = json_console
appender.json_console.layout.type = JSONLayout
appender.json_console.layout.compact = true
appender.json_console.layout.eventEol = true
# Define Rolling File Appender
appender.rolling.type = RollingFile
appender.rolling.name = rolling
appender.rolling.fileName = ${sys:ls.logs}/logstash-plain.log
appender.rolling.filePattern = ${sys:ls.logs}/logstash-%d{yyyy-MM-dd}-%i.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 20
rootLogger.level = ${sys:ls.log.level}
rootLogger.appenderRef.console.ref = ${sys:ls.log.format}_console
rootLogger.appenderRef.rolling.ref = rolling

pipelines.yml: for every pipeline config placed in the pipeline directory, add a corresponding entry here

- pipeline.id: system-syslog
  path.config: "/usr/share/logstash/pipeline/fscr-syslog.conf"

fscr-syslog.conf:

input {
  syslog {
    port => 12525
    type => "system-syslog"
  }
}
filter {
  if [type] == "system-syslog" {
    mutate {
      # Remove ANSI escape sequences
      gsub => [
        "message", "\e\[\d+(;\d+)*m", ""
      ]
    }
    if [message] =~ /^\[/ {
      dissect {
        mapping => {
          "message" => "[%{timestamp}] - [%{loglevel}] - [%{app}] - [%{thread_info}] - [%{source_class}] - %{log_message}"
        }
      }
    }
    mutate {
      # Convert "WARN" to "WARNING"
      gsub => [
        "loglevel", "^WARN$", "WARNING"
      ]
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
      add_field => [ "syslog_hostname", "%{logsource}" ]
      add_field => [ "syslog_severity", "%{loglevel}" ]
      add_field => [ "syslog_program", "%{app}" ]
      add_field => [ "syslog_message", "%{message}" ]
      add_field => [ "syslog_timestamp", "%{timestamp}" ]
      remove_field => ["severity_label", "facility_label", "facility", "priority"]
    }
    date {
      match => ["adjusted_received_at", "ISO8601"]
      timezone => "Asia/Shanghai"
      target => "@timestamp"
    }
  }
}

output {
  if [loglevel] == "WARNING" or [loglevel] == "ERROR" {
    elasticsearch {
      hosts => ["http://esHost:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      template_name => "logstash"   # use the "logstash" index template, which must already exist in ES
      template_overwrite => false
    }
  }
  if [loglevel] == "WARNING" or [loglevel] == "ERROR" {
    stdout {
      codec => rubydebug
    }
  }
}

logstash.json index template file:

{
  "name": "logstash",
  "order": 0,
  "version": 60001,
  "index_patterns": [
    "logstash-*"
  ],
  "settings": {
    "index": {
      "number_of_shards": "1",
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "dynamic_templates": [
      {
        "message_field": {
          "path_match": "message",
          "mapping": {
            "norms": false,
            "type": "text"
          },
          "match_mapping_type": "string"
        }
      },
      {
        "string_fields": {
          "mapping": {
            "norms": false,
            "type": "text",
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            }
          },
          "match_mapping_type": "string",
          "match": "*"
        }
      }
    ],
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "geoip": {
        "dynamic": true,
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "@version": {
        "type": "keyword"
      }
    }
  },
  "aliases": {}
}
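
The pipeline's elasticsearch output refers to this template by name (template_name => "logstash"), so it needs to exist in ES beforehand. One way to load it, assuming ES 7.x and the legacy _template API, is:

curl -X PUT "http://esHost:9200/_template/logstash" \
  -H "Content-Type: application/json" \
  -d @logstash.json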

Start the container:

docker run --name logstash -itd --net=host \
	-v /opt/fscr/middleware/logstash/logstash/config:/usr/share/logstash/config \
	-v /opt/fscr/middleware/logstash/logstash/pipeline:/usr/share/logstash/pipeline \
	-p 5044:5044 -p 9600:9600 \
	logstash:8.8.0

Once the container is up, if there are no error logs and you can see the log output being printed, it has started normally;
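
To double-check the whole chain, you can query Logstash's monitoring API and the target index (esHost is the same placeholder used in the pipeline config):

curl "http://localhost:9600/_node/stats/pipelines?pretty"      # pipeline event counters
curl "http://esHost:9200/logstash-*/_search?size=1&pretty"     # sample document stored in ES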

