Data Bricks Delta Lake 入门

这篇具有很好参考价值的文章主要介绍了Data Bricks Delta Lake 入门。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Delta Lake 是一个开源存储层,它将关系数据库语义添加到基于 Spark 的数据湖处理中。 适用于 PySpark、Scala 和 .NET 代码的 Azure Synapse Analytics Spark , Azure DataBricks 都支持 Delta Lake。在大数据这个领域,对象存储的最影响效率的问题就是针对对象存储数据的更新,传统的对象存储如AWS 的S3 , Azure的 Blob等如果要更新要给对象数据的时候,必须要先将对象查找到并删除,然后再追加。这通常会导致性能效率低下。Delta Lake很高的的解决了对象数据更新的问题,并同时支持实时数据流的更新,主要功能如下:

  • 支持查询和数据修改的关系表。 使用 Delta Lake,可以将数据存储在支持 CRUD(创建、读取、更新和删除)操作的表中。 换句话说,可以采用与在关系数据库系统中相同的方式选择、插入、更新和删除数据行。
  • 支持 ACID 事务。 关系数据库旨在支持事务数据修改,这些修改提供原子性(事务作为单个工作单元完成)、一致性(事务使数据库保持一致状态)、隔离(进行中的事务不能相互干扰)和持久性(事务完成时,它所做的更改将保留)。 Delta Lake 通过实现事务日志并强制实施并发操作的可序列化隔离,为 Spark 提供相同的事务支持。
  • 数据版本控制和按时间顺序查看。 由于所有事务都记录在事务日志中,因此可以跟踪每个表行的多个版本,甚至使用按时间顺序查看功能在查询中检索某行的先前版本。
  • 支持批处理和流式处理数据。 虽然大多数关系数据库包括存储静态数据的表,但 Spark 包含通过 Spark 结构化流式处理 API 流式处理数据的本机支持。 Delta Lake 表可用作流式处理数据的接收器(目标)和源。
  • 标准格式和互操作性。 Delta Lake 表的基础数据以 Parquet 格式存储,该格式通常用于数据湖引入管道。

以下开始Delta的入门操作:

使用免费的Azure Data Bricks 的工作区,参加如下链接:

利用 Azure Data Bricks的免费资源学习云上大数据-CSDN博客

一、创建 Delta Lake 表 

1、从数据帧创建 Delta Lake 表

创建Ddelta Lake表,以增量格式保存数据帧,指定应存储表的数据文件和相关元数据信息的路径

下载试验用数据:

 %sh
 rm -r /dbfs/delta_lab
 mkdir /dbfs/delta_lab
 wget -O /dbfs/delta_lab/products.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/products.csv

例如:使用现有文件中的数据加载数据帧,然后将该数据帧以增量格式保存到新文件夹位置:

# Load a file into a dataframe
df = spark.read.load('/delta_lab/products.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

2、验证保存的Delta Lake数据文件

保存 delta 表后,指定的路径位置包括数据的 parquet 文件

%sh 
ls /dbfs/delta/mydata

执行后结果如下:

Data Bricks Delta Lake 入门,flask,python,后端

 可以使用覆盖模式将现有 Delta Lake 表替换为数据帧的内容,如下所示:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

还可以使用追加模式将数据帧中的行添加到现有表:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

二、根据条件进行表的更新

1、首先查看原始表的数据:

df1 = df.select("ProductName", "ListPrice").where((df["ProductName"]=="Road-750 Black, 58"))
display(df1)

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

2、执行数据更新语句

虽然可以在数据帧中进行数据修改,然后通过覆盖数据来替换 Delta Lake 表,但数据库中的一种更常见的模式是插入、更新或删除现有表中的行作为离散事务操作。 若要对 Delta Lake 表进行此类修改,可以使用 Delta Lake API 中的 DeltaTable 对象,该对象支持更新、删除和合并操作。 例如,可以使用以下代码更新 category 列值为“Accessories”的所有行的 price 列:

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "ProductName=='Road-750 Black, 58'",
    set = { "ListPrice": "ListPrice * 0.9" })

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

3、查看验证数据是否更新:

df2 = spark.read.load('/delta/mydata')
df3= df2.select("ProductName", "ListPrice").where((df2["ProductName"]=="Road-750 Black, 58"))
display(df3)

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

 三、查询表以前的版本

Delta Lake 表支持通过事务日志进行版本控制。 事务日志记录对表进行的修改,指出每个事务的时间戳和版本号。 可以使用此记录的版本数据查看表以前的版本 - 称为按时间顺序查看的功能。

可以通过将数据从 delta 表位置读取到数据帧中,将所需版本指定为 versionAsOf 选项,从 Delta Lake 表的特定版本检索数据:

df4 = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

可以使用 timestampAsOf 选项指定时间戳:

df4 = spark.read.format("delta").option("timestampAsOf", '2024-01-16 09:36:23').load(delta_table_path)

查看历史变化

deltaTable.history(10).show(10, False, True)

 执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

四、创建和查询目录表

下面将Delta Lake 表定义为 Spark 群集的 Hive 元存储中的目录表,并使用 SQL 来进行处理。

Spark 目录中的表(包括 Delta Lake 表)可以是托管或外部表(非托管表)

  • 托管表是在没有指定位置的情况下定义的,数据文件存储在元存储使用的存储中。 删除表不仅会从目录中删除其元数据,还删除存储其数据文件的文件夹。
  • 外部表指的是为自定义文件位置定义外部表,其中存储了表的数据。 表的元数据定义在 Spark 目录中。 删除表会从目录中删除元数据,但不会影响数据文件。

1、使用数据帧来创建目录表

可以使用 saveAsTable 操作写入数据帧来创建托管表和非托管表,如以下示例所示:

# 托管表 Save a dataframe as a managed table 
df.write.format("delta").saveAsTable("MyManagedTable")

# 非托管表 specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

 2、使用 SQL 创建目录表

可以使用含 USING DELTA 子句的 CREATE TABLE SQL 语句和用于外部表的可选 LOCATION 参数来创建目录表。 可以使用 SparkSQL API 运行语句,如以下示例所示:

使用Spark SQL 创建一个外部表

spark.sql("CREATE DATABASE AdventureWorks")
spark.sql("CREATE TABLE AdventureWorks.ProductsExternal USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsExternal").show(truncate=False)

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

使用SQL直接查询

%sql
USE AdventureWorks;
SELECT * FROM ProductsExternal;

 执行结果: 

Data Bricks Delta Lake 入门,flask,python,后端

创建一个完全托管的表

df.write.format("delta").saveAsTable("AdventureWorks.ProductsManaged")
spark.sql("DESCRIBE EXTENDED AdventureWorks.ProductsManaged").show(truncate=False)

 Data Bricks Delta Lake 入门,flask,python,后端

 查询该表:

%sql
USE AdventureWorks;
SELECT * FROM ProductsManaged;

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

3、比较外部表和托管表的差别

执行如下语句:

%sql
USE AdventureWorks;
SHOW TABLES;

结果如下: 

Data Bricks Delta Lake 入门,flask,python,后端

执行下面语句来看两个表所在的不同位置:

 %sh
 echo "External table:"
 ls /dbfs/delta/mydata
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

执行结果:

Data Bricks Delta Lake 入门,flask,python,后端

执行下面命令将表删除,看有何不同

%sql
USE AdventureWorks;
DROP TABLE IF EXISTS ProductsExternal;
DROP TABLE IF EXISTS ProductsManaged;
SHOW TABLES;
 %sh
 echo "External table:"
 ls /dbfs/delta/mydata
 echo
 echo "Managed table:"
 ls /dbfs/user/hive/warehouse/adventureworks.db/productsmanaged

 执行结果如下:Managed Table的数据文件已经不在,外部表的数据文件还在

Data Bricks Delta Lake 入门,flask,python,后端

五、使用 Delta Lake 对数据进行流式处理 

假设我们是一个IoT设备的流式数据,数据结构如下:

Data Bricks Delta Lake 入门,flask,python,后端

 1、下载JSON格式的流式数据文件

 %sh
 rm -r /dbfs/device_stream
 mkdir /dbfs/device_stream
 wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json

 执行结果如下:Data Bricks Delta Lake 入门,flask,python,后端

下面基于JSON文件所在的文件夹创建iotstream,如下命令: 

from pyspark.sql.types import *
from pyspark.sql.functions import *
   
# Create a stream that reads data from the folder, using a JSON schema
inputPath = '/device_stream/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")

执行结果:

 Data Bricks Delta Lake 入门,flask,python,后端

 将数据流写入delta表

# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

如下图:数据作业已经启动,开始实时写入 

 Data Bricks Delta Lake 入门,flask,python,后端

读取实时表中的数据:

# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)

执行结果: 9条记录

Data Bricks Delta Lake 入门,flask,python,后端

 基于这个stream创建一个目录表

# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))

 查询这张表:

%sql
SELECT * FROM IotDeviceData;

执行结果 如下:9条记录 

 Data Bricks Delta Lake 入门,flask,python,后端

执行以下语句刷新Iot数据到Stream

 %sh
 wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json

 执行结果:

Data Bricks Delta Lake 入门,flask,python,后端 查看数据是否已经更新到表中,执行如下语句:

%sql
SELECT * FROM IotDeviceData;

执行结果如下:从9条增加到16条 

 Data Bricks Delta Lake 入门,flask,python,后端

 执行如下语句:停止Steam文章来源地址https://www.toymoban.com/news/detail-807577.html

deltastream.stop()

到了这里,关于Data Bricks Delta Lake 入门的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【python】flask模板渲染引擎Jinja2,通过后端数据渲染前端页面

    ✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN新星创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开

    2024年04月11日
    浏览(56)
  • 大白话说Python+Flask入门(一)

    技术这东西就得用,不用就会忘,之前写博客感觉就是给自己记笔记用,还有大部分,估计睡在语雀里都落灰了,哈哈! 在Python领域,我觉得我还是算个小白吧,会写讲不明白,所以我决定想做一件事,先搞下flask这部分教程,看看能给大家说明白吗,真的感觉和 Java 有很大

    2024年02月05日
    浏览(53)
  • 大白话说Python+Flask入门(二)

    笔者技术真的很一般,也许只靠着 笨鸟先飞的这种傻瓜坚持 ,才能在互联网行业侥幸的 生存下来 吧! 为什么这么说? 我曾不止一次在某群,看到说我写的东西一点技术含量都没有,而且很没营养,换作一年前的我,也许会怼回去, 现在的话,我只是看到了,完事忘记了。

    2024年02月05日
    浏览(54)
  • Python之Flask入门教程

    Flask是一个用python编写的Web应用程序框架。Armin Ronacher带领一个名为Pocco的国际Python爱好者团队开发了Flask。Flask基于Werkzeug WSGI工具包和Jinja2模板引擎。两者都是Pocco项目。 Flask也被称为“microframework” ,因为它使用简单的核心,用extension增加其他功能。Flask没有默认使用的数据

    2024年02月02日
    浏览(58)
  • 大白话说Python+Flask入门(三)

    今天状态很不好,我发现学这部分知识的时候,会出现溜号或者注意力无法集中的情况。 我能想到的是,大概率是这部分知识,应该是超出了我现在的水平了,也就是说我存在知识断层了,整体感觉真的是一知半解。 那有同学会问了,那你能说明白吗? 我理解的肯定能呀,

    2024年02月05日
    浏览(50)
  • Python-flask项目入门

    一、flask对于简单搭建一个基于python语言-的web项目非常简单 二、项目目录 示例代码 git路径  三、代码介绍 1、安装pip依赖 2.配置数据源 config.py 3、引用orm框架 访问数据库 /mapper/exts.py 4、启动文件 app.py 5、数据库操作 通过Flask提供orm框架对数据库进行操作 5.1增加 5.2删除 5.3修

    2024年02月14日
    浏览(36)
  • Python框架之Flask入门和视图

    需要安装 Pycharm专业版 Python后端的2个主流框架 Flask 轻量级框架 Django 重型框架 Flask是一个基于Python实现的web开发微框架 官方文档:https://flask.palletsprojects.com/ 中文文档:https://dormousehole.readthedocs.io/ Flask是一个基于MVC设计模式的Web后端框架 MVC: M: Model 数据模型 V: View 界面 C:

    2024年02月06日
    浏览(43)
  • Python光速入门 - Flask轻量级框架

            FlASK是一个轻量级的WSGI Web应用程序框架,Flask的核心包括Werkzeug工具箱和Jinja2模板引擎,它没有默认使用的数据库或窗体验证工具,这意味着用户可以根据自己的需求选择不同的数据库和验证工具。Flask的设计理念是保持核心简单,同时提供强大的扩展性,用户

    2024年03月14日
    浏览(109)
  • Python Web 开发之 Flask 入门实践

    导语:Flask 是一个轻量级的 Python Web 框架,广受开发者喜爱。本文将带领大家了解 Flask 的基本概念、搭建一个简单的 Web 项目以及如何进一步扩展功能。 Flask 是一个基于 Werkzeug 和 Jinja2 的微型 Web 框架,它的特点是轻量、易学习、可扩展。使用 Flask,我们可以快速构建 Web 应

    2024年01月22日
    浏览(61)
  • 大白话说Python+Flask入门(六)Flask SQLAlchemy操作mysql数据库

    这篇文章被搁置真的太久了,不知不觉拖到了周三了,当然,也算跟falsk系列说再见的时候,真没什么好神秘的,就是个数据库操作,就大家都知道的 CRUD 吧。 1、Flask SQLAlchemy简介 Flask SQLAlchemy 是基于 Flask web 框架和 SQLAlchemy ORM (对象关系映射)的工具。它旨在为 Flask web 应用

    2024年02月05日
    浏览(71)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包