RDD算子操作(基本算子和常见算子)

这篇具有很好参考价值的文章主要介绍了RDD算子操作(基本算子和常见算子)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、基本算子

        1.map算子

        2.flatMap算子

        3.filter算子

         4.foreach算子

        5.saveAsTextFile算子

        6.redueceByKey算子

二、常用Transformation算子       

         1.mapValues算子

        2.groupBy算子

        3.distinct算子

        4.union算子

        5.join算子

        6.intersection算子

        7.glom算子

        8.groupByKey算子

        9.sortBy算子

        10.sortByKey算子

三、常用Action算子

        1.countByKey算子

        2.collect算子

        3.reduce算子

        4.takeSample算子

        5.takeOrdered算子

四、分区操作算子

        1.mapPartitions算子

        2.foreachPartition算子

        3.partitionBy算子

        4.repartition算子和coalesce算子


一、基本算子

        RDD中map、filter、flatMap及foreach等函数为最基本算子,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。

        1.map算子

        map(f:T=>U): RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。

        功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接受的处理函数),返回新的RDD。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf-8
from pyspark import SparkConf,SparkContext

if __name__ == "__main__":
    # 构建SparkContext对象
    conf = SparkConf().setAppName('test').setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4,5,6],3)

    # 定义方法,作为算子的传入函数体
    def add(data):
        return data * 10

    print(rdd.map(add).collect())

    # 更简单的方式 是定义lambda表达式来写匿名函数
    print(rdd.map(lambda data:data * 10).collect())

'''
    对于算子的接受函数来说,两种方法都可以
    lambda表达式 适用于 一行代码就搞定的函数体,如果是多行,需要定义独立的方法
'''
RDD算子操作(基本算子和常见算子),大数据,大数据,spark        2.flatMap算子

        flatMap(f:T=>Seq[U]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的0到多个元素(f 函数返回的是一个序列Seq)。

        功能:对RDD执行map操作,然后进行解除嵌套操作

#cording:utf-8
from pyspark import SparkConf,SparkContext

if __name__ == "__main__":
    # 构建SparkContext对象
    conf = SparkConf().setAppName('test').setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(["hadoop hadoop spark","spark hadoop hadoop","hadoop flink spark"])

    #得到所有的单词,组成RDD
    rdd2 = rdd.map(lambda line: line.split(" "))
    rdd3 = rdd.flatMap(lambda line: line.split(" "))
    print(rdd2.collect())
    print(rdd3.collect())
RDD算子操作(基本算子和常见算子),大数据,大数据,spark        3.filter算子

        filter(f.T=>Bool): RDD[T]=>RDD[T],表示将 RDD经由某一函数f后,只保留f返回True的数据,组成新的RDD。

        功能:过滤想要的数据进行保留,返回值是True的数据保留,返回值是False的数据则会被丢弃。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 通过filter算子过滤奇数
    rdd = sc.parallelize((1,2,3,4,5,6,7,8,9,10))
    result_rdd = rdd.filter(lambda x: x % 2 == 1)
    print(result_rdd.collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

         4.foreach算子

       foreach(func),将函数 func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。

        功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 5, 4, 2, 3, 6])
    print(rdd.foreach(lambda x: 10 * x))
    print('----------------------------------')
    print(rdd.foreach(lambda x: print(10 * x)))
RDD算子操作(基本算子和常见算子),大数据,大数据,spark
        5.saveAsTextFile算子

        saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 5, 4, 2, 3, 6])

    rdd.saveAsTextFile('hdfs://pyspark01/output/out1')

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        6.redueceByKey算子

        功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf-8
from pyspark import SparkConf,SparkContext

if __name__ == "__main__":
    # 构建SparkContext对象
    conf = SparkConf().setAppName('test').setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
    #使用reduceByKey函数进行聚合
    reduce_rdd = rdd.reduceByKey(lambda a,b : a + b).collect()
    print("聚合结果:",reduce_rdd)

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

二、常用Transformation算子       

         1.mapValues算子

        功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf-8
from pyspark import SparkConf,SparkContext

if __name__ == "__main__":
    # 构建SparkContext对象
    conf = SparkConf().setAppName('test').setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a',2),('b',11),('a',1)])
    #使用map函数
    map_rdd = rdd.map(lambda x: (x[0],x[1]*10)).collect()
    print("结果:",map_rdd)
    # 使用mapValue函数
    value_rdd = rdd.mapValues(lambda value: value*10).collect()
    print("结果:",value_rdd)

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        2.groupBy算子

        功能:将RDD数据进行分组。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    # 创建数据
    test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
    # 通过groupBy函数对数据进行分组
    # groupBy函数传入函数的意思是:通过这个函数,来确定按照谁来分组(返回谁即可)
    # 分组规则和SQL一致:也就是相同的在同一个组(Hash分组)
    result_1 = test_rdd.groupBy(lambda t: t[0])
    result_2 = result_1.map(lambda t: (t[0],list(t[1])))
    print(result_1.collect())
    print(result_2.collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        3.distinct算子

        功能:对RDD数据进行去重复,返回新的RDD。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    rdd_1 =  sc.parallelize((1,2,1,2,3,4,5,6))
    rdd_2 = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])
    # 使用distinct算子进行去重
    print('数字:',rdd_1.distinct().collect())
    print('元组:',rdd_2.distinct().collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        4.union算子

        功能:将两个RDD合并成一个RDD返回。只合并不去重,RDD的类型不同也是可以合并的。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    # 通过union算子合并RDD
    rdd_1 = sc.parallelize((1,2,3,4,5))
    rdd_2 = sc.parallelize((6,7,8,9,10))
    print(rdd_1.union(rdd_2).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        5.join算子

        功能:对两个RDD执行join操作(可实现SQL外/内连接),join算子只能用于二元元组。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu'),(1004,'zhaoliu')])
    rdd2 = sc.parallelize([(1001,'销售部'),(1002,'科技部')])

    # 通过join算子来进行rdd之间的关联
    # 对于join算子来说,关联条件按照二元元组的key来进行关联
    print(rdd1.join(rdd2).collect())

    # 左外连接,右外连接可以更换一下rdd的顺序或者调用rightOuterJoin即可
    print(rdd1.leftOuterJoin(rdd2).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        6.intersection算子

        功能:求两个RDD的交集,返回一个新的RDD。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([('a',1),('b',3)])
    rdd2 = sc.parallelize([('a',1),('c',1)])
    # 通过intersection算子求出RDD的交集 取出并返回新的RDD
    print(rdd1.intersection(rdd2).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        7.glom算子

        功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行,比如RDD数据[1,2,3,4,5]有两个分区,那么glom后,数据变成:[[1,2,3],[4,5]]。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setAppName('test').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    print(rdd1.glom().collect())
    # 解嵌套操作
    print(rdd1.glom().flatMap(lambda x: x).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        8.groupByKey算子

        功能:针对KV型RDD,自动按照key分组。

#cording:utf8

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    # 创建数据
    test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])
    # 使用groupByKey算子
    result_1 = test_rdd.groupByKey()
    #查看结果
    result_2 = result_1.map(lambda t: (t[0],list(t[1])))
    print(result_1.collect())
    print(result_2.collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        9.sortBy算子

        功能:对RDD数据进行排序,基于你指定的排序依据。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([('c',3),('f',1),('b',11),('c',3),('e',1),('n',9),('a',1)],3)
    # 使用sortBy对RDD执行排序
    # 按照value 数字进行排序
    # 参数1函数:表示的是,告知spark,按照数据的哪个列进行排序
    # 参数2:True表示升序 False表示降序
    # 参数3:排序的分区数
    '''注意:如果要全局有序,排序分区数设置为1'''
    print('按照value排序:',rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())
    # 按照key进行排序
    print('按照key排序:',rdd.sortBy(lambda x: x[0], ascending=True, numPartitions=3).collect())
RDD算子操作(基本算子和常见算子),大数据,大数据,spark        10.sortByKey算子

        功能:针对KV型RDD,按照Key进行排序

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('a',1),('E',1),('C',1),('D',1),('b',1),('g',1),('h',1),
                            ( "y" ,1),('u',1),('i',1),('o',1),('p',1),
                            ( 'm',1),('n',1),('L',1),('k',1),('f',1)],3)
    # 根据字母的小写排序
    print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: key.lower()).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

三、常用Action算子

        1.countByKey算子

        功能:统计key出现的次数(一般适用于KV型RDD)

import json
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd = sc.textFile('../input/words.txt')
    rdd2 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1))

    # 通过countByKey来对key进行计数,这是一个Action算子
    result = rdd2.countByKey()
    print(result)
    print(type(result))

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        2.collect算子

        功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象。这个算子,是将RDD各个分区数据都拉取到Driver,注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前,要心知肚明的了解结果数据集不会太大,不然,会把Driver内存撑爆。

        3.reduce算子

        功能:对RDD数据集按照你传入的逻辑进行聚合。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

import json
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4,5,6])
    print(rdd.reduce(lambda a,b: a+b))

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        4.takeSample算子

        功能:随机抽样RDD数据,随机数种子数字可以随便传,如果传同一个数字,那么取出的结果是一致的。一般参数三不传,spark会自动给与一个随机的种子。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark


from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2])
    print('True:',rdd.takeSample(True,22))
    print('False:',rdd.takeSample(False,22))
    print('无随机种子1:',rdd.takeSample(True,5))
    print('无随机种子2:', rdd.takeSample(True, 5))
    print('有随机种子1:',rdd.takeSample(True,5,1))
    print('有随机种子2:', rdd.takeSample(True, 5, 1))

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        5.takeOrdered算子

        功能:对RDD进行排序取前N个。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1,5,4,2,3,6])
    print('普通:',rdd.takeOrdered(3))
    # 函数操作只会对结果产生影响,不会影响数据本身
    print("传入函数:",rdd.takeOrdered(3, lambda x: -x))

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

四、分区操作算子

        1.mapPartitions算子

        功能:与map功能相似,但区别是,mapPartition一次被传递的是一整个分区的数据,是作为一个迭代器(一次性list)对象传入过来,而map是一个一个数据的传递。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
    def process(iter):
        result = list()
        for it in iter:
            result.append(it * 10)
        return result

    # mapPartitions算子相比于map算子,节省了大量打IO操作,每一个分区只需要进行一次IO操作即可
    print('输出结果:',rdd.mapPartitions(process).collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        2.foreachPartition算子

        功能:和普通的foreach一致,一次处理的是一整个分区的数据。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)
    def process(iter):
        result = list()
        for it in iter:
            result.append(it * 10)
        print(result)

    rdd.foreachPartition(process)

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        3.partitionBy算子

        功能:对RDD进行自定义分区操作。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([('hadoop',1),('hadoop',1),('hello',1),('spark',1),('flink',1),('spark',1)])

    # 使用partitionBy自定义分区
    def process(x):
        if 'hadoop' == x or 'hello' == x:return 0
        if 'spark' == x:return 1
        return 2
    # 使用glom算子将每个分区的数据进行嵌套
    print('显示分区:',rdd.partitionBy(3, process).glom().collect())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

        4.repartition算子和coalesce算子

        功能:对RDD的分区执行重新分区(仅数量)

        ps:对分区的数量进行操作,一定要慎重,一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外多数时候,所有API中关于分区相关的代码我们都不太理会。因为,如果你改分区了,会影响并行计算(内存迭代的并行管道数量)后面学分区如果增加,极大可能导致shuffle。

RDD算子操作(基本算子和常见算子),大数据,大数据,spark

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster('local[*]').setAppName('test')
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)

    # repartition 修改分区
    # 减少分区
    print("减少分区为1:",rdd.repartition(1).getNumPartitions())
    # 增加分区
    print("增加分区为5:", rdd.repartition(5).getNumPartitions())
    # coalesce 修改分区
    # 减少分区
    print("减少分区为1:",rdd.coalesce(1).getNumPartitions())
    # 增加分区 ps:coalesce增加分区数量需要指定参数shuffle为True才能1成功修改
    print("减少分区为5:", rdd.coalesce(5).getNumPartitions())
    print("减少分区为5:",rdd.coalesce(5, shuffle=True).getNumPartitions())

RDD算子操作(基本算子和常见算子),大数据,大数据,spark文章来源地址https://www.toymoban.com/news/detail-721075.html

到了这里,关于RDD算子操作(基本算子和常见算子)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---RDD算子(单值类型转换算子)

    RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类: 转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括

    2024年01月21日
    浏览(51)
  • 【Spark】RDD转换算子

    目录 map mapPartitions mapPartitionsWithIndex flatMap glom groupBy shuffle filter sample distinct coalesce repartition sortBy ByKey intersection union subtract zip partitionBy reduceByKey groupByKey reduceByKey 和 groupByKey 的区别 aggregateByKey foldByKey combineByKey reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 join leftOuterJoin

    2024年02月12日
    浏览(78)
  • Spark中Rdd算子和Action算子--学习笔记

    filter distinct groupBy groupByKey,sortBy,SortByKey rdd之间的连接 collect,take,count()类的聚合算子,saveAsTextFile, 统计算子,countByKey() countByKey().items() countByValue() , countByValue().items() 词频统计 缓存是将数据存储再内存或者磁盘上,缓存的特点是计算结束后缓存自动清空 为什么使用缓存? 提升

    2024年01月16日
    浏览(62)
  • Spark中RDD的Transformation算子

    map算子的功能为做映射,即将原来的RDD中对应的每一个元素,应用外部传入的函数进行运算,返回一个新的RDD flatMap算子的功能为扁平化映射,即将原来RDD中对应的每一个元素应用外部的运算逻辑进行运算,然后再将返回的数据进行压平,类似先map,然后再flatten的操作,最后

    2024年02月11日
    浏览(41)
  • Spark源码解析(一):RDD之Transfrom算子

    RDD 代表的是分布式数据形态,因此,RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations) 在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子

    2024年01月20日
    浏览(44)
  • 大数据之RDD的算子分类

    上一篇文章主要讲述了两种RDD的创建方式,本篇文章接着讲RDD的算子及其分类。 RDD的算子主要有两种类型,一种是Transformation转换算子,另一种是Action动作算子,Transformation转换算子执行完成后会返回一个新的RDD,所有的Transformation转换算子都是Lazy,不会立即执行,需要Acti

    2024年02月05日
    浏览(29)
  • 【Spark练习】RDD分区操作

    2.1 textFile 对于textFile而言,如果 没有在方法中指定分区数 ,则sc.defaultMinPartitions默认为 min(defaultParallelism,2) ,其中,defaultParallelism对应的就是spark.default.parallelism,如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片) rdd的分区数 = max(本地file的分片数, sc.default

    2024年02月02日
    浏览(39)
  • 横扫Spark之 - 9个常见的行动算子

    水善利万物而不争,处众人之所恶,故几于道💦 1. collect()   收集RDD每个分区的数据以数组封装之后发给Driver   如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过 bin/spark-submit --driver-memory 10G 这样设置 结果

    2024年02月20日
    浏览(39)
  • Spark RDD编程 文件数据读写

    从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。 示例:读取一个本地文件word.txt val textFile中的textFile是变量名称,sc.t

    2024年02月05日
    浏览(42)
  • 大数据 - Spark系列《六》- RDD详解

    Spark系列文章: 大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客 大数据

    2024年02月20日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包