【数据库】Python脚本实现数据库批量插入事务

这篇具有很好参考价值的文章主要介绍了【数据库】Python脚本实现数据库批量插入事务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景介绍

在工作中可能会遇到需要批量插入的场景, 而批量插入的过程具有耗时长的特点, 再此过程很容易出现程序崩溃的情况.为了解决插入大量数据插入后崩溃导致已插入数据无法清理未插入数据无法筛出的问题, 需要编写一个脚本记录已插入和未插入的数据, 并可以根据记录的数据选择数据回滚或是继续执行任务; 因此需要对批量插入实现事务的机制.

概要设计

参考数据库的实现原理详见我之前写的文章的Recovery章节1; 事务的实现可以通过redo log和undo log实现, 这篇文章提供了三种算法, 但只有算法3才能实现事务的回滚和继续, 虽然说只有算法三能够达到预期的目标, 但类似上面三种算法的分类, 依旧存在多种写入顺序, 他们在各自的场景中都有各自的优势:

  1. 先写redo log, 再写磁盘, 写磁盘的过程中写undo log; 这样便于数据的再执行
  2. 同时写redo log和undo log; 每写一条undo log即可写一次磁盘; 这样效率最高, 但实现较为复杂

一般来说, 在执行失败之后都是选择未执行的继续执行, 因此这里选择方案1

注意; 倘若要实现事务, 这里的redo操作和undo操作需要有幂等性 在MySQL中redo可以用insert on duplicate key实现, 而undo对应的删除本身就是幂等的

有了redo log和undo log, 系统则可以根据保存的信息恢复或回滚任务, 然而系统恢复和清理过程依赖于系统当前的状态, 而数据的写入和回滚也会导致系统状态的改变; 因此系统设计的基本原则即是将系统的状态保存, 再根据当前的状态选择下一步的操作.

任务状态的保存和恢复

使用文件保存任务的状态, 文件名即任务的状态.

当系统崩溃之后, 重启可以读取到崩溃前的状态, 此时只需要根据下面的状态机图就可以识别出崩溃恢复时对应的状态.

在该系统中, 我使用枚举值表达系统的状态

class ResourceManagerStatus(Enum):
    STARTING = "STARTING"
    PREPARING = "PREPARING"
    INSERTING = "INSERTING"
    UNPREPARED = "UNPREPARED"
    BLOCKED = "BLOCKED"
    CANCELLED = "CANCELLED"
    FINISHED = "FINISHED"
    CLEANING = "CLEANING"

系统状态转移

下面是系统的状态机图, 描述了整个系统的转换过程

【数据库】Python脚本实现数据库批量插入事务,数据库,python,开发语言

该系统主要考虑了两种可能, 一是正常工作的情况, 二是系统崩溃的境况. 为了在系统崩溃后能够识别系统当前的状态, 需要将所有的正常状态用文件保存下来除Starting, 因为它是默认状态; 当系统崩溃后, 即对应上图中的crush事件, 此时尽管无法修改文件以更新状态, 但在系统重启的时候, 会自动根据crush箭头的方向设置的对应的状态.

在能够保存状态之后, 则需要对系统的运行状态进行划分, 大致可以分为正常运行的状态崩溃状态两大类:

  1. 正常运行的状态: 指系统在运行尚未崩溃下所处的大类, 此时状态保存在内存当中, 包括:
    1. Starting: 初始状态
    2. Preparing: 准备状态, 处于该状态说明系统正在写redo log
    3. Inserting: 插入状态, 说明redo log已经完成写入, 系统正在插入
    4. Cleaning: 清理状态, 事务已经完成, 可以对删除临时文件
    5. Cancelled: 取消状态, 说明任务造成的影响已经回滚, 可以删除临时文件
  2. 崩溃的状态: 当系统崩溃后重启, 其所处的状态可能发生改变:
    1. Unprepared: redo log写入过程中系统发生了崩溃, 此时恢复只能够重新写redo log
    2. Blocked: 系统插入的过程中发生了崩溃, 由于redo log存在, 因此可以根据redo log继续插入; 但虽然redo具有幂等性, 可以重新执行redo, 但毕竟都保存了undo log, 计算他们之间的差值然后选择未插入的插入性能更好, 所以可以只重新执行未执行的任务
    3. Cleaning: 系统已经完成任务, 但清理文件的过程中发生了崩溃, Cleaning状态会被持久化到磁盘上, 因此可以直接回复该状态, 并继续清理

代码实现

由上一章节可以得出, 任务的执行本质上对应了状态的变化, 因此只需要将不同的状态抽象出来则可以控制程序的执行流程, 又因为状态的转换规则具有明显的偏向, 因此使用if判断状态的转移要远远优于将状态封装成类.

剩下的就是任务的拆分和执行过程, 对应了sql语句的执行.

任务的抽象

很明显, 只有可以分批执行的任务才能分批执行, 并且分批redo和undo; 为此可以抽象出一个类DividableDataSource代表可以分批执行的任务

class MetaInfo(BaseModel):
    total: int
    batchSize: int


class Log(BaseModel):
    ids: Tuple[str, ...]
    start: int
    end: int

    class Config:
        frozen = True
        
class DividableDataSource(ABC):

    @abstractmethod
    def getMetaInfo(self) -> MetaInfo:
        pass

    @abstractmethod
    def setMetaInfo(self, metaInfo: MetaInfo) -> None:
        pass

    @abstractmethod
    def divide(self) -> Iterable[Log]:
        pass

    @abstractmethod
    def update(self, log: Log) -> None:
        pass

    @abstractmethod
    def rollback(self, log: Log) -> None:
        pass

前两个方法是对MetaInfo的操作, 顾名思义MetaInfo保存了总任务的元数据, 它需要根据这些进行任务的分批; 具体任务的分批则会调用divide方法, 它会根据元数据进行分批; 这个分批过程其实并不需要一次性计算出来, 因此可以返回一个迭代器实际操作中可以将分批过程保存在next中, 甚至可以异步计算next, 这个得带器则包含了分区的信息, 即上面对应的Log类. 在完成了分区之后, 则需要对redo和undo实现, 他们分别对应了上面的updaterollback; 之所以这么命名是想要强调他们的幂等性

事务管理器

在完成了对可以分批处理, 且可以重做/撤销的任务的抽象后, 则可以对具有该特征的所有类进行事务的管理. 它包含了四个核心的方法: prepare, insert, rollback, restore; 用于管理事务的进行

prepare方法

prepare方法对应了事务的准备, 即写入undo log的过程

    def prepare(self) -> None:
        assert_equal(self.status, ResourceManagerStatus.STARTING,
                     "Can only prepare under status 'STARTING'")
        self.__changeStatus(ResourceManagerStatus.PREPARING)
        with open("./cache/MetaInfo.json", "w", encoding=config.encoding) as metaInfoOutput:
            metaInfoOutput.write(
                self.datasource.getMetaInfo().model_dump_json())
        with open("./cache/redo.json", "w", encoding=config.encoding) as logOutput:
            for line in self.datasource.divide():
                logOutput.write(line.model_dump_json()+"\n")
        open("./cache/undo.json", "w", encoding=config.encoding).close()
        self.__changeStatus(ResourceManagerStatus.INSERTING)

该系统采用json格式记录信息, 并逐行写入信息, 使用json格式记录保证了程序的可测试性, 便于错误的定位. 这个过程也很简单, 就是不要忘了各个状态之间的转换条件即可. 毕竟在插入状态再"准备"以此要重新计算和写入所有的redo log; 这毫无意义.

insert方法

    def insert(self) -> None:
        assert_equal(self.status, ResourceManagerStatus.INSERTING,
                     "can only insert under status 'INSERTING'")
        with open("./cache/redo.json", "r", encoding=config.encoding) as redo, open("./cache/undo.json", "a", encoding=config.encoding) as undo:
            for logStr in redo:
                log = Log.model_validate_json(logStr)
                self.datasource.update(log)
                undo.write(log.model_dump_json()+"\n")
                undo.flush()
            self.__changeStatus(ResourceManagerStatus.CLEANING)

在记录了redo log之后, 则可根据redo log中的信息不断进行插入, 插入过程还需要写undo log以实现可以对已写入数据的回滚, 这里需要注意每写入一条数据需要进行一次**flush**, 否则系统崩溃C库和操作系统的磁盘缓存都不会写入磁盘, 进而导致数据的丢失

restore方法

    def restore(self) -> None:
        # 还没有完成redo log的持久化, 直接删除临时文件
        if self.status == ResourceManagerStatus.UNPREPARED:
            os.remove("./cache/redo.json")
            with open("./cache/MetaInfo.json") as metaInfo:
                self.datasource.setMetaInfo(
                    MetaInfo.model_validate_json(metaInfo.read()))
            self.__changeStatus(ResourceManagerStatus.STARTING)
        # 已经完成redo log的持久化了, 完成redo log和undo log差的条目即可完成所有的持久化
        elif self.status == ResourceManagerStatus.BLOCKED:
            self.__changeStatus(ResourceManagerStatus.INSERTING)
            redoSet: Set[Log] = set()
            undoSet: Set[Log] = set()
            with open("./cache/redo.json", "r", encoding=config.encoding) as redo:
                for line in redo:
                    log = Log.model_validate_json(line)
                    redoSet.add(log)
            with open("./cache/undo.json", "r", encoding=config.encoding) as undo:
                for line in undo:
                    log = Log.model_validate_json(line)
                    undoSet.add(log)
            with open("./cache/undo.json", "a", encoding=config.encoding) as undo:
                for diff in redoSet - undoSet:
                    self.datasource.update(diff)
                    undo.write(diff.model_dump_json() + "\n")
            self.__changeStatus(ResourceManagerStatus.CLEANING)

对于失败的任务, 可以根据redo log进行恢复, 这里只需要判断当前状态是不是block即可判断redo log是否已经成功写入. 如上文所说的, 这里采用了计算redo log和undo log的差值然后再根据差值重新执行, 而非一味利用redo的幂等性执行, 对性能有所优化

rollback的实现

    def rollback(self) -> None:
        if self.status in ResourceManagerStatus.halfCommittedStatus():
            assert_true(os.path.exists("./cache/undo.json"),
                        "Can't find undo log")
            self.__changeStatus(ResourceManagerStatus.INSERTING)
            with open("./cache/undo.json", "r", encoding=config.encoding) as undo:
                for line in undo:
                    undoLog = Log.model_validate_json(line)
                    self.datasource.rollback(undoLog)
        self.__changeStatus(ResourceManagerStatus.CANCELLED)

rollback的实现相比于restore就简单很多, 它只需要执行所有redo log中反序列化出的信息即可.

测试

创建测试用表

CREATE TABLE score_source
(
    id           CHAR(36) PRIMARY KEY,
    student_name VARCHAR(64),
    course_name  VARCHAR(64),
    score        DOUBLE
);

这里使用Faker产生随机的数据, 并模拟批量插入的过程

class FakerDataSource(DividableDataSource):
    db = pymysql.connect(host=config.dbHost,
                         user=config.dbUser,
                         password=config.dbPassword,
                         database=config.dbName)

    def __init__(self, total: int = 0, batchSize: int = 0) -> None:
        self.metaInfo = MetaInfo(total=total, batchSize=batchSize)

    def getMetaInfo(self) -> MetaInfo:
        return self.metaInfo

    def setMetaInfo(self, metaInfo: MetaInfo) -> None:
        self.metaInfo = metaInfo

    def divide(self) -> Iterable[Log]:
        start, end = 0, 0
        ret: List[Log] = []
        while end < self.metaInfo.total:
            start, end = end, min(self.metaInfo.total,
                                  end+self.metaInfo.batchSize)
            ret.append(Log(ids=tuple(str(uuid.uuid1())
                                     for _ in range(start, end)), start=start, end=end))
        return ret

    def update(self, log: Log) -> None:
        cursor = self.db.cursor()
        for id in log.ids:
            cursor.execute(f"""INSERT INTO score_source (id, student_name, course_name, score) VALUE ('{id}', '{faker.name()}', '{faker.job()}', {faker.random_int(0, 10000)/100})
ON DUPLICATE KEY UPDATE student_name='{faker.name()}',
                        course_name='{faker.job()}',
                        score={faker.random_int(0, 10000)/100}""")
        self.db.commit()

    def rollback(self, log: Log) -> None:
        cursor = self.db.cursor()
        for id in log.ids:
            cursor.execute(f"""DELETE
    FROM score_source
    WHERE id ='{id}'""")
        self.db.commit()

这里通过insert on duplicate key update实现了redo的幂等性, delete本身则具有幂等性. 对于需要持久化的信息, 使用了uuid来保存, 这样便可以根据uuid找到已经插入或者需要回滚的数据.

下面是几个简单的测试用例, 分别测试了普通插入/恢复/回滚的功能:

class TransactionTest(unittest.TestCase):
    def setUp(self) -> None:
        db.cursor().execute("TRUNCATE score_source;")
        db.commit()

    def test_shouldPrepared(self) -> None:
        cursor = db.cursor()
        rm = ResourceManager(FakerDataSource(config.total, config.batchSize))
        rm.prepare()
        self.assertTrue(os.path.exists("./cache/INSERTING"),
                        "status should be 'INSERTING'")
        self.assertTrue(os.path.exists("./cache/redo.json"),
                        "redo log should be created")
        self.assertTrue(os.path.exists("./cache/undo.json"),
                        "undo log should be created")
        self.assertTrue(os.path.exists("./cache/MetaInfo.json"),
                        "MetaInfo file should be created")
        with open("./cache/redo.json") as redo:
            redoLogSize = sum(1 for _ in redo)
            self.assertEqual(redoLogSize, math.ceil(config.total / config.batchSize),
                             f"size of redo log should be {math.ceil(config.total / config.batchSize)}")
        with open("./cache/undo.json") as undo:
            undoLogSize = sum(1 for _ in undo)
            self.assertEqual(undoLogSize, 0,
                             f"size of redo log should be {0}")
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], 0, "nothing should be insert right now")

    # NOTICE: this should be executed after test_shouldPrepared
    def test_shouldRestore(self) -> None:
        cursor = db.cursor()
        rm = ResourceManager(FakerDataSource(config.total, config.batchSize))
        rm.restore()
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], config.total, "all data should be inserted")
        rm.restore()
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], config.total, "redo should be idempotent")
        with open("./cache/redo.json") as redo:
            redoLogSize = sum(1 for _ in redo)
            self.assertEqual(redoLogSize, math.ceil(config.total / config.batchSize),
                             f"size of redo log should be {math.ceil(config.total / config.batchSize)}")
        with open("./cache/undo.json") as undo:
            undoLogSize = sum(1 for _ in undo)
            self.assertEqual(undoLogSize, math.ceil(config.total / config.batchSize),
                             f"size of redo log should be {math.ceil(config.total / config.batchSize)}")
        self.assertTrue(os.path.exists("./cache/CLEANING"),
                        "status should be CLEANING")
        rm.clean()
        self.assertEqual(len(os.listdir("./cache")), 0,
                         "all temporary file should be cleaned")

    def test_shouldRollback(self) -> None:
        cursor = db.cursor()
        rm = ResourceManager(FakerDataSource(config.total, config.batchSize))
        rm.prepare()
        rm.insert()
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], config.total, "data should be inserted")

        rm.rollback()
        db.commit()
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], 0, "should rollback")

        rm.rollback()
        cursor.execute("SELECT COUNT(*) FROM score_source;")
        self.assertEqual(cursor.fetchone()[
                         0], 0, "rollback should be idempotent")
        rm.clean()

    def tearDown(self) -> None:
        db.cursor().execute("TRUNCATE score_source;")
        db.commit()

优化

  1. DividableDataSource#divide改为异步生成的迭代器; 这样便可以延迟分片的计算, 以提高性能
  2. 使用多线程实现概要设计中对应的方案2, 对于io密集型应用可以极大地提高效率; 如一个线程写redo log, 一个线程写undo log, 多个线程执行update/rollback

引用


  1. 【精选】【数据库】数据库笔记_is a software package,designed to store,retrieve,q-CSDN博客 ↩︎文章来源地址https://www.toymoban.com/news/detail-717912.html

到了这里,关于【数据库】Python脚本实现数据库批量插入事务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MySQL 数据库表格创建、数据插入及获取插入的 ID:Python 教程

    要在MySQL中创建表格,请使用\\\"CREATE TABLE\\\"语句。 确保在创建连接时定义了数据库的名称。 示例创建一个名为 \\\"customers\\\" 的表格: 如果上述代码没有出现错误,那么您已成功创建了一个表格。 您可以通过使用\\\"SHOW TABLES\\\"语句列出数据库中的所有表格来检查表格是否存在: 示例返

    2024年02月05日
    浏览(52)
  • Python读取excle文件,插入到数据库

     一、需求背景         最近项目实践过程中遇到了一个问题:在使用Navicat将数据导入到PostgreSQL数据库时,发现时间格式的字段中的时间数值发生了变化,导致部分数据的时间不正确,故数据手动导入数据库报错。为了解决这个问题,决定编写Python代码来读取Excel文件,

    2024年02月16日
    浏览(38)
  • 【数据库】事务的隔离级别以及实现原理

    经常提到数据库的事务,那你知道数据库还有事务隔离的说法吗,事务隔离还有隔离级别,那什么是事务隔离,隔离级别又是什么呢?本文就帮大家梳理一下。 事务,由一个有限的数据库操作序列构成,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。

    2023年04月26日
    浏览(43)
  • Go 语言实现 MySQL 数据库事务

    MySQL事务是指一组数据库操作,它们被视为一个逻辑单元,并且要么全部成功执行,要么全部回滚(撤销)。事务是数据库管理系统提供的一种机制,用于确保数据的一致性和完整性。 事务具有以下特性(通常由ACID原则定义): 原子性(Atomicity):事务中的所有操作要么全

    2024年02月08日
    浏览(47)
  • MySql数据库实现数据存在则更新,不存在则插入

    1.存在则更新(不影响其他字段),不存在则插入 如上语句的意思的意思是如果字段1不存在(主键,索引或者唯一条件不存在),则执行插入语句,存在则执行更新语句,该更新只更新需要的字段,不影响其他字段的值; 2.存在则更新(先删除后更新),不存在则插入 如上

    2024年02月15日
    浏览(38)
  • 【Python笔记】Python + xlrd + pymysql读取excel文件数据并且将数据插入到MySQL数据库里面

    这篇文章,主要介绍Python + xlrd + pymysql读取excel文件数据并且将数据插入到MySQL数据库里面。 目录 一、Python读取excel 1.1、安装xlrd库 1.2、打开excel工作簿 1.3、获取sheet工作表 1.4、操作row数据行 1.5、操作column数据列 1.6、操作单元格 二、读取excel数据保存到MySQL 2.1、完整代码 2.

    2024年02月15日
    浏览(59)
  • 【cfengDB】自己实现数据库第0节 ---整体介绍及事务管理层实现

    LearnProj 本文作为数工底层的项目CfengDB开始篇章,介绍开发缘由和实现思路 cfeng之前对数据库研究不深入,之前只是能够做到基本的SQL查询和基本的慢SQL优化,之前拿到数据库系统工程师证书还是只在业务上对于DB系统使用更深入,但是cfeng基于work的理解,当作为一个优秀的产

    2024年02月16日
    浏览(43)
  • MySQL:想实现sql语句进行批量删除数据库或表,而引发的熬夜探究

    因为在自测过程中,创建了很多数据库,一个个手动删除属实有点对不起程序员这个身份,那么有没有简单的sql语句操作来进行批量删除数据库呢?于是便有了本篇文章 上面图片是AI创作,未经允许,不可商用哦! 删库跑路需谨慎, 放弃一切亦不易。 了解到数据库或表的信

    2024年01月16日
    浏览(60)
  • 关于Android Studio连接mysql数据库的过程和注册功能的实现(数据的插入)以及mysql环境变量的配置

    1.安装mysql数据库,安装的教程哔站有很多,版本尽量用mysql5.7的版本,用mysql8.0的版本与android studio进行连接的话可能会出现问题。 2.安装完成之后,给本机配置环境变量,步骤:如下图示 (1)通过搜索打开环境变量。 (2).点击环境变量。 (3).找到系统变量点击新建。

    2024年04月14日
    浏览(66)
  • 编写shell脚本,利用mysqldump实现MySQL数据库分库分表备份

     查看数据和数据表 删除头部Database和数据库自带的表  编写脚本 检查脚本运行备份数据库 分表分库备份成功 还原检测 删除数据库并查看库 开始还原 使用备份的库进行还原,由于是压缩文件,使用压缩还原 查看数据库  

    2024年02月05日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包