从零开始实现一个RPC框架(五)

这篇具有很好参考价值的文章主要介绍了从零开始实现一个RPC框架(五)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

这是系列最后一篇文章了,最后我们来为我们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发,基于这种方式实现一个简单的类似service mesh中的sidecar。

原理

http gateway可以接收来自客户端的http请求并将其转换为rpc请求然后交给服务端处理,再将服务端处理过后的结果通过http响应返回给客户端。
http gateway的大致原理就是将我们的RPC协议中header部分放到http header中,然后RPC协议中的body部分放到http body即可。

实现

首先我们需要定义http header中各个字段的名称:

const (
	HEADER_SEQ            = "rpc-header-seq"            //序号, 用来唯一标识请求或响应
	HEADER_MESSAGE_TYPE   = "rpc-header-message_type"   //消息类型,用来标识一个消息是请求还是响应
	HEADER_COMPRESS_TYPE  = "rpc-header-compress_type"  //压缩类型,用来标识一个消息的压缩方式
	HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" //序列化类型,用来标识消息体采用的编码方式
	HEADER_STATUS_CODE    = "rpc-header-status_code"    //状态类型,用来标识一个请求是正常还是异常
	HEADER_SERVICE_NAME   = "rpc-header-service_name"   //服务名
	HEADER_METHOD_NAME    = "rpc-header-method_name"    //方法名
	HEADER_ERROR          = "rpc-header-error"          //方法调用发生的异常
	HEADER_META_DATA      = "rpc-header-meta_data"      //其他元数据
)

然后我们需要启动一个http server,用来接收http请求。这里我们使用go自带的api,默认使用5080端口,如果发现端口已经被占用了,就递增端口。

func (s *SGServer) startGateway() {
	port := 5080
	ln, err := net.Listen("tcp", ":" + strconv.Itoa(port))
	for err != nil && strings.Contains(err.Error(), "address already in use") {
		port++
		ln, err = net.Listen("tcp", ":" + strconv.Itoa(port))
	}
	if err != nil {
		log.Printf("error listening gateway: %s", err.Error())
	}
	log.Printf("gateway listenning on " + strconv.Itoa(port))
	//避免阻塞,使用新的goroutine来执行http server
	go func() {
		err := http.Serve(ln, s)
		if err != nil {
			log.Printf("error serving http %s", err.Error())
		}
	}()
}

接下来我们需要实现ServeHTTP函数:

func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
         //如果url不对则直接返回
	if r.URL.Path != "/invoke" { 
		rw.WriteHeader(404)
		return
	}
	//如果method不对则直接返回
	if r.Method != "POST" {
		rw.WriteHeader(405)
		return
	}
	//构造新的请求
	request := protocol.NewMessage(s.Option.ProtocolType)
	//根据http header填充request的header
	request, err := parseHeader(request, r)
	if err != nil {
	    rw.WriteHeader(400)
	}
	//根据http body填充request的data
	request, err = parseBody(request, r)
	if err != nil {
	    rw.WriteHeader(400)
	}
	//构造context
	ctx := metadata.WithMeta(context.Background(), request.MetaData)
	response := request.Clone()
	response.MessageType = protocol.MessageTypeResponse
	//处理请求
	response = s.process(ctx, request, response)
	//返回相应
	s.writeHttpResponse(response, rw, r)
}

func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	data, err := ioutil.ReadAll(request.Body)
	if err != nil {
		return nil, err
	}
	message.Data = data
	return message, nil
}

func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
	headerSeq := request.Header.Get(HEADER_SEQ)
	seq, err := strconv.ParseUint(headerSeq, 10, 64)
	if err != nil {
		return nil, err
	}
	message.Seq = seq

	headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE)
	msgType, err := protocol.ParseMessageType(headerMsgType)
	if err != nil {
		return nil, err
	}
	message.MessageType = msgType

	headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE)
	compressType, err := protocol.ParseCompressType(headerCompressType)
	if err != nil {
		return nil, err
	}
	message.CompressType = compressType

	headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE)
	serializeType, err := codec.ParseSerializeType(headerSerializeType)
	if err != nil {
		return nil, err
	}
	message.SerializeType = serializeType

	headerStatusCode := request.Header.Get(HEADER_STATUS_CODE)
	statusCode, err := protocol.ParseStatusCode(headerStatusCode)
	if err != nil {
		return nil, err
	}
	message.StatusCode = statusCode

	serviceName := request.Header.Get(HEADER_SERVICE_NAME)
	message.ServiceName = serviceName

	methodName := request.Header.Get(HEADER_METHOD_NAME)
	message.MethodName = methodName

	errorMsg := request.Header.Get(HEADER_ERROR)
	message.Error = errorMsg

	headerMeta := request.Header.Get(HEADER_META_DATA)
	meta := make(map[string]interface{})
	err = json.Unmarshal([]byte(headerMeta), &meta)
	if err != nil {
		return nil, err
	}
	message.MetaData = meta

	return message, nil
}

func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {
	header := rw.Header()
	header.Set(HEADER_SEQ, string(message.Seq))
	header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())
	header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())
	header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())
	header.Set(HEADER_STATUS_CODE, message.StatusCode.String())
	header.Set(HEADER_SERVICE_NAME, message.ServiceName)
	header.Set(HEADER_METHOD_NAME, message.MethodName)
	header.Set(HEADER_ERROR, message.Error)
	metaDataJson, _ := json.Marshal(message.MetaData)
	header.Set(HEADER_META_DATA, string(metaDataJson))

	_, _ = rw.Write(message.Data)
}

最后我们只需要在wrapper中启动http server即可。

func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
	return func(network string, addr string, meta map[string]interface{}) error {
		//省略前面的部分
		...
		
		//启动gateway
		s.startGateway()
		return serveFunc(network, addr, meta)
	}
}

客户端测试代码:

func MakeHttpCall() {
    //声明参数并序列化,放到http请求的body中
	arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)}
	data, _ := msgpack.Marshal(arg)
	body := bytes.NewBuffer(data)
	req, err := http.NewRequest("POST", "http://localhost:5080/invoke", body)
	if err != nil {
		log.Println(err)
		return
	}
	req.Header.Set(server.HEADER_SEQ, "1")
	req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())
	req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())
	req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())
	req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())
	req.Header.Set(server.HEADER_SERVICE_NAME,"Arith")
	req.Header.Set(server.HEADER_METHOD_NAME,"Add")
	req.Header.Set(server.HEADER_ERROR,"")
	meta := map[string]interface{}{"key":"value"}
	metaJson, _ := json.Marshal(meta)
	req.Header.Set(server.HEADER_META_DATA,string(metaJson))
	response, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Println(err)
		return
	}
	if response.StatusCode != 200 {
		log.Println(response)
	} else if response.Header.Get(server.HEADER_ERROR) != "" {
		log.Println(response.Header.Get(server.HEADER_ERROR))
	} else {
		data, err = ioutil.ReadAll(response.Body)
		result := service.Reply{}
		msgpack.Unmarshal(data, &result)
		fmt.Println(result.C)
	}
}

结语

这个系列到此就告一段落了,但是还有很多需要改进和丰富的地方甚至是错误,后续再以单独文章的形式更新。文章来源地址https://www.toymoban.com/news/detail-848951.html

到了这里,关于从零开始实现一个RPC框架(五)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从零开始写一个RTSP服务器(二)RTSP协议的实现

    此系列只追求精简,旨在学习RTSP协议的实现过程,不追求复杂完美,所以这里要实现的RTSP服务器为了简单,实现上同一时间只能有一个客户端,下面开始介绍实现过程 在写一个RTSP服务器之前,我们必须知道一个RTSP服务器最简单的包含两部分,一部分是RTSP的交互,一部分是

    2024年04月17日
    浏览(52)
  • RPC分布式网络通信框架(一)—— protobuf的使用

    常见序列化和反序列化协议有XML、JSON、protobuf,相比于其他protobuf更有优势: 1、protobuf是二进制存储的,xml和json都是文本存储的。故protobuf占用带宽较低 2、protobuf不需要存储额外的信息。 json如何存储数据?键值对。例:Name:”zhang san”, pwd: “12345”。 protobuf存储数据的方式

    2024年02月16日
    浏览(52)
  • 【Flink网络通讯(一)】Flink RPC框架的整体设计

    我们从整体的角度看一下Flink RPC通信框架的设计与实现,了解其底层Akka通信框架的基础概念及二者之间的关系。   Akka是使用Scala语言编写的库,用于在JVM上简化编写具有可容错、高可伸缩性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一个用于构建可扩展、弹性、快速响

    2024年02月21日
    浏览(39)
  • 从零开始实现一个C++高性能服务器框架----Hook模块

    此项目是根据sylar框架实现,是从零开始重写sylar,也是对sylar丰富与完善 项目地址:https://gitee.com/lzhiqiang1999/server-framework 项目介绍 :实现了一个基于协程的服务器框架,支持多线程、多协程协同调度;支持以异步处理的方式提高服务器性能;封装了网络相关的模块,包括

    2023年04月09日
    浏览(96)
  • 从零开始实现一个C++高性能服务器框架----Socket模块

    此项目是根据sylar框架实现,是从零开始重写sylar,也是对sylar丰富与完善 项目地址:https://gitee.com/lzhiqiang1999/server-framework 项目介绍 :实现了一个基于协程的服务器框架,支持多线程、多协程协同调度;支持以异步处理的方式提高服务器性能;封装了网络相关的模块,包括

    2023年04月08日
    浏览(55)
  • 从零开始实现一个C++高性能服务器框架----环境变量模块

    此项目是根据sylar框架实现,是从零开始重写sylar,也是对sylar丰富与完善 项目地址:https://gitee.com/lzhiqiang1999/server-framework 项目介绍 :实现了一个基于协程的服务器框架,支持多线程、多协程协同调度;支持以异步处理的方式提高服务器性能;封装了网络相关的模块,包括

    2024年02月02日
    浏览(52)
  • SocketD协议单链接双向RPC模式怎么实现

    SocketD是一个基于Socket的通信框架,支持单链接双向RPC模式。在实现单链接双向RPC模式时,需要按照一定的协议进行通信,以下是一个简单的实现示例: 定义通信协议:首先,需要定义客户端和服务端之间的通信协议,例如使用JSON格式来进行数据传输。 客户端和服务端通信:

    2024年02月14日
    浏览(29)
  • RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块

    分布式系统存在的问题: 为了支持高并发,每个客户端都保存了一份服务提供者的 列表 。但是如果 列表 有更新,想要得到最新的URL列表(rpc服务的ip和端口号),必须要手动更新配置文件,很不方便。 如图所示,实例3挂掉了,但是 列表 并没有得到更新。 故需要动态的更

    2024年02月15日
    浏览(45)
  • python实现基于RPC协议的接口自动化测试

    RPC(Remote Procedure Call)远程过程调用协议是一个用于建立适当框架的协议。从本质上讲,它使一台机器上的程序能够调用另一台机器上的子程序,而不会意识到它是远程的。 RPC 是一种软件通信协议,一个程序可以用来向位于网络上另一台计算机的程序请求服务,而不必了解

    2024年02月04日
    浏览(69)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包