https://github.com/awslabs/aws-glue-schema-registry
AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Science Score: 26.0%
This score indicates how likely this project is to be science-related based on various indicators:
-
○CITATION.cff file
-
✓codemeta.json file
Found codemeta.json file -
✓.zenodo.json file
Found .zenodo.json file -
○DOI references
-
○Academic publication links
-
○Committers with academic emails
-
○Institutional organization owner
-
○JOSS paper metadata
-
○Scientific vocabulary similarity
Low similarity (12.6%) to scientific vocabulary
Keywords from Contributors
Repository
AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Basic Info
Statistics
- Stars: 142
- Watchers: 14
- Forks: 114
- Open Issues: 124
- Releases: 24
Metadata Files
README.md
AWS Glue Schema Registry Library
AWS Glue Schema Registry provides a solution for customers to centrally discover, control and evolve schemas while ensuring data produced was validated by registered schemas. AWS Glue Schema Registry Library offers Serializers and Deserializers that plug-in with Glue Schema Registry.
Getting Started
- Sign up for AWS — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see AWS Account and Credentials in the AWS SDK for Java Developer Guide.
- Sign up for AWS Glue Schema Registry — Go to the AWS Glue Schema Registry console to sign up for the service and create an AWS Glue Schema Registry. For more information, see Getting Started with Glue Schema Registry in the AWS Glue Developer Guide.
- Minimum requirements — To use the AWS Glue Schema Registry, you'll need Java > 1.8 and < Java 15.
Features
- Messages/records are serialized on producer front and deserialized on the consumer front by using schema-registry-serde.
- Support for three data formats: AVRO, JSON (with JSON Schema Draft04, Draft06, Draft07), and Protocol Buffers (Protobuf syntax versions 2 and 3).
- Kafka Streams support for AWS Glue Schema Registry.
- Records can be compressed to reduce message size.
- An inbuilt local in-memory cache to save calls to AWS Glue Schema Registry. The schema version id for a schema definition is cached on Producer side and schema for a schema version id is cached on the Consumer side.
- Auto registration of schema can be enabled for any new schema to be auto-registered.
- For Schemas, Evolution check is performed while registering.
- Migration from a third party Schema Registry.
- Flink support for AWS Glue Schema Registry.
- Kafka Connect support for AWS Glue Schema Registry.
Building from Source
After you've downloaded the code from GitHub, you can build it using Maven.
The following maven command will clean the target directory, compile the project, execute the tests and package the project build into a JAR.
cd build-tools/ && mvn clean install && cd .. && mvn clean install
Alternatively, one could git clone this repo and run mvn clean install.
Testing
To simply run the tests, execute the following maven command:
mvn test
Using the AWS Glue Schema Registry Library Serializer / Deserializer
The recommended way to use the AWS Glue Schema Registry Library for Java is to consume it from Maven.
Using AWS Glue Schema Registry with Amazon MSK — To set-up Amazon Managed Streaming for Apache Kafka see Getting started with Amazon MSK.
Maven Dependency
xml
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.25</version>
</dependency>
Code Example
Producer for Kafka with AVRO format
```java properties.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); properties.put(AWSSchemaRegistryConstants.DATAFORMAT, DataFormat.AVRO.name()); properties.put(AWSSchemaRegistryConstants.AWSREGION, "us-east-1"); properties.put(AWSSchemaRegistryConstants.REGISTRYNAME, "my-registry"); properties.put(AWSSchemaRegistryConstants.SCHEMANAME, "my-schema");
Schema schema_payment = null;
try {
schema_payment = parser.parse(new File("src/main/resources/avro/com/tutorial/Payment.avsc"));
} catch (IOException e) {
e.printStackTrace();
}
GenericRecord musical = new GenericData.Record(schema_payment);
musical.put("id", "entertainment_2");
musical.put("amount", 105.0);
List<GenericRecord> misc = new ArrayList<>();
misc.add(musical);
try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
for (int i = 0; i < 4; i++) {
GenericRecord r = misc.get(i);
final ProducerRecord<String, GenericRecord> record;
record = new ProducerRecord<String, GenericRecord>(topic, r.get("id").toString(), r);
producer.send(record);
System.out.println("Sent message " + i);
Thread.sleep(1000L);
}
producer.flush();
System.out.println("Successfully produced 10 messages to a topic called " + topic);
} catch (final InterruptedException | SerializationException e) {
e.printStackTrace();
}
```
Consumer for Kafka with AVRO format
```java properties.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaDeserializer.class .getName(); properties.put(AWSSchemaRegistryConstants.AWSREGION, "us-east-1"); properties.put(AWSSchemaRegistryConstants.AVRORECORDTYPE, AvroRecordType.GENERICRECORD.getName());
try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
final ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (final ConsumerRecord<String, GenericRecord> record : records) {
final String key = record.key();
final GenericRecord value = record.value();
System.out.println("Received message: key = " + key + ", value = " + value);
}
}
}
```
Producer for Kafka with JSON format
```java properties.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); properties.put(AWSSchemaRegistryConstants.DATAFORMAT, DataFormat.JSON.name()); properties.put(AWSSchemaRegistryConstants.AWSREGION, "us-east-1"); properties.put(AWSSchemaRegistryConstants.REGISTRYNAME, "my-registry"); properties.put(AWSSchemaRegistryConstants.SCHEMANAME, "my-schema");
String jsonSchema = "{\n" + " \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n"
+ " \"type\": \"object\",\n" + " \"properties\": {\n" + " \"employee\": {\n"
+ " \"type\": \"object\",\n" + " \"properties\": {\n"
+ " \"name\": {\n" + " \"type\": \"string\"\n" + " },\n"
+ " \"age\": {\n" + " \"type\": \"integer\"\n" + " },\n"
+ " \"city\": {\n" + " \"type\": \"string\"\n" + " }\n"
+ " },\n" + " \"required\": [\n" + " \"name\",\n"
+ " \"age\",\n" + " \"city\"\n" + " ]\n" + " }\n"
+ " },\n" + " \"required\": [\n" + " \"employee\"\n" + " ]\n"
+ " }";
String jsonPayload = "{\n" + " \"employee\": {\n" + " \"name\": \"John\",\n" + " \"age\": 30,\n"
+ " \"city\": \"New York\"\n" + " }\n" + " }";
JsonDataWithSchema jsonSchemaWithData = JsonDataWithSchema.builder(jsonSchema, jsonPayload).build();
List<JsonDataWithSchema> genericJsonRecords = new ArrayList<>();
genericJsonRecords.add(jsonSchemaWithData);
try (KafkaProducer<String, JsonDataWithSchema> producer = new KafkaProducer<String, JsonDataWithSchema>(properties)) {
for (int i = 0; i < genericJsonRecords.size(); i++) {
JsonDataWithSchema r = genericJsonRecords.get(i);
final ProducerRecord<String, JsonDataWithSchema> record;
record = new ProducerRecord<String, JsonDataWithSchema>(topic, "message-" + i, r);
producer.send(record);
System.out.println("Sent message " + i);
Thread.sleep(1000L);
}
producer.flush();
System.out.println("Successfully produced 10 messages to a topic called " + topic);
} catch (final InterruptedException | SerializationException e) {
e.printStackTrace();
}
```
Consumer for Kafka with JSON format
```java properties.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaDeserializer.class .getName(); properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
try (final KafkaConsumer<String, JsonDataWithSchema> consumer = new KafkaConsumer<String, JsonDataWithSchema>(properties)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
final ConsumerRecords<String, JsonDataWithSchema> records = consumer.poll(100);
for (final ConsumerRecord<String, JsonDataWithSchema> record : records) {
final String key = record.key();
final JsonDataWithSchema value = record.value();
System.out.println("Received message: key = " + key + ", value = " + value);
}
}
}
```
Producer for Kafka with PROTOBUF format
```java properties.put(ProducerConfig.KEYSERIALIZERCLASSCONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); properties.put(AWSSchemaRegistryConstants.DATAFORMAT, DataFormat.PROTOBUF.name()); properties.put(AWSSchemaRegistryConstants.AWSREGION, "us-east-1"); properties.put(AWSSchemaRegistryConstants.REGISTRYNAME, "my-registry"); properties.put(AWSSchemaRegistryConstants.SCHEMANAME, "protobuf-file-name.proto")
// POJO production
// CustomerAddress is the generated Protocol Buffers class based on the given Protobuf schema definition
CustomerAddress customerAddress = CustomerAddress.newBuilder().build();
KafkaProducer<String, CustomerAddress> producer =
new KafkaProducer<String, CustomerAddress>(properties);
producer.send(customerAddress);
// DynamicMessage production
DynamicMesssage customerDynamicMessage =
DynamicMessage.newBuilder(CustomerAddress.getDescriptor()).build();
KafkaProducer<String, DynamicMesssage> producer =
new KafkaProducer<String, DynamicMesssage>(properties);
producer.send(customerDynamicMessage);
```
Consumer for Kafka with PROTOBUF format
```java properties.put(ConsumerConfig.KEYDESERIALIZERCLASSCONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUEDESERIALIZERCLASSCONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
// POJO consumption
properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
KafkaConsumer<String, CustomerAddress> consumer =
new KafkaConsumer<String, CustomerAddress>(properties)
consumer.subscribe(Collections.singletonList(topic));
final ConsumerRecords<String, CustomerAddress> records = consumer.poll(10);
records
.stream()
.forEach(record -> processRecord(record))
// DynamicMessage consumption
// This is optional. By default AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE is set as ProtobufMessageType.DYNAMIC_MESSAGE.getName()
properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
KafkaConsumer<String, DynamicMessage> consumer =
new KafkaConsumer<String, DynamicMesssage>(properties)
consumer.subscribe(Collections.singletonList(topic));
final ConsumerRecords<String, DynamicMessage> records = consumer.poll(10);
records
.stream()
.forEach(record -> processRecord(record))
```
Dealing with Specific Record (JAVA POJO) for JSON
You could use a Java POJO and pass the object as a record. We use mbknor-jackson-jsonschema to generate a JSON Schema for the POJO passed. This library can also inject additional information in the JSON Schema.
GSR Library uses the "className" to fully classified class name to deserialize back to an Object of the POJO
Example class :
```java
@JsonSchemaDescription("This is a car") @JsonSchemaTitle("Simple Car Schema") @Builder @AllArgsConstructor @EqualsAndHashCode // Fully qualified class name to be added to an additionally injected property // called className for deserializer to determine which class to deserialize // the bytes into @JsonSchemaInject( strings = {@JsonSchemaString(path = "className", value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")} ) // List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema public class Car { @JsonProperty(required = true) private String make;
@JsonProperty(required = true)
private String model;
@JsonSchemaDefault("true")
@JsonProperty
public boolean used;
@JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
@Max(200000)
@JsonProperty
private int miles;
@Min(2000)
@JsonProperty
private int year;
@JsonProperty
private Date purchaseDate;
@JsonProperty
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
private Date listedDate;
@JsonProperty
private String[] owners;
@JsonProperty
private Collection<Float> serviceChecks;
// Empty constructor is required by Jackson to deserialize bytes
// into an Object of this class
public Car() {}
}
```
Using AWS Glue Schema Registry with Kinesis Data Streams
Kinesis Client library (KCL) / Kinesis Producer Library (KPL): Getting started with AWS Glue Schema Registry with AWS Kinesis Data Streams
If you cannot use KCL / KPL libraries for Kinesis Data Streams integration, **See examples and integration-tests for working example with Kinesis SDK, KPL and KCL.
Using Auto-Registration
Auto-Registration allows any record produced with new schema to be automatically registered with the AWS Glue Schema Registry. The Schema is registered automatically and a new schema version is created and evolution checks are performed.
If the Schema already exists, but the schema version is new, the new schema version is created and evolution checks are performed.
Auto-Registration is disabled by default. To enable Auto-Registration, enable setting by passing the configuration to the Producer as below :
java
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); // If not passed, defaults to false
Providing Registry Name
Registry Name can be provided by setting this property -
java
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
Providing Schema Name
Schema Name can be provided by setting this property -
java
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka)
Alternatively, a schema registry naming strategy implementation can be provided.
java
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
"com.amazonaws.services.schemaregistry.serializers.avro.CustomerProvidedSchemaNamingStrategy");
An example test implementation class is here.
Providing Registry Description
Registry Description can be provided by setting this property -
java
properties.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
Providing Compatibility Setting for Schema
Registry Description can be provided by setting this property -
java
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
Using Compression
Deserialized byte array can be compressed to save on data usage over the network and storage on the topic/stream. The Consumer side using AWS Glue Schema Registry Deserializer would be able to decompress and deserialize the byte array. By default, compression is disabled. Customers can choose ZLIB as compressionType by setting up below property.
java
// If not passed, defaults to no compression
properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
In-Memory Cache settings
In Memory cache is used by Producer to store schema to schema version id mapping and by consumer to store schema version id to schema mapping. This cache allows Producers and Consumers to save time and hits on IO calls to Schema Registry.
The cache is available by default. However, it can be fine-tuned by providing cache specific properties.
java
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "60000"); // If not passed, defaults to 24 Hours
properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, "100"); // Maximum number of elements in a cache - If not passed, defaults to 200
Migrating from a third party Schema Registry
To migrate to AWS Glue Schema Registry from a third party schema registry for AVRO data types for Kafka, add this property for value class along with the third party jar.
java
properties.put(AWSSchemaRegistryConstants.SECONDARY_DESERAILIZER, <ThirdPartyKafkaDeserializer>);
Using Kafka Connect with AWS Glue Schema Registry
- Clone this repo, build and copy dependencies
java
git clone git@github.com:awslabs/aws-glue-schema-registry.git
cd aws-glue-schema-registry
cd build-tools
mvn clean install
cd ..
mvn clean install
mvn dependency:copy-dependencies
- Configure Kafka Connectors with following properties
When configuring Kafka Connect workers or connectors, use the value of the string constant properties in the AWSSchemaRegistryConstants class to configure the AWSKafkaAvroConverter.
java
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=ca-central-1
value.converter.region=ca-central-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=KeySchema
value.converter.schemaName=ValueSchema
As Glue Schema Registry is a fully managed service by AWS, there is no notion of schema registry URLs. Name of the registry (within the same AWS account) can be optionally configured using following options. If not specified, default-registry is used.
java
key.converter.registry.name=my-registry
value.converter.registry.name=my-registry
- Add command below to Launch mode section under kafka-run-class.sh
-cp $CLASSPATH:"<your aws glue schema registry base directory>/target/dependency/*"
It should look like this
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
fi
- If using bash, run the below commands to set-up your CLASSPATH in your bashprofile. (For any other shell, update the environment accordingly.)
```bash
echo 'export GSRLIBBASEDIR=<>' >>~/.bashprofile
echo 'export GSRLIBVERSION=1.1.25' >>~/.bashprofile
echo 'export KAFKAHOME=
' >>~/.bash profile echo 'export CLASSPATH=$CLASSPATH:$GSRLIBBASEDIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSRLIBVERSION.jar:$GSRLIBBASEDIR/common/target/schema-registry-common-$GSRLIBVERSION.jar:$GSRLIBBASEDIR/avro-serializer-deserializer/target/schema-registry-serde-$GSRLIBVERSION.jar' >>~/.bashprofile source ~/.bash_profile ``` - (Optional) If you wish to test with a simple file source then clone the file source connector.
bash
git clone https://github.com/mmolimar/kafka-connect-fs.git
cd kafka-connect-fs/
Under source connector configuration(config/kafka-connect-fs.properties), edit the data format to Avro, file reader to AvroFileReader and update an example Avro object from the file path you are reading from. For example:
fs.uris=<path to a sample avro object>
policy.regexp=^.*\.avro$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
Install source connector
mvn clean package
echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
source ~/.bash_profile
Update the sink properties under
file=<output file full path>
topics=<my topic>
Start Source Connector (In this example it is file source connector)
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
Run Sink Connector (In this example it is file sink connector))
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
- For more examples for running Kafka Connect with Avro, JSON, and Protobuf formats, refer script run-local-tests.sh under integration-tests module.
Using Kafka Streams with AWS Glue Schema Registry
Maven Dependency
xml
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-kafkastreams-serde</artifactId>
<version>1.1.25</version>
</dependency>
```java final Properties props = new Properties(); props.put(StreamsConfig.APPLICATIONIDCONFIG, "avro-streams"); props.put(StreamsConfig.BOOTSTRAPSERVERSCONFIG, "localhost:9092"); props.put(StreamsConfig.CACHEMAXBYTESBUFFERINGCONFIG, 0); props.put(StreamsConfig.DEFAULTKEYSERDECLASSCONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULTVALUESERDECLASSCONFIG, AWSKafkaAvroSerDe.class.getName()); props.put(ConsumerConfig.AUTOOFFSETRESET_CONFIG, "earliest");
props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> source = builder.stream("avro-input");
final KStream<String, GenericRecord> result = source
.filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))));
.filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
result.to("avro-output");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```
Using the AWS Glue Schema Registry Flink Connector
AWS Glue Schema Registry Flink Connector for Java in this repository is not recommended. Please check out Apache Flink repository for the latest support: Avro SerializationSchema and DeserializationSchema and JSON SerializationSchema and DeserializationSchema. Protobuf integration will be followed up soon.
Maven Dependency
xml
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-flink-serde</artifactId>
<version>1.1.25</version>
</dependency>
Code Example
Flink Kafka Producer with AVRO format
```java String topic = "topic"; Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test");
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("path/to/avro/file"));
FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
topic,
GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
properties);
stream.addSink(producer);
```
Flink Kafka Consumer with AVRO format
```java String topic = "topic"; Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test");
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("path/to/avro/file"));
FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
topic,
GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
properties);
DataStream<GenericRecord> stream = env.addSource(consumer);
```
Cross-Account Avro Converter Support
The AWSKafkaAvroConverter Avro converter is able to assume an IAM role in a different AWS account before accessing Glue Schema Registry. You can configure the role ARN and an optional session name.
If assumeRoleArn is not provided, the converter will fallback to using the default credentials associated to the host.
Connector configuration
Include these properties in your Kafka Connect worker or connector config:
```properties
Define converter
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
Specify cross-account role arn
key.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role" value.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role"
Override default session name (optional; default is "kafka-connect-session")
key.converter.assumeRoleSessionName=my-custom-session value.converter.assumeRoleSessionName=my-custom-session ```
## Security issue notifications If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our vulnerability reporting page. Please do not create a public github issue.
Owner
- Name: Amazon Web Services - Labs
- Login: awslabs
- Kind: organization
- Location: Seattle, WA
- Website: http://amazon.com/aws/
- Repositories: 914
- Profile: https://github.com/awslabs
AWS Labs
GitHub Events
Total
- Create event: 9
- Issues event: 13
- Release event: 3
- Watch event: 15
- Delete event: 8
- Issue comment event: 53
- Push event: 64
- Pull request review comment event: 101
- Pull request review event: 111
- Pull request event: 69
- Fork event: 18
Last Year
- Create event: 9
- Issues event: 13
- Release event: 3
- Watch event: 15
- Delete event: 8
- Issue comment event: 53
- Push event: 64
- Pull request review comment event: 101
- Pull request review event: 111
- Pull request event: 69
- Fork event: 18
Committers
Last synced: over 3 years ago
All Time
- Total Commits: 110
- Total Committers: 28
- Avg Commits per committer: 3.929
- Development Distribution Score (DDS): 0.782
Top Committers
| Name | Commits | |
|---|---|---|
| Mohit Paliwal | i****l@g****m | 24 |
| Ravindranath Kakarla | r****h@g****m | 12 |
| dependabot[bot] | 4****]@u****m | 11 |
| Kexin | 3****3@u****m | 9 |
| mohit | m****a@a****m | 8 |
| Ravindranath Kakarla | r****h@a****m | 7 |
| Rushin Naik | 7****k@u****m | 4 |
| Kexin Hui | k****i@a****m | 4 |
| George Yang | h****g@a****m | 4 |
| Vanessa Pinto | v****p@a****m | 3 |
| NersesAM | 1****M@u****m | 3 |
| Sebastian Gerstenlauer | S****r@p****m | 3 |
| Linyu Yao | 7****1@u****m | 2 |
| George Yang | d****r@h****m | 2 |
| Aravinthsamy Sekar | 6****r@u****m | 1 |
| Amazon GitHub Automation | 5****o@u****m | 1 |
| Davide Savazzi | d****i@g****m | 1 |
| Matthias Krueger | m****i@g****m | 1 |
| Burak Kose | b****1@g****m | 1 |
| Mark Lambert | m****t@g****m | 1 |
| Thomas Vestergaard Trolle | t****n@u****m | 1 |
| Linyu | y****y@a****m | 1 |
| awsalialem | 8****m@u****m | 1 |
| Vanessa Pinto | 4****7@u****m | 1 |
| Wei | 1****8@u****m | 1 |
| antontreushchenko | 1****o@u****m | 1 |
| Ali Alemi | a****m@a****m | 1 |
| msalistra | m****a@u****m | 1 |
Committer Domains (Top 20 + Academic)
Issues and Pull Requests
Last synced: 10 months ago
All Time
- Total issues: 118
- Total pull requests: 131
- Average time to close issues: 2 months
- Average time to close pull requests: about 1 month
- Total issue authors: 101
- Total pull request authors: 41
- Average comments per issue: 2.49
- Average comments per pull request: 0.64
- Merged pull requests: 72
- Bot issues: 0
- Bot pull requests: 10
Past Year
- Issues: 11
- Pull requests: 43
- Average time to close issues: about 1 month
- Average time to close pull requests: 3 days
- Issue authors: 10
- Pull request authors: 9
- Average comments per issue: 0.18
- Average comments per pull request: 0.05
- Merged pull requests: 21
- Bot issues: 0
- Bot pull requests: 0
Top Authors
Issue Authors
- singhbaljit (3)
- mdimas (3)
- antontreushchenko (2)
- julillosamaral (2)
- raipragyaa (2)
- cr-urra (2)
- chriline (2)
- ShakeelHussain (2)
- blacktooth (2)
- er1c (2)
- javierlarota (2)
- pc-akothapeta (2)
- luizamboni (2)
- dexter-mh-lee (2)
- NersesAM (2)
Pull Request Authors
- isaurab007 (18)
- knoxy5467 (13)
- YangSan0622 (12)
- dependabot[bot] (10)
- vanessapinto257 (8)
- blacktooth (8)
- allkliu (5)
- mohitpali (4)
- bowen-s (4)
- phshah95 (3)
- junyuc25 (3)
- NersesAM (3)
- abhilash47 (2)
- fcappi (2)
- charlescd (2)
Top Labels
Issue Labels
Pull Request Labels
Packages
- Total packages: 11
- Total downloads: unknown
- Total docker downloads: 293,048,699
-
Total dependent packages: 21
(may contain duplicates) -
Total dependent repositories: 358
(may contain duplicates) - Total versions: 287
repo1.maven.org: software.amazon.glue:schema-registry-serde
The AWS Glue Schema Registry Serializer/Deserializer enables Java developers to easily integrate their Apache Kafka and AWS Kinesis applications with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-serde/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-common
The AWS Glue Schema Common is the common package used by AWS Glue Schema Registry Library and has the core functionality.
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-common/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-build-tools
The AWS Glue Schema Registry build tools helps run code coverage for AWS Glue Schema Registry Library
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-build-tools/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-flink-serde
The AWS Glue Schema Registry Library for Apache Flink enables Java developers to easily integrate their Apache Flink applications with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-flink-serde/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-kafkastreams-serde
The AWS Glue Schema Registry Kafka Streams SerDe library enables Java developers to easily integrate their Apache Kafka Streams applications with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-kafkastreams-serde/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:protobuf-kafkaconnect-converter
The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry for Protobuf schemas.
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/protobuf-kafkaconnect-converter/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:jsonschema-kafkaconnect-converter
The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/jsonschema-kafkaconnect-converter/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-examples
The AWS Glue Schema examples has sample code for using Schema Registry with different applications.
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-examples/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-integration-tests
The AWS Glue Schema Registry tests would do a sanity check for the schema registry library.
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-integration-tests/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-parent
The AWS Glue Schema Registry Library for Java enables Java developers to easily integrate their data streaming systems with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-parent/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
repo1.maven.org: software.amazon.glue:schema-registry-kafkaconnect-converter
The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry
- Homepage: https://aws.amazon.com/glue
- Documentation: https://appdoc.app/artifact/software.amazon.glue/schema-registry-kafkaconnect-converter/
- License: Apache License, Version 2.0
-
Latest release: 1.1.24
published about 1 year ago
Rankings
Dependencies
- org.apache.flink:flink-streaming-java_2.11 1.12.2 provided
- org.apache.commons:commons-compress 1.21
- org.apache.flink:flink-avro 1.12.2
- software.amazon.glue:schema-registry-serde 1.1.12
- junit:junit 4.13.1 test
- org.hamcrest:hamcrest 2.2 test
- org.junit.jupiter:junit-jupiter-api 5.6.2 test
- org.junit.jupiter:junit-jupiter-engine 5.6.2 test
- org.junit.jupiter:junit-jupiter-params 5.6.2 test
- org.junit.platform:junit-platform-commons 1.6.2 test
- org.mockito:mockito-core 3.3.3 test
- org.mockito:mockito-junit-jupiter 3.3.3 test
- software.amazon.glue:schema-registry-serde 1.0.2 test
- org.apache.kafka:connect-api
- software.amazon.glue:schema-registry-serde 1.1.12
- junit:junit test
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.junit.platform:junit-platform-commons test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- org.powermock:powermock-reflect 2.0.7 test
- uk.co.jemos.podam:podam 7.2.5.RELEASE test
- com.google.guava:guava
- org.apache.avro:avro
- org.apache.commons:commons-compress
- org.apache.commons:commons-lang3
- org.projectlombok:lombok
- org.projectlombok:lombok-utils
- org.slf4j:slf4j-api
- software.amazon.awssdk:aws-json-protocol
- software.amazon.awssdk:glue
- software.amazon.awssdk:url-connection-client
- software.amazon.glue:schema-registry-build-tools 1.1.12
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.junit.platform:junit-platform-commons test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- com.amazonaws:aws-java-sdk-kinesis
- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor
- commons-cli:commons-cli
- org.projectlombok:lombok
- org.projectlombok:lombok-utils
- software.amazon.glue:schema-registry-serde 1.1.12
- cloud.localstack:localstack-utils
- com.amazonaws:amazon-kinesis-producer
- com.google.protobuf:protobuf-java
- org.apache.avro:avro
- org.apache.avro:avro-compiler
- org.apache.avro:avro-ipc
- org.apache.avro:avro-maven-plugin
- org.apache.kafka:kafka-clients
- org.apache.kafka:kafka-streams
- org.apache.kafka:kafka_${kafka.scala.version}
- org.apache.logging.log4j:log4j-1.2-api
- org.apache.logging.log4j:log4j-api
- org.apache.logging.log4j:log4j-core
- org.apache.logging.log4j:log4j-slf4j-impl
- org.awaitility:awaitility
- org.hamcrest:hamcrest-all
- org.junit.jupiter:junit-jupiter
- org.junit.platform:junit-platform-surefire-provider
- org.slf4j:slf4j-api
- org.slf4j:slf4j-simple
- software.amazon.awssdk:auth
- software.amazon.awssdk:kinesis
- software.amazon.glue:schema-registry-kafkastreams-serde 1.1.12
- software.amazon.glue:schema-registry-serde 1.1.12
- software.amazon.kinesis:amazon-kinesis-client
- software.amazon.glue:schema-registry-serde 1.1.12 test
- com.github.erosb:everit-json-schema
- org.apache.kafka:connect-api
- org.apache.kafka:connect-json
- software.amazon.glue:schema-registry-serde 1.1.12
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- software.amazon.glue:schema-registry-serde 1.1.12
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.junit.platform:junit-platform-commons test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- com.fasterxml.jackson:jackson-bom 2.12.2 import
- software.amazon.awssdk:bom 2.17.122 import
- com.amazonaws:aws-java-sdk-kinesis 1.12.151
- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor 2.12.2
- com.github.erosb:everit-json-schema 1.12.2
- com.google.guava:guava 30.0-jre
- com.google.protobuf:protobuf-java 3.19.2
- com.kjetland:mbknor-jackson-jsonschema_2.12 1.0.39
- commons-cli:commons-cli 1.2
- io.github.classgraph:classgraph 4.8.120
- org.apache.avro:avro 1.11.0
- org.apache.avro:avro-compiler 1.11.0
- org.apache.avro:avro-ipc 1.11.0
- org.apache.avro:avro-maven-plugin 1.11.0
- org.apache.commons:commons-compress 1.21
- org.apache.commons:commons-lang3 3.8.1
- org.apache.kafka:connect-api 2.8.1
- org.apache.kafka:connect-json 2.8.1
- org.apache.kafka:kafka-clients 2.8.1
- org.apache.kafka:kafka-streams 2.8.1
- org.apache.kafka:kafka_2.12 2.8.1
- org.apache.logging.log4j:log4j-1.2-api 2.17.1
- org.apache.logging.log4j:log4j-api 2.17.1
- org.apache.logging.log4j:log4j-core 2.17.1
- org.apache.logging.log4j:log4j-slf4j-impl 2.17.1
- org.junit.platform:junit-platform-surefire-provider 1.3.2
- org.projectlombok:lombok 1.18.20
- org.projectlombok:lombok-utils 1.18.12
- org.slf4j:slf4j-api 1.7.30
- org.slf4j:slf4j-simple 1.7.30
- software.amazon.awssdk:glue 2.17.122
- cloud.localstack:localstack-utils 0.2.11 test
- com.amazonaws:amazon-kinesis-producer LATEST test
- org.awaitility:awaitility 3.0.0 test
- org.hamcrest:hamcrest-all 1.1 test
- org.junit.jupiter:junit-jupiter 5.6.2 test
- org.junit.jupiter:junit-jupiter-api 5.6.2 test
- org.junit.jupiter:junit-jupiter-engine 5.6.2 test
- org.junit.jupiter:junit-jupiter-params 5.6.2 test
- org.junit.platform:junit-platform-commons 1.6.2 test
- org.mockito:mockito-core 3.3.3 test
- org.mockito:mockito-junit-jupiter 3.3.3 test
- software.amazon.kinesis:amazon-kinesis-client 2.2.9 test
- com.google.protobuf:protobuf-java ${protobuf.version}
- com.google.protobuf:protobuf-java-util ${protobuf.version}
- io.apicurio:apicurio-registry-protobuf-schema-utilities ${apicurio-registry.version}
- org.apache.kafka:connect-api
- software.amazon.glue:schema-registry-serde 1.1.12
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- com.amazonaws:aws-java-sdk-sts ${aws.sdk.v1.version}
- com.github.erosb:everit-json-schema
- com.google.api.grpc:proto-google-common-protos 2.7.4
- com.google.jimfs:jimfs 1.1
- com.google.protobuf:protobuf-java ${protobuf.version}
- com.google.protobuf:protobuf-java
- com.kjetland:mbknor-jackson-jsonschema_2.12
- com.squareup.wire:wire-compiler ${square-wireschema.version}
- com.squareup.wire:wire-schema ${square-wireschema.version}
- io.github.classgraph:classgraph
- org.apache.kafka:kafka-clients
- software.amazon.awssdk:arns
- software.amazon.awssdk:sts ${aws.sdk.v2.version}
- software.amazon.glue:schema-registry-common 1.1.12
- com.google.truth.extensions:truth-proto-extension 1.1.3 test
- org.junit.jupiter:junit-jupiter-api test
- org.junit.jupiter:junit-jupiter-engine test
- org.junit.jupiter:junit-jupiter-params test
- org.junit.platform:junit-platform-commons test
- org.mockito:mockito-core test
- org.mockito:mockito-junit-jupiter test
- actions/cache v2 composite
- actions/checkout v2 composite
- actions/setup-java v1 composite
- public.ecr.aws/bitnami/kafka latest
- public.ecr.aws/bitnami/zookeeper latest
- software.amazon.glue:schema-registry-serde 1.1.14
- software.amazon.msk:aws-msk-iam-auth ${aws.msk.iam.auth}