Cannot overwrite a path that is also being read from.
这个错看起来很简单。代码简化为
Dataset<Row> selectBefore = session.sql("select * from table1") //表里原先的数据
Dataset<Row> dataset = session.createDataset(list,xx.class)//新增加的数据csv txt kafka
大概就是获取表里的原始数据,然后从别的地方搞来的新数据两个合起来继续存到表里去
selectBefore .union(dataset) --两个数据union融合 .write() .mode(SaveMode.Overwrite) --重新写到hive .format("hive") .insertInto(table1);
为啥不用append 因为有时候会重复调用。。 反正就是这么个情况。就是要先查再插入。
为什么报错?
Spark SQL在雪球的实践 - 腾讯云开发者社区-腾讯云
解决办法
第一个解决办法真不行。我查了下这两个参数大多是解决spark读取hive表数据量不对的情况用的。而且我设置之后还有报错就不贴出来了
spark sql读取不到orc格式hive表数据问题_Java小田的博客-CSDN博客_spark读取不到hive表
第二个 确实可以。
session.sparkContext().setCheckpointDir("/tmp/spark/job/OrderOnlineSparkJob");
Dataset<Row> selectBefore = session.sql("select * from table1").checkpoint();
第三个这种lowb方法就不说了
找个时间好好学习下spark的checkpoint知识。
备注下:这种方式有个弊端,checkpoint 在hdfs的目录不是一定会删除的,经过百度,说到GC的时候才会删除,还有什么弱引用。给两个解决办法
1..config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") 这个就是checkpoint的清理线程去清理,但是不一定完全有用,建议大家可以试下System.gc(); 手动触发下。
2.搞个脚本跑完spark后 hdfs dfs -rm -r /checkpoint/*
___________________________________________________________________________
2022-11-15更新。为什么checkpoint一下就可以了?
我们稍微百度下,spark.checkpoint的作用?
spark checkpoint详解 - 超级核弹头 - 博客园
简单来说,
1.截断血缘关系,避免rdd从头开始继续计算,解决链路过长的问题
2.上面说的云里雾里,那就看这里,就是把数据存到了hdfs的目录。可以从磁盘文件去恢复数据
那为啥没有checkpoint就不能插入读取的表?
之前看到的一篇文章说了,spark的逻辑是没有临时目录,hive是有临时目录的。
所以这里就很清楚了
没有checkpoint= 读自己目录,写自己目录 报错
有了checkpoint=读hdfs的checkpoint dir,写自己目录,所以是可以的
——————————————————————————————————————————
稍微分析下源码
上图中internalRdd是我们查出来的dataset然后toRdd 在copy,简单来说就是复制了一份,
reliableCheckpoint默认是true。 这个一直是true,应该就是你的checkpoint是否可靠,比如存到hdfs就可靠,你存到磁盘肯定不可靠。。。
接着看
internalRdd.checkpoint()和 internalRdd.count()
上图看你的checkpointDir 设置没有,没有就报错
最后new ReliableRDDCheckpointData(this)->new ReliableRDDCheckpointData(internalRdd)
我们再看ReliableRDDCheckpointData 类
这里只是new了这个对象。并没有存储rdd
实际是在后面的action后调用该doCheckpoint方法
Materialize this RDD and write its content to a reliable DFS. This is called immediately after the first action invoked on this RDD has completed.
将rdd物化也就是存储,将它的内容写到可靠的dfs上,这个在第一个action执行后会被立马调用!
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
这里可以看到说明是啥,不是把原来的rdd返回了,是返回了一个新的rdd!!!!所以我们这个时候使用的rdd不是我们spark读取的rdd了。
这个时候我们再看下我之前提到的问题,spark.cleaner.referenceTracking.cleanCheckpoints这个参数有什么用?
这里设置了true,看代码不就是把rdd的cleaner遍历,然后去清理checkpoint目录吗?为什么有时候不起作用?(我跑了十次大概清理了1-2次,最后hdfs checkpoint目录大概有1G的大小,如果数据特别大,这个空间不就浪费了?)
继续研究下,这个cleaner是啥?
可以看到 spark.cleaner.referenceTracking 这个设置为true了会new一个 默认为true。然后foreach start。
这个代码就有意思了。。。。
cleaningThread.setDaemon(true)//设置为守护线程,主线程没了它也没了 cleaningThread.setName("Spark Context Cleaner")//加个名字不重要。 cleaningThread.start() //start 方法 肯定要看 periodicGCService.scheduleAtFixedRate(new Runnable { override def run(): Unit = System.gc() }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)//这个搞了一个定时调度的一个东西,仔细看这个是啥
periodicGCInterval=30min periodicGCInterval=30min
意思是啥?就是每隔30min来次System.GC来清除弱引用。这就是为什么有时候checkpoint会自己删 有时候不删了。
这段代码主要两个是
1.开启线程清理
2.开启定时GC
cleaningThread是什么呢?
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
cleaningThread.start()就是keepCleaning对吧。
//开启清理线程,清理没有引用的checkpoint boradcast rdd //怎么清理的?referenceQueue这个引用队列里获取的。 /** Keep cleaning RDD, shuffle, and broadcast state. */ private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { while (!stopped) {//差不多就是while(true)了 try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference])//原来是这里!!! // Synchronize here to avoid being interrupted on stop() synchronized { reference.foreach { ref => logDebug("Got cleaning task " + ref.task) referenceBuffer.remove(ref) ref.task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) case CleanAccum(accId) => doCleanupAccum(accId, blocking = blockOnCleanupTasks) case CleanCheckpoint(rddId) => //!!!!清理checkpoint目录 doCleanCheckpoint(rddId) } } } } catch { case ie: InterruptedException if stopped => // ignore case e: Exception => logError("Error in cleaning thread", e) } } }——————————————————————————————————
/** * Clean up checkpoint files written to a reliable storage. * Locally checkpointed files are cleaned up separately through RDD cleanups. */ def doCleanCheckpoint(rddId: Int): Unit = { try { logDebug("Cleaning rdd checkpoint data " + rddId) ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId) listeners.asScala.foreach(_.checkpointCleaned(rddId)) logInfo("Cleaned rdd checkpoint data " + rddId) } catch { case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e) } }———————————————清理hdfs目录———————————————
/** Clean up the files associated with the checkpoint data for this RDD. */ def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = { checkpointPath(sc, rddId).foreach { path => path.getFileSystem(sc.hadoopConfiguration).delete(path, true) } }
看着好像没问题为啥没有清理呢? 能力有限查不出来。。。但是可以借鉴别人的,别人比我写的好多了。
Spark ContextCleaner及checkpoint的clean机制分析 - 知乎
———————————————继续学习弱引用————————————————————
不清楚弱引用的可以看这个文章
Java中弱引用的概念和作用是什么 - 编程语言 - 亿速云
之前的文章分析到 checkpoint目录没有被删除是因为弱引用的问题。弱引用假装已经很熟练了。那么继续源码分析。
ContextCleaner类
//搞了一个引用队列,存放引用对象
private val referenceQueue = new ReferenceQueue[AnyRef]
/** Register an RDD for cleanup when it is garbage collected. */ def registerRDDForCleanup(rdd: RDD[_]): Unit = { registerForCleanup(rdd, CleanRDD(rdd.id)) } def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = { registerForCleanup(a, CleanAccum(a.id)) } /** Register a ShuffleDependency for cleanup when it is garbage collected. */ def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = { registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId)) } /** Register a Broadcast for cleanup when it is garbage collected. */ def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = { registerForCleanup(broadcast, CleanBroadcast(broadcast.id)) } /** Register a RDDCheckpointData for cleanup when it is garbage collected. */ def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = { registerForCleanup(rdd, CleanCheckpoint(parentId)) } /** Register an object for cleanup. */ private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) }
这里多复制点。可以看到acc rdd RDDCheckpointData 等都是调用的registerForCleanup这个方法。最后都是变成了 new CleanupTaskWeakReference(task,acc/rdd/checkpoint,引用队列)
CleanupTaskWeakReference
/**
* A WeakReference associated with a CleanupTask.
*
* When the referent object becomes only weakly reachable, the corresponding
* CleanupTaskWeakReference is automatically added to the given reference queue.
*/
private class CleanupTaskWeakReference(
val task: CleanupTask,
referent: AnyRef,
referenceQueue: ReferenceQueue[AnyRef])
extends WeakReference(referent, referenceQueue)
至此我们分析下checkpoint的流程。
rdd.checkpoint()->
new ReliableRDDCheckpointData(this)
ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)//确定checkpoint路径
spark.cleaner.referenceTracking.cleanCheckpoints=TURE会将当前rddCheckpoint放到引用队列里,
最开始我们spark代码里new SparkContext的时候
会根据spark.cleaner.referenceTracking=true去 new ContextCleaner()然后调用这个的start方法
而这个ContextCleaner的start方法开启了两个线程
一个不停的看队列里是否有需要清除弱引用对象
一个定时去GC
经过我测试 弱引用对象什么时候会被清理?
参考文章
referenceQueue用法_gmHappy的博客-CSDN博客_referencequeue
private void test2() throws InterruptedException {
ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
Object value = new Object();
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
WeakReference<String> weakReference = new WeakReference<String>(String.valueOf(i), referenceQueue);
System.out.println("创造了:"+weakReference+",value="+weakReference.get());
map.put(weakReference, value);
}
System.gc();
Thread.sleep(100);
WeakReference<String> result=null;
int cnt = 0;
while ((result=(WeakReference)referenceQueue.poll())!=null){
System.out.println((cnt++) + "回收了:" + result+",value="+result.get());
}
}
-----------
创造了:java.lang.ref.WeakReference@383534aa,value=0
创造了:java.lang.ref.WeakReference@6bc168e5,value=1
创造了:java.lang.ref.WeakReference@7b3300e5,value=2
创造了:java.lang.ref.WeakReference@2e5c649,value=3
创造了:java.lang.ref.WeakReference@136432db,value=4
0回收了:java.lang.ref.WeakReference@6bc168e5,value=null
1回收了:java.lang.ref.WeakReference@2e5c649,value=null
2回收了:java.lang.ref.WeakReference@7b3300e5,value=null
3回收了:java.lang.ref.WeakReference@136432db,value=null
4回收了:java.lang.ref.WeakReference@383534aa,value=null
private void testBigObjectWithoutGC() throws InterruptedException {
int _1M = 1024 * 1024;
ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
Thread thread = new Thread(() -> {
try {
int cnt = 0;
WeakReference<byte[]> k;
while ((k = (WeakReference) referenceQueue.remove()) != null) {
System.out.println((cnt++) + "回收了:" + k);
}
} catch (InterruptedException e) {
// 结束循环
}
});
thread.setDaemon(true);
thread.start();
Object value = new Object();
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < 10000; i++) {
byte[] bytes = new byte[_1M];
WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
map.put(weakReference, value);
}
System.out.println("map.size->" + map.size());
}
--------
8342回收了:java.lang.ref.WeakReference@67a3bd51
8343回收了:java.lang.ref.WeakReference@2cec704c
8344回收了:java.lang.ref.WeakReference@bd1111a
8345回收了:java.lang.ref.WeakReference@5918c260
8346回收了:java.lang.ref.WeakReference@7fc7c4a
8347回收了:java.lang.ref.WeakReference@9d3c67
注意这里没有了。没有回收10000个对象 只回收了8000多个。。有时候9000多。
经过测试发现
需要使用
System.gc(); //触发GC 但是GC没有真正开始
Thread.sleep(1000);//给个时间让GC去执行。
或者
你队列里放的对象都很大,并且数量也不少 这个时候会主动触发GC
这样弱引用对象就会被清除了。。。所以我们之前如何清理checkpoint目录呢?
spark.cleaner.referenceTracking.cleanCheckpoints true文章来源:https://www.toymoban.com/news/detail-400267.html
System.gc();+ Thread.sleep(1000); 也不会特别影响spark的任务。我就懒得测试了。。。各位成功了留个言让我知道下。文章来源地址https://www.toymoban.com/news/detail-400267.html
到了这里,关于spark报错:Cannot overwrite a path that is also being read from. Cannot overwrite a path that is also being read from.的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!