This third post in the PySpark series covers the DataFrame and the operations built around it.
Introduction
A PySpark DataFrame is lazily evaluated and can be thought of as a higher-level abstraction on top of the RDD; it can also be viewed as a Dataset of Rows.
The notes below cover creating DataFrames, operating on them, and applying UDFs, and close with SQL queries run against a DataFrame.
from pyspark.sql import SparkSession
from pyspark import SparkFiles, Row
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf,PandasUDFType
import pandas as pd
import numpy as np
import datetime
spark = SparkSession.builder.appName("example_sparksql").config("spark.some.config.option", "some-value").getOrCreate()
spark.sparkContext
Creating a Spark DataFrame
The following shows four common ways to create a Spark DataFrame.
schema = StructType([
StructField("age", IntegerType(), True),
StructField("name", StringType(), True)
])
## 1. read from an https URL
print("---- 1. read from an https URL with a given schema ---- \n")
url = "https://raw.githubusercontent.com/apache/spark/master/examples/src/main/resources/people.json"
spark.sparkContext.addFile(url)
df1 = spark.read.json(SparkFiles.get("people.json"),schema=schema)
print("df1's content is: ")
df1.show()
print("df1's schema is: ")
df1.printSchema()
## 2.create from pd.DataFrame
print("---- 2.1. duplicate a same table in pd.DF then create from it ---- \n")
df2 = spark.createDataFrame(pd.DataFrame({"age":[None,30,19],"name":["Michael","Andy","Justin"]}))
print("df2's content is: ")
df2.show()
print("df2's schema is: ")
df2.printSchema()
print("---- 2.2. find the diff with 1. and adjusting ---- \n")
print("change float nan to Null, then check df2's content:")
df2 = df2.replace(float('nan'),None)
df2.show()
print("adjust df2's schema:")
df2 = df2.withColumn("age",df2.age.cast(IntegerType()))
df2.printSchema()
## 3.create by Row
print("---- 3. Using [Row(.)] to create DataFrame ---- \n")
df3 = spark.createDataFrame([
Row(age=None, name='Michael'),
Row(age=30, name='Andy'),
Row(age=19, name='Justin')
], schema="age int, name string")
print("show df3's schema:")
df3.printSchema()
## 4.create from rdd
print("---- 4. Creating RDD then convert to DataFrame ---- \n")
rdd = spark.sparkContext.parallelize([
(None, 'Michael'),
(30, 'Andy'),
(19, 'Justin')
])
df4 = spark.createDataFrame(rdd, schema=schema)
print("show df4's schema:")
df4.printSchema()
---- 1. read from an https URL with a given schema ----
df1's content is:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
df1's schema is:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
---- 2.1. build the same table as a pd.DataFrame, then create from it ----
df2's content is:
+----+-------+
| age| name|
+----+-------+
| NaN|Michael|
|30.0| Andy|
|19.0| Justin|
+----+-------+
df2's schema is:
root
|-- age: double (nullable = true)
|-- name: string (nullable = true)
---- 2.2. compare with 1. and adjust ----
replace float NaN with null, then check df2's content:
+----+-------+
| age| name|
+----+-------+
|null|Michael|
|30.0| Andy|
|19.0| Justin|
+----+-------+
adjust df2's schema:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
---- 3. Using [Row(.)] to create DataFrame ----
show df3's schema:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
---- 4. Creating RDD then convert to DataFrame ----
show df4's schema:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
Working with a Spark DataFrame
df = df4
Inspecting the data
df is lazily evaluated; call .show() to display the data, and df.columns to list the column names.
To turn on eager evaluation ("query and see immediately"), change the configuration: set spark.sql.repl.eagerEval.enabled to True. spark.sql.repl.eagerEval.maxNumRows sets the maximum number of rows displayed.
df.collect() actually runs collect() on the RDD wrapped inside df and also shows the DataFrame's contents. The first or last few rows can be fetched with df.take() and df.tail(); note that these likewise go through the underlying RDD, so the result is a list of Rows rather than a DataFrame (a short sketch of tail() and maxNumRows follows at the end of this subsection).
df can also be converted to a pandas.DataFrame.
df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
df.columns
['age', 'name']
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df
age | name |
---|---|
null | Michael |
30 | Andy |
19 | Justin |
df.collect()
[Row(age=None, name='Michael'),
Row(age=30, name='Andy'),
Row(age=19, name='Justin')]
df.take(1)
[Row(age=None, name='Michael')]
df.toPandas()
    age     name
0   NaN  Michael
1  30.0     Andy
2  19.0   Justin
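Two things mentioned above but not demonstrated in the run are df.tail() and spark.sql.repl.eagerEval.maxNumRows. A minimal sketch, assuming the same three-row df (the results in the comments are what the data above would give):
## tail(n) returns the last n rows as a list of Row objects, not a DataFrame
df.tail(2)   # [Row(age=30, name='Andy'), Row(age=19, name='Justin')]
## cap how many rows the eager-evaluation repr renders
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 2)
df           # with eager evaluation still on, only the first 2 rows would be rendered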
Selecting and filtering data
The core type in this part of pyspark is Column; any Column can be pulled out and displayed with the .select() method (a few more Column-expression examples are sketched at the end of this subsection).
spark.conf.set('spark.sql.repl.eagerEval.enabled', False)
df.select("age").show()
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
df.filter(df.age>20).show()
df[df.age>20].show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
df.withColumn("agenull_flag",df.age.isNotNull()).show()
+----+-------+------------+
| age| name|agenull_flag|
+----+-------+------------+
|null|Michael| false|
| 30| Andy| true|
| 19| Justin| true|
+----+-------+------------+
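Columns also compose into expressions: arithmetic, renaming via alias(), and boolean combinations. A small sketch against the same df; these calls are illustrative additions, not part of the original run:
## arithmetic on a Column plus a rename via alias()
df.select(df.name, (df.age + 1).alias("age_next")).show()
## boolean Column expressions combined with & (each condition parenthesized)
df.filter((df.age > 18) & (df.age < 25)).show()
## rows where age is null
df.filter(df.age.isNull()).show()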
Applying custom functions
Pandas UDFs (user-defined functions) make it easy to define the custom functions you want to apply. The mechanics: Spark drives the execution, Arrow (a library for in-memory columnar data that greatly speeds up data transfer between the JVM and Python) moves the data, and pandas performs the computation, which allows vectorized operations.
A pandas UDF is defined by wrapping a function with the pandas_udf decorator; no extra configuration is needed.
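Pandas UDFs use Arrow automatically. As a side note, Arrow can also be enabled for plain toPandas() / createDataFrame(pandas_df) conversions through a config flag; a sketch, assuming Spark 3.x where the option is named spark.sql.execution.arrow.pyspark.enabled:
## speeds up toPandas() and createDataFrame(pandas_df); Spark falls back to the
## non-Arrow path if a conversion is not supported
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas()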
df = spark.createDataFrame([
Row(name='A', age=20, weight=80.),
Row(name='A', age=21, weight=85.),
Row(name='A', age=22, weight=83.),
Row(name='A', age=23, weight=78.),
Row(name='B', age=30, weight=60.),
Row(name='B', age=31, weight=65.),
Row(name='B', age=32, weight=70.),
],schema="name string,age long,weight double")
df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- weight: double (nullable = true)
+----+---+------+
|name|age|weight|
+----+---+------+
| A| 20| 80.0|
| A| 21| 85.0|
| A| 22| 83.0|
| A| 23| 78.0|
| B| 30| 60.0|
| B| 31| 65.0|
| B| 32| 70.0|
+----+---+------+
## Two key points when defining a UDF with the pandas_udf decorator:
## 1. @pandas_udf must declare the output data type (a type from pyspark.sql.types or its short string form)
## 2. The decorated function must annotate its input/output types; the two most common patterns are:
## ---- pd.Series -> pd.Series
## ---- pd.Series -> scalar (a single value; give the concrete type here: str, int, float, ...)
@pandas_udf('long')
def plus_one(series: pd.Series) -> pd.Series:
return series + 1
df.select(plus_one(df["age"]).alias("age_plus1")).show()
@pandas_udf('double')
def get_mean(series: pd.Series) -> float:
return series.mean()
df.select(get_mean(df["weight"]).alias("weight_mean")).show()
@pandas_udf('double')
def weight2age(age:pd.Series, wt:pd.Series) -> pd.Series:
return (wt/age).round(2)
df.select(weight2age(df["age"],df["weight"]).alias("wt2age")).show()
+---------+
|age_plus1|
+---------+
| 21|
| 22|
| 23|
| 24|
| 31|
| 32|
| 33|
+---------+
+-----------------+
| weight_mean|
+-----------------+
|74.42857142857143|
+-----------------+
+------+
|wt2age|
+------+
| 4.0|
| 4.05|
| 3.77|
| 3.39|
| 2.0|
| 2.1|
| 2.19|
+------+
## A UDF can also be used on top of a groupby (effectively a UDAF): pd.DataFrame -> pd.DataFrame
## Old-style apply: with the @pandas_udf decorator, the second argument must be PandasUDFType.GROUPED_MAP
@pandas_udf('name string, alpha float, beta float',PandasUDFType.GROUPED_MAP)
def get_fitline_old(pdf):
res = np.polyfit(pdf["weight"],pdf["age"],1)
beta = round(res[0],2)
alpha = round(res[1],2)
name = pdf["name"][0]
return pd.DataFrame({"name":[name],"alpha":[alpha],"beta":[beta]})
print("---- 这是老版的apply ----")
df.groupby("name").apply(get_fitline_old).show()
## New-style applyInPandas: no decorator needed, but the schema must be passed to applyInPandas
print("---- new-style applyInPandas ----")
def get_fitline_new(pdf):
res = np.polyfit(pdf["weight"],pdf["age"],1)
beta = round(res[0],2)
alpha = round(res[1],2)
name = pdf["name"][0]
return pd.DataFrame({"name":[name],"alpha":[alpha],"beta":[beta]})
df.groupby("name").applyInPandas(get_fitline_new,schema="name string, alpha double, beta double").show()
---- old-style apply ----
+----+-----+-----+
|name|alpha| beta|
+----+-----+-----+
| A|32.74|-0.14|
| B| 18.0| 0.2|
+----+-----+-----+
---- new-style applyInPandas ----
+----+-----+-----+
|name|alpha| beta|
+----+-----+-----+
| A|32.74|-0.14|
| B| 18.0| 0.2|
+----+-----+-----+
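Since Spark 3.0, a Series-to-scalar pandas UDF such as get_mean above can also be used directly as an aggregate inside groupby().agg(). A minimal sketch reusing get_mean (the expected values follow from the data above):
df.groupby("name").agg(get_mean(df["weight"]).alias("weight_mean")).show()
## expected: one weight_mean per name, roughly A -> 81.5, B -> 65.0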
Running SQL against a Spark DataFrame
Register a Spark DataFrame as a table with createOrReplaceTempView(), then issue SQL statements through spark.sql.
Likewise, the pandas_udf functions defined earlier can be used inside SQL queries: register them with spark.udf.register, and they can then be referenced in the SQL text.
df.createOrReplaceTempView("table_tmp")
spark.sql("select * from table_tmp where name='A'").show()
+----+---+------+
|name|age|weight|
+----+---+------+
| A| 20| 80.0|
| A| 21| 85.0|
| A| 22| 83.0|
| A| 23| 78.0|
+----+---+------+
spark.udf.register("plus_one", plus_one)
spark.sql("SELECT plus_one(age) as age_plus1 FROM table_tmp").show()
+---------+
|age_plus1|
+---------+
| 21|
| 22|
| 23|
| 24|
| 31|
| 32|
| 33|
+---------+
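A Series-to-scalar pandas UDF can be registered the same way and used as an aggregate in SQL. A sketch reusing get_mean and the table_tmp view (expected values follow from the data above):
spark.udf.register("get_mean", get_mean)
spark.sql("SELECT name, get_mean(weight) AS weight_mean FROM table_tmp GROUP BY name").show()
## expected roughly: A -> 81.5, B -> 65.0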