pyspark学习的第二篇会主要介绍RDD和相关的算子。

RDD的基本概念

在上层应用中,每个Spark应用都包含一个驱动程序,该驱动程序运行用户定义的主要功能并在集群上执行各种并行操作。Spark提供的主要数据抽象是RDD(Resilient Distirbuted Dataset: 弹性分布式数据集),它是跨集群节点分区的元素集合,可以执行并行计算。RDD是通过从Hadoop(或其支持的)文件系统中的文件开始并对其进行转换来创建的。用户还可以要求Spark将RDD持久化到内存中,以便在并行操作中有效地重用它。最后,RDD会自动从节点故障中恢复。

通俗一点的理解,RDD首先是一个dataset,即一个数据集合;然后是一个分布式的(distributed),既能分布储存,也能进行分布式计算;最后还是弹性的(resilient),表示它既能够存在硬盘中也能存在内存里,同时有较强的容错机制。

Spark中的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当 Spark 在不同节点上并行运行一个函数作为一组任务时,它会将函数中使用的每个变量的副本发送到每个任务。有时,需要在任务之间或在任务和驱动程序之间共享变量。Spark 支持两种类型的共享变量:广播变量,可用于在所有节点的内存中缓存值,以及累加器,它们是仅“添加”到的变量,例如计数器和总和。

## 载入packages
from pyspark import SparkContext
from pyspark import AccumulatorParam

import numpy as np
import pandas as pd

初始化Spark

首先需要SparkContext创建一个和spark集群的连接,这儿我们就是本地集群,同时给应用起名tuto_rdd。

sc = SparkContext('local','tuto_rdd')
sc

SparkContext

Spark UI

Version v3.2.1
Master local
AppName tuto_rdd

创建RDD

创建RDD有两种途径:

  1. 通过在驱动程序中将已有的集合并行化来创建(parallelized collection)
  2. 通过读取外部分布式数据系统,如HDFS、HBase等
## 并行化集合可以通过SparkContext下的parallelize方法来实现
data = [1,2,3,4]
rdd = sc.parallelize(data)

print(f"查看rdd所用分区数量:{rdd.getNumPartitions()}")
查看rdd所用分区数量:1
rdd = sc.parallelize(data,2)
print(f"查看rdd所用分区数量:{rdd.getNumPartitions()}")
print(f"查看rdd各自分区下元素集合:{rdd.glom().collect()}")
查看rdd所用分区数量:2
查看rdd各自分区下元素集合:[[1, 2], [3, 4]]
## 读取外部数据生成rdd。这儿没有分布式文件系统,就给出个example,不展开了。
rdd = sc.textFile("data.txt")

RDD的操作

RDD支持两种类型的操作(算子):Transformation(从现有数据集上创建新数据集)和Action(在数据集上进行计算后生成一个值返回给驱动程序)。比如,map将每个数据集元素通过函数传递并返回一个新RDD表示结果,是一种transformatino;而reduce使用某个函数聚合RDD的所有元素并将最终结果返回给驱动程序的操作,是一种action。

Spark中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集(例如文件)的转换。仅当操作需要将结果返回给驱动程序时才计算转换。这种设计使 Spark 能够更高效地运行。默认情况下,每个转换操作后的(应用transformation后的)RDD可能会在我们每次对其运行操作时重新计算。但是,我们也可以使用持久化方法将RDD持久化到内存中,在这种情况下,Spark会将元素保留在集群上,下次查询时能更快地访问它。

举例:
如下代码中,前两行的代码并不会在编译的时候立马执行,而是后到最后一行的reduce触发的时候才执行,层层逆向推导。如果想保存lineLengths的结果到内存里,使用persist方法实现。

rdd_orig = sc.parallelize([1,2,3,4])
rdd_adj = rdd_orig.map(lambda s:s+1)
rdd_adj.persist()
res = rdd_adj.reduce(lambda a, b: a + b)
res
14

transformation算子

如下记录一些基本的transformation算子和对应的代码操作

## map:将func函数作用到数据集的每一个元素上,生成一个新的RDD返回

rdd = sc.parallelize([1,2,3,4])
rdd.map(lambda s:s+1).collect()
[2, 3, 4, 5]
## filter:选出所有func返回值为true的元素,生成一个新的RDD返回

rdd = sc.parallelize([1,2,3,4])
rdd.filter(lambda s:s%3==1).collect()
[1, 4]
## flatMap:先执行map的操作,再将所有对象合并为一个对象

rdd = sc.parallelize([[1,2],[3],[4,5]])
print(rdd.map(lambda l:list(np.array(l)+1)).collect())
print(rdd.flatMap(lambda l:list(np.array(l)+1)).collect())
[[2, 3], [4], [5, 6]]
[2, 3, 4, 5, 6]
## mapPartitions: 同map,不过是按分区并行的,且func需要是一个iterator->iterator的映射

def sum_ptt(data_ptt):
    yield sum(data_ptt)

def plus1_ptt(data_ptt):
    yield [ele+1 for ele in data_ptt]

rdd1 = sc.parallelize([1,2,3,4],2)    
print("if there are 2 partitions:")
print(rdd1.mapPartitions(sum_ptt).collect())
print(rdd1.mapPartitions(plus1_ptt).collect())

print("")
print("if just 1 partition:")
rdd2 = sc.parallelize([1,2,3,4],1)
print(rdd2.mapPartitions(sum_ptt).collect())
print(rdd2.mapPartitions(plus1_ptt).collect())

if there are 2 partitions:
[3, 7]
[[2, 3], [4, 5]]

if just 1 partition:
[10]
[[2, 3, 4, 5]]
## mapPartitionWithIndex: 同mapPartitions,但是入参带了index,(int,iterator)->iterator

rdd = sc.parallelize([1,2,3,4],2)
def sum_ptt_with_id(index,data_ptt):
    yield sum(data_ptt)

def get_ptt_id(index,data_ptt):
    yield index
    
print(rdd.mapPartitionsWithIndex(sum_ptt_with_id).collect())
print(rdd.mapPartitionsWithIndex(get_ptt_id).collect())
[3, 7]
[0, 1]
## sample: 随机取样,无法确定具体取样的数量,参数fraction表示每个元素按古典抽样的抽样概率。我觉得用处不大

rdd = sc.parallelize(range(10),2)
rdd.sample(withReplacement=False,fraction=0.5,seed=2).collect()
[6, 8]
## union: 2个rdd取并,不去重
## intersection:2个rdd取交
## distinct: 取唯一值
## cartesian: 取笛卡尔积,返回2个rdd的全组合排列

rdd1 = sc.parallelize([1,2,3,3],2)
rdd2 = sc.parallelize([2,3,4],3)

print(rdd1.union(rdd2).collect())
print(rdd1.intersection(rdd2).collect())
print(rdd1.union(rdd2).distinct().collect())
print(rdd1.cartesian(rdd2).collect())

[1, 2, 3, 3, 2, 3, 4]
[2, 3]
[1, 2, 3, 4]
[(1, 2), (2, 2), (1, 3), (2, 3), (1, 4), (2, 4), (3, 2), (3, 2), (3, 3), (3, 3), (3, 4), (3, 4)]
## groupByKey:以第一个元素作为key进行原始rdd进行分组,返回一个新的rdd。(Key,Value)->(Key, Iterator)。

rdd = sc.parallelize([("a",1),("b",2),("a",3),("c",4)],2)
res = rdd.groupByKey().collect()
print("元素a的结果(转化成list展示):")
print(list(res[2][1]))
print("原始结果")
res
元素a的结果(转化成list展示):
[1, 3]
原始结果





[('b', <pyspark.resultiterable.ResultIterable at 0xffff6743b4f0>),
 ('c', <pyspark.resultiterable.ResultIterable at 0xffff674b7fa0>),
 ('a', <pyspark.resultiterable.ResultIterable at 0xffff674b7eb0>)]
## reduceByKey:将key相同的键值对,按照Function进行计算。(Key,Value)->(Key,Value),func 需要是(V,V)->V

rdd = sc.parallelize([("a",1),("b",2),("a",3),("a",4)],2)
    
rdd.reduceByKey(lambda x,y:min(x,y)).collect()
[('b', 2), ('a', 1)]
## aggregateByKey:和reduceByKey一样,不过键值对里的“取值”的出入参可以不同。

rdd = sc.parallelize([("a",1),("b",2),("a",3),("a",4)],4)
rdd.aggregateByKey(0,lambda x,y:x+y,lambda x,y:min(x,y)).collect()
[('b', 2), ('a', 1)]
## sortByKey: 根据第一个元素排序,然后返回

rdd = sc.parallelize([("a",1),("b",2),("c",3),("a",4)],4)
rdd.sortByKey().collect()
[('a', 1), ('a', 4), ('b', 2), ('c', 3)]
## join: 两个数据按照第一个元素作为key的全连接

rdd1 = sc.parallelize([("a",1),("b",2),("c",3),("a",4)])
rdd2 = sc.parallelize([("a",10),("b",20),("c",30)])

rdd1.join(rdd2).collect()
[('b', (2, 20)), ('c', (3, 30)), ('a', (1, 10)), ('a', (4, 10))]
## coalesce: 把rdd的分区数量降低至指定数量

rdd = sc.parallelize([1,2,3,4],4)
rdd.coalesce(2).glom().collect()
[[1, 2], [3, 4]]

action算子

如下记录一些基本的action算子和对应的代码操作

## collect:将rdd里的信息返回成一个list,存在内存里

rdd = sc.parallelize([1,2,3,4])
rdd.collect()
[1, 2, 3, 4]
## reduce: 将rdd中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
##         相当于输入函数需要满足交换律和结合律(i.e. f(a,b)=f(b,a) & f(f(a,b),c)=f(a,f(b,c)))

rdd = sc.parallelize([1,2,3,4],4)
rdd.reduce(lambda x,y:min(x,y))

1
## count: 计数

rdd = sc.parallelize([1,2,3,4],4)
rdd.count()
4
## take: 返回前n个元素
## takeSample: 随机取n个元素
## takeOrdered: 先排序再返回前n个元素

rdd = sc.parallelize([1,2,4,3],4)
print(rdd.take(2))
print(rdd.takeSample(withReplacement=False,num=2,seed=1))
print(rdd.takeOrdered(3))
[1, 2]
[3, 1]
[1, 2, 3]
## coutByKey: 根据key(第一个元素)分组计数
## countByValue: 根据value(第二个元素)分组计数

rdd = sc.parallelize([("a",1),("b",2),("c",3),("a",4)])
print(rdd.countByKey())
print(rdd.countByValue())
defaultdict(<class 'int'>, {'a': 2, 'b': 1, 'c': 1})
defaultdict(<class 'int'>, {('a', 1): 1, ('b', 2): 1, ('c', 3): 1, ('a', 4): 1})

共享变量

通常情况下,当上述介绍的这些Spark算子在远程集群节点上执行时,它中间调用的函数所使用的变量是在单独副本上工作的,这些变量将被复制到每台机器/节点上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量效率非常低,于是Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:broadcast variableaccumulator

p.s.以下只是简单的说一下我对这两个共享变量的理解,我觉得这些主要是在资源调度上的考虑,目前学习中还没有真正弄懂其中的奥妙,可能需要在实际实践中体会

broadcast variable

简单的说,broadcast variable相当于是在每台机器上缓存的一个只读变量,而不用随任务一起发送它的副本,目的是为了降低通信成本。

它的使用是通过SparkContext下的broadcast方法去封装原始数据信息:

brdvar = sc.broadcast([1,2,3,4])
brdvar.value
[1, 2, 3, 4]

accumulator

accumulator是一个满足交换律和结合律的只“加”变量(此处“加”可视为一个抽象的映射,和reduce算子中的func的所需的特性一样),因此可以有效地并行支持。它可用于实现计数器或求和,Spark原生支持数值类型的accumulator,使用的时候可以自定义添加对新类型的支持。

accumulator通过SparkContext下的accumulator方法来创建,需要传入一个初始值。accumulator同样满足Spark的惰性计算,即需要action来触发结果的收集返回。

同时,如果想要自定义不同类型的accumulator,可以通过传入accum_param来实现,accum_param可以通过继承AccumulatorParam来创建。AccumulatorParam里面需要定义2个方法:zero和addInPlace。前者用来定义0值类型,后者用来定义抽象“加”法的计算逻辑。

accum = sc.accumulator(0)
def f(x):
    accum.add(x)
    return x

sc.parallelize([1,2,3,4],3).map(f)
print(f"1.仅进行transformer,accum={accum}")
sc.parallelize([1,2,3,4],3).map(f).collect()
print(f"2.进行action,accum={accum}")

1.仅进行transformer,accum=0
2.进行action,accum=10
class vec_accumparam(AccumulatorParam):
    def zero(self, v):
        return [0]*len(v)

    def addInPlace(self, v1, v2):
        try: 
            return [v1[i]+v2[i] for i in range(len(v1))]
        except:
            return [v1[i]+0 for i in range(len(v1))]

accum_vec = sc.accumulator([1,2,3,4], vec_accumparam())
accum_vec.add([2,2,2,2])
accum_vec.value
[3, 4, 5, 6]
class str_accumparam(AccumulatorParam):
    def zero(self, v):
        return ""

    def addInPlace(self, v1, v2):
        return v1+v2

accum_str = sc.accumulator("a", str_accumparam())
accum_str.add("b")
accum_str.value
'ab'