Skywalking oap 源码解读——链路前置处理

这篇具有很好参考价值的文章主要介绍了Skywalking oap 源码解读——链路前置处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文已参与腾源会发起的「开源摘星计划」

怎么样利用Skywalking oap源码为自己所用。首先需要了解Skywalking oap对链路信息的处理过程。再上一篇的基础上,我们已经完成了对skywalking8.7.0的源码编译。下面对其采集链路信息相关的模块进行读解。因为其源码功能模块众多,我们并能不一次性都看完,但是我们需要对功能模块有个总体认识,方便理解我们后续链路信息处理模块。
oap-server模块是Skywalking的OAP的实现模块,有下列子模块:

  1. analyzer:是ageng上传的对链路信息进行加工处理。
  2. server-core:OAP服务核心模块
  3. oal-rt:oal类生成引擎
  4. server-alarm-plugin:报警插件
  5. server-cluster-plugin:集群信息插件,支持etcd、kubernetes、nacos、Zookeeper等等服务的插件
  6. oal-grammar:oal语言的语法
  7. server-configuration:这是负责管理OAP配置信息,包括Apollo的配置、nacos、etcd、Zookeeper等等
  8. server-library:公共模块部分
  9. server-query-plugin:这一块是对查询的处理,处理的是SKywalking前台发过来的查询请求
  10. server-receiver-plugin:接收请求,包括Metrics、istio、jvm、mesh、trace等等插件,用来把数据写入Skywalking中
  11. server-starter-es7:这是OAP的starter启动模块
  12. server-storage-plugin:存储插件,也就是支持什么存储,有ElasticSearch、zipkin、jdbc等等,这些存储形式的插件也很丰富。

先打开模块:oap-server->server-starter-es7。如果使用elasticsearch7作为存储,则需要从这里启动。

Skywalking oap 源码解读——链路前置处理

主要流程:

查看类OAPServerBootstrap:

Skywalking oap 源码解读——链路前置处理

 如果是初次启动skyalking oap,可以设置环境变量mode=init,这样启动后系统只会进行系统初始化动作,例如kafka建立topic;elasticsearch建立index,数据库建表。所有的模块都会执行初始化动作。初始化完成后,系统运行结束,不会进行信息采集。如果不设置mode环境变量,则是正常启动oap。

Skywalking oap 源码解读——链路前置处理

这里是读取初始目录下面,config目录中的application.yml文件,根据配置加载各个模块的各个候选实现。具体加载流程可以参考java SPI。

这里只看我们用到的kafka-fetcher-plugin模块,他的位置在:

Skywalking oap 源码解读——链路前置处理

SPI机制会系统初始化时调用KafkaFetcherProvider类的prepare方法和start方法,来初始化kafka配置,例如按照application.yml文件的配置,建立好和kafka消息中间件的连接,如果没有topic就新建topic。随后,skywalking会等待java agent上报数据,包括消息类型有metrics/trace/manage/profile(jvm快照)等信息,分别对应相应的消息队列,并将kafka的消息队列和处理插件进行关联:

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
    handlerRegister.register(new JVMMetricsHandler(getManager(), config));
    handlerRegister.register(new ServiceManagementHandler(getManager(), config));
    handlerRegister.register(new TraceSegmentHandler(getManager(), config));
    handlerRegister.register(new ProfileTaskHandler(getManager(), config));
    handlerRegister.register(new MeterServiceHandler(getManager(), config));

    if (config.isEnableNativeProtoLog()) {
        handlerRegister.register(new LogHandler(getManager(), config));
    }
    if (config.isEnableNativeJsonLog()) {
        handlerRegister.register(new JsonLogHandler(getManager(), config));
    }

    handlerRegister.start();
}

如果java agent上报trace信息时,就会调用run方法:

@Override
public void run() {
    while (true) {
        try {
            ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
            if (!consumerRecords.isEmpty()) {
                for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
                    executor.submit(() -> handlerMap.get(record.topic()).handle(record));
                }
                if (!enableKafkaMessageAutoCommit) {
                    consumer.commitAsync();
                }
            }
        } catch (Exception e) {
            log.error("Kafka handle message error.", e);
        }
    }
}

每隔500毫秒去访问一次kafka,oap对每一个消息都会新建一个线程去处理。处理完(consumer.commitAsync();会更新kafka读取的游标,以免重复读取。根据之前的队列绑定关系,如果消息是一个segment信息,则会调用TraceSegmentHandler的handle方法。Handle方法会调用核心分析模块去进一步处理segment信息,也就是agent-analyzer模块:

Skywalking oap 源码解读——链路前置处理

SegmentParserServiceImpl类中的send方法:traceAnalyzer.doAnalysis(segment);完成数据转换并存入我们配置好的elasticsearch7存储中。这一段处理过程时我们下一课重点展开研究的地方。也是提取分析skywalking核心数据结构的地方。文章来源地址https://www.toymoban.com/news/detail-429423.html

到了这里,关于Skywalking oap 源码解读——链路前置处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 链路追踪Skywalking应用实战

    2023年09月05日
    浏览(46)
  • skywalking springgateway 全链路

    spring-cloud-gateway 3.1.0 skywalking 默认是不整合springGateway的,需要手动拷贝skywalking optional-plugins 下的 apm-spring-cloud-gateway-N.x-plugin-8.13.0.jar 和 apm-spring-webflux-5.x-plugin-8.13.0.jar 架包拷贝到 plugins 目录下 gateway架包的选择根据springgateway的版本进行选择 经过上一步配置的请求会存在调用链

    2024年02月10日
    浏览(45)
  • 链路追踪Skywalking快速入门

    2023年09月08日
    浏览(45)
  • skywalking全链路追踪

    在上一篇文章skywalking安装教程中我们介绍了skywalking的作用以及如何将其集成到我们的微服务项目中。本篇文章我们介绍在微服务架构中,如何使用skywalking对一次客户端请求进行全链路追踪。 skywalking的介绍分多篇文章: 微服务项目集成skywalking skywalking全链路追踪 何为全链路

    2024年02月14日
    浏览(40)
  • 全网最全的Skywalking链路追踪

    写在前面 :笔者发现目前关于Skywalking的内容很是零散,没有成型的内容,笔者在项目中使用到Skywalking进行埋点分析,下面分三篇来介绍下Skywalking,分别是Skywalking基本知识,Skywalking基于docke安装,SpringBoot工程集成Skywalking 服务监控需要满足的三要素分别如下: 日志监控 指标

    2023年04月08日
    浏览(38)
  • Skywalking全链路追踪【学习笔记】

    Skywalking全链路追踪的服务搭建,使用docker进行安装。 搭建【ES】 搭建【SkyWalking】 访问这里:http://localhost:9898/ 就有界面了 启动配置添加【Agent】 日志配置添加【日志】 完成 (~ ̄▽ ̄)~ 本地测试接口请求 然后登入http://localhost:9898/ 进行查看数据 参考 SkyWalking 教程:https

    2024年02月12日
    浏览(33)
  • 微服务链路追踪SkyWalking学习笔记

    目录 1、skywalking是什么 1.2 链路追踪框架对比 1.3 性能对比 1.4 Skywalking主要功能特性 2、 SkyWalking 环境搭建部署 2.1 下载 SkyWalking   2.2 搭建SkyWalking OAP 服务 2.3 SkyWalking中三个概念 3、 SkyWalking 接入微服务 3.1 linux环境—通过jar包方式接入 3.2 windos环境—在IDEA中使用Sk

    2024年02月14日
    浏览(35)
  • SkyWalking分布式链路追踪学习

    实际生产中,面对几十个、甚至成百上千个的微服务实例,如果一旦某个实例发生宕机,如果不能快速定位、提交预警,对实际生产造成的损失无疑是巨大的。所以,要对微服务进行监控、预警,对微服务的调用链路进行监控,迅速定位问题 SkyWalking下载 SkyWalking官网 elastic

    2024年02月07日
    浏览(42)
  • SkyWalking链路追踪-技术文档首页

    SkyWalking链路追踪-融合-spring-boot-cloud-单机环境 之《10 分钟快速搭建spring-boot-cloud整合SkyWalking链路追踪》_一单成的博客-CSDN博客 SkyWalking链路追踪-搭建-spring-boot-cloud-单机环境 之《10 分钟快速搭建 SkyWalking 服务》_一单成的博客-CSDN博客  SkyWalking链路追踪-Collector(收集器)_一单

    2024年02月15日
    浏览(67)
  • 分布式链路追踪之SkyWalking

      在微服务架构中,一次请求往往涉及到多个模块,多个中间件,多台机器的相互协作才能完成。这一系列调用请求中,有些是串行的,有些是并行的,那么如何确定这个请求背后调用了哪些应用,哪些模块,哪些节点及调用的先后顺序?如何定位每个模块的性能问题?本

    2023年04月20日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包