Kafka error when connecting Flink to HBase: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

Preface

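This post continues from the previous one, "[Flink Real-Time Data Warehouse] Requirement 1: User-Attribute Dimension Table Processing, Flink CDC from MySQL to HBase: Experiment and Error Analysis": http://t.csdn.cn/bk96r
A day later I reran the job that loads the data into HBase and got a Kafka error, even though this piece of code never uses Kafka directly. The cause was a Kafka dependency I had added that day for another project, which conflicted with the dependencies of this job. The stack trace below shows why Kafka classes are involved at all: the mysql-cdc connector runs an embedded Debezium engine, and its FlinkOffsetBackingStore references org.apache.kafka.common.utils.ThreadUtils.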

Full error output

+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set
[WARN ] 2023-07-23 12:48:34,083(0) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:82): Log file environment variable 'log.file' is not set.  
[WARN ] 2023-07-23 12:48:34,088(5) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:88): JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.  
[WARN ] 2023-07-23 12:48:35,781(1698) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154): The operator name Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) exceeded the 80 characters length limit and was truncated.  
[WARN ] 2023-07-23 12:48:36,481(2398) --> [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.kafka.connect.runtime.WorkerConfig.logPluginPathConfigProviderWarning(WorkerConfig.java:420): Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.  
[ERROR] 2023-07-23 12:48:36,487(2404) --> [debezium-engine] com.ververica.cdc.debezium.internal.Handover.reportError(Handover.java:147): Reporting error:  
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more
[WARN ] 2023-07-23 12:48:36,499(2416) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1074): Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0 (472d9a4f02e261cfd2f115da78d97e03) switched from RUNNING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more
  
[WARN ] 2023-07-23 12:48:36,581(2498) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.  
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:117)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
[WARN ] 2023-07-23 12:48:36,582(2499) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.  
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.cancelJob(CollectResultFetcher.java:225)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.close(CollectResultFetcher.java:150)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:108)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Caused by: java.io.IOException: Failed to fetch job execution result
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
	... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
	... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
	at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 7 more

Process finished with exit code 1
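
The output above nests several "Caused by" sections, and the decisive one is the last: java.lang.ClassNotFoundException for org.apache.kafka.common.utils.ThreadUtils. As a minimal sketch of how that innermost cause can be pulled out programmatically, the helper below walks a Throwable's cause chain. The class name RootCause and the method rootCauseOf are hypothetical names chosen for illustration; the exception chain built in main only imitates the shape of the log and is not the real Flink failure.

```java
import java.io.IOException;

public class RootCause {

    /** Follows getCause() until it reaches the innermost exception. */
    static Throwable rootCauseOf(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referencing cause
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hand-built chain that imitates the nesting seen in the log (illustration only)
        ClassNotFoundException missing =
                new ClassNotFoundException("org.apache.kafka.common.utils.ThreadUtils");
        NoClassDefFoundError linkError =
                new NoClassDefFoundError("org/apache/kafka/common/utils/ThreadUtils");
        linkError.initCause(missing);
        Throwable outer = new RuntimeException("Failed to fetch next result",
                new IOException("Failed to fetch job execution result", linkError));

        // Prints the innermost exception of the chain
        System.out.println(rootCauseOf(outer));
    }
}
```

In a real job the same helper could be called from a catch block around the failing call, so that only the root cause is logged instead of the full chain.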

Flink test code

package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UserInfo2Hbase {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Register the MySQL table as a CDC source; print() shows "OK" once the DDL succeeds
        tenv.executeSql("CREATE TABLE ums_member (\n" +
                "    id          BIGINT,\n" +
                "    username    STRING,\n" +
                "    phone       STRING,\n" +
                "    status      INT,\n" +
                "    create_time TIMESTAMP(3),\n" +
                "    gender      INT,\n" +
                "    birthday    DATE,\n" +
                "    city        STRING,\n" +
                "    job         STRING,\n" +
                "    source_type INT,\n" +
                "    PRIMARY KEY(id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'hadoop10',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '0000',\n" +
                " 'database-name' = 'db1',\n" +
                //" 'scan.startup.mode' = 'latest-offset',\n" +
                " 'scan.incremental.snapshot.enabled' = 'false',\n" +
                " 'table-name' = 'ums_member')").print();

        // Read the CDC table and print the rows; this is the query that fails in the log above
        tenv.executeSql("select * from ums_member").print();
    }
}

Solution

I commented out the Kafka dependency and reran the job, but it still failed.

A round of searching across the web turned up all sorts of suggested fixes.
What worked for me was restarting IDEA 2020, after which the error was gone, presumably because the IDE had still been running with the classpath from before the dependency change.
The data was then written to HBase successfully.
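
To check whether the classpath the IDE actually uses has picked up a dependency change, a small diagnostic like the following can help. It is only a sketch, not part of the original job: the class name ClasspathCheck and the method locate are hypothetical names chosen for illustration. It tries to resolve a class by name and reports which jar or directory it was loaded from, or that it is missing.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /**
     * Returns the location a class was loaded from,
     * or null if the class cannot be found or linked on the current classpath.
     */
    static String locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> clazz = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap class loader have no code source
            return source == null ? "(JDK / bootstrap class loader)" : source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the error; another class name can be passed as an argument
        String name = args.length > 0 ? args[0] : "org.apache.kafka.common.utils.ThreadUtils";
        String location = locate(name);
        System.out.println(location == null
                ? name + " is NOT on the classpath"
                : name + " was loaded from " + location);
    }
}
```

Run from the same IDE module as the Flink job, this prints the jar that provides ThreadUtils when it resolves. If the class is reported as missing, or comes from an unexpected Kafka jar, the IDE's view of the dependencies is the place to look.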
