Confluent provides utilities for converting a protobuf schema into an avro schema, which makes it possible to persist Kafka messages serialized as protobuf into avro-format files.
1. Add the dependencies
```xml
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencies>
    <!-- protobuf -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.21.7</version>
    </dependency>
    <!-- confluent -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-protobuf-provider</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-connect-avro-data</artifactId>
        <version>7.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-connect-protobuf-converter</artifactId>
        <version>7.1.1</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
```
2. Define a protobuf schema
Define a protobuf schema with a nested message:
```protobuf
syntax = "proto3";

package com.acme;

message MyRecord {
  string f1 = 1;
  OtherRecord f2 = 2;
}

message OtherRecord {
  int32 other_id = 1;
}
```
Compile it into Java code:
```shell
protoc -I=./ --java_out=./src/main/java ./src/main/proto/other.proto
```
This produces the Java code for the schema.
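Since other.proto does not set java_outer_classname, protoc nests the generated messages in an outer class derived from the file name, com.acme.Other. A minimal sketch of the generated builder API (the class name GeneratedCodeDemo and the field values are made up for illustration):

```java
import com.acme.Other;

public class GeneratedCodeDemo {
    public static void main(String[] args) {
        // Build a MyRecord with a nested OtherRecord via the generated builders
        Other.MyRecord record = Other.MyRecord.newBuilder()
                .setF1("hello")
                .setF2(Other.OtherRecord.newBuilder().setOtherId(42).build())
                .build();
        System.out.println(record);
    }
}
```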
3. Convert the protobuf schema into an avro schema
When handling protobuf, avro, or json data, confluent schema registry first converts everything into the connect schema format, and only then writes it out to a concrete file format such as parquet or avro.
```java
import com.acme.Other;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ProtobufToAvro {

    public static void main(String[] args) {
        // Instantiate the class generated from the protobuf definition
        Other.MyRecord obj = Other.MyRecord.newBuilder().build();
        // Derive the protobuf schema from the message
        ProtobufSchema pbSchema = ProtobufSchemaUtils.getSchema(obj);
        ProtobufData protobufData = new ProtobufData();
        // SchemaAndValue result = protobufData.toConnectData(pbSchema, obj);
        // System.out.println(result);

        AvroDataConfig avroDataConfig = new AvroDataConfig.Builder()
                .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 1)
                .with(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)
                .with(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true)
                .build();
        AvroData avroData = new AvroData(avroDataConfig);
        // Convert the protobuf schema to a connect schema first, then to an avro schema
        org.apache.avro.Schema avroSchema =
                avroData.fromConnectSchema(protobufData.toConnectSchema(pbSchema));
        System.out.println(avroSchema);
    }
}
```
The converted avro schema is printed as follows:
```json
{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    {
      "name": "f1",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "f2",
      "type": [
        "null",
        {
          "type": "record",
          "name": "OtherRecord",
          "fields": [
            {
              "name": "other_id",
              "type": ["null", "int"],
              "default": null
            }
          ]
        }
      ],
      "default": null
    }
  ]
}
```
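The example above only converts the schema; the commented-out toConnectData call hints at the corresponding path for message values. Below is a minimal sketch, not from the original post, of pushing an actual protobuf message through the same connect intermediate representation into an avro object (the class name ProtobufValueToAvro and the field value are made up for illustration):

```java
import com.acme.Other;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import org.apache.kafka.connect.data.SchemaAndValue;

public class ProtobufValueToAvro {

    public static void main(String[] args) {
        Other.MyRecord obj = Other.MyRecord.newBuilder().setF1("hello").build();
        ProtobufSchema pbSchema = ProtobufSchemaUtils.getSchema(obj);

        // protobuf message -> connect schema and value
        SchemaAndValue connectData = new ProtobufData().toConnectData(pbSchema, obj);

        // connect value -> avro value (a GenericRecord for record types)
        AvroData avroData = new AvroData(new AvroDataConfig.Builder()
                .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 1)
                .build());
        Object avroValue = avroData.fromConnectData(connectData.schema(), connectData.value());
        System.out.println(avroValue);
    }
}
```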
Note: confluent's implementation is careful here. A protobuf uint32 (range 0 to 2^32 - 1) is always converted to a long (range -2^63 to 2^63 - 1), so the value can never overflow. See the source:
https://github.com/confluentinc/schema-registry/blob/v7.1.1/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1485
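As a quick way to observe this mapping without running protoc, a .proto definition containing a uint32 field can be parsed directly from a string with the ProtobufSchema(String) constructor. In this sketch the Counter message is a made-up example; the printed avro schema should show the field as long:

```java
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class Uint32ToLong {

    public static void main(String[] args) {
        // Parse a protobuf schema with a uint32 field straight from its text form
        ProtobufSchema pbSchema = new ProtobufSchema(
                "syntax = \"proto3\";\n"
                        + "package com.acme;\n"
                        + "message Counter { uint32 hits = 1; }");

        AvroData avroData = new AvroData(new AvroDataConfig.Builder()
                .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 1)
                .build());
        // uint32 maps to int64 in connect and hence long in avro,
        // so the full 0..2^32-1 range fits without overflow
        org.apache.avro.Schema avroSchema =
                avroData.fromConnectSchema(new ProtobufData().toConnectSchema(pbSchema));
        System.out.println(avroSchema);
    }
}
```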
For a reference implementation of the conversion, see this test in the source:
https://github.com/confluentinc/schema-registry/blob/v7.1.1/avro-data/src/test/java/io/confluent/connect/avro/AdditionalAvroDataTest.java