Spark error: Cannot overwrite a path that is also being read from.


Cannot overwrite a path that is also being read from.

This error looks simple at first glance. The code, simplified:

Dataset<Row> selectBefore = session.sql("select * from table1");    // the data already in the table

Dataset<Row> dataset = session.createDataFrame(list, Xx.class);     // the new data (from csv / txt / kafka, etc.)

The idea is simply to read the table's existing data, union it with new data pulled in from elsewhere, and write the combined result back into the same table.

selectBefore
    .union(dataset)             // union the existing data with the new data
    .write()
    .mode(SaveMode.Overwrite)   // rewrite the Hive table
    .format("hive")
    .insertInto("table1");

Why not append? Because the job sometimes gets re-run. Anyway, that's the situation: query the table first, then insert back into it.

Why does this fail?

Spark SQL在雪球的实践 (Spark SQL in practice at Xueqiu) - 腾讯云开发者社区 (Tencent Cloud developer community)


Solutions

[Screenshot: the workarounds suggested in the article I found]

The first workaround really doesn't work. From what I looked up, those two parameters are mostly used to fix cases where Spark reads the wrong amount of data from a Hive table. And I still got an error after setting them, so I won't paste that here.

spark sql读取不到orc格式hive表数据问题 (Spark SQL not reading data from ORC-format Hive tables) - Java小田的博客, CSDN

The second one actually works:

session.sparkContext().setCheckpointDir("/tmp/spark/job/OrderOnlineSparkJob");

Dataset<Row> selectBefore = session.sql("select * from table1").checkpoint();
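Putting the fix together end to end: a minimal sketch, reusing the placeholders from the snippets above (session, list, Xx, table1 and the example checkpoint path), not a drop-in implementation.

// A reliable checkpoint needs a directory on a reliable file system such as HDFS.
session.sparkContext().setCheckpointDir("/tmp/spark/job/OrderOnlineSparkJob");

// checkpoint() (eager by default) materializes the query result under the checkpoint
// directory and returns a new Dataset whose lineage points at those files, not at table1.
Dataset<Row> selectBefore = session.sql("select * from table1").checkpoint();

Dataset<Row> fresh = session.createDataFrame(list, Xx.class);   // the newly arrived records

selectBefore
    .union(fresh)
    .write()
    .mode(SaveMode.Overwrite)
    .format("hive")
    .insertInto("table1");   // table1 is now only written, not read, by this job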


The third one is a hacky method, so I won't go into it.


I should find some time to properly study Spark checkpointing.

A note: this approach has a drawback. The checkpoint directory on HDFS is not guaranteed to be deleted; from what I found online, it only gets deleted when a GC runs, something to do with weak references. Two ways to handle it:

1. .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true"): this hands cleanup to the checkpoint-cleaning thread, but it doesn't always fully work; I'd also suggest trying System.gc(); to trigger a GC manually (see the sketch after this list).

2. Run a script after the Spark job finishes: hdfs dfs -rm -r /checkpoint/*
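Roughly what option 1 looks like in code, plus option 2 done through the Hadoop FileSystem API instead of a shell script. This is a sketch only; the app name and checkpoint path are just the example values used in this post.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;

SparkSession session = SparkSession.builder()
        .appName("OrderOnlineSparkJob")
        .enableHiveSupport()
        // ask the ContextCleaner to also delete checkpoint files once the
        // checkpointed RDD is only weakly reachable and a GC has run
        .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
        .getOrCreate();

// ... run the job ...

// Option 2 in code (the equivalent of hdfs dfs -rm -r): delete the checkpoint dir ourselves.
FileSystem fs = FileSystem.get(session.sparkContext().hadoopConfiguration());
fs.delete(new Path("/tmp/spark/job/OrderOnlineSparkJob"), true);   // recursive delete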

___________________________________________________________________________

Update 2022-11-15: why does a checkpoint make it work?

Let's do a quick search: what does Spark's checkpoint actually do?

spark checkpoint详解 (Spark checkpoint explained) - 超级核弹头, 博客园 (cnblogs)

In short:

1. It truncates the lineage, so the RDD doesn't have to be recomputed from scratch; this solves the problem of overly long lineage chains.

2. If that sounds vague, look at it this way: it simply stores the data in a directory on HDFS, so the data can be recovered from files on disk.

So why can't we insert into the table we are reading from without a checkpoint?

An article I read earlier pointed out that Spark's insert logic has no staging (temporary) directory, while Hive's does.

So the picture is now clear:

Without a checkpoint: read from the table's own directory, write to that same directory, so it errors out.

With a checkpoint: read from the checkpoint dir on HDFS, write to the table's own directory, so it works.
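One way to see this shift for yourself is to compare the physical plans; a small check, assuming the same session and table as above (the exact plan node names vary between Spark versions):

// Before checkpointing, the plan scans the Hive table, i.e. the same path we are about to overwrite.
session.sql("select * from table1").explain();

// After checkpointing, the plan reads from a checkpointed RDD (typically shown as "Scan ExistingRDD")
// instead of the table's own directory.
session.sql("select * from table1").checkpoint().explain();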

——————————————————————————————————————————

A quick look at the source code

[Screenshot: the Dataset.checkpoint() implementation]

In the screenshot above, internalRdd is the Dataset we queried, converted with toRdd and then copied; in short, a copy of the data is made.

reliableCheckpoint defaults to true, and here it stays true. It indicates whether your checkpoint is reliable: storing it on HDFS is reliable, while keeping it on local disk obviously isn't.

Next, look at internalRdd.checkpoint() and internalRdd.count().

[Screenshot: the RDD.checkpoint() implementation]

The screenshot above shows it checking whether your checkpointDir has been set; if it hasn't, it throws.

Finally it calls new ReliableRDDCheckpointData(this), which here means new ReliableRDDCheckpointData(internalRdd).

Now look at the ReliableRDDCheckpointData class.

[Screenshot: the ReliableRDDCheckpointData class]

At this point it only constructs the object; the RDD has not been stored yet.

The actual storage happens later, when doCheckpoint() is called after an action:

Materialize this RDD and write its content to a reliable DFS. This is called immediately after the first action invoked on this RDD has completed.

That is: materialize (store) the RDD and write its contents to a reliable DFS; this is called immediately after the first action on the RDD completes.
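This action-driven materialization is also visible in the public Dataset API. A small illustration, using the same session as before: checkpoint() is eager by default, which is exactly why the source above runs internalRdd.count() for you.

// Eager (the default): checkpoint() triggers a count() internally, so the files
// are written to the checkpoint directory right away.
Dataset<Row> eager = session.sql("select * from table1").checkpoint();

// Lazy: nothing is written yet...
Dataset<Row> lazy = session.sql("select * from table1").checkpoint(false);
lazy.count();   // ...until the first action materializes the checkpoint.

Back in the source, the key call inside doCheckpoint() is: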

 val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

[Screenshot: ReliableCheckpointRDD.writeRDDToCheckpointDirectory() and its description]

Here you can see what the description says: it does not give you back the original RDD, it returns a new RDD! So from this point on, the RDD we are working with is no longer the RDD that Spark read from the table.

Now back to the question I raised earlier: what is the spark.cleaner.referenceTracking.cleanCheckpoints parameter actually for?

[Screenshot: the spark.cleaner.referenceTracking.cleanCheckpoints check in the checkpoint code]

With it set to true, the code just goes through the RDD's cleaner and asks it to clean the checkpoint directory. So why does it sometimes not kick in? (Out of about ten runs it cleaned up maybe once or twice; in the end my HDFS checkpoint directory held roughly 1 GB. With really large data, isn't that space simply wasted?)

Let's keep digging: what is this cleaner?

[Screenshots: SparkContext creating and starting the ContextCleaner]

You can see that spark.cleaner.referenceTracking (default true) controls whether a ContextCleaner is created at all, and the resulting Option is then foreach-ed to call start() on it.


Now this code gets interesting:

cleaningThread.setDaemon(true)  // daemon thread: it dies when the main thread exits
cleaningThread.setName("Spark Context Cleaner")  // the name is not important
cleaningThread.start()  // start() is the method we need to look at
periodicGCService.scheduleAtFixedRate(new Runnable { 
  override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)

// this sets up a scheduled, periodically running task; look closely at what it actually runs

periodicGCInterval defaults to 30 minutes

In other words: every 30 minutes a System.gc() is triggered, which is what clears the weak references. That is why the checkpoint directory sometimes gets deleted on its own and sometimes does not.

This code mainly does two things:

1. Start the cleaning thread.

2. Start a periodic GC (a rough Java equivalent is sketched below).
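The periodicGCService above is just a scheduled executor. A rough Java equivalent, purely for illustration (the interval mirrors the 30-minute default mentioned above):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Run System.gc() every 30 minutes, starting 30 minutes from now.
ScheduledExecutorService periodicGc = Executors.newSingleThreadScheduledExecutor();
long intervalSeconds = 30 * 60;
periodicGc.scheduleAtFixedRate(System::gc, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);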

So what is cleaningThread?

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

So cleaningThread.start() boils down to keepCleaning().

// Starts the cleaning thread, which cleans up checkpoints, broadcasts and RDDs that are no longer referenced.
// How does it clean them? It takes them from the referenceQueue reference queue.
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) { // effectively while(true)
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference]) // so this is where they come from!
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) => // !!! cleans the checkpoint directory
              doCleanCheckpoint(rddId)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}

 ——————————————————————————————————

/**
 * Clean up checkpoint files written to a reliable storage.
 * Locally checkpointed files are cleaned up separately through RDD cleanups.
 */
def doCleanCheckpoint(rddId: Int): Unit = {
  try {
    logDebug("Cleaning rdd checkpoint data " + rddId)
    ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
    listeners.asScala.foreach(_.checkpointCleaned(rddId))
    logInfo("Cleaned rdd checkpoint data " + rddId)
  }
  catch {
    case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
  }
}

——————————————— cleaning up the HDFS directory ———————————————

/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
  checkpointPath(sc, rddId).foreach { path =>
    path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
  }
}

This looks fine, so why is nothing being cleaned? I couldn't get to the bottom of it myself, but someone else has written it up far better than I could:

Spark ContextCleaner及checkpoint的clean机制分析 (analysis of Spark's ContextCleaner and checkpoint clean mechanism) - 知乎 (Zhihu)

——————————————— more on weak references ———————————————

If you're not familiar with weak references, this article covers them:

Java中弱引用的概念和作用是什么 (what weak references in Java are and what they do) - 亿速云

The analysis above concluded that the checkpoint directory isn't deleted because of how weak references behave. Now that we are (pretend) experts on weak references, let's continue with the source.

The ContextCleaner class:

// a reference queue that holds the reference objects

private val referenceQueue = new ReferenceQueue[AnyRef]
/** Register an RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}

def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
  registerForCleanup(a, CleanAccum(a.id))
}

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
  registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}

/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
  registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}

/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
  registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

I've pasted a bit more here. You can see that accumulators, RDDs, RDDCheckpointData and so on all go through the same registerForCleanup() method, and each ends up as new CleanupTaskWeakReference(task, acc/rdd/checkpoint, referenceQueue).

CleanupTaskWeakReference

/**
 * A WeakReference associated with a CleanupTask.
 *
 * When the referent object becomes only weakly reachable, the corresponding
 * CleanupTaskWeakReference is automatically added to the given reference queue.
 */
private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)
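The same pattern in plain Java, just to make the mechanics concrete (the class and field names here are made up, not Spark's):

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

// The cleanup task travels with the weak reference, so when the referent is
// collected and the reference is enqueued, the cleaning thread still knows
// what work to do even though the referent itself is already gone.
class CleanupTaskWeakRef extends WeakReference<Object> {
    final String task;   // e.g. "CleanCheckpoint(42)"

    CleanupTaskWeakRef(String task, Object referent, ReferenceQueue<Object> queue) {
        super(referent, queue);   // register with the queue, like CleanupTaskWeakReference does
        this.task = task;
    }
}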

Putting it all together, the checkpoint flow is:

rdd.checkpoint() →

new ReliableRDDCheckpointData(this)

ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)  // determines the checkpoint path

With spark.cleaner.referenceTracking.cleanCheckpoints=true, the checkpointed RDD is registered for cleanup via the reference queue.

Right at the start, when our Spark code creates the SparkContext,

it checks spark.cleaner.referenceTracking=true (the default), creates a ContextCleaner, and calls its start() method.

ContextCleaner.start() launches two things:

a thread that keeps polling the queue for weakly referenced objects that need cleaning, and

a scheduled task that periodically triggers GC.

From my own testing: when do the weakly referenced objects actually get collected?

Reference:

referenceQueue用法 (how to use ReferenceQueue) - gmHappy的博客, CSDN

    private void test2() throws InterruptedException {
        ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
        Object value = new Object();
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            // each weak reference is registered with the reference queue
            WeakReference<String> weakReference = new WeakReference<String>(String.valueOf(i), referenceQueue);
            System.out.println("created: " + weakReference + ", value=" + weakReference.get());
            map.put(weakReference, value);
        }
        System.gc();         // request a GC so the weakly reachable strings get collected
        Thread.sleep(100);   // give the GC a moment to enqueue the references
        WeakReference<String> result = null;
        int cnt = 0;
        while ((result = (WeakReference) referenceQueue.poll()) != null) {
            System.out.println((cnt++) + " collected: " + result + ", value=" + result.get());
        }
    }
-----------
created: java.lang.ref.WeakReference@383534aa, value=0
created: java.lang.ref.WeakReference@6bc168e5, value=1
created: java.lang.ref.WeakReference@7b3300e5, value=2
created: java.lang.ref.WeakReference@2e5c649, value=3
created: java.lang.ref.WeakReference@136432db, value=4
0 collected: java.lang.ref.WeakReference@6bc168e5, value=null
1 collected: java.lang.ref.WeakReference@2e5c649, value=null
2 collected: java.lang.ref.WeakReference@7b3300e5, value=null
3 collected: java.lang.ref.WeakReference@136432db, value=null
4 collected: java.lang.ref.WeakReference@383534aa, value=null
    private void testBigObjectWithoutGC() throws InterruptedException {
        int _1M = 1024 * 1024;

        ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
        Thread thread = new Thread(() -> {
            try {
                int cnt = 0;
                WeakReference<byte[]> k;
                // blocks on remove() until a collected reference is enqueued
                while ((k = (WeakReference) referenceQueue.remove()) != null) {
                    System.out.println((cnt++) + " collected: " + k);
                }
            } catch (InterruptedException e) {
                // exit the loop
            }
        });
        thread.setDaemon(true);
        thread.start();

        Object value = new Object();
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < 10000; i++) {
            byte[] bytes = new byte[_1M];   // 1 MB per object: the memory pressure forces GCs without System.gc()
            WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
            map.put(weakReference, value);
        }
        System.out.println("map.size->" + map.size());
    }
--------
8342 collected: java.lang.ref.WeakReference@67a3bd51
8343 collected: java.lang.ref.WeakReference@2cec704c
8344 collected: java.lang.ref.WeakReference@bd1111a
8345 collected: java.lang.ref.WeakReference@5918c260
8346 collected: java.lang.ref.WeakReference@7fc7c4a
8347 collected: java.lang.ref.WeakReference@9d3c67
Note that the output stops here: not all 10000 objects were collected, only 8000-odd (sometimes 9000-odd).

Testing shows that you need either

System.gc();         // requests a GC, but the GC has not necessarily run yet

Thread.sleep(1000);  // give the GC time to actually execute

or

the referenced objects to be large and numerous, in which case GC gets triggered on its own by memory pressure.

Either way, the weakly referenced objects then get collected. So, back to our problem: how do we get the checkpoint directory cleaned up?

Set spark.cleaner.referenceTracking.cleanCheckpoints to true, and add

System.gc(); + Thread.sleep(1000); at the end of the job. This shouldn't noticeably affect the Spark job itself, though I haven't bothered to benchmark it; if it works for you, leave a comment and let me know.
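A minimal end-of-job sketch of that idea, assuming cleanCheckpoints was enabled when the session was built:

// ... run the job as usual, then nudge the ContextCleaner before shutting down:
System.gc();          // request a GC so the checkpointed RDD's weak reference gets enqueued
Thread.sleep(1000);   // give the cleaning thread time to pick it up and delete the checkpoint files
session.stop();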
