tonglin0325's homepage

Using Confluent Schema Registry to convert a protobuf schema into an avro schema

Confluent provides utilities for converting a protobuf schema into an avro schema, which makes it possible to sink Kafka messages serialized with protobuf into avro-format files.

1. Add the dependencies#

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencies>
    <!--pb-->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.21.7</version>
    </dependency>
    <!--confluent-->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-protobuf-provider</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-connect-avro-data</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-connect-protobuf-converter</artifactId>
        <version>7.1.1</version>
    </dependency>
    <!--kafka-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

2. Define a protobuf schema#

Define a protobuf schema (saved as src/main/proto/other.proto):

syntax = "proto3";
package com.acme;

message MyRecord {
  string f1 = 1;
  OtherRecord f2 = 2;
}

message OtherRecord {
  int32 other_id = 1;
}

Compile it into Java code:

protoc -I=./ --java_out=./src/main/java ./src/main/proto/other.proto

This generates the Java classes for the schema.

3. Convert the protobuf schema into an avro schema#

When Confluent Schema Registry processes protobuf, avro, or json data, it always converts the data into the Connect schema format first, and only then writes it out to a concrete file format such as parquet or avro.

import com.acme.Other;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ProtobufToAvro {

    public static void main(String[] args) {
        // Instantiate the protobuf-generated class
        Other.MyRecord obj = Other.MyRecord.newBuilder().build();
        // Derive the protobuf schema from the message instance
        ProtobufSchema pbSchema = ProtobufSchemaUtils.getSchema(obj);
        ProtobufData protobufData = new ProtobufData();
        // SchemaAndValue result = protobufData.toConnectData(pbSchema, obj);
        // System.out.println(result);

        AvroDataConfig avroDataConfig = new AvroDataConfig.Builder()
                .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 1)
                .with(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)
                .with(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true)
                .build();
        AvroData avroData = new AvroData(avroDataConfig);
        // Convert the protobuf schema into a Connect schema first, then into an avro schema
        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(protobufData.toConnectSchema(pbSchema));
        System.out.println(avroSchema);
    }

}

The resulting avro schema is printed as follows:

{
    "type": "record",
    "name": "MyRecord",
    "fields": [
        {
            "name": "f1",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name": "f2",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "OtherRecord",
                    "fields": [
                        {
                            "name": "other_id",
                            "type": ["null", "int"],
                            "default": null
                        }
                    ]
                }
            ],
            "default": null
        }
    ]
}
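The same pair of classes can also convert a concrete message value, not just the schema, using the toConnectData call that is commented out above. A minimal sketch (the field values here are made up for illustration; it requires the same Confluent dependencies and the generated com.acme.Other classes on the classpath):

```java
import com.acme.Other;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ProtobufValueToAvro {

    public static void main(String[] args) {
        // Build a populated message instead of an empty one
        Other.MyRecord obj = Other.MyRecord.newBuilder()
                .setF1("hello")
                .setF2(Other.OtherRecord.newBuilder().setOtherId(42).build())
                .build();
        ProtobufSchema pbSchema = ProtobufSchemaUtils.getSchema(obj);
        ProtobufData protobufData = new ProtobufData();
        // protobuf message -> Connect schema + Connect value
        SchemaAndValue connectData = protobufData.toConnectData(pbSchema, obj);
        AvroData avroData = new AvroData(new AvroDataConfig.Builder()
                .with(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)
                .build());
        // Connect value -> avro value (a GenericRecord for record types)
        Object avroValue = avroData.fromConnectData(connectData.schema(), connectData.value());
        System.out.println(avroValue);
    }

}
```

This is the same two-step path the sink connectors take when they turn a protobuf-serialized Kafka message into an avro file record.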

Note: Confluent's implementation is careful here. A protobuf uint32 (range 0 to 2^32 - 1) is always converted to a long (-2^63 to 2^63 - 1), so the value can never overflow. See the source:

https://github.com/confluentinc/schema-registry/blob/v7.1.1/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1485
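The widening that makes this safe corresponds to Java's built-in unsigned conversion. This stdlib-only sketch (independent of the Confluent classes) shows why a long can hold every uint32 value:

```java
public class Uint32Widening {

    public static void main(String[] args) {
        // protobuf-java stores a uint32 field in a Java int; the bit pattern
        // 0xFFFFFFFF reads as -1 when signed, but represents 2^32 - 1 unsigned
        int rawBits = 0xFFFFFFFF;
        // widening via Integer.toUnsignedLong recovers the unsigned value,
        // which always fits in a long
        long asLong = Integer.toUnsignedLong(rawBits);
        System.out.println(asLong); // prints 4294967295
    }

}
```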

For reference, the conversion is exercised in this test from the source tree:

https://github.com/confluentinc/schema-registry/blob/v7.1.1/avro-data/src/test/java/io/confluent/connect/avro/AdditionalAvroDataTest.java