[flink1.14.4] Unable to create a source for reading table 'default_catalog.default_database.new_buyer_trade_order2'


After upgrading to Flink 1.14.4, the job fails to submit with the following error:

Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.new_buyer_trade_order2'  

2022-03-11 16:45:04,169 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: metrics.reporter.influxdb.class=org.apache.flink.metrics.influxdb.InfluxdbReporter
2022-03-11 16:45:04,170 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: state.backend.rocksdb.ttl.compaction.filter.enabled=true
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: metrics.reporter.influxdb.db=flink
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: metrics.reporter.influxdb.host=192.168.5.57
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: web.timeout=120000
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: akka.ask.timeout=120 s
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: env.java.opts=-verbose:gc -XX:NewRatio=3 -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4 -Duser.timezone=Asia/Shanghai
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: metrics.reporter.influxdb.port=8086
2022-03-11 16:45:04,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: akka.watch.heartbeat.interval=10 s
2022-03-11 16:45:04,172 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: slotmanager.taskmanager-timeout=600000
2022-03-11 16:45:04,172 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Dynamic Property set: containerized.heap-cutoff-min=100
Table DDL parsed successfully! connector: upsert-kafka
Setting Kafka timestamp: upsert-kafka
Table DDL parsed successfully! connector: print
hive conf path: /etc/ecm/hive-conf
create iceberg catalog success!
start to run sql:
CREATE TABLE new_buyer_trade_order2 (
  database VARCHAR,
  `table` VARCHAR,
  type VARCHAR,
  ts BIGINT,
  xid BIGINT,
  xoffset BIGINT,
  data VARCHAR,
  `old` VARCHAR
) WITH (
  'value.format' = 'json',
  'key.format' = 'json',
  'properties.bootstrap.servers' = '192.168.8.142:9092,192.168.8.141:9092,192.168.8.143:9092',
  'connector' = 'upsert-kafka',
  'topic' = 'new_buyer_trade_order2'
)
start to run sql:
CREATE TABLE result_print (
  database VARCHAR,
  `table` VARCHAR,
  type VARCHAR,
  ts BIGINT,
  xid BIGINT,
  xoffset BIGINT,
  data VARCHAR,
  `old` VARCHAR
) WITH (
  'connector' = 'print'
)
start to run sql:
INSERT INTO result_print
SELECT database, `table`, type, ts, xid, xoffset, database, `old`
FROM new_buyer_trade_order2

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a source for reading table 'default_catalog.default_database.new_buyer_trade_order2'.

Table options are:

'connector'='upsert-kafka'
'key.format'='json'
'properties.bootstrap.servers'='192.168.8.142:9092,192.168.8.141:9092,192.168.8.143:9092'
'topic'='new_buyer_trade_order2'
'value.format'='json'
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.new_buyer_trade_order2'.

Table options are:

'connector'='upsert-kafka'
'key.format'='json'
'properties.bootstrap.servers'='192.168.8.142:9092,192.168.8.141:9092,192.168.8.143:9092'
'topic'='new_buyer_trade_order2'
'value.format'='json'
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:150)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:834)
	at com.gegejia.flink.FlinkJobBoot.main(FlinkJobBoot.java:95)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	... 11 more
Caused by: org.apache.flink.table.api.ValidationException: 'upsert-kafka' tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.
	at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory.validatePKConstraints(UpsertKafkaDynamicTableFactory.java:261)
	at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory.validateSource(UpsertKafkaDynamicTableFactory.java:219)
	at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory.createDynamicTableSource(UpsertKafkaDynamicTableFactory.java:119)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:147)
	... 37 more

Root cause: the source table was defined without a PRIMARY KEY, which the 'upsert-kafka' connector requires. After re-enabling the PRIMARY KEY definition (it had been commented out), the job submitted successfully.
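For reference, a minimal sketch of the corrected DDL is shown below. The key columns here are a hypothetical choice for illustration (the original PRIMARY KEY definition is not visible in the log); use whichever columns actually make up your Kafka message key. Note that Flink only accepts PRIMARY KEY ... NOT ENFORCED on connector tables, since the connector cannot verify the constraint itself:

CREATE TABLE new_buyer_trade_order2 (
  database VARCHAR,
  `table` VARCHAR,
  type VARCHAR,
  ts BIGINT,
  xid BIGINT,
  xoffset BIGINT,
  data VARCHAR,
  `old` VARCHAR,
  -- hypothetical key columns; replace with the fields that form your Kafka message key
  PRIMARY KEY (database, `table`, xid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'new_buyer_trade_order2',
  'properties.bootstrap.servers' = '192.168.8.142:9092,192.168.8.141:9092,192.168.8.143:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)

With the PRIMARY KEY declared, the upsert-kafka connector knows which fields to read from the Kafka message key and which keys drive update/delete semantics, so source creation passes validation.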
