
Comparing Thrift, Protobuf, and Avro serialization

This post compares the size of the serialized output when the same data is serialized with Thrift using TCompactProtocol, with Protobuf's standard binary encoding, and with Avro written via AvroKeyOutputFormat.

Because Thrift's binary type cannot itself be serialized again into binary, the test schema contains no binary field.

1. Avro schema#

The Avro schema of the test data is defined as follows:

```json
{
  "namespace": "com.linkedin.haivvreo",
  "name": "test_serializer",
  "type": "record",
  "fields": [
    { "name":"string1", "type":"string" },
    { "name":"int1", "type":"int" },
    { "name":"tinyint1", "type":"int" },
    { "name":"smallint1", "type":"int" },
    { "name":"bigint1", "type":"long" },
    { "name":"boolean1", "type":"boolean" },
    { "name":"float1", "type":"float" },
    { "name":"double1", "type":"double" },
    { "name":"list1", "type":{"type":"array", "items":"string"} },
    { "name":"map1", "type":{"type":"map", "values":"int"} },
    { "name":"struct1", "type":{"type":"record", "name":"struct1_name", "fields": [
      { "name":"sInt", "type":"int" },
      { "name":"sBoolean", "type":"boolean" },
      { "name":"sString", "type":"string" } ] } },
    { "name":"enum1", "type":{"type":"enum", "name":"enum1_values", "symbols":["BLUE","RED","GREEN"]} },
    { "name":"nullableint", "type":["int", "null"] }
  ]
}
```
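The `["int", "null"]` union on `nullableint` is cheap on the wire: per the Avro specification, a union is encoded as its zero-based branch index (a zigzag varint), followed by the value's own encoding. A hand-rolled sketch of that encoding (illustrative only; real code would use `org.apache.avro.io.BinaryEncoder`, and the helper names here are made up):

```scala
// Zigzag maps small negative/positive ints to small unsigned values
// (0 -> 0, -1 -> 1, 1 -> 2, ...), per the Avro spec for int/long.
def zigZag(n: Long): Long = (n << 1) ^ (n >> 63)

// Base-128 varint: 7 payload bits per byte, high bit = continuation.
def varint(n: Long): Array[Byte] = {
  var v = n
  val out = scala.collection.mutable.ArrayBuffer[Byte]()
  while ((v & ~0x7fL) != 0) { out += ((v & 0x7f) | 0x80).toByte; v >>>= 7 }
  out += v.toByte
  out.toArray
}

// Avro's int encoding is zigzag followed by varint.
def encodeAvroInt(n: Int): Array[Byte] = varint(zigZag(n.toLong))

// nullableint = 42: branch 0 ("int"), then the value -- 2 bytes total
val present = encodeAvroInt(0) ++ encodeAvroInt(42)
// nullableint = null: branch 1 ("null"); null itself encodes to zero bytes
val absent = encodeAvroInt(1)
```

So a present value costs one extra branch byte, and a null costs a single byte.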

2. Thrift schema#

The Thrift schema of the test data is defined as follows:

```thrift
namespace java com.linkedin.haivvreo

struct struct1_name {
  1: required i32 sInt;
  2: required bool sBoolean;
  3: required string sString;
}

enum enum1_values {
  BLUE,
  RED,
  GREEN
}

struct union1 {
  1: optional double member0;
  2: optional bool member1;
  3: optional string member2;
}

struct test_serializer {
  1: required string string1;
  2: required i32 int1;
  3: required i32 tinyint1;
  4: required i32 smallint1;
  5: required i64 bigint1;
  6: required bool boolean1;
  7: required double float1;
  8: required double double1;
  9: required list<string> list1;
  10: required map<string, i32> map1;
  11: required struct1_name struct1;
  12: required string enum1;
  13: optional i32 nullableint;
}
```
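Much of TCompactProtocol's saving over TBinaryProtocol comes from field headers. Per the Thrift compact-protocol specification, when a field id is between 1 and 15 above the previous field's id, the header collapses to a single byte, `(delta << 4) | type`, versus TBinaryProtocol's three bytes (a type byte plus an i16 field id). A sketch with made-up helper names (the type codes are taken from the spec; this is illustrative only, not the library API):

```scala
// Compact-protocol type codes for two of the field types used above,
// per the Thrift compact-protocol spec.
val CompactI32 = 0x05
val CompactI64 = 0x06

// Short-form field header: only valid when the id delta fits in 4 bits.
def compactFieldHeader(prevId: Int, id: Int, compactType: Int): Array[Byte] = {
  val delta = id - prevId
  require(delta >= 1 && delta <= 15, "long form needed outside this range")
  Array((((delta << 4) | compactType) & 0xff).toByte)
}

// Field 2 (int1, i32) right after field 1: one header byte instead of three
val headerInt1 = compactFieldHeader(1, 2, CompactI32)
// Field 5 (bigint1, i64) right after field 4
val headerBigint1 = compactFieldHeader(4, 5, CompactI64)
```

Since `test_serializer`'s field ids are consecutive, every field header in the struct takes the one-byte short form.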

3. Protobuf schema#

The Protobuf schema of the test data is defined as follows:

```protobuf
syntax = "proto3";
package com.linkedin.haivvreo;

message Struct1Name {
  int32 sInt = 1;
  bool sBoolean = 2;
  string sString = 3;
}

enum Enum1Values {
  BLUE = 0; // in proto3 the first enum value must be 0, and values must not repeat
  RED = 1;
  GREEN = 2;
}

message TestSerializer {
  string string1 = 1;
  int32 int1 = 2;
  int32 tinyint1 = 3;
  int32 smallint1 = 4;
  int64 bigint1 = 5;
  bool boolean1 = 6;
  double float1 = 7;
  double double1 = 8;
  repeated string list1 = 9;
  map<string, int32> map1 = 10;
  Struct1Name struct1 = 11;
  Enum1Values enum1 = 12;
  int32 nullableint = 13;
}
```
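Protobuf stores the `int32`/`int64` fields above as base-128 varints, so small values take one or two bytes rather than a fixed four or eight. One caveat: plain `int32` sign-extends negative values to ten-byte varints, where `sint32` (zigzag) would stay small. A minimal sketch of the varint itself (illustrative; real code uses the classes `protoc` generates):

```scala
// Base-128 varint as used on the protobuf wire: 7 payload bits per byte,
// high bit set on every byte except the last.
def protoVarint(n: Long): Array[Byte] = {
  var v = n
  val out = scala.collection.mutable.ArrayBuffer[Byte]()
  while ((v & ~0x7fL) != 0) { out += ((v & 0x7f) | 0x80).toByte; v >>>= 7 }
  out += v.toByte
  out.toArray
}

val small = protoVarint(300)  // 2 bytes instead of a fixed-width 4
val neg = protoVarint(-1L)    // 10 bytes: sign-extension fills the varint
```

This is why fields like `tinyint1` and `smallint1`, whose test values stay under 100, cost little despite being declared `int32`.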

Compile the Protobuf schema:

```shell
protoc -I=./ --java_out=src/main/java/ ./src/main/proto3/test_serializer.proto
```

4. Test procedure#

The test data is generated randomly in code as a Thrift object:

```scala
import org.apache.commons.lang3.RandomStringUtils
import scala.collection.JavaConverters._
import scala.collection.immutable.HashMap

val obj = new test_serializer()
obj.setString1(RandomStringUtils.randomAlphanumeric(10))
obj.setInt1(new java.util.Random().nextInt(100000))
obj.setTinyint1(new java.util.Random().nextInt(100))
obj.setSmallint1(new java.util.Random().nextInt(10))
obj.setBigint1(new java.util.Random().nextLong())
obj.setBoolean1(new java.util.Random().nextBoolean())
obj.setFloat1(new java.util.Random().nextFloat())
obj.setDouble1(new java.util.Random().nextDouble())
val cs1 = RandomStringUtils.randomAlphanumeric(10): CharSequence
val cs2 = RandomStringUtils.randomAlphanumeric(10): CharSequence
obj.setList1(List(cs1, cs2).asJava)
val map: java.util.Map[CharSequence, Integer] =
  HashMap(cs1 -> new java.util.Random().nextInt(10000), cs2 -> new java.util.Random().nextInt(10000))
    .map(line => (line._1, Integer.valueOf(line._2))).asJava
obj.setMap1(map)
val struct1 = new struct1_name
struct1.setSInt(new java.util.Random().nextInt(1000000))
struct1.setSBoolean(new java.util.Random().nextBoolean())
struct1.setSString(RandomStringUtils.randomAlphanumeric(10))
obj.setStruct1(struct1)
val enum1 = enum1_values.BLUE
obj.setEnum1(enum1)
obj.setNullableint(new java.util.Random().nextInt(10000))
obj
```


For Avro, a `GenericData.Record` can be built from a schema parsed out of a string:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// schemasStr holds the Avro schema JSON string (defined elsewhere)
val rdd = sc.parallelize(Seq(1, 1, 1, 1, 1))
val rdd2 = rdd.map { line =>
  val avroSchema = new Schema.Parser().parse(schemasStr)
  val avroRecord = new GenericData.Record(avroSchema)
  avroRecord.put("firstname", "hello")
  avroRecord.put("lastname", "world")
  avroRecord.put("age", 20)
  val childSchema = avroRecord.getSchema.getField("address").schema
  val childRecord = new GenericData.Record(childSchema)
  childRecord.put("streetaddress", "haidian")
  childRecord.put("city", "beijing")
  avroRecord.put("address", childRecord)
  avroRecord
}
```

It can also be built from an Avro schema file:

```scala
import java.nio.file.{Files, Paths}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

val rdd = spark.sparkContext.parallelize(Seq(1, 1, 1, 1, 1, 1, 1))
val rdd2 = rdd.map { line =>
  val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("src/main/resources/schema.avro")))
  val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
  val datum = new GenericData.Record(avroSchema)
  datum.put("firstname", "xiao")
  datum.put("lastname", "ming")
  datum.put("age", 24)
  datum.put("test_field2", "test2")
  val childSchema = avroSchema.getField("address").schema()
  val childDatum = new GenericData.Record(childSchema)
  childDatum.put("streetaddress", "xierqi")
  childDatum.put("city", "beijing")
  datum.put("address", childDatum)
  datum.put("test_field", 222)
  datum
}
```


Saving a Spark RDD as LZO-compressed files:

```scala
scala> import com.hadoop.compression.lzo.LzopCodec
import com.hadoop.compression.lzo.LzopCodec

scala> df.rdd.saveAsTextFile("/user/hive/warehouse/json_lzo", classOf[LzopCodec])
```

Saving a Spark RDD as Snappy-compressed files:

```scala
scala> import org.apache.hadoop.io.compress.SnappyCodec
import org.apache.hadoop.io.compress.SnappyCodec

scala> df.repartition(1).rdd.saveAsTextFile("/user/hive/warehouse/json_snappy", classOf[SnappyCodec])
```

Test results:

| Serialization framework | Format | Compression / protocol | Rows | Files | File size |
| --- | --- | --- | --- | --- | --- |
| avro | AvroKeyOutputFormat | none | 5250987 | 1 | 587.9 MB |
| avro | AvroKeyOutputFormat | SNAPPY | 5250987 | 1 | 453.2 MB |
| avro | AvroParquetOutputFormat | SNAPPY | 5250987 | 1 | 553.7 MB |
| thrift | ParquetThriftOutputFormat | SNAPPY | 5250987 | 1 | 570.5 MB |
| thrift | SequenceFileOutputFormat | TBinaryProtocol | 5250987 | 1 | 1.19 GB |
| thrift | SequenceFileOutputFormat | TCompactProtocol | 5250987 | 1 | 788.7 MB |
| thrift | SequenceFileOutputFormat | TCompactProtocol + DefaultCodec | 5250987 | 1 | 487.1 MB |
| json | textfile | none | 5250987 | 1 | 1.84 GB |
| json | textfile | gzip | 5250987 | 1 | 570.8 MB |
| json | textfile | lzo | 5250987 | 1 | 716 MB |
| json | textfile | snappy | 5250987 | 1 | 727 MB |
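The JSON rows shrink roughly threefold under gzip largely because textual JSON repeats every field name on every record, which general-purpose codecs exploit well. A quick stdlib sanity check on synthetic JSON-like lines (the field names and value ranges here are made up for illustration):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import scala.util.Random

val rnd = new Random(42)
// 1000 records that repeat the same two key names, like a JSON text file would
val lines = (1 to 1000).map { _ =>
  s"""{"string1":"${rnd.alphanumeric.take(10).mkString}","int1":${rnd.nextInt(100000)}}"""
}.mkString("\n")
val raw = lines.getBytes("UTF-8")

// Compress with the JDK's built-in gzip stream
val buf = new ByteArrayOutputStream()
val gz = new GZIPOutputStream(buf)
gz.write(raw)
gz.close()
val compressed = buf.toByteArray

println(s"raw=${raw.length} bytes, gzip=${compressed.length} bytes")
```

The repeated keys compress away almost entirely, which is also why binary formats that drop field names from the payload (Avro, Protobuf, TCompactProtocol) start out so much smaller before any codec is applied.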