
Spark Study Notes: Reading and Writing Avro

1. Reading Avro files with the DataFrame API

https://sparkbyexamples.com/spark/read-write-avro-file-spark-dataframe/

Add the following dependencies to the pom. Starting with Spark 2.4.0 the Apache spark-avro module can be used; earlier Spark versions have to fall back on the Databricks spark-avro package (see the note after the pom below).

<!--avro-->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
    <exclusions>
        <exclusion>
            <artifactId>jackson-databind</artifactId>
            <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.1</version>
</dependency>
<!--spark-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
        <exclusion>
            <artifactId>parquet-jackson</artifactId>
            <groupId>com.twitter</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
</dependency>
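A note on the Databricks vs. Apache package mentioned above: with the pre-2.4 Databricks package the data source is usually addressed by its full class name, while the built-in spark-avro that ships with Spark 2.4.0+ registers the short name "avro". A minimal sketch (the path is a placeholder):

// Spark < 2.4.0, Databricks spark-avro package
val dfDatabricks = sparkSession.read.format("com.databricks.spark.avro").load("/path/to/file.avro")

// Spark >= 2.4.0, Apache spark-avro module from the pom above
val dfApache = sparkSession.read.format("avro").load("/path/to/file.avro")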

Read the Avro file and get a DataFrame:

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

object SparkAvroDemo {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    val schemaAvro = new Schema.Parser().parse(new File("src/main/avro/kst.avsc"))

    val df = sparkSession.read
      .format("avro")
      .option("avroSchema", schemaAvro.toString)
      .load("file:///Users/lintong/coding/java/interview/bigdata/data/000000_0")

    df.show()

  }

}

 

 

Writing Avro files with the DataFrame API

df.write.format("avro").save("person.avro")
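The usual DataFrameWriter settings apply here as well; a minimal sketch with save mode and partitioning (the partition column dt is made up for illustration):

df.write
  .format("avro")
  .mode("overwrite")      // replace the target directory if it already exists
  .partitionBy("dt")      // hypothetical partition column
  .save("file:///tmp/person_avro")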

 

2. Reading Avro files in Spark with the Hadoop API

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("spark session example")
  .getOrCreate()

val sc = sparkSession.sparkContext
val conf = sc.hadoopConfiguration
val path = "file:///Users/lintong/coding/java/interview/bigdata/data/000000_0"
// test_serializer is the SpecificRecord class generated from the Avro schema
val rdd = sc.newAPIHadoopFile(
  path,
  classOf[AvroKeyInputFormat[test_serializer]],
  classOf[AvroKey[test_serializer]],
  classOf[NullWritable],
  conf
).map(_._1.datum())
rdd.foreach(println(_))

Output:

{"string1": "test", "int1": 2, "tinyint1": 1, "smallint1": 2, "bigint1": 1000, "boolean1": true, "float1": 1.111, "double1": 2.22222, "list1": ["test1", "test2", "test3"], "map1": {"test123": 2222, "test321": 4444}, "struct1": {"sInt": 123, "sBoolean": true, "sString": "London"}, "union1": 0.2, "enum1": "BLUE", "nullableint": 12345, "bytes1": "00008DAC", "fixed1": [49, 49, 49]}

Reference:

https://github.com/subprotocol/spark-avro-example/blob/master/src/main/scala/com/subprotocol/AvroUtil.scala

 

Writing Avro files in Spark with the Hadoop API

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job

// new Job() is deprecated; use Job.getInstance instead
val writeJob = Job.getInstance()
AvroJob.setOutputKeySchema(writeJob, test_serializer.SCHEMA$)
writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[test_serializer]])

rdd
  .map { s => (new AvroKey(s), NullWritable.get) }
  .saveAsNewAPIHadoopFile(
    "file:///Users/lintong/coding/java/interview/bigdata/data/avro_container",
    classOf[AvroKey[test_serializer]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[test_serializer]],
    writeJob.getConfiguration
  )
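To verify the write, the resulting Avro container files can be read back with the DataFrame API from section 1; a minimal sketch:

val checkDf = sparkSession.read
  .format("avro")
  .load("file:///Users/lintong/coding/java/interview/bigdata/data/avro_container")
checkDf.show()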

Reference:

https://mail-archives.apache.org/mod_mbox/spark-user/201411.mbox/%3CCAKz4c0S_cuo90q2JXudvx9WC4FWU033kX3-FjUJYTXxhr7PXOw@mail.gmail.com%3E

 

3. Reading Avro Parquet files in Spark with the Hadoop API

import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.mapred.JobConf
import org.apache.parquet.avro.AvroParquetInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.reflect.{ClassTag, classTag}

object HDFSIO {

  def readAvroParquetFile[T <: SpecificRecord : ClassTag](
      sc: SparkContext,
      path: String,
      avroClass: Class[T]): RDD[T] = {
    val jobConf = new JobConf(sc.hadoopConfiguration)
    sc.newAPIHadoopFile(path, classOf[AvroParquetInputFormat[T]], classOf[Void], avroClass, jobConf)
      .map {
        case (_, obj) => obj
      }
  }

}

 

Then call HDFSIO.readAvroParquetFile:

val rdd = HDFSIO.readAvroParquetFile(sc, "file:///Users/lintong/coding/java/interview/bigdata/data/avro_parquet", classOf[test_serializer])
rdd.foreach(line => println(line))

Output:

{"string1": "test", "int1": 2, "tinyint1": 1, "smallint1": 2, "bigint1": 1000, "boolean1": true, "float1": 1.111, "double1": 2.22222, "list1": ["test1", "test2", "test3"], "map1": {"test123": 2222, "test321": 4444}, "struct1": {"sInt": 123, "sBoolean": true, "sString": "London"}, "union1": 0.2, "enum1": "BLUE", "nullableint": 12345, "bytes1": "00008DAC", "fixed1": [49, 49, 49]}
{"string1": "test", "int1": 2, "tinyint1": 1, "smallint1": 2, "bigint1": 1000, "boolean1": true, "float1": 1.111, "double1": 2.22222, "list1": ["test1", "test2", "test3"], "map1": {"test123": 2222, "test321": 4444}, "struct1": {"sInt": 123, "sBoolean": true, "sString": "London"}, "union1": 0.2, "enum1": "BLUE", "nullableint": 12345, "bytes1": "00008DAC", "fixed1": [49, 49, 49]}

 

Writing Avro Parquet files in Spark with the Hadoop API

 

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}

val writeJob = Job.getInstance(sc.hadoopConfiguration)
// register the Avro schema with parquet-avro's write support on the job configuration
AvroWriteSupport.setSchema(writeJob.getConfiguration, test_serializer.SCHEMA$)

rdd
  .map((null, _))
  .saveAsNewAPIHadoopFile(
    "file:///Users/lintong/coding/java/interview/bigdata/data/avro_parquet",
    classOf[Void],
    classOf[test_serializer],
    classOf[AvroParquetOutputFormat[test_serializer]],
    writeJob.getConfiguration
  )
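As a quick check, the directory written this way is ordinary Parquet and can be read back with the DataFrame API; a minimal sketch:

val parquetDf = sparkSession.read
  .parquet("file:///Users/lintong/coding/java/interview/bigdata/data/avro_parquet")
parquetDf.show()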