1. Reading Avro files with the DataFrame API
https://sparkbyexamples.com/spark/read-write-avro-file-spark-dataframe/
Add the dependency to the pom. From Spark 2.4.0 onwards you can use Apache's spark-avro package; earlier versions have to use Databricks' spark-avro package.
<!--avro-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
Read the Avro file to get a DataFrame:
import java.io.File
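A minimal sketch of the read, assuming Spark 2.4+ with the spark-avro package on the classpath (the SparkSession setup and the person.avro path are illustrative, not the original code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")                               // assumption: local test run
  .appName("read-avro")
  .getOrCreate()

// the short format name "avro" works from Spark 2.4.0 with org.apache.spark:spark-avro
val df = spark.read.format("avro").load("person.avro")
df.printSchema()
df.show()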
Writing Avro files with the DataFrame API
df.write.format("avro").save("person.avro")
2. Reading Avro files in Spark with the Hadoop API
val sparkSession = SparkSession.builder()
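A minimal sketch of the rest of the read path, going through newAPIHadoopFile with AvroKeyInputFormat (the GenericRecord element type, the local master and the file path are assumptions):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local[*]")                               // assumption: local test run
  .appName("hadoop-api-read-avro")
  .getOrCreate()
val sc = sparkSession.sparkContext

// Read the Avro file as (AvroKey, NullWritable) pairs through the new Hadoop InputFormat API
val records = sc.newAPIHadoopFile(
  "file:///path/to/data.avro",                      // placeholder path
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

// Convert to String before collecting, because Hadoop may reuse the underlying record objects
records.map(_._1.datum().toString).collect().foreach(println)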
Output:
{"string1": "test", "int1": 2, "tinyint1": 1, "smallint1": 2, "bigint1": 1000, "boolean1": true, "float1": 1.111, "double1": 2.22222, "list1": ["test1", "test2", "test3"], "map1": {"test123": 2222, "test321": 4444}, "struct1": {"sInt": 123, "sBoolean": true, "sString": "London"}, "union1": 0.2, "enum1": "BLUE", "nullableint": 12345, "bytes1": "00008DAC", "fixed1": [49, 49, 49]}
Reference:
https://github.com/subprotocol/spark-avro-example/blob/master/src/main/scala/com/subprotocol/AvroUtil.scala
Writing Avro files in Spark with the Hadoop API
val writeJob = new Job()
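A minimal sketch of the rest of the write path, pairing each record with NullWritable and saving through AvroKeyOutputFormat (recordRdd, schema, sc and the output path are assumptions):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job

// recordRdd: RDD[GenericRecord] and schema: Schema are assumed to exist already
val writeJob = Job.getInstance(sc.hadoopConfiguration)
AvroJob.setOutputKeySchema(writeJob, schema)        // register the Avro schema on the Job

recordRdd
  .map(record => (new AvroKey[GenericRecord](record), NullWritable.get()))
  .saveAsNewAPIHadoopFile(
    "file:///path/to/avro_out",                     // placeholder output directory
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    writeJob.getConfiguration)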
Reference:
https://mail-archives.apache.org/mod_mbox/spark-user/201411.mbox/%3CCAKz4c0S_cuo90q2JXudvx9WC4FWU033kX3-FjUJYTXxhr7PXOw@mail.gmail.com%3E
3. Reading Avro Parquet files in Spark with the Hadoop API
import org.apache.avro.specific.SpecificRecord
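HDFSIO is a project-local helper; a sketch of what such a readAvroParquetFile method could look like on top of AvroParquetInputFormat (this implementation is an assumption, not the original code):

import scala.reflect.ClassTag
import org.apache.avro.specific.SpecificRecord
import org.apache.parquet.avro.AvroParquetInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object HDFSIO {
  // Read an Avro-Parquet file into an RDD of generated Avro SpecificRecord instances
  def readAvroParquetFile[T <: SpecificRecord : ClassTag](
      sc: SparkContext,
      path: String,
      clazz: Class[T]): RDD[T] = {
    sc.newAPIHadoopFile(
        path,
        classOf[AvroParquetInputFormat[T]],
        classOf[Void],
        clazz,
        sc.hadoopConfiguration)
      .map(_._2)                                    // keys are Void, keep only the records
  }
}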
Then call HDFSIO.readAvroParquetFile:
val rdd = HDFSIO.readAvroParquetFile(sc, "file:///Users/lintong/coding/java/interview/bigdata/data/avro_parquet", classOf[test_serializer])
Output:
{"string1": "test", "int1": 2, "tinyint1": 1, "smallint1": 2, "bigint1": 1000, "boolean1": true, "float1": 1.111, "double1": 2.22222, "list1": ["test1", "test2", "test3"], "map1": {"test123": 2222, "test321": 4444}, "struct1": {"sInt": 123, "sBoolean": true, "sString": "London"}, "union1": 0.2, "enum1": "BLUE", "nullableint": 12345, "bytes1": "00008DAC", "fixed1": [49, 49, 49]}
Writing Avro Parquet files in Spark with the Hadoop API
AvroWriteSupport.setSchema(sc.hadoopConfiguration, test_serializer.SCHEMA$)
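A minimal sketch of the rest of the write path, building on the setSchema call above and saving through AvroParquetOutputFormat (recordRdd, sc and the output path are assumptions):

import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}

// recordRdd: RDD[test_serializer] is assumed to exist already
AvroWriteSupport.setSchema(sc.hadoopConfiguration, test_serializer.SCHEMA$)

recordRdd
  .map(record => (null, record))                    // ParquetOutputFormat writes only the value side
  .saveAsNewAPIHadoopFile(
    "file:///path/to/avro_parquet_out",             // placeholder output directory
    classOf[Void],
    classOf[test_serializer],
    classOf[AvroParquetOutputFormat[test_serializer]],
    sc.hadoopConfiguration)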