https://github.com/crowdstrike/kafka-replicator
Kafka replicator is a tool used to mirror and backup Kafka topics across regions
Science Score: 13.0%
This score indicates how likely this project is to be science-related based on various indicators:
- ○ CITATION.cff file
- ✓ codemeta.json file (found codemeta.json file)
- ○ .zenodo.json file
- ○ DOI references
- ○ Academic publication links
- ○ Committers with academic emails
- ○ Institutional organization owner
- ○ JOSS paper metadata
- ○ Scientific vocabulary similarity: low similarity (5.6%) to scientific vocabulary
Keywords
Repository
Kafka replicator is a tool used to mirror and backup Kafka topics across regions
Basic Info
- Host: GitHub
- Owner: CrowdStrike
- License: apache-2.0
- Language: Go
- Default Branch: master
- Size: 284 KB
Statistics
- Stars: 16
- Watchers: 5
- Forks: 7
- Open Issues: 2
- Releases: 0
Topics
Metadata Files
README.md
kafka-replicator
Kafka replicator is a tool used to mirror and backup Kafka topics across regions.
Features
- At-least-once message delivery.
- Message ordering guarantee per topic partition.
- Compaction of historical data.
- Per partition metrics to track data volume and replication lag.
- Pluggable storage format, with Apache Parquet as the default implementation.
- Flexible configuration and replication topology.
Limitations
- The AWS S3 storage class parameter is not exposed at the moment. (TODO)
- Destination topic partition count is not adjusted automatically to match source topic partition count. (TODO)
- Destination topics are not created automatically and topic metadata is not replicated.
- Partition offset gaps are not explicitly tracked, which can cause false lost-segment warning messages during ingress operation. Gaps can occur during topic compaction for slow egress consumers and in scenarios involving transactional messages.
Examples
The following samples show a simple setup where the source `sensor.events` Kafka topic is replicated from region us-east-1 to the destination topic `sensor.events.copy` in region us-west-1 using the Parquet storage format. Daily compaction of streaming segments is enabled to run in the source region at 1:00 AM. Error handling is omitted for brevity.
Egress controller runs in AWS region us-east-1:
```golang
func startEgress(awsConfig *aws.Config, kafkaBrokers []string) {
	awsSession, _ := session.NewSession(awsConfig)

	parquet, _ := formats.NewS3Parquet(formats.S3ParquetConfig{
		AWSConfig: awsConfig,
		CreatedBy: "example_app",
	})

	controller, _ := replicator.NewEgress(replicator.EgressConfig{
		LocalRegion:   "us-east-1",
		AWSConfig:     awsConfig,
		AWSSession:    awsSession,
		S3Bucket:      "source-bucket",
		SegmentFormat: parquet,
		KafkaBrokers:  kafkaBrokers,
		KafkaTopics: map[string]*egress.TopicConfig{
			"sensor.events": {
				MaxSegmentMessages: 1000000,
				MaxSegmentSize:     100 * 1024 * 1024, // 100 MiB
				MaxSegmentAge:      5 * time.Minute,
			},
		},
	})

	controller.Start()
}
```
Ingress controller runs in AWS region us-west-1:
```golang
func startIngress(awsConfig *aws.Config, kafkaBrokers []string) {
	awsSession, _ := session.NewSession(awsConfig)

	parquet, _ := formats.NewS3Parquet(formats.S3ParquetConfig{
		AWSConfig: awsConfig,
	})

	source := ingress.Source{
		Region: "us-east-1",
		Topic:  "sensor.events",
	}

	controller, _ := replicator.NewIngress(replicator.IngressConfig{
		LocalRegion: "us-west-1",
		Sources: map[ingress.Source]*ingress.SourceConfig{
			source: {
				DestinationTopic:   "sensor.events.copy",
				LostSegmentTimeout: time.Hour,
				LateSegmentRetries: 5,
				FirstSegmentDelay:  5 * time.Minute,
			},
		},
		AWSSession:                  awsSession,
		AWSConfig:                   awsConfig,
		S3Bucket:                    "destination-bucket",
		SegmentFormat:               parquet,
		SQSQueueName:                "destination-bucket-events-queue",
		KafkaBrokers:                kafkaBrokers,
		KafkaSegmentEventsTopic:     "replicator.segmentevents",
		KafkaSegmentEventsRetention: time.Hour,
		KafkaCheckpointTopic:        "replicator.checkpoint",
	})

	controller.Start()
}
```
Daily compaction runs in the same AWS region us-east-1 as egress controller:
```golang
func startDailyCompaction(awsConfig *aws.Config, kafkaBrokers []string) {
	awsSession, _ := session.NewSession(awsConfig)

	parquet, _ := formats.NewS3Parquet(formats.S3ParquetConfig{
		AWSConfig: awsConfig,
		CreatedBy: "example_app",
	})

	compactionConfig := compaction.Config{
		MinLevel:        core.LevelStreaming,
		MaxLevel:        core.LevelStreaming,
		MinSegmentAge:   time.Hour,
		MinSegmentCount: 10,
		MinSegmentSize:  1 * 1024 * 1024 * 1024, // 1 GiB
		MaxSegmentCount: 1000,
		MaxSegmentSize:  4 * 1024 * 1024 * 1024, // 4 GiB
		BatchSize:       10000,
		Delete:          true,
	}

	controller, _ := replicator.NewCompaction(replicator.CompactionConfig{
		LocalRegion:   "us-east-1",
		AWSSession:    awsSession,
		AWSConfig:     awsConfig,
		S3Bucket:      "source-bucket",
		SegmentFormat: parquet,
		KafkaBrokers:  kafkaBrokers,
		Topics: map[string]*compaction.Config{
			"sensor.events": &compactionConfig,
		},
		CronSchedule: "0 0 1 * * *",
		Parallelism:  8,
	})

	controller.Start()
}
```
Getting started
Prerequisites
- Install the librdkafka dependencies
- Add the Confluent Apache Kafka Golang client to your `GOPATH` workspace:

```bash
go get -u github.com/confluentinc/confluent-kafka-go/kafka
```
Installation
Use `go get` to add the project to your `GOPATH` workspace:

```bash
go get -u github.com/CrowdStrike/kafka-replicator
```

Golang import:

```golang
import "github.com/CrowdStrike/kafka-replicator"
```
AWS setup
- Create S3 buckets and SQS events queue in each region.
- Enable S3 to deliver notification events of type `s3:ObjectCreated:*` and `s3:ObjectRemoved:*` to the SQS queue.
- Configure the S3 bucket sync process to match the desired replication topology. This part is not covered by kafka-replicator, which was designed to avoid cross-region operations. For a starting point, check the AWS CLI `s3 sync` command.
- Configure an S3 lifecycle rule to expire partial segment uploads resulting from egress worker failures.
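Such a lifecycle rule can be expressed with the standard S3 `AbortIncompleteMultipartUpload` action; the rule ID, prefix, and the 7-day window below are illustrative values, not settings mandated by kafka-replicator:

```json
{
  "Rules": [
    {
      "ID": "expire-partial-segment-uploads",
      "Status": "Enabled",
      "Filter": { "Prefix": "data/" },
      "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 7 }
    }
  ]
}
```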
Kafka setup
The replicator requires two additional Kafka topics in the destination region:
- Segment events topic: configured with the `delete` cleanup policy and an appropriate retention time to discard old segment events.
- Checkpoint topic: configured with the `compact` cleanup policy to retain only the last checkpoint for each source partition.
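Assuming the topic names from the ingress example, the per-topic configuration would look roughly like this (the one-hour retention mirrors the `KafkaSegmentEventsRetention` value used above; adjust to your environment):

```
# replicator.segmentevents
cleanup.policy=delete
retention.ms=3600000

# replicator.checkpoint
cleanup.policy=compact
```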
Architecture
The main components are the egress, ingress and compaction controllers, designed to run as a group of instances that leverage Kafka group membership for work placement.

Egress controller
Subscribes to source Kafka topics and manages the set of workers to match the assigned partitions. It continuously polls the Kafka consumer and dispatches incoming messages to the corresponding worker.
Egress worker
Receives Kafka messages from the parent controller and appends them to the current open segment, preserving the original offset order, until the segment becomes full. At this point the segment is closed and the Kafka offset of the last consumed message is committed. The frequency at which new segments are created is dictated by configurable threshold parameters for size, age and message count.
Ingress controller
Subscribes to destination Kafka topics and pauses message consumption immediately after a partition assignment event is received; this leverages Kafka group membership for worker lifecycle management. It listens for incoming segment events and dispatches them to the corresponding worker.
Ingress worker
Receives segment events from the parent controller and processes them in offset order. It produces message batches to the destination Kafka topic and periodically checkpoints the last produced message offset to a log-compacted Kafka topic. The worker will reload the segment list using an exponential backoff strategy when it detects that a segment is late or missing according to the configured parameters.
Consistent segment store
Receives segment events and stores them in a Kafka topic for multiple purposes: to distribute the incoming events to all running instances, to retain events for group rebalance scenarios, and to address problems that can arise from the AWS S3 eventual consistency model.
AWS S3 object key layout
Segment files are streamed by egress workers to a temporary key `prefix/id`, where:
- `prefix`: a configurable value.
- `id`: a random UUID value.
Once a segment is complete according to the configured thresholds, it is moved to the location `prefix/region/topic/partition/level/segment`, where:
- `prefix`: a configurable value.
- `region`: the unique local region identifier.
- `topic`: the source Kafka topic name.
- `partition`: the topic partition number.
- `level`: a numeric value used for segment compaction. The lowest level (value 0) is reserved for the egress controller and is where streaming data is written.
- `segment`: the segment name, using the format `start_offset-end_offset` and representing the offset range of included messages, formatted to fit uint64 values.
For example, `data/us-west-1/sensor.events/107/0/00000000000000100000-00000000000000200000` is the S3 object key for a segment written in AWS region us-west-1 for source topic sensor.events, partition number 107, at streaming level 0, containing messages in the range [100000, 200000].
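The key layout above can be reproduced with a small helper; `segmentKey` is a hypothetical name for illustration, not part of the library API. Offsets are zero-padded to 20 digits, the width of the largest uint64 value:

```golang
package main

import "fmt"

// segmentKey builds the S3 object key for a completed segment:
// prefix/region/topic/partition/level/start_offset-end_offset,
// with offsets zero-padded to 20 digits to fit uint64 values.
func segmentKey(prefix, region, topic string, partition, level int, startOffset, endOffset uint64) string {
	return fmt.Sprintf("%s/%s/%s/%d/%d/%020d-%020d",
		prefix, region, topic, partition, level, startOffset, endOffset)
}

func main() {
	fmt.Println(segmentKey("data", "us-west-1", "sensor.events", 107, 0, 100000, 200000))
	// data/us-west-1/sensor.events/107/0/00000000000000100000-00000000000000200000
}
```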
Segment compaction
Compaction is the process that merges multiple smaller segments into a single consolidated segment. It runs periodically (e.g. daily) in the background to keep the total number of AWS S3 keys small and to optimize the segment compression ratio. It is designed as a local process: each region runs the segment compaction controller and only changes its own data. After multiple segment files have been merged to form the compacted segment, they can be safely deleted.
Compacted segments can be replicated across regions to serve as backups for disaster recovery scenarios and can use a cheaper AWS S3 storage class.
Compaction levels are used to group segments with similar characteristics:
- Level 0: used for streaming segments written by the egress workers. This level does not contain compacted segments.
- Level 1: used for automatic background compaction, where multiple Level 0 segments are merged together into bigger segments (e.g. 4 GiB each).
- Level 2 and above: used to further consolidate segments and reduce the AWS S3 bucket key count. The operation can be triggered manually for certain topic partitions or executed as part of a cron job with lower frequency than background compaction (e.g. monthly, quarterly, etc.).
Contributors
Bogdan-Ciprian Rusu - Author/Maintainer
License
The project is licensed under the Apache License v2.0.
Owner
- Name: CrowdStrike
- Login: CrowdStrike
- Kind: organization
- Email: github@crowdstrike.com
- Location: United States of America
- Website: https://www.crowdstrike.com
- Repositories: 183
- Profile: https://github.com/CrowdStrike
GitHub Events
Total
- Watch event: 2
Last Year
- Watch event: 2
Committers
Last synced: 9 months ago
Top Committers
| Name | Email | Commits |
|---|---|---|
| brusu | b****u@c****m | 3 |
Committer Domains (Top 20 + Academic)
Issues and Pull Requests
Last synced: 9 months ago
All Time
- Total issues: 0
- Total pull requests: 4
- Average time to close issues: N/A
- Average time to close pull requests: 23 days
- Total issue authors: 0
- Total pull request authors: 2
- Average comments per issue: 0
- Average comments per pull request: 0.25
- Merged pull requests: 1
- Bot issues: 0
- Bot pull requests: 3
Past Year
- Issues: 0
- Pull requests: 0
- Average time to close issues: N/A
- Average time to close pull requests: N/A
- Issue authors: 0
- Pull request authors: 0
- Average comments per issue: 0
- Average comments per pull request: 0
- Merged pull requests: 0
- Bot issues: 0
- Bot pull requests: 0
Top Authors
Issue Authors
Pull Request Authors
- dependabot[bot] (3)
- bcrusu (1)
Top Labels
Issue Labels
Pull Request Labels
Packages
- Total packages: 1
- Total downloads: unknown
- Total dependent packages: 0
- Total dependent repositories: 0
- Total versions: 1
proxy.golang.org: github.com/CrowdStrike/kafka-replicator
- Homepage: https://github.com/CrowdStrike/kafka-replicator
- Documentation: https://pkg.go.dev/github.com/CrowdStrike/kafka-replicator#section-documentation
- License: Apache-2.0
- Latest release: v0.0.0-20210721050518-7055b2f98007 (published over 4 years ago)
Rankings
Dependencies
- github.com/aws/aws-sdk-go v1.30.4
- github.com/confluentinc/confluent-kafka-go v1.5.2
- github.com/emirpasic/gods v1.12.0
- github.com/gofrs/uuid v3.2.0+incompatible
- github.com/gogo/protobuf v1.3.1
- github.com/golang/mock v1.4.3
- github.com/golang/protobuf v1.3.5
- github.com/onsi/ginkgo v1.12.0
- github.com/onsi/gomega v1.9.0
- github.com/pkg/errors v0.9.1
- github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
- github.com/robfig/cron v1.2.0
- github.com/sirupsen/logrus v1.5.0
- github.com/xitongsys/parquet-go v1.5.1
- github.com/xitongsys/parquet-go-source v0.0.0-20200326031722-42b453e70c3b
- golang.org/x/time v0.0.0-20191024005414-555d28b269f0
- github.com/apache/thrift v0.0.0-20181112125854-24918abba929
- github.com/aws/aws-sdk-go v1.30.4
- github.com/confluentinc/confluent-kafka-go v1.3.0
- github.com/confluentinc/confluent-kafka-go v1.5.2
- github.com/davecgh/go-spew v1.1.0
- github.com/davecgh/go-spew v1.1.1
- github.com/emirpasic/gods v1.12.0
- github.com/fsnotify/fsnotify v1.4.7
- github.com/go-sql-driver/mysql v1.5.0
- github.com/gofrs/uuid v3.2.0+incompatible
- github.com/gogo/protobuf v1.3.1
- github.com/golang/mock v1.4.3
- github.com/golang/protobuf v1.2.0
- github.com/golang/protobuf v1.3.5
- github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
- github.com/google/go-cmp v0.4.0
- github.com/hpcloud/tail v1.0.0
- github.com/jmespath/go-jmespath v0.3.0
- github.com/kisielk/errcheck v1.2.0
- github.com/kisielk/gotool v1.0.0
- github.com/klauspost/compress v1.9.7
- github.com/konsorten/go-windows-terminal-sequences v1.0.1
- github.com/onsi/ginkgo v1.6.0
- github.com/onsi/ginkgo v1.12.0
- github.com/onsi/gomega v1.7.1
- github.com/onsi/gomega v1.9.0
- github.com/pkg/errors v0.9.1
- github.com/pmezard/go-difflib v1.0.0
- github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
- github.com/robfig/cron v1.2.0
- github.com/sirupsen/logrus v1.5.0
- github.com/stretchr/objx v0.1.0
- github.com/stretchr/testify v1.2.2
- github.com/stretchr/testify v1.5.1
- github.com/xitongsys/parquet-go v1.5.1
- github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5
- github.com/xitongsys/parquet-go-source v0.0.0-20200326031722-42b453e70c3b
- golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
- golang.org/x/net v0.0.0-20180906233101-161cd47e91fd
- golang.org/x/net v0.0.0-20190311183353-d8887717615a
- golang.org/x/net v0.0.0-20200202094626-16171245cfb2
- golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
- golang.org/x/sync v0.0.0-20190423024810-112230192c58
- golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e
- golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
- golang.org/x/sys v0.0.0-20190422165155-953cdadca894
- golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e
- golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c
- golang.org/x/text v0.3.0
- golang.org/x/time v0.0.0-20191024005414-555d28b269f0
- golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563
- golang.org/x/tools v0.0.0-20190425150028-36563e24a262
- golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
- golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
- gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405
- gopkg.in/fsnotify.v1 v1.4.7
- gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
- gopkg.in/yaml.v2 v2.2.2
- gopkg.in/yaml.v2 v2.2.4
- rsc.io/quote/v3 v3.1.0
- rsc.io/sampler v1.3.0