1. Create an RDD
val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
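All snippets in this section assume a SparkSession named spark and its SparkContext sc are already available; a minimal setup sketch (the app name and master are placeholders):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Minimal local SparkSession used by the examples below.
val spark: SparkSession = SparkSession.builder()
  .appName("rdd-df-ds-conversions")
  .master("local[*]")
  .getOrCreate()
val sc = spark.sparkContext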
2. Create an RDD of a case class
case class WordCount(word: String, count: Long)
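A sketch of building the case class RDD from the pair RDD in step 1 (the field mapping is an assumption):

// Map each (word, count) pair into the WordCount case class.
val wcRdd: RDD[WordCount] = rdd.map { case (word, count) => WordCount(word, count.toLong) }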
3. Convert an RDD to a DataFrame; note that implicit conversions are required here
import spark.implicits._
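With the implicits in scope, toDF becomes available on the pair RDD; a minimal sketch (the column names are assumptions):

// toDF comes from spark.implicits._ and names the columns explicitly.
val df: DataFrame = rdd.toDF("word", "count")
df.printSchema()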
4. Convert an RDD to a Dataset; note that the WordCount case class must be defined outside the main method
import spark.implicits._
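A sketch using the WordCount RDD from step 2; toDS also relies on spark.implicits._, and the case-class encoder is only derived when WordCount is defined outside the main method:

// toDS uses the implicitly derived Encoder[WordCount].
val wcDs: Dataset[WordCount] = wcRdd.toDS()
wcDs.show()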
5. Convert a DataFrame to a Dataset
val ds: Dataset[WordCount] = df.as[WordCount]
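The column names and types of df must line up with the WordCount fields for as[WordCount] to work; a short usage example of the typed result:

// Typed operations become available after the conversion.
ds.filter(_.count > 30).show()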
6. Convert an RDD of a Thrift class to a DataFrame
val df = spark.createDataFrame(rdd, classOf[MyBean])
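Here MyBean stands for a Thrift-generated class; createDataFrame(rdd, beanClass) infers the schema from its bean-style getters. A sketch of the full flow, where the construction of the beans is an assumption (in practice they usually come from an upstream source):

// Hypothetical Thrift objects wrapped in an RDD.
val beanRdd: RDD[MyBean] = sc.parallelize(Seq(new MyBean(), new MyBean()))
// The bean class drives schema inference from its getters/setters.
val beanDf = spark.createDataFrame(beanRdd, classOf[MyBean])
beanDf.printSchema()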
7. Convert a DataFrame of a Thrift class to a Dataset; an encoder must be specified for the Thrift class
val ds = df.as[MyBean](Encoders.bean(classOf[MyBean]))
or
implicit val mapEncoder = Encoders.bean(classOf[MyBean])
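Both variants rely on Encoders.bean, which maps the Thrift class through its getters; a sketch combining them (MyBean remains a placeholder):

import org.apache.spark.sql.{Encoder, Encoders}

// Variant 1: pass the encoder explicitly.
val ds1: Dataset[MyBean] = df.as[MyBean](Encoders.bean(classOf[MyBean]))

// Variant 2: make the encoder implicit so .as[MyBean] picks it up automatically.
implicit val myBeanEncoder: Encoder[MyBean] = Encoders.bean(classOf[MyBean])
val ds2: Dataset[MyBean] = df.as[MyBean]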
8. Convert an RDD of an Avro class to a DataFrame; reference:
https://stackoverflow.com/questions/47264701/how-to-convert-rddgenericrecord-to-dataframe-in-scala
Use Databricks' spark-avro package
<dependency>
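The dependency snippet above is truncated; the usual Maven coordinates for Databricks spark-avro look like the following, where the Scala suffix and version are assumptions and should be matched to your Spark build:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version>
</dependency>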
Utility class; note that the package name must be written as com.databricks.spark.avro
package com.databricks.spark.avro
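Only the package declaration of the utility class survives above; a minimal sketch of what such a wrapper might look like, assuming SchemaConverters is package-private in the spark-avro version used (the object name AvroSchemaHelper is hypothetical; the linked answer re-exposes a record-to-Row converter in the same way):

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.spark.sql.types.DataType

// Lives inside the spark-avro package so it can reach package-private members.
object AvroSchemaHelper {
  // Re-expose the Avro-schema-to-Spark-SQL-type conversion.
  def toSqlType(avroSchema: Schema): DataType =
    SchemaConverters.toSqlType(avroSchema).dataType
}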
Code
val sqlType = SchemaConverters.toSqlType(test_serializer.SCHEMA$).dataType.asInstanceOf[StructType]
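With sqlType in hand, the remaining step is to turn each Avro record into a Row and call createDataFrame. A simplified sketch that only handles flat records with primitive fields; avroRdd is assumed to be an RDD[GenericRecord], Avro strings arrive as Utf8 and are normalized via toString, and nested or complex fields need the full converter from the linked answer:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import org.apache.spark.sql.Row

// Flat-record conversion only: pull each field by name and normalize Avro strings.
val rowRdd: RDD[Row] = avroRdd.map { record =>
  Row.fromSeq(sqlType.fields.map { field =>
    record.get(field.name) match {
      case s: Utf8 => s.toString
      case other   => other
    }
  })
}
val avroDf = spark.createDataFrame(rowRdd, sqlType)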
Output