The third installment of these PySpark learning notes covers the DataFrame and its related operations.

Introduction

A PySpark DataFrame is lazily evaluated and can be regarded as a layer on top of the RDD. A DataFrame can also be viewed as a Dataset of Rows.

The following records how to create DataFrames, how to operate on them, how to apply UDFs, and finally how to run SQL queries against a DataFrame.

from pyspark.sql import SparkSession
from pyspark import SparkFiles, Row
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf,PandasUDFType

import pandas as pd
import numpy as np

import datetime
spark = SparkSession.builder.appName("example_sparksql").config("spark.some.config.option", "some-value").getOrCreate()
spark.sparkContext

SparkContext

Version: v3.2.1
Master:  local[*]
AppName: example_sparksql

Creating a Spark DataFrame

The following demonstrates four common ways to create a Spark DataFrame.

schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
])

## 1.read from https
print("---- 1.read from https and schema given ---- \n")
url = "https://raw.githubusercontent.com/apache/spark/master/examples/src/main/resources/people.json"
spark.sparkContext.addFile(url)
df1 = spark.read.json(SparkFiles.get("people.json"),schema=schema)
print("df1's content is: ")
df1.show()
print("df1's schema is: ")
df1.printSchema()


## 2.create from pd.DataFrame
print("---- 2.1. duplicate a same table in pd.DF then create from it ---- \n")
df2 = spark.createDataFrame(pd.DataFrame({"age":[None,30,19],"name":["Michael","Andy","Justin"]}))
print("df2's content is: ")
df2.show()
print("df2's schema is: ")
df2.printSchema()

print("---- 2.2. find the diff with 1. and adjusting ---- \n")
print("change float nan to Null, then check df2's content:")
df2 = df2.replace(float('nan'),None)
df2.show()
print("adjust df2's schema:")
df2 = df2.withColumn("age",df2.age.cast(IntegerType()))
df2.printSchema()


## 3.create by Row
print("---- 3. Using [Row(.)] to create DataFrame ---- \n")
df3 = spark.createDataFrame([
    Row(age=None, name='Michael'),
    Row(age=30, name='Andy'),
    Row(age=19, name='Justin')
], schema="age int, name string")
print("show df3's schema:")
df3.printSchema()


## 4.create from rdd
print("---- 4. Creating RDD then convert to DataFrame ---- \n")
rdd = spark.sparkContext.parallelize([
    (None, 'Michael'),
    (30, 'Andy'),
    (19, 'Justin')
])
df4 = spark.createDataFrame(rdd, schema=schema)
print("show df4's schema:")
df4.printSchema()
---- 1.read from https and schema given ---- 

df1's content is: 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

df1's schema is: 
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

---- 2.1. duplicate a same table in pd.DF then create from it ---- 

df2's content is: 
+----+-------+
| age|   name|
+----+-------+
| NaN|Michael|
|30.0|   Andy|
|19.0| Justin|
+----+-------+

df2's schema is: 
root
 |-- age: double (nullable = true)
 |-- name: string (nullable = true)

---- 2.2. find the diff with 1. and adjusting ---- 

change float nan to Null, then check df2's content:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|30.0|   Andy|
|19.0| Justin|
+----+-------+

adjust df2's schema:
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

---- 3. Using [Row(.)] to create DataFrame ---- 

show df3's schema:
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

---- 4. Creating RDD then convert to DataFrame ---- 

show df4's schema:
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

Operating on a Spark DataFrame

df = df4

Viewing data

df is lazily evaluated; use .show() to view the data, and df.columns to display the column names.

To enable eager evaluation (results displayed as soon as a cell runs), change the configuration: set spark.sql.repl.eagerEval.enabled to True. spark.sql.repl.eagerEval.maxNumRows controls the maximum number of rows displayed.

df.collect() actually calls collect() on the RDD underlying df and can also be used to display df's contents. The first/last few rows can be taken with df.take() / df.tail(); note that these also operate on the RDD, so the result is a list of Rows rather than a DataFrame.

df can likewise be converted to a pandas.DataFrame.

df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df.columns
['age', 'name']
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df.collect()
[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]
df.take(1)
[Row(age=None, name='Michael')]
df.toPandas()
    age     name
0   NaN  Michael
1  30.0     Andy
2  19.0   Justin
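
The cell above does not exercise df.tail() or spark.sql.repl.eagerEval.maxNumRows, both mentioned earlier; a minimal sketch of both (the row limit of 5 here is an arbitrary choice):

## like take(), tail() works on the underlying RDD and returns a list of Rows, not a DataFrame
df.tail(1)
## cap how many rows the eager-evaluation view renders
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 5)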

Filtering data

The central type here in PySpark is Column; any Column can be retrieved and displayed via the .select() method.

spark.conf.set('spark.sql.repl.eagerEval.enabled', False)
df.select("age").show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+
df.filter(df.age>20).show()
df[df.age>20].show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
df.withColumn("agenull_flag",df.age.isNotNull()).show()
+----+-------+------------+
| age|   name|agenull_flag|
+----+-------+------------+
|null|Michael|       false|
|  30|   Andy|        true|
|  19| Justin|        true|
+----+-------+------------+
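
Column objects also support expressions beyond a bare column reference; a minimal sketch combining arithmetic, alias() and boolean conditions on the same df (the alias age_next is just illustrative, output omitted):

from pyspark.sql.functions import col

## arithmetic on a Column, renamed via alias(); the null age simply stays null
df.select(col("name"), (df.age + 1).alias("age_next")).show()
## boolean conditions combine with & / |, and each condition needs its own parentheses
df.filter((df.age > 15) & (df.age < 25)).show()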

Applying custom functions

With a pandas UDF (User Defined Function) it is easy to define a custom function to apply. The mechanism: Spark drives the execution, data is transferred with Arrow (a library for in-memory columnar data structures that greatly improves data-transfer efficiency between the JVM and Python; see the linked article for details), the computation itself is performed in pandas, and vectorized operations are allowed.

A pandas UDF is defined by wrapping the function with the pandas_udf decorator; no extra configuration is needed.

df = spark.createDataFrame([
    Row(name='A', age=20, weight=80.),
    Row(name='A', age=21, weight=85.),
    Row(name='A', age=22, weight=83.),
    Row(name='A', age=23, weight=78.),
    Row(name='B', age=30, weight=60.),
    Row(name='B', age=31, weight=65.),
    Row(name='B', age=32, weight=70.),    
],schema="name string,age long,weight double")

df.printSchema()
df.show()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- weight: double (nullable = true)

+----+---+------+
|name|age|weight|
+----+---+------+
|   A| 20|  80.0|
|   A| 21|  85.0|
|   A| 22|  83.0|
|   A| 23|  78.0|
|   B| 30|  60.0|
|   B| 31|  65.0|
|   B| 32|  70.0|
+----+---+------+
## There are two key points when defining a UDF with the pandas_udf decorator:
## 1. @pandas_udf must declare the output data type (a type from pyspark.sql.types or its short string form)
## 2. The wrapped function must annotate its input and output types; the two common patterns are:
##   ---- pd.Series -> pd.Series
##   ---- pd.Series -> Scalar (a single value; give the concrete value type here: str, int, etc.)

@pandas_udf('long')
def plus_one(series: pd.Series) -> pd.Series:
    return series + 1
df.select(plus_one(df["age"]).alias("age_plus1")).show()

@pandas_udf('double')
def get_mean(series: pd.Series) -> float:
    return series.mean()
df.select(get_mean(df["weight"]).alias("weight_mean")).show()

@pandas_udf('double')
def weight2age(age:pd.Series, wt:pd.Series) -> pd.Series:
    return (wt/age).round(2)
df.select(weight2age(df["age"],df["weight"]).alias("wt2age")).show()

+---------+
|age_plus1|
+---------+
|       21|
|       22|
|       23|
|       24|
|       31|
|       32|
|       33|
+---------+

+-----------------+
|      weight_mean|
+-----------------+
|74.42857142857143|
+-----------------+

+------+
|wt2age|
+------+
|   4.0|
|  4.05|
|  3.77|
|  3.39|
|   2.0|
|   2.1|
|  2.19|
+------+
## UDFs can also be applied on top of groupby (effectively a UDAF). pd.DataFrame -> pd.DataFrame

## Old-style apply: decorate with @pandas_udf, whose second argument must be PandasUDFType.GROUPED_MAP
@pandas_udf('name string, alpha float, beta float',PandasUDFType.GROUPED_MAP)
def get_fitline_old(pdf):
    res = np.polyfit(pdf["weight"],pdf["age"],1)
    beta = round(res[0],2)
    alpha = round(res[1],2)
    name = pdf["name"][0]
    return pd.DataFrame({"name":[name],"alpha":[alpha],"beta":[beta]})
print("---- 这是老版的apply ----")
df.groupby("name").apply(get_fitline_old).show()


## New-style applyInPandas: no decorator needed, but a schema must be provided to applyInPandas
print("---- new-style applyInPandas ----")
def get_fitline_new(pdf):
    res = np.polyfit(pdf["weight"],pdf["age"],1)
    beta = round(res[0],2)
    alpha = round(res[1],2)
    name = pdf["name"][0]
    return pd.DataFrame({"name":[name],"alpha":[alpha],"beta":[beta]})
df.groupby("name").applyInPandas(get_fitline_new,schema="name string, alpha double, beta double").show()

---- old-style apply ----
+----+-----+-----+
|name|alpha| beta|
+----+-----+-----+
|   A|32.74|-0.14|
|   B| 18.0|  0.2|
+----+-----+-----+

---- new-style applyInPandas ----
+----+-----+-----+
|name|alpha| beta|
+----+-----+-----+
|   A|32.74|-0.14|
|   B| 18.0|  0.2|
+----+-----+-----+

Running SQL statements on a Spark DataFrame

Call createOrReplaceTempView() on a Spark DataFrame to register it as a table, then issue SQL statements through spark.sql.

Likewise, the pandas_udf functions defined earlier can be used inside SQL queries: register them with spark.udf.register, and they can then be referenced in the SQL text.

df.createOrReplaceTempView("table_tmp")
spark.sql("select * from table_tmp where name='A'").show()
+----+---+------+
|name|age|weight|
+----+---+------+
|   A| 20|  80.0|
|   A| 21|  85.0|
|   A| 22|  83.0|
|   A| 23|  78.0|
+----+---+------+
spark.udf.register("plus_one", plus_one)
spark.sql("SELECT plus_one(age) as age_plus1 FROM table_tmp").show()

+---------+
|age_plus1|
+---------+
|       21|
|       22|
|       23|
|       24|
|       31|
|       32|
|       33|
+---------+
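
The Series-to-Scalar UDF get_mean defined earlier should also behave as an aggregate once registered, so it can be paired with GROUP BY in SQL; a minimal sketch (expected means from the data above: A -> 81.5, B -> 65.0):

spark.udf.register("get_mean", get_mean)
spark.sql("SELECT name, get_mean(weight) AS weight_mean FROM table_tmp GROUP BY name").show()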