Spark中Rdd算子和Action算子--学习笔记

这篇具有很好参考价值的文章主要介绍了Spark中Rdd算子和Action算子--学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RDD算子

filter

"""
rdd.filter(f):根据f函数中的判断条件对rdd追踪的数据进行过滤
保留条件为True对应的rdd数据
"""
from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize(['a','b','c'])

rdd1_filter1 = rdd1.filter(lambda x:x%2==0)
print(rdd1_filter1.collect())

rdd2_filter = rdd2.filter(lambda x:x=='a')
print(rdd2_filter.collect())

distinct

"""
rdd.distinct():对rdd中的数据进行去重操作
"""
from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
#distinct中不需要参数,它可以直接对rdd进行去重
rdd1_distinct = rdd1.distinct()
print(rdd1_distinct.collect())

groupBy

from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])

#对rdd1中的数据进行hash计算后和2进行整除,证据余数值进行分组操作
#hash(x)% 2:获取余数,返回值,0,1   0是能被2整除  1是不能被2整除
rdd2_groupBy1 = rdd2.groupBy(lambda x:hash(x)%2)
# print(rdd2_groupBy1.collect())

for k,v in rdd2_groupBy1.collect():
    print(k)
    print(list(v))
    print(tuple(v))

#拿到groupBy后的值需要通过mapValues获取
rdd2_mapValues = rdd2_groupBy1.mapValues(lambda x:list(x))

print(rdd2_mapValues.collect())

groupByKey,sortBy,SortByKey

"""
kv数据格式的rdd:rdd中的每个元素是一个容器,容器中国有两个值
①[(key1,value1),(key2,value2)……]
②[([k1,k2],value1),([k1,k2],value2)……]多个key一个value
③[([k1],[v1,v2]),([k2],[v1,v2])……]一个key多个value
④[([k1,k2],[v1,v2]),([k3,k4],[v1,v2])……]多个key,多个value

算子:
rdd.groupByKey():对kv数据格式的rdd分局可以值进行分组,相同key值对应的value值合并到一起

rdd.sortByKey():根据rdd的value值进行排序,可以通过ascending=进行调整,默认为升序
rdd.sorBy(keyfunc):对kv数据格式的rdd经过可以func函数确定函数排序规则

"""
from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
rdd3 = sc.parallelize([('a',1),('b',2),('a',2),('b',2)])

rdd3_groupBykey1 = rdd3.groupByKey()
rdd3_map1 = rdd3_groupBykey1.mapValues(lambda x:list(x))
print(rdd3_map1.collect())
#结果:[('b', [2, 2]), ('a', [1, 2])]
rdd_reduceByKey1 = rdd3.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey1.collect())
#结果[('b', 4), ('a', 3)]


####sortByKey()
#ascending= ,默认是升序排列
rdd3_SortByKey1 = rdd3.sortByKey()
print(rdd3_SortByKey1.collect())

#根据rdd的value值进行降序排序
rdd_sortBy1 = rdd3.sortBy(keyfunc = lambda x:x[1],ascending=False)
print(rdd_sortBy1.collect())

#根据key值对rdd中的元素进行排序操作
rdd_sortBy2 = rdd3.sortBy(keyfunc= lambda x:x,ascending=False)
print(rdd_sortBy2.collect())

rdd之间的连接

"""
rdd1.join(rdd2) 内连接
rdd1.leftOuterJoin(rdd2)左外连接
rdd1.rightOuterJoin(rdd2)左外连接
rdd1.fullOuterjoin(rdd2)

"""


from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([('a',1),('b',2),('d',5)])
rdd2 = sc.parallelize([('a',3),('b',6),('c',3)])

#直接连接只会显示双方都有的元素
print(rdd1.join(rdd2).collect())
#左连接显示左rdd拥有的元素,右边没有的位置用None替代
print(rdd1.leftOuterJoin(rdd2).collect())
#右连接显示右rdd拥有的元素,左边没有的位置用None替代
print(rdd1.rightOuterJoin(rdd2).collect())
#全连接,两个表的数据都保存
print(rdd1.fullOuterJoin(rdd2).collect())

Action算子

collect,take,count()类的聚合算子,saveAsTextFile,

统计算子,countByKey()

countByKey().items()

countByValue() , countByValue().items()

    from pyspark import SparkContext
    
    sc = SparkContext()
    
    rdd1 = sc.parallelize([1, 2, 3, 4, 2])
    rdd2 = sc.parallelize(['a', 'b', 'c', 'a'])
    
    # collect():获取rdd中所有的数据,注意点:使用此算子时要考虑rdd对象的数据量问题
    # 可能存在内存溢出的问题
    rdd1_collect1 = rdd1.collect()
    print(rdd1_collect1)
    
    # take():获取指定数量的数据
    rdd1_take = rdd1.take(num=2)
    print(type(rdd1_take))
    print(rdd1_take)
    
    # 聚合算子count(),sum(),min(),max()
    res3 = rdd1.count()
    print(res3)
    
    rdd3 = sc.parallelize([('a', 1), ('b', 2), ('a', 2), ('b', 2)])
    res7 = rdd3.countByKey()
    print(res7)
    # 结果defaultdict(<class 'int'>, {'a': 2, 'b': 2})
    res9 = rdd3.countByValue()
    print(res9)
    
    # 统计key值的次数,并且将类型转换为dict_items
    res10 = rdd3.countByKey().items()
    print(type(res10))
    # <class 'dict_items'>
    print(res10)
    # 结果dict_items([('a', 2), ('b', 2)])
    
    
    # 统计value的次数
    res8 = rdd3.countByValue().items()
    print(res8)
    # 结果dict_items([(('a', 1), 1), (('b', 2), 2), (('a', 2), 1)])
    
    res11 = rdd1.sum()
    print(res11)
    
    # 将kv格式数据转换成字典格式并统计key的个数
    res9 = rdd3.collectAsMap()
    print(type(res9))
    print(res9)
    
    #将结果rdd保存到hdfs的目录中,保存的目录是不存在的,会自动创建(目录存在会报错)
    rdd3.saveAsTextFile('/output/test')

词频统计

    from pyspark import SparkContext
    
    sc = SparkContext()
    #获取到文本文件的内容
    rdd1 = sc.textFile('/test/words.txt')
    
    #将文件中的数据拆分成单词形式
    rdd1_flatMap1 = rdd1.flatMap(lambda x:x.split(','))
    print(rdd1_flatMap1.collect())
    #结果['hadoop', 'flink', 'spark', 'hive', 'hive', 'spark', 'python', 'java', 'python', 'itcast', 'itheima']
    
    #将单词变为k,v格式
    rdd1_map = rdd1_flatMap1.map(lambda x:(x,1))
    print(rdd1_map.collect())
    #结果[('hadoop', 1), ('flink', 1), ('spark', 1), ('hive', 1), ('hive', 1), ('spark', 1), ('python', 1), ('java', 1), ('python', 1), ('itcast', 1), ('itheima', 1)]
    
    #对结果进行分组统计单词出现的次数
    rdd1_reductByKey1 = rdd1_map.reduceByKey(lambda x,y:x+y)
    print(rdd1_reductByKey1.collect())
    
    #对rdd的值进行排序,降序排列
    rdd1SortByKey1= rdd1_reductByKey1.sortBy(lambda x:x[1],ascending=False)
    print(rdd1SortByKey1.collect())
    
    
    ###################################第二种##################
    #使用聚合函数统计每个单词出现的数量
    rdd1_countByKey1 = rdd1_map.countByKey().items()
    print(rdd1_countByKey1)
    #结果dict_items([('hadoop', 1), ('flink', 1), ('spark', 2), ('hive', 2), ('python', 2), ('java', 1), ('itcast', 1), ('itheima', 1)])
    
    #对结果进行排序,并指定排序规则
    sorted_res = sorted(rdd1_countByKey1,key=lambda x:x[1],reverse=True)
    print(sorted_res)
    #结果[('spark', 2), ('hive', 2), ('python', 2), ('hadoop', 1), ('flink', 1), ('java', 1), ('itcast', 1), ('itheima', 1)]

Rdd的缓存

缓存是将数据存储再内存或者磁盘上,缓存的特点是计算结束后缓存自动清空

为什么使用缓存?

提升应用性能,容错文章来源地址https://www.toymoban.com/news/detail-792136.html

    """
    缓存(cache):将中间计算的rdd进行零食存储,临时持久化,默认存储再内存种族观念
    
    作用:提高应用程序性能(spark执行效率,sparrk计算速度快的原因之一);
    容错性(rdd计算失败后从缓存中获取以来的rdd)
    特点:缓存会随着应用程序结束自动清空,缓存会保留rdd之间的依赖关系(缓存rdd不可靠,可能会被清空,可以通过依赖关系计算新的rdd)
    什么rdd需要进行缓存:计算时间长,计算昂贵,迭代计算(重复是使用)
    缓存级别设置:指定缓存的rdd存储的位置
    算子:cache()  persist()
    对任务效率要求高
    """
    
    from pyspark import SparkContext
    from pyspark import StorageLevel
    
    sc = SparkContext()
    
    rdd_words = sc.textFile('/test/words.txt')
    
    rdd_split = rdd_words.flatMap(lambda s:s.split(','))
    
    rdd_map = rdd_split.map(lambda x:(x,1))
    
    
    rdd_reduceByKey = rdd_map.reduceByKey(lambda x,y:x+y)
    print(rdd_reduceByKey.is_cached)
    
    #定义一个缓存任务,当条用action算子时才会触发缓存任务
    rdd_reduceByKey.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
    print(rdd_reduceByKey.is_cached)
    
    # todo: 5-对kv数据格式的rdd以value值进行降序操作
    # rdd_reduceByKey通过依赖关系计算得到的 -> 缓存任务到目前为止还没有触发
    rdd_sortBy = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=False)
    res = rdd_sortBy.collect()
    print(res)
    
    # rdd_reduceByKey是从缓存处获取的
    rdd_sortBy2 = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=True)
    print(rdd_sortBy2.collect())
    
    
    # 手动释放缓存 正常情况下不需要手动释放(手动释放缓存后再指定action算子时就没有缓存可以触发)
    rdd_reduceByKey.unpersist()

checkpoint机制

    """
    checkpoint: 将中间计算的rdd永久持久化, rdd存储在HDFS上(分块多副本)
    作用: 和缓存一样
    特点: 永久存储, 不会保留rdd之间的依赖关系(存储可靠)
    任务延迟性没有要求可以是用checkpoint(磁盘IO操作)
    算子: checkpoint()  需要设置一个存储目录路径 sc.setCheckpointDir()
    """
    
    from pyspark import SparkContext
    from pyspark import StorageLevel  # 设置缓存级别
    
    # 创建sc对象
    sc = SparkContext()
    # 设置checkpoint目录路径
    sc.setCheckpointDir('/checkpointdata')
    
    # todo: 1-读取hdfs文件数据转换成rdd对象
    rdd_words = sc.textFile('/test/words.txt')
    
    # todo: 2-将rdd中元素以逗号,进行分割保存到新的rdd中
    rdd_split = rdd_words.flatMap(lambda x: x.split(','))
    
    # todo: 3-将拆分之后rdd构造成kv数据格式的rdd
    rdd_map = rdd_split.map(lambda x: (x, 1))
    
    # todo: 4-将kv数据格式的rdd根据key值进行分组,对value值进行累加操作
    rdd_reduceByKey = rdd_map.reduceByKey(lambda x, y: x + y)
    # 对rdd进行checkpoint操作
    print(rdd_reduceByKey.is_checkpointed)
    # 定义了一个checkpoint任务, 当调用action算子时才会触发checkpoint任务
    rdd_reduceByKey.checkpoint()
    print(rdd_reduceByKey.is_checkpointed)
    
    # todo: 5-对kv数据格式的rdd以value值进行降序操作
    # rdd_reduceByKey通过依赖关系计算得到的 -> checkpoint任务到目前为止还没有触发
    rdd_sortBy = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=False)
    res = rdd_sortBy.collect()
    print(res)
    
    # rdd_reduceByKey是从checkpoint处获取的
    rdd_sortBy2 = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=True)
    print(rdd_sortBy2.collect())

lambda表达式if-elif-else的使用

# 使用lambda表达式实现if-elif-else条件判断
# result = lambda x: 'A' if x > 90 else ('B' if x > 80 else ('C' if x > 70 else 'D'))
# print(result(85))  # 输出:B
# print(result(95))  # 输出:A
# print(result(75))  # 输出:C
# print(result(65))  # 输出:D

#需求六
#不同年龄段的数量  数量集年龄分布范围:20-80  20-39青年  40-59中年   60以上:老年
#年龄,加上标识
# rdd_urban =  rddflatMap.map(lambda x:(x[5],x[2],int(x[1])))
# print(rdd_urban.take(5))
# rdd_age = rddflatMap.map(lambda x:(x[1],'少年') if int(x[1])<=20 else '')

rdd_age = rddflatMap.map(lambda x:'青年' if 20<=int(x[1])<=39 else('中年' if 40<int(x[1])<=59 else ('老年' if int(x[1])>=60 else '少年')))

print(rdd_age.take(5))
rdd1 = rdd_age.countByKey()
print(rdd1)

到了这里,关于Spark中Rdd算子和Action算子--学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---RDD算子(单值类型转换算子)

    RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类: 转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括

    2024年01月21日
    浏览(50)
  • Spark中RDD的Transformation算子

    map算子的功能为做映射,即将原来的RDD中对应的每一个元素,应用外部传入的函数进行运算,返回一个新的RDD flatMap算子的功能为扁平化映射,即将原来RDD中对应的每一个元素应用外部的运算逻辑进行运算,然后再将返回的数据进行压平,类似先map,然后再flatten的操作,最后

    2024年02月11日
    浏览(40)
  • Spark源码解析(一):RDD之Transfrom算子

    RDD 代表的是分布式数据形态,因此,RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations) 在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子

    2024年01月20日
    浏览(44)
  • 2023_Spark_实验十:RDD基础算子操作

    Ø练习 1: Ø 练习 2: Ø 练习 3: Ø 练习 4: Ø 练习 5: groupByKey groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,iterable[value]]的形式,有点类似于sql中的groupby,例如类似于mysql中的group_contact cogroup groupByKey是对单个RDD的数据进行分组,还可以使用一个叫作cogroup()的函

    2024年02月08日
    浏览(43)
  • 2023_Spark_实验十一:RDD高级算子操作

    coalesce : 总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。         repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。         coalesce,默认shuffle=false,不会经过shuffle。         当

    2024年02月08日
    浏览(36)
  • Spark大数据处理讲课笔记---Spark RDD典型案例

    利用RDD计算总分与平均分 利用RDD统计每日新增用户 利用RDD实现分组排行榜 针对成绩表,计算每个学生总分和平均分   读取成绩文件,生成lines;定义二元组成绩列表;遍历lines,填充二元组成绩列表;基于二元组成绩列表创建RDD;对rdd按键归约得到rdd1,计算总分;将rdd1映射

    2024年02月06日
    浏览(50)
  • spark中Rdd依赖和SparkSQL介绍--学习笔记

    1.1概念 rdd的特性之一 相邻rdd之间存在依赖关系(因果关系) 窄依赖 每个父RDD的一个Partition最多被子RDD的一个Partition所使用 父rdd和子rdd的分区是一对一(多对一) 触发窄依赖的算子 map(),flatMap(),filter() 宽依赖 父RDD的一个partition会被子rdd的多个Partition所使用 父rdd和子rdd的

    2024年01月17日
    浏览(47)
  • Spark基础学习笔记----RDD检查点与共享变量

    了解RDD容错机制 理解RDD检查点机制的特点与用处 理解共享变量的类别、特点与使用 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是 血统(Lineage)方式 和 设置检查点(checkpoint)

    2024年02月06日
    浏览(46)
  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(96)
  • Spark大数据处理讲课笔记--- RDD持久化机制

    理解RDD持久化的必要性 了解RDD的存储级别 学会如何查看RDD缓存 Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。 Spark中

    2024年02月06日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包