RDD算子
filter
"""
rdd.filter(f):根据f函数中的判断条件对rdd追踪的数据进行过滤
保留条件为True对应的rdd数据
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize(['a','b','c'])
rdd1_filter1 = rdd1.filter(lambda x:x%2==0)
print(rdd1_filter1.collect())
rdd2_filter = rdd2.filter(lambda x:x=='a')
print(rdd2_filter.collect())
distinct
"""
rdd.distinct():对rdd中的数据进行去重操作
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
#distinct中不需要参数,它可以直接对rdd进行去重
rdd1_distinct = rdd1.distinct()
print(rdd1_distinct.collect())
groupBy
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
#对rdd1中的数据进行hash计算后和2进行整除,证据余数值进行分组操作
#hash(x)% 2:获取余数,返回值,0,1 0是能被2整除 1是不能被2整除
rdd2_groupBy1 = rdd2.groupBy(lambda x:hash(x)%2)
# print(rdd2_groupBy1.collect())
for k,v in rdd2_groupBy1.collect():
print(k)
print(list(v))
print(tuple(v))
#拿到groupBy后的值需要通过mapValues获取
rdd2_mapValues = rdd2_groupBy1.mapValues(lambda x:list(x))
print(rdd2_mapValues.collect())
groupByKey,sortBy,SortByKey
"""
kv数据格式的rdd:rdd中的每个元素是一个容器,容器中国有两个值
①[(key1,value1),(key2,value2)……]
②[([k1,k2],value1),([k1,k2],value2)……]多个key一个value
③[([k1],[v1,v2]),([k2],[v1,v2])……]一个key多个value
④[([k1,k2],[v1,v2]),([k3,k4],[v1,v2])……]多个key,多个value
算子:
rdd.groupByKey():对kv数据格式的rdd分局可以值进行分组,相同key值对应的value值合并到一起
rdd.sortByKey():根据rdd的value值进行排序,可以通过ascending=进行调整,默认为升序
rdd.sorBy(keyfunc):对kv数据格式的rdd经过可以func函数确定函数排序规则
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
rdd3 = sc.parallelize([('a',1),('b',2),('a',2),('b',2)])
rdd3_groupBykey1 = rdd3.groupByKey()
rdd3_map1 = rdd3_groupBykey1.mapValues(lambda x:list(x))
print(rdd3_map1.collect())
#结果:[('b', [2, 2]), ('a', [1, 2])]
rdd_reduceByKey1 = rdd3.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey1.collect())
#结果[('b', 4), ('a', 3)]
####sortByKey()
#ascending= ,默认是升序排列
rdd3_SortByKey1 = rdd3.sortByKey()
print(rdd3_SortByKey1.collect())
#根据rdd的value值进行降序排序
rdd_sortBy1 = rdd3.sortBy(keyfunc = lambda x:x[1],ascending=False)
print(rdd_sortBy1.collect())
#根据key值对rdd中的元素进行排序操作
rdd_sortBy2 = rdd3.sortBy(keyfunc= lambda x:x,ascending=False)
print(rdd_sortBy2.collect())
rdd之间的连接
"""
rdd1.join(rdd2) 内连接
rdd1.leftOuterJoin(rdd2)左外连接
rdd1.rightOuterJoin(rdd2)左外连接
rdd1.fullOuterjoin(rdd2)
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a',1),('b',2),('d',5)])
rdd2 = sc.parallelize([('a',3),('b',6),('c',3)])
#直接连接只会显示双方都有的元素
print(rdd1.join(rdd2).collect())
#左连接显示左rdd拥有的元素,右边没有的位置用None替代
print(rdd1.leftOuterJoin(rdd2).collect())
#右连接显示右rdd拥有的元素,左边没有的位置用None替代
print(rdd1.rightOuterJoin(rdd2).collect())
#全连接,两个表的数据都保存
print(rdd1.fullOuterJoin(rdd2).collect())
Action算子
collect,take,count()类的聚合算子,saveAsTextFile,
统计算子,countByKey()
countByKey().items()
countByValue() , countByValue().items()
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1, 2, 3, 4, 2])
rdd2 = sc.parallelize(['a', 'b', 'c', 'a'])
# collect():获取rdd中所有的数据,注意点:使用此算子时要考虑rdd对象的数据量问题
# 可能存在内存溢出的问题
rdd1_collect1 = rdd1.collect()
print(rdd1_collect1)
# take():获取指定数量的数据
rdd1_take = rdd1.take(num=2)
print(type(rdd1_take))
print(rdd1_take)
# 聚合算子count(),sum(),min(),max()
res3 = rdd1.count()
print(res3)
rdd3 = sc.parallelize([('a', 1), ('b', 2), ('a', 2), ('b', 2)])
res7 = rdd3.countByKey()
print(res7)
# 结果defaultdict(<class 'int'>, {'a': 2, 'b': 2})
res9 = rdd3.countByValue()
print(res9)
# 统计key值的次数,并且将类型转换为dict_items
res10 = rdd3.countByKey().items()
print(type(res10))
# <class 'dict_items'>
print(res10)
# 结果dict_items([('a', 2), ('b', 2)])
# 统计value的次数
res8 = rdd3.countByValue().items()
print(res8)
# 结果dict_items([(('a', 1), 1), (('b', 2), 2), (('a', 2), 1)])
res11 = rdd1.sum()
print(res11)
# 将kv格式数据转换成字典格式并统计key的个数
res9 = rdd3.collectAsMap()
print(type(res9))
print(res9)
#将结果rdd保存到hdfs的目录中,保存的目录是不存在的,会自动创建(目录存在会报错)
rdd3.saveAsTextFile('/output/test')
词频统计
from pyspark import SparkContext
sc = SparkContext()
#获取到文本文件的内容
rdd1 = sc.textFile('/test/words.txt')
#将文件中的数据拆分成单词形式
rdd1_flatMap1 = rdd1.flatMap(lambda x:x.split(','))
print(rdd1_flatMap1.collect())
#结果['hadoop', 'flink', 'spark', 'hive', 'hive', 'spark', 'python', 'java', 'python', 'itcast', 'itheima']
#将单词变为k,v格式
rdd1_map = rdd1_flatMap1.map(lambda x:(x,1))
print(rdd1_map.collect())
#结果[('hadoop', 1), ('flink', 1), ('spark', 1), ('hive', 1), ('hive', 1), ('spark', 1), ('python', 1), ('java', 1), ('python', 1), ('itcast', 1), ('itheima', 1)]
#对结果进行分组统计单词出现的次数
rdd1_reductByKey1 = rdd1_map.reduceByKey(lambda x,y:x+y)
print(rdd1_reductByKey1.collect())
#对rdd的值进行排序,降序排列
rdd1SortByKey1= rdd1_reductByKey1.sortBy(lambda x:x[1],ascending=False)
print(rdd1SortByKey1.collect())
###################################第二种##################
#使用聚合函数统计每个单词出现的数量
rdd1_countByKey1 = rdd1_map.countByKey().items()
print(rdd1_countByKey1)
#结果dict_items([('hadoop', 1), ('flink', 1), ('spark', 2), ('hive', 2), ('python', 2), ('java', 1), ('itcast', 1), ('itheima', 1)])
#对结果进行排序,并指定排序规则
sorted_res = sorted(rdd1_countByKey1,key=lambda x:x[1],reverse=True)
print(sorted_res)
#结果[('spark', 2), ('hive', 2), ('python', 2), ('hadoop', 1), ('flink', 1), ('java', 1), ('itcast', 1), ('itheima', 1)]
Rdd的缓存
缓存是将数据存储再内存或者磁盘上,缓存的特点是计算结束后缓存自动清空
为什么使用缓存?文章来源:https://www.toymoban.com/news/detail-792136.html
提升应用性能,容错文章来源地址https://www.toymoban.com/news/detail-792136.html
"""
缓存(cache):将中间计算的rdd进行零食存储,临时持久化,默认存储再内存种族观念
作用:提高应用程序性能(spark执行效率,sparrk计算速度快的原因之一);
容错性(rdd计算失败后从缓存中获取以来的rdd)
特点:缓存会随着应用程序结束自动清空,缓存会保留rdd之间的依赖关系(缓存rdd不可靠,可能会被清空,可以通过依赖关系计算新的rdd)
什么rdd需要进行缓存:计算时间长,计算昂贵,迭代计算(重复是使用)
缓存级别设置:指定缓存的rdd存储的位置
算子:cache() persist()
对任务效率要求高
"""
from pyspark import SparkContext
from pyspark import StorageLevel
sc = SparkContext()
rdd_words = sc.textFile('/test/words.txt')
rdd_split = rdd_words.flatMap(lambda s:s.split(','))
rdd_map = rdd_split.map(lambda x:(x,1))
rdd_reduceByKey = rdd_map.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey.is_cached)
#定义一个缓存任务,当条用action算子时才会触发缓存任务
rdd_reduceByKey.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
print(rdd_reduceByKey.is_cached)
# todo: 5-对kv数据格式的rdd以value值进行降序操作
# rdd_reduceByKey通过依赖关系计算得到的 -> 缓存任务到目前为止还没有触发
rdd_sortBy = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=False)
res = rdd_sortBy.collect()
print(res)
# rdd_reduceByKey是从缓存处获取的
rdd_sortBy2 = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=True)
print(rdd_sortBy2.collect())
# 手动释放缓存 正常情况下不需要手动释放(手动释放缓存后再指定action算子时就没有缓存可以触发)
rdd_reduceByKey.unpersist()
checkpoint机制
"""
checkpoint: 将中间计算的rdd永久持久化, rdd存储在HDFS上(分块多副本)
作用: 和缓存一样
特点: 永久存储, 不会保留rdd之间的依赖关系(存储可靠)
任务延迟性没有要求可以是用checkpoint(磁盘IO操作)
算子: checkpoint() 需要设置一个存储目录路径 sc.setCheckpointDir()
"""
from pyspark import SparkContext
from pyspark import StorageLevel # 设置缓存级别
# 创建sc对象
sc = SparkContext()
# 设置checkpoint目录路径
sc.setCheckpointDir('/checkpointdata')
# todo: 1-读取hdfs文件数据转换成rdd对象
rdd_words = sc.textFile('/test/words.txt')
# todo: 2-将rdd中元素以逗号,进行分割保存到新的rdd中
rdd_split = rdd_words.flatMap(lambda x: x.split(','))
# todo: 3-将拆分之后rdd构造成kv数据格式的rdd
rdd_map = rdd_split.map(lambda x: (x, 1))
# todo: 4-将kv数据格式的rdd根据key值进行分组,对value值进行累加操作
rdd_reduceByKey = rdd_map.reduceByKey(lambda x, y: x + y)
# 对rdd进行checkpoint操作
print(rdd_reduceByKey.is_checkpointed)
# 定义了一个checkpoint任务, 当调用action算子时才会触发checkpoint任务
rdd_reduceByKey.checkpoint()
print(rdd_reduceByKey.is_checkpointed)
# todo: 5-对kv数据格式的rdd以value值进行降序操作
# rdd_reduceByKey通过依赖关系计算得到的 -> checkpoint任务到目前为止还没有触发
rdd_sortBy = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=False)
res = rdd_sortBy.collect()
print(res)
# rdd_reduceByKey是从checkpoint处获取的
rdd_sortBy2 = rdd_reduceByKey.sortBy(keyfunc=lambda x: x[1], ascending=True)
print(rdd_sortBy2.collect())
lambda表达式if-elif-else的使用
# 使用lambda表达式实现if-elif-else条件判断
# result = lambda x: 'A' if x > 90 else ('B' if x > 80 else ('C' if x > 70 else 'D'))
# print(result(85)) # 输出:B
# print(result(95)) # 输出:A
# print(result(75)) # 输出:C
# print(result(65)) # 输出:D
#需求六
#不同年龄段的数量 数量集年龄分布范围:20-80 20-39青年 40-59中年 60以上:老年
#年龄,加上标识
# rdd_urban = rddflatMap.map(lambda x:(x[5],x[2],int(x[1])))
# print(rdd_urban.take(5))
# rdd_age = rddflatMap.map(lambda x:(x[1],'少年') if int(x[1])<=20 else '')
rdd_age = rddflatMap.map(lambda x:'青年' if 20<=int(x[1])<=39 else('中年' if 40<int(x[1])<=59 else ('老年' if int(x[1])>=60 else '少年')))
print(rdd_age.take(5))
rdd1 = rdd_age.countByKey()
print(rdd1)
到了这里,关于Spark中Rdd算子和Action算子--学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!