
Spark Study Notes: Converting between RDD, DataFrame, and Dataset

1. Create an RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
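The snippets in these notes assume a SparkSession named spark and its SparkContext sc. A minimal setup sketch (the app name and local master below are assumptions, not part of the original notes):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical setup; adjust master/appName for your environment
val spark = SparkSession.builder()
  .appName("rdd-df-ds-demo")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext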

2. Create an RDD of a case class

case class WordCount(word: String, count: Long)
val rdd: RDD[WordCount] = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))

3. Convert an RDD to a DataFrame. Note that the implicit conversions must be imported:

import spark.implicits._

val df = rdd.toDF()
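For a tuple RDD the columns default to _1 and _2; names can be passed to toDF directly (the names word and count below are just an illustration):

import spark.implicits._

val named = rdd.toDF("word", "count")  // rename the tuple columns
named.printSchema()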

4. Convert an RDD to a Dataset. Note that WordCount must be defined outside the main method, otherwise the implicit encoder cannot be derived for it:

import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[WordCount] = rdd.toDS()
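The reason for this placement is that deriving the implicit encoder needs a TypeTag, which commonly fails ("No TypeTag available") for a case class defined inside a method. A sketch of the recommended layout, with a hypothetical ConversionDemo object:

import org.apache.spark.sql.{Dataset, SparkSession}

case class WordCount(word: String, count: Long)  // top level, outside main

object ConversionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ds-demo").master("local[*]").getOrCreate()
    import spark.implicits._
    val rdd = spark.sparkContext.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
    val ds: Dataset[WordCount] = rdd.toDS()
    ds.show()
  }
}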

5. Convert a DataFrame to a Dataset

val ds: Dataset[WordCount] = df.as[WordCount]
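For as[WordCount] to work, the DataFrame's column names and types must match the case class fields. Going back the other way needs no encoder:

val backToDf = ds.toDF()  // Dataset[WordCount] back to an untyped DataFrame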

6. Convert an RDD of a Thrift class to a DataFrame

val df = spark.createDataFrame(rdd, classOf[MyBean])  // schema inferred from MyBean's JavaBean getters
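createDataFrame(rdd, classOf[...]) builds the schema by reflecting over JavaBean getters, which Thrift-generated classes provide. For intuition, a self-contained sketch with a plain bean standing in for the Thrift class (Person is hypothetical):

import scala.beans.BeanProperty

// Hypothetical stand-in for a Thrift-generated class
class Person extends Serializable {
  @BeanProperty var name: String = _
  @BeanProperty var age: Int = _
}

val p = new Person
p.setName("ann")
p.setAge(3)
val personDf = spark.createDataFrame(sc.parallelize(Seq(p)), classOf[Person])
personDf.printSchema()  // columns name and age come from the getters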

7. Convert a Thrift class DataFrame to a Dataset; an encoder must be supplied for the Thrift class:

import org.apache.spark.sql.Encoders

val ds = df.as[MyBean](Encoders.bean(classOf[MyBean]))

Or:

implicit val mapEncoder = Encoders.bean(classOf[MyBean])  // resolved implicitly by as[MyBean]

val ds = df.as[MyBean]
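Either form yields the same Dataset. Encoders.bean also reads the JavaBean getters, so it works for the hypothetical Person bean from section 6 just as for a Thrift class:

import org.apache.spark.sql.{Dataset, Encoders}

val personDs: Dataset[Person] = personDf.as[Person](Encoders.bean(classOf[Person]))
personDs.show()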

8. Convert an RDD of an Avro class to a DataFrame. Reference:

https://stackoverflow.com/questions/47264701/how-to-convert-rddgenericrecord-to-dataframe-in-scala

Use Databricks' spark-avro package:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
</dependency>

Utility class. Note that the package must be declared as com.databricks.spark.avro, because SchemaConverters.createConverterToSQL is private to the spark-avro package and is only accessible from inside it:

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

object SchemaConverterUtils {

  // Expose the package-private createConverterToSQL to callers outside spark-avro
  def converterSql(schema: Schema, sqlType: StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

Code:

import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.{SchemaConverters, SchemaConverterUtils}

// Derive the SQL schema from the Avro schema of the generated class
val sqlType = SchemaConverters.toSqlType(test_serializer.SCHEMA$).dataType.asInstanceOf[StructType]
val converter = SchemaConverterUtils.converterSql(test_serializer.SCHEMA$, sqlType)
// Drop records that fail to convert instead of failing the job
val rowRdd = rdd.flatMap(record => {
  Try(converter(record).asInstanceOf[Row]).toOption
})
val df = sparkSession.createDataFrame(rowRdd, sqlType)
df.show()

Output: