https://github.com/awslabs/spark-sql-kinesis-connector
Spark Structured Streaming Kinesis Data Streams connector supports both GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO)
Repository
Statistics
- Stars: 35
- Watchers: 5
- Forks: 23
- Open Issues: 18
- Releases: 9
Metadata Files
README.md
Amazon Kinesis Data Streams Connector for Spark Structured Streaming
Implementation of the Amazon Kinesis Data Streams connector for Spark Structured Streaming, with support for both the GetRecords and SubscribeToShard (Enhanced Fan-Out, EFO) consumer types.
Developer Setup
Clone SparkSqlKinesisConnector from the source repository on GitHub.
```sh
git clone https://github.com/awslabs/spark-sql-kinesis-connector.git
cd spark-sql-kinesis-connector

mvn clean install -DskipTests
```
This creates the target/spark-streaming-sql-kinesis-connector_2.12-<kinesis-connector-version>-SNAPSHOT.jar file, which contains the connector and its shaded dependencies. The jar file is also installed to the local Maven repository.
After the jar file is installed in the local Maven repository, configure your project's pom.xml (Maven is used as an example):
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-sql-kinesis-connector_2.12</artifactId>
    <version>${kinesis-connector-version}</version>
</dependency>
```
The current version is tested with Spark 3.2 and above.
Public jar file
For easier access, a public jar file is available on S3. For example, for version 1.0.0, the path to the jar file is s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar.
To run with spark-submit, include the jar file as shown below (version 1.0.0 as an example):
```sh
--jars s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar
```
The jar file can also be downloaded from https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar
Change the jar file name to match the version; for example, the file for version 1.1.0 is spark-streaming-sql-kinesis-connector_2.12-1.1.0.jar.
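Because only the version changes between releases, the naming pattern above can be captured in a small helper to avoid typos in spark-submit commands. This is a hypothetical sketch that assumes every release keeps the 1.0.0 naming pattern and stays in the same bucket; ConnectorJar and its methods are illustrative names, not part of the connector.

```scala
// Hypothetical helper: builds the public jar locations for a connector version,
// ASSUMING all releases follow the documented 1.0.0 naming pattern.
object ConnectorJar {
  private val Bucket = "awslabs-code-us-east-1"
  private val Prefix = "spark-sql-kinesis-connector"

  /** Jar file name, e.g. spark-streaming-sql-kinesis-connector_2.12-1.1.0.jar */
  def fileName(version: String, scalaBinary: String = "2.12"): String =
    s"spark-streaming-sql-kinesis-connector_$scalaBinary-$version.jar"

  /** S3 URI suitable for spark-submit --jars. */
  def s3Uri(version: String): String =
    s"s3://$Bucket/$Prefix/${fileName(version)}"

  /** HTTPS URL for downloading the same jar. */
  def httpsUrl(version: String): String =
    s"https://$Bucket.s3.amazonaws.com/$Prefix/${fileName(version)}"
}
```

For example, ConnectorJar.s3Uri("1.1.0") yields the value to pass after --jars for version 1.1.0.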
How to use it
Code Examples
Configure Kinesis Source with GetRecords consumer type
Consume data from Kinesis using the GetRecords consumer type, which is the default consumer type.
```scala
// Regional Kinesis endpoint matching kinesis.region
val endpointUrl = "https://kinesis.us-east-2.amazonaws.com"

val kinesis = spark
  .readStream
  .format("aws-kinesis")
  .option("kinesis.region", "us-east-2")
  .option("kinesis.streamName", "teststream")
  .option("kinesis.consumerType", "GetRecords")
  .option("kinesis.endpointUrl", endpointUrl)
  .option("kinesis.startingposition", "LATEST")
  .load()
```
The following policy should be added to the IAM role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KdsStreamGetRecordsPolicy",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>",
        "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>/*"
      ]
    }
  ]
}
```
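The example above starts reading from LATEST. kinesis.startingPosition also accepts TRIM_HORIZON, EARLIEST (an alias for TRIM_HORIZON) and AT_TIMESTAMP followed by an ISO-8601 timestamp with an offset, as listed in the source configuration table. To validate such a value before starting a job, a hypothetical plain-Scala parser could look like this; the type and method names are illustrative, and the connector's own parsing may accept more forms.

```scala
import java.time.{Instant, OffsetDateTime}

// Hypothetical model of the documented kinesis.startingPosition values;
// this is not the connector's parsing code.
sealed trait StartingPosition
case object Latest extends StartingPosition
case object TrimHorizon extends StartingPosition
final case class AtTimestamp(instant: Instant) extends StartingPosition

object StartingPosition {
  private val AtTimestampPrefix = "AT_TIMESTAMP "

  def parse(value: String): StartingPosition = value.trim match {
    case "LATEST"                    => Latest
    case "TRIM_HORIZON" | "EARLIEST" => TrimHorizon // EARLIEST is an alias
    case v if v.startsWith(AtTimestampPrefix) =>
      // ISO-8601 with offset, e.g. 2023-08-30T19:00:05Z or 2023-08-30T19:00:05-08:00
      val ts = v.stripPrefix(AtTimestampPrefix).trim
      AtTimestamp(OffsetDateTime.parse(ts).toInstant)
    case other =>
      throw new IllegalArgumentException(s"Unsupported starting position: $other")
  }
}
```

Normalizing to an Instant makes the two documented timestamp forms comparable; for example, 2023-08-30T19:00:05-08:00 is the same instant as 2023-08-31T03:00:05Z.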
Configure Kinesis Source with SubscribeToShard consumer type
Consume data from Kinesis using the SubscribeToShard (EFO) consumer type. Be aware that EFO may incur extra AWS costs.
```scala
// Regional Kinesis endpoint matching kinesis.region
val endpointUrl = "https://kinesis.us-east-2.amazonaws.com"

val kinesis = spark
  .readStream
  .format("aws-kinesis")
  .option("kinesis.region", "us-east-2")
  .option("kinesis.streamName", "teststream")
  .option("kinesis.consumerType", "SubscribeToShard")
  .option("kinesis.endpointUrl", endpointUrl)
  .option("kinesis.startingposition", "LATEST")
  .option("kinesis.consumerName", "TestConsumer")
  .load()
```
The following policy should be added to the IAM role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KdsStreamSubscribeToShardPolicy",
      "Effect": "Allow",
      "Action": [
        "kinesis:SubscribeToShard",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:DescribeStreamConsumer",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:DeregisterStreamConsumer"
      ],
      "Resource": [
        "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>",
        "arn:aws:kinesis:*:<account-id>:stream/<kinesis-stream-name>/*"
      ]
    }
  ]
}
```
Check Schema
```scala
kinesis.printSchema()
```
```
root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)
```
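The data column is binary, which is why the query examples cast it with CAST(data AS STRING); Spark decodes the bytes as UTF-8. For typed access, a case class mirroring the printed schema can be used. The sketch below is hypothetical (KinesisRecord is an illustrative name): with import spark.implicits._ in scope, kinesis.as[KinesisRecord] should produce a typed Dataset, and dataAsString reproduces the cast in plain Scala.

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

// Hypothetical case class whose fields mirror the printed schema:
// binary -> Array[Byte], string -> String, timestamp -> java.sql.Timestamp.
final case class KinesisRecord(
    data: Array[Byte],
    streamName: String,
    partitionKey: String,
    sequenceNumber: String,
    approximateArrivalTimestamp: Timestamp)

object KinesisRecord {
  // Plain-Scala equivalent of CAST(data AS STRING): decode the payload as UTF-8.
  def dataAsString(r: KinesisRecord): String =
    new String(r.data, StandardCharsets.UTF_8)
}
```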
Start Query
```scala
import spark.implicits._ // needed for .as[String]

// Cast data into string and group by data column
val query = kinesis
  .selectExpr("CAST(data AS STRING)").as[String]
  .groupBy("data").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
```
Gracefully Shutdown Query
The following example shows how to make sure the query is shut down gracefully before the streaming driver process stops.
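```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.spark.sql.streaming.StreamingQuery

// Poll for a "STOP_LOCK" marker file under `path` and stop the query when it appears
def waitForQueryStop(query: StreamingQuery, path: String): Unit = {
  val stopLockPath = new Path(path, "STOP_LOCK")
  val fileContext = FileContext.getFileContext(stopLockPath.toUri, new Configuration())
  while (query.isActive) {
    // Stop the query when "STOP_LOCK" file is found
    if (fileContext.util().exists(stopLockPath)) {
      query.stop()
      fileContext.delete(stopLockPath, false)
    }
    Thread.sleep(500)
  }
}

// Add query stop to the system shutdown hook
sys.addShutdownHook { query.stop() }
// Wait for the signal to stop the query (writeToDir is the directory being watched)
waitForQueryStop(query, writeToDir)
```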
Note: Even if graceful shutdown is implemented, there is no guarantee that consumer deregistration succeeds, especially if the application is terminated abruptly.
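The stop signal is simply the presence of a STOP_LOCK file under the watched directory. When that directory is on the local filesystem, an operator-side helper could look like the hypothetical sketch below (StopSignal and requestStop are illustrative names); for an HDFS path, hdfs dfs -touchz <dir>/STOP_LOCK creates the same marker.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical operator-side helper for a LOCAL watched directory: creating
// STOP_LOCK asks a query polled by waitForQueryStop to stop.
object StopSignal {
  def requestStop(watchedDir: String): Path = {
    val lock = Paths.get(watchedDir, "STOP_LOCK")
    // Idempotent: a second request while the marker exists is a no-op
    if (!Files.exists(lock)) Files.createFile(lock)
    lock
  }
}
```

The polling loop deletes the marker after stopping the query, so the same directory can be reused for the next run.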
Using the Kinesis Sink
```scala
import spark.implicits._ // needed for .as[(String, String)]

// Use a random number as the partition key and cast data into string
kinesis
  .selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)").as[(String, String)]
  .writeStream
  .format("aws-kinesis")
  .outputMode("append")
  .option("kinesis.region", "us-east-1")
  .option("kinesis.streamName", "sparkSinkTest")
  .option("kinesis.endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
```
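A rand()-based partitionKey spreads records across shards because Kinesis Data Streams maps each partition key to a shard by reading the MD5 hash of the key as an unsigned 128-bit integer and finding the shard whose hash key range contains it. A minimal plain-Scala sketch of that mapping follows, assuming the shards split the hash key space evenly (as for a newly created stream, not necessarily after resharding); ShardRouting is a hypothetical name and this is not connector code.

```scala
import java.math.BigInteger
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Sketch of partition-key-to-shard routing, ASSUMING evenly split hash key ranges.
object ShardRouting {
  private val KeySpace = BigInt(2).pow(128) // hash keys range over [0, 2^128)

  /** MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer. */
  def hashKey(partitionKey: String): BigInt = {
    val digest = MessageDigest.getInstance("MD5")
      .digest(partitionKey.getBytes(StandardCharsets.UTF_8))
    BigInt(new BigInteger(1, digest)) // signum 1 => treat bytes as unsigned
  }

  /** Index of the shard (0 until shardCount) whose range contains the hash key. */
  def shardIndex(partitionKey: String, shardCount: Int): Int = {
    require(shardCount > 0, "shardCount must be positive")
    ((hashKey(partitionKey) * shardCount) / KeySpace).toInt
  }
}
```

The same key always lands on the same shard, so a constant partition key would send every record to one shard; random keys distribute the write load.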
Kinesis Connector Metadata storage
By default, the Kinesis connector's metadata is stored in the same HDFS/S3 folder as the checkpoint location.
It is also possible to save the metadata in DynamoDB by specifying the following options:
```scala
.option("kinesis.metadataCommitterType", "DYNAMODB")
.option("kinesis.dynamodb.tableName", "kinesisTestMetadata")
```
To use DynamoDB, the following policy should be added to the IAM role running the job:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KDSConnectorAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:DeleteItem",
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:Query"
      ],
      "Resource": "arn:aws:dynamodb:<region>:<account-number>:table/<tablename>"
    }
  ]
}
```
EFO Consumer Registration/Deregistration
The Spark application needs to provide kinesis.consumerName when it uses Kinesis Enhanced Fan-Out. Each application must have a unique stream consumer name. The Kinesis connector registers the stream consumer automatically when the application starts. If a consumer with the same kinesis.consumerName already exists, the connector reuses it.
The stream consumer is deregistered when the application is shut down gracefully, that is, when the query's stop() is called. There is no guarantee that deregistration succeeds, especially if the application is terminated abruptly. The stream consumer is reused when the application restarts. Note that stream consumers that remain registered may incur extra AWS costs.
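Two options in this document are conditionally required: kinesis.consumerName when kinesis.consumerType is SubscribeToShard, and kinesis.dynamodb.tableName when kinesis.metadataCommitterType is DYNAMODB. A hypothetical pre-flight check (OptionChecks is an illustrative name, not connector code) can report a missing value before the query starts. It lowercases option keys because Spark treats data source option keys case-insensitively; comparing option values case-insensitively is an assumption of this sketch.

```scala
// Hypothetical pre-flight check of conditionally required connector options.
object OptionChecks {
  def missingRequired(options: Map[String, String]): List[String] = {
    // Spark data source option keys are case-insensitive, so normalise them first
    val opts = options.map { case (k, v) => k.toLowerCase -> v.trim }
    def isSet(key: String): Boolean = opts.get(key.toLowerCase).exists(_.nonEmpty)
    // ASSUMPTION: option values are compared case-insensitively
    def hasValue(key: String, value: String, default: String): Boolean =
      opts.getOrElse(key.toLowerCase, default).equalsIgnoreCase(value)

    val efo =
      if (hasValue("kinesis.consumerType", "SubscribeToShard", "GetRecords") &&
          !isSet("kinesis.consumerName")) List("kinesis.consumerName")
      else Nil
    val dynamo =
      if (hasValue("kinesis.metadataCommitterType", "DYNAMODB", "HDFS") &&
          !isSet("kinesis.dynamodb.tableName")) List("kinesis.dynamodb.tableName")
      else Nil
    efo ++ dynamo
  }
}
```

One way to use it is to build the reader options as a Map, fail fast if missingRequired returns a non-empty list, and otherwise pass the same Map to .options(...) on the reader.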
Avoid race conditions
Speculative execution should be disabled (spark.speculation is off by default on EMR) to prevent Spark from running two tasks for the same shard at the same time, which would create race conditions.
For the same reason, if two jobs need to read from the same Kinesis stream at the same time, the Spark application should cache the DataFrame. Here is an example of caching a DataFrame in Scala. Although batchDF.count and batchDF.write start two jobs, batchDF.persist() ensures the application reads from the Kinesis stream only once. batchDF.unpersist() releases the cache once processing is done.
```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.Trigger

// writeToDir, reader and checkpointDir are defined elsewhere in the application
val batchProcessor: (DataFrame, Long) => Unit = (batchDF: DataFrame, batchId: Long) => {
  val now = System.currentTimeMillis()
  val writeToDirNow = s"${writeToDir}/${now}"
  // Cache the micro-batch so count() and write() do not each read from Kinesis
  batchDF.persist()
  if (batchDF.count() > 0) {
    batchDF.write
      .format("csv")
      .mode(SaveMode.Append)
      .save(writeToDirNow)
  }
  batchDF.unpersist()
}

val inputDf = reader.load()
  .selectExpr("CAST(data AS STRING)")

val query = inputDf
  .writeStream
  .queryName("KinesisDataConsumerForeachBatch")
  .foreachBatch(batchProcessor)
  .option("checkpointLocation", checkpointDir)
  .trigger(Trigger.ProcessingTime("15 seconds"))
  .start()
```
The same applies if you want to write the output of a streaming query to multiple locations:
```scala
val query = inputDf
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format(...).save(...) // location 1
    batchDF.write.format(...).save(...) // location 2
    batchDF.unpersist()
  }
  .start()
```
Credential Provider
Kinesis Connector uses the default credentials provider chain to supply the credentials used in your application. It looks for credentials in this order:
- Java system properties - aws.accessKeyId and aws.secretAccessKey
- Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
- Web identity token credentials from system properties or environment variables
- Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
- Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable
- Instance profile credentials delivered through the Amazon EC2 metadata service
Refer to the AWS SDK for Java 2.x Developer Guide for details.
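The order matters because the first source that yields credentials wins; for example, environment variables left on a host take precedence over its instance profile. Below is a simplified, hypothetical model of that first-match behavior in plain Scala. Creds and CredentialChain are illustrative names and only the first two sources are modeled; in a real application the AWS SDK's own default provider chain performs the full lookup.

```scala
// Simplified model of a credentials provider chain: providers are tried in
// order and the first one that yields credentials wins. Illustrative only.
final case class Creds(accessKeyId: String, secretAccessKey: String)

object CredentialChain {
  type Provider = () => Option[Creds]

  /** Returns the name of the first provider that yields credentials, and the credentials. */
  def resolve(chain: Seq[(String, Provider)]): Option[(String, Creds)] =
    chain.iterator                                     // lazy: later providers are not called
      .map { case (name, p) => p().map(c => name -> c) }
      .collectFirst { case Some(found) => found }

  // First two sources of the documented order
  val systemProperties: Provider = () =>
    for {
      id  <- sys.props.get("aws.accessKeyId")
      key <- sys.props.get("aws.secretAccessKey")
    } yield Creds(id, key)

  val environment: Provider = () =>
    for {
      id  <- sys.env.get("AWS_ACCESS_KEY_ID")
      key <- sys.env.get("AWS_SECRET_ACCESS_KEY")
    } yield Creds(id, key)
}
```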
Cross Account Access using AssumeRole
There are scenarios where customers follow a multi-account approach, resulting in Kinesis Data Streams and Spark consumer applications operating in different accounts.
The steps to access a Kinesis data stream in one account from a Spark Structured Streaming application in another account are:
- Step 1 – Create an AWS Identity and Access Management (IAM) role in Account A to access the Kinesis data stream, with a trust relationship with Account B.
Attach the following policy to the role in Account A:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "KdsStreamPolicy",
"Effect": "Allow",
"Action": [
"kinesis:SubscribeToShard",
"kinesis:DescribeStreamSummary",
"kinesis:ListShards",
"kinesis:DescribeStreamConsumer",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:ListStreamConsumers",
"kinesis:RegisterStreamConsumer",
"kinesis:DeregisterStreamConsumer"
],
"Resource": [
"arn:aws:kinesis::
```
Trust policy of the role in Account A
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AccountATrust",
      "Effect": "Allow",
      "Principal": {"AWS": ["arn:aws:iam::<AccountB-id>:root"]},
      "Action": "sts:AssumeRole"
    }
  ]
}
```
- Step 2 – Create an IAM role in Account B that can assume the role in Account A. This role is used to run the Spark application. Add the following permission to it:
```json
{
  "Sid": "AssumeRoleInSourceAccount",
  "Effect": "Allow",
  "Action": "sts:AssumeRole",
  "Resource": "<RoleArnInAccountA>"
}
```
- Step 3 – Configure the Kinesis connector to assume the role in Account A so that it can read the Kinesis data stream in Account A:
```scala
.option("kinesis.stsRoleArn", "RoleArnInAccountA")
.option("kinesis.stsSessionName", "StsSessionName")
```
Cross Account Access using Access Key
It's also possible to access a Kinesis data stream in another account using an IAM user's AWS credentials. The user belongs to the Kinesis account (Account A) and needs the Kinesis access permissions described above.
```scala
.option("kinesis.awsAccessKeyId", "awsAccessKeyId")
.option("kinesis.awsSecretKey", "awsSecretKey")
```
Note: Using permanent credentials is not recommended due to security concerns.
Known Limitations
- Speculative execution is not supported.
- Trigger.AvailableNow is not supported.
- Continuous Processing is not supported.
Kinesis Source Configuration
| Name | Default Value | Description |
| ------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| kinesis.endpointUrl | required, no default value | Endpoint URL for Kinesis Stream |
| kinesis.region | inferred value from kinesis.endpointUrl | Region running the Kinesis connector |
| kinesis.streamName | required, no default value | Name of the stream |
| kinesis.consumerType | GetRecords | Consumer type. Possible values are "GetRecords", "SubscribeToShard" |
| kinesis.failOnDataLoss | false | Fail the streaming job if any active shard is missing or expired |
| kinesis.maxFetchRecordsPerShard | 100,000 | Maximum number of records to fetch per shard per microbatch |
| kinesis.maxFetchTimePerShardSec | - | Maximum time in seconds to fetch records per shard per micro-batch. If kinesis.maxFetchTimePerShardSec is not set, the task for a shard ends based on kinesis.maxFetchRecordsPerShard alone. If it is set, the task ends when either kinesis.maxFetchRecordsPerShard or kinesis.maxFetchTimePerShardSec is reached, whichever comes first. kinesis.maxFetchTimePerShardSec must be at least 10 seconds so that fetching can make progress.<br>Note: If a shard is idle (no new data) for more than 10 seconds, the task ends even if neither kinesis.maxFetchTimePerShardSec nor kinesis.maxFetchRecordsPerShard has been reached. |
| kinesis.startingPosition | LATEST | Starting position in Kinesis to fetch data from. Possible values are "LATEST", "TRIM_HORIZON", "EARLIEST" (alias for TRIM_HORIZON), or "AT_TIMESTAMP YYYY-MM-DDTHH:MM:SSZ" (e.g. 2023-08-30T19:00:05Z, 2023-08-30T19:00:05-08:00) |
| kinesis.describeShardInterval | 1s | Minimum interval between two ListShards API calls to get the latest shards. Possible values are time values such as 50s, 100ms. |
| kinesis.minBatchesToRetain | same as spark.sql.streaming.minBatchesToRetain | The minimum number of batches of kinesis metadata that must be retained and made recoverable. |
| kinesis.checkNewRecordThreads | 8 | Number of threads in Spark driver to check if there are new records in Kinesis stream. |
| kinesis.metadataCommitterType | HDFS | Where to save Kinesis connector metadata. Possible values are "HDFS", "DYNAMODB" |
| kinesis.metadataPath | Same as checkpointLocation | A path to HDFS or S3. Only valid when kinesis.metadataCommitterType is HDFS. |
| kinesis.metadataNumRetries | 5 | Maximum number of retries for metadata requests |
| kinesis.metadataRetryIntervalsMs | 1000 (milliseconds) | Wait time before retrying metadata requests |
| kinesis.metadataMaxRetryIntervalMs | 10000 (milliseconds) | Max wait time between 2 retries of metadata requests |
| kinesis.clientNumRetries | 5 | Maximum number of retries for Kinesis API requests |
| kinesis.clientRetryIntervalsMs | 1000 (milliseconds) | Wait time before retrying Kinesis requests |
| kinesis.clientMaxRetryIntervalMs | 10000 (milliseconds) | Max wait time between 2 retries of Kinesis requests |
| kinesis.consumerName | Required when kinesis.consumerType is "SubscribeToShard" | Kinesis stream Enhanced Fan-Out (EFO) consumer name |
| kinesis.stsRoleArn | - | AWS STS Role ARN for Kinesis operations describe, read record, etc. |
| kinesis.stsSessionName | - | AWS STS Session name |
| kinesis.stsEndpointUrl | - | AWS STS Endpoint URL |
| kinesis.awsAccessKeyId | - | awsAccessKeyId for Kinesis operations describe, read record, etc. |
| kinesis.awsSecretKey | - | awsSecretKey for Kinesis operations describe, read record, etc. |
| kinesis.kinesisRegion | inferred value from kinesis.endpointUrl | Region the Kinesis stream belongs to |
| kinesis.credentialProviderClass | - | Custom connector credential provider implements org.apache.spark.sql.connector.kinesis.ConnectorAwsCredentialsProvider. If the implementation returns temporary credentials, it is responsible for refreshing the credentials before they expire. |
| kinesis.credentialProviderParam | - | Custom connector credential provider's input parameter |
| kinesis.dynamodb.tableName | Required when kinesis.metadataCommitterType is "DYNAMODB" | DynamoDB table name |
| kinesis.subscribeToShard.timeoutSec | 60 (seconds) | Timeout waiting for a SubscribeToShard request to finish |
| kinesis.subscribeToShard.maxRetries | 10 | Max retries of subscribeToShard request |
| kinesis.getRecords.numberOfRecordsPerFetch | 10,000 | Maximum number of records to fetch per GetRecords API call |
| kinesis.getRecords.fetchIntervalMs | 200 (milliseconds) | Minimum interval of two getRecords API calls |
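The interaction between kinesis.maxFetchRecordsPerShard, kinesis.maxFetchTimePerShardSec and the 10-second idle rule described above can be summarized as a single predicate. This is a hypothetical model of the documented behavior, not the connector's implementation; FetchLimits and FetchLoop are illustrative names.

```scala
import scala.concurrent.duration._

// Documented per-shard limits for one micro-batch task (illustrative names).
final case class FetchLimits(
    maxRecordsPerShard: Long = 100000L,                 // kinesis.maxFetchRecordsPerShard
    maxFetchTimePerShard: Option[FiniteDuration] = None, // kinesis.maxFetchTimePerShardSec
    idleTimeout: FiniteDuration = 10.seconds) {
  maxFetchTimePerShard.foreach { t =>
    require(t >= 10.seconds, "maxFetchTimePerShardSec must be at least 10 seconds")
  }
}

object FetchLoop {
  /** True when the task for this shard should stop fetching. */
  def shouldStop(
      limits: FetchLimits,
      recordsFetched: Long,
      elapsed: FiniteDuration,
      idleFor: FiniteDuration): Boolean =
    recordsFetched >= limits.maxRecordsPerShard ||       // record limit reached
      limits.maxFetchTimePerShard.exists(elapsed >= _) || // time limit, only if set
      idleFor > limits.idleTimeout                        // shard idle too long
}
```

With the defaults, only the record limit and the idle rule apply, so a busy shard below 100,000 records keeps fetching regardless of elapsed time.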
Kinesis Sink Configuration
| Name | Default Value | Description |
| ------------------------------------ | -------------------------------- | ------------------------------------------------------------ |
| kinesis.endpointUrl | required, no default value | Endpoint URL for Kinesis Stream |
| kinesis.region | inferred value from endpoint url | Region running the Kinesis connector |
| kinesis.streamName | required, no default value | Name of the stream |
| kinesis.sink.flushWaitTimeMs | 100 (milliseconds) | Wait time while flushing records to Kinesis on Task End |
| kinesis.sink.recordMaxBufferedTimeMs | 1000 (milliseconds) | Specify the maximum buffered time of a record |
| kinesis.sink.maxConnections | 1 | Specify the maximum connections to Kinesis |
| kinesis.sink.aggregationEnabled | True | Specify if records should be aggregated before sending them to Kinesis |
| kinesis.sink.recordTtl | 30000 (milliseconds) | Records not successfully written to Kinesis within this time are dropped |
Security
See CONTRIBUTING for more information.
Acknowledgement
This connector would not have been possible without the reference implementations of Kinesis Connector, Apache Flink AWS Connectors, spark-streaming-sql-s3-connector and Kinesis Client Library. The structure of some parts of the code is influenced by the excellent work done by various Apache Spark contributors.
Owner
- Name: Amazon Web Services - Labs
- Login: awslabs
- Kind: organization
- Location: Seattle, WA
- Website: http://amazon.com/aws/
- Repositories: 914
- Profile: https://github.com/awslabs
AWS Labs
GitHub Events
Total
- Create event: 7
- Release event: 2
- Issues event: 16
- Watch event: 10
- Delete event: 3
- Issue comment event: 18
- Push event: 24
- Pull request review comment event: 42
- Pull request review event: 50
- Pull request event: 42
- Fork event: 10
Last Year
- Create event: 7
- Release event: 2
- Issues event: 16
- Watch event: 10
- Delete event: 3
- Issue comment event: 18
- Push event: 24
- Pull request review comment event: 42
- Pull request review event: 50
- Pull request event: 42
- Fork event: 10
Issues and Pull Requests
Last synced: about 2 years ago
All Time
- Total issues: 8
- Total pull requests: 8
- Average time to close issues: 21 days
- Average time to close pull requests: 7 days
- Total issue authors: 8
- Total pull request authors: 3
- Average comments per issue: 2.0
- Average comments per pull request: 0.63
- Merged pull requests: 7
- Bot issues: 0
- Bot pull requests: 0
Past Year
- Issues: 8
- Pull requests: 8
- Average time to close issues: 21 days
- Average time to close pull requests: 7 days
- Issue authors: 8
- Pull request authors: 3
- Average comments per issue: 2.0
- Average comments per pull request: 0.63
- Merged pull requests: 7
- Bot issues: 0
- Bot pull requests: 0
Top Authors
Issue Authors
- penniman26 (4)
- MJFND (3)
- ttrankle (2)
- shenavaa (1)
- adarsh112 (1)
- CICDamen (1)
- project-unmarshal (1)
- satya-dd (1)
- lemdandk (1)
- RichardChester (1)
- tweelix (1)
- glaucotiburtino (1)
- benjaminwootton (1)
- greg-roberts-bbc (1)
- tagatac (1)
Pull Request Authors
- hwanghw (10)
- vrozov (6)
- tagatac (3)
- ttrankle (2)
- mtorjyan123 (2)
- penniman26 (2)
- pankajmisra (1)
- CICDamen (1)
- azhur (1)
- feliperazeek (1)
- dependabot[bot] (1)
- chloeyamzn (1)
- tweelix (1)
- efaracci018 (1)
Dependencies
- software.amazon.awssdk:bom 2.19.33 import
- org.apache.spark:spark-sql_2.12 3.4.1 provided
- org.apache.spark:spark-tags_2.12 3.4.1 provided
- com.amazonaws:amazon-kinesis-producer 0.15.7
- com.amazonaws:aws-java-sdk 1.12.382
- com.fasterxml.jackson.core:jackson-annotations 2.14.3
- com.fasterxml.jackson.core:jackson-core 2.14.3
- com.fasterxml.jackson.core:jackson-databind 2.14.3
- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor 2.14.3
- com.google.protobuf:protobuf-java 3.21.7
- software.amazon.awssdk:apache-client
- software.amazon.awssdk:dynamodb
- software.amazon.awssdk:kinesis
- software.amazon.awssdk:netty-nio-client
- software.amazon.awssdk:sts
- com.amazonaws:amazon-kinesis-aggregator 1.0.3 test
- org.apache.hadoop:hadoop-aws 3.2.4 test
- org.apache.logging.log4j:log4j-slf4j-impl 2.19.0 test
- org.apache.logging.log4j:log4j-slf4j2-impl 2.19.0 test
- org.apache.spark:spark-catalyst_2.12 3.4.1 test
- org.apache.spark:spark-core_2.12 3.4.1 test
- org.apache.spark:spark-sql_2.12 3.4.1 test
- org.apache.spark:spark-tags_2.12 3.4.1 test
- org.mockito:mockito-core 4.8.0 test
- org.scalacheck:scalacheck_2.12 1.16.0 test
- org.scalatest:scalatest_2.12 3.2.13 test
- org.slf4j:slf4j-reload4j 2.0.6 test