Skywalking oap 源码解读——链路前置处理

这篇具有很好参考价值的文章主要介绍了Skywalking oap 源码解读——链路前置处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文已参与腾源会发起的「开源摘星计划」

怎么样利用Skywalking oap源码为自己所用。首先需要了解Skywalking oap对链路信息的处理过程。再上一篇的基础上,我们已经完成了对skywalking8.7.0的源码编译。下面对其采集链路信息相关的模块进行读解。因为其源码功能模块众多,我们并能不一次性都看完,但是我们需要对功能模块有个总体认识,方便理解我们后续链路信息处理模块。
oap-server模块是Skywalking的OAP的实现模块,有下列子模块:

  1. analyzer:是ageng上传的对链路信息进行加工处理。
  2. server-core:OAP服务核心模块
  3. oal-rt:oal类生成引擎
  4. server-alarm-plugin:报警插件
  5. server-cluster-plugin:集群信息插件,支持etcd、kubernetes、nacos、Zookeeper等等服务的插件
  6. oal-grammar:oal语言的语法
  7. server-configuration:这是负责管理OAP配置信息,包括Apollo的配置、nacos、etcd、Zookeeper等等
  8. server-library:公共模块部分
  9. server-query-plugin:这一块是对查询的处理,处理的是SKywalking前台发过来的查询请求
  10. server-receiver-plugin:接收请求,包括Metrics、istio、jvm、mesh、trace等等插件,用来把数据写入Skywalking中
  11. server-starter-es7:这是OAP的starter启动模块
  12. server-storage-plugin:存储插件,也就是支持什么存储,有ElasticSearch、zipkin、jdbc等等,这些存储形式的插件也很丰富。

先打开模块:oap-server->server-starter-es7。如果使用elasticsearch7作为存储,则需要从这里启动。

Skywalking oap 源码解读——链路前置处理

主要流程:

查看类OAPServerBootstrap:

Skywalking oap 源码解读——链路前置处理

 如果是初次启动skyalking oap,可以设置环境变量mode=init,这样启动后系统只会进行系统初始化动作,例如kafka建立topic;elasticsearch建立index,数据库建表。所有的模块都会执行初始化动作。初始化完成后,系统运行结束,不会进行信息采集。如果不设置mode环境变量,则是正常启动oap。

Skywalking oap 源码解读——链路前置处理

这里是读取初始目录下面,config目录中的application.yml文件,根据配置加载各个模块的各个候选实现。具体加载流程可以参考java SPI。

这里只看我们用到的kafka-fetcher-plugin模块,他的位置在:

Skywalking oap 源码解读——链路前置处理

SPI机制会系统初始化时调用KafkaFetcherProvider类的prepare方法和start方法,来初始化kafka配置,例如按照application.yml文件的配置,建立好和kafka消息中间件的连接,如果没有topic就新建topic。随后,skywalking会等待java agent上报数据,包括消息类型有metrics/trace/manage/profile(jvm快照)等信息,分别对应相应的消息队列,并将kafka的消息队列和处理插件进行关联:

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
    handlerRegister.register(new JVMMetricsHandler(getManager(), config));
    handlerRegister.register(new ServiceManagementHandler(getManager(), config));
    handlerRegister.register(new TraceSegmentHandler(getManager(), config));
    handlerRegister.register(new ProfileTaskHandler(getManager(), config));
    handlerRegister.register(new MeterServiceHandler(getManager(), config));

    if (config.isEnableNativeProtoLog()) {
        handlerRegister.register(new LogHandler(getManager(), config));
    }
    if (config.isEnableNativeJsonLog()) {
        handlerRegister.register(new JsonLogHandler(getManager(), config));
    }

    handlerRegister.start();
}

如果java agent上报trace信息时,就会调用run方法:

@Override
public void run() {
    while (true) {
        try {
            ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
            if (!consumerRecords.isEmpty()) {
                for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
                    executor.submit(() -> handlerMap.get(record.topic()).handle(record));
                }
                if (!enableKafkaMessageAutoCommit) {
                    consumer.commitAsync();
                }
            }
        } catch (Exception e) {
            log.error("Kafka handle message error.", e);
        }
    }
}

每隔500毫秒去访问一次kafka,oap对每一个消息都会新建一个线程去处理。处理完(consumer.commitAsync();会更新kafka读取的游标,以免重复读取。根据之前的队列绑定关系,如果消息是一个segment信息,则会调用TraceSegmentHandler的handle方法。Handle方法会调用核心分析模块去进一步处理segment信息,也就是agent-analyzer模块:

Skywalking oap 源码解读——链路前置处理

SegmentParserServiceImpl类中的send方法:traceAnalyzer.doAnalysis(segment);完成数据转换并存入我们配置好的elasticsearch7存储中。这一段处理过程时我们下一课重点展开研究的地方。也是提取分析skywalking核心数据结构的地方。文章来源地址https://www.toymoban.com/news/detail-429423.html

到了这里,关于Skywalking oap 源码解读——链路前置处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 链路追踪Skywalking快速入门

    2023年09月08日
    浏览(44)
  • 链路追踪Skywalking应用实战

    2023年09月05日
    浏览(44)
  • skywalking springgateway 全链路

    spring-cloud-gateway 3.1.0 skywalking 默认是不整合springGateway的,需要手动拷贝skywalking optional-plugins 下的 apm-spring-cloud-gateway-N.x-plugin-8.13.0.jar 和 apm-spring-webflux-5.x-plugin-8.13.0.jar 架包拷贝到 plugins 目录下 gateway架包的选择根据springgateway的版本进行选择 经过上一步配置的请求会存在调用链

    2024年02月10日
    浏览(43)
  • skywalking全链路监控部署

    环境配置要求 Centos 7 jdk1.8.0.144 elasticsearch 7.8.1 skyWalking 3.2.6 一、安装jdk1.8 创建相关目录,解压安装包,解压并移动到jdk目录下 设置环境变量,查看jdk版本 二、安装elasticsearch 2.1、创建用户elasticsearch useradd es passwd es 2.2、下载elasticsearch7.8.1,并解压到相关目录 2.3、修改系统参

    2024年02月08日
    浏览(35)
  • Skywalking全链路追踪【学习笔记】

    Skywalking全链路追踪的服务搭建,使用docker进行安装。 搭建【ES】 搭建【SkyWalking】 访问这里:http://localhost:9898/ 就有界面了 启动配置添加【Agent】 日志配置添加【日志】 完成 (~ ̄▽ ̄)~ 本地测试接口请求 然后登入http://localhost:9898/ 进行查看数据 参考 SkyWalking 教程:https

    2024年02月12日
    浏览(33)
  • 全网最全的Skywalking链路追踪

    写在前面 :笔者发现目前关于Skywalking的内容很是零散,没有成型的内容,笔者在项目中使用到Skywalking进行埋点分析,下面分三篇来介绍下Skywalking,分别是Skywalking基本知识,Skywalking基于docke安装,SpringBoot工程集成Skywalking 服务监控需要满足的三要素分别如下: 日志监控 指标

    2023年04月08日
    浏览(35)
  • 分布式链路追踪之SkyWalking

      在微服务架构中,一次请求往往涉及到多个模块,多个中间件,多台机器的相互协作才能完成。这一系列调用请求中,有些是串行的,有些是并行的,那么如何确定这个请求背后调用了哪些应用,哪些模块,哪些节点及调用的先后顺序?如何定位每个模块的性能问题?本

    2023年04月20日
    浏览(49)
  • SkyWalking分布式链路追踪学习

    实际生产中,面对几十个、甚至成百上千个的微服务实例,如果一旦某个实例发生宕机,如果不能快速定位、提交预警,对实际生产造成的损失无疑是巨大的。所以,要对微服务进行监控、预警,对微服务的调用链路进行监控,迅速定位问题 SkyWalking下载 SkyWalking官网 elastic

    2024年02月07日
    浏览(40)
  • SkyWalking链路追踪中span全解

            在SkyWalking链路追踪中,Span(跨度)是Trace(追踪)的组成部分之一。Span代表一次调用或操作的单个组件,可以是一个方法调用、一个HTTP请求或者其他类型的操作。         每个Span都包含了一些关键的信息,如开始时间、结束时间、耗时、所属的Trace ID、Spa

    2024年02月16日
    浏览(37)
  • 微服务链路追踪SkyWalking学习笔记

    目录 1、skywalking是什么 1.2 链路追踪框架对比 1.3 性能对比 1.4 Skywalking主要功能特性 2、 SkyWalking 环境搭建部署 2.1 下载 SkyWalking   2.2 搭建SkyWalking OAP 服务 2.3 SkyWalking中三个概念 3、 SkyWalking 接入微服务 3.1 linux环境—通过jar包方式接入 3.2 windos环境—在IDEA中使用Sk

    2024年02月14日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包