Spark基础-结构化API和操作

DataFrame和DataSet共同点

具有行和列的类似于数据表的集合类型,所有列的行数相同,缺省值用null填充。每一列的所有行数据类型相同。

DataFrame

是非类型化的,列的数据类型由spark维护,在运行时检查类型。

可以将DataFrame看作是Row类型的DataSet

Dataset

是类型化的,仅支持基于JVM的语言,通过case class或Java beans指定类型。在编译时进行类型检查。

使用Dataset API时,将Spark Row格式的每一行转换成指定的特定类型对象(case或bean),转换会减慢操作速度,但提供更大灵活性。

上图将DF转成了DS

SQL

直接用SQL实现spark任务

托管表

简单理解就是由spark管理的表,不光包括数据,还包括表的元数据,都是spark的格式存储的。比如df调用saveAsTable生成的结果。

非托管表

spark读取到的数据都可以看成是一张非托管表,这些数据以其他格式和方式存在,spark只读取使用,不进行管理。

执行过程

  • 编写结构化API代码
  • spark将其转换为逻辑执行计划
  • spark将逻辑执行计划转换为物理执行计划
  • spark在集群上执行物理计划(RDD操作)

下图表示代码到逻辑执行计划的转换流程,主要包括逻辑计划的转换和catalyst优化器优化两个部分

物理计划通过生成不同的物理执行策略,并通过代价模型进行比较分析,从而指定如何在集群上执行逻辑计划。

重分区

用df的repartition方法进行数据的重分区,会导致数据的shuffle,一般有两个参数(可以都传)

  • 改变分区个数repartion(10)
  • 按某列分区repartion(col(“aaa”))
    如果要按照某一列进行大量操作,则按这一列重分区很有必要

合并操作

coalesce操作,不会导致shuffle,但会尝试合并分区

1
df.repartition(5,col("aaa")).coalesce(2) //分区从5合并到2

驱动器获取行

将各个executor数据收集到driver上。collect函数从整个df中获取数据到driver

1
2
3
val collected = df.limit(10)
collected.show()
collected.collect()

空值的处理

建议使用null来表示DataFrame中的空值,其他的空如空字符串,scala的None,都不被spark认为是空值。

  • drop默认删除包含null的行

  • Coalesce方法从一列中选择第一个非空值

    数组处理

  • split:将一列的值分割为数组

  • size:长度

  • array_contains:包含

  • explode:行转多行,数组中每个值变成一行,其他列重复

    自定义函数UDF

spark在driver上序列化udf,将它网络传输到所有executor

udf会导致性能下降,无法参与优化

聚合操作

spark允许下列分组类型:

  • group by 对一个或多个列执行指定的聚合函数
  • window:开窗操作,支持聚合一定范围内的值
  • grouping set:多个不同级别进行聚合(SQL中使用,dataFrame中对应rollup和cube)
  • rollup:聚合后针对指定的多个key进行分级分组汇总
  • cube:聚合后针对指定的多个key进行全组合分组汇总

    window

比如计算最近7天平均值

通过一个窗口,决定哪些行传递给聚合函数

定义窗口:

使用窗口

分组集

支持跨多个组的聚合操作。(仅在SQL中可用),将各种分组的统计的结果union到一起。

比如下面的例子,表示先按照city和car_model分组统计,再按照city统计,再按照car_model统计,再按照全部统计,将所有统计结果union

rollup

rollup操作在df和ds上使用,效果和上面的grouping sets一样,即可以指定多个聚合维度,最终把这些聚合维度下的数据union起来

cube

cube是更进一步的操作,不需要知道聚合组合的指定,直接对所有参与列值进行所有维度的全组合聚合。比如下面,指定了两个列,则一共有4种可能的组合

join操作

join种类

join的类型有很多,比如inner join, outer join, left join, right join等区别在于如何处理没有匹配上的行。这里不再赘述。

join执行过程

根据表的大小不同,spark采取不同的join方式。总共3种

broadcast hash join

当两张表中有一张很小的时候,采用广播的方式将小表广播到所有executor,避免了all-to-all的shuffle join。发送成功后,对小表的连接字段做hash存储,然后对大表的每条数据进行验证,看能否匹配。

shuffle hash join

当小表不足以广播但仍旧不太大时,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,然后做hash join

sort merge join

两张大表进行关联时,不再用hash的形式,因为需要额外内存存储生成的hash,大表情况下内存可能不足。改用sort merge join。

首先还是按照join key进行重新组织分区,之后在每个executor上,对两表的数据分别排序,之后进行两个有序列表的merge。

总结

大表与一个很小的表关联,采用broadcast hash join

大表和一个小表关联,选择shuffle hash join

大表和大表关联,采用sort merge join


Spark基础-结构化API和操作
http://www.bake-data.com/2024/04/24/Spark基础-结构化API和操作/
Author
shuchen
Posted on
April 24, 2024
Licensed under