https://github.com/awslabs/aws-glue-schema-registry

AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.

Science Score: 26.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Committers with academic emails
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (12.6%) to scientific vocabulary

Keywords from Contributors

diagram interactive labels
Last synced: 10 months ago · JSON representation

Repository

Basic Info
  • Host: GitHub
  • Owner: awslabs
  • License: apache-2.0
  • Language: Java
  • Default Branch: master
  • Homepage:
  • Size: 1.95 MB
Statistics
  • Stars: 142
  • Watchers: 14
  • Forks: 114
  • Open Issues: 124
  • Releases: 24
Created over 6 years ago · Last pushed 10 months ago
Metadata Files
Readme Changelog Contributing License Code of conduct Notice

README.md

AWS Glue Schema Registry Library

AWS Glue Schema Registry provides a solution for customers to centrally discover, control and evolve schemas while ensuring data produced was validated by registered schemas. AWS Glue Schema Registry Library offers Serializers and Deserializers that plug-in with Glue Schema Registry.

Getting Started

  1. Sign up for AWS — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see AWS Account and Credentials in the AWS SDK for Java Developer Guide.
  2. Sign up for AWS Glue Schema Registry — Go to the AWS Glue Schema Registry console to sign up for the service and create an AWS Glue Schema Registry. For more information, see Getting Started with Glue Schema Registry in the AWS Glue Developer Guide.
  3. Minimum requirements — To use the AWS Glue Schema Registry, you'll need Java > 1.8 and < Java 15.

Features

  1. Messages/records are serialized on producer front and deserialized on the consumer front by using schema-registry-serde.
  2. Support for three data formats: AVRO, JSON (with JSON Schema Draft04, Draft06, Draft07), and Protocol Buffers (Protobuf syntax versions 2 and 3).
  3. Kafka Streams support for AWS Glue Schema Registry.
  4. Records can be compressed to reduce message size.
  5. An inbuilt local in-memory cache to save calls to AWS Glue Schema Registry. The schema version id for a schema definition is cached on Producer side and schema for a schema version id is cached on the Consumer side.
  6. Auto-registration can be enabled so that any new schema is registered automatically.
  7. An evolution (compatibility) check is performed when a schema is registered.
  8. Migration from a third party Schema Registry.
  9. Flink support for AWS Glue Schema Registry.
  10. Kafka Connect support for AWS Glue Schema Registry.

Building from Source

After you've downloaded the code from GitHub, you can build it using Maven.

The following Maven command cleans the target directory, compiles the project, executes the tests and packages the build into a JAR.

cd build-tools/ && mvn clean install && cd .. && mvn clean install

Alternatively, one could git clone this repo and run mvn clean install.

Testing

To run only the tests, execute the following Maven command:

mvn test

Using the AWS Glue Schema Registry Library Serializer / Deserializer

The recommended way to use the AWS Glue Schema Registry Library for Java is to consume it from Maven.

Using AWS Glue Schema Registry with Amazon MSK: to set up Amazon Managed Streaming for Apache Kafka, see Getting started with Amazon MSK.

Maven Dependency

```xml
<dependency>
    <groupId>software.amazon.glue</groupId>
    <artifactId>schema-registry-serde</artifactId>
    <version>1.1.25</version>
</dependency>
```

Code Example

Producer for Kafka with AVRO format

```java
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
    properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema");

    // Parse the Avro schema definition from disk
    Schema.Parser parser = new Schema.Parser();
    Schema schema_payment = null;
    try {
        schema_payment = parser.parse(new File("src/main/resources/avro/com/tutorial/Payment.avsc"));
    } catch (IOException e) {
        e.printStackTrace();
    }

    // Build one record that conforms to the schema
    GenericRecord musical = new GenericData.Record(schema_payment);
    musical.put("id", "entertainment_2");
    musical.put("amount", 105.0);

    List<GenericRecord> misc = new ArrayList<>();
    misc.add(musical);

    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
        // Iterate over the records actually present in the list
        for (int i = 0; i < misc.size(); i++) {
            GenericRecord r = misc.get(i);

            final ProducerRecord<String, GenericRecord> record;
            record = new ProducerRecord<String, GenericRecord>(topic, r.get("id").toString(), r);

            producer.send(record);
            System.out.println("Sent message " + i);
            Thread.sleep(1000L);
        }
        producer.flush();
        System.out.println("Successfully produced " + misc.size() + " messages to a topic called " + topic);

    } catch (final InterruptedException | SerializationException e) {
        e.printStackTrace();
    }

```

Consumer for Kafka with AVRO format

```java
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

    try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            final ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
            for (final ConsumerRecord<String, GenericRecord> record : records) {
                final String key = record.key();
                final GenericRecord value = record.value();
                System.out.println("Received message: key = " + key + ", value = " + value);
            }
        }
    }

```

Producer for Kafka with JSON format

```java
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
    properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema");

    String jsonSchema = "{\n" + "        \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n"
                                            + "        \"type\": \"object\",\n" + "        \"properties\": {\n" + "          \"employee\": {\n"
                                            + "            \"type\": \"object\",\n" + "            \"properties\": {\n"
                                            + "              \"name\": {\n" + "                \"type\": \"string\"\n" + "              },\n"
                                            + "              \"age\": {\n" + "                \"type\": \"integer\"\n" + "              },\n"
                                            + "              \"city\": {\n" + "                \"type\": \"string\"\n" + "              }\n"
                                            + "            },\n" + "            \"required\": [\n" + "              \"name\",\n"
                                            + "              \"age\",\n" + "              \"city\"\n" + "            ]\n" + "          }\n"
                                            + "        },\n" + "        \"required\": [\n" + "          \"employee\"\n" + "        ]\n"
                                            + "      }";
    String jsonPayload = "{\n" + "        \"employee\": {\n" + "          \"name\": \"John\",\n" + "          \"age\": 30,\n"
                                             + "          \"city\": \"New York\"\n" + "        }\n" + "      }";

    JsonDataWithSchema jsonSchemaWithData = JsonDataWithSchema.builder(jsonSchema, jsonPayload).build();

    List<JsonDataWithSchema> genericJsonRecords = new ArrayList<>();
    genericJsonRecords.add(jsonSchemaWithData);

    try (KafkaProducer<String, JsonDataWithSchema> producer = new KafkaProducer<String, JsonDataWithSchema>(properties)) {
        for (int i = 0; i < genericJsonRecords.size(); i++) {
            JsonDataWithSchema r = genericJsonRecords.get(i);

            final ProducerRecord<String, JsonDataWithSchema> record;
            record = new ProducerRecord<String, JsonDataWithSchema>(topic, "message-" + i, r);

            producer.send(record);
            System.out.println("Sent message " + i);
            Thread.sleep(1000L);
        }
        producer.flush();
        System.out.println("Successfully produced " + genericJsonRecords.size() + " messages to a topic called " + topic);

    } catch (final InterruptedException | SerializationException e) {
        e.printStackTrace();
    }

```

Consumer for Kafka with JSON format

```java
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");

    try (final KafkaConsumer<String, JsonDataWithSchema> consumer = new KafkaConsumer<String, JsonDataWithSchema>(properties)) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            final ConsumerRecords<String, JsonDataWithSchema> records = consumer.poll(100);
            for (final ConsumerRecord<String, JsonDataWithSchema> record : records) {
                final String key = record.key();
                final JsonDataWithSchema value = record.value();
                System.out.println("Received message: key = " + key + ", value = " + value);
            }
        }
    }

```

Producer for Kafka with PROTOBUF format

```java
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
    properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
    properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "protobuf-file-name.proto");

    // POJO production

    // CustomerAddress is the generated Protocol Buffers class based on the given Protobuf schema definition
    CustomerAddress customerAddress = CustomerAddress.newBuilder().build();

    KafkaProducer<String, CustomerAddress> pojoProducer =
         new KafkaProducer<String, CustomerAddress>(properties);

    // KafkaProducer.send expects a ProducerRecord that wraps the message
    pojoProducer.send(new ProducerRecord<String, CustomerAddress>(topic, customerAddress));

    // DynamicMessage production

    DynamicMessage customerDynamicMessage =
         DynamicMessage.newBuilder(CustomerAddress.getDescriptor()).build();

    KafkaProducer<String, DynamicMessage> dynamicMessageProducer =
         new KafkaProducer<String, DynamicMessage>(properties);

    dynamicMessageProducer.send(new ProducerRecord<String, DynamicMessage>(topic, customerDynamicMessage));

```

Consumer for Kafka with PROTOBUF format

```java
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");

    // POJO consumption

    properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());

    KafkaConsumer<String, CustomerAddress> pojoConsumer =
         new KafkaConsumer<String, CustomerAddress>(properties);

    pojoConsumer.subscribe(Collections.singletonList(topic));

    // ConsumerRecords is Iterable, so forEach can be called on it directly
    final ConsumerRecords<String, CustomerAddress> pojoRecords = pojoConsumer.poll(10);
    pojoRecords.forEach(record -> processRecord(record));

    // DynamicMessage consumption

    // This is optional. By default AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE is set as ProtobufMessageType.DYNAMIC_MESSAGE.getName()
    properties.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());

    KafkaConsumer<String, DynamicMessage> dynamicMessageConsumer =
         new KafkaConsumer<String, DynamicMessage>(properties);

    dynamicMessageConsumer.subscribe(Collections.singletonList(topic));

    final ConsumerRecords<String, DynamicMessage> dynamicMessageRecords = dynamicMessageConsumer.poll(10);
    dynamicMessageRecords.forEach(record -> processRecord(record));

```

Dealing with Specific Record (JAVA POJO) for JSON

You could use a Java POJO and pass the object as a record. We use mbknor-jackson-jsonschema to generate a JSON Schema for the POJO passed. This library can also inject additional information in the JSON Schema.

The GSR library uses the injected "className" property, which holds the fully qualified class name, to deserialize the bytes back into an object of the POJO class.

Example class:

```java
@JsonSchemaDescription("This is a car")
@JsonSchemaTitle("Simple Car Schema")
@Builder
@AllArgsConstructor
@EqualsAndHashCode
// Fully qualified class name to be added to an additionally injected property
// called className for deserializer to determine which class to deserialize
// the bytes into
@JsonSchemaInject(
        strings = {@JsonSchemaString(path = "className",
                value = "com.amazonaws.services.schemaregistry.integrationtests.generators.Car")}
)
// List of annotations to help infer JSON Schema are defined by https://github.com/mbknor/mbknor-jackson-jsonSchema
public class Car {
    @JsonProperty(required = true)
    private String make;

    @JsonProperty(required = true)
    private String model;

    @JsonSchemaDefault("true")
    @JsonProperty
    public boolean used;

    @JsonSchemaInject(ints = {@JsonSchemaInt(path = "multipleOf", value = 1000)})
    @Max(200000)
    @JsonProperty
    private int miles;

    @Min(2000)
    @JsonProperty
    private int year;

    @JsonProperty
    private Date purchaseDate;

    @JsonProperty
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private Date listedDate;

    @JsonProperty
    private String[] owners;

    @JsonProperty
    private Collection<Float> serviceChecks;

    // Empty constructor is required by Jackson to deserialize bytes
    // into an Object of this class
    public Car() {}
}

```

Using AWS Glue Schema Registry with Kinesis Data Streams

Kinesis Client library (KCL) / Kinesis Producer Library (KPL): Getting started with AWS Glue Schema Registry with AWS Kinesis Data Streams

If you cannot use the KCL / KPL libraries for Kinesis Data Streams integration, see the examples and integration-tests modules for working examples with the Kinesis SDK, KPL and KCL.

Using Auto-Registration

Auto-Registration allows the schema of any record produced with a new schema to be registered automatically with the AWS Glue Schema Registry: the schema is registered, a new schema version is created, and evolution checks are performed.

If the schema already exists but the schema version is new, the new schema version is created and evolution checks are performed.

Auto-Registration is disabled by default. To enable it, pass the following configuration to the Producer:

```java
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true); // If not passed, defaults to false
```
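
The registration rules described above can be pictured with a small in-memory model. This is only a sketch: `AutoRegistrationSketch`, `resolveVersion` and the always-passing compatibility check are hypothetical stand-ins, not classes from the library, and no AWS call is made. Throwing when auto-registration is disabled is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the auto-registration decision; not the library's implementation. */
final class AutoRegistrationSketch {

    /** Schema name -> ordered list of registered definitions (one entry per version). */
    private final Map<String, List<String>> schemas = new HashMap<>();
    private final boolean autoRegistrationEnabled;

    AutoRegistrationSketch(boolean autoRegistrationEnabled) {
        this.autoRegistrationEnabled = autoRegistrationEnabled;
    }

    /** Returns the 1-based version number of the definition, registering it first if allowed. */
    int resolveVersion(String schemaName, String definition) {
        List<String> versions = schemas.get(schemaName);
        if (versions != null) {
            int index = versions.indexOf(definition);
            if (index >= 0) {
                return index + 1; // already registered: nothing to do
            }
        }
        if (!autoRegistrationEnabled) {
            // Assumption of this sketch: unknown versions are rejected when the setting is off.
            throw new IllegalStateException("Schema version not registered and auto-registration is disabled");
        }
        if (versions == null) {
            versions = new ArrayList<>(); // unknown schema: create it
            schemas.put(schemaName, versions);
        } else {
            checkCompatibility(versions, definition); // known schema, new version
        }
        versions.add(definition);
        return versions.size();
    }

    /** Placeholder for the evolution check performed by the service; this sketch accepts everything. */
    private void checkCompatibility(List<String> existing, String candidate) {
        // Intentionally empty.
    }

    public static void main(String[] args) {
        AutoRegistrationSketch registry = new AutoRegistrationSketch(true);
        System.out.println(registry.resolveVersion("my-schema", "{\"type\":\"string\"}")); // new schema
        System.out.println(registry.resolveVersion("my-schema", "{\"type\":\"long\"}"));   // new version
        System.out.println(registry.resolveVersion("my-schema", "{\"type\":\"string\"}")); // known version
    }
}
```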

Providing Registry Name

The registry name can be provided by setting this property:

```java
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry"
```

Providing Schema Name

The schema name can be provided by setting this property:

```java
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka)
```

Alternatively, a schema registry naming strategy implementation can be provided.

```java
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
        "com.amazonaws.services.schemaregistry.serializers.avro.CustomerProvidedSchemaNamingStrategy");
```

An example test implementation class is provided in the repository.
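
The two documented defaults (registry name falls back to "default-registry", schema name falls back to the transport or topic name) can be sketched as follows. The class and method names are hypothetical, and the map keys `registry.name` and `schemaName` are borrowed from the Kafka Connect converter properties shown elsewhere in this README; this is not the library's own resolution code.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the documented name defaults; class and method names are hypothetical. */
final class NameDefaultsSketch {

    static final String DEFAULT_REGISTRY_NAME = "default-registry";

    /** Registry name from the configuration, or "default-registry" when absent. */
    static String registryName(Map<String, String> config) {
        return config.getOrDefault("registry.name", DEFAULT_REGISTRY_NAME);
    }

    /** Schema name from the configuration, or the transport (topic) name when absent. */
    static String schemaName(Map<String, String> config, String topic) {
        return config.getOrDefault("schemaName", topic);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // Nothing configured: both values fall back to their defaults.
        System.out.println(registryName(config) + " / " + schemaName(config, "payments"));

        config.put("registry.name", "my-registry");
        config.put("schemaName", "my-schema");
        // Explicit settings win over the defaults.
        System.out.println(registryName(config) + " / " + schemaName(config, "payments"));
    }
}
```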

Providing Registry Description

The registry description can be provided by setting this property:

```java
properties.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description
```

Providing Compatibility Setting for Schema

The compatibility setting for the schema can be provided by setting this property:

```java
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD
```

Using Compression

The serialized byte array can be compressed to save on data usage over the network and on storage in the topic/stream. A Consumer that uses the AWS Glue Schema Registry Deserializer is able to decompress and deserialize the byte array. By default, compression is disabled. Customers can choose ZLIB as the compression type by setting the property below.

```java
// If not passed, defaults to no compression
properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
```
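
To get a feel for what ZLIB compression does to a serialized payload, here is a stand-alone round trip built only on `java.util.zip`. It is a sketch under the assumption that a repetitive JSON-like string is a fair stand-in for serialized records; `ZlibRoundTripSketch` is a hypothetical name and this is not the library's internal code path or wire format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Stand-alone ZLIB round trip using java.util.zip; not the library's internal code path. */
final class ZlibRoundTripSketch {

    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // no further progress possible: stop instead of looping forever
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("Input is not valid ZLIB data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A repetitive payload stands in for a batch of serialized records.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("{\"id\":\"entertainment_2\",\"amount\":105.0}");
        }
        byte[] serialized = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(serialized);
        byte[] restored = decompress(compressed);
        System.out.println("original=" + serialized.length
                + " compressed=" + compressed.length
                + " roundTripOk=" + Arrays.equals(serialized, restored));
    }
}
```

How much is saved depends heavily on the payload: small or high-entropy records may not shrink at all, so it is worth measuring on representative data before enabling compression.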

In-Memory Cache settings

The in-memory cache is used by the Producer to store the mapping from schema to schema version id, and by the Consumer to store the mapping from schema version id to schema. This cache saves Producers and Consumers time and I/O calls to the Schema Registry.

The cache is enabled by default. However, it can be fine-tuned by providing cache-specific properties.

```java
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "60000"); // If not passed, defaults to 24 Hours
properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, "100"); // Maximum number of elements in a cache - If not passed, defaults to 200
```
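
The effect of the two settings (a time-to-live and a maximum number of elements) can be illustrated with a tiny cache built on `LinkedHashMap`. This is a sketch, not the cache the library uses: `TtlCacheSketch` is a hypothetical class, the clock is passed in explicitly to keep the example deterministic, and eviction of the oldest inserted entry is an assumption made for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical TTL- and size-bounded cache illustrating the two settings; not the library's cache. */
final class TtlCacheSketch<K, V> {

    /** A cached value together with the instant at which it stops being valid. */
    private static final class Timed<T> {
        final T value;
        final long expiresAtMillis;

        Timed(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Timed<V>> entries;

    TtlCacheSketch(long ttlMillis, final int maxSize) {
        this.ttlMillis = ttlMillis;
        // Insertion-ordered map that drops its oldest entry once maxSize is exceeded.
        this.entries = new LinkedHashMap<K, Timed<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    void put(K key, V value, long nowMillis) {
        entries.put(key, new Timed<>(value, nowMillis + ttlMillis));
    }

    /** Returns the cached value, or null if it is missing or older than the TTL. */
    V get(K key, long nowMillis) {
        Timed<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMillis >= entry.expiresAtMillis) {
            entries.remove(key); // expired: a real client would now call the registry again
            return null;
        }
        return entry.value;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        TtlCacheSketch<String, String> cache = new TtlCacheSketch<>(60_000L, 2);
        cache.put("version-1", "schema A", 0L);
        cache.put("version-2", "schema B", 0L);
        cache.put("version-3", "schema C", 0L); // exceeds the size limit, oldest entry is dropped
        System.out.println(cache.get("version-1", 1_000L));  // evicted by the size limit
        System.out.println(cache.get("version-3", 1_000L));  // still cached
        System.out.println(cache.get("version-3", 60_000L)); // past the time-to-live
    }
}
```

A shorter time-to-live makes clients notice registry changes sooner at the cost of more calls; a larger size helps when many distinct schema versions are in flight.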

Migrating from a third party Schema Registry

To migrate to AWS Glue Schema Registry from a third-party schema registry for AVRO data types for Kafka, add this property for the value class, along with the third-party jar.

```java
properties.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, <ThirdPartyKafkaDeserializer>);
```
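
During a migration a topic can contain both messages written through AWS Glue Schema Registry and messages written with the third-party serializer, so the consumer has to tell them apart before choosing a deserializer. One way to picture that routing is a check on the leading header bytes. The sketch below is hypothetical: it assumes a header made of a version byte with value 3, one compression byte and a 16-byte schema version UUID. Those values are assumptions to verify against the library source, and `HeaderRoutingSketch` is not part of the library.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

/** Hypothetical header-based routing between the primary and a secondary deserializer. */
final class HeaderRoutingSketch {

    // Assumed header layout (verify against the library source before relying on it):
    // 1 version byte, 1 compression byte, 16 bytes of schema version UUID.
    static final byte ASSUMED_HEADER_VERSION = 3;
    static final int ASSUMED_HEADER_LENGTH = 1 + 1 + 16;

    /** Builds a message with the assumed header; only used to feed the demo below. */
    static byte[] encode(UUID schemaVersionId, byte[] payload) {
        return ByteBuffer.allocate(ASSUMED_HEADER_LENGTH + payload.length)
                .put(ASSUMED_HEADER_VERSION)
                .put((byte) 0) // assumed marker for "no compression"
                .putLong(schemaVersionId.getMostSignificantBits())
                .putLong(schemaVersionId.getLeastSignificantBits())
                .put(payload)
                .array();
    }

    static boolean looksLikeGlueEncoded(byte[] message) {
        return message != null
                && message.length >= ASSUMED_HEADER_LENGTH
                && message[0] == ASSUMED_HEADER_VERSION;
    }

    /** Reads the 16-byte schema version id that follows the two leading header bytes. */
    static UUID schemaVersionId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message, 2, 16);
        return new UUID(buffer.getLong(), buffer.getLong());
    }

    /** Names the deserializer that would handle the message. */
    static String route(byte[] message) {
        return looksLikeGlueEncoded(message) ? "glue" : "secondary";
    }

    public static void main(String[] args) {
        UUID versionId = UUID.randomUUID();
        byte[] glueMessage = encode(versionId, new byte[] {1, 2, 3});
        byte[] thirdPartyMessage = {0, 0, 0, 0, 42, 9}; // made-up bytes without the assumed header

        System.out.println(route(glueMessage) + " " + versionId.equals(schemaVersionId(glueMessage)));
        System.out.println(route(thirdPartyMessage));
    }
}
```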

Using Kafka Connect with AWS Glue Schema Registry

  • Clone this repo, build and copy dependencies

```bash
git clone git@github.com:awslabs/aws-glue-schema-registry.git
cd aws-glue-schema-registry
cd build-tools
mvn clean install
cd ..
mvn clean install
mvn dependency:copy-dependencies
```

  • Configure Kafka Connectors with following properties

When configuring Kafka Connect workers or connectors, use the value of the string constant properties in the AWSSchemaRegistryConstants class to configure the AWSKafkaAvroConverter.

```properties
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=ca-central-1
value.converter.region=ca-central-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=KeySchema
value.converter.schemaName=ValueSchema
```

As Glue Schema Registry is a fully managed service by AWS, there is no notion of schema registry URLs. Name of the registry (within the same AWS account) can be optionally configured using following options. If not specified, default-registry is used.

```properties
key.converter.registry.name=my-registry
value.converter.registry.name=my-registry
```

  • Add command below to Launch mode section under kafka-run-class.sh

-cp $CLASSPATH:"<your aws glue schema registry base directory>/target/dependency/*"

It should look like this

```bash
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH:"/Users/johndoe/aws-glue-schema-registry/target/dependency/*" $KAFKA_OPTS "$@"
fi
```

  • If using bash, run the commands below to set up your CLASSPATH in your .bash_profile. (For any other shell, update the environment accordingly.)

```bash
echo 'export GSR_LIB_BASE_DIR=<>' >>~/.bash_profile
echo 'export GSR_LIB_VERSION=1.1.25' >>~/.bash_profile
echo 'export KAFKA_HOME=' >>~/.bash_profile
echo 'export CLASSPATH=$CLASSPATH:$GSR_LIB_BASE_DIR/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/common/target/schema-registry-common-$GSR_LIB_VERSION.jar:$GSR_LIB_BASE_DIR/avro-serializer-deserializer/target/schema-registry-serde-$GSR_LIB_VERSION.jar' >>~/.bash_profile
source ~/.bash_profile
```
  • (Optional) If you wish to test with a simple file source then clone the file source connector.

```bash
git clone https://github.com/mmolimar/kafka-connect-fs.git
cd kafka-connect-fs/
```

In the source connector configuration (config/kafka-connect-fs.properties), set the data format to Avro and the file reader to AvroFileReader, and point the file path at an example Avro object to read from. For example:

```properties
fs.uris=<path to a sample avro object>
policy.regexp=^.*\.avro$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
```

Install the source connector

```bash
mvn clean package
echo "export CLASSPATH=\$CLASSPATH:\"\$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')\"" >>~/.bash_profile
source ~/.bash_profile
```

Update the sink properties under /config/connect-file-sink.properties

```properties
file=<output file full path>
topics=<my topic>
```

Start Source Connector (In this example it is file source connector)

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties

Run Sink Connector (In this example it is file sink connector)

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties

  • For more examples of running Kafka Connect with Avro, JSON, and Protobuf formats, refer to the script run-local-tests.sh under the integration-tests module.
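
The converter configuration above repeats every setting once under `key.converter.` and once under `value.converter.`. A small helper can generate both sets from one map, which avoids the two sides drifting apart. This is a sketch: `ConverterConfigSketch` and `converterProperties` are hypothetical names, while the property keys and the converter class name are the ones shown in the configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

/** Hypothetical helper that expands shared settings into key.converter.* and value.converter.* entries. */
final class ConverterConfigSketch {

    static final String CONVERTER_CLASS =
            "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter";

    static Properties converterProperties(Map<String, String> shared, String keySchema, String valueSchema) {
        Properties props = new Properties();
        for (String side : new String[] {"key", "value"}) {
            String prefix = side + ".converter";
            props.setProperty(prefix, CONVERTER_CLASS);
            // Copy every shared setting under the prefix of the current side.
            for (Map.Entry<String, String> entry : shared.entrySet()) {
                props.setProperty(prefix + "." + entry.getKey(), entry.getValue());
            }
        }
        // Schema names are the only settings that differ between key and value.
        props.setProperty("key.converter.schemaName", keySchema);
        props.setProperty("value.converter.schemaName", valueSchema);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> shared = new LinkedHashMap<>();
        shared.put("region", "ca-central-1");
        shared.put("schemaAutoRegistrationEnabled", "true");
        shared.put("avroRecordType", "GENERIC_RECORD");
        shared.put("registry.name", "my-registry");

        Properties props = converterProperties(shared, "KeySchema", "ValueSchema");
        // Print in sorted order so the listing does not depend on Properties' internal ordering.
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```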

Using Kafka Streams with AWS Glue Schema Registry

Maven Dependency

```xml
<dependency>
    <groupId>software.amazon.glue</groupId>
    <artifactId>schema-registry-kafkastreams-serde</artifactId>
    <version>1.1.25</version>
</dependency>
```

```java
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AWSKafkaAvroSerDe.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> source = builder.stream("avro-input");
final KStream<String, GenericRecord> result = source
    .filter((key, value) -> !"pink".equals(String.valueOf(value.get("favorite_color"))))
    .filter((key, value) -> !"15.0".equals(String.valueOf(value.get("amount"))));
result.to("avro-output");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

```

Using the AWS Glue Schema Registry Flink Connector

AWS Glue Schema Registry Flink Connector for Java in this repository is not recommended. Please check out Apache Flink repository for the latest support: Avro SerializationSchema and DeserializationSchema and JSON SerializationSchema and DeserializationSchema. Protobuf integration will be followed up soon.

Maven Dependency

```xml
<dependency>
    <groupId>software.amazon.glue</groupId>
    <artifactId>schema-registry-flink-serde</artifactId>
    <version>1.1.25</version>
</dependency>
```

Code Example

Flink Kafka Producer with AVRO format

```java
String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("path/to/avro/file"));

FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(
        topic,
        GlueSchemaRegistryAvroSerializationSchema.forGeneric(schema, topic, configs),
        properties);
stream.addSink(producer);

```

Flink Kafka Consumer with AVRO format

```java
String topic = "topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("path/to/avro/file"));

FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        topic,
        GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, configs),
        properties);
DataStream<GenericRecord> stream = env.addSource(consumer);

```

Cross-Account Avro Converter Support

The AWSKafkaAvroConverter Avro converter is able to assume an IAM role in a different AWS account before accessing Glue Schema Registry. You can configure the role ARN and an optional session name.

If assumeRoleArn is not provided, the converter falls back to the default credentials associated with the host.

Connector configuration

Include these properties in your Kafka Connect worker or connector config:

```properties
# Define converter
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter

# Specify cross-account role arn
key.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role"
value.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role"

# Override default session name (optional; default is "kafka-connect-session")
key.converter.assumeRoleSessionName=my-custom-session
value.converter.assumeRoleSessionName=my-custom-session
```
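
The fallback rules for these two settings (no role ARN means default credentials, no session name means "kafka-connect-session") can be sketched as a small resolver. The class and method names are hypothetical, the code only inspects configuration values and calls no AWS API, and treating a blank ARN as absent is an assumption of the sketch rather than documented behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical resolver for the two cross-account settings; it only reads configuration. */
final class AssumeRoleConfigSketch {

    static final String DEFAULT_SESSION_NAME = "kafka-connect-session";

    /** The role to assume, or empty when the host's default credentials should be used. */
    static Optional<String> roleArn(Map<String, String> converterConfig) {
        String arn = converterConfig.get("assumeRoleArn");
        // Assumption of this sketch: a blank value is treated like a missing one.
        return (arn == null || arn.trim().isEmpty()) ? Optional.empty() : Optional.of(arn.trim());
    }

    /** The configured session name, or the documented default. */
    static String sessionName(Map<String, String> converterConfig) {
        return converterConfig.getOrDefault("assumeRoleSessionName", DEFAULT_SESSION_NAME);
    }

    /** Human-readable summary of which credentials path would be taken. */
    static String describe(Map<String, String> converterConfig) {
        return roleArn(converterConfig)
                .map(arn -> "assume " + arn + " as session " + sessionName(converterConfig))
                .orElse("use default credentials");
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(describe(config)); // nothing configured

        config.put("assumeRoleArn", "arn:aws:iam::123456789012:role/my-role");
        System.out.println(describe(config)); // role with default session name

        config.put("assumeRoleSessionName", "my-custom-session");
        System.out.println(describe(config)); // role with custom session name
    }
}
```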

Security issue notifications

If you discover a potential security issue in this project, we ask that you notify AWS/Amazon Security via our vulnerability reporting page. Please do not create a public GitHub issue.

Owner

  • Name: Amazon Web Services - Labs
  • Login: awslabs
  • Kind: organization
  • Location: Seattle, WA

AWS Labs

GitHub Events

Total
  • Create event: 9
  • Issues event: 13
  • Release event: 3
  • Watch event: 15
  • Delete event: 8
  • Issue comment event: 53
  • Push event: 64
  • Pull request review comment event: 101
  • Pull request review event: 111
  • Pull request event: 69
  • Fork event: 18
Last Year
  • Create event: 9
  • Issues event: 13
  • Release event: 3
  • Watch event: 15
  • Delete event: 8
  • Issue comment event: 53
  • Push event: 64
  • Pull request review comment event: 101
  • Pull request review event: 111
  • Pull request event: 69
  • Fork event: 18

Committers

Last synced: over 3 years ago

All Time
  • Total Commits: 110
  • Total Committers: 28
  • Avg Commits per committer: 3.929
  • Development Distribution Score (DDS): 0.782
Top Committers
Name Email Commits
Mohit Paliwal i****l@g****m 24
Ravindranath Kakarla r****h@g****m 12
dependabot[bot] 4****]@u****m 11
Kexin 3****3@u****m 9
mohit m****a@a****m 8
Ravindranath Kakarla r****h@a****m 7
Rushin Naik 7****k@u****m 4
Kexin Hui k****i@a****m 4
George Yang h****g@a****m 4
Vanessa Pinto v****p@a****m 3
NersesAM 1****M@u****m 3
Sebastian Gerstenlauer S****r@p****m 3
Linyu Yao 7****1@u****m 2
George Yang d****r@h****m 2
Aravinthsamy Sekar 6****r@u****m 1
Amazon GitHub Automation 5****o@u****m 1
Davide Savazzi d****i@g****m 1
Matthias Krueger m****i@g****m 1
Burak Kose b****1@g****m 1
Mark Lambert m****t@g****m 1
Thomas Vestergaard Trolle t****n@u****m 1
Linyu y****y@a****m 1
awsalialem 8****m@u****m 1
Vanessa Pinto 4****7@u****m 1
Wei 1****8@u****m 1
antontreushchenko 1****o@u****m 1
Ali Alemi a****m@a****m 1
msalistra m****a@u****m 1

Issues and Pull Requests

Last synced: 10 months ago

All Time
  • Total issues: 118
  • Total pull requests: 131
  • Average time to close issues: 2 months
  • Average time to close pull requests: about 1 month
  • Total issue authors: 101
  • Total pull request authors: 41
  • Average comments per issue: 2.49
  • Average comments per pull request: 0.64
  • Merged pull requests: 72
  • Bot issues: 0
  • Bot pull requests: 10
Past Year
  • Issues: 11
  • Pull requests: 43
  • Average time to close issues: about 1 month
  • Average time to close pull requests: 3 days
  • Issue authors: 10
  • Pull request authors: 9
  • Average comments per issue: 0.18
  • Average comments per pull request: 0.05
  • Merged pull requests: 21
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
  • singhbaljit (3)
  • mdimas (3)
  • antontreushchenko (2)
  • julillosamaral (2)
  • raipragyaa (2)
  • cr-urra (2)
  • chriline (2)
  • ShakeelHussain (2)
  • blacktooth (2)
  • er1c (2)
  • javierlarota (2)
  • pc-akothapeta (2)
  • luizamboni (2)
  • dexter-mh-lee (2)
  • NersesAM (2)
Pull Request Authors
  • isaurab007 (18)
  • knoxy5467 (13)
  • YangSan0622 (12)
  • dependabot[bot] (10)
  • vanessapinto257 (8)
  • blacktooth (8)
  • allkliu (5)
  • mohitpali (4)
  • bowen-s (4)
  • phshah95 (3)
  • junyuc25 (3)
  • NersesAM (3)
  • abhilash47 (2)
  • fcappi (2)
  • charlescd (2)
Top Labels
Issue Labels
enhancement (19) question (19) PR welcome (18) research (12) bug (3) documentation (2) dependencies (2)
Pull Request Labels
dependencies (10)

Packages

  • Total packages: 11
  • Total downloads: unknown
  • Total docker downloads: 293,048,699
  • Total dependent packages: 21
    (may contain duplicates)
  • Total dependent repositories: 358
    (may contain duplicates)
  • Total versions: 287
repo1.maven.org: software.amazon.glue:schema-registry-serde

The AWS Glue Schema Registry Serializer/Deserializer enables Java developers to easily integrate their Apache Kafka and AWS Kinesis applications with AWS Glue Schema Registry

  • Versions: 28
  • Dependent Packages: 10
  • Dependent Repositories: 197
  • Docker Downloads: 232,228,379
Rankings
Docker downloads count: 0.4%
Dependent repos count: 1.2%
Dependent packages count: 6.1%
Average: 11.4%
Forks count: 21.5%
Stargazers count: 27.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-common

AWS Glue Schema Common is the common package used by the AWS Glue Schema Registry Library; it contains the core functionality.

  • Versions: 28
  • Dependent Packages: 6
  • Dependent Repositories: 149
  • Docker Downloads: 30,477,016
Rankings
Docker downloads count: 1.0%
Dependent repos count: 1.4%
Dependent packages count: 9.7%
Average: 12.3%
Forks count: 21.5%
Stargazers count: 27.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-build-tools

The AWS Glue Schema Registry build tools help run code coverage for the AWS Glue Schema Registry Library.

  • Versions: 28
  • Dependent Packages: 1
  • Dependent Repositories: 4
  • Docker Downloads: 30,343,127
Rankings
Docker downloads count: 1.0%
Dependent repos count: 12.1%
Average: 19.1%
Forks count: 21.5%
Stargazers count: 27.9%
Dependent packages count: 33.0%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-flink-serde

The AWS Glue Schema Registry Library for Apache Flink enables Java developers to easily integrate their Apache Flink applications with AWS Glue Schema Registry

  • Versions: 28
  • Dependent Packages: 2
  • Dependent Repositories: 4
  • Docker Downloads: 24
Rankings
Dependent repos count: 12.1%
Average: 21.2%
Forks count: 21.5%
Dependent packages count: 23.2%
Stargazers count: 27.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-kafkastreams-serde

The AWS Glue Schema Registry Kafka Streams SerDe library enables Java developers to easily integrate their Apache Kafka Streams applications with AWS Glue Schema Registry

  • Versions: 28
  • Dependent Packages: 2
  • Dependent Repositories: 4
  • Docker Downloads: 24
Rankings
Dependent repos count: 12.1%
Average: 21.2%
Forks count: 21.5%
Dependent packages count: 23.2%
Stargazers count: 27.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:protobuf-kafkaconnect-converter

The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry for Protobuf schemas.

  • Versions: 14
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 24
Rankings
Forks count: 15.1%
Stargazers count: 20.6%
Average: 29.1%
Dependent repos count: 32.0%
Dependent packages count: 48.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:jsonschema-kafkaconnect-converter

The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry

  • Versions: 25
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 24
Rankings
Forks count: 16.4%
Stargazers count: 20.9%
Average: 29.5%
Dependent repos count: 32.0%
Dependent packages count: 48.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-examples

The AWS Glue Schema examples package has sample code for using Schema Registry with different applications.

  • Versions: 27
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 24
Rankings
Forks count: 16.4%
Stargazers count: 20.9%
Average: 29.5%
Dependent repos count: 32.0%
Dependent packages count: 48.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-integration-tests

The AWS Glue Schema Registry tests run a sanity check on the schema registry library.

  • Versions: 25
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 24
Rankings
Forks count: 16.3%
Stargazers count: 21.1%
Average: 29.6%
Dependent repos count: 32.0%
Dependent packages count: 48.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-parent

The AWS Glue Schema Registry Library for Java enables Java developers to easily integrate their data streaming systems with AWS Glue Schema Registry

  • Versions: 28
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 24
Rankings
Dependent repos count: 32.0%
Average: 40.4%
Dependent packages count: 48.9%
Last synced: 10 months ago
repo1.maven.org: software.amazon.glue:schema-registry-kafkaconnect-converter

The AWS Glue Schema Registry Kafka Connect Converter enables Java developers to easily integrate their Kafka Connect applications with AWS Glue Schema Registry

  • Versions: 28
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Docker Downloads: 9
Rankings
Dependent repos count: 32.0%
Average: 40.4%
Dependent packages count: 48.9%
Last synced: 10 months ago

Dependencies

avro-flink-serde/pom.xml maven
  • org.apache.flink:flink-streaming-java_2.11 1.12.2 provided
  • org.apache.commons:commons-compress 1.21
  • org.apache.flink:flink-avro 1.12.2
  • software.amazon.glue:schema-registry-serde 1.1.12
  • junit:junit 4.13.1 test
  • org.hamcrest:hamcrest 2.2 test
  • org.junit.jupiter:junit-jupiter-api 5.6.2 test
  • org.junit.jupiter:junit-jupiter-engine 5.6.2 test
  • org.junit.jupiter:junit-jupiter-params 5.6.2 test
  • org.junit.platform:junit-platform-commons 1.6.2 test
  • org.mockito:mockito-core 3.3.3 test
  • org.mockito:mockito-junit-jupiter 3.3.3 test
  • software.amazon.glue:schema-registry-serde 1.0.2 test
avro-kafkaconnect-converter/pom.xml maven
  • org.apache.kafka:connect-api
  • software.amazon.glue:schema-registry-serde 1.1.12
  • junit:junit test
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.junit.platform:junit-platform-commons test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
  • org.powermock:powermock-reflect 2.0.7 test
  • uk.co.jemos.podam:podam 7.2.5.RELEASE test
common/pom.xml maven
  • com.google.guava:guava
  • org.apache.avro:avro
  • org.apache.commons:commons-compress
  • org.apache.commons:commons-lang3
  • org.projectlombok:lombok
  • org.projectlombok:lombok-utils
  • org.slf4j:slf4j-api
  • software.amazon.awssdk:aws-json-protocol
  • software.amazon.awssdk:glue
  • software.amazon.awssdk:url-connection-client
  • software.amazon.glue:schema-registry-build-tools 1.1.12
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.junit.platform:junit-platform-commons test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
examples/pom.xml maven
  • com.amazonaws:aws-java-sdk-kinesis
  • com.fasterxml.jackson.dataformat:jackson-dataformat-cbor
  • commons-cli:commons-cli
  • org.projectlombok:lombok
  • org.projectlombok:lombok-utils
  • software.amazon.glue:schema-registry-serde 1.1.12
integration-tests/pom.xml maven
  • cloud.localstack:localstack-utils
  • com.amazonaws:amazon-kinesis-producer
  • com.google.protobuf:protobuf-java
  • org.apache.avro:avro
  • org.apache.avro:avro-compiler
  • org.apache.avro:avro-ipc
  • org.apache.avro:avro-maven-plugin
  • org.apache.kafka:kafka-clients
  • org.apache.kafka:kafka-streams
  • org.apache.kafka:kafka_${kafka.scala.version}
  • org.apache.logging.log4j:log4j-1.2-api
  • org.apache.logging.log4j:log4j-api
  • org.apache.logging.log4j:log4j-core
  • org.apache.logging.log4j:log4j-slf4j-impl
  • org.awaitility:awaitility
  • org.hamcrest:hamcrest-all
  • org.junit.jupiter:junit-jupiter
  • org.junit.platform:junit-platform-surefire-provider
  • org.slf4j:slf4j-api
  • org.slf4j:slf4j-simple
  • software.amazon.awssdk:auth
  • software.amazon.awssdk:kinesis
  • software.amazon.glue:schema-registry-kafkastreams-serde 1.1.12
  • software.amazon.glue:schema-registry-serde 1.1.12
  • software.amazon.kinesis:amazon-kinesis-client
  • software.amazon.glue:schema-registry-serde 1.1.12 test
jsonschema-kafkaconnect-converter/pom.xml maven
  • com.github.erosb:everit-json-schema
  • org.apache.kafka:connect-api
  • org.apache.kafka:connect-json
  • software.amazon.glue:schema-registry-serde 1.1.12
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
kafkastreams-serde/pom.xml maven
  • software.amazon.glue:schema-registry-serde 1.1.12
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.junit.platform:junit-platform-commons test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
pom.xml maven
  • com.fasterxml.jackson:jackson-bom 2.12.2 import
  • software.amazon.awssdk:bom 2.17.122 import
  • com.amazonaws:aws-java-sdk-kinesis 1.12.151
  • com.fasterxml.jackson.dataformat:jackson-dataformat-cbor 2.12.2
  • com.github.erosb:everit-json-schema 1.12.2
  • com.google.guava:guava 30.0-jre
  • com.google.protobuf:protobuf-java 3.19.2
  • com.kjetland:mbknor-jackson-jsonschema_2.12 1.0.39
  • commons-cli:commons-cli 1.2
  • io.github.classgraph:classgraph 4.8.120
  • org.apache.avro:avro 1.11.0
  • org.apache.avro:avro-compiler 1.11.0
  • org.apache.avro:avro-ipc 1.11.0
  • org.apache.avro:avro-maven-plugin 1.11.0
  • org.apache.commons:commons-compress 1.21
  • org.apache.commons:commons-lang3 3.8.1
  • org.apache.kafka:connect-api 2.8.1
  • org.apache.kafka:connect-json 2.8.1
  • org.apache.kafka:kafka-clients 2.8.1
  • org.apache.kafka:kafka-streams 2.8.1
  • org.apache.kafka:kafka_2.12 2.8.1
  • org.apache.logging.log4j:log4j-1.2-api 2.17.1
  • org.apache.logging.log4j:log4j-api 2.17.1
  • org.apache.logging.log4j:log4j-core 2.17.1
  • org.apache.logging.log4j:log4j-slf4j-impl 2.17.1
  • org.junit.platform:junit-platform-surefire-provider 1.3.2
  • org.projectlombok:lombok 1.18.20
  • org.projectlombok:lombok-utils 1.18.12
  • org.slf4j:slf4j-api 1.7.30
  • org.slf4j:slf4j-simple 1.7.30
  • software.amazon.awssdk:glue 2.17.122
  • cloud.localstack:localstack-utils 0.2.11 test
  • com.amazonaws:amazon-kinesis-producer LATEST test
  • org.awaitility:awaitility 3.0.0 test
  • org.hamcrest:hamcrest-all 1.1 test
  • org.junit.jupiter:junit-jupiter 5.6.2 test
  • org.junit.jupiter:junit-jupiter-api 5.6.2 test
  • org.junit.jupiter:junit-jupiter-engine 5.6.2 test
  • org.junit.jupiter:junit-jupiter-params 5.6.2 test
  • org.junit.platform:junit-platform-commons 1.6.2 test
  • org.mockito:mockito-core 3.3.3 test
  • org.mockito:mockito-junit-jupiter 3.3.3 test
  • software.amazon.kinesis:amazon-kinesis-client 2.2.9 test
protobuf-kafkaconnect-converter/pom.xml maven
  • com.google.protobuf:protobuf-java ${protobuf.version}
  • com.google.protobuf:protobuf-java-util ${protobuf.version}
  • io.apicurio:apicurio-registry-protobuf-schema-utilities ${apicurio-registry.version}
  • org.apache.kafka:connect-api
  • software.amazon.glue:schema-registry-serde 1.1.12
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
serializer-deserializer/pom.xml maven
  • com.amazonaws:aws-java-sdk-sts ${aws.sdk.v1.version}
  • com.github.erosb:everit-json-schema
  • com.google.api.grpc:proto-google-common-protos 2.7.4
  • com.google.jimfs:jimfs 1.1
  • com.google.protobuf:protobuf-java ${protobuf.version}
  • com.google.protobuf:protobuf-java
  • com.kjetland:mbknor-jackson-jsonschema_2.12
  • com.squareup.wire:wire-compiler ${square-wireschema.version}
  • com.squareup.wire:wire-schema ${square-wireschema.version}
  • io.github.classgraph:classgraph
  • org.apache.kafka:kafka-clients
  • software.amazon.awssdk:arns
  • software.amazon.awssdk:sts ${aws.sdk.v2.version}
  • software.amazon.glue:schema-registry-common 1.1.12
  • com.google.truth.extensions:truth-proto-extension 1.1.3 test
  • org.junit.jupiter:junit-jupiter-api test
  • org.junit.jupiter:junit-jupiter-engine test
  • org.junit.jupiter:junit-jupiter-params test
  • org.junit.platform:junit-platform-commons test
  • org.mockito:mockito-core test
  • org.mockito:mockito-junit-jupiter test
.github/workflows/MavenCI.yml actions
  • actions/cache v2 composite
  • actions/checkout v2 composite
  • actions/setup-java v1 composite
integration-tests/docker-compose.yml docker
  • public.ecr.aws/bitnami/kafka latest
  • public.ecr.aws/bitnami/zookeeper latest
serializer-deserializer-msk-iam/pom.xml maven
  • software.amazon.glue:schema-registry-serde 1.1.14
  • software.amazon.msk:aws-msk-iam-auth ${aws.msk.iam.auth}
build-tools/pom.xml maven