This guide describes the AWS Kinesis implementation of the Spring Cloud Stream Binder. It contains information about its design, usage and configuration options, as well as information on how the Spring Cloud Stream concepts map onto AWS Kinesis specific constructs.
Usage
To use the AWS Kinesis Binder, add it to your Spring Cloud Stream application using the following Maven coordinates:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
</dependency>
Kinesis Binder Overview
The Spring Cloud Stream Binder for AWS Kinesis provides the binding implementation for Spring Cloud Stream. This implementation uses Spring Integration AWS Kinesis Channel Adapters at its foundation. The following describes how the Kinesis Binder implementation maps each configured destination to an AWS Kinesis stream:
Unlike Apache Kafka, AWS Kinesis doesn’t provide out-of-the-box support for consumer groups.
Support for this feature is implemented as part of the MetadataStore key for shard checkpoints in the KinesisMessageDrivenChannelAdapter: [CONSUMER_GROUP]:[STREAM]:[SHARD_ID].
In addition, the LockRegistry is used to ensure exclusive access to each shard.
This way, only one channel adapter in the same consumer group consumes messages from a single shard in the stream it is configured for.
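The interplay of the checkpoint key and the per-shard lock can be pictured with a small in-memory sketch. Everything here is hypothetical and for illustration only: the class `ConsumerGroupSketch`, its methods and the two maps merely stand in for the MetadataStore and LockRegistry and are not the binder's actual classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: in-memory stand-ins for the MetadataStore and the LockRegistry.
final class ConsumerGroupSketch {

    private final Map<String, String> checkpoints = new ConcurrentHashMap<>();
    private final Map<String, String> locks = new ConcurrentHashMap<>();

    // Builds a key following the [CONSUMER_GROUP]:[STREAM]:[SHARD_ID] layout.
    static String key(String group, String stream, String shardId) {
        return group + ":" + stream + ":" + shardId;
    }

    // Returns true if the owner now holds (or already held) the lock for this key.
    boolean tryAcquire(String key, String owner) {
        String current = locks.putIfAbsent(key, owner);
        return current == null || current.equals(owner);
    }

    // Stores the last processed sequence number for the key.
    void checkpoint(String key, String sequenceNumber) {
        checkpoints.put(key, sequenceNumber);
    }

    // Returns the stored sequence number, or null if nothing was checkpointed yet.
    String lastCheckpoint(String key) {
        return checkpoints.get(key);
    }

    public static void main(String[] args) {
        ConsumerGroupSketch sketch = new ConsumerGroupSketch();
        String key = key("group-a", "orders", "shardId-000000000000");
        System.out.println(sketch.tryAcquire(key, "instance-1")); // first instance takes the shard
        System.out.println(sketch.tryAcquire(key, "instance-2")); // same group, same shard: rejected
    }
}
```

Because the group name is part of the key, two different consumer groups get independent checkpoints and locks for the same shard.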
Partitioning in AWS Kinesis is similar to the Apache Kafka support, but with slightly different logic.
The partitionKey on the producer side determines which shard in the stream the data record is assigned to.
Partition keys are Unicode strings with a maximum length limit of 256 characters for each key.
AWS Kinesis uses the partition key as input to a hash function that maps the partition key and associated data to a specific shard.
Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards.
As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream.
At the same time, we can’t explicitly select the target shard to send to.
However, by calculating the hash manually (and using explicitHashKeyExpression for the producer, respectively), we can determine the target shard by checking which shard’s HashKeyRange contains the hash.
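The hashing described above can be reproduced by hand. The following is a minimal sketch, assuming the partition key is encoded as UTF-8 before hashing. The class `PartitionHashSketch`, its methods and the two shard ranges are made up for illustration; in a real application the ranges would come from the shard descriptions of the stream.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: class, method and shard names are hypothetical, not part of the binder.
final class PartitionHashSketch {

    // MD5 of the UTF-8 encoded partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Returns the id of the shard whose inclusive [start, end] range contains the hash, or null.
    static String findShard(Map<String, BigInteger[]> ranges, BigInteger hash) {
        for (Map.Entry<String, BigInteger[]> entry : ranges.entrySet()) {
            BigInteger start = entry.getValue()[0];
            BigInteger end = entry.getValue()[1];
            if (hash.compareTo(start) >= 0 && hash.compareTo(end) <= 0) {
                return entry.getKey();
            }
        }
        return null;
    }

    // Two made-up shards splitting the 128-bit key space in half.
    static Map<String, BigInteger[]> twoShardRanges() {
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        BigInteger mid = BigInteger.ONE.shiftLeft(127);
        Map<String, BigInteger[]> ranges = new LinkedHashMap<>();
        ranges.put("shardId-000000000000", new BigInteger[] {BigInteger.ZERO, mid.subtract(BigInteger.ONE)});
        ranges.put("shardId-000000000001", new BigInteger[] {mid, max});
        return ranges;
    }

    public static void main(String[] args) {
        BigInteger hash = hashKey("order-42");
        System.out.println(hash + " -> " + findShard(twoShardRanges(), hash));
    }
}
```

The decimal string form of such a hash is the kind of value an explicit hash key carries, so the same lookup can be used to predict where a record with a given explicit hash key would land.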
By default, the partition key is the result of Object.hashCode() from the message payload.
The Spring Cloud Stream partition handling logic is not applied by the AWS Kinesis Binder, since it has no use here: the provided producer.partitionKeyExpression is propagated to the KinesisMessageHandler directly.
On the consumer side, the instanceCount and instanceIndex are used to distribute shards evenly between the consumers in a group.
This has an effect only for the regular KinesisMessageDrivenChannelAdapter, which can assign specific shards to the target Kinesis consumer.
With the Kinesis Client Library we can only subscribe to the provided stream, and shard distribution is done by that client.
See more information in the Kinesis Client Library documentation.
Consumer Groups
Consumer groups are implemented with a focus on high availability, message ordering and guaranteed message delivery in Spring Cloud Stream.
The consumer group abstraction ensures that each message is handled by a single consumer.
To have a highly available consumer group for your Kinesis stream:
- Ensure all instances of your consumer applications use a shared DynamoDbMetadataStore and DynamoDbLockRegistry (see below for configuration options).
- Use the same group name for the channel in all application instances by using the property spring.cloud.stream.bindings.<bindingTarget>.group.
These configurations alone provide HA, message ordering and guaranteed message delivery. However, even distribution across instances is not guaranteed as of now. There is a very high chance that a single instance in a consumer group will pick up all the shards for consuming. But when that instance goes down (it couldn’t send a heartbeat for any reason), another instance in the consumer group will start processing from the last checkpoint of the previous consumer (for shardIterator type TRIM_HORIZON).
Therefore, configuring consumer concurrency is important to achieve throughput.
It can be configured using spring.cloud.stream.bindings.<bindingTarget>.consumer.concurrency.
Static shard distribution within a single consumer group
It is possible to evenly distribute shards across all instances within a single consumer group. This is done by configuring:
- spring.cloud.stream.instanceCount= to the number of instances
- spring.cloud.stream.instanceIndex= to the current instance’s index
The only way to achieve HA in this case is that, when an instance processing a particular shard goes down, another instance must be started with spring.cloud.stream.instanceIndex= set to the failed instance’s index so that it resumes processing those shards.
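To make the static distribution concrete, here is a minimal sketch of one plausible scheme: a round-robin split in which an instance takes every shard whose position modulo instanceCount equals its instanceIndex. The modulo rule, the class `ShardDistributionSketch` and the shard names are assumptions made for this illustration, not a statement about the binder's exact algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a round-robin split, assumed here for the sake of the example.
final class ShardDistributionSketch {

    // Keeps the shards whose position modulo instanceCount equals instanceIndex.
    static List<String> shardsFor(List<String> shardIds, int instanceCount, int instanceIndex) {
        if (instanceCount < 1 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("instanceIndex must be in [0, instanceCount)");
        }
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < shardIds.size(); i++) {
            if (i % instanceCount == instanceIndex) {
                assigned.add(shardIds.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        for (int index = 0; index < 2; index++) {
            System.out.println("instanceIndex=" + index + " -> " + shardsFor(shards, 2, index));
        }
    }
}
```

The sketch also shows why a replacement instance has to reuse the failed instance's index: under a static split, no other index ever selects those shards.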
Configuration Options
This section contains settings specific to the Kinesis Binder and bound channels.
For general binding configuration options and properties, please refer to the Spring Cloud Stream core documentation.
Kinesis Binder Properties
The following properties are available for Kinesis Binder configuration. They start with the spring.cloud.stream.kinesis.binder. prefix.
- headers
  The set of custom headers to transfer over AWS Kinesis.
  Default: "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType".
- describeStreamBackoff
  The amount of time, in milliseconds, between retries of the DescribeStream operation.
  Default: 1000.
- describeStreamRetries
  The number of times the consumer will retry a DescribeStream operation while waiting for the stream to be in the ACTIVE state.
  Default: 50.
- autoCreateStream
  If set to true, the binder will create the stream automatically. If set to false, the binder will rely on the stream being already created.
  Default: true
- autoAddShards
  If set to true, the binder will create new shards automatically. If set to false, the binder will rely on the shard size of the stream being already configured. If the shard count of the target stream is smaller than the expected value, the binder will ignore that value.
  Default: false
- minShardCount
  Effective only if autoAddShards is set to true. The minimum number of shards that the binder will configure on the stream from which it produces/consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger).
  Default: 1
- kplKclEnabled
  Enable the usage of Kinesis Client Library / Kinesis Producer Library for all message consumption and production.
  Default: false
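The roles of describeStreamRetries and describeStreamBackoff can be pictured as a simple polling loop. This is a minimal sketch under stated assumptions: the `Supplier` stands in for a DescribeStream call that returns the stream status, one initial attempt is followed by up to `retries` further attempts, and the class `DescribeStreamRetrySketch` is hypothetical rather than the binder's provisioning code.

```java
import java.util.function.Supplier;

// Illustrative only: the Supplier stands in for a DescribeStream call returning the stream status.
final class DescribeStreamRetrySketch {

    // Makes one initial attempt plus up to `retries` further attempts, sleeping
    // backoffMillis between them. Returns true once the status is ACTIVE,
    // false when the attempts are used up or the thread is interrupted.
    static boolean waitUntilActive(Supplier<String> describeStream, int retries, long backoffMillis) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            if ("ACTIVE".equals(describeStream.get())) {
                return true;
            }
            if (attempt < retries) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake stream that reports CREATING twice and ACTIVE on the third call.
        int[] calls = {0};
        Supplier<String> fake = () -> ++calls[0] < 3 ? "CREATING" : "ACTIVE";
        System.out.println(waitUntilActive(fake, 50, 10L) + " after " + calls[0] + " calls");
    }
}
```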
MetadataStore
Support for consumer groups is implemented using DynamoDbMetadataStore.
The partitionKey name used in the table is KEY.
This is not configurable.
DynamoDB checkpoint properties are prefixed with spring.cloud.stream.kinesis.binder.checkpoint.
- table
  The name to give the DynamoDB table.
  Default: SpringIntegrationMetadataStore
- createDelay
  The amount of time, in seconds, between each polling attempt while waiting for the checkpoint DynamoDB table to be created.
  Default: 1
- createRetries
  The number of times the consumer will poll DynamoDB while waiting for the checkpoint table to be created.
  Default: 25
- billingMode
  The billing mode of the DynamoDB table. See DynamoDB On-Demand Mode. Possible values are provisioned and payPerRequest. If left empty or set to payPerRequest, both readCapacity and writeCapacity are ignored.
  Default: payPerRequest
- readCapacity
  The read capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned.
  Default: 1
- writeCapacity
  The write capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned.
  Default: 1
- timeToLive
  A period, in seconds, after which items expire. See DynamoDB TTL.
  No default, which means records never expire.
LockRegistry
The LockRegistry is used to ensure exclusive access to each shard, so that only one channel adapter in the same consumer group consumes messages from a single shard in the stream. This is implemented using DynamoDbLockRegistry.
DynamoDB LockRegistry properties are prefixed with spring.cloud.stream.kinesis.binder.locks.
- table
  The name to give the DynamoDB table.
  Default: SpringIntegrationLockRegistry
- billingMode
  The billing mode of the DynamoDB table. See DynamoDB On-Demand Mode. Possible values are provisioned and payPerRequest. If left empty or set to payPerRequest, both readCapacity and writeCapacity are ignored.
  Default: payPerRequest
- readCapacity
  The read capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned.
  Default: 1
- writeCapacity
  The write capacity of the DynamoDB table. See DynamoDB Provisioned Throughput. This property is used only when billingMode is set to provisioned.
  Default: 1
- leaseDuration
  The length of time for which the lease for the lock is granted. If this is set to, for example, 30 seconds, then the lock will expire if the heartbeat is not sent for at least 30 seconds (which would happen if the box or the heartbeat thread dies, for example).
  Default: 20
- heartbeatPeriod
  How often to update DynamoDB to note that the instance is still running. The recommendation is to make this at least 3 times smaller than the leaseDuration. For example, heartBeatPeriod=1 second and leaseDuration=10 seconds could be a reasonable configuration. Make sure to include a buffer for network latency.
  Default: 5
- refreshPeriod
  How long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds).
  Default: 1000
- partitionKey
  The partition key name of the table.
  Default: lockKey
- sortKeyName
  The sort key name for DynamoDB table partitioning.
  Default: sortKey
- sortKey
  The sort key to try and acquire the lock on (specify if and only if the table has sort keys).
  Default: SpringIntegrationLocks
Kinesis Consumer Properties
The following properties are available for Kinesis consumers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<channel-name>.consumer.
- startTimeout
  The amount of time to wait for the consumer to start, in milliseconds.
  Default: 60000.
- listenerMode
  The mode in which records are processed. If record, each Message will contain byte[] from a single Record.data. If batch, each Message will contain a List<byte[]> extracted from the consumed records. When useNativeDecoding = true is used on the consumer together with listenerMode = batch, no out-of-the-box conversion happens and the resulting message contains a payload like List<com.amazonaws.services.kinesis.model.Record>. It’s up to the target application to convert those records manually.
  Default: record
- checkpointMode
  The mode in which checkpoints are updated. If record, checkpoints occur after each record is processed (but this option is only effective if listenerMode is set to record). If batch, checkpoints occur after each batch of records is processed. If manual, checkpoints occur on demand via the Checkpointer callback. If periodic, checkpoints occur at the specified time interval (from the checkpointInterval property).
  Default: batch
- checkpointInterval
  The interval, in milliseconds, between two checkpoints when the checkpoint mode is periodic.
  Default: 5000
- workerId
  The worker identifier used to distinguish different workers/processes (only used when Kinesis Client Library is enabled).
  No default. If not set, the default value inside spring-integration-aws is used (a random UUID).
- recordsLimit
  The maximum number of records to poll per GetRecords request. Must not be greater than 10000.
  Default: 10000
- idleBetweenPolls
  The sleep interval used in the main loop between shard polling cycles, in milliseconds. Must not be less than 250.
  Default: 1000
- consumerBackoff
  The amount of time the consumer will wait before attempting another GetRecords operation after a read with no results, in milliseconds.
  Default: 1000
- shardIteratorType
  The com.amazonaws.services.kinesis.model.ShardIteratorType name, optionally followed by a colon (:) and a sequenceNumber for AT_SEQUENCE_NUMBER/AFTER_SEQUENCE_NUMBER, or milliseconds for AT_TIMESTAMP. For example: AT_TIMESTAMP:1515090166767.
  Default: LATEST for anonymous groups and TRIM_HORIZON otherwise.
When the TRIM_HORIZON shard iterator type is used, we need to take into account the time lag that occurs while the ShardIterator is pointed at the last untrimmed record in the shard (the oldest data record in the shard).
The getRecords() calls then move from that point to the latest record, which takes time.
The record retention period is 1 day by default and can be extended to 7 days.
This happens only for new consumer groups.
Any subsequent starts of the consumer in the same group are adjusted according to the stored checkpoint via the AFTER_SEQUENCE_NUMBER iterator type.
- dynamoDbStreams
  The boolean flag indicating that the Kinesis consumer channel adapter should adapt DynamoDB Streams functionality instead of regular Kinesis streams. The spring.cloud.stream.bindings.<bindingTarget>.destination value must be a DynamoDB table name.
  Default: false.
Starting with version 2.0.1, beans of the KinesisClientLibConfiguration type can be provided in the application context to have full control over Kinesis Client Library configuration options.
The stream and consumer group (plus workerId) must be provided in the respective KinesisClientLibConfiguration bean.
When a KclMessageDrivenChannelAdapter endpoint is configured in the binder, it selects an appropriate KinesisClientLibConfiguration from the application context according to the destination (stream) of the binding.
If there is no KinesisClientLibConfiguration bean for its stream, the KclMessageDrivenChannelAdapter falls back to the original configuration with default options for its internal KinesisClientLibConfiguration.
- shardId
  An explicit shard id to consume from.
Kinesis Client Library does not support configuration for a specific shard.
When the shardId property is used, it is ignored for Kinesis Client Library and the standard stream consumer distribution is applied.
Also, if instanceCount > 1, the application will throw a validation exception.
The instanceCount and shardId properties are considered mutually exclusive.
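The NAME:value format of the shardIteratorType property described above can be illustrated with a small parser. This is a hypothetical sketch, not the binder's own parsing code: the class `ShardIteratorSpecSketch` is made up, and it assumes that a value is mandatory for AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER and AT_TIMESTAMP.

```java
// Illustrative only: a hypothetical parser for values such as "AT_TIMESTAMP:1515090166767".
final class ShardIteratorSpecSketch {

    final String type;  // for example LATEST, TRIM_HORIZON or AT_TIMESTAMP
    final String value; // sequence number or epoch milliseconds, null when absent

    private ShardIteratorSpecSketch(String type, String value) {
        this.type = type;
        this.value = value;
    }

    static ShardIteratorSpecSketch parse(String spec) {
        int colon = spec.indexOf(':');
        String type = colon < 0 ? spec : spec.substring(0, colon);
        String value = colon < 0 ? null : spec.substring(colon + 1);
        // Assumption of this sketch: these three types cannot be used without a value.
        boolean needsValue = type.equals("AT_SEQUENCE_NUMBER")
                || type.equals("AFTER_SEQUENCE_NUMBER")
                || type.equals("AT_TIMESTAMP");
        if (needsValue && (value == null || value.isEmpty())) {
            throw new IllegalArgumentException(type + " requires a value after ':'");
        }
        if (type.equals("AT_TIMESTAMP")) {
            Long.parseLong(value); // throws NumberFormatException if not a number of milliseconds
        }
        return new ShardIteratorSpecSketch(type, value);
    }

    public static void main(String[] args) {
        ShardIteratorSpecSketch spec = parse("AT_TIMESTAMP:1515090166767");
        System.out.println(spec.type + " / " + spec.value);
    }
}
```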
Kinesis Producer Properties
The following properties are available for Kinesis producers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<bindingTarget>.producer.
- sync
  Whether the producer should act in a synchronous manner with respect to writing records into a stream. If true, the producer will wait for a response from Kinesis after a PutRecord operation.
  Default: false
- sendTimeout
  Effective only if sync is set to true. The amount of time to wait for a response from Kinesis after a PutRecord operation, in milliseconds.
  Default: 10000
Also, if you’d like to produce a batch of records into a Kinesis stream, the message payload must be a PutRecordsRequest instance and the general Spring Cloud Stream producer property useNativeEncoding must be set to true, so that Spring Cloud Stream won’t try to convert the PutRecordsRequest into a byte[]. The content of the PutRecordsRequest is then the end user’s responsibility.
Error Channels
The binder can be configured to send producer exceptions to an error channel. See the section on Spring Cloud error channel support for more information.
The payload of the ErrorMessage for a send failure is an AwsRequestFailureException with properties:
- failedMessage - the spring-messaging Message<?> that failed to be sent.
- request - the raw AmazonWebServiceRequest (either PutRecordRequest or PutRecordsRequest) that was created from the failedMessage.
There is no automatic handling of these exceptions (such as sending to a dead letter queue), but you can consume these exceptions with your own Spring Integration flow.
DynamoDB Streams
Starting with version 1.2, the KinesisMessageChannelBinder supports DynamoDB Streams for the Kinesis consumer channel adapter.
The functionality is based on the DynamoDB Streams Kinesis Adapter, and every Kinesis consumer endpoint selects an appropriate Kinesis client according to the spring.cloud.stream.kinesis.bindings.<bindingTarget>.consumer.dynamoDbStreams boolean flag.
In this case, the value of spring.cloud.stream.bindings.<bindingTarget>.destination must be a DynamoDB table name.
Optional Resources
Starting with version 1.2, if your Spring Cloud Stream application is delivered only in the source role, the extra beans required for the sink role (or Kinesis consumers) are not registered in the application context, so there is no need to worry about their resources on AWS.
This applies to the DynamoDB and CloudWatch resources.
AWS Roles and Policies
To run properly on AWS, the role used by the application needs to have a set of policies configured. Here are the policy statements that your application role needs:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:SubscribeToShard",
        "kinesis:DescribeStreamSummary",
        "kinesis:DescribeStreamConsumer",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:PutRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": [
        "arn:aws:kinesis:<region>:<account_number>:*/*/consumer/*:*",
        "arn:aws:kinesis:<region>:<account_number>:stream/<stream_name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "kinesis:DescribeLimits",
      "Resource": "*"
    },
    {
      "Sid": "DynamoDB",
      "Effect": "Allow",
      "Action": [
        "dynamodb:BatchGetItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:Query",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:<region>:<account>:table/<name-of-metadata-table>",
        "arn:aws:dynamodb:<region>:<account>:table/<name-of-lock-table>"
      ]
    }
  ]
}
Keep in mind that these are only the policies to allow the application to consume/produce records from/to Kinesis. If you’re going to allow spring-cloud-stream-binder-kinesis to create the resources for you, you’ll need an extra set of policies.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "kinesis:CreateStream",
        "kinesis:UpdateShardCount",
        "kinesis:EnableEnhancedMonitoring",
        "kinesis:DisableEnhancedMonitoring",
        "dynamodb:DeleteTable",
        "dynamodb:UpdateTable"
      ],
      "Resource": [
        "arn:aws:dynamodb:<region>:<account>:table/<table_name>",
        "arn:aws:kinesis:<region>:<account>:stream/<stream_name>"
      ]
    }
  ]
}
Running locally with localstack
Sometimes we don’t have the necessary permissions to connect to the real Kinesis and DynamoDB from a developer machine. In moments like this, it’s useful to set up Localstack in your project, so you can run everything locally without having to worry about permissions and enterprise restrictions.
Create a docker-compose.yaml file in the root of your project to quickly start localstack:
version: '3.5'
services:
  localstack:
    image: localstack/localstack:0.12.10
    environment:
      - AWS_DEFAULT_REGION=sa-east-1
      - EDGE_PORT=4566
      - SERVICES=kinesis, dynamodb
    ports:
      - '4566:4566'
    volumes:
      - localstack:/tmp/localstack
      - './setup-localstack.sh:/docker-entrypoint-initaws.d/setup-localstack.sh'
volumes:
  localstack:
After that, create a script called setup-localstack.sh in the root directory. It contains the commands that create the Kinesis stream and the two DynamoDB tables:
awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1
awslocal dynamodb create-table \
--table-name spring-stream-lock-registry \
--attribute-definitions AttributeName=lockKey,AttributeType=S AttributeName=sortKey,AttributeType=S \
--key-schema AttributeName=lockKey,KeyType=HASH AttributeName=sortKey,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--tags Key=Owner,Value=localstack
awslocal dynamodb create-table \
--table-name spring-stream-metadata \
--attribute-definitions AttributeName=KEY,AttributeType=S \
--key-schema AttributeName=KEY,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--tags Key=Owner,Value=localstack
awslocal dynamodb list-tables
awslocal kinesis list-streams
Since this file is mounted into the localstack container, the container will automatically run this script the first time it starts.
To run localstack, execute:
docker-compose up -d
Your local AWS Endpoint is now available at http://localhost:4566
To put records into your test stream, just run
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name my-test-stream --partition-key 1 --data <base64-encoded-data>
Telling the binder to use your local endpoint
By default, the Kinesis and DynamoDB clients will try to hit the real AWS endpoint. To change this behavior, you have to declare a new @Bean for each client and override the endpoint.
For example:
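import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.context.annotation.Profile

// Points the DynamoDB client at the local endpoint (active only in the "local" profile).
@Configuration
@Profile("local")
class DynamoDBConfigLocal {
    @Value("\${cloud.aws.region.static}")
    val region: String = ""
    private val endpointUrl: String = "http://localhost:4566"

    @Bean
    @Primary
    fun amazonDynamoDBAsync(): AmazonDynamoDBAsync {
        return AmazonDynamoDBAsyncClientBuilder.standard()
            .withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(this.endpointUrl, region))
            .build()
    }
}

// Points the Kinesis client at the local endpoint (active only in the "local" profile).
@Configuration
@Profile("local")
class KinesisConfigLocal {
    @Value("\${cloud.aws.region.static}")
    val region: String = ""
    private val endpointUrl: String = "http://localhost:4566"

    @Bean
    fun amazonKinesis(awsCredentialsProvider: AWSCredentialsProvider): AmazonKinesisAsync {
        return AmazonKinesisAsyncClientBuilder
            .standard()
            .withCredentials(awsCredentialsProvider)
            .withEndpointConfiguration(AwsClientBuilder.EndpointConfiguration(endpointUrl, region))
            .build()
    }
}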
Now, remember to pass the following environment variables when running locally:
SPRING_PROFILES_ACTIVE=local AWS_CBOR_DISABLE=true gradle bootRun
This makes sure that these beans are only instantiated when running locally, and it also disables CBOR, which is not supported by localstack’s Kinesis streams.
Kinesis Binder Health Indicator
Version 2.0 introduced a KinesisBinderHealthIndicator implementation, which is part of the BindersHealthContributor composition under the binders path.
The out-of-the-box implementation iterates over the Kinesis streams involved in the binder configuration, calling a describeStream command against each of them.
If any of the streams doesn’t exist, the health is treated as DOWN.
If a LimitExceededException is thrown because of the describeStream limitations, the KinesisBinderHealthIndicator tries again after a one-second interval.
Only when all the configured streams are described properly is the UP health returned.
You can override the out-of-the-box implementation by providing your own bean with the kinesisBinderHealthIndicator name.
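The health check logic described above can be sketched in plain Java. All types here are hypothetical stand-ins introduced for illustration (`StreamHealthSketch`, `StreamDescriber`, `LimitExceeded`, `StreamNotFound`); they are not the binder's or the AWS SDK's classes, and the sketch retries without an upper bound, which a real indicator may not want.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: the types below are hypothetical stand-ins, not the binder's classes.
final class StreamHealthSketch {

    static final class LimitExceeded extends RuntimeException { }

    static final class StreamNotFound extends RuntimeException { }

    interface StreamDescriber {
        void describe(String stream); // may throw LimitExceeded or StreamNotFound
    }

    // Returns "UP" only when every stream can be described, "DOWN" if any is missing.
    static String check(List<String> streams, StreamDescriber describer, long retryDelayMillis) {
        for (String stream : streams) {
            while (true) {
                try {
                    describer.describe(stream);
                    break; // described successfully, move on to the next stream
                } catch (StreamNotFound e) {
                    return "DOWN";
                } catch (LimitExceeded e) {
                    try {
                        Thread.sleep(retryDelayMillis); // wait, then describe the same stream again
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return "DOWN";
                    }
                }
            }
        }
        return "UP";
    }

    public static void main(String[] args) {
        // Fake describer that only knows a stream called "orders".
        StreamDescriber onlyOrders = stream -> {
            if (!stream.equals("orders")) {
                throw new StreamNotFound();
            }
        };
        System.out.println(check(Arrays.asList("orders"), onlyOrders, 1000L));
        System.out.println(check(Arrays.asList("orders", "missing"), onlyOrders, 1000L));
    }
}
```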