eo-kafka

EO Kafka Producers and consumers for working with Apache Kafka message broker

https://github.com/eo-cqrs/eo-kafka

Science Score: 26.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Academic email domains
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (8.5%) to scientific vocabulary

Keywords

elegant-objects kafka-consumer kafka-producer oop
Last synced: 6 months ago

Repository

EO Kafka Producers and consumers for working with Apache Kafka message broker

Basic Info
Statistics
  • Stars: 28
  • Watchers: 3
  • Forks: 3
  • Open Issues: 31
  • Releases: 11
Topics
elegant-objects kafka-consumer kafka-producer oop
Created about 3 years ago · Last pushed 7 months ago
Metadata Files
Readme License Citation

README.md

eo-kafka


This nice logo was made by @l3r8yJ.


Project architect: @h1alexbel

EO Kafka Producers and consumers for working with Apache Kafka message broker.

Read Kafka Producers and Consumers for Elegant Microservices, the blog post about EO-Kafka, and EO-Kafka with Spring, which explains how to connect EO-Kafka with Spring.

Motivation. We are not happy with Spring Kafka, because it is procedural rather than object-oriented. eo-kafka does almost exactly the same job, but through objects.

Principles. These are the design principles behind eo-kafka.

How to use. All you need is this (get the latest version here):

Maven:

```xml
<dependency>
  <groupId>io.github.eo-cqrs</groupId>
  <artifactId>eo-kafka</artifactId>
</dependency>
```

To use it with Spring Boot:

```xml
<dependency>
  <groupId>io.github.eo-cqrs</groupId>
  <artifactId>eo-kafka</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

With Gradle:

```groovy
dependencies {
    implementation 'io.github.eo-cqrs:eo-kafka:<version>'
}
```

Messages

To create a Kafka Message with a Topic, Key, and Value:

```java
final Message<String, String> msg = new Tkv<>("test.topic", "test-k", "test-v");
```

To create a Kafka Message with a Partition:

```java
final Message<String, String> msg =
  new WithPartition<>(
    0,
    new Tkv<>(
      "test.topic",
      "test-k",
      "test-v"
    )
  );
```

To create a Kafka Message with a Timestamp:

```java
final Message<String, String> msg =
  new Timestamped<>(
    tmstmp,
    new WithPartition<>(
      partition,
      new Tkv<>(
        topic,
        key,
        value
      )
    )
  );
```

Producers

To create a Kafka Producer, you can wrap the original KafkaProducer:

```java
final KafkaProducer<String, String> origin = ...;
final Producer<String, String> producer = new KfProducer<>(origin);
```

Or construct it with KfFlexible:

```java
final Producer<String, String> producer =
  new KfProducer<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
          new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
        )
      )
    )
  );
```

Or create it from an XML file:

```java
final Producer<String, String> producer =
  new KfProducer<>(
    new KfXmlFlexible<String, String>(
      "producer.xml" // file with producer config
    )
  );
```

Your XML file, located in the resources directory, should look like this:

```xml
<producer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
  <valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>
```

Since version 0.4.6, you can create a Producer from a JSON file:

```java
final Producer<String, String> producer =
  new KfProducer<>(
    new KfJsonFlexible<String, String>(
      "producer.json" // file with producer config
    )
  );
```

Your JSON file, located in the resources directory, should look like this:

```json
{
  "bootstrapServers": "localhost:9092",
  "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
  "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer"
}
```

Since version 0.5.6, you can create a Producer from a YAML file:

```java
final Producer<String, String> producer =
  new KfProducer<>(
    new KfYamlProducerSettings<>(
      "producer.yaml"
    )
  );
```

Your YAML file, located in the resources directory, should look like this:

```yaml
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
```

To send a message:

```java
try (final Producer<String, String> producer = ...) {
  producer.send(
    new WithPartition<>(
      0,
      new Tkv<>(
        "xyz.topic",
        "key",
        "message"
      )
    )
  );
} catch (final Exception e) {
  throw new IllegalStateException(e);
}
```

Also, you can create KfCallback, a Kafka Producer with async Callback support:

```java
final Producer<String, String> producer =
  new KfCallback<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          // producer params
        )
      )
    ),
    new Callback() {
      @Override
      public void onCompletion(final RecordMetadata meta, final Exception ex) {
        // logic
      }
    }
  );
```

Consumers

To create a Kafka Consumer, you can wrap the original KafkaConsumer:

```java
final KafkaConsumer<String, String> origin = ...;
final Consumer<String, String> consumer = new KfConsumer<>(origin);
```

Using KfFlexible:

```java
final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfFlexible<>(
      new KfConsumerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new GroupId("1"),
          new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
          new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
        )
      )
    )
  );
```

And the XML file approach:

```java
final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfXmlFlexible<String, String>("consumer.xml")
  );
```

Again, the XML file, located in the resources directory, should look like this:

```xml
<consumer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <groupId>1</groupId>
  <keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
  <valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>
```

Since version 0.4.6, you can create a Consumer from a JSON file:

```java
final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfJsonFlexible<String, String>(
      "consumer.json" // file with consumer config
    )
  );
```

Your JSON file, located in the resources directory, should look like this:

```json
{
  "bootstrapServers": "localhost:9092",
  "groupId": "1",
  "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
```

Since version 0.5.6, you can create a Consumer from a YAML file:

```java
final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfYamlConsumerSettings<>(
      "consumer.yaml"
    )
  );
```

Your YAML file, located in the resources directory, should look like this:

```yaml
bootstrap-servers: localhost:9092
group-id: "1"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

Consuming messages:

```java
try (
  final Consumer<String, String> consumer =
    new KfConsumer<>(
      new KfFlexible<>(
        new KfConsumerParams(
          new KfParams(
            new BootstrapServers(this.servers),
            new GroupId("1"),
            new AutoOffsetReset("earliest"),
            new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
            new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
          )
        )
      )
    )
) {
  // you need to be subscribed to a topic to iterate over its data:
  // consumer.subscribe(new ListOf<>("orders-saga-init"));
  // or call #records(topic, duration), which subscribes to the topic you provide
  final ConsumerRecords<String, String> records =
    consumer.records("orders-saga-init", Duration.ofSeconds(5L));
}
```

Also, you can subscribe with a ConsumerRebalanceListener:

```java
consumer.subscribe(
  new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
    }
  },
  "<your topic>"
);
```

Finally, you can unsubscribe:

```java
consumer.unsubscribe();
```
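
Putting polling and cleanup together, a minimal consume loop might look like the following sketch. It uses only the calls shown above; the topic name, the timeout, and the `running` flag are illustrative placeholders, and the `...` stands for any of the consumer constructions documented earlier.

```java
// A sketch of a polling loop, assuming a consumer built as in the examples above.
// The topic name, timeout, and the "running" flag are illustrative placeholders.
try (final Consumer<String, String> consumer = ...) {
  while (running) {
    // #records(topic, duration) subscribes to the topic and polls it
    final ConsumerRecords<String, String> records =
      consumer.records("orders-saga-init", Duration.ofSeconds(5L));
    records.forEach(
      rec -> System.out.printf("key=%s value=%s%n", rec.key(), rec.value())
    );
  }
}
```

Because the consumer is opened in a try-with-resources block, it is closed automatically when the loop exits.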

Fakes

To mock eo-kafka, you can use the existing Fake Objects from the io.github.eocqrs.kafka.fake package. They look like normal ones, but instead of talking to a real Kafka broker, they manipulate an in-memory XML document.

FkBroker

```java
final FkBroker broker =
  new InXml(
    new Synchronized(
      new InFile(
        "consumer-test", "<broker/>"
      )
    )
  );
```

It will create an in-memory XML document with the following structure:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics/>
  <subs/>
</broker>
```

You can create a topic inside the broker:

```java
broker.with(new TopicDirs("fake.topic").value());
```

Under the hood, the XML will be modified to:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics>
    <topic>
      <name>fake.topic</name>
      <datasets/>
    </topic>
  </topics>
  <subs/>
</broker>
```

FkProducer

```java
final Producer<String, String> producer =
  new FkProducer<>(
    UUID.randomUUID(),
    broker
  );
```

FkConsumer

```java
final Consumer<Object, String> consumer =
  new FkConsumer(
    UUID.randomUUID(),
    broker
  );
```

Example with Fakes

```java
final String topic = "test";
final Consumer<Object, String> consumer =
  new FkConsumer(
    UUID.randomUUID(),
    this.broker.with(new TopicDirs(topic).value())
  );
final Producer<String, String> producer =
  new FkProducer<>(UUID.randomUUID(), this.broker);
producer.send(
  new WithPartition<>(
    0,
    new Tkv<>(
      topic,
      "test1",
      "test-data-1"
    )
  )
);
producer.send(
  new WithPartition<>(
    0,
    new Tkv<>(
      topic,
      "test2",
      "test-data-2"
    )
  )
);
producer.send(
  new WithPartition<>(
    0,
    new Tkv<>(
      topic,
      "test3",
      "test-data-3"
    )
  )
);
final ConsumerRecords<Object, String> records =
  consumer.records(topic, Duration.ofSeconds(1L));
final List<String> datasets = new ListOf<>();
records.forEach(rec -> datasets.add(rec.value()));
MatcherAssert.assertThat(
  "First datasets in right format",
  datasets,
  Matchers.contains("test-data-1", "test-data-2", "test-data-3")
);
```

Like production producers and consumers, fake ones should also be closed once you are done:

```java
fake.close();
```

Under the hood, the XML document will look like this:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics>
    <topic>
      <name>test</name>
      <datasets>
        <dataset>
          <partition>0</partition>
          <key>test1</key>
          <value>test-data-1</value>
          <seen>true</seen>
        </dataset>
        <dataset>
          <partition>0</partition>
          <key>test2</key>
          <value>test-data-2</value>
          <seen>true</seen>
        </dataset>
        <dataset>
          <partition>0</partition>
          <key>test3</key>
          <value>test-data-3</value>
          <seen>true</seen>
        </dataset>
      </datasets>
    </topic>
  </topics>
  <subs>
    <sub>
      <topic>test</topic>
      <consumer>aa4a2008-764b-4e19-9368-8250df4bea38</consumer>
    </sub>
  </subs>
</broker>
```

As of version 0.3.5, eo-kafka supports only String values in FkConsumer.

Configs

| Kafka Property | eo-kafka API | XML/JSON tag | YAML |
|----------------|--------------|--------------|------|
| bootstrap.servers | BootstrapServers | bootstrapServers | bootstrap-servers |
| key.serializer | KeySerializer | keySerializer | key-serializer |
| value.serializer | ValueSerializer | valueSerializer | value-serializer |
| key.deserializer | KeyDeserializer | keyDeserializer | key-deserializer |
| value.deserializer | ValueDeserializer | valueDeserializer | value-deserializer |
| group.id | GroupId | groupId | group-id |
| auto.offset.reset | AutoOffsetReset | autoOffsetReset | auto-offset-reset |
| client.id | ClientId | clientId | client-id |
| client.rack | ClientRack | clientRack | client-rack |
| acks | Acks | acks | acks |
| security.protocol | SecurityProtocol | securityProtocol | security-protocol |
| sasl.jaas.config | SaslJaasConfig | saslJaasConfig | sasl-jaas-config |
| sasl.mechanism | SaslMechanism | saslMechanism | sasl-mechanism |
| batch.size | BatchSize | batchSize | batch-size |
| buffer.memory | BufferMemory | bufferMemory | buffer-memory |
| linger.ms | LingerMs | lingerMs | linger-ms |
| retries | Retries | retries | retries |
| retry.backoff.ms | RetryBackoffMs | retryBackoffMs | retry-backoff-ms |
| compression.type | CompressionType | compressionType | compression-type |
| partition.assignment.strategy | PartitionAssignmentStrategy | partitionAssignmentStrategy | partition-assignment-strategy |
| max.poll.records | MaxPollRecords | maxPollRecords | max-poll-records |
| max.poll.interval.ms | MaxPollIntervalMs | maxPollIntervalMs | max-poll-interval-ms |
| heartbeat.interval.ms | HeartbeatIntervalMs | heartbeatIntervalMs | heartbeat-interval-ms |
| enable.auto.commit | EnableAutoCommit | enableAutoCommit | enable-auto-commit |
| session.timeout.ms | SessionTimeoutMs | sessionTimeoutMs | session-timeout-ms |
| max.partition.fetch.bytes | MaxPartitionFetchBytes | maxPartitionFetchBytes | max-partition-fetch-bytes |
| fetch.max.wait.ms | FetchMaxWaitMs | fetchMaxWaitMs | fetch-max-wait-ms |
| fetch.min.bytes | FetchMinBytes | fetchMinBytes | fetch-min-bytes |
| fetch.max.bytes | FetchMaxBytes | fetchMaxBytes | fetch-max-bytes |
| send.buffer.bytes | SendBufferBytes | sendBufferBytes | send-buffer-bytes |
| receive.buffer.bytes | ReceiveBufferBytes | receiveBufferBytes | receive-buffer-bytes |
| max.block.ms | MaxBlockMs | maxBlockMs | max-block-ms |
| max.request.size | MaxRqSize | maxRequestSize | max-request-size |
| group.instance.id | GroupInstanceId | groupInstanceId | group-instance-id |
| max.in.flight.requests.per.connection | MaxInFlightRq | maxInFlightRequestsPerConnection | max-in-flight-requests-per-connection |
| delivery.timeout.ms | DeliveryTimeoutMs | deliveryTimeoutMs | delivery-timeout-ms |
| enable.idempotence | EnableIdempotence | enableIdempotence | enable-idempotence |
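
As an illustration, several of the options above can be combined in a single consumer YAML file. The sketch below is a hypothetical configuration, not taken from the project's test suite; it uses only keys from the table, the values are illustrative, and whether numeric or boolean values need quoting depends on the settings parser.

```yaml
bootstrap-servers: localhost:9092
group-id: "1"
auto-offset-reset: earliest
enable-auto-commit: "false"
max-poll-records: "250"
session-timeout-ms: "15000"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```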

How to Contribute

Fork the repository, make changes, and send us a pull request. We will review your changes and apply them to the master branch shortly, provided they don't violate our quality standards. To avoid frustration, before sending us your pull request please run the full Maven build:

```bash
mvn clean install
```

You will need Maven 3.8.7+ and Java 17+.

If you want to contribute to the next release version of eo-kafka, please check the project board.

Our rultor image for CI/CD.

Owner

  • Name: EO-CQRS
  • Login: eo-cqrs
  • Kind: organization
  • Email: github@eocqrs.com
  • Location: Belarus

True OOP, small components for clean microservices development

GitHub Events

Total
  • Issues event: 3
  • Watch event: 1
  • Delete event: 15
  • Issue comment event: 75
  • Push event: 55
  • Pull request event: 29
  • Create event: 17
Last Year
  • Issues event: 3
  • Watch event: 1
  • Delete event: 15
  • Issue comment event: 75
  • Push event: 55
  • Pull request event: 29
  • Create event: 17

Issues and Pull Requests

Last synced: 6 months ago

All Time
  • Total issues: 0
  • Total pull requests: 2
  • Average time to close issues: N/A
  • Average time to close pull requests: 6 months
  • Total issue authors: 0
  • Total pull request authors: 1
  • Average comments per issue: 0
  • Average comments per pull request: 0.5
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 2
Past Year
  • Issues: 0
  • Pull requests: 2
  • Average time to close issues: N/A
  • Average time to close pull requests: 6 months
  • Issue authors: 0
  • Pull request authors: 1
  • Average comments per issue: 0
  • Average comments per pull request: 0.5
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 2
Top Authors
Issue Authors
  • zoeself (11)
  • l3r8yJ (3)
  • IlyaLisov (2)
  • h1alexbel (2)
Pull Request Authors
  • renovate[bot] (98)
  • h1alexbel (5)
  • IlyaLisov (1)
Top Labels
Issue Labels
duplicate (11) puzzle (11) 45 min (6) 60 min (4) @l3r8yJ (2) build failed (1) bug (1) @h1alexbel (1) 90 min (1)
Pull Request Labels
@h1alexbel (48) @l3r8yJ (15) enhancement (1)

Dependencies

.github/workflows/codecov.yml actions
  • actions/cache v3 composite
  • actions/checkout v4 composite
  • actions/setup-java v3 composite
  • codecov/codecov-action v3 composite
.github/workflows/maven.yml actions
  • actions/cache v3 composite
  • actions/checkout v4 composite
  • actions/setup-java v3 composite
  • docker-practice/actions-setup-docker master composite
.github/workflows/pdd.yml actions
  • actions/checkout v4 composite
  • g4s8/pdd-action master composite
.github/workflows/xcop.yml actions
  • actions/checkout v4 composite
  • g4s8/xcop-action master composite
pom.xml maven
  • com.jcabi.incubator:xembly 0.28.1
  • com.jcabi:jcabi-immutable
  • com.jcabi:jcabi-xml
  • io.github.eo-cqrs:eokson 0.3.2
  • io.github.eo-cqrs:xfake 0.1.2
  • org.apache.kafka:kafka-clients 3.6.0
  • org.cactoos:cactoos 0.55.0
  • org.projectlombok:lombok 1.18.28
  • org.slf4j:slf4j-api 2.0.9
  • org.slf4j:slf4j-simple 2.0.9
  • org.yaml:snakeyaml 2.2
  • org.assertj:assertj-core 3.24.2 test
  • org.junit.jupiter:junit-jupiter-api 5.10.0 test
  • org.junit.jupiter:junit-jupiter-params 5.10.0 test
  • org.mockito:mockito-core 5.5.0 test
  • org.mockito:mockito-junit-jupiter 5.5.0 test
src/it/producer-consumer-api/pom.xml maven
  • org.slf4j:slf4j-api 2.0.9
  • @project.groupId@:@project.artifactId@ @project.version@ test
  • io.github.eo-cqrs:eo-kafka 1.0-SNAPSHOT test
  • org.assertj:assertj-core 3.24.2 test
  • org.hamcrest:hamcrest 2.2 test
  • org.junit.jupiter:junit-jupiter 5.10.0 test
  • org.junit.jupiter:junit-jupiter-api 5.10.0 test
  • org.mockito:mockito-core 5.5.0 test
  • org.rnorth.duct-tape:duct-tape 1.0.8 test
  • org.testcontainers:junit-jupiter 1.19.0 test
  • org.testcontainers:kafka 1.19.0 test
  • org.testcontainers:testcontainers 1.19.0 test