POSTGRESQL中ETL、fdw的平行替换

这篇具有很好参考价值的文章主要介绍了POSTGRESQL中ETL、fdw的平行替换。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

POSTGRESQL中ETL、fdw的平行替换

01、简介

“ 在我前两次的文章中,说到postgresql对于python的支持,其实很多功能也就可以封装进入的postgresql数据库中去。比如fdw、etl等,本文将以此为叙述点,进行演示展示”

POSTGRESQL中ETL、fdw的平行替换,大数据,python,postgresql,etl

在postgresql数据库中fdw的支持,在创建和使用上都不上太方便,特别是fdw在用表级别关联的时候,性能会大大折扣,因为fdw的数据并不会落地到本地​。所以我们可以利用postgresql对于python的支持,自行封装一个库对库的调度工具,将远端数据进行落地​后再次使用。对于使用的便利性,读者可自行​对比。

02、postgresql16.1的安装

安装依赖

yum install -y bison flex readline-devel zlib-devel zlib zlib-devel gcc  gcc-c++ openssl-devel python3  python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel gcc make

添加用户

useradd postgres 
vim /etc/sudo

在101行以下添加以下内容


postgres ALL=(ALL)     NOPASSWD: ALL

进入官网找到链接,这里使用源码安装。

wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz

解压并进入解压目录


 mv postgresql-16.1.tar.gz /home/postgres
 su - postgres 
 tar -zxf postgresql-16.1.tar.gz
 cd postgresql-16.1

这里编译python支持还是很重要。–with-python 自行构建plpython3u插件


./configure --prefix=/home/postgres/pg --with-openssl  --with-python

make && make install

编辑环境变量


cd 
vim .bash_profile

加入以下环境变量

export PATH=/home/postgres/pg/bin:$PATH 
export PGDATA=/home/postgres/pg/data 

加载环境变量


source ~/.bash_profile

初始化数据库


initdb -D $PGDATA -U postgres -W 
(输入超级用户密码两次)
pg_ctl start 
pg_ctl status

进入数据库创建拓展


CREATE EXTENSION plpython3u CASCADE;

02、创建支持跨库访问的函数

首先下载python链接数据库所需module

postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user 
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbc
  Downloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)
     |████████████████████████████████| 330 kB 2.8 MB/s            
Collecting pymysql
  Downloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)
     |████████████████████████████████| 43 kB 2.4 MB/s             
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39

在链接远程Oracle数据库,需要下载指定的客户端,本文使用的是oracle 19C

wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm

编辑环境变量

vim /etc/profile

配置以下环境变量值

export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH

加载环境变量

source /etc/profile

在postgresql数据库中创建具有跨库链接mysql\oracle\sqlserver功能的function。

CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100))
RETURNS text AS $$

import cx_Oracle
import pyodbc
import pymysql

def read_data_from_database(db_type, host, port, username, password, db_name, table_name):
    result_values = []  # Initialize as an empty list

    # 读取Oracle数据库中指定表数据的函数
    if db_type.lower() == 'oracle':
        connection_string = f"{username}/{password}@{host}:{port}/{db_name}"
        connection = cx_Oracle.connect(connection_string)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取SQL Server数据库中指定表数据的函数
    elif db_type.lower() == 'sqlserver':
        connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取MySQL数据库中指定表数据的函数
    elif db_type.lower() == 'mysql':
        connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    else:
        raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")

    # 返回拼接的VALUES子句
    return ', '.join(result_values)

insert_values = read_data_from_database(db_type, host, port, username, password, db_name, tablename)
return insert_values


$$ LANGUAGE plpython3u;

以Oracle作为测试 在Oracle 和PG中均创建测试表conn_fdw
postgresql

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id integer,
    name VARCHAR(50),
    age integer,
    city VARCHAR(50),
    salary integer
);

oracle中

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id NUMBER,
    name VARCHAR2(50),
    age NUMBER,
    city VARCHAR2(50),
    salary NUMBER
);

Oracle中插入数据

-- 插入20行数据
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);


此时再结合SQL语言进行处理远程连接传过来数据,再创建一个函数用于调用以上创建fdw_db

CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100),host VARCHAR(100)
								  ,port integer, username VARCHAR(100), 
								  password VARCHAR(100), db_name VARCHAR(100),
								  tablename varchar(100),target_bale varchar(100))
RETURNS void AS $$
declare 
data_values text;
begin 
SELECT   fdw_db(db_type, host, port, username, password, db_name,tablename) into data_values;
 
EXECUTE 'insert into '||target_bale ||' values'||data_values;
end;

$$ LANGUAGE plpgsql;

进行调用

 SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 

进入数据库中查看
此时数据已经落地

postgres=# select *  from CONN_FDW;
 id | name | age | city | salary 
----+------+-----+------+--------
(0 rows)

postgres=#  SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 inset_fdw_db 
--------------
 
(1 row)


postgres=# select *  from CONN_FDW;
 id |  name   | age |     city      | salary 
----+---------+-----+---------------+--------
  1 | John    |  30 | New York      |  50000
  2 | Alice   |  25 | Los Angeles   |  60000
  3 | Bob     |  35 | Chicago       |  70000
  4 | Eva     |  28 | San Francisco |  55000
  5 | Mike    |  32 | Seattle       |  65000
  6 | Sophia  |  29 | Boston        |  75000
  7 | David   |  27 | Denver        |  52000
  8 | Emily   |  31 | Austin        |  68000
  9 | Daniel  |  26 | Phoenix       |  58000
 10 | Olivia  |  33 | Houston       |  72000
 11 | Liam    |  24 | Portland      |  49000
 12 | Ava     |  34 | Atlanta       |  71000
 13 | Logan   |  30 | Miami         |  62000
 14 | Mia     |  28 | Dallas        |  54000
 15 | Jackson |  29 | Minneapolis   |  67000
 16 | Sophie  |  31 | Detroit       |  59000
 17 | William |  27 | Philadelphia  |  70000
 18 | Emma    |  32 | San Diego     |  66000
 19 | James   |  26 | Raleigh       |  63000
 20 | Avery   |  35 | Tampa         |  71000
(20 rows)



总结

该方法不仅可以应用到数据库对数据库之间,也可以应到,数据库对文件路径下。在postgresql嵌入python代码 其实可以替换掉一些中间件的使用。可控性,定制性也会更强。文章来源地址https://www.toymoban.com/news/detail-795467.html

到了这里,关于POSTGRESQL中ETL、fdw的平行替换的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 影响ETL数据传输性能的9大因素及主流ETL应对策略

    现在很多企业在选择 ETL 工具时都特别关注 ETL 的数据传输性能,而有很多开源 ETL 工具都说自已是性能如何如何快,而事实上数据传输性能是不是这些工具说的那样快呢?   数据传输性能受制于哪些因素呢?企业在自身数据库性能受制的情况下一味的想用 ETL 工具来提升性能

    2024年01月23日
    浏览(42)
  • 大数据扫盲(2): 数据分析BI与ETL的紧密关系——ETL是成功BI的先决条件

    着业务的发展每个企业都将产生越来越多的数据,然后这些数据本身并不能直接带来洞察力并产生业务价值。为了释放数据的潜力,数据分析BI(商业智能)成为了现代企业不可或缺的一部分。然而,在数据分析的背后,有一个至关重要且常常被忽视的步骤——ETL(Extract, T

    2024年02月12日
    浏览(40)
  • ETL详解--数据仓库技术

      一、ETL简介 ETL ,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,是数据仓库的生命线。它是一种数据处理过程,用于从不同的数据源中提取数据、对数据进行转换和清洗,并将处理后的数据加

    2024年02月02日
    浏览(40)
  • 数据仓库的ELT/ETL

    ETL 和 ELT 有很多共同点,从本质上讲,每种集成方法都可以将数据从源端抽取到数据仓库中,两者的区别在于数据在哪里进行转换。 ETL – 抽取、转换、加载 从不同的数据源抽取信息,将其转换为根据业务定义的格式,然后将其加载到其他数据库或数据仓库中。另一种 ETL 集

    2024年04月16日
    浏览(45)
  • ETL的数据挖掘方式

    ETL的基本概念 数据抽取(Extraction):从不同源头系统中获取所需数据的步骤。比如从mysql中拿取数据就是一种简单的抽取动作,从API接口拿取数据也是。 数据转换(Transformation):清洗、整合和转化原始数据以适应目标存储或分析系统的阶段。从mysql中拿到数据之后对数据进

    2024年03月12日
    浏览(44)
  • 大数据ETL工具Kettle

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 提示:以下是本篇文章正文内容,下面案例可供参考 ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,目的是将企业中的分散、零乱

    2024年02月10日
    浏览(49)
  • ETL简介:数据集成与应用

    在当今大数据时代,组织和企业需要处理和分析庞大的数据量。ETL(Extract, Transform, Load)是一种重要的数据集成和处理方法,它在数据管理和决策支持中起着关键作用。本文将介绍ETL的基本概念、作用和关键组成部分,以帮助读者了解ETL的重要性和应用领域。 ETL是指数据提取

    2024年02月12日
    浏览(35)
  • ETL数据集成和数据仓库的关键步骤

    在当今数据驱动的世界中,ETL(提取、转换和加载)过程在构建可靠和高效的数据仓库中扮演着关键角色。ETL数据集成和数据仓库的关键步骤对于数据质量和决策支持至关重要。本文将介绍ETL数据集成和数据仓库构建的关键步骤,以帮助读者了解构建一个可靠数据仓库所需的

    2024年02月12日
    浏览(101)
  • 六、数据仓库详细介绍(ETL)方法篇

    上文我们把数据仓库类比我们人类自身,数据仓库“吃”进去的是原材料(原始数据),经过 ETL 集成进入数据仓库,然后从 ODS 开始逐层流转最终供给到数据应用,整个数据流动过程中,在一些关键节点数据会被存储存储下来落入数仓模型。在数仓这个自运转的大生态系统中

    2024年02月16日
    浏览(46)
  • 大数据 ETL 处理工具之 Kettle

    目录 第1章 Kettle概述 1.1 ETL简介 1.2 Kettle简介 1.2.1 Kettle是什么 1.2.2 Kettle的两种设计 1.2.3 Kettle的核心组件 1.2.4 Kettle特点 第2章 Kettle安装部署 2.1 Kettle下载 2.1.1 下载地址 2.1.2  Kettle目录说明 2.1.3  Kettle文件说明 2.2 Kettle安装部署 2.2.1 概述 2.2.2 安装 2.3 Kettle界面简介 2.3.1 首页 2.

    2024年02月11日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包