Spark collect_list 的坑

collect_list 的坑

Spark 按照某列group 之后,使用collect_list 将特定列收集到一个数组的时候,默认情况会过滤掉空值(包括null 和 空字符串)。

比如我们有如下dataframe

name sex age
John male 18
null male 19
Lily female null
null female 20
Tom male 35

根据sex字段 group

df.groupBy("sex").agg(collect_list("name").alias("names"), collect_list("age").alias("ages"))

会得到以下结果:

sex names ages
male ["John", "Tom"] [18, 35, 19]
female ["Lily"] [20]

也就是说,spark 在collect_list 的时候把空值过滤掉。

解决方案

解决方案比较简单粗暴,就是将dataframe 里面的空值替换为非空值。
最简单的方法是使用下面的语句,但是直接使用na.fill 只能将schemaType 和fill 传入参数类型一致的列空值替换,schemaType不一致的null 值无法生效。

df = df.na.fill("#")

正确的做法是针对每一个不同的列类型指定替换后的数值

val typeMap = df.dtypes.map(column => 
    column._2 match {
        case "IntegerType" => (column._1 -> 0)
        case "StringType" => (column._1 -> "")
        case "DoubleType" => (column._1 -> 0.0)
    }).toMap
df  = df.na.fill(typeMap)

使用以下语句检查df 中是否存在空值

val filtered = df.filter(row => !row.anyNull)
assert(filtered.count() == df.count())

再次group 之后,会得到以下结果:

sex names ages
male ["John", "Tom", "#"] [18, 35, 19]
female ["Lily","#"] [0, 20]