eo-kafka
EO Kafka Producers and consumers for working with Apache Kafka message broker
Repository
Basic Info
- Host: GitHub
- Owner: eo-cqrs
- License: mit
- Language: Java
- Default Branch: master
- Homepage: https://eo-cqrs.github.io/eo-kafka
- Size: 1.38 MB
Statistics
- Stars: 28
- Watchers: 3
- Forks: 3
- Open Issues: 31
- Releases: 11
Metadata Files
README.md
eo-kafka
This nice logo was made by @l3r8yJ
Project architect: @h1alexbel
EO Kafka Producers and consumers for working with Apache Kafka message broker.
Read Kafka Producers and Consumers for Elegant Microservices,
the blog post about EO-Kafka, and EO-Kafka with Spring,
about how to connect EO-Kafka with Spring.
Motivation. We are not happy with Spring Kafka, because it is very procedural and not object-oriented. eo-kafka suggests doing almost exactly the same, but through objects.
Principles. These are the design principles behind eo-kafka.
How to use. All you need is this (get the latest version here):
Maven:
xml
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
</dependency>
To use it with Spring Boot:
xml
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
With Gradle:
groovy
dependencies {
implementation 'io.github.eo-cqrs:eo-kafka:<version>'
}
Messages
To create a Kafka Message with a Topic, Key and Value:
java
final Message<String, String> msg = new Tkv<>("test.topic", "test-k", "test-v");
To create a Kafka Message with a Partition:
java
final Message<String, String> msg =
new WithPartition<>(
0,
new Tkv<>(
"test.topic",
"test-k",
"test-v"
)
);
To create a Kafka Message with a Timestamp:
java
final Message<String, String> msg =
new Timestamped<>(
tmstmp,
new WithPartition<>(
partition,
new Tkv<>(
topic,
key,
value
)
)
);
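The message examples above compose by wrapping one object in another. As a rough illustration of that decorator style, here is a minimal sketch that runs on the JDK alone. `SketchMessage`, `SketchTkv` and `SketchWithPartition` are hypothetical stand-ins invented for this example; they are not eo-kafka's real types and say nothing about how the library is implemented.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are NOT eo-kafka's real types.
interface SketchMessage {
    String topic();
    String key();
    String value();
    Optional<Integer> partition();
}

// Base message: topic, key and value, with no partition chosen.
final class SketchTkv implements SketchMessage {
    private final String topic;
    private final String key;
    private final String value;

    SketchTkv(final String topic, final String key, final String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    @Override
    public String topic() {
        return this.topic;
    }

    @Override
    public String key() {
        return this.key;
    }

    @Override
    public String value() {
        return this.value;
    }

    @Override
    public Optional<Integer> partition() {
        return Optional.empty();
    }
}

// Decorator: delegates topic, key and value to the wrapped message and adds a partition.
final class SketchWithPartition implements SketchMessage {
    private final int part;
    private final SketchMessage origin;

    SketchWithPartition(final int part, final SketchMessage origin) {
        this.part = part;
        this.origin = origin;
    }

    @Override
    public String topic() {
        return this.origin.topic();
    }

    @Override
    public String key() {
        return this.origin.key();
    }

    @Override
    public String value() {
        return this.origin.value();
    }

    @Override
    public Optional<Integer> partition() {
        return Optional.of(this.part);
    }
}
```

Each wrapper adds exactly one piece of information and leaves the wrapped object untouched, which is why the nesting order in the examples reads from the outermost attribute inwards.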
Producers
To create a Kafka Producer, you can wrap the original KafkaProducer:
java
final KafkaProducer<String, String> origin = ...;
final Producer<String, String> producer = new KfProducer<>(origin);
Or construct it with KfFlexible:
java
final Producer<String, String> producer =
new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
)
)
)
);
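For orientation, the three parameter objects above correspond, according to the Configs section of this README, to the standard Kafka properties `bootstrap.servers`, `key.serializer` and `value.serializer`. The sketch below builds the equivalent plain `java.util.Properties` with the JDK only. The class and method names are hypothetical and exist only for this example; whether eo-kafka builds its configuration this way internally is not stated in this README.

```java
import java.util.Properties;

// Hypothetical helper, not part of eo-kafka: shows the raw Kafka keys
// behind BootstrapServers, KeySerializer and ValueSerializer.
final class ProducerPropsSketch {
    private ProducerPropsSketch() {
    }

    static Properties producerProps(final String servers) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty(
            "key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer"
        );
        props.setProperty(
            "value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer"
        );
        return props;
    }

    public static void main(final String[] args) {
        // Prints the three key=value pairs (iteration order is not guaranteed).
        producerProps("localhost:9092")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```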
Or create it from an XML file:
java
final Producer<String, String> producer =
new KfProducer<>(
new KfXmlFlexible<String, String>(
"producer.xml" // file with producer config
)
);
Your XML file should be located in the resources directory and look like this:
xml
<producer>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
<valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>
Since version 0.4.6 you can create a Producer from a JSON file:
java
final Producer<String, String> producer =
new KfProducer<>(
new KfJsonFlexible<String, String>(
"producer.json" // file with producer config
)
);
Your JSON, located in the resources directory, should look like this:
json
{
"bootstrapServers": "localhost:9092",
"keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
"valueSerializer": "org.apache.kafka.common.serialization.StringSerializer"
}
Since version 0.5.6 you can create a Producer from a YAML file:
java
final Producer<String, String> producer =
new KfProducer<>(
new KfYamlProducerSettings<>(
"producer.yaml"
)
);
Your YAML, located in the resources directory, should look like this:
yaml
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
To send a message:
java
try (final Producer<String, String> producer = ...) {
producer.send(
new WithPartition<>(
0,
new Tkv<>(
"xyz.topic",
"key",
"message"
)
)
);
} catch (final Exception ex) {
throw new IllegalStateException(ex);
}
You can also create KfCallback, a Kafka Producer with async Callback support:
java
final Producer<String, String> producer =
new KfCallback<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
// producer params
)
)
),
new Callback() {
@Override
public void onCompletion(final RecordMetadata meta, final Exception ex) {
// logic
}
}
);
Consumers
To create a Kafka Consumer, you can wrap the original KafkaConsumer:
java
final KafkaConsumer<String, String> origin = ...;
final Consumer<String, String> consumer = new KfConsumer<>(origin);
Using KfFlexible:
java
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
);
Or create it from an XML file:
java
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfXmlFlexible<String, String>("consumer.xml")
);
Again, the XML file should be located in the resources directory and look like this:
xml
<consumer>
<bootstrapServers>localhost:9092</bootstrapServers>
<groupId>1</groupId>
<keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
<valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>
Since version 0.4.6 you can create a Consumer from a JSON file:
java
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfJsonFlexible<String, String>(
"consumer.json" // file with consumer config
)
);
Your JSON, located in the resources directory, should look like this:
json
{
"bootstrapServers": "localhost:9092",
"groupId": "1",
"keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
Since version 0.5.6 you can create a Consumer from a YAML file:
java
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfYamlConsumerSettings<>(
"consumer.yaml"
)
);
Your YAML, located in the resources directory, should look like this:
yaml
bootstrap-servers: localhost:9092
group-id: "1"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
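The YAML keys above differ from the standard Kafka property names only in their separator: `group-id` stands for `group.id`, and so on. The following sketch makes that mapping concrete for a flat `key: value` file like the one shown. It is a deliberately naive line parser written for this example. `YamlKeysSketch` and `toKafkaProps` are hypothetical names, and this is not how eo-kafka reads its settings (its dependency list includes snakeyaml).

```java
import java.util.Properties;

// Hypothetical helper, not part of eo-kafka: converts flat "key: value" lines
// with kebab-case keys into Kafka-style dotted property names.
final class YamlKeysSketch {
    private YamlKeysSketch() {
    }

    static Properties toKafkaProps(final String yaml) {
        final Properties props = new Properties();
        for (final String line : yaml.split("\\R")) {
            final String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            // Split on the FIRST colon only, so "localhost:9092" stays intact.
            final int colon = trimmed.indexOf(':');
            if (colon < 0) {
                continue;
            }
            final String key = trimmed.substring(0, colon).trim().replace('-', '.');
            String value = trimmed.substring(colon + 1).trim();
            // Strip one pair of surrounding double quotes, as in group-id: "1".
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1);
            }
            props.setProperty(key, value);
        }
        return props;
    }
}
```

Nested YAML, lists and multi-line values are out of scope here; the sketch only covers the flat layout used in this README.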
Consuming messages:
java
try (
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers(this.servers),
new GroupId("1"),
new AutoOffsetReset("earliest"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
)
) {
// you need to be subscribed to a topic to iterate over its data:
// consumer.subscribe(new ListOf<>("orders-saga-init"));
// or call #records(topic, duration), which subscribes to the topic you provide
final ConsumerRecords<String, String> records = consumer.records("orders-saga-init", Duration.ofSeconds(5L));
}
Also, you can subscribe with ConsumerRebalanceListener:
java
consumer.subscribe(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
}
}, "<your topic>");
Finally, you can unsubscribe:
java
consumer.unsubscribe();
Fakes
If you need to mock eo-kafka, you can use the existing Fake Objects from the
io.github.eocqrs.kafka.fake package. They look like the normal ones,
but instead of talking to a real Kafka broker, they manipulate
an in-memory XML document.
FkBroker
java
final FkBroker broker = new InXml(
new Synchronized(
new InFile(
"consumer-test", "<broker/>"
)
)
);
It will create an in-memory XML document with the following structure:
xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics/>
<subs/>
</broker>
You can create a topic inside the broker:
java
broker.with(new TopicDirs("fake.topic").value());
Under the hood, the XML will be modified to:
xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics>
<topic>
<name>fake.topic</name>
<datasets/>
</topic>
</topics>
<subs/>
</broker>
FkProducer
java
final Producer<String, String> producer =
new FkProducer<>(
UUID.randomUUID(),
broker
);
FkConsumer
java
final Consumer<Object, String> consumer =
new FkConsumer(
UUID.randomUUID(),
broker
);
Example with Fakes
java
final String topic = "test";
final Consumer<Object, String> consumer =
new FkConsumer(UUID.randomUUID(),
this.broker
.with(new TopicDirs(topic).value())
);
final Producer<String, String> producer =
new FkProducer<>(UUID.randomUUID(), this.broker);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test1",
"test-data-1"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test2",
"test-data-2"
)
)
);
producer.send(
new WithPartition<>(
0,
new Tkv<>(
topic,
"test3",
"test-data-3"
)
)
);
final ConsumerRecords<Object, String> records =
consumer.records(topic, Duration.ofSeconds(1L));
final List<String> datasets = new ListOf<>();
records.forEach(rec -> datasets.add(rec.value()));
MatcherAssert.assertThat(
"First datasets in right format",
datasets,
Matchers.contains("test-data-1", "test-data-2", "test-data-3")
);
Like production producers and consumers, fake ones should also be closed once you are done with them:
java
fake.close();
Under the hood, the XML document will look like this:
xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
<topics>
<topic>
<name>test</name>
<datasets>
<dataset>
<partition>0</partition>
<key>test1</key>
<value>test-data-1</value>
<seen>true</seen>
</dataset>
<dataset>
<partition>0</partition>
<key>test2</key>
<value>test-data-2</value>
<seen>true</seen>
</dataset>
<dataset>
<partition>0</partition>
<key>test3</key>
<value>test-data-3</value>
<seen>true</seen>
</dataset>
</datasets>
</topic>
</topics>
<subs>
<sub>
<topic>test</topic>
<consumer>aa4a2008-764b-4e19-9368-8250df4bea38</consumer>
</sub>
</subs>
</broker>
As of version 0.3.5, eo-kafka supports only String values in FkConsumer.
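Because the fake broker's state is a plain XML document with the structure shown above, it can be inspected with the JDK's built-in XPath support. The sketch below pulls every `<value>` stored for a topic out of such a document. It assumes you already hold the XML as a `String`; how to obtain it from `FkBroker` is not covered in this README. `BrokerXmlSketch` and `values` are hypothetical names introduced for this example.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of eo-kafka: reads dataset values for one topic
// from a broker document shaped like the XML shown in this README.
final class BrokerXmlSketch {
    private BrokerXmlSketch() {
    }

    // The topic name is concatenated into the XPath, so it must not contain a single quote.
    static List<String> values(final String xml, final String topic) {
        final XPath xpath = XPathFactory.newInstance().newXPath();
        final NodeList nodes;
        try {
            nodes = (NodeList) xpath.evaluate(
                "/broker/topics/topic[name='" + topic + "']/datasets/dataset/value",
                new InputSource(new StringReader(xml)),
                XPathConstants.NODESET
            );
        } catch (final XPathExpressionException ex) {
            throw new IllegalStateException("Cannot query broker XML", ex);
        }
        final List<String> found = new ArrayList<>(nodes.getLength());
        for (int idx = 0; idx < nodes.getLength(); idx++) {
            found.add(nodes.item(idx).getTextContent());
        }
        return found;
    }
}
```

Changing the last path step from `value` to `key` or `seen` would read the other fields of each dataset in the same way.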
Configs
| Kafka Property | eo-kafka API | XML/JSON tag | YAML key
|----------------|--------------|--------------|---------
| bootstrap.servers | BootstrapServers | bootstrapServers | bootstrap-servers
| key.serializer | KeySerializer | keySerializer | key-serializer
| value.serializer | ValueSerializer | valueSerializer | value-serializer
| key.deserializer | KeyDeserializer | keyDeserializer | key-deserializer
| value.deserializer | ValueDeserializer | valueDeserializer | value-deserializer
| group.id | GroupId | groupId | group-id
| auto.offset.reset | AutoOffsetReset | autoOffsetReset | auto-offset-reset
| client.id | ClientId | clientId | client-id
| client.rack | ClientRack | clientRack | client-rack
| acks | Acks | acks | acks
| security.protocol | SecurityProtocol | securityProtocol | security-protocol
| sasl.jaas.config | SaslJaasConfig | saslJaasConfig | sasl-jaas-config
| sasl.mechanism | SaslMechanism | saslMechanism | sasl-mechanism
| batch.size | BatchSize | batchSize | batch-size
| buffer.memory | BufferMemory | bufferMemory | buffer-memory
| linger.ms | LingerMs | lingerMs | linger-ms
| retries | Retries | retries | retries
| retry.backoff.ms | RetryBackoffMs | retryBackoffMs | retry-backoff-ms
| compression.type | CompressionType | compressionType | compression-type
| partition.assignment.strategy | PartitionAssignmentStrategy | partitionAssignmentStrategy | partition-assignment-strategy
| max.poll.records | MaxPollRecords | maxPollRecords | max-poll-records
| max.poll.interval.ms | MaxPollIntervalMs | maxPollIntervalMs | max-poll-interval-ms
| heartbeat.interval.ms | HeartbeatIntervalMs | heartbeatIntervalMs | heartbeat-interval-ms
| enable.auto.commit | EnableAutoCommit | enableAutoCommit | enable-auto-commit
| session.timeout.ms | SessionTimeoutMs | sessionTimeoutMs | session-timeout-ms
| max.partition.fetch.bytes | MaxPartitionFetchBytes | maxPartitionFetchBytes | max-partition-fetch-bytes
| fetch.max.wait.ms | FetchMaxWaitMs | fetchMaxWaitMs | fetch-max-wait-ms
| fetch.min.bytes | FetchMinBytes | fetchMinBytes | fetch-min-bytes
| fetch.max.bytes | FetchMaxBytes | fetchMaxBytes | fetch-max-bytes
| send.buffer.bytes | SendBufferBytes | sendBufferBytes | send-buffer-bytes
| receive.buffer.bytes | ReceiveBufferBytes | receiveBufferBytes | receive-buffer-bytes
| max.block.ms | MaxBlockMs | maxBlockMs | max-block-ms
| max.request.size | MaxRqSize | maxRequestSize | max-request-size
| group.instance.id | GroupInstanceId | groupInstanceId | group-instance-id
| max.in.flight.requests.per.connection | MaxInFlightRq | maxInFlightRequestsPerConnection | max-in-flight-requests-per-connection
| delivery.timeout.ms | DeliveryTimeoutMs | deliveryTimeoutMs | delivery-timeout-ms
| enable.idempotence | EnableIdempotence | enableIdempotence | enable-idempotence
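The XML/JSON tags in the table follow a regular pattern: each tag is the Kafka property name in camelCase, with every dot replaced by a capital letter. The sketch below expresses that observation as code. It is only a reading of the table, not a description of how eo-kafka resolves tags internally, and `TagNamesSketch` and `toKafkaProperty` are hypothetical names.

```java
// Hypothetical helper, not part of eo-kafka: turns a camelCase XML/JSON tag
// from the table into the dotted Kafka property name.
final class TagNamesSketch {
    private TagNamesSketch() {
    }

    static String toKafkaProperty(final String tag) {
        final StringBuilder out = new StringBuilder(tag.length() + 8);
        for (final char chr : tag.toCharArray()) {
            if (Character.isUpperCase(chr)) {
                // An upper-case letter marks the start of a new dotted segment.
                out.append('.').append(Character.toLowerCase(chr));
            } else {
                out.append(chr);
            }
        }
        return out.toString();
    }

    public static void main(final String[] args) {
        System.out.println(toKafkaProperty("bootstrapServers")); // bootstrap.servers
        System.out.println(toKafkaProperty("maxPollIntervalMs")); // max.poll.interval.ms
    }
}
```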
How to Contribute
Fork the repository, make changes, and send us a pull request.
We will review your changes and apply them to the master branch shortly,
provided they don't violate our quality standards. To avoid frustration,
please run the full Maven build before sending us your pull request:
bash
mvn clean install
You will need Maven 3.8.7+ and Java 17+.
If you want to contribute to the next release version of eo-kafka, please check the project board.
Our rultor image for CI/CD.
Owner
- Name: EO-CQRS
- Login: eo-cqrs
- Kind: organization
- Email: github@eocqrs.com
- Location: Belarus
- Website: https://eo-cqrs.github.io/.github
- Repositories: 12
- Profile: https://github.com/eo-cqrs
True OOP, small components for clean microservices development
GitHub Events
Total
- Issues event: 3
- Watch event: 1
- Delete event: 15
- Issue comment event: 75
- Push event: 55
- Pull request event: 29
- Create event: 17
Last Year
- Issues event: 3
- Watch event: 1
- Delete event: 15
- Issue comment event: 75
- Push event: 55
- Pull request event: 29
- Create event: 17
Issues and Pull Requests
Last synced: 6 months ago
All Time
- Total issues: 0
- Total pull requests: 2
- Average time to close issues: N/A
- Average time to close pull requests: 6 months
- Total issue authors: 0
- Total pull request authors: 1
- Average comments per issue: 0
- Average comments per pull request: 0.5
- Merged pull requests: 0
- Bot issues: 0
- Bot pull requests: 2
Past Year
- Issues: 0
- Pull requests: 2
- Average time to close issues: N/A
- Average time to close pull requests: 6 months
- Issue authors: 0
- Pull request authors: 1
- Average comments per issue: 0
- Average comments per pull request: 0.5
- Merged pull requests: 0
- Bot issues: 0
- Bot pull requests: 2
Top Authors
Issue Authors
- zoeself (11)
- l3r8yJ (3)
- IlyaLisov (2)
- h1alexbel (2)
Pull Request Authors
- renovate[bot] (98)
- h1alexbel (5)
- IlyaLisov (1)
Dependencies
- actions/cache v3 composite
- actions/checkout v4 composite
- actions/setup-java v3 composite
- codecov/codecov-action v3 composite
- actions/cache v3 composite
- actions/checkout v4 composite
- actions/setup-java v3 composite
- docker-practice/actions-setup-docker master composite
- actions/checkout v4 composite
- g4s8/pdd-action master composite
- actions/checkout v4 composite
- g4s8/xcop-action master composite
- com.jcabi.incubator:xembly 0.28.1
- com.jcabi:jcabi-immutable
- com.jcabi:jcabi-xml
- io.github.eo-cqrs:eokson 0.3.2
- io.github.eo-cqrs:xfake 0.1.2
- org.apache.kafka:kafka-clients 3.6.0
- org.cactoos:cactoos 0.55.0
- org.projectlombok:lombok 1.18.28
- org.slf4j:slf4j-api 2.0.9
- org.slf4j:slf4j-simple 2.0.9
- org.yaml:snakeyaml 2.2
- org.assertj:assertj-core 3.24.2 test
- org.junit.jupiter:junit-jupiter-api 5.10.0 test
- org.junit.jupiter:junit-jupiter-params 5.10.0 test
- org.mockito:mockito-core 5.5.0 test
- org.mockito:mockito-junit-jupiter 5.5.0 test
- org.slf4j:slf4j-api 2.0.9
- @project.groupId@:@project.artifactId@ @project.version@ test
- io.github.eo-cqrs:eo-kafka 1.0-SNAPSHOT test
- org.assertj:assertj-core 3.24.2 test
- org.hamcrest:hamcrest 2.2 test
- org.junit.jupiter:junit-jupiter 5.10.0 test
- org.junit.jupiter:junit-jupiter-api 5.10.0 test
- org.mockito:mockito-core 5.5.0 test
- org.rnorth.duct-tape:duct-tape 1.0.8 test
- org.testcontainers:junit-jupiter 1.19.0 test
- org.testcontainers:kafka 1.19.0 test
- org.testcontainers:testcontainers 1.19.0 test