Flink-SQL 写入PostgreSQL 问题汇总

这篇具有很好参考价值的文章主要介绍了Flink-SQL 写入PostgreSQL 问题汇总。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.主键字段为空问题

  • 错误信息
org.apache.flink.table.api.TableException: Column 'bus_no' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.

Flink-SQL 写入PostgreSQL 问题汇总,flink,flink-sql

  • 问题原因
    	sink 表定义了主键,flink-sql在使用jdbc 插入时,定义的主键中的属性存在空值
    PRIMARY  KEY (col,col2,col3,col4,col5,col6,col7) NOT ENFORCED
    
  • 解决
    确定主键属性是否有空值,若为空则确定是否可作为主键属性(若正确则将其置为默认值);若不符合业务情况,则重新定义主属性集合,确保不出现空值

2.flink-sql jdbc 并发写入Mysql出现死锁

  • 错误信息
 org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:240)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dm_hljy_sims_day_payment_info(sign_year_type, pay_source, campus_name, school_name, depart_name, pay_time, pay_channel_name, day_fee_amounts) VALUES ('2033', '3', '教育 ', ' 初级中学', ' 初级中学', '2022-02-18 00:00:00+08'::timestamp, '微信', '61310.00'::numeric) ON DUPLICATE KEY UPDATE day_fee_amounts=VALUES(day_fee_amounts) was aborted: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.  Call getNextException to see other errors in the batch.
	at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:215) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.processElement(SinkUpsertMaterializer.java:128) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	... 30 more

Flink-SQL 写入PostgreSQL 问题汇总,flink,flink-sql

  • 错误原因
flink-sql在写入sql时定义了主键,flink-sql进行upser 操作(主键记录不存在则进行insert,存在则进行update)
flink-sql 插入时,会根据主键对sink表进行查询,若并发度大于1,则可能存在两个及以上线程查询同一条记录,出现死锁情况
  • 解决
 设置flink-sql jdbc 并发度为1,(flink jdbc 并发度跟随flink 作业并发度;也可以单独设置)
设置flink-sql 并发度 : parallelism.default: 1
也可以单独设置jdbc 并发度: (不跟随flink作业并发度) 
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:gaussdb://dws-hl-datalake.dws.myhuaweiclouds.com:8000/hl_datamart',
  'table-name' = 'dm_hljy.dm_hljy_sims_security_send_back_student_info',
   'connection.max-retry-timeout' = '600s',
  'sink.max-retries' = '6',
   'sink.parallelism'= '1',
   'username' = '用户名',
   'password' = ‘密码'
 );

postgreSQL 的schema未指定导致数据插入失败

Flink-SQL 写入PostgreSQL 问题汇总,flink,flink-sql文章来源地址https://www.toymoban.com/news/detail-756351.html

  • 原因:postgreSQL包含database,schema,table三级结构,在生产环境未指定特定schema(dm_hljy),导致插入数据到public,出现错误,官方文档如下
    Flink-SQL 写入PostgreSQL 问题汇总,flink,flink-sql
  • 解决:设置flink-sql的parallelism 并行度为1,或修改flink-sql sink.parallelism = 1

到了这里,关于Flink-SQL 写入PostgreSQL 问题汇总的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink-sql对kafka数据进行清洗过滤

    今天这篇blog主要记录使用flink-sql对kafka中的数据进行过滤。 以前对kafka数据进行实时处理时都是使用java来进行flink开发,需要创建一个工程,并且打成jar包再提交,流程固定但对于简单任务来说还是比较繁琐的。 今天我们要对logstash采集到kafka中的数据进行过滤筛选,将筛选

    2024年02月16日
    浏览(30)
  • 11 flink-sql 中基于 mysql-cdc 连接 mysql-pxc 集群无法获取增量数据问题

    问题是来自于 群友, 2024.03.29, 也是花了一些时间 来排查这个问题  大致的问题是用 mysql-cdc 连接了一个 mysql-pxc 集群, 然后创建了一个 test_user 表  使用 \\\"select * from test_user\\\" 获取数据表的数据, 可以拿到 查询时的快照, 但是 无法获取到后续对于 test_user 表的增量操作的数据, 比如

    2024年04月15日
    浏览(42)
  • Flink-SQL join 优化 -- MiniBatch + local-global

    问题1. 近期在开发flink-sql期间,发现数据在启动后,任务总是进行重试,运行一段时间后,container heartbeat timeout,内存溢出(GC overhead limit exceede) ,作业无法进行正常工作 问题2. 未出现container心跳超时的,作业运行缓慢,超过一天 ,作业仍存在反压情况 查看日志内容发现,出

    2024年02月06日
    浏览(34)
  • 问题:Spark SQL 读不到 Flink 写入 Hudi 表的新数据,打开新 Session 才可见

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(43)
  • Flink CDC和Flink SQL构建实时数仓Flink写入Doris

    软件环境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 开启binlog日志、创建用户 1.开启bin log MySQL 8.0默认开启了binlog,可以通过代码show variables like \\\"%log_bin%\\\";查询是否开启了,show variables like \\\"%server_id%\\\";查询服务器ID。 上图分别显示了bin long是否开启以及bin log所在的位置。 2.创建用户 C

    2024年02月02日
    浏览(63)
  • Apache Doris 系列: 基础篇-Flink SQL写入Doris

    本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。 支持二阶段提交,可实现Exatly Once的写入。 1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置

    2023年04月08日
    浏览(35)
  • FLINK CDC postgresql (Stream与SQL)

    Postgres CDC Connector — CDC Connectors for Apache Flink® documentation flink cdc捕获postgresql数据 1)更改配置文件 需要更改 # 更改wal日志方式为logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个 # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 # 中断

    2023年04月27日
    浏览(31)
  • Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(31)
  • 基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

    操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2 ;wsl 1会出现各种各样的问题】 软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0 已有postgre的跳过此步 (1)pg安装 https://zhuanlan.zhihu.com/p/143156636 (2)pg配置 可能出现的问题 sudo -u postgres psql 报错: psql: err

    2024年02月11日
    浏览(27)
  • Flink流批一体计算(15):PyFlink Tabel API之SQL写入Sink

    目录 举个例子 写入Sink的各种情况 1. 将结果数据收集到客户端 2. 将结果数据转换为Pandas DataFrame,并收集到客户端 3. 将结果写入到一张 Sink 表中 4. 将结果写入多张 Sink 表中 举个例子 将计算结果写入给 sink 表 写入Sink的各种情况 1. 将结果数据收集到客户端 你可以使用 TableR

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包