Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】

这篇具有很好参考价值的文章主要介绍了Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Go实现LogAgent:海量日志收集系统【下篇】

0 前置文章

Go实现LogAgent:海量日志收集系统【上篇——LogAgent实现】

前面的章节我们已经完成了日志收集(LogAgent),接下来我们需要将日志写入到kafka中,然后将数据落地到Elasticsearch中。

项目架构图:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es
项目逻辑图:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

1 docker搭建Elasticsearcsh、Kibana

如果没有docker环境的,可以在本机安装docker desktop

# 1 创建一个docker网络
docker network create es-net
# 查看本机网络
docker network ls
# 删除一个网络
docker network rm es-net

# 2 拉取es、kibana镜像
docker pull elasticsearch:7.17.4
docker pull kibana:7.17.4

# 3 创建es容器并挂在数据卷
mkdir -p /Users/xxx/docker-home/es-data/_data
mkdir -p /Users/xxx/docker-home/es-plugins
mkdir -p /Users/xxx/docker-home/es-config
mkdir -p /Users/xxx/docker-home/kibana-config

touch elasticsearch.yml
touch kibana.yml

1.需要保证要挂载的目录有读写权限,包括要挂载的配置文件。如果没有则用chmod 777命令
2.如果要挂载配置文件,则需要提前把配置文件内容写好,不能为空,否则可能会影响es和kibana运行。
3.如果只挂载到配置文件目录,不准备配置文件,会导致创建容器后没有配置文件。报错

elasticsearch.yml:

cluster.name: "docker-cluster"
network.host: 0.0.0.0

kibana.yml:

server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

启动es:

docker run -d \
 --name es7.17.4 -p 9200:9200 -p 9300:9300 \
 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx128m" \
 -v /Users/xxx/docker-home/es-data/_data:/usr/share/elasticsearch/data \
 -v  /Users/xxx/docker-home/es-plugins:/usr/share/elasticsearch/plugins \
 -v  /Users/xxx/docker-home/es-config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
 --privileged \
 --network es-net \
  elasticsearch:7.17.4

启动Kibana:

docker run -d \
--name kibana17 \
--network=es-net \
-p 5601:5601 \
-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 
kibana:7.17.4

-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 其中,es7.17.4的名称为上面es容器的名称

结果:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

2 golang操作es

执行下面代码在es中添加索引,然后到kibana页面创建索引

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))
	if err != nil {
		fmt.Println("connect es error", err)
		return
	}

	fmt.Println("conn es succ")

	tweet := Tweet{User: "haohan", Message: "This is a test"}
	_, err = client.Index().
		Index("twitter").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
		return
	}

	fmt.Println("insert succ")
}
# 执行上面的go代码执行,控制台输出如下表明插入成功
conn es succ
insert succ

然后我们手动到kibana中添加对应的index即可搜索出对应数据

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

3 开发LogTransfer:从kafka中读取数据并写入es

在前面的开发中,我们已经将日志写入到了kafka。接下来我们要做的就是从kafka中消费数据,然后写入到es中。LogTransfer做的就是这个工作。

3.1 项目结构

├─config
│      logTransfer.conf
│
├─es
│      elasticsearch.go
│   
├─logs
│      my.log
│
└─main
		kafka.go
        config.go
        log.go
        main.go

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

3.2 项目代码

①LogTransfer/main/main.go
package main

import (
	"github.com/astaxie/beego/logs"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
}
②LogTransfer/main/log.go
package main

import (
	"encoding/json"
	"fmt"
	"github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {

	switch level {
	case "debug":
		return logs.LevelDebug
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	}
	return logs.LevelDebug
}

func initLogger(logPath string, logLevel string) (err error) {

	config := make(map[string]interface{})
	config["filename"] = logPath
	config["level"] = convertLogLevel(logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("初始化日志, 序列化失败:", err)
		return
	}
	_ = logs.SetLogger(logs.AdapterFile, string(configStr))

	return
}
③LogTransfer/main/kafka.go
package main

import (
	"github.com/IBM/sarama"
	"github.com/astaxie/beego/logs"
	"strings"
)

type KafkaClient struct {
	client sarama.Consumer
	addr   string
	topic  string
}

var (
	kafkaClient *KafkaClient
)

func InitKafka(addr string, topic string) (err error) {

	kafkaClient = &KafkaClient{}
	consumer, err := sarama.NewConsumer(strings.Split(addr, ","), nil)
	if err != nil {
		logs.Error("启动Kafka消费者错误: %s", err)
		return nil
	}
	kafkaClient.client = consumer
	kafkaClient.addr = addr
	kafkaClient.topic = topic
	return
}
④LogTransfer/main/config.go
package main

import (
	"fmt"
	"github.com/astaxie/beego/config"
)

type LogConfig struct {
	KafkaAddr  string
	KafkaTopic string
	EsAddr     string
	LogPath    string
	LogLevel   string
}

var (
	logConfig *LogConfig
)

func InitConfig(confType string, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("初始化配置文件出错:%v\n", err)
		return
	}
	// 导入配置信息
	logConfig = &LogConfig{}
	// 日志级别
	logConfig.LogLevel = conf.String("logs::log_level")
	if len(logConfig.LogLevel) == 0 {
		logConfig.LogLevel = "debug"
	}
	// 日志输出路径
	logConfig.LogPath = conf.String("logs::log_path")
	if len(logConfig.LogPath) == 0 {
		logConfig.LogPath = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"
	}

	// Kafka
	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("初识化Kafka addr失败")
		return
	}
	logConfig.KafkaTopic = conf.String("kafka::topic")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("初识化Kafka topic失败")
		return
	}

	// Es
	logConfig.EsAddr = conf.String("elasticsearch::addr")
	if len(logConfig.EsAddr) == 0 {
		err = fmt.Errorf("初识化Es addr失败")
		return
	}
	return
}

④LogTransfer/config/log_transfer.conf
[logs]
log_level = debug
log_path = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"

[kafka]
server_addr = localhost:9092
topic = nginx_log

[elasticsearch]
addr = http://localhost:9200/
⑤LogTransfer/es/es.go
package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))
	if err != nil {
		fmt.Println("connect es error", err)
		return
	}

	fmt.Println("conn es succ")

	tweet := Tweet{User: "haohan", Message: "This is a test"}
	_, err = client.Index().
		Index("twitter").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
		return
	}

	fmt.Println("insert succ")
}
结果

LogTransfer的运行日志在LogTransfer/logs/log_transfer.log中

logs/log_transfer.log:

2023/09/02 19:55:29.037 [D]  初始化日志模块成功
2023/09/02 19:55:29.074 [D]  初始化Kafka成功

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

4 完成LogTransfer:将日志入库到es并通过kibana展示

前面我们将LogTransfer的配置初始化成功了,下面我们将从Kafka中消费数据,然后将日志入库到es,最后通过kibana展示。

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

4.1 将日志保存到es

在LogTransfer/main/main.go中添加初始化InitEs函数

①main.go中添加InitEs函数

LogTransfer/main/main.go:

package main

import (
	"github.com/astaxie/beego/logs"
	"logtransfer.com/es"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
	// 初始化Es
	err = es.InitEs(logConfig.EsAddr)
	if err != nil {
		logs.Error("初始化Elasticsearch失败, err:", err)
		return
	}
	logs.Debug("初始化Es成功")

}

运行LogTransfer下的main.go可以发现log_transfer.log中输出的日志信息
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

②LogTransfer/es/es.go
package es

import (
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

var (
	esClient *elastic.Client
)

func InitEs(addr string) (err error) {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(addr))
	if err != nil {
		fmt.Println("connect es error", err)
		return nil
	}
	esClient = client
	return
}

运行LogTransfer/main下的main函数

  • 可以从logs/log_transfer.log中看到打印初始化es、kafka等成功
③添加run.go:消费kafka中的数据

在main函数中添加run函数, 用于运行kafka消费数据到Es

package main

import (
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

func run() (err error) {

	partitionList, err := kafkaClient.Client.Partitions(kafkaClient.Topic)

	if err != nil {
		logs.Error("Failed to get the list of partitions: ", err)
		return
	}
	for partition := range partitionList {
		pc, errRet := kafkaClient.Client.ConsumePartition(kafkaClient.Topic, int32(partition), sarama.OffsetNewest)
		if errRet != nil {
			err = errRet
			logs.Error("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		kafkaClient.wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				err = es.SendToES(kafkaClient.topic, msg.Value)
				if err != nil {
					logs.Warn("send to es failed, err:%v", err)
				}
			}
			kafkaClient.wg.Done()
		}(pc)
	}

	kafkaClient.wg.Wait()

	return
}
④main.go中添加SendToES函数
package main

import (
	"github.com/astaxie/beego/logs"
	"logtransfer.com/es"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
	// 初始化Es
	err = es.InitEs(logConfig.EsAddr)
	if err != nil {
		logs.Error("初始化Elasticsearch失败, err:", err)
		return
	}
	logs.Debug("初始化Es成功")
	// 运行
	err = run()
	if err != nil {
		logs.Error("运行错误, err:", err)
		return
	}
	select {}
}

5 联调

5.1 运行LogAgent:采集数据并存储到kafka

# 用于向docker中的etcd写入对应key
docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"

通过上面的命令,用于向etcd中写入对应key,etcd的watcher监视到后会对应更新配置

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

查看LogAgent的运行日志:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

5.2 运行LogTransfer:消费kafka数据并存到es

选中LogTransfer下main文件夹下的所有go文件,鼠标右击运行,查看控制台输出

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es
查看LogTransfer的运行日志:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

5.3 在kibana创建index并查看

Management - Stack Management - Kibana - Index Patterns ,根据kafka中的topic创建对应的索引。以nginx_log为例:

Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es
回到overview,根据nginx_log这个index搜索信息:
Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】,go,demo,golang,开发语言,日志收集项目,kafka,LogTransfer,es

可以看到成功读取到日志信息,至此该项目已开发完成

参考文章:https://blog.csdn.net/qq_43442524/article/details/105072952文章来源地址https://www.toymoban.com/news/detail-698674.html

到了这里,关于Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Loki 日志收集系统

       Loki 的日志堆栈由 3 个组件组成: promtail : 用于采集日志、并给每条日志流打标签,每个节点部署,k8s部署模式下使用daemonset管理。  loki: 用于存储采集的日志, 并根据标签查询日志流。单节点部署,一般和监控组件部署在同一节点。  Grafana: 提供界面,实现日志的

    2024年02月08日
    浏览(49)
  • ELK---日志收集系统

    1.要收集哪些日志? ①系统日志–为监控做准备 ②服务日志–数据库–MySQL–慢查询日志、错误日志、普通日志 ③业务日志–log4j( 必须要收集的是业务日志 ) 注:log4j—Java类的数据业务日志 (1)要有针对性的去收集 (2)调整日志级别 2.日志收集后,如何展示?(可视化

    2023年04月08日
    浏览(40)
  • 【ELK日志收集系统】

    目录 一、概述 1.作用 2.为什么使用? 二、组件 1.elasticsearch 1.1 作用 1.2 特点 2.logstash 2.1 作用 2.2 工作过程 2.3 INPUT 2.4 FILETER 2.5 OUTPUTS 3.kibana 三、架构类型 1.ELK 2.ELKK 3.ELFK 4.ELFKK 四、案例 - 构建ELK集群 1.环境配置 2.安装node1与node2节点的elasticsearch 2.1 安装 2.2 配置 2.3 启动ela

    2024年02月10日
    浏览(39)
  • ELK日志收集系统

    ELK由三个组件构成 作用:日志收集 日志分析 日志可视化     日志分析     开源的日志收集、分析、存储程序     特点         分布式         零配置         自动发现         索引自动分片         索引副本机制         Restful风格接口         多数据源

    2024年02月10日
    浏览(38)
  • ELK日志收集系统(四十九)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3  FILETER 2.4  OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EFK 四、案例 ELK日志收集系统是一种常用的开源系统,由三个主

    2024年02月10日
    浏览(43)
  • ELK 日志系统收集K8s中日志

    • K8s弹性伸缩性:导致不能预先确定采集的目标 • 容器隔离性:容器的文件系统与宿主机是隔离,导致日志采集器读取日志文件受阻。 应用程序日志记录体现方式分为两类: • 标准输出:输出到控制台,使用kubectl logs可以看到。 例如 nginx日志是将访问日志输出到标准输出

    2024年02月09日
    浏览(34)
  • 系统学习Linux-ELK日志收集系统

    ELK日志收集系统集群实验 实验环境 角色 主机名 IP 接口 httpd 192.168.31.50 ens33 node1 192.168.31.51 ens33 noed2 192.168.31.53 ens33 环境配置 设置各个主机的ip地址为拓扑中的静态ip,并修改主机名 安装elasticsearch node1 vim /etc/elasticsearch/elasticsearch.yml  node2 移动到elk文件夹 修改elasticsearch配置文

    2024年02月10日
    浏览(49)
  • ELK+Kafka+Zookeeper日志收集系统

    节点IP 节点规划 主机名 192.168.112.3 Elasticsearch + Kibana + Logstash + Zookeeper + Kafka + Nginx elk-node1 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka elk-node2 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka + Nginx elk-node3 修改主机名 配置映射 安装Elasticserach 三台主机都需安装java及elasticserach 启动

    2024年04月18日
    浏览(67)
  • 电商大数据日志收集系统之EFK

    背景 日志管理的挑战: 关注点很多,任何一个点都有可能引起问题 日志分散在很多机器,出了问题时,才发现日志被删了 很多运维人员是消防员,哪里有问题去哪里 集中化日志管理思路: 日志收集 ——》格式化分析 ——》检索和可视化 ——》 风险告警 ELK架构: ELK架构

    2024年02月07日
    浏览(38)
  • 大数据技术之flume——日志收集系统

    大数据需要解决的三个问题:采集、存储、计算。 Apache flume是一个分布式、可靠的、高可用的 海量日志数据采集、聚合和传输系统 ,将海量的日志数据从不同的数据源移动到一个中央的存储系统中。用一句话总结:Flume不生产数据,它只是数据的搬运工。 flume最主要的作用

    2024年02月06日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包