https://github.com/awslabs/aws-fluent-plugin-kinesis

Amazon Kinesis output plugin for Fluentd

Science Score: 23.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
  • DOI references
  • Academic publication links
  • Committers with academic emails
    1 of 35 committers (2.9%) from academic institutions
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (8.9%) to scientific vocabulary

Keywords

amazon-kinesis fluent-plugin fluentd kinesis-firehose kinesis-producer kinesis-streams ruby
Last synced: 5 months ago

Repository

Amazon Kinesis output plugin for Fluentd

Basic Info
  • Host: GitHub
  • Owner: awslabs
  • License: apache-2.0
  • Language: Ruby
  • Default Branch: master
  • Homepage:
  • Size: 468 KB
Statistics
  • Stars: 291
  • Watchers: 95
  • Forks: 97
  • Open Issues: 5
  • Releases: 6
Topics
amazon-kinesis fluent-plugin fluentd kinesis-firehose kinesis-producer kinesis-streams ruby
Created almost 12 years ago · Last pushed 7 months ago
Metadata Files
Readme Changelog Contributing License Code of conduct

README.md

Fluent plugin for Amazon Kinesis

Build Status Gem Version Gem Downloads

A fluentd output plugin to send events to Amazon Kinesis Data Streams and Amazon Data Firehose. The plugin also supports KPL Aggregated Record Format.

This gem includes the following three output plugins:

  • kinesis_streams
  • kinesis_firehose
  • kinesis_streams_aggregated

The plugin is also described in the official Fluentd documentation.

Note: This README is for the latest v3. Plugin v3 is almost compatible with v2. If you use v1, see v1 README.

Installation

Simply use RubyGems:

$ gem install fluent-plugin-kinesis --no-document

If you would like to build and install it yourself, you can do so as follows:

$ git clone https://github.com/awslabs/aws-fluent-plugin-kinesis.git
$ cd aws-fluent-plugin-kinesis
$ bundle install
$ bundle exec rake build
$ bundle exec rake install

If using td-agent v4 or lower, use td-agent-gem:

$ td-agent-gem install fluent-plugin-kinesis

If using fluent-package v5 or higher, use fluent-gem:

$ fluent-gem install fluent-plugin-kinesis

Requirements

| fluent-plugin-kinesis | fluentd     | ruby     |
|:---------------------:|:-----------:|:--------:|
| >= 3.6.0              | >= 0.14.22  | >= 2.7.1 |
| >= 3.5.0 && < 3.6.0   | >= 0.14.22  | >= 2.4.2 |
| >= 3.2.0 && < 3.5.0   | >= 0.14.22  | >= 2.3   |
| >= 3.0.0 && < 3.2.0   | >= 0.14.10  | >= 2.1   |
| >= 2.0.0 && < 3.0.0   | >= 0.12.35  | >= 2.1   |
| < 2.0.0               | >= 0.10.58  | >= 2.0   |

Getting Started

When you run this plugin on Amazon EC2 instances or container services, use instance profiles to assume a role. If you want to use specific credentials, see Credentials.

kinesis_streams

In your Fluentd configuration, use @type kinesis_streams. The configuration would look like this:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  partition_key key  # Otherwise, use random partition key
</match>

For more details, see Configuration: kinesis_streams.

kinesis_firehose

In your Fluentd configuration, use @type kinesis_firehose. The configuration would look like this:

<match *>
  @type kinesis_firehose
  region us-east-1
  delivery_stream_name YOUR_STREAM
</match>

For more details, see Configuration: kinesis_firehose.

kinesis_streams_aggregated

In your Fluentd configuration, use @type kinesis_streams_aggregated. The configuration would look like this:

<match *>
  @type kinesis_streams_aggregated
  region us-east-1
  stream_name YOUR_STREAM
  # Unlike kinesis_streams, there is no way to use dynamic partition key.
  # fixed_partition_key or random.
</match>

For more details, see Configuration: kinesis_streams_aggregated.

Configuration

Configuration: Plugin

Configuration: kinesis_streams

The following parameters are kinesis_streams specific configurations.

stream_name

Name of the stream to put data into.

As of Fluentd v1, you can use placeholders for this stream_name parameter. Note that chunk keys are required in your buffer section attributes for placeholders to work.

The following configuration shows a kinesis_streams output plugin that applies extract_placeholders to `stream_name`:

# chunk_key: tag
# ${tag} will be replaced with actual tag string
<match *>
  @type kinesis_streams
  stream_name ${tag}-stream

  <buffer tag>
    # ...
  </buffer>
</match>

The value of timekey in buffer chunk keys can be extracted using strptime placeholders like this:

# chunk_key: tag and time
<match *>
  @type kinesis_streams
  stream_name ${tag}-stream-%Y%m%d

  <buffer tag, time>
    # ...
  </buffer>
</match>

You can also use a custom placeholder like this:

# chunk_key: $.kubernetes.annotations.kinesis_stream
<match *>
  @type kinesis_streams
  stream_name "${$.kubernetes.annotations.kinesis_stream}"

  <buffer $.kubernetes.annotations.kinesis_stream>
    # ...
  </buffer>
</match>

For more details, see Placeholders in Config: Buffer Section.

partition_key

A key to extract the partition key from the JSON object. Default is nil, which means the partition key will be generated randomly.
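
For illustration (assuming incoming records contain a user_id field, which is a hypothetical name), the following configuration uses each record's user_id value as the partition key:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  partition_key user_id
</match>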

Configuration: kinesis_firehose

The following parameters are kinesis_firehose specific configurations.

delivery_stream_name

Name of the delivery stream to put data into.

As of Fluentd v1, placeholders are supported. For more details, see stream_name for the kinesis_streams plugin and Placeholders in Config: Buffer Section.

append_new_line

Boolean. Default true. If enabled, the plugin adds a newline character (\n) to each serialized record.
Before appending \n, the plugin calls chomp and removes the separator from the end of each record, as if chomp_record were true. Therefore, you don't need to enable the chomp_record option when you use kinesis_firehose output with the default configuration (append_new_line is true). If you set append_new_line to false, you can choose chomp_record false (the default) or true (format compatible with plugin v2).
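
As a sketch of the behavior described above (illustrative only, not the plugin's actual code): each record is chomped before the single \n is appended, so trailing separators are never doubled.

```ruby
# Illustrative sketch of the append_new_line behavior (not the plugin's code):
# chomp strips an existing trailing separator before a single "\n" is added.
def serialize(record_json, append_new_line: true)
  return record_json unless append_new_line
  record_json.chomp + "\n"
end

serialize(%({"msg":"hi"}))         # => "{\"msg\":\"hi\"}\n"
serialize(%({"msg":"hi"}) + "\n")  # still "{\"msg\":\"hi\"}\n", not doubled
```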

Configuration: kinesis_streams_aggregated

The following parameters are kinesis_streams_aggregated specific configurations.

stream_name (kinesis_streams_aggregated)

Name of the stream to put data into.

As of Fluentd v1, placeholders are supported. For more details, see stream_name for the kinesis_streams plugin and Placeholders in Config: Buffer Section.

fixed_partition_key

The value of a fixed partition key. Default is nil, which means the partition key will be generated randomly. Note that all records will go to a single shard if you specify this option.

Configuration: Credentials

To put records into Amazon Kinesis Data Streams or Amazon Data Firehose, you need to provide AWS security credentials. If no credentials are specified in the config file, this plugin automatically fetches credentials in the same way the AWS SDK for Ruby does (environment variables, shared profile, or instance profile).

This plugin uses almost the same configuration as fluent-plugin-s3, but also supports several additional options such as aws_ses_token for temporary credentials.

AWS key and secret authentication

These parameters are required when your agent is not running on an EC2 instance with an IAM role. When using an IAM role, make sure to configure instance_profile_credentials. Usage can be found below.

aws_key_id (required)

AWS access key id.

aws_sec_key (required)

AWS secret key.

aws_ses_token

AWS session token. This parameter is optional, but can be provided when using MFA or temporary credentials and your agent is not running on an EC2 instance with an IAM role.
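
Putting these parameters together, a configuration with static credentials might look like this (all values are placeholders):

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  aws_key_id YOUR_ACCESS_KEY_ID
  aws_sec_key YOUR_SECRET_ACCESS_KEY
  # Optional, for temporary credentials:
  aws_ses_token YOUR_SESSION_TOKEN
</match>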

aws_iam_retries

The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.

<assume_role_credentials> section

Typically, you can use AssumeRole for cross-account access or federation.

<match *>
  @type kinesis_streams

  <assume_role_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
  </assume_role_credentials>
</match>

See also:

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 3600 seconds (1 hour). By default, the value is set to 3600 seconds.

external_id

A unique identifier that is used by third parties when assuming roles in their customers' accounts.

sts_http_proxy

Proxy URL for proxying requests to the Amazon STS service API. This needs to be set independently of the global http_proxy parameter, for the use case in which requests to the Kinesis API go via a Kinesis VPC endpoint but requests to the STS API have to go via an HTTP proxy. It should be added to the assume_role_credentials section in the following format:

sts_http_proxy http://[username:password]@hostname:port

sts_endpoint_url

The STS API endpoint URL. This can be used to override the default global STS API endpoint of sts.amazonaws.com. Using regional endpoints may be preferred to reduce latency, and is required if utilizing a PrivateLink VPC endpoint for STS API calls.

<web_identity_credentials> section

Similar to assume_role_credentials, but for use in EKS.

<match *>
  @type kinesis_streams

  <web_identity_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
    web_identity_token_file AWS_WEB_IDENTITY_TOKEN_FILE
  </web_identity_credentials>
</match>

See also:

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

web_identity_token_file (required)

The absolute path to the file on disk containing the OIDC token.

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 43200 seconds (12 hours). By default, the value is set to 3600 seconds (1 hour).

<instance_profile_credentials> section

Retrieve temporary security credentials via HTTP request. This is useful on an EC2 instance.

<match *>
  @type kinesis_streams

  <instance_profile_credentials>
    ip_address IP_ADDRESS
    port       PORT
  </instance_profile_credentials>
</match>

See also:

retries

Number of times to retry when retrieving credentials. Default is 5.

ip_address

Default is 169.254.169.254.

port

Default is 80.

http_open_timeout

Default is 5.

http_read_timeout

Default is 5.

<shared_credentials> section

This loads AWS access credentials from a local INI file. This is useful for local development.

<match *>
  @type kinesis_streams

  <shared_credentials>
    path         PATH
    profile_name PROFILE_NAME
  </shared_credentials>
</match>

See also:

path

Path to the shared file. Defaults to "#{Dir.home}/.aws/credentials".

profile_name

Defaults to 'default', or the value of ENV['AWS_PROFILE'].

<process_credentials> section

This loads AWS access credentials from an external process.

<match *>
  @type kinesis_streams

  <process_credentials>
    process CMD
  </process_credentials>
</match>

See also:

process (required)

Command to be executed as an external process.

Configuration: Performance

<buffer> section

Use Fluentd buffering and flushing parameters to optimize throughput. With Fluentd v1+ (td-agent v3+), write these configurations in a buffer section like this:

<match *>
  @type kinesis_streams

  <buffer>
    flush_interval 1
    chunk_limit_size 1m
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 15
  </buffer>
</match>

For more details, see Config: Buffer Section. Note that each parameter should be adjusted to your system.

Configuration: Batch Request

retries_on_batch_request

Integer, default 8. The plugin puts multiple records to Amazon Kinesis Data Streams in batches using PutRecords. A set of records in a batch may fail for reasons documented in the Kinesis Service API Reference for PutRecords. Failed records will be retried retries_on_batch_request times. If a record fails all retries, an error log will be emitted.

reset_backoff_if_success

Boolean, default true. If enabled, each retry checks the number of succeeded records in the previous batch request and resets the exponential backoff if there was any success. Because a batch request can be composed of requests across shards, simple exponential backoff for the whole batch request wouldn't work in some cases.
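
The retry policy described above can be sketched roughly as follows (illustrative only, not the plugin's implementation): the failed subset of a batch is retried with a doubling backoff interval, and the interval resets whenever the batch made partial progress.

```ruby
# Illustrative sketch (not the plugin's actual code). Retries the failed
# subset of a batch, doubling a backoff interval each time, and resets the
# interval whenever part of the batch succeeded. The block stands in for one
# PutRecords call and returns the records that failed.
def retry_batch(records, max_retries: 8, reset_backoff: true)
  backoff = 0
  intervals = []
  max_retries.times do
    failed = yield(records)
    backoff = 0 if reset_backoff && !failed.empty? && failed.size < records.size
    records = failed
    break if records.empty?
    intervals << 2**backoff * 0.1  # real code would sleep this long
    backoff += 1
  end
  [records, intervals]  # leftover failures and the backoff schedule used
end
```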

batch_request_max_count

Integer, default 500. The maximum number of records per batch request built from a record chunk. It can't exceed the default value because of the API limit.

Default:

  • kinesis_streams: 500
  • kinesis_firehose: 500
  • kinesis_streams_aggregated: 100,000

batch_request_max_size

Integer. The maximum size of a batch request built from a record chunk. It can't exceed the default value because of the API limit.

Default:

  • kinesis_streams: 5 MB
  • kinesis_firehose: 4 MB
  • kinesis_streams_aggregated: 1 MB
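
Splitting a chunk into batches that respect both limits could look like this (a sketch under my own assumptions, not the plugin's actual code):

```ruby
# Sketch: split serialized records into batches that respect both the
# maximum record count and the maximum total byte size per batch request.
def build_batches(records, max_count: 500, max_size: 5 * 1024 * 1024)
  batches = [[]]
  size = 0
  records.each do |r|
    if batches.last.size >= max_count ||
       (!batches.last.empty? && size + r.bytesize > max_size)
      batches << []  # current batch is full; start a new one
      size = 0
    end
    batches.last << r
    size += r.bytesize
  end
  batches
end
```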

drop_failed_records_after_batch_request_retries

Boolean, default true.

If drop_failed_records_after_batch_request_retries is enabled (default), the plugin will drop failed records when a batch request fails after retrying the maximum number of times configured as retries_on_batch_request. This dropping can be monitored from monitor_agent or fluent-plugin-prometheus via the retry_count or num_errors metrics.

If drop_failed_records_after_batch_request_retries is disabled, the plugin will raise an error and return the chunk to the Fluentd buffer when a batch request fails after retrying the maximum number of times. Fluentd will then retry sending the chunk records according to the retry config in the Buffer Section. Note that this retrying may create duplicate records, since the PutRecords API of Kinesis Data Streams and the PutRecordBatch API of Kinesis Data Firehose may return a partially successful response.

monitor_num_of_batch_request_retries

Boolean, default false. If enabled, the plugin will increment the retry_count monitoring metric after internally retrying a batch request. This configuration enables you to monitor ProvisionedThroughputExceededException from monitor_agent or fluent-plugin-prometheus. Note that the retry_count metric will be counted by the plugin in addition to the original Fluentd buffering mechanism if monitor_num_of_batch_request_retries is enabled.

Configuration: Format

<format> section

This plugin uses Fluent::TextFormatter to serialize records to strings. See formatter.rb for more details. By default, it uses the json formatter, which is the same as specifying:

<match *>
  @type kinesis_streams

  <format>
    @type json
  </format>
</match>

For other configurations of json formatter, see json formatter plugin.

<inject> section

This plugin uses Fluent::TimeFormatter and other injection configurations. See inject.rb for more details.

For example, the config below will add a time field whose value is the event time with nanosecond precision, and a tag field whose value is its tag.

<match *>
  @type kinesis_streams

  <inject>
    time_key time
    tag_key tag
  </inject>
</match>

By default, time_type string and time_format %Y-%m-%dT%H:%M:%S.%N%z are already set, which matches the Elasticsearch sub-second format. That said, you can use any configuration.
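
The default time_format can be reproduced with Ruby's strftime, for example:

```ruby
# The default <inject> time_format applied with Ruby's strftime;
# %N yields nanosecond precision, %z the numeric UTC offset.
t = Time.utc(2024, 1, 2, 3, 4, 5) + Rational(678_901_234, 1_000_000_000)
puts t.strftime('%Y-%m-%dT%H:%M:%S.%N%z')
# => 2024-01-02T03:04:05.678901234+0000
```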

In addition, there are some format related options:

data_key

If your record contains a field whose string value should be sent to Amazon Kinesis directly (without the formatter), use this parameter to specify the field. In that case, fields other than data_key are thrown away and never sent to Amazon Kinesis. Default is nil, which means the whole record will be formatted and sent.
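
For example, assuming records look like {"log": "raw line", "level": "info"} (hypothetical field names), the following sends only the raw value of the log field:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  data_key log
</match>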

compression

Specifies the compression method for the data of each record. Currently accepted options are zlib and gzip. Otherwise, no compression will be performed.
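
The two accepted options correspond to Ruby's standard zlib library; an illustrative round-trip (the plugin compresses each record individually):

```ruby
require 'zlib'

# Round-trip of the two accepted compression options on a record's data.
data = '{"msg":"hello"}'

zlib_compressed = Zlib::Deflate.deflate(data)  # compression zlib
gzip_compressed = Zlib.gzip(data)              # compression gzip

# Both decompress back to the original payload:
Zlib::Inflate.inflate(zlib_compressed)  # => '{"msg":"hello"}'
Zlib.gunzip(gzip_compressed)            # => '{"msg":"hello"}'
```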

log_truncate_max_size

Integer, default 1024. When emitting a log entry, the message will be truncated to this size to avoid an infinite loop when the log is also sent to Kinesis. The value 0 means no truncation.
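
A minimal sketch of this truncation (not the plugin's actual code):

```ruby
# Sketch of log message truncation; a max_size of 0 disables truncation.
def truncate_log(message, max_size = 1024)
  return message if max_size.zero?
  message[0, max_size]
end
```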

chomp_record

Boolean. Default false. If enabled, the plugin calls chomp and removes the separator from the end of each record. This option provides a format compatible with plugin v2. See #142 for more details.
When you use kinesis_firehose output, the append_new_line option is true by default. If append_new_line is enabled, the plugin calls chomp as if chomp_record were true before appending \n to each record. Therefore, you don't need to enable the chomp_record option when you use kinesis_firehose with the default configuration. If you set append_new_line to false, you can choose chomp_record false (the default) or true (format compatible with plugin v2).

Configuration: AWS SDK

region

AWS region of your stream. It should be in a form like us-east-1 or us-west-2. Refer to Regions and Endpoints in the AWS General Reference for supported regions. Default is nil, which means the plugin tries to find it from the environment variable AWS_REGION.

max_record_size

The upper limit of the size of each record. Default is 1 MB, which is the Kinesis limit.

http_proxy

HTTP proxy for API calling. Default nil.

endpoint

API endpoint URL, for testing. Default nil.

ssl_verify_peer

Boolean. Disable if you want to skip SSL certificate verification (for testing). Default true.

debug

Boolean. Enable if you need to debug Amazon Data Firehose API calls. Default is false.

Development

To launch a fluentd process with this plugin for development, follow the steps below:

git clone https://github.com/awslabs/aws-fluent-plugin-kinesis.git
cd aws-fluent-plugin-kinesis
make # installs gem dependencies
bundle exec fluentd -c /path/to/fluent.conf

To launch using specified version of Fluentd, use BUNDLE_GEMFILE environment variable:

BUNDLE_GEMFILE=$PWD/gemfiles/Gemfile.td-agent-3.3.0 bundle exec fluentd -c /path/to/fluent.conf

Contributing

Bug reports and pull requests are welcome on GitHub.

Related Resources

Owner

  • Name: Amazon Web Services - Labs
  • Login: awslabs
  • Kind: organization
  • Location: Seattle, WA

AWS Labs

GitHub Events

Total
  • Issues event: 2
  • Issue comment event: 1
  • Push event: 5
  • Pull request event: 2
  • Pull request review event: 2
  • Pull request review comment event: 1
Last Year
  • Issues event: 2
  • Issue comment event: 1
  • Push event: 5
  • Pull request event: 2
  • Pull request review event: 2
  • Pull request review comment event: 1

Committers

Last synced: 9 months ago

All Time
  • Total Commits: 176
  • Total Committers: 35
  • Avg Commits per committer: 5.029
  • Development Distribution Score (DDS): 0.602
Past Year
  • Commits: 0
  • Committers: 0
  • Avg Commits per committer: 0.0
  • Development Distribution Score (DDS): 0.0
Top Committers
Name Email Commits
simukappu s****8@g****m 70
Ryosuke IWANAGA r****p@g****m 40
Yuta Imai i****y@g****m 13
Genki Sugawara s****s@y****p 5
Nhan Dang n****g@a****m 5
Trevor Rowe t****e@g****m 4
Steve Bruggeman s****n@g****m 3
Michael Grosser m****l@g****t 3
Hiroshi Hatake c****c@g****m 3
Michael Lorant m****t@f****u 2
Genki Sugawara s****a@c****m 2
Jan-Erik Carlsen c****n@g****m 2
Adam Malcontenti-Wilson a****n@z****m 2
Henri Yandell h****l@a****m 1
Jerome Touffe-Blin j****n@a****m 1
JiHyunSong j****9@g****m 1
Chris Broglie c****e@z****m 1
Eugene Merlinsky e****m@p****m 1
kenju-wagatsuma k****a@c****m 1
akira-kuriyama a****a@s****p 1
Yuta Imai y****i@a****p 1
Jacky Chen j****n@a****m 1
Masahiro Nakagawa r****y@g****m 1
Olek Dudek d****4@g****m 1
Rahul Ashok r****k@g****m 1
Rajesh Koilpillai r****i@g****m 1
Ryuma Yoshida r****7@g****m 1
Sam Splunks 7****s 1
Trevor Howard t****y@g****m 1
Xavi x****z@g****m 1
and 5 more...

Issues and Pull Requests

Last synced: 7 months ago

All Time
  • Total issues: 65
  • Total pull requests: 42
  • Average time to close issues: 6 months
  • Average time to close pull requests: 3 months
  • Total issue authors: 62
  • Total pull request authors: 28
  • Average comments per issue: 3.45
  • Average comments per pull request: 2.55
  • Merged pull requests: 30
  • Bot issues: 0
  • Bot pull requests: 0
Past Year
  • Issues: 1
  • Pull requests: 1
  • Average time to close issues: N/A
  • Average time to close pull requests: N/A
  • Issue authors: 1
  • Pull request authors: 1
  • Average comments per issue: 0.0
  • Average comments per pull request: 0.0
  • Merged pull requests: 0
  • Bot issues: 0
  • Bot pull requests: 0
Top Authors
Issue Authors
  • Resisty (2)
  • da-head0 (2)
  • kvitali (2)
  • burkaygur (1)
  • darknighthunder (1)
  • doubret (1)
  • danmx (1)
  • kevinsookocheff-wf (1)
  • rolandjitsu (1)
  • llby (1)
  • benkap (1)
  • adammw (1)
  • mitrilmad (1)
  • tatsu-yam (1)
  • jamesongithub (1)
Pull Request Authors
  • simukappu (8)
  • riywo (4)
  • cosmo0920 (3)
  • mpospisi (2)
  • grosser (2)
  • tahoward (2)
  • ohkyutaeknology (1)
  • geofffranks (1)
  • adammw (1)
  • mseiwald (1)
  • dtmistry (1)
  • hyandell (1)
  • JiHyunSong (1)
  • janerikcarlsen (1)
  • samsplunks (1)
Top Labels
Issue Labels
feature requests (9) enhancement (5) waiting for feedback (4) bug (3) duplicate (2) question (2) need more info (2) wontfix (1) help wanted (1)
Pull Request Labels
enhancement (5) need more info (2) feature requests (1) working in progress (1) bug (1)

Packages

  • Total packages: 3
  • Total downloads:
    • rubygems 17,449,670 total
  • Total docker downloads: 38,254,506
  • Total dependent packages: 0
    (may contain duplicates)
  • Total dependent repositories: 710
    (may contain duplicates)
  • Total versions: 67
  • Total maintainers: 4
rubygems.org: fluent-plugin-kinesis

Fluentd output plugin that sends events to Amazon Kinesis.

  • Versions: 35
  • Dependent Packages: 0
  • Dependent Repositories: 710
  • Downloads: 17,446,269 Total
  • Docker Downloads: 38,254,506
Rankings
Docker downloads count: 0.8%
Downloads: 0.9%
Dependent repos count: 1.3%
Forks count: 2.5%
Stargazers count: 3.3%
Average: 4.1%
Dependent packages count: 15.6%
Maintainers (3)
Last synced: 6 months ago
proxy.golang.org: github.com/awslabs/aws-fluent-plugin-kinesis
  • Versions: 30
  • Dependent Packages: 0
  • Dependent Repositories: 0
Rankings
Dependent packages count: 7.0%
Average: 8.2%
Dependent repos count: 9.3%
Last synced: 6 months ago
rubygems.org: adp-fluent-plugin-kinesis

Fork of plugin created by AWS

  • Versions: 2
  • Dependent Packages: 0
  • Dependent Repositories: 0
  • Downloads: 3,401 Total
Rankings
Forks count: 2.3%
Stargazers count: 3.0%
Dependent packages count: 15.7%
Average: 31.3%
Dependent repos count: 46.8%
Downloads: 88.8%
Maintainers (1)
Last synced: 6 months ago

Dependencies

fluent-plugin-kinesis.gemspec rubygems
  • bundler >= 1.10 development
  • fakefs >= 0.8.1 development
  • mocha >= 1.1.0 development
  • net-empty_port >= 0.0.2 development
  • pry >= 0.10.1 development
  • pry-byebug >= 3.3.0 development
  • pry-stack_explorer >= 0.4.9.2 development
  • rake >= 10.0 development
  • test-unit >= 3.0.8 development
  • test-unit-rr >= 1.0.3 development
  • webmock >= 1.24.2 development
  • fluentd >= 0.14.22, < 2
  • google-protobuf ~> 3
.github/workflows/test.yml actions
  • actions/checkout v2 composite
  • ruby/setup-ruby v1 composite
Gemfile rubygems