37 | Kafka & ZMQ:自动化交易流水线

这篇具有很好参考价值的文章主要介绍了37 | Kafka & ZMQ:自动化交易流水线。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在进行这节的学习前,我们先来回顾一下,前面三节,我们学了些什么。

第 34 讲,我们介绍了如何通过 RESTful API 在交易所下单;第 35 讲,我们讲解了如何通过 Websocket ,来获取交易所的 orderbook 数据;第 36 讲,我们介绍了如何实现一个策略,以及如何对策略进行历史回测。

事实上,到这里,一个简单的、可以运作的量化交易系统已经成型了。你可以对策略进行反复修改,期待能得到不错的 PnL。但是,对于一个完善的量化交易系统来说,只有基本骨架还是不够的。

在大型量化交易公司,系统一般是分布式运行的,各个模块独立在不同的机器上,然后互相连接来实现。即使是个人的交易系统,在进行诸如高频套利等算法时,也需要将执行层布置在靠近交易所的机器节点上。

所以,从今天这节开始,我们继续回到 Python 的技术栈,从量化交易系统这个角度切入,讲解如何实现分布式系统之间的复杂协作。

中间件

我们先来介绍一下中间件这个概念。中间件,是将技术底层工具和应用层进行连接的组件。它要实现的效果则是,让我们这些需要利用服务的工程师,不必去关心底层的具体实现。我们只需要拿着中间件的接口来用就好了。

这个概念听起来并不难理解,我们再举个例子让你彻底明白。比如拿数据库来说,底层数据库有很多很多种,从关系型数据库 MySQL 到非关系型数据库 NoSQL,从分布式数据库 Spanner 到内存数据库 Redis,不同的数据库有不同的使用场景,也有着不同的优缺点,更有着不同的调用方式。那么中间件起什么作用呢?

中间件,等于在这些不同的数据库上加了一层逻辑,这一层逻辑专门用来和数据库打交道,而对外只需要暴露同一个接口即可。这样一来,上层的程序员调用中间件接口时,只需要让中间件指定好数据库即可,其他参数完全一致,极大地方便了上层的开发;同时,下层技术栈在更新换代的时候,也可以做到和上层完全分离,不影响程序员的使用。

它们之间的逻辑关系,可以参照下面画的这张图。习惯性把中间件的作用调侃为:没有什么事情是加一层解决不了的;如果有,那就加两层。

37 | Kafka & ZMQ:自动化交易流水线,python

当然,这只是其中一个例子,也只是中间件的一种形式。事实上,比如在阿里,中间件主要有分布式关系型数据库 DRDS、消息队列和分布式服务这么三种形式。而我们今天,主要会用到消息队列,因为它非常符合量化交易系统的应用场景,即事件驱动模型。

消息队列

那么,什么是消息队列呢?一如其名,消息,即互联网信息传递的个体;而队列,学过算法和数据结构的你,应该很清楚这个 FIFO(先进先出)的数据结构吧。

简而言之,消息队列就是一个临时存放消息的容器,有人向消息队列中推送消息;有人则监听消息队列,发现新消息就会取走。根据我们刚刚对中间件的解释,清晰可见,消息队列也是一种中间件。

目前,市面上使用较多的消息队列有 RabbitMQ、Kafka、RocketMQ、ZMQ 等。不过今天,只介绍最常用的 ZMQ 和 Kafka。

我们先来想想,消息队列作为中间件有什么特点呢?

首先是严格的时序性。刚刚说了,队列是一种先进先出的数据结构,你丢给它 1, 2, 3,然后另一个人从里面取数据,那么取出来的一定也是 1, 2, 3,严格保证了先进去的数据先出去,后进去的数据后出去。显然,这也是消息机制中必须要保证的一点,不然颠三倒四的结果一定不是我们想要的。

说到队列的特点,简单提一句,与“先进先出“相对的是栈这种数据结构,它是先进后出的,你丢给它 1, 2, 3,再从里面取出来的时候,拿到的就是3, 2, 1了,这一点一定要区分清楚。

其次,是分布式网络系统的老生常谈问题。如何保证消息不丢失?如何保证消息不重复?这一切,消息队列在设计的时候都已经考虑好了,你只需要拿来用就可以,不必过多深究。

不过,很重要的一点,消息队列是如何降低系统复杂度,起到中间件的解耦作用呢?我们来看下面这张图。

37 | Kafka & ZMQ:自动化交易流水线,python

消息队列的模式是发布和订阅,一个或多个消息发布者可以发布消息,一个或多个消息接受者可以订阅消息。 从图中你可以看到,消息发布者和消息接受者之间没有直接耦合,其中,

消息发布者将消息发送到分布式消息队列后,就结束了对消息的处理;

消息接受者从分布式消息队列获取该消息后,即可进行后续处理,并不需要探寻这个消息从何而来。

至于新增业务的问题,只要你对这类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,所以也就实现了业务的可扩展性设计。

讲了这么多概念层的东西,想必你迫不及待地想看具体代码了吧。接下来,我们来看一下 ZMQ 的实现。

ZMQ

先来看 ZMQ,这是一个非常轻量级的消息队列实现。

作者 Pieter Hintjens 是一位大牛,他本人的经历也很传奇,2010 年诊断出胆管癌,并成功做了手术切除。但 2016 年 4 月,却发现癌症大面积扩散到了肺部,已经无法治疗。他写的最后一篇通信模式是关于死亡协议的,之后在比利时选择接受安乐死。

ZMQ 是一个简单好用的传输层,它有三种使用模式:

Request - Reply 模式;

Publish - Subscribe 模式;

Parallel Pipeline 模式。

第一种模式很简单,client 发消息给 server,server 处理后返回给 client,完成一次交互。这个场景你一定很熟悉吧,没错,和 HTTP 模式非常像,所以这里我就不重点介绍了。至于第三种模式,与今天内容无关,这里也不做深入讲解。

我们需要详细来看的是第二种,即“PubSub”模式。下面是它的具体实现,代码很清晰,应该很容易理解:

# 订阅者 1
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 1')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)


if __name__ == '__main__':
    run()

########## 输出 ##########

client 1
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 订阅者 2
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 2')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)


if __name__ == '__main__':
    run()

########## 输出 ##########

client 2
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 发布者
import time
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:6666')

    cnt = 1

    while True:
        time.sleep(1)
        socket.send_string('server cnt {}'.format(cnt))
        print('send {}'.format(cnt))
        cnt += 1


if __name__ == '__main__':
    run()

########## 输出 ##########

send 1
send 2
send 3
send 4
send 5

这里要注意的一点是,如果你想要运行代码,请先运行两个订阅者,然后再打开发布者。

接下来,来简单讲解一下。

对于订阅者,我们要做的是创建一个 zmq Context,连接 socket 到指定端口。其中,setsockopt_string() 函数用来过滤特定的消息,而下面这行代码:

socket.setsockopt_string(zmq.SUBSCRIBE, '') 

则表示不过滤任何消息。最后,我们调用 socket.recv() 来接受消息就行了,这条语句会阻塞在这里,直到有新消息来临。

对于发布者,我们同样要创建一个 zmq Context,绑定到指定端口,不过请注意,这里用的是 bind 而不是 connect。因为在任何情况下,同一个地址端口 bind 只能有一个,但却可以有很多个 connect 链接到这个地方。初始化完成后,再调用 socket.send_string ,即可将我们想要发送的内容发送给 ZMQ。

当然,这里还有几个需要注意的地方。首先,有了 send_string,我们其实已经可以通过 JSON 序列化,来传递几乎我们想要的所有数据结构,这里的数据流结构就已经很清楚了。

另外,把发布者的 time.sleep(1) 放在 while 循环的最后,严格来说应该是不影响结果的。这里你可以尝试做个实验,看看会发生什么。

你还可以思考下另一个问题,如果这里是多个发布者,那么 ZMQ 应该怎么做呢?

Kafka

接着我们再来看一下 Kafka。

通过代码实现可以发现,ZMQ 的优点主要在轻量、开源和方便易用上,但在工业级别的应用中,大部分人还是会转向 Kafka 这样的有充足支持的轮子上。

相比而言,Kafka 提供了点对点网络和发布订阅模型的支持,这也是用途最广泛的两种消息队列模型。而且和 ZMQ 一样,Kafka 也是完全开源的,因此你也能得到开源社区的充分支持。

Kafka 的代码实现,和 ZMQ 大同小异,这里就不专门讲解了。

基于消息队列的 Orderbook 数据流

最后回到我们的量化交易系统上。

量化交易系统中,获取 orderbook 一般有两种用途:策略端获取实时数据,用来做决策;备份在文件或者数据库中,方便让策略和回测系统将来使用。

如果我们直接单机监听交易所的消息,风险将会变得很大,这在分布式系统中叫做 Single Point Failure。一旦这台机器出了故障,或者网络连接突然中断,我们的交易系统将立刻暴露于风险中。

于是,一个很自然的想法就是,我们可以在不同地区放置不同的机器,使用不同的网络同时连接到交易所,然后将这些机器收集到的信息汇总、去重,最后生成我们需要的准确数据。相应的拓扑图如下:

37 | Kafka & ZMQ:自动化交易流水线,python

当然,这种做法也有很明显的缺点:因为要同时等待多个数据服务器的数据,再加上消息队列的潜在处理延迟和网络延迟,对策略服务器而言,可能要增加几十到数百毫秒的延迟。如果是一些高频或者滑点要求比较高的策略,这种做法需要谨慎考虑。

但是,对于低频策略、波段策略,这种延迟换来的整个系统的稳定性和架构的解耦性,还是非常值得的。不过,你仍然需要注意,这种情况下,消息队列服务器有可能成为瓶颈,也就是刚刚所说的 Single Point Failure,一旦此处断开,依然会将系统置于风险之中。

事实上,我们可以使用一些很成熟的系统,例如阿里的消息队列,AWS 的 Simple Queue Service 等等,使用这些非常成熟的消息队列系统,风险也将会最小化。

总结

这节我们分析了现代化软件工程领域中的中间件系统,以及其中的主要应用——消息队列。我们讲解了最基础的消息队列的模式,包括点对点模型、发布者订阅者模型,和一些其他消息队列自己支持的模型。

在真实的项目设计中,我们要根据自己的产品需求,来选择使用不同的模型;同时也要在编程实践中,加深对不同技能点的了解,对系统复杂性进行解耦,这才是设计出高质量系统的必经之路。文章来源地址https://www.toymoban.com/news/detail-794877.html

到了这里,关于37 | Kafka & ZMQ:自动化交易流水线的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 自动化机器学习流水线:基于Spring Boot与AI机器学习技术的融合探索

    🧑 作者简介:阿里巴巴嵌入式技术专家,深耕嵌入式+人工智能领域,具备多年的嵌入式硬件产品研发管理经验。 📒 博客介绍:分享嵌入式开发领域的相关知识、经验、思考和感悟,欢迎关注。提供嵌入式方向的学习指导、简历面试辅导、技术架构设计优化、开发外包等服

    2024年04月27日
    浏览(42)
  • 作为前端leader,如何搭建属于我们公司自己的流水线自动化部署系统(node+express)

    背景:自动化部署系统主要可以集成到公司内部的管理系统中去,比如公司有多个项目,移动端H5,大屏网站,门户网站等...每次发布或者迭代都需要前端同事打包然后在交给运维或者后端同事放到服务器上进行部署 ,如果有一个项目多个同事合作完成 还要走git合并流程,

    2024年02月19日
    浏览(52)
  • MLOPS:大数据/服务器下的大规模机器学习技术—流水线处理技术的简介(标准化/自动化/可复用化)、常用框架(Pipeline/TFX、Airflow/Beam/Kubeflow/MLflow、Fli

    MLOPS:大数据/服务器下的大规模机器学习技术—流水线处理技术的简介(标准化/自动化/可复用化)、常用框架(Pipeline/TFX、Airflow/Beam/Kubeflow/MLflow、Flink/Kafka)之详细攻略 目录 流水线处理技术的简介 1、流水线处理技术的概述(标准化/自动化/可复用化)

    2024年02月08日
    浏览(57)
  • 构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

    当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个

    2024年02月10日
    浏览(43)
  • [Gitlab CI] 自动取消旧流水线

    当某一分支开启 Merge Request 后只要提交一次 commit 就会自动创建一个新的流水线,此时之前的 Pipeline 不会被取消,经过下面的设置后可以实现自动取消旧流水线的功能。 ❗️通过提交(commit)触发的同一分支流水线可以自动冗余取消,但是通过手动触发的同一分支流水线不会

    2024年03月12日
    浏览(76)
  • [小白]Java自动部署之-流水线[超详细]

    个人博客: www.wdcdbd.com   devops文档链接: https://pan.baidu.com/s/12kOXbduI6daJBXQ0FWJaig?pwd=1234      提取码:1234 在我们开发写代码的时候,可以在本地启动,这样似乎挺方便的,但是如果我们想要部署到服务器上就很费劲了,不但要maven构建和将.jar包发布上去,还要重启等一系列麻

    2024年01月23日
    浏览(65)
  • 【从零开始玩量化17】如何python+QMT完成自动化交易?(全网最详细入门教程)

    此部分为扫盲内容,有一定了解者可以跳过。 它是一款量化交易客户端软件,由一家叫做迅投公司出品,可以直接登录你的券商账号进行股票交易,但与同花顺/通信达不同的是, 它暴露了基于python的交易API,可以执行程序化交易 。 顺便查了一下迅投这个公司的背景,21年冲

    2024年02月08日
    浏览(58)
  • CI/CD---使用新版云效流水线自动部署Java项目

    两大基本前提: 1、有一个自己的云服务器 2、项目代码已经提交到代码仓库,如gitee,github等 为什么需要流水线 1、除了第一次需要新建流水线,配置脚本外,后续所有的部署只需要提交代码后,点击运行流水线就行 。 2、流水线还可以回滚,此功能太过友好了。 3、当然,

    2024年02月06日
    浏览(65)
  • Jenkins流水线整合k8s实现代码自动集成和部署

    1、安装好k8s集群 这里先要搭建好一个K8s集群,笔者这边就采用使用了一个一主一丛的k8s集群,k8s集群的版本使用1.19.5版本,服务器的配置:2核4G,操作系统: CentOS Linux release 7.9.2009 (Core) 主机名         ip k8smaster 192.168.19.8 k8sworker         192.168.19.9 具体的安装步骤可以

    2024年02月05日
    浏览(53)
  • Kafka单机到集群- 自动化- SSL以及全部配置文件 作者精选

    本文实现kafka集群的配置和启动过程。并将脚本自动化,使得通用性、独立性更强。 用途 参考资料 kafka官方文档:包含安装部署、配置项、api使用等详细信息 附件: Kafka并不难学!:入门、进阶、商业实战 (邓杰) :包含kafka入门和linux快捷操作等系统化认识的内容。 名词 文

    2024年04月13日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包