本文已参与腾源会发起的「开源摘星计划」
怎么样利用Skywalking oap源码为自己所用。首先需要了解Skywalking oap对链路信息的处理过程。再上一篇的基础上,我们已经完成了对skywalking8.7.0的源码编译。下面对其采集链路信息相关的模块进行读解。因为其源码功能模块众多,我们并能不一次性都看完,但是我们需要对功能模块有个总体认识,方便理解我们后续链路信息处理模块。
oap-server模块是Skywalking的OAP的实现模块,有下列子模块:
- analyzer:是ageng上传的对链路信息进行加工处理。
- server-core:OAP服务核心模块
- oal-rt:oal类生成引擎
- server-alarm-plugin:报警插件
- server-cluster-plugin:集群信息插件,支持etcd、kubernetes、nacos、Zookeeper等等服务的插件
- oal-grammar:oal语言的语法
- server-configuration:这是负责管理OAP配置信息,包括Apollo的配置、nacos、etcd、Zookeeper等等
- server-library:公共模块部分
- server-query-plugin:这一块是对查询的处理,处理的是SKywalking前台发过来的查询请求
- server-receiver-plugin:接收请求,包括Metrics、istio、jvm、mesh、trace等等插件,用来把数据写入Skywalking中
- server-starter-es7:这是OAP的starter启动模块
- server-storage-plugin:存储插件,也就是支持什么存储,有ElasticSearch、zipkin、jdbc等等,这些存储形式的插件也很丰富。
先打开模块:oap-server->server-starter-es7。如果使用elasticsearch7作为存储,则需要从这里启动。
主要流程:
查看类OAPServerBootstrap:
如果是初次启动skyalking oap,可以设置环境变量mode=init,这样启动后系统只会进行系统初始化动作,例如kafka建立topic;elasticsearch建立index,数据库建表。所有的模块都会执行初始化动作。初始化完成后,系统运行结束,不会进行信息采集。如果不设置mode环境变量,则是正常启动oap。
这里是读取初始目录下面,config目录中的application.yml文件,根据配置加载各个模块的各个候选实现。具体加载流程可以参考java SPI。
这里只看我们用到的kafka-fetcher-plugin模块,他的位置在:
SPI机制会系统初始化时调用KafkaFetcherProvider类的prepare方法和start方法,来初始化kafka配置,例如按照application.yml文件的配置,建立好和kafka消息中间件的连接,如果没有topic就新建topic。随后,skywalking会等待java agent上报数据,包括消息类型有metrics/trace/manage/profile(jvm快照)等信息,分别对应相应的消息队列,并将kafka的消息队列和处理插件进行关联:
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
handlerRegister.register(new JVMMetricsHandler(getManager(), config));
handlerRegister.register(new ServiceManagementHandler(getManager(), config));
handlerRegister.register(new TraceSegmentHandler(getManager(), config));
handlerRegister.register(new ProfileTaskHandler(getManager(), config));
handlerRegister.register(new MeterServiceHandler(getManager(), config));
if (config.isEnableNativeProtoLog()) {
handlerRegister.register(new LogHandler(getManager(), config));
}
if (config.isEnableNativeJsonLog()) {
handlerRegister.register(new JsonLogHandler(getManager(), config));
}
handlerRegister.start();
}
如果java agent上报trace信息时,就会调用run方法:
@Override
public void run() {
while (true) {
try {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
if (!consumerRecords.isEmpty()) {
for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
executor.submit(() -> handlerMap.get(record.topic()).handle(record));
}
if (!enableKafkaMessageAutoCommit) {
consumer.commitAsync();
}
}
} catch (Exception e) {
log.error("Kafka handle message error.", e);
}
}
}
每隔500毫秒去访问一次kafka,oap对每一个消息都会新建一个线程去处理。处理完(consumer.commitAsync();)会更新kafka读取的游标,以免重复读取。根据之前的队列绑定关系,如果消息是一个segment信息,则会调用TraceSegmentHandler的handle方法。Handle方法会调用核心分析模块去进一步处理segment信息,也就是agent-analyzer模块:
文章来源:https://www.toymoban.com/news/detail-429423.html
SegmentParserServiceImpl类中的send方法:traceAnalyzer.doAnalysis(segment);完成数据转换并存入我们配置好的elasticsearch7存储中。这一段处理过程时我们下一课重点展开研究的地方。也是提取分析skywalking核心数据结构的地方。文章来源地址https://www.toymoban.com/news/detail-429423.html
到了这里,关于Skywalking oap 源码解读——链路前置处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!