使用golang实现日志收集系统的logagent

这篇具有很好参考价值的文章主要介绍了使用golang实现日志收集系统的logagent。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

整体架构

参考 七米老师的日志收集项目
使用golang实现日志收集系统的logagent
主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。
我们把这两部分结合起来实现监控日志追加并发送到kafka。

使用github.com/go-ini/ini配置参数

// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000

[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log

配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。

初始化kafka

kafka.go

package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
)

var (
	Client sarama.SyncProducer
	MsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)

func InitKafka(kafkaAddr string, chanSize int64) (err error){
	config:=sarama.NewConfig()
	// 生产者配置
	config.Producer.RequiredAcks=sarama.WaitForAll
	config.Producer.Partitioner=sarama.NewRandomPartitioner
	config.Producer.Return.Successes=true
	// 连接kafka
	Client,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)
	if err!=nil {
		logrus.Error("producer closed", err)
		return
	}
	// 从管道中读取日志并发送到kafka
	MsgChan = make(chan *sarama.ProducerMessage, chanSize)
	go sendMsg()
	return
}

func sendMsg(){
	for {
		select {
			case msg := <- MsgChan:
				pid, offset, err := Client.SendMessage(msg)
				if err != nil {
					logrus.Warning("send msg failed, err:", err)
					return
				}
				logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)
		}
	}
}

这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用

// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")

初始化tailf,并将日志数据写入ChanMsg

tailF.go

package tailF
import (
	"github.com/hpcloud/tail"
	"fmt"
)
var (
	TailObj *tail.Tail
)
func InitTail(filename string) (err error) {
	config := tail.Config{
		ReOpen: true,
		Follow: true,
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll: true,
	}

	// 打开文件开始读取数据
	TailObj, err =  tail.TailFile(filename, config)
	if err != nil {
		fmt.Printf("create tail %s failed, err:%v\n", filename, err)
		return
	}
	return
}

main.go中对应

// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

main.go中实现的run函数,读取tailF的数据,并写入ChanMsg

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

完整main.go

package main

import (
	"config_version/kafka"
	"config_version/tailF"
	"time"
	"github.com/Shopify/sarama"
	"github.com/go-ini/ini"
	"github.com/sirupsen/logrus"
)

func main() {
	// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
	// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")
	// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

}

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。文章来源地址https://www.toymoban.com/news/detail-418934.html

到了这里,关于使用golang实现日志收集系统的logagent的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ELK---日志收集系统

    1.要收集哪些日志? ①系统日志–为监控做准备 ②服务日志–数据库–MySQL–慢查询日志、错误日志、普通日志 ③业务日志–log4j( 必须要收集的是业务日志 ) 注:log4j—Java类的数据业务日志 (1)要有针对性的去收集 (2)调整日志级别 2.日志收集后,如何展示?(可视化

    2023年04月08日
    浏览(28)
  • go语言日志收集系统

    完整项目的GitHub地址 1. 项目背景 a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 b. 当系统机器比较少时,登陆到服务器上查看即可满足 c. 当系统机器规模巨大,登陆到机器上查看几乎不现实 2. 解决方案 a. 把机器上的日志实时收集,统一的存储到中心系统

    2023年04月08日
    浏览(27)
  • Loki 日志收集系统

       Loki 的日志堆栈由 3 个组件组成: promtail : 用于采集日志、并给每条日志流打标签,每个节点部署,k8s部署模式下使用daemonset管理。  loki: 用于存储采集的日志, 并根据标签查询日志流。单节点部署,一般和监控组件部署在同一节点。  Grafana: 提供界面,实现日志的

    2024年02月08日
    浏览(34)
  • 【ELK日志收集系统】

    目录 一、概述 1.作用 2.为什么使用? 二、组件 1.elasticsearch 1.1 作用 1.2 特点 2.logstash 2.1 作用 2.2 工作过程 2.3 INPUT 2.4 FILETER 2.5 OUTPUTS 3.kibana 三、架构类型 1.ELK 2.ELKK 3.ELFK 4.ELFKK 四、案例 - 构建ELK集群 1.环境配置 2.安装node1与node2节点的elasticsearch 2.1 安装 2.2 配置 2.3 启动ela

    2024年02月10日
    浏览(30)
  • ELK日志收集系统(四十九)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3  FILETER 2.4  OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EFK 四、案例 ELK日志收集系统是一种常用的开源系统,由三个主

    2024年02月10日
    浏览(31)
  • ELK 日志系统收集K8s中日志

    • K8s弹性伸缩性:导致不能预先确定采集的目标 • 容器隔离性:容器的文件系统与宿主机是隔离,导致日志采集器读取日志文件受阻。 应用程序日志记录体现方式分为两类: • 标准输出:输出到控制台,使用kubectl logs可以看到。 例如 nginx日志是将访问日志输出到标准输出

    2024年02月09日
    浏览(28)
  • 系统学习Linux-ELK日志收集系统

    ELK日志收集系统集群实验 实验环境 角色 主机名 IP 接口 httpd 192.168.31.50 ens33 node1 192.168.31.51 ens33 noed2 192.168.31.53 ens33 环境配置 设置各个主机的ip地址为拓扑中的静态ip,并修改主机名 安装elasticsearch node1 vim /etc/elasticsearch/elasticsearch.yml  node2 移动到elk文件夹 修改elasticsearch配置文

    2024年02月10日
    浏览(31)
  • ELK+Kafka+Zookeeper日志收集系统

    节点IP 节点规划 主机名 192.168.112.3 Elasticsearch + Kibana + Logstash + Zookeeper + Kafka + Nginx elk-node1 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka elk-node2 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka + Nginx elk-node3 修改主机名 配置映射 安装Elasticserach 三台主机都需安装java及elasticserach 启动

    2024年04月18日
    浏览(60)
  • 电商大数据日志收集系统之EFK

    背景 日志管理的挑战: 关注点很多,任何一个点都有可能引起问题 日志分散在很多机器,出了问题时,才发现日志被删了 很多运维人员是消防员,哪里有问题去哪里 集中化日志管理思路: 日志收集 ——》格式化分析 ——》检索和可视化 ——》 风险告警 ELK架构: ELK架构

    2024年02月07日
    浏览(30)
  • 大数据技术之flume——日志收集系统

    大数据需要解决的三个问题:采集、存储、计算。 Apache flume是一个分布式、可靠的、高可用的 海量日志数据采集、聚合和传输系统 ,将海量的日志数据从不同的数据源移动到一个中央的存储系统中。用一句话总结:Flume不生产数据,它只是数据的搬运工。 flume最主要的作用

    2024年02月06日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包