如第一章所述,我们将数据采集分为日志采集和数据库数据同步两部分。数据同步技术更通用的含义是不同系统间的数据流转,有多种不同的应用场景。主数据库与备份数据库之间的数据备份,以及主系统与子系统之间的数据更新,属于同类型不同集群数据库之间的数据同步。另外,还有不同地域、不同数据库类型之间的数据传输交换,比如分布式业务系统与数据仓库系统之间的数据同步。对于大数据系统来说,包含数据从业务系统同步进入数据仓库和数据从数据仓库同步进入数据服务或数据应用两个方面。本章侧重讲解数据从业务系统同步进入数据仓库这个环节,但其适用性并不仅限于此。
3.1 数据同步基础
源业务系统的数据类型多种多样,有来源于关系型数据库的结构化数据,如 MySQL Orac le DB2, SQL Server :也有来源于非关系型数据库的非结构化数据,如 Ocean Base HBase Mongo DB 等,这类数据通常存储在数据库表中:还有来源于文件系统的结构化或非结构化数据,如阿里云对象存储 oss 、文件存储 NAS 等,这类数据通常以文件形式进行存储。数据同步需要针对不同的数据类型及业务场景选择不同的同步方式。总的来说,同步方式可以分为三种 直连同步 、数据文件同步和数据库日志解析同步。
3.1.1 直连同步
直连同步是指通过定义好的规范接口 API 和基于动态链接库的方式直接连接业务库,如 BC/JD BC 等规定了统一规范的标准接口,不同的数据库基于这套标准接口提供规范的驱动,支持完全相同的函数调用和 SQL 实现(见图 .1 )。
这种方式配置简单,实现容易,比较适合操作型业务系统的数据同步。但是业务库直连的方式对源系统的性能影响较大,当执行大批量数据同步时会降低甚至拖垮业务系统的性能。如果业务库采取主备策略,则可以从备库抽取数据,避免对业务系统产生性能影响。但是当数据量
较大时,采取此种抽取方式性能较差,不太适合从业务系统到数据仓库系统的同步。
3.1.2 数据文件同步
数据文件同步通过约定好的文件编码、大小、格式等,直接从源系统生成数据的文本文件,由专门的文件服务器,如 FTP 服务器传输到目标系统后,加载到目标数据库系统中。当数据源包含多个异构的数据库系统(如 MyS QL Oracle QL Server DB2 等)时,用这种方式比较简单、实用。另外,互联网的日志类数据,通常是以文本文件形式存在的,也适合使用数据文件同步方式(见图 3.2)
由于通过文件服务器上传、下载可能会造成丢包或错误,为了确保数据文件同步的完整性,通常除了上传数据文件本身以外,还会上传一个校验文件,该校验文件记录了数据文件的数据量以及文件大小等校验信息,以供下游目标系统验证数据同步的准确性。
另外,在从源系统生成数据文件的过程中,可以增加压缩和加密功能,传输到目标系统以后,再对数据进行解压缩和解密 这样可以大大提高文件的传输效率和安全性。
3.1.3 数据库日志解析同步
目前,大多数主流数据库都已经实现了使用日志文件进行系统恢复,因为日 文件信息足够丰富,而且数据格式也很稳定,完全可以通过解析日志文件获取发生变更的数据,从而满足增量数据同步的需求。
以Oracle 为例,可以通过源系统的进程,读取归档日志文件用以收集变化的数据信息,并判断日志中的变更是否属于被收集对象,将其解析到目标数据文件中。这种读操作是在操作系统层面完成的,不需要通过数据库,因此不会给源系统带来性能影响。
然后可通过网络协议,实现源系统和目标系统之间的数据文件传输。相关进程可以确保数据文件的正确接收和网络数据包的正确顺序,并提供网络传输冗余,以确保数据文件的完整性。
数据文件被传输到目标系统后,可通过数据加载模块完成数据的导人,从而实现数据从源系统到目标系统的同步(见图 3.3 )。
数据库日志解析同步方式实现了实时与准实时同步的能力,延迟可以控制在毫秒级别,并且对业务系统的性能影响也比较小,目前广泛应用于从业务系统到数据仓库系统的增量数据同步应用之中。
由于数据库日志抽取 般是获取所有的数据记录的变更(增、删、改),落地到目标表时我们需要根据主键去重按照日志时间倒排序获取最后状态的变化情况。对于删除数据这种变更情况,针对不同的业务场景可以采用一些不同的落地手法。
我们以具体的实例进行说明。如表 3.1 所示为源业务系统中某表变更日志流水表。其含义是:存在 条变更日志,其中主键为 的记录有3条变更日志,主键为 的记录有 条变更日志。
针对删除数据这种变更,主要有三种方式,下面以实例 行说明。假设根据主键去重,按照流水倒序获取记录最后状态生成的表为 delta表。
第一种方式,不过滤删除流水。不管是否是删除操作, 都获取同一主键最后变更的那条流水。采用此种方式生成的delta表如表3.2所示。
第二种方式,过滤最后一条删除流水,如果同一主键最后变更的那条流水是删除操作,就获取倒数第二条流水。采用此种方式生成 delta表如表3.3所示。
第三种方式,过滤删除流水和之前的流水。如果在同一主键变更的过程中有删除操作,则根据操作时间将该删除操作对应的流水和之前的
流水都过滤掉。采用 此种方式生成的 delta 表如表 3.4 所示。
对于采用哪种方式处理删除数据,要看前端是如何删除无效数据的。前端业务系统删除数据的方式一般有两种 :正常业务数据删除和手工批量删除。手工批量删除通常针对类似的场景,业务系统只做逻辑删除,不做物理删除, OBA 定期将部分历史数据直接删除或者备份到备份库。
一般情况下,可以采用不过滤的方式来处理,下游通过是否删除记录的标识来判断记录是否有效。如果明确业务数据不存在业务上的删除,但是存在批量手工删除或备份数据删除,例如淘宝商品、会员等,则可以采用只过滤最后一条删除流水的方式,通过状态字段来标识删除
记录是否有效。
通过数据库日志解析进行同步的方式性能好、效率高,对业务系统的影响较小。但是它也存在如下一些问题:
·
- 数据延迟。例如,业务系统做批量补录可能会使数据更新量超出系统处理峰值,导致数据延迟。
- 投人较大。采用数据库日 抽取的方式投入较大,需要在源数据库与目标数据库之间部署 个系统实时抽取数据。
- 数据漂移和遗漏。数据漂移, 般是对增量表而言的,通常是指该表的同一个业务日期数据中包含前一天或后一天凌晨 附近的数据或者丢失当天的变更数据。这个问题我们将在“数据漂移的处理”一节中详细论述。
3.2 阿里数据仓库的同步方式
数据仓库的特性之一是集成,将不同的数据来源、不同形式的数据整合在一起,所以从不同业务系统将各类数据源同步到数据仓库是一切的开始。那么阿里数据仓库的数据同步有什么特别之处呢?
阿里数据仓库的数据同步的特点之一是数据来源的多样性。在传统的数据仓库系统中,一般数据来源于各种类型的关系型数据库系统,比如MySQL SQL Server Oracle DB2 等,这类数据的共同特点便是高度结构化,易于被计算机系统处理。而在大数据时代 ,除了结构化数据,还有大量非结构化数据,比如 Web 服务器产生的日志、各类图片、视频等。特别是日志数据,记录了用户对网站的访问情况,这类数据通常直接以文本文件形式记录在文件系统中,对于数据的分析、统计、挖掘等各类数据应用有极大的价值。
阿里数据仓库的数据同步的特点之 则体现在数据量上。传统的数据仓库系统每天同步的数据量一般在几百 GB 甚至更少,而一些大型互联网企业的大数据系统每天同步的数据量则达到 PB 级别。目前间里巴巴的大数据处理系统 MaxCompute 的数据存储达到 EB 级别,每天需要同步的数据量达到 PB 级别,这种量级上的差距是巨大的。
数据源的类型是多样的,需要同步的数据是海量的,那该如何准确、高效地完成数据同步呢?这里就需要针对不同的数据源类型和数据应用的时效性要求而采取不同的策略。
3.2.1 批量数据同步
对于离线类型的数据仓库应用,需要将不同的数据源批量同步到数据仓库,以及将经过数据仓库处理的结果数据定时同步到业务系统。
当前市场上的数据库系统种类很多 ,有行存储的和列存储的,有开源的和非开源的,每一种数据库的数据类型都略有不同,而数据仓库系统则是集成各类数据源的地方,所以数据类型是统一的。要实现各类数据库系统与数据仓库系统之间的批量双向数据同步,就需要先将数据转换为中间状态,统一数据格式。由于这类数据都是结构化的,且均支持标准的 SQL 语言查询,所以所有的数据类型都可以转换为字符串类型。因此,我们可以通过将各类源数据库系统的数据类型统 转换为字符串类型的方式,实现数据格式的统一。
阿里巴巴的 DataX 就是这样 个能满足多方向高自由度的异构数据交换服务产品。对于不同的数据源, DataX 通过插件的形式提供支持,将数据从数据源读出并转换为中间状态,同时维护好数据的传输、缓存等工作。数据在 DataX 中以中间状态存在, 并在目标数据系统中将中间状态的数据转换为对应的数据格式后写人。目前 DataX 每天都需要处理2PB 左右的批量数据同步任务,通过分布式模式,同步完所有的数据所需要的时间一般在 小时以内,有力保障了大数据同步的准确性及高效性(见图 3.4 )。
DataX 采用 Framework+Plugin 的开放式框架实现, Framework 处理缓冲、流程控制、并发、上下文加载等高速数据交换的大部分技术问题,并提供简单的接口与插件接人(见图 3.5 )。插件仅需实现对数据处理系统的访问,编写方便,开发者可以在极短的时间内开发 个插件以快速支持新的数据库或文件系统。数据传输在单进程(单机模式)/多进程(分布式模式)下完成,传输过程全内存操作,不读写磁盘,也没有进程间通信,实现了在异构数据库或文件系统之间的高速数据交换。
• Job :数据同步作业
• Splitter :作业切分模块,将 个大任务分解成多个可以并发行的小任务
• Sub-Job :数据同步作业切分后的小任务,或称之为 Task
• Read er :数据读人模块,负 运行切分后的小任务,将数据从源系统 载到 DataX
• Channel: eader Writer 通过 hannel 交换数据。
• Writer :数据 出模块,负责将数据从 DataX 导人目标数据系统。
实时数据同步
对于日 志类数据来说,由于每天的日志是源源不断 生的,并且分布在不同的服务器中,有些大型互联网公司的服务器集群有成千上万台机器,所以所产生的日志需要尽快以数据流的方式不间断地同步到数据仓库。 另外,还有 些数据应用,需要对业务系统产生的数据进行实时处理,比如 猫“双 ”的数据大屏,对所产生的交易数据需要实时汇总,实现秒级的数据刷新。这类数据是通过解析 MySQL binlog日志(相当于 Orac le 的归档日志)来实时获得增量的数据更新,并通过消息订阅模式来实现数据的实时同步的。具体来说 ,就是建立 个日志数据交换中心,通过专门的模块从每台服务器源源不断地读取日志数据,或者解析业务数据库系统的 binlog 或归档日志,将增量数据以数据流的方式不断同步到日志交换中心,然后通知所有订阅了这些数据的数据仓库系统来获取。阿里巴巴的 TimeTunnel (TT )系统就是这样的实时数据传输平台,具有高性能、实时性、顺序性、高可靠性、高可用性、可扩展性等特点。
具体来说, TT 是一种基于生产者、消费者和 Topic 消息标识的消息中间件,将消息数据持久化到 HBase 的高可用、分布式数据交互系统。
•生产者 :消息数据的产生端,向 TimeTunnel 集群发送消息数据,就是图 3.6 中的生产 Client
•消费者:消息数据的接收端,从 TimeTunnel 集群中获取数据进行业务处理。
• Topic :消息类型的标识,如淘 acookie 志的 Topic为taobao_acookie 生产 Client 和消费 Client 需要知道对应的Topic 字。
• Broker 模块 负责处理客户端收发消息数据的请求,然后往HBase 取发数据。
Time Tunnel 支持主动、被动等多种数据订阅机制,订阅端自动负载均衡,消费者自己把握消费策略。对于读写比例很高的 Topic ,能够做到读写分离,使消费不影响发送。同时支持订阅历史数据,可以随意设置订阅位置,方便用户回补数据。另外,针对订阅有强大的属性过滤功能,用户只需关心自己需要的数据即可。
参考资料:大数据技术之DataX
3.3 数据同步遇到的问题及解决方案
3.3.1 分库分表的处理
随着业务的不断增长,业务系统处理的数据量也在飞速增加,需要系统具备灵活的扩展能力和高并发大数据量的处理能力,目前一些主流数据库系统都提供了分布式分库分表方案来解决这个问题(见图 3. )。但是对于数据同步来说,这种分库分表的设计无疑 大了同步处理的复杂度。试想 下,如果有一个中间表,具备将分布在不同数据库中的不同表集成为 个表的能力,就能让下游应用像访问单库单表一样方便。
阿里巴巴的 TDDL ( Taobao Distributed Data ayer )就是这样一个分布式数据库的访问引擎,通过建立中间状态的逻辑表来整合统一分库分表的访问(见图 3.8 )。
TDDL是在持久层框架之下、 JDBC驱动之上的中间件,它与JDBC规范保持一致,有效解决了分库分表的规则引擎问题,实现了SQL解析、规则计算、表名替换、选择执行单元并合并结果集的功能,同时解决了数据库表的读写分离、高性能主备切换的问题,实现了数据库配置信息的统 管理。
JDBC ( Java DataBaseConnectivity java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系型数据库提供统一访问,它是由一组用Java语言编写的类和接口组成的。
参考资料:
-
分库分表介绍
-
TDDL介绍
3.3.2 高效同步及批量同步
数据同步的方法通常是先创建目标表 ,再通过同步工具的填写数据库连接、表 段等各种配置信息后测试完成数据同步。这也是 DataX任务的配置过程,同步中心对 DataX 进行进 步封装,通过源系统元数据降低了数据库连接、表和字段等信息的配置复杂度,但在实际生产过程中我们仍然会遇到 些问题。
- 随着业务的发展和变化,会新增大批量的数据同步,使用传统方式每天去完成成百上千的数据同步工作,一方面,工作量会特别大另一方面,相似并且重复的操作会降低开发人员的工作热情。
- 数据仓库 的数据源种类特别丰富,遇到不同类型的数据源同步就要求开发人员去了解其特殊配置。
- 部分真正 的数据需求方,如 Java 开发和业务运营,由于存在相关数据同步的专业技能门槛,往往需要将需求提交给数据开发方来完成,额外增加了沟通和流程成本。
为了解决上述问题,网里巴巴数据仓库研发了 OneC lick 产品:
- 对不同数据源的数据同步配置透明化,可以通过库名和表名唯一定位,通过 IDB 接口获取元数据信息自动生成配置信息。
- 简化了数据同步的操作步骤,实现了与数据同步相关的建表、配置任务、发布、测试操作一键化处理,并且封装成 Web 接口进一步达到批量化的效果。
- 降低了数据同步的技能门槛,让数据需求方更加方便地获取和使
用数据。
通过 OneClick 产品,真正实现了数据的一键化和批量化同步,一键完成 DDL DML 生成、数据的冒烟测试以及在生产环境中测试等。因此,阿里巴巴通过极少的人力投入,实现了数据同步的集中化建设和管理:改变了之前各数据开发人员自行同步带来的效率低、重复同步和同步配置质量低下等问题,大大降低了数据同步成本。
3.3.3 增量与全量同步的合并
在批量数据同步中,有些表的数据量随着业务的发展越来越大,如果按周期全量同步的方式会影响处理效率。在这种情况下,可以选择每次只同步新变更的增量数据,然后与上一个同步周期获得的全量数据进行合井,从而获得最新版本的全量数据。
在传统的数据整合方案中,合并技术大多采用 merge 方式( update+insert ):当前流行的大数据平台基本都不支持 update 操作 ,现在我们比较推荐的方式是全外连接( full outer join) +数据全量覆盖重新加载( insert overwrite ),即如日调度,则将当天的增量数据和前一天的全量数据做全外连接,重新加载最新的全量数据。在大数据量规模下,全量更新的性能比 update 要高得多。此外,如果担心数据更新错误问题,可以采用分区方式,每天保持 个最新的全量版本,保留较短的时间周期(如 3~7 天)。
另外,当业务系统的表有物理删除数据的操作,而数据仓库需要保留所有历史数据时,也可以选择这种方式,在数据仓库中永久保留最新的全量数据快照 下面我们以淘宝订单表的具体实例来说明。
淘宝交易订单表,每天新增、变更的增量数据多达几亿条,历史累计至今的全量数据则有几百亿条,面对如此庞大的数据量,如果每天从业务系统全量同步显然是不可能的 可行的方式是同步当天的增量数据,并与数据仓库中的前一天全量数据合并,获得截至当天的最新全量数据(见图 3.9 )。
3.3.4 同步性能处理
数据同步任务是针对不同数据库系统之间的数据同步问题而创建的一系列周期调度的任务。在大型的数据调度工作台上,每天会运行大量的数据同步任务。针对数据同步任务 一般首先需要设定首轮同步的线程数,然后运行同步任务。
这样的数据同步模式存在以下几个问题:
- 有些数据同步任务的总线程数达不到用户设置的首轮同步的线程数时,如果同步控制器将这些同步线程分发到 PU 较繁忙的机器上,将导致这些同步任务的平均同步速度非常低,数据同步速度非常慢。
- 用户不清楚该如何设置首轮同步的线程数,基本都会设置成一个固定的值,导致同步任务因得不到合理的 PU 资源而影响同步效率。
- 不同的数据同步任务的重要程度是不一样的,但是同步控制器平等对待接收到的同步线程,导致重要的同步线程因得不 CPU资源而无法同步。
上述数据同步模式存在的几个问题导致的最终结果是数据同步任务运行不稳定。
针对数据同步任务中存在的问题,阿里巴巴数据团队实践出了一套基于负载均衡思想的新型数据同步方案。该方案的核心思想是通过目标数据库的元数据估算同步任务的总线程数,以及通过系统预先定义的期望同步速度估算首轮同步的线程数,同时通过数据同步任务的业务优先级决定同步线程的优先级,最终提升同步任务的执行效率和稳定性。
具体实现步骤如下:
- 用户创建数据同步任务,并提交该同步任务。根据系统提前获知及设定的数据,估算该同步任务需要同步的数据量、平均同步速度、首轮运行期望的线程数、需要同步的总线程数。
- 根据需要同步的总线程数将待同步的数据拆分成相 等数量的数据块,一个线程处理 个数据块,并将该任务对应的所有线程提交至同步控制器。
- 同步控制器判断需要同步的总线程数是否大于首轮运行期望的线程数,若大于,则跳转至 若不大于,则跳转至。
- 同步控制器采用多机多线程的数据同步模式,准备该任务第一轮线程的调度,优先发送等待时间最长、优先级最高且同 任务的线程。
- 同步控制器准备一定数据量(期望首轮线程数-总线程数)的虚拟线程,采用单机多线程的数据同步模式 ,准备该任务相应实体线程和虚 拟线程的调度,优先发送等待时间最长、优先级最高且单机 PU 剩余资源可以支持首轮所有线程数且同 任务的线程,如果没有满足条件的机器,则选择 CPU 剩余资源最多的机器进行首轮发送。
- 数据任务开始同步,并等待完成。
- 数据任务同步结束。
3.3.5 数据漂移处理
通常我们把从源系统同步进人数据仓库的第一层数据称为 ODS或者staging 层数据,阿里巴巴统称为 ODS 。数据漂移是 ODS 数据的一个顽疾,通常是指 ODS 表的同一个业务日期数据中包含前一天或后一天凌晨附近的数据或者丢失当天的变更数据。
由于 ODS 需要承接面向历史的细节数据查询需求,这就需要物理落地到数据仓库的 ODS 表按时间段来切分进行分区存储 ,通常的做法是按某些时间戳字段来切分,而实际上往往由于时间戳字段的准确性问题导致发生数据漂移。
通常,时间戳字段分为四类:
- 数据库表中用来标识数据记录更新时间的时间戳字段(假设这类字段叫 modified time )。
- 数据库日志中用来标识数据记录更新时间的时间戳字段·(假设这类宇段叫 log_time)。
- 数据库表中用来记录具体业务过程发生时间的时间戳字段 (假设这类字段叫 proc_time)。
- 标识数据记录被抽取到时间的时间戳字段(假设这类字段叫extract time)。
理论上,这几个时间应该是 致的,但是在实际生产中,这几个时间往往会出现差异,可能的原因有以下几点:
- 由于数据抽取是需要时间的, extract_ti me 往往会晚于前三个时间。
- 前台业务系统手工订正数据时未更新 modified_time。
- 由于网络或者系统压力问题, log_time 或者 modified_time 会晚于proc time。
通常的做法是根据其中的某 个字段来切分 ODS 表,这就导致产生数据漂移。下面我们来具体看下数据漂移的几种场景。
- 根据 extract_ti me 来获取数据。这种情况数据漂移的问题最明显。
- 根据 modified_time 限制。在实际生产中这种情况最常见,但是往往会发生不更新 modified time 而导致的数据遗漏,或者凌晨时间产生的 数据记录漂移到后一天。
- 根据 log_time 限制。由于网络或者系统压力问题, log time 会晚于proc_time ,从而导致凌晨时间产生的数据记录漂移到后一天。例如, 在淘宝“双 l l ”大促期间凌晨时间产生的数据量非常大,用户支付需要调用多个接口,从而导致 log time 晚于实际的支付时间。
- 根据 proc_time 限制。仅仅根据 proc_time 限制,我们所获取的ODS 表只是包含一个业务过程所产生的记 ,会遗漏很多其他过程的变化记录,这违背了 ODS 和业务系统保持 致的设计原则。
处理方法主要有以下两种:
( 1 )多获取后一天的数据
既然很难解决数据漂移的问题,那么就在 ODS 每个时间分区中向前、向后多冗余 些数据,保障数据只会多不会少,而具体的数据切分让下游根据自身不同的业务场景用不同的业务时间 proc time 来限制但是这种方式会有一些数据误差,例如 个订单是当天支付的,但是第二天凌晨申请退款关闭了该订单,那么这条记录的订单状态会被更新,下游在统计支付订单状态时会出现错误。文章来源:https://www.toymoban.com/news/detail-824575.html
(2)通过多个时间戳字段限制时间来获取相对准确的数据文章来源地址https://www.toymoban.com/news/detail-824575.html
- 首先根据 log_time 分别冗余前一天最后 15 分钟的数据和后一天凌晨开始 15 分钟的数据,并用 modified time 过滤非当天数据,确保数据不会因为系统问题而遗漏。
- 然后根据 log_time 获取后一天 15 分钟的数据 针对此数据,按照主键根据 log_time 做升序排列去重。因为我们需要获取的是最接近当天记录变化的数据(数据库日志将保留所有变化的数据,但是落地到 DS 表的是根据主键去重获取最后状态变化的数据)。
- 最后将前两步的结果数据做全外连接,通过限制业务时间proc_time 来获取我们所需要的数据。
到了这里,关于大数据之路——数据同步(第三章)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!