eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议)

这篇具有很好参考价值的文章主要介绍了eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

原文:https://blog.mickeyzzc.tech/posts/ebpf/deepflow-agent-proto-dev

MongoDB 目前使用广泛,但是缺乏有效的可观测能力。DeepFlow 在可观测能力上是很优秀的解决方案,但是却缺少了对 MongoDB 协议的支持。该文是为 DeepFlow 扩展了 MongoDB 协议解析,增强 MongoDB 生态的可观测能力,简要描述了从协议文档分析到在 DeepFlow 内实现代码解析的过程拆解。

0x0: 如何分析一个协议(MongoDB)

协议文档的分析思路

首先要从官方网站找到协议解析的文档,在协议文档《mongodb-wire-protocol#standard-message-header》中,可以看到 MongoDB 的协议头结构体描述如下:

struct MsgHeader {
    int32   messageLength;     // total message size, including this
    int32   requestID;         // identifier for this message
    int32   responseTo;        // requestID from the original request
                               //   (used in responses from the database)
    int32   opCode;            // message type
}

上述结构代码理解为下图所示:

eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

⚠️注意,在协议文档《mongodb-wire-protocol》有一段说明,MongoDB 协议是用了字节小端顺序:

Byte Ordering
All integers in the MongoDB wire protocol use little-endian byte order: that is, least-significant

接下来从实际的抓包看一下实际的数据是长什么样子的:
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

0000   a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00

0010   00 00 00 00 00 8e 00 00 00 01 6f 6b 00 00 00 00
0020   00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030   69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040   75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050   6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060   1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070   00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080   7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090   de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0   00 00 00

上述的抓包数据简单拆解到如下信息:

  • 字段 messageLengtha3 00 00 00 :即 消息长度为 a3
  • 字段 requestID0a 50 88 48:即 请求ID为 4888500a
  • 字段 responseTo23 00 00 00:即 对ID为 23 的响应
  • 字段 opCodedd 07 00 00:即 命令号为 7dd,十进制是 2013,对应协议文档中的 OP_MSG 指令

MongoDB 协议操作码说明表

操作码名称 操作码 操作码说明 额外说明
OP_COMPRESSED 2012 使用压缩
OP_MSG 2013 Send a message using the standard format. Used for both client requests and database replies.
OP_REPLY 1 通过responseTo指定响应客户端请求。 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.|
OP_UPDATE 2001 更新文档 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.
OP_INSERT 2002 插入文档 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.
RESERVED 2003
OP_QUERY 2004 查询文档 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.
OP_GET_MORE 2005 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.
OP_DELETE 2006 删除文档 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.
OP_KILL_CURSORS 2007 Deprecated in MongoDB 5.0. Removed in MongoDB 5.1.

对最常见的操作码 OP_MSG 分析
从协议文档 《mongodb-wire-protocol#op_msg》 查看 OP_MSG 的结构体:

OP_MSG {
    MsgHeader header;              // standard message header
    uint32 flagBits;               // message flags
    Sections[] sections;           // data sections
    optional<uint32> checksum;     // optional CRC-32C checksum
}

OP_MSG 需要关注的解码内容在 Sections,只需要判断 kind01 的情况,其中:

  • 0:后面直接用 BSON 解码
  • 1:先偏移 int32c_string 占用的 byte 后,用 BSON 解码后面的内容
    eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

从实际抓包看一下原始数据。如下所示,MongoDB协议的操作码 OP_MSG 内容从第十六(从0开始数,后续文档统一按此规律)字节开始:
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

0000   a3 00 00 00 0a 50 88 48 23 00 00 00 dd 07 00 00
0010   00 00 00 00 
                   00 
                      8e 00 00 00 01 6f 6b 00 00 00 00
0020   00 00 00 f0 3f 11 6f 70 65 72 61 74 69 6f 6e 54
0030   69 6d 65 00 01 00 00 00 bc 1d c3 64 03 24 63 6c
0040   75 73 74 65 72 54 69 6d 65 00 58 00 00 00 11 63
0050   6c 75 73 74 65 72 54 69 6d 65 00 01 00 00 00 bc
0060   1d c3 64 03 73 69 67 6e 61 74 75 72 65 00 33 00
0070   00 00 05 68 61 73 68 00 14 00 00 00 00 29 12 d4
0080   7f 78 52 55 42 04 29 2f b7 36 85 39 c1 47 66 05
0090   de 12 6b 65 79 49 64 00 01 00 00 00 8c d2 e4 63
00a0   00 00 00

不需要关心字段 flagBits ,偏移4个字节后从第四个字节判断字段 kind 类型。由此判断后面为 BSON 结构数据。

到这里我们已经基本了解到 MongoDB 协议的数据结构和解码思路了,接下来我们开始在 DeepFlow Agent 中尝试实现解码观察。

0x1: 在 DeepFlow Agent 扩展一个协议解析采集

DeepFlow Agent 的开发文档概要

前提, DeepFlow Agent 的原生开发需要掌握 Rust 语言的基础开发能力。
接下来先参考官方文档《HOW_TO_SUPPORT_YOUR_PROTOCOL_CN》了解几个关键信息:

  • L7Protocol 用于标识协议常量
    源码位置:deepflow/agent/crates/public/src/l7_protocol.rs

  • L7ProtocolParser 主要用于协议判断和解析出 L7ProtocolInfo(七层协议的基础结构信息)
    源码位置:deepflow/agent/src/common/l7_protocol_log.rs

  • L7ProtocolInfoL7ProtocolParser 解析出来,并且用于后续会话聚合
    源码位置:deepflow/agent/src/common/l7_protocol_info.rs

  • L7ProtocolInfoInterface 七层协议结构L7ProtocolInfo 都需要实现这个接口来处理特征逻辑
    源码位置:deepflow/agent/src/common/l7_protocol_info.rs

  • L7ProtocolSendLog 统一发送到 deepflow-server 的结构
    源码位置:deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs

在 DeepFlow Agent 中开发的大致步骤:

  • deepflow/agent/crates/public/src/l7_protocol.rs 添加对应协议名称和协议号。
  • L7ProtocolParser::parse_payload() 需要返回 L7ProtocolInfo,所以需要先定义一个结构,实现 L7ProtocolInfoInterface 接口并且添加到 L7ProtocolInfo 这个枚举。
  • 实现 L7ProtocolParserInterface 接口,并添加到 deepflow/agent/src/common/l7_protocol_log.rs 中的 impl_protocol_parser! 宏。
  • deepflow-server 中只需增加一个常量用于搜索提示即可。

代码指引

  1. 定义一个协议,并用一个常量标识:

    源码位置:deepflow/agent/crates/public/src/l7_protocol.rs,DeepFlow Agent 通过遍历所有支持协议判断一个流的应用层协议。
    这里说明一下,由于业界的通用应用协议没有一个约束字段来定义应用协议类型,所以在大量网络包是通过遍历已知协议解码逻辑来判断应用层协议的。

    pub enum L7Protocol {
    	#[num_enum(default)]
    	Unknown = 0,
    	Other = 1,
    	// HTTP
    	Http1 = 20,
    	Http2 = 21,
    	Http1TLS = 22,
    	Http2TLS = 23,
    	// RPC
    	Dubbo = 40,
    	Grpc = 41,
    	SofaRPC = 43,
    	FastCGI = 44,
    	// SQL
    	MySQL = 60,
    	PostgreSQL = 61,
    	// NoSQL
    	Redis = 80,
    +   MongoDB = 81,
    	// MQ
    	Kafka = 100,
    	MQTT = 101,
    	// INFRA
    	DNS = 120,
    	Custom = 127,
    	Max = 255,
    }
    
    impl From<String> for L7Protocol {
    	fn from(l7_protocol_str: String) -> Self {
    		let l7_protocol_str = l7_protocol_str.to_lowercase();
    		match l7_protocol_str.as_str() {
    			"http" | "https" => Self::Http1,
    			"dubbo" => Self::Dubbo,
    			"grpc" => Self::Grpc,
    			"fastcgi" => Self::FastCGI,
    			"custom" => Self::Custom,
    			"sofarpc" => Self::SofaRPC,
    			"mysql" => Self::MySQL,
    +           "mongodb" => Self::MongoDB,
    			"postgresql" => Self::PostgreSQL,
    			"redis" => Self::Redis,
    			"kafka" => Self::Kafka,
    			"mqtt" => Self::MQTT,
    			"dns" => Self::DNS,
    			_ => Self::Unknown,
    		}
    	}
    }
    
  2. 为新协议准备解析逻辑

    定义结构体:
    deepflow/agent/src/flow_generator/protocol_logs/ 该路径下找一个目录建立相关的协议解析逻辑代码文件,该案例的代码文件放在上述目录下的 sql/mongo.rs

    pub struct MongoDBInfo {
    	msg_type: LogMessageType,
    	#[serde(rename = "req_len")]
    	pub req_len: u32,
    	#[serde(rename = "resp_len")]
    	pub resp_len: u32,
    	 参考“deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs” 
    	//   准备要处理的结构体。
    	//   其中“request_id”、“response_id”、“op_code”和“op_code_name”是
    	//   从mongodb header解析出来的关键信息。
    	#[serde(rename = "request_id")]
    	pub request_id: u32,
    	#[serde(rename = "response_id")]
    	pub response_id: u32,
    	#[serde(rename = "op_code")]
    	pub op_code: u32,
    	#[serde(skip)]
    	pub op_code_name: String,
    	 “request”、“response”和“response_code”是
    	//   从mongodb协议主体内容解析出来的所需信息。
    	#[serde(rename = "request_resource"]
    	pub request: String,
    	#[serde(skip)]
    	pub response: String,
    	#[serde(rename = "response_code"]
    	pub response_code: i32,
    	
    	#[serde(rename = "response_status")]
    	pub status: L7ResponseStatus,
    }
    
  3. 实现 L7ProtocolParserInterface

    • 先看源码结构逻辑(以下只显示需处理函数,不需处理的保留默认逻辑即可)

      #[enum_dispatch]
      pub trait L7ProtocolParserInterface {
      	fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool;
      	// 协议解析
      	fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult>;
      	// 返回协议号和协议名称,由于的bitmap使用u128,所以协议号不能超过128.
      	// 其中 crates/public/src/l7_protocol.rs 里面的 pub const L7_PROTOCOL_xxx 是已实现的协议号.
      	// ===========================================================================================
      	// return protocol number and protocol string. because of bitmap use u128, so the max protocol number can not exceed 128
      	// crates/public/src/l7_protocol.rs, pub const L7_PROTOCOL_xxx is the implemented protocol.
      	fn protocol(&self) -> L7Protocol;
      	// l4是tcp时是否解析,用于快速过滤协议
      	// ==============================
      	// whether l4 is parsed when tcp, use for quickly protocol filter
      	fn parsable_on_tcp(&self) -> bool {
      		true
      	}
      	// l4是udp是是否解析,用于快速过滤协议
      	// ==============================
      	// whether l4 is parsed when udp, use for quickly protocol filter
      	fn parsable_on_udp(&self) -> bool {
      		true
      	}
      	// return perf data
      	fn perf_stats(&mut self) -> Option<L7PerfStats>;
      }
      
    • 解码协议的第一步是如何识别协议,代码中需处理 L7ProtocolParserInterface::check_payload() 逻辑

    • 定义 MongoDB 协议头并解码

      // 定义MongoDB协议头结构体,并对必要信息字段一一解码
      #[derive(Clone, Debug, Default, Serialize)]
      pub struct MongoDBHeader {
      	length: u32,
      	request_id: u32,
      	response_to: u32,
      	op_code: u32,
      	op_code_name: String,
      }
      
      impl MongoDBHeader {
      	fn decode(&mut self, payload: &[u8]) -> isize {
      // 对payload前16位以MongoDBHeader结构解码,判断是否符合MongoDB的协议
      	}
      	fn is_request(&self) -> bool {
      // 解码op_code判断是否request
      	}
      	pub fn get_op_str(&self) -> &'static str {
      // 解码op_code出对应文本描述
      	}
      }
      
    • L7ProtocolParserInterface::check_payload() 调用 MongoDB 协议头解码逻辑
      在此过程,把 protocol(&self)parsable_on_udp(&self) 也一并处理。

      impl L7ProtocolParserInterface for MongoDBLog {
      	fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> bool {
      		let mut header = MongoDBHeader::default();
      		header.decode(payload);
      		return header.is_request();
      	}
      	fn protocol(&self) -> L7Protocol {
      		L7Protocol::MongoDB
      	}
      	// udp协议的跳过解码
      	fn parsable_on_udp(&self) -> bool {false}
      }
      
    • 第一步的效果展示
      到这一步的解码将会得到如下展示效果,接下来还需要对具体的协议操作码做进一步解码。
      eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

    • 解码协议的第二步是对关键指令定义结构体和解码接口逻辑实现,对应处理是 L7ProtocolParserInterface::parse_payload() 代码实现,这里以 OP_MSG 为例

    • 定义 OP_MSG 操作码的结构体并解码

      #[derive(Clone, Debug, Default, Serialize)]
      pub struct MongoOpMsg {
      	flag: u32,
      	sections: Sections,
      	checksum: Option<u32>,
      }
      
      impl MongoOpMsg {
      	fn decode(&mut self, payload: &[u8]) -> Result<bool> {
      				// 略过偏移逻辑
      		let _ = sections.decode(&payload);
      		self.sections = sections;
      		Ok(true)
      	}
      }
      
      
    • OP_MSG 操作码中业务需要关注的字段 Sections 做进一步解码

      #[derive(Clone, Debug, Default, Serialize)]
      struct Sections {
      	kind: u8,
      	kind_name: String,
      	// kind: 0 mean doc
      	doc: Document,
      	// kind: 1 mean body
      	size: Option<i32>,
      	c_string: Option<String>,
      }
      
      impl Sections {
      	pub fn decode(&mut self, payload: &[u8]) -> Result<bool> {
      		match self.kind {
      			0 => {// Body}
      			1 => {// Doc}
      			2 => {// Internal}
      			_ => {// Unknown}
      		}
      		Ok(true)
      	}
      }
      
    • 处理 L7ProtocolParserInterface::parse_payload,返回 L7ProtocolInfo

      #[derive(Clone, Debug, Default, Serialize)]
      pub struct MongoDBLog {
      	info: MongoDBInfo,
      	#[serde(skip)]
      	perf_stats: Option<L7PerfStats>,
      }
      impl L7ProtocolParserInterface for MongoDBLog {
      	fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
      		let mut info = MongoDBInfo::default();
      		self.parse(payload, param.l4_protocol, param.direction, &mut info)?;  // 解码得到L7ProtocolInfo
      	}
      }
      impl MongoDBLog {
      	fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,)-> Result<bool> { // 解码指令获取请求和响应等信息}
      		// command decode
      		match info.op_code {
      			_OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => {
      				// OP_MSG
      				let mut msg_body = MongoOpMsg::default();	
      				// TODO: Message Flags
      				msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?;
      			}
      		}
      	}
      }
      
    • MongoDBInfo 实现 L7ProtocolInfoInterface

      impl L7ProtocolInfoInterface for MongoDBInfo {
      	fn session_id(&self) -> Option<u32> {
      		// 这里返回流标识id,例如 http2 返回 streamid,dns 返回 transaction id,如果没有就返回 None
      	}
      	fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
      // 这里的self必定是请求,other必定是响应
      		if let L7ProtocolInfo::MongoDBInfo(other) = other {
      			self.merge(other);
      		}
      		Ok(())
      	}
      	fn app_proto_head(&self) -> Option<AppProtoHead> {
      // 这里返回一个 AppProtoHead 结构,返回 None 直接丢弃这段数据
      		Some(AppProtoHead {
      			proto: L7Protocol::MongoDB,
      		})
      	}
      	fn is_tls(&self) -> bool {
      		self.is_tls
      	}
      }
      
    • MongoDBInfo 实现 L7ProtocolSendLog

      impl From<MongoDBInfo> for L7ProtocolSendLog {
      	fn from(f: MongoDBInfo) -> Self {
      		let log = L7ProtocolSendLog {
      			// 这里需要把 info 转换成统一的发送结构 L7ProtocolSendLog
      		};
      		return log;
      	}
      }
      
      // 参考源码来自:deepflow/agent/src/flow_generator/protocol_logs/pb_adapter.rs
      pub struct L7ProtocolSendLog {
      	pub req_len: Option<u32>,
      	pub resp_len: Option<u32>,
      	pub row_effect: u32,
      	pub req: L7Request,
      	pub resp: L7Response,
      	pub version: Option<String>,
      	pub trace_info: Option<TraceInfo>,
      	pub ext_info: Option<ExtendedInfo>,
      }
      
    • 把实现 L7ProtocolParserInterface 的接口,添加到 deepflow/agent/src/common/l7_protocol_log.rs 中的 impl_protocol_parser! 宏。

      impl_protocol_parser! {
      	pub enum L7ProtocolParser {
      		// http have two version but one parser, can not place in macro param.
      		// custom must in frist so can not place in macro
      		DNS(DnsLog),
      		SofaRPC(SofaRpcLog),
      		MySQL(MysqlLog),
      		Kafka(KafkaLog),
      		Redis(RedisLog),
      +       MongoDB(MongoDBLog),
      		PostgreSQL(PostgresqlLog),
      		Dubbo(DubboLog),
      		FastCGI(FastCGILog),
      		MQTT(MqttLog),
      		// add protocol below
      	}
      }
      
    • 第二步的效果
      eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

    • 通过 perf_states 统计记录 QPS耗时异常 情况

      impl L7ProtocolParserInterface for MongoDBLog {
      	fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
      		let mut info = MongoDBInfo::default();
      		self.parse(payload, param.l4_protocol, param.direction, &mut info)?;  // 解码得到L7ProtocolInfo
      		info.cal_rrt(param, None).map(|rrt| {
      			info.rrt = rrt;
      +           self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); // 耗时
      		});
      	}
      impl MongoDBLog {
      	fn parse(&mut self,payload:&[u8],proto:IpProtocol,dir:PacketDirection,info:&mut MongoDBInfo,) -> Result<bool> { // 解码指令获取请求和响应等信息
      		if header.is_request() {
      +           self.perf_stats.as_mut().map(|p: &mut L7PerfStats| p.inc_req()); // 请求记录
      		} else {
      +           self.perf_stats.as_mut().map(|p| p.inc_resp()); // 响应记录
      		}
      		match info.op_code {
      			_OP_REPLY if payload.len() > _HEADER_SIZE => {
      				let mut msg_body = MongoOpReply::default();
      				msg_body.decode(&payload[_HEADER_SIZE..])?;
      				if !msg_body.reply_ok {
      +                   self.perf_stats.as_mut().map(|p| p.inc_resp_err());// 异常记录
      				}
      			}
      		}
      	}
      }
      

      效果如图:
      eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

    • 最后在 deepflow-server 补充服务端的协议识别
      以下两部分内容在代码文件 server/libs/datatype/flow.go

      type L7Protocol uint8
      const (
      	L7_PROTOCOL_UNKNOWN    L7Protocol = 0
      	L7_PROTOCOL_OTHER      L7Protocol = 1
      	L7_PROTOCOL_HTTP_1     L7Protocol = 20
      	L7_PROTOCOL_HTTP_2     L7Protocol = 21
      	L7_PROTOCOL_HTTP_1_TLS L7Protocol = 22
      	L7_PROTOCOL_HTTP_2_TLS L7Protocol = 23
      	L7_PROTOCOL_DUBBO      L7Protocol = 40
      	L7_PROTOCOL_GRPC       L7Protocol = 41
      	L7_PROTOCOL_SOFARPC    L7Protocol = 43
      	L7_PROTOCOL_FASTCGI    L7Protocol = 44
      	L7_PROTOCOL_MYSQL      L7Protocol = 60
      	L7_PROTOCOL_POSTGRE    L7Protocol = 61
      	L7_PROTOCOL_REDIS      L7Protocol = 80
      +   L7_PROTOCOL_MONGODB    L7Protocol = 81
      	L7_PROTOCOL_KAFKA      L7Protocol = 100
      	L7_PROTOCOL_MQTT       L7Protocol = 101
      	L7_PROTOCOL_DNS        L7Protocol = 120
      	L7_PROTOCOL_CUSTOM     L7Protocol = 127
      )
      
      func (p L7Protocol) String() string {
      	formatted := ""
      	switch p {
      	case L7_PROTOCOL_HTTP_1:
      		formatted = "HTTP"
      	case L7_PROTOCOL_DNS:
      		formatted = "DNS"
      	case L7_PROTOCOL_MYSQL:
      		formatted = "MySQL"
      	case L7_PROTOCOL_POSTGRE:
      		formatted = "PostgreSQL"
      	case L7_PROTOCOL_REDIS:
      		formatted = "Redis"
      +   case L7_PROTOCOL_MONGODB:
      +       formatted = "MongoDB"
      	case L7_PROTOCOL_DUBBO:
      		formatted = "Dubbo"
      	case L7_PROTOCOL_GRPC:
      		formatted = "gRPC"
      	case L7_PROTOCOL_CUSTOM:
      		formatted = "Custom"
      	case L7_PROTOCOL_OTHER:
      		formatted = "Others"
      	default:
      		formatted = "N/A"
      	}
      	return formatted
      }
      

      server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol

      	# Value , DisplayName     , Description
      	0       , N/A             ,
      	1       , Others          ,
      	20      , HTTP            ,
      	21      , HTTP2           ,
      	22      , HTTP1_TLS       ,
      	23      , HTTP2_TLS       ,
      	40      , Dubbo           ,
      	41      , gRPC            ,
      	43      , SOFARPC         ,
      	44      , FastCGI         ,
      	60      , MySQL           ,
      	61      , PostgreSQL      ,
      	80      , Redis           ,
      +	81      , MongoDB         ,
      	100     , Kafka           ,
      	101     , MQTT            ,
      	120     , DNS             ,
      	127     , Custom          ,
      

到这里已经完成 DeepFlow Agent 的原生协议扩展了,参考《# 完整指南:如何编译、打包和部署二次开发的 DeepFlow 》编译程序发布即可。

如果想快速实现一个协议采集解析,或者不熟悉Rust语言呢?我们还有一个选择,就是利用Wasm插件快速扩展协议解码。

0x2: 利用 Wasm 插件扩展 DeepFlow 的协议采集

该案例是用 Wasm 扩展 Kafka 协议支持 Topic 的实践。
首先还是参考Kafka的官方文档对Kafka协议做一个简单的分析

Kafka协议分析

KafkaHeaderData概览
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

Kafka的Fetch API
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

Kafka的Produce API
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

Kafka 协议 DeepFlow Agent 原生解码:
截止到 v6.3.x 版本,DeepFlow Agent 对 Kafka的原生解码如下图所示,还不支持 Topic 字段的解码,
且API的解码还没有版本号。
接下来的插件开发主要解决 Topic字 段的解码放在 resource 展示,同时把 API 的版本号也解析出来。

eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

DeepFlow Agent 的 Wasm 插件

参考官方插件文档《 wasm-plugin》,需要注意两点:

  • DeepFlow Agent 通过遍历所有支持协议判断一个流的应用层协议,顺序是:
    HTTP -> Wasm Hook -> DNS -> …

  • 需要使用 Go 版本不低于 1.21 并且 tinygo 版本需要不低于 0.29

Wasm Go SDK 的框架

  • 先对框架有一个大概的认识,如下代码所示,整个框架逻辑都在以下五个接口函数。

    package main  
    import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
    // 定义结构,需要实现 sdk.Parser 接口
    type plugin struct {}
    
    func (p plugin) HookIn() []sdk.HookBitmap {return []sdk.HookBitmap{}}
    // HookIn() 包含 HOOK_POINT_HTTP_REQ 时,http 请求解析完成返回之前会调用。
    // HttpReqCtx 包含了 BaseCtx 和已经解析出来的一些 http 头部
    func (p plugin) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {
    	return sdk.HttpReqActionAbortWithResult(nil, trace, attr)
    }
    func (p plugin) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()}
    func (p plugin) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 0, "ownwasm"}
    func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
    	return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
    }
    // main 需要注册解析器
    func main() {
    	sdk.SetParser(plugin{})
    }
    
  • DeepFlow Agent 会遍历所有插件调用对应的 Export 函数,但是遍历的行为可以通过返回值控制

    返回值 说明
    sdk.ActionNext() 停止当前插件,直接执行下一个插件
    sdk.ActionAbort() 停止当前插件并且停止遍历
    sdk.ActionAbortWithErr(err) 停止当前插件,打印错误日志并且停止遍历
    sdk.HttpActionAbortWithResult() Agent 停止遍历并且提取相应返回结果
    sdk.ParseActionAbortWithL7Info() Agent 停止遍历并且提取相应返回结果

    ⚠️注意:
    因为该案例不涉及 HTTP 协议的处理,所以 OnHttpReq()OnHttpResp() 直接使用 sdk.ActionNext() 跳过即可。
    该案例也不会用到 sdk.HttpActionAbortWithResult()

  • HookBitmap 的三个 hook

    hook点 说明
    HOOK_POINT_HTTP_REQ 表示 http 请求解析完成返回之前
    HOOK_POINT_HTTP_RESP 表示 http 响应解析完成返回之前
    HOOK_POINT_PAYLOAD_PARSE 表示协议的判断和解析

    ⚠️注意:因为该案例不涉及 HTTP 协议的处理,所以 HOOK_POINT_HTTP_REQHOOK_POINT_HTTP_RESP 在该案例也不会用到。

插件代码指引

  • 梳理后的 Kafka 协议的 Wasm 插件代码框架

    package main  
    import "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
    // 定义结构,需要实现 sdk.Parser 接口
    type kafkaParser struct {}
    
    func (p kafkaParser) HookIn() []sdk.HookBitmap {
    	return []sdk.HookBitmap{sdk.HOOK_POINT_PAYLOAD_PARSE}
    }
    // 跳过HTTP协议处理
    func (p kafkaParser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {return sdk.ActionNext()}
    func (p kafkaParser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {return sdk.ActionNext()}
    // 协议判断检查
    func (p kafkaParser) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) {return 100, "kafka"}
    // 协议解码
    func (p kafkaParser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
    	return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
    }
    // main 需要注册解析器
    func main() {sdk.SetParser(plugin{})}
    
  • 协议识别
    ⚠️注意:以下代码注释

    func (p kafkaParser) OnCheckPayload(ctx *sdk.ParseCtx) (uint8, string) {
    	// 跳过UDP协议数据
    	if ctx.L4 != sdk.TCP {
    		return 0, ""
    	}
    	// 如果环境有标准规范的端口约定,插件中指定端口会减少协议数据的遍历,优化解码时cpu等资源消耗
    	if ctx.DstPort < 9092 || ctx.DstPort > 9093 {
    		return 0, ""
    	}
    	// 读取抓包数据
    	payload, err := ctx.GetPayload()
    	if err != nil {
    		sdk.Error("get payload fail: %v", err)
    		return 0, ""
    	}
    	// 引用"github.com/segmentio/kafka-go/protocol"来解码
    	bl, err := protocol.ReadAll(protocol.NewBytes(payload))
    	if err != nil {
    		sdk.Error("read payload fail: %v", err)
    		return 0, ""
    	}
    	b, _ := decodeHeader(bl)
    	if !b {
    		return 0, ""
    	}
    	return WASM_KAFKA_PROTOCOL, "kafka"
    }
    
  • 协议 API 解码

    • 官方代码框架 OnParsePayload() 的逻辑如下

      func (p plugin) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction {
      	// ctx.L7 就是 OnCheckPayload 返回的协议号,可以先根据4层协议或协议号过滤。
      	if ctx.L4 != sdk.TCP {return sdk.ActionNext()}
      	payload, err := ctx.GetPayload()
      	if err != nil {return sdk.ActionAbortWithErr(err)}
      	// the parse logic here
      	// ...
      	/* 关于 L7ProtocolInfo 结构:
      			type L7ProtocolInfo struct {
      				ReqLen    *int       // 请求长度 例如 http 的 content-length
      				RespLen   *int       // 响应长度 例如 http 的 content-length
      				RequestID *uint32    // 子流的id标识,例如 http2 的 stream id,dns 的 transaction id
      				Req       *Request
      				Resp      *Response
      				Trace     *Trace     // 跟踪信息
      				Kv        []KeyVal   // 对应 attribute
      			}
      			type Request struct {
      				ReqType  string  // 对应请求类型
      				Domain   string  // 对应请求域名
      				Resource string  // 对应请求资源
      				Endpoint string  // 对应 endpoint
      			}
      			type Response struct {
      				Status    RespStatus // 对应响应状态
      				Code      *int32     // 对应响应码
      				Result    string     // 对应响应结果
      				Exception string     // 对应响应异常
      			}*/
      	return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{})
      }
      
    • Topic 字段解码的代码逻辑

      func (p kafkaParser) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action {
      	// the parse logic here
      	// ...
      
      	// 解码 header base size :
      	// req_len(int32) + api_key(int16) + api_ver(int16) + c_id(int32) + client_len(int16)
      	// = 14
      	var header_offset = 14 + header.clientLen
      	var topic_size int16 = 0
      	var topic_name = ""
      	switch protocol.ApiKey(header.apikey) {
      	case protocol.Produce:
      		topic_size, topic_name = decodeProduce(header.apiversion, payload[header_offset:])
      	case protocol.Fetch:
      		topic_size, topic_name = decodeFetch(header.apiversion, payload[header_offset:])
      	}
      	if topic_size == 0 {
      		return sdk.ActionNext()
      	}
      	req = &sdk.Request{
      		ReqType:  protocol.ApiKey(header.apikey).String() + "_v" + strconv.Itoa(int(header.apiversion)),
      		Resource: topic_name,
      	}
      	return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{
      		{
      			RequestID: &id,
      			ReqLen:    &length,
      			Req:       req,
      		},
      	})
      }
      

加载插件和效果展示

执行如下命令编译插件,通过CTL方式加载插件

tinygo build -o build/topic.wasm  -target wasi  -panic=trap -scheduler=none -no-debug ./wasm/kafka/topic.go
deepflow-ctl plugin create --type wasm --image build/topic.wasm --name topic

准备好 DeepFlow Agent 的配置文件增加如下配置。注意,DeepFlow Agent 可以加载多个 Wasm 插件。

  ############
  ## plugin ##
  ############
  ## wasm plugin need to load in agent
  wasm-plugins:
    - mongo
    - topic

执行命令更新配置

deepflow-plugin git:(main) ✗ deepflow-ctl agent-group-config update -f g-d2d06af17e.yaml

当 DeepFlow Agent 日志出现如下图黄字体内容,即加载成功。
eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

在 Grafana 上,可以看到原生的 Kafka 协议被覆盖,出现了几个变化:

  • Protocol 字段从 Kafka 变成 Custom
  • Request type 字段的 API 多了版本号
  • Request resource 字段出现了 Topic 信息
    eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议),Opentelemetry,eBPF,mongodb,kafka,数据库

0x3: 结语

最后对比一下两个协议扩展的方式,要注意⚠️的是:

  • 两者都存在一个共性问题,就是每增加一个协议,识别协议解码的效率相对降低
  • 可以通过配置的方式减少需解码的协议数量

原生Rust扩展

  • 优点:

    • 运行时的资源占用比插件低
    • 支持的功能比插件的丰富,且定制性更灵活
  • 缺点:文章来源地址https://www.toymoban.com/news/detail-811832.html

    • 在语言方面的开发难度比插件的大
    • 相对插件开发而言,新增协议需要改动的地方较多,还涉及到 Server 的一小部分代码

Wasm插件扩展

  • 优点:

    • 用 Golang 开发相对 Rust 语言难度较低
    • 可在运行时通过 CLI 方式加载
    • 扩展性强
  • 缺点:

    • Go 的标准库和第三方库有一定的限制,且调试难度大,导致插件异常较难排除
    • 由于Wasm本身限制等问题,导致功能相对 Rust 原生开发较弱
    • 资源增加,特别是内存方面。

0x5: 附录

  • DeepFlow 协议开发文档
  • DeepFlow的Wasm 插件系统
  • 使用 DeepFlow Wasm 插件实现业务可观测性
  • MongoDB协议文档
  • Kafka协议文档

到了这里,关于eBPF系列之:DeepFlow 扩展协议解析实践(MongoDB协议与Kafka协议)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • eBPF内核技术在滴滴云原生的落地实践

    将滴滴技术设为“ 星标⭐️ ” 第一时间收到文章更新 导读 eBPF是Linux内核革命性技术,能够安全高效地扩展内核能力,应用广泛,尤其是在云原生可观测性领域的应用已经成为行业热点。在滴滴云原生环境中,eBPF技术进行了业务实践和内源共建,HuaTuo eBPF 平台快速落地并取

    2024年02月12日
    浏览(32)
  • Qt实践录:实现http服务并解析json协议

    本文主要记录在 Qt 中实现http服务的示例,包括解析json协议数据。 很早之前,对自己写的一个工程测试,需对接一个C++写的web服务,但局域网中尚未有,于是部署之,web服务使用了cgi技术,于是找了fastcgi等库和程序,最终和nginx一道合力完成部署。当时觉得比较麻烦,C++应该

    2024年02月12日
    浏览(33)
  • eBPF 入门开发实践教程十一:在 eBPF 中使用 libbpf 开发用户态程序并跟踪 exec() 和 exit() 系统调用

    eBPF (Extended Berkeley Packet Filter) 是 Linux 内核上的一个强大的网络和性能分析工具。它允许开发者在内核运行时动态加载、更新和运行用户定义的代码。 在本教程中,我们将了解内核态和用户态的 eBPF 程序是如何协同工作的。我们还将学习如何使用原生的 libbpf 开发用户态程序,

    2024年02月07日
    浏览(36)
  • RTMP协议深度解析:从原理到实践,掌握实时流媒体传输技术

    在当今的互联网时代,流媒体传输技术在人们的日常生活中扮演着越来越重要的角色。从在线教育到实时娱乐,流媒体技术已经渗透到了生活的方方面面。在这篇博客中,我们将从C++语言的角度,探讨流媒体传输技术的重要性,为什么选择RTMP协议以及RTMP协议的发展与应用。

    2023年04月26日
    浏览(76)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(35)
  • eBPF 入门实践教程十五:使用 USDT 捕获用户态 Java GC 事件耗时

    eBPF (扩展的伯克利数据包过滤器) 是一项强大的网络和性能分析工具,被广泛应用在 Linux 内核上。eBPF 使得开发者能够动态地加载、更新和运行用户定义的代码,而无需重启内核或更改内核源代码。这个特性使得 eBPF 能够提供极高的灵活性和性能,使其在网络和系统性能分析

    2024年02月07日
    浏览(38)
  • 授权码 + PKCE 模式|OIDC & OAuth2.0 认证协议最佳实践系列【03】

    ​ 在上一篇文章中,我们介绍了 OIDC 授权码模式(点击下方链接查看), 本次我们将重点围绕 授权码 + PKCE 模式(Authorization Code With PKCE)进行介绍 ,从而让你的系统快速具备接入用户认证的标准体系。 OIDC OAuth2.0 认证协议最佳实践系列 02 - 授权码模式(Authorization Code)接

    2024年02月01日
    浏览(79)
  • Cilium系列-6-从地址伪装从IPtables切换为eBPF

    Cilium 系列文章 将 Kubernetes 的 CNI 从其他组件切换为 Cilium, 已经可以有效地提升网络的性能. 但是通过对 Cilium 不同模式的切换/功能的启用, 可以进一步提升 Cilium 的网络性能. 具体调优项包括不限于: 启用本地路由(Native Routing) 完全替换 KubeProxy IP 地址伪装(Masquerading)切换为基于

    2024年02月15日
    浏览(25)
  • 【译】MongoDB 性能最佳实践指南

    原文地址:Best Practices Guide for MongoDB Performance MongoDB 是面向开发高性能应用程序的现代开发人员的主要 NoSQL 文档数据库。MongoDB 采用类似 JSON 的文档,以水平扩展和负载平衡著称,为开发人员提供了定制化和可扩展性之间的绝佳平衡。 但是,与其他高性能工具一样,MongoDB 在

    2024年02月03日
    浏览(34)
  • CAN协议扩展帧ID

    例如  ID:0x18102701 扩展帧ID共29位 准换为二进制0001 1000 0001 0000 0010 0111 0000 0001 29位110代表优先级十进制6,PF十进制16(16进制0X10),目标地址16进制27,源地址十六进制01  

    2024年02月11日
    浏览(14)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包