Zinx框架学习 - 消息封装

这篇具有很好参考价值的文章主要介绍了Zinx框架学习 - 消息封装。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Zinx - V0.5 消息封装

  • 之前我们使用Request来保存服务器的数据,很显然使用[]byte来接收数据,没有长度也没有消息类型,接下来就要针对这个消息进行封装

创建消息类型

定义一个基本的message包,会包含消息ID、数据、数据长度三个成员,并提供基本的setter和getter方法

Zinx框架学习 - 消息封装

imssage.go接口

package ziface

//将请求的消息封装到一个Message中, 定义抽象的接口
type IMessage interface {
	//获取消息的ID
	GetMsgId() uint32
	//获取消息的长度
	GetMsgLen() uint32
	//获取消息的内容
	GetData() []byte

	//设置消息的ID
	SetMsgId(uint32)
	//设置消息的内容
	SetData([]byte)
	//设置消息的长度
	SetDataLen(uint32)
}

message.go实现类

package znet

type Message struct {
	Id      uint32 //消息的ID
	DataLen uint32 //消息的长度
	Data    []byte //消息的内容
}

//创建一个Message消息包
func NewMsgPackage(id uint32, data []byte) *Message {
	return &Message{
		Id:      id,
		DataLen: uint32(len(data)),
		Data:    data,
	}
}

//获取消息的ID
func (m *Message) GetMsgId() uint32 {
	return m.Id
}

//获取消息的长度
func (m *Message) GetMsgLen() uint32 {
	return m.DataLen
}

//获取消息的内容
func (m *Message) GetData() []byte {
	return m.Data
}

//设置消息的ID
func (m *Message) SetMsgId(id uint32) {
	m.Id = id
}

//设置消息的内容
func (m *Message) SetData(data []byte) {
	m.Data = data
}

//设置消息的长度
func (m *Message) SetDataLen(len uint32) {
	m.DataLen = len
}

消息的粘包

Zinx框架学习 - 消息封装

这里我们使用 TLV (Type-Len-Value) 封包格式解决TCP粘包问题,由于Zinx也是TCP流的形式传播数据,难免会出现消息1和消息2⼀同发送,那么zinx就需要有能⼒区分两个消息的边界,所以Zinx此时应该提供⼀个统⼀的拆包和封包的⽅法。

  • 封包:在发包之前打包成如上图这种格式的有head和body的两部分的包
  • 拆包:在收到数据的时候分两次进行读取,先读取固定长度的head部分,得到后续Data的长度,再根据DataLen读取之后的body。

封包拆包的实现

Zinx框架学习 - 消息封装

 需要注意的是,封包针对的是IMessage数据来封,返回的是二进制序列化后的结果,即把message结构体变成二进制序列化的数据,拆包针对的是二进制的数据流,得到的是IMessage数据,即把二进制序列化的数据变成message结构体

idatapack.go

package ziface

//封包、拆包 模块
//直接面向TCP连接中的数据流, 用于处理TCP粘包问题
type IDataPack interface {
	//获取包的头的长度方法
	GetHeadLen() uint32
	//封包方法
	Pack(msg IMessage) ([]byte, error)
	//拆包方法
	Unpack([]byte) (IMessage, error)
}

datapack.go

package znet

import (
	"bytes"
	"encoding/binary"
	"errors"
	"zinx/utils"
	"zinx/ziface"
)

//封包,拆包的具体模块
type DataPack struct{}

//拆包封包实例的一个初始化方法
func NewDataPack() *DataPack {
	return &DataPack{}
}

//获取包的头的长度方法
func (dp *DataPack) GetHeadLen() uint32 {
	//Datalen uint32(4字节) + ID uint32(4字节)
	return 8
}

//封包方法
//|datelen|msgID|data|
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
	//创建一个存放bytes字节的缓冲
	dataBuff := bytes.NewBuffer([]byte{})

	//将dataLen写进databuff中
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgLen()); err != nil {
		return nil, err
	}
	//将MsgId 写进databuff中
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
		return nil, err
	}

	//将data数据 写进databuff中
	if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
		return nil, err
	}

	return dataBuff.Bytes(), nil
}

//拆包方法 (将包的Head信息都出来) 之后再根据head信息里的data的长度,再进行一次读
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
	//创建一个从输入二进制数据的ioReader
	dataBuff := bytes.NewReader(binaryData)

	//只解压head信息,得到datalen和MsgID
	msg := &Message{}

	//读dataLen
	if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
		return nil, err
	}
	//读MsgID
	if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
		return nil, err
	}

	//判断datalen是否已经超出了我们允许的最大包长度
	if utils.GlobalObject.MaxPackageSize > 0 && msg.DataLen > utils.GlobalObject.MaxPackageSize {
		return nil, errors.New("too Large msg data recv!")
	}

	return msg, nil
}

注意这里的unpack()只读取了datalen和MsgID,并没有读取data

单元测试

  • 单元测试思路

模拟服务器创建socketTCP,并使用go协程承载从客户端读取数据、进行拆包处理的业务

//只是负责测试datapack拆包 封包的单元测试
func TestDataPack(t *testing.T) {
	//模拟的服务器
	//1 创建socketTCP
	listenner, err := net.Listen("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("server listen err: ", err)
		return
	}

	//创建一个go 承载 负责从客户端处理业务
	go func() {
		//2 从客户端读取数据,拆包处理
		for {
			conn, err := listenner.Accept()
			if err != nil {
				fmt.Println("server accept error", err)
			}

			go func(conn net.Conn) {
				//处理客户端的请求
				//------> 拆包的过程 <------
				//定义一个拆包的对象dp
				dp := NewDataPack()
				for {
					// 1第一次从conn读, 把包的head读出来
					headData := make([]byte, dp.GetHeadLen())
					_, err := io.ReadFull(conn, headData)
					if err != nil {
						fmt.Println("read head error")
						break
					}

					msgHead, err := dp.Unpack(headData)
					if err != nil {
						fmt.Println("server unpacke err ", err)
						return
					}
					if msgHead.GetMsgLen() > 0 {
						//msg是有数据的, 需要进行第二次读取
						//2 第二次从conn读, 根据head中的datalen 再读取data内容
						msg := msgHead.(*Message)
						msg.Data = make([]byte, msg.GetMsgLen())

						//根据datalen的长度再次从io流中读取
						_, err := io.ReadFull(conn, msg.Data)
						if err != nil {
							fmt.Println("server unpack data err: ", err)
							return
						}

						//完整的一个消息已经读取完毕
						fmt.Println("---> Recv MsgID: ", msg.Id, ", datalen = ", msg.DataLen, "data = ", string(msg.Data))
					}
				}

			}(conn)
		}
	}()

从客户端读取数据进行拆包处理的业务需要用for循环进行阻塞,等待客户端的连接,然后开启一个协程处理客户端请求并将conn作为形参传入,这个协程实际上是承载的拆包的业务,进行两次读取,第一次把包的head读出来,第二次根据head中的dataLen读取data内容。我们首先定义一个拆包对象,这个对象提供拆包方法,同时读包也是用一个for循环去读,第一次将包的head二进制流读出来后要将其封装到message中,即调用unpack()方法,返回一个只有dataLen和MsgID两个字段的结构体。此时就可以进行第二次读取了,我们想要将第二次读取的数据添加到结构体的data字段里,但需要注意的是此时的msgHead是imessage接口类型,没有data字段,所以我们需要进行类型断言将其转换为message结构体类型

模拟客户端启动go程序,将两个包组合发送模拟粘包

//模拟客户端
	conn, err := net.Dial("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("client dial err: ", err)
		return
	}

	//创建一个封包对象 dp
	dp := NewDataPack()

	//模拟粘包过程,封装两个msg一同发送
	//封装第一个msg1包
	msg1 := &Message{
		Id:      1,
		DataLen: 4,
		Data:    []byte{'z', 'i', 'n', 'x'},
	}
	sendData1, err := dp.Pack(msg1)
	if err != nil {
		fmt.Println("client pack msg1 error", err)
		return
	}
	//封装第二个msg2包
	msg2 := &Message{
		Id:      2,
		DataLen: 7,
		Data:    []byte{'n', 'i', 'h', 'a', 'o', '!', '!'},
	}
	sendData2, err := dp.Pack(msg2)
	if err != nil {
		fmt.Println("client pack msg1 error", err)
		return
	}
	//将两个包粘在一起
	sendData1 = append(sendData1, sendData2...)

	//一次性发送给服务端
	conn.Write(sendData1)

	//客户端阻塞
	select {}

创建一个封包对象,封装两个message一同发送,发送完之后主进程不能结束,要等待客户端返回,否则发送完进程结束客户端就销毁了,我们使用select阻塞

消息封装集成到Zinx框架

Zinx框架学习 - 消息封装

  •  将Message添加到Request属性中
package ziface

//IReqeust接口:
//实际上是把客户端请求的链接信息, 和 请求的数据 包装到了一个Request中
type IRequest interface {
	//得到当前链接
	GetConnection() IConneciton

	//得到请求的消息数据
	GetData() []byte

	//得到请求的消息ID
	GetMsgID() uint32
}
package znet

import (
	"zinx/ziface"
)

type Request struct {
	//已经和客户端建立好的链接
	conn ziface.IConneciton
	//客户端请求的数据
	msg ziface.IMessage
}

//得到当前链接
func (r *Request) GetConnection() ziface.IConneciton {
	return r.conn
}

//得到请求的消息数据
func (r *Request) GetData() []byte {
	return r.msg.GetData()
}

func (r *Request) GetMsgID() uint32 {
	return r.msg.GetMsgId()
}
  • 修改链接读取数据的机制,将之前的单纯的读取byte改成拆包形式的读取按照TLV形式读取

首先还是创建一个拆包解包对象,读取客户端的Msg Head二级制流8个字节,然后拆包,得到msgID和msgDatalen并放在msg消息中,最后根据dataLen再次读取Data,放在msg.Data中

//链接的读业务方法
func (c *Connection) StartReader() {
	fmt.Println(" Reader Goroutine is running...")
	defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())
	defer c.Stop()

	for {
		//读取客户端的数据到buf中
		//buf := make([]byte, utils.GlobalObject.MaxPackageSize)
		//_, err := c.Conn.Read(buf)
		//if err != nil {
		//	fmt.Println("recv buf err", err)
		//	continue
		//}

		//创建一个拆包解包对象
		dp := NewDataPack()

		//读取客户端的Msg Head 二级制流 8个字节
		headData := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
			fmt.Println("read msg head error", err)
			break
		}

		//拆包,得到msgID 和 msgDatalen 放在msg消息中
		msg, err := dp.Unpack(headData)
		if err != nil {
			fmt.Println("unpack error", err)
			break
		}

		//根据dataLen  再次读取Data, 放在msg.Data中
		var data []byte
		if msg.GetMsgLen() > 0 {
			data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
				fmt.Println("read msg data error ", err)
				break
			}
		}
		msg.SetData(data)

		//得到当前conn数据的Request请求数据
		req := Request{
			conn: c,
			msg:  msg,
		}

		//执行注册的路由方法
		go func(request ziface.IRequest) {
			c.Router.PreHandle(request)
			c.Router.Handle(request)
			c.Router.PostHandle(request)
		}(&req)
		//从路由中,找到注册绑定的Conn对应的router调用
	}
}
  • 给链接提供一个发包机制: 将发送的消息进行打包,再发送

修改StartReader方法

//链接的读业务方法
func (c *Connection) StartReader() {
	fmt.Println(" Reader Goroutine is running...")
	defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())
	defer c.Stop()

	for {
		//读取客户端的数据到buf中
		//buf := make([]byte, utils.GlobalObject.MaxPackageSize)
		//_, err := c.Conn.Read(buf)
		//if err != nil {
		//	fmt.Println("recv buf err", err)
		//	continue
		//}

		//创建一个拆包解包对象
		dp := NewDataPack()

		//读取客户端的Msg Head 二级制流 8个字节
		headData := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
			fmt.Println("read msg head error", err)
			break
		}

		//拆包,得到msgID 和 msgDatalen 放在msg消息中
		msg, err := dp.Unpack(headData)
		if err != nil {
			fmt.Println("unpack error", err)
			break
		}

		//根据dataLen  再次读取Data, 放在msg.Data中
		var data []byte
		if msg.GetMsgLen() > 0 {
			data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
				fmt.Println("read msg data error ", err)
				break
			}
		}
		msg.SetData(data)

		//得到当前conn数据的Request请求数据
		req := Request{
			conn: c,
			msg:  msg,
		}

		//执行注册的路由方法
		go func(request ziface.IRequest) {
			c.Router.PreHandle(request)
			c.Router.Handle(request)
			c.Router.PostHandle(request)
		}(&req)
		//从路由中,找到注册绑定的Conn对应的router调用
	}
}

添加SendMsg方法

package ziface

import "net"

//定义链接模块的抽象层
type IConneciton interface {
	//启动链接 让当前的链接准备开始工作
	Start()

	//停止链接 结束当前链接的工作
	Stop()

	//获取当前链接的绑定socket conn
	GetTCPConnection() *net.TCPConn

	//获取当前链接模块的链接ID
	GetConnID() uint32

	//获取远程客户端的 TCP状态 IP port
	RemoteAddr() net.Addr

	//发送数据, 将数据发送给远程的客户端
	SendMsg(msgId uint32, data []byte) error
}

//定义一个处理链接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

实现SendMsg方法

//提供一个SendMsg方法 将我们要发送给客户端的数据,先进行封包,再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
	if c.isClosed == true {
		return errors.New("Connection closed when send msg")
	}

	//将data进行封包 MsgDataLen|MsgID|Data
	dp := NewDataPack()

	//MsgDataLen|MsgID|Data
	binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
	if err != nil {
		fmt.Println("Pack error msg id = ", msgId)
		return errors.New("Pack error msg")
	}

	//将数据发送给客户端
	if _, err := c.Conn.Write(binaryMsg); err != nil {
		fmt.Println("Write msg id ", msgId, " error :", err)
		return errors.New("conn Write error")
	}
	return nil
}

Zinx框架开发

server.go

package main

import (
	"fmt"
	"zinx/ziface"
	"zinx/znet"
)

//基于Zinx框架来开发的 服务器端应用程序
//ping test 自定义路由
type PingRouter struct {
	znet.BaseRouter
}

//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
	fmt.Println("Call Router Handle...")
	//先读取客户端的数据,再回写ping..ping...ping
	fmt.Println("recv from client: msgID = ", request.GetMsgID(),
		", data = ", string(request.GetData()))

	err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
	if err != nil {
		fmt.Println(err)
	}
}

func main() {
	//1 创建一个server句柄,使用Zinx的api
	s := znet.NewServer("[zinx V0.5]")

	//2 给当前zinx框架添加一个自定义的router
	s.AddRouter(&PingRouter{})

	//3 启动server
	s.Serve()
}

client.go文章来源地址https://www.toymoban.com/news/detail-470231.html

package main

import (
	"fmt"
	"io"
	"net"
	"time"
	"zinx/znet"
)

//模拟客户端
func main() {
	fmt.Println("client start...")
	time.Sleep(1 * time.Second)
	//1 直接链接远程服务器,得到一个conn链接
	conn, err := net.Dial("tcp", "127.0.0.1:8999")
	if err != nil {
		fmt.Println("client start err, exit!")
		return
	}
	for {
		//发送封包的message消息  MsgID:0
		dp := znet.NewDataPack()
		binaryMsg, err := dp.Pack(znet.NewMsgPackage(0, []byte("ZinxV0.5 client Test Message")))
		if err != nil {
			fmt.Println("Pack error:", err)
			return
		}
		if _, err := conn.Write(binaryMsg); err != nil {
			fmt.Println("write error", err)
			return
		}
		//服务器就应该给我们回复一个message数据, MsgID:1 pingpingping
		// 1 先读取流中的head部分 得到ID 和 dataLen
		binaryHead := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(conn, binaryHead); err != nil {
			fmt.Println("read head error ", err)
			break
		}
		// 将二进制的head拆包到msg 结构体中
		msgHead, err := dp.Unpack(binaryHead)
		if err != nil {
			fmt.Println("client unpack msgHead error ", err)
			break
		}
		if msgHead.GetMsgLen() > 0 {
			// 2再根据DataLen进行第二次读取,将data读出来
			msg := msgHead.(*znet.Message)
			msg.Data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(conn, msg.Data); err != nil {
				fmt.Println("read msg data error , ", err)
				return
			}
			fmt.Println("---> Recv Server Msg : ID = ", msg.Id, ", len = ", msg.DataLen, ", data = ", string(msg.Data))
		}
		//cpu阻塞
		time.Sleep(1 * time.Second)
	}
}

到了这里,关于Zinx框架学习 - 消息封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包