对接华泰极速行情丨DolphinDB INSIGHT 插件使用教程

这篇具有很好参考价值的文章主要介绍了对接华泰极速行情丨DolphinDB INSIGHT 插件使用教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

INSIGHT 是华泰证券依托大数据存储、实时分析等领域的技术积累,整合接入国内多家交易所高频行情数据,为投资者提供集行情接入、推送、回测、计算及分析等功能于一体的行情数据服务解决方案。基于 INSIGHT 官方提供的行情数据服务 C++ SDK(TCP 版本),DolphinDB 开发了能够获取市场行情数据的 INSIGHT 插件,帮助用户方便地通过 DolphinDB 脚本语言将实时行情数据接入 DolphinDB 进程中以便进行后续的计算或存储

本文主要介绍如何通过 INSIGHT 插件将实时行情数据写入分布式数据库,以及如何实现节点启动时自动订阅 INSIGHT 实时行情。本文全部代码需要运行在 2.00.11 或者更高版本的 DolphinDB server 以及插件上,目前仅支持 Linux 系统。

1. DolphinDB INSIGHT 行情插件介绍

INSIGHT 插件的接口介绍见 DolphinDB INSIGHT Plugin 使用说明,注意, INSIGHT 插件基于 INSIGHT 官方提供的行情数据服务 C++ SDK(即 TCP 版本 INSIGHT)实现。若您的 INSIGHT 订阅账号对应 C++ UDP SDK,本文提到的 2.00.11 版本插件将无法支持,可以联系 DolphinDB 技术支持以进一步了解。

INSIGHT 插件最核心的功能是实现了INSIGHT C++ SDK 中的行情回调函数,每次接收到行情时会写入对应的 DolphinDB 内存表,DolphinDB INSIGHT 插件目前已经支持的数据源包括:

Level-2 快照 逐笔成交 逐笔委托 逐笔成交和委托
股票
指数
期货
基金

注:逐笔成交和委托类型是为了对接 DolphinDB 快照合成流计算引擎的特殊处理,指将逐笔成交和逐笔委托处理为同构数据写入一张 DolphinDB 表中。(可联系 DolphinDB 技术支持以进一步了解)

2. 基本使用介绍

2.1 安装插件

节点启动后,连接节点并在 GUI(或 VS Code、Web UI)等DolphinDB客户端中执行 installPlugin 函数,则可以下载到与当前 server 版本适配的 INSIGHT 插件文件,插件文件包括插件描述文件及插件的二进制文件。

login("admin", "123456")
installPlugin("insight")

installPlugin 函数若正常返回,则代表下载成功,其返回值为插件描述文件(PluginInsight.txt)的安装路径,如:

/path_to_dolphindb_server/server/plugins/insight/PluginInsight.txt

installPlugin 函数实际上是完成从远程文件服务器拉取插件文件到 DolphinDB server 所在的服务器,因此需要一定的耗时,请耐心等待大约一分钟。

2.2 加载插件

在脚本中调用插件相关的接口前,需要先加载插件。在 GUI(或 VS Code、Web UI)等客户端中执行 loadPlugin 函数加载插件。以下示例中使用了相对路径,也可以试用 2.1 中返回的绝对路径 /path_to_dolphindb_server/server/plugins/insight/PluginInsight.txt

loadPlugin("./plugins/insight/PluginInsight.txt")

loadPlugin 函数正常返回则插件加载成功,以 VS Code 为例,首次加载成功后返回的部分信息如下,返回值是 INSIGHT 插件提供的所有函数,至此插件安装与加载已全部完成:

dolphindb 搭建加密货币行情,量化金融,行情数据,数据库,市场数据,量化金融,数据分析,量化交易

此外,需要注意,如果重复执行 loadPlugin 加载插件,会抛出模块已经被使用的错误提示,因为节点启动后,只允许加载一次 INSIGHT 插件,即可在任意会话中调用该插件提供的函数。错误提示如下:

The module [insight] is already in use.

可以通过 try-cach 语句捕获这个错误,避免因为插件已加载而中断后续脚本代码的执行:

try{ loadPlugin("./plugins/insight/PluginInsight.txt") }catch(ex){print ex}

此外,若节点重启则需要重新加载插件。

3. 通过 INSIGHT 行情插件将实时行情数据写入分布式数据库

本章以订阅沪深两市的全市场股票的逐笔数据、实时写入 DolphinDB 分布式数据库为例,对 INSIGHT 插件的使用进行说明,大致的流程如下:

dolphindb 搭建加密货币行情,量化金融,行情数据,数据库,市场数据,量化金融,数据分析,量化交易

  • 通过 INSIGHT 插件订阅逐笔数据写入 DolphinDB transactionTable 和 orderTable 两个持久化流数据表。持久化流数据表是具备发布订阅功能的内存表。
  • 订阅持久化流数据表写入 DolphinDB 分布式数据库。分布式数据库是将数据存储到磁盘上。

注意:请勿使用 INSIGHT 插件将行情数据直接写入分布式数据库。因为分布式数据库并不适用于此类高频的流式写入。建议在使用时,借助流数据表及其发布订阅功能以实现微批处理。这样既可以提高写入的吞吐,也有助于降低时延。

下面分步骤介绍关键的 DolphinDB 代码实现,完整脚本见附录。

3.1 清理环境(可选)

为保证本文的示例脚本能够反复执行,特提供了以下流环境清理脚本。

由于相同的流数据表名和订阅无法进行重复定义,因此先取消相关订阅并清除需要用到的流数据表。

try {
	tcpClient  = insight::getHandle()
	insight::unsubscribe(tcpClient) 
	insight::close(tcpClient) 
} catch(ex) { print(ex) }
try { unsubscribeTable(tableName="transactionTable", actionName="transactionTableInsert") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="orderTable", actionName="orderTableInsert") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="transactionTable") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="orderTable") } catch(ex) { print(ex) }
undef all

3.2 创建库表

执行创建分布式库表的语句前,需要登陆有创建权限的账号。执行如下代码登录默认的管理员账号:

login("admin", "123456")

获取表结构

调用 insight::getSchema 方法可以获取行情数据各个表的表结构。以下代码获取股票逐笔成交和逐笔委托两张表的表结构:

// 股票逐笔成交
transactionSchema = insight::getSchema(`StockTransaction); 
//股票逐笔委托
orderSchema = insight::getSchema(`StockOrder); 

创建持久化流数据表

得到行情数据的表结构后,使用该表结构创建持久化流数据表。

// 建立用于insight行情接入的持久化流表
cacheSize = 1000000
transactionColName = transactionSchema[`name]
transactionColType = transactionSchema[`type]
orderColName = orderSchema[`name]
orderColType = orderSchema[`type]

enableTableShareAndPersistence(table=streamTable(cacheSize:0, transactionColName, transactionColType), tableName=`transactionTable, cacheSize=cacheSize)
enableTableShareAndPersistence(table=streamTable(cacheSize:0, orderColName, orderColType), tableName=`orderTable, cacheSize=cacheSize)
  • 为保证 enableTableShareAndPersistence 函数能够正常执行,需要节点启动之前在配置文件中(单节点:dolohindb.cfg,集群:cluster.cfg)指定配置参数 persistenceDir ,配置参考功能配置。
  • 代码第二行的 cacheSize 变量控制了在建表时预分配内存的大小、以及流数据表可占用的最大内存,其单位是行,设置较大的 cacheSize 可以降低出现峰值时延的频率。具体大小可以根据实际的可使用的内存大小决定。具体优化原理可参考 流计算时延统计与性能优化

创建分布式数据库

为将行情数据存入分布式数据库,需要根据之前得到的行情数据表结构来创建分布式库表,分区规则参考自《基于 DolphinDB 存储金融数据的分区方案最佳实践》。

dbName = "dfs://Stock_TB"
transactionTbName = "transactionTable"
orderTbName = "orderTable"
dbDate = database(, partitionType=VALUE, partitionScheme=2023.01.01..2024.01.01)
dbID = database(, partitionType=HASH, partitionScheme=[SYMBOL, 25])
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate, dbID],engine='TSDB',atomic='CHUNK')

tbSchema = table(1:0, transactionColName, transactionColType)
db.createPartitionedTable(table=tbSchema,tableName=transactionTbName,partitionColumns=`MDDate`HTSCSecurityID,sortColumns=`HTSCSecurityID`MDTime)	

tbSchema = table(1:0, orderColName, orderColType)
db.createPartitionedTable(table=tbSchema,tableName=orderTbName,partitionColumns=`MDDate`HTSCSecurityID,sortColumns=`HTSCSecurityID`MDTime)	

3.3 订阅流数据表把增量数据实时写入分布式数据库

订阅 3.2 节中的两个持久化流数据表,将增量数据实时写入分布式数据库。

Transaction = loadTable(database=dbName, tableName=transactionTbName)
Order = loadTable(database=dbName, tableName=orderTbName)

subscribeTable(tableName="transactionTable", actionName="transactionTableInsert", offset=-1, handler=tableInsert{Transaction}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
subscribeTable(tableName="orderTable", actionName="orderTableInsert", offset=-1, handler=tableInsert{Order}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
  • 通过调整 subscribeTable 函数中的 batchSize 和 throttle 参数可以控制写入分布式数据库的频率。
    • batchSize=20000 表示当未处理消息的数量达到 20000 时,handler 才会处理消息。
    • throttle=1 表示继上次 handler 处理消息之后,若未处理消息的数量还没有达到 20000 ,但是时间间隔 1s 后也会处理消息。
    • 因此,达到 batchSize 设置的条件或者达到 throttle 设置的条件,才会向分布式数据库写入一次。

3.4 订阅 INSIGHT 行情将增量数据实时写入流数据表

建立 INSIGHT 连接

用户配置 INSIGHT 账户信息,使用 insight::connect 函数进行连接,handles 用于设置接收行情数据的流数据表,下例用流数据表 transactionTable 接收逐笔成交(字典的 key 为 Transaction 表示了逐笔成交)。

// 配置账户信息
HOST = "111.111.111.111";
PORT = 111;
USER = "111";
PASSWORD = "111";

handles = dict(['Transaction', 'Order'], [transactionTable, orderTable]);
tcpClient= insight::connect(handles,HOST, PORT, USER, PASSWORD,,,true);

订阅行情数据

使用 insight::subscribe 函数进行 INSIGHT 行情订阅,行情数据将进入流数据表。

// 深交所股票数据
insight::subscribe(tcpClient, `MD_TRANSACTION`MD_ORDER, `XSHE, `StockType)  
// 上交所股票数据
insight::subscribe(tcpClient, `MD_TRANSACTION`MD_ORDER, `XSHG, `StockType)  

3.5 查询 INSIGHT 行情接收情况

运行过程中,可以查看 INSIGHT 行情的接收情况。首先调用 insight::getHandle 函数获取连接句柄,再使用 insight::getStatus 查询 INSIGHT 行情的接收情况。

tcpClient = insight::getHandle()
insight::getStatus(tcpClient)

返回结果如下,本例在 16:26 (见 startTime 字段) 提交了 INSIGHT 订阅,订阅了逐笔成交和逐笔委托(见 startTime 不为空的 topicType 字段),订阅的是沪深两市的股票数据(见 market 字段)。但是由于已经是盘后了,所以实际上并没有逐笔数据发送过来,因此 firstMsgTime 为空,firstMsgTime 表示收到第一条数据的系统时刻。

dolphindb 搭建加密货币行情,量化金融,行情数据,数据库,市场数据,量化金融,数据分析,量化交易

当在盘中启动时,可以看到 firstMsgTime 与 lastMsgTime 均不为空,lastMsgTime 表示收到最后一条数据的系统时刻。

dolphindb 搭建加密货币行情,量化金融,行情数据,数据库,市场数据,量化金融,数据分析,量化交易

4. 节点启动时自动订阅 INSIGHT 实时行情数据入库

DolphinDB 系统的启动流程如下图所示:

dolphindb 搭建加密货币行情,量化金融,行情数据,数据库,市场数据,量化金融,数据分析,量化交易

  • 系统初始化脚本(dolphindb.dos


系统初始化脚本是必需的,默认加载版本发布目录中的 dolphindb.dos。不建议做修改,因为版本升级的时候需要用新版本发布包中的系统初始化脚本覆盖。

  • 用户启动脚本(startup.dos


用户启动脚本是通过配置参数 startup 后才会执行,单节点模式在 dolphindb.cfg 中配置,集群模式在 cluster.cfg 中配置,可配置绝对路径或相对路径。若配置了相对路径或者没有指定目录,系统会依次搜索本地节点的 home 目录、工作目录和可执行文件所在目录。


配置举例如下:

startup=/DolphinDB/server/startup.dos

将上述业务代码添加到 /DolphinDB/server 目录的 startup.dos 文件中,并在对应的配置文件中配置参数 startup,即可完成节点启动时的自动订阅部署。

  • 定时任务脚本(postStart.dos


DolphinDB 中通过 scheduleJob 函数定义的定时任务会进行持久化。所以在重新启动节点时,系统先执行用户启动脚本,然后在初始化定时任务模块时完成持久化定时任务的加载。在完成上述步骤后,系统会执行定时任务脚本,此时用户可以在定时任务脚本中调用 scheduleJob 函数定义新的定时任务。


本教程中未使用该功能,所以不需要开启该配置项。1.30.15 和 2.00.3 版本开始支持配置 postStart.dos 实现节点启动自动执行定时任务脚本。

注意文章来源地址https://www.toymoban.com/news/detail-842927.html

  • INSIGHT 的账户信息需要根据实际环境进行修改。

附录

  • 详细启动脚本配置可以参考官网文档教程:启动脚本教程。
  • 关于节点启动时自动订阅处理业务的部署可以参考官网文档教程:节点启动时的流计算自动订阅教程。
  • startup.dos 启动脚本(账户信息需要根据用户实际情况进行修改)。
// 加载插件
login("admin", "123456")
try{ loadPlugin("./plugins/insight/PluginInsight.txt") }catch(ex){print ex}

// 清理环境
def cleanEnvironment(){
	try {
		tcpClient  = insight::getHandle()
		insight::unsubscribe(tcpClient) 
		insight::close(tcpClient) 
	} catch(ex) { print(ex) }
	try { unsubscribeTable(tableName="transactionTable", actionName="transactionTableInsert") } catch(ex) { print(ex) }
	try { unsubscribeTable(tableName="orderTable", actionName="orderTableInsert") } catch(ex) { print(ex) }
	try { dropStreamTable(tableName="transactionTable") } catch(ex) { print(ex) }
	try { dropStreamTable(tableName="orderTable") } catch(ex) { print(ex) }
	undef all
}

// 创建用于 INSIGHT 行情接入的持久化流数据表
def createStreamTable() {
	cacheSize = 1000000
	transactionSchema = insight::getSchema(`StockTransaction);
	orderSchema = insight::getSchema(`StockOrder);
	transactionColName = transactionSchema[`name]
	transactionColType = transactionSchema[`type]
	orderColName = orderSchema[`name]
	orderColType = orderSchema[`type]
	enableTableShareAndPersistence(table=streamTable(cacheSize:0, transactionColName, transactionColType), tableName=`transactionTable, cacheSize=cacheSize)
	enableTableShareAndPersistence(table=streamTable(cacheSize:0, orderColName, orderColType), tableName=`orderTable, cacheSize=cacheSize)
}

// 创建存储行情数据的分布式数据库表
def createDFSTable(dbName, transactionTbName, orderTbName) {
	transactionSchema = insight::getSchema(`StockTransaction);
	orderSchema = insight::getSchema(`StockOrder);
	transactionColName = transactionSchema[`name]
	transactionColType = transactionSchema[`type]
	orderColName = orderSchema[`name]
	orderColType = orderSchema[`type]
	if(!existsDatabase(dbName)) {
		dbDate = database(, partitionType=VALUE, partitionScheme=2023.01.01..2024.01.01)
		dbID = database(, partitionType=HASH, partitionScheme=[SYMBOL, 25])
		db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate, dbID],engine='TSDB',atomic='CHUNK')
	}
	if(!existsTable(dbName, transactionTbName)) {
		db = database(dbName)
		tbSchema = table(1:0, transactionColName, transactionColType)
		db.createPartitionedTable(table=tbSchema,tableName=transactionTbName,partitionColumns=`MDDate`HTSCSecurityID,sortColumns=`HTSCSecurityID`MDTime)	
	}
	if(!existsTable(dbName, orderTbName)) {
		db = database(dbName)
		tbSchema = table(1:0, orderColName, orderColType)
		db.createPartitionedTable(table=tbSchema,tableName=orderTbName,partitionColumns=`MDDate`HTSCSecurityID,sortColumns=`HTSCSecurityID`MDTime)	
	}
}

// 订阅流数据表把增量数据实时写入分布式数据库
def saveToDFSTable(dbName, transactionTbName, orderTbName) {
	Transaction = loadTable(database=dbName, tableName=transactionTbName)
	Order = loadTable(database=dbName, tableName=orderTbName)
	subscribeTable(tableName="transactionTable", actionName="transactionTableInsert", offset=-1, handler=tableInsert{Transaction}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
	subscribeTable(tableName="orderTable", actionName="orderTableInsert", offset=-1, handler=tableInsert{Transaction}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
}

// 订阅 INSIGHT 行情将增量数据实时写入流数据表
def connectToInsight(ip, port, user, password){
	// 建立 INSIGHT 连接
	handles = dict(['Transaction', 'Order'], [objByName(`transactionTable), objByName(`orderTable)])
	tcpClient= insight::connect(handles, ip, port, user, password,,,true)
	// 订阅深交所股票逐笔成交、逐笔委托
	insight::subscribe(tcpClient, `MD_TRANSACTION`MD_ORDER, `XSHE, `StockType)  
	// 订阅上交所股票逐笔成交、逐笔委托
	insight::subscribe(tcpClient, `MD_TRANSACTION`MD_ORDER, `XSHG, `StockType)  
}

// 配置账户信息。注意!账户信息必须根据用户实际情况进行修改
host = "111.111.111.111"
port = 111
user = "111"
password = "111"

// 配置数据库表名
dbName = "dfs://Stock_TB"
transactionTbName = "transactionTable"
orderTbName = "orderTable"

cleanEnvironment()
createStreamTable()
createDFSTable(dbName, transactionTbName, orderTbName) 
saveToDFSTable(dbName, transactionTbName, orderTbName)
connectToInsight(ip, port, user, password)
writeLog("Subsribe to insight market data successfully!")

到了这里,关于对接华泰极速行情丨DolphinDB INSIGHT 插件使用教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • uniapp:记录使用uniapp对接腾讯音视频插件(TUICallKit )

    官方地址:TUICallKit—API uniapp插件地址:腾讯云音视频通话插件TencentCloud-TUICallKit 1、到鷈插件地址,购买插件,并绑定到自己的项目中。 第3步的报名,一定要与最后打包包名一致。输入包名后:下一步-------确定。 2、开通服务 1.登录到 即时通信 IM 控制台,单击创建新应用,

    2024年02月10日
    浏览(47)
  • 使用redis-insight连接到服务器上的redis数据库

    一、安装redis: 安装 Redis 。你可以通过运行下面的命令来使用 yum 安装 Redis: 启动 Redis 服务 。安装完成后,你可以通过运行下面的命令来启动 Redis 服务: 设置 Redis 服务在启动时自动启动 。如果希望 Redis 在系统启动时自动运行,可以使用下面的命令: 检查 Redis 服务状态

    2024年04月09日
    浏览(44)
  • 更安全,更省心丨DolphinDB 数据库权限管理系统使用指南

    在数据库产品使用过程中,为保证数据不被窃取、不遭破坏,我们需要通过用户权限来限制用户对数据库、数据表、视图等功能的操作范围,以保证数据库安全性。为此,DolphinDB 提供了具备以下主要功能的权限管理系统: 提供用户和组角色,方便权限控制 提供19种权限控制

    2024年02月15日
    浏览(37)
  • vue2 对接 海康摄像头插件 (视频WEB插件 V1.5.2)

    前言 海康视频插件v.1.5.2版本运行环境需要安装插件VideoWebPlugin.exe,对浏览器也有兼容性要求,具体看官方文档 对应下载插件 去海康官网下载插件 里面有dome等其他需要用到的 地址: 安装插件 打开下载的文件里的bin文件 安装一下VideoWebPlugin vue脚手架中集成插件 把官方资源

    2024年02月03日
    浏览(47)
  • 高频策略如何使用level2行情?

    我们一定听说过高频交易使用 十档行情 ,市场是实时刷新的,比我们的普通市场快得多,我们都知道市场快1ms,可以给我们带来巨大的信息优势,然后转化为收入。今天我们将解密传说中的十档行情。 在介绍行情前,先介绍一些基本概念。 又称 逐笔成交 ,是整个市场上的

    2024年02月09日
    浏览(69)
  • 猎豹极速WiFi怎么用 猎豹极速WiFi安装使用教程图文详解

    猎豹极速WiFi是一款猎豹移动推出的无线网卡硬件,根据官方介绍,该产品采用的是Realtek 8192CU芯片无线网卡,传输速率最高可达300M。猎豹极速WiFi炫彩开启精彩时代!猎豹极速WiFi给你不一样的绝美体验!猎豹极速WiFi使用方法即刻呈现,至佳享受!猎豹极速WiFi怎么用呢?下面

    2024年02月06日
    浏览(41)
  • Midjourney基本使用(极速版)

            首先,我们来创建与Midjourney交互的环境         因为需要利用discord与Midjourney进行交互         因为Midjourney没有自己的网站没有自己的服务器         完全依赖discord作为Midjourney的前端,通过discord,用户才能方便快捷的与Midjourney进行交互。 来,让我们加入Midjou

    2024年02月22日
    浏览(42)
  • 适用各种行情,使用网格交易策略实现自动高效套利

    网格交易是量化交易的一种,在指定的区间内,将资金分成N份,先用一部分资金初始建仓,每下跌一定比例买入1份,每上涨一定比例卖出1份,反复低吸高抛,赚取差价。 网格交易策略对于不能实时盯盘的交易者来说,是一个非常理想的自动交易工具!它的优点非常明显,无

    2024年02月04日
    浏览(42)
  • DolphinDB学习(0):DolphinDB基本概述

    DolphinDB的学习难度不小,主要是写法比较多,官方示例是一次性给一大堆代码,在没有成体系的学习基础的前提下,总有种力不从心的感觉,所以博主汇总这一个系列的文章,尝试从最简单的基础常规操作开始,一边学习一边记录探索DolphinDB的过程,同时对一些函数做更形象

    2024年01月22日
    浏览(34)
  • Source Insight和Keil中文乱码

            由于Source Insight和Keil中的中文编码方式的不同,导致Keil中添加的中文注释在Source Insight中乱码;在Source Insight中添加的中文注释在Keil中乱码。所以需要统一两者的编码方式。         Source Insight默认编码方式为UTF-8,Keil中一般是GB2312。 (1)更改Source Insight编码方式

    2024年02月14日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包