通过 DVT 和 dbt 测试监控Airbyte数据管道

这篇具有很好参考价值的文章主要介绍了通过 DVT 和 dbt 测试监控Airbyte数据管道。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

为数据复制或数据迁移构建 ELT 数据管道的一个重要部分是能够在出现错误时进行监视并获得通知。如果您不知道错误,您的数据将包含不一致之处,并且您的报告将不准确。由于使用的工具数量众多,大多数管道的复杂性使得设置监视和警报系统更具挑战性。

在本文中,我将分享为什么为 ELT 数据管道设置监视和警报系统很重要。我解释了要监视的关键指标,以及设置监视和警报系统时将遇到的常见挑战。我进一步强调了不同的监控和警报工具,并展示了如何使用Google的数据验证工具(DVT)和数据构建工具(dbt)实现典型的监视/警报系统。

为什么要监视数据管道?

好吧,问题应该是,“为什么不监视数据管道”?。这是因为让您的管道作为黑匣子运行对业务来说可能非常昂贵。让我分享一个个人故事来解释这一点。我公司用于数据复制的初始管道是使用 AWS 数据迁移服务 (DMS) 设计的,将数据从 RDS (PostgreSQL) 副本复制到 S3 存储桶。然后,我们让 Snowpipe(Snowflake 的 ELT 工具)从 S3 存储桶获取新数据,并将这些数据通过管道传输到 Snowflake(我们的数据仓库)。

这种架构有效,但它是一个完整的黑匣子,因为我们很少或根本看不到引擎盖下发生的事情。没有适当的警报或通知系统来通知我们管道故障。只有当我们在一天或更长时间后看到数据不一致或下游分析不准确时,我们才会知道管道故障。但这对业务有何影响?这对我们的影响之一是客户流失率的提高。由于我们的数据到达较晚且不一致,我们无法及时检测到客户何时遇到 KYC 验证问题。

以下是应监视管道的主要原因:

全面鸟瞰数据运行状况。

防止数据传输不一致。

获取持续的数据测试方法。

及早发现数据质量和数据完整性问题。

跟踪数据处理成本、元数据和整体系统性能。

提供反馈以优化管道性能。

应监视哪些指标?

虽然设置这些监视/警报系统以了解管道非常重要,但确定要衡量的关键指标以及要使用哪些工具并不总是一项简单的任务。这是因为要衡量的指标在很大程度上取决于数据管道的用例和其他几个因素。

例如,如果其中一个管道实时提供用于跟踪应用程序服务器停机时间的关键数据,那么您的首要任务是根据组织或团队定义的 SLA 监控数据到达的延迟。

以下是要监控的不同类别的指标,适用于您或您的组织可能拥有的任何用例:

数据质量监控

通过数据质量监控,建立一个监控系统,以持续验证管道不同阶段的数据质量。首先,在提取加载 (EL) 步骤中,在加载作业完成后,根据目标中的数据验证源中的数据质量。此处监视的关键指标包括源-目标记录计数匹配、源-目标列计数匹配、数据格式错误、数据卷错误、列名更改、引用完整性等。其次,在转换作业运行后的转换步骤中监视数据的质量。此步骤监控的关键指标包括:数据类型错误、空值等。

管道可靠性监测

在这里,监控侧重于管道的端到端可靠性。在管道的不同步骤中监视错误:

提取加载 (EL) 步骤:此步骤由 Airbyte 等 ELT 工具处理。此处会出现错误,例如身份验证问题导致的同步失败、规范化错误、同步期间加载新列 (SCD) 时出错、JSON 架构验证程序错误等。受到监控。

转换步骤:此步骤由 dbt 等转换工具处理。此处会出现转换作业运行失败、数据传输延迟(运行持续时间长于预期)、数据沿袭或数据丢失问题等错误。受到监控。

业务指标监控

这种类型的监视发生在管道的转换阶段之后。在这里,监视转换后的数据,以根据特定的业务需求识别异常。例如,货币价格贬值或升值、基于市场价值的交易损失等。当这些指标达到特定阈值时,将触发警报。

监视数据管道有哪些挑战?

ELT 数据管道是使用多种工具组合构建的,包括 dbt、Airflow、Airbyte、SQL、云服务、数据库、数据仓库和数据湖。这种工具的多样性有利于可扩展性,以及在数据堆栈的每一层使用最有效的工具。但是,这会导致管道中有许多移动部件。这可能会使监视或全面了解数据管道成为一场噩梦。

在设置管道监视和警报系统之前,我强烈建议先简化管道中要监视的进程数。具有多个可能故障点的复杂管道将需要在这些不同层设置监视/警报系统。这将使事情变得非常难以跟踪和管理。

为了简化我上面给出的示例中公司管道的复杂性,我们首先引入了 Airbyte——一个开源数据摄取工具来处理我们的数据复制。Airbyte 帮助我们减少了管道中可能的故障点数量。我们没有首先使用数据迁移服务 (DMS) 将数据复制到 S3 存储桶,而是使用 Airbyte 将数据直接复制到我们的仓库(Snowflake)。借助此架构,我们无需在数据流的三个不同级别进行监控:RDS-DMS 级别、DMS-S3 存储桶级别和 S3-Snowpipe 级别。现在,我们只在仓库级别监控我们的管道。

简化监视进程的数量后,让我们讨论数据管道监视工具以及如何为数据管道设置典型的监视/警报系统。

监控管道中的数据质量指标

市场上有很多工具可用于监视和触发数据管道中的警报。但是,这些工具的功能不同。虽然一些工具专注于监控云基础设施、日志和应用程序安全性,但其他工具则专注于监控数据质量、数据验证和数据沿袭。此外,一些工具是专有的基于云的解决方案,而另一些则是开源的。

一些专有的基于云的解决方案包括Monte Carlo,Databand,Datadog,Datafold,Accel Data。开源替代方案包括普罗米修斯、洛基、远大期望、数据验证工具 (DVT)、dbt 测试、Datafold 的数据差异等......需要注意的重要一点是,您可能需要组合其中两个或多个工具来实现您的目标。

在下一节中,我将介绍如何使用两个开源工具设置这些监视/警报系统:数据验证工具 (DVT) 和数据构建工具 (dbt)。

使用数据验证工具 (DVT) 进行数据验证监控

数据验证是在将数据用于业务运营之前检查数据的完整性、准确性和结构的做法。

数据验证是构建数据管道的关键步骤,因为它提供了在将数据用于下游分析之前检查数据有效性的层。

数据验证工具 (DVT) 是一种开源 Python CLI 工具,可将异构数据源表与多级验证函数进行比较。在数据加载过程完成后,您可以运行 DVT 进程来验证源表和目标表是否匹配且正确。DVT 支持列、行、自定义查询、架构、列数据类型验证以及许多数据仓库和数据库的连接。

Datafold 还提供了一个开源数据差异项目,用于有效地比较数据库和数据仓库之间的表。要了解有关使用 data-diff 的更多信息,请阅读有关验证从 Postgres 到 Snowflake 的数据复制管道的教程。

将 DVT 与 BigQuery 结合使用

通过 DVT 和 dbt 测试监控Airbyte数据管道

 

您可以在任何云平台中的虚拟机上设置和运行 DVT。您还可以选择在 Docker 容器中运行 DVT。按照此处的说明在本地计算机或云环境中安装和设置 DVT。本节中的代码演练是在 Google Cloud 上的云外壳会话上运行的。DVT 提供了一个命令行界面 (CLI),用于在安装后执行 dvt 命令。

若要根据目标表验证源表,请先创建源连接和目标连接。我们通过 CLI 运行以下代码来做到这一点。

# Create MYSQL connection as Source connectiondata-validation connections add--connection-nameMYSQL_CONN MySQL--hostHOST_IP--portPORT--user-nameUSER-NAME--passwordPASSWORD # Create BigQuery connection as target connectiondata-validation connections add--connection-name$BigQuery_CONNBigQuery--project-id$MY_GCP_PROJECT

上面的代码片段将创建一个 MySQL 连接作为源连接,创建一个 BigQuery 连接作为目标连接。

列验证

列验证对源和目标都运行计数 (*)。这将计算源表中的列数,并验证它是否与目标表上的计数匹配。要运行列验证,请通过 CLI 执行数据验证运行命令。以下是 MySQL 源表和 BigQuery 目标表之间的列验证的外观:

data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result

上面的代码片段将执行源表的列计数,并根据目标表的列计数进行验证。--bq-result-handler 标志将有助于将验证结果输出到中间 BigQuery 表。默认情况下,如果没有 --bq-result-handler 标志,验证结果将输出到控制台。

对于启用了表规范化的 Airbyte 同步,您需要指定要在列验证中验证的列的名称。这是为了排除 Airbyte 在同步期间添加的其他元数据列。下面的代码演示如何在验证中指定列:

data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--count column1, column2, column3, column4, column5 \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result

行验证

行验证在源和目标上运行计数 (*)。这将计算源表中的行数,并验证它与目标表上的计数匹配。以下是 MySQL 源表和 BigQuery 目标表之间的行验证的外观:

data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50

--use-random-row 和 --random-row-batch-size 标志指定您只想随机验证行的子集。当您有大型表时,这会派上用场,因为行验证需要更多的内存和计算。

架构验证

架构验证将获取源表中每一列的列数据类型,并验证它是否与目标表的列数据类型匹配。源表和目标表中的类型不匹配会导致验证状态失败。

以下是 MySQL 源表和 BigQuery 目标表之间的架构验证的外观:

data-validation validate schema\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result

行比较验证

这种类型的验证对源表和目标表中指定列的值执行逐行比较。这些值不匹配会导致验证状态失败。

以下是 MySQL 源表和 BigQuery 目标表之间的架构验证的外观:

data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50

下面是 BigQuery 表中验证结果的示例输出:通过 DVT 和 dbt 测试监控Airbyte数据管道

 

名为“difference”的列表示源表中的列/行计数与目标表中的列/行计数之间的差异。validation_status列显示验证的状态。

然后可以查询此表,并将错误通知/警报发送到电子邮件或 Slack 频道。

我将在上一节中介绍向 Slack 频道发送通知/警报

从 YAML 文件运行验证

运行验证的另一种方法是将验证配置保存到 YAML 文件。这样,您可以存储以前的验证并轻松修改验证配置。此方法还有助于自动执行验证过程,因为验证可以按计划运行。

若要生成用于验证的 YAML 配置文件,请指定 --config-file 标志。请参阅下面的代码:

data-validation validatecolumn\--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \--config-file validation_config.yaml

下面是从上述代码生成的 YAML 配置的外观。

result_handler:project_id:your-project-idtable_id:data_validation.validation_resultstype:BigQuerysource:MYSQL_CONNtarget:BigQuery_CONNvalidations:-aggregates:-field_alias:countsource_column:nulltarget_column:nulltype:countcalculated_fields:[]filter_status:nullfilters:[]format:tablelabels:[]random_row_batch_size:nullschema_name:transportation_datatable_name:citibike_stationstarget_schema_name:onesphere-analytics.master_datatarget_table_name:citibike_stationsthreshold:0.0type:Columnuse_random_rows:false

生成的 YAML 配置文件可以在执行生成验证命令的目录中找到。

现在,可以使用以下代码从 YAML 配置文件运行验证:

data-validation run-config -c validation_config.yaml

使用数据构建工具 (dbt) 进行数据质量监控

dbt 是一种数据转换工具,使数据和分析工程师能够通过简单地编写 SQL 语句来转换其仓库中的数据。DBT 处理将这些 SELECT 语句转换为仓库中的表和视图。要设置 dbt 项目,请按照此设置 dbt 项目指南进行操作。

DBT 提供了用于执行数据质量检查的测试功能,包括数据类型检查、空值检查、重复检查、参照完整性检查等。让我们看看如何使用 dbt 测试执行数据质量检查。dbt 测试定义为单一测试、SQL 文件中的一般测试或具有返回失败记录逻辑的 YAML 配置文件。

下面是使用 YAML 配置文件为源表(订单表)定义测试的示例。

version: 2source:  - name: orderscolumns:      -name: order_idtests:          - unique          - not_null      -name: statustests:          -accepted_values:values: ['placed','shipped','completed','returned']      -name: customer_idtests:          -relationships:to: ref('customers')field: id

上面示例中的测试配置首先检查orders_table order_id列中的重复值和非空值,然后在状态列中检查接受的值(“已放置”、“已发货”、“已完成”、“已退回”)。最后,它检查customer_id列中的引用完整性,以确保订单表上的每个customer_id在客户表上都有一个关联的 ID。

若要运行测试,请运行命令:dbt test --store-failures

--store-failure 标志将测试结果存储在中间表中。然后,可以查询此表以发送错误/失败通知/警报。未通过测试的记录保存在数据仓库中后缀为“dbt_test__audit”的架构中的结果表中。

需要注意的是,如上所述,使用 dbt 监控数据质量的方法也适用于业务指标监控。

使用 dbt 进行管道可靠性监控

DBT 提供用于跟踪管道错误或作业故障的监视和警报系统。通知在 dbt 云上配置,并在作业运行后立即触发。通知可以发送到电子邮件或 Slack 频道。这是这方面的指南。对于管道中的数据提取转换 (EL) 层,Airbyte 提供了一个可靠的监控和警报系统,用于端到端监控并将同步失败/成功通知发送到 Slack 通道。按照此分步指南为空字节同步设置 Slack 通知。

向 Slack 频道发送通知

要发送通知/警报以跟踪管道的问题或故障,您将构建一个简单的 Slack 机器人,该机器人可以在无服务器函数(例如 AWS Lambda 或 GCP CloudFunction)上运行。机器人是一个简单的 Python 脚本,可以计划为按时间间隔运行或基于数据加载事件运行。

机器人将从上述部分查询包含数据质量和数据验证测试结果的任何表,并根据某些定义的逻辑发送通知/警报。

下面的代码片段实现了从 Google Cloud Function 运行的通知/提醒系统,并在源表中的行/列数与上述数据验证示例中的目标表不匹配时将通知/警报推送到 Slack 渠道。以下是创建 Slack 网络钩子网址的指南。

importpandasaspdimportrequestsfromgoogle.oauth2importservice_account# Credentials from GCP service account saved in a json file.credentials = service_account.Credential.from_service_account_file('./google_credentials.json')defslack_notification_bot(credentials, slack_webhook_url):    query ='''select  *

            from validation_result_table

            where validation_status = 'fail';'''validation_data = pd.read_gbq(query, project_id='gcp_project_id', credentials= credentials)    message ='''Validation Report:\n

    Error! Incomplete rows of data loaded'''iflen(validation_data) >0:        requests.post(slack_webhook_url, json={'text': message})else:return'done'defmain():# Run the slack notification botslack_notification_bot(credentials, slack_webhook_url)

结论

在本文中,你看到了为 ELT 数据管道设置监视/警报系统的必要性。

数据质量、管道可靠性和业务指标监视是要监视管道的关键指标。管道复杂性是数据团队在计划为 ELT 管道设置监视/警报系统时将面临的主要挑战之一。我建议使用像Airbyte这样的数据复制工具来降低这种复杂性。然后,我查看了不同的专有数据管道监视/警报工具及其开源替代方案。我进一步深入研究了如何使用数据验证工具 (DVT) 和数据构建工具 (dbt) 设置典型的监控/警报系统。最后,我们了解了如何构建一个机器人来触发管道的通知/警报。文章来源地址https://www.toymoban.com/news/detail-400860.html

到了这里,关于通过 DVT 和 dbt 测试监控Airbyte数据管道的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • js 通过 navigator.clipboard.writeText(textToCopy) 实现复制,测试环境可以,正式环境不行的解决方案。

    问题描述 : 代码: 测试环境下可以正常复制 ,但放到线上会报错:找不到 .writeText 百度分析: 在 Chrome 的 DevTools 控制台下执行  navigator.clipboard  返回  undefined ,经查找资料发现是浏览器禁用了非安全域的  navigator.clipboard  对象,哪些地址是安全的呢? 安全域包括本地访

    2024年02月13日
    浏览(41)
  • 液体泄露识别检测算法 监控识别管道液体泄漏

    液体泄露识别检测算法通过 yolov8+python网络模型技术,液体泄露识别检测算法对管道的液体泄露情况进行全天候不间断实时监测,检测到画面中管道设备液体泄露现象时,将自动发出警报提示相关人员及时采取措施。YOLOv8 算法的核心特性和改动可以归结为如下:提供了一个全

    2024年02月10日
    浏览(51)
  • flume组件以及通过命令监控大数据平台转态

    可 以 从 官 网 下 载 Flume 组 件 安 装 包 , 下 载 地 址 如 下 URL 链 接 所 示 https://archive.apache.org/dist/flume/1.6.0/ 步骤一:使用 root 用户设置 Flume 环境变量,并使环境变量对所有用户生效。 步骤二:修改 Flume 相应配置文件。 首先,切换到 hadoop 用户,并切换当前工作目录到

    2023年04月24日
    浏览(36)
  • 网络安全工具:通过监控分析日志数据保护企业网络

    由于混合工作模式的兴起以及业务运营向云环境的迁移,企业网络变得更加分散和复杂,仅安装外围安全解决方案只会创建一个基本的防御层,系统、服务器和其他网络实体会生成记录所有网络活动的日志。集中式日志管理系统可以帮助管理员自动监控网络日志,全面了解网

    2024年01月20日
    浏览(52)
  • Baumer工业相机堡盟工业相机如何通过BGAPISDK复制内存空间存储图像数据序列(C#)

    Baumer工业相机堡盟相机是一种高性能、高质量的工业相机,可用于各种应用场景,如物体检测、计数和识别、运动分析和图像处理。 Baumer的万兆网相机拥有出色的图像处理性能,可以实时传输高分辨率图像。此外,该相机还具有快速数据传输、低功耗、易于集成以及高度可扩

    2024年02月11日
    浏览(42)
  • 上位机软件wincc通过工业网关采集plc数据实现组态监控

    WinCC是一个组态软件,可以用于数据采集与监控、自动化控制、工业物联网等领域。WinCC可以帮助用户实现工厂自动化和过程自动化的解决方案,提供可视化的监控界面和数据采集分析功能,支持多种协议和设备,如Siemens、Modbus、OPC等。 如何使用WinCC采集PLC数据?工业网关可

    2024年02月15日
    浏览(49)
  • 驱动开发:通过PIPE管道与内核层通信

    在本人前一篇博文 《驱动开发:通过ReadFile与内核层通信》 详细介绍了如何使用应用层 ReadFile 系列函数实现内核通信,本篇将继续延申这个知识点,介绍利用 PIPE 命名管道实现应用层与内核层之间的多次通信方法。 什么是PIPE管道? 在Windows编程中,数据重定向需要用到管道

    2024年02月20日
    浏览(53)
  • 十款开源测试开发工具推荐(自动化、性能、混沌测试、造数据、流量复制)

    在本篇文章中,我将给大家推荐 10 款日常工作中经常用到的测试开发工具神器,涵盖了自动化测试、性能压测、流量复制、混沌测试、造数据等。 AutoMeter 是一款针对分布式服务,微服务 API 做功能和性能一体化的自动化测试平台,一站式提供发布单元,API,环境,用例,前

    2024年02月15日
    浏览(59)
  • 【墙裂推荐!】十款开源测试开发工具(自动化、性能、造数据、流量复制)​

    目录 1、AutoMeter-API 自动化测试平台 2、QA Wolf 浏览器自动化测试工具 3、Mimesis 用于 Python 的高性能虚假数据生成器 4、Ddosify 高性能负载测试工具 5、AutoCannon HTTP/1.1 基准测试工具 6、Sharingan 流量录制回放工具 7、randdata 随机测试数据生成工具 8、DrissionPage WEB 自动化测试集成工具

    2024年02月06日
    浏览(56)
  • Linux之实现Apache服务器监控、数据库定时备份及通过使用Shell脚本发送邮件

    目录  一、Apache服务器监控 为什么要用到服务监控? 实现Apache服务器监控 二、数据库备份 为什么要用到数据库备份? 实现数据库备份 三、Shell脚本发送邮件 为什么要用使用Shell脚本发送邮件? 实现Shell脚本发送邮件 在Linux中监控Apache服务器是非常重要的,原因如下: 保证

    2024年04月15日
    浏览(74)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包