DynamoDB CDC Relay¶
About¶
Relay data changes from DynamoDB into CrateDB using a one-stop command, ctk load table kinesis+dynamodb+cdc://..., in order to facilitate convenient data transfers to be used within data pipelines or ad hoc operations.
It taps into Change data capture for DynamoDB Streams, in this case using Kinesis Data Streams. It is the sister to the corresponding full-load implementation, DynamoDB Table Loader.
Install¶
pip install --upgrade 'cratedb-toolkit[kinesis]'
Usage¶
Consume DynamoDB CDC events from a Kinesis Data Stream into a CrateDB schema/table.
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream?region=eu-central-1
Query data in CrateDB.
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
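To make the anatomy of the two URLs above easier to see, the following sketch takes them apart with Python's standard library. It only illustrates how the components line up (credentials, endpoint, stream name, options, schema/table); how the toolkit parses these URLs internally is not shown in this document, so treat the variable names as illustrative assumptions.

```python
from urllib.parse import parse_qs, urlsplit

# Target URL: the path carries the CrateDB schema and table.
target = urlsplit("crate://crate@localhost:4200/testdrive/demo")
schema, table = target.path.strip("/").split("/")

# Source URL: credentials, endpoint, stream name, and query options.
source = urlsplit(
    "kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream"
    "?region=eu-central-1"
)
stream = source.path.strip("/")
options = {key: values[0] for key, values in parse_qs(source.query).items()}

print("target:", target.hostname, target.port, schema, table)
print("source:", source.username, source.hostname, stream, options)
```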
Options¶
Batch Size¶
The source URL option batch-size configures how many items to consume from the Kinesis Stream at once. The default value is 100.
For many datasets, a much larger batch size makes data transfers more efficient.
ctk load table .../cdc-stream?batch-size=5000
Create¶
The source URL option create configures whether the designated Kinesis Stream should be created upfront. The default value is false.
ctk load table .../cdc-stream?create=true
Create Shards¶
The source URL option create-shards configures the number of shards to provision when the Kinesis Stream is created before consuming. The default value is 1.
ctk load table '.../cdc-stream?create=true&create-shards=4'
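Because the options form a URL query string, several of them are joined with `&`, which most shells treat specially unless the URL is quoted. The sketch below composes such a URL and quotes it for shell use. The helper name `build_source_url` and the underscore-to-hyphen convention are hypothetical and not part of the toolkit.

```python
import shlex
from urllib.parse import urlencode


def build_source_url(base: str, **options) -> str:
    """Append options as a query string (hypothetical helper).

    Keyword names use underscores, which are turned into the hyphenated
    option names used in this document; booleans become true/false.
    """
    query = urlencode(
        {
            name.replace("_", "-"): str(value).lower() if isinstance(value, bool) else value
            for name, value in options.items()
        }
    )
    return f"{base}?{query}" if query else base


url = build_source_url(
    "kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream",
    region="eu-central-1",
    create=True,
    create_shards=4,
)

# shlex.quote wraps the URL in single quotes so that "?" and "&" reach ctk verbatim.
print("ctk load table", shlex.quote(url))
```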
Region¶
The source URL accepts the region option to configure the AWS region label. The default value is us-east-1.
ctk load table .../cdc-stream?region=eu-central-1
Start¶
The source URL accepts the start option to configure the DynamoDB ShardIteratorType. It accepts the following values, each mapping to the corresponding original option. The default value is earliest.
ctk load table .../cdc-stream?start=latest
start=earliest
Start reading at the last (untrimmed) stream record, which is the oldest record in the shard. In DynamoDB Streams, there is a 24 hour limit on data retention. Stream records whose age exceeds this limit are subject to removal (trimming) from the stream. This option equals ShardIteratorType=TRIM_HORIZON.
start=latest
Start reading just after the most recent stream record in the shard, so that you always read the most recent data in the shard. This option equals ShardIteratorType=LATEST.
start=seqno-at&seqno=...
Start reading exactly from the position denoted by a specific sequence number. This option equals the ShardIteratorType=AT_SEQUENCE_NUMBER and SequenceNumber=... parameters.
start=seqno-after&seqno=...
Start reading right after the position denoted by a specific sequence number. This option equals the ShardIteratorType=AFTER_SEQUENCE_NUMBER and SequenceNumber=... parameters.
SeqNo¶
The source URL accepts the seqno option to configure the DynamoDB SequenceNumber parameter. It accepts the sequence number of a stream record in the shard from which to start reading.
ctk load table '.../cdc-stream?start=seqno-after&seqno=49590338271490256608559692538361571095921575989136588898'
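The relationship between the start and seqno options and the underlying shard iterator parameters can be summarized as a small lookup. This is a minimal sketch based only on the mapping described in this section; the function name `shard_iterator_params` is hypothetical, and the toolkit's actual implementation may differ.

```python
from typing import Dict, Optional

# Mapping of start option values to ShardIteratorType, as described in this section.
START_TO_ITERATOR_TYPE = {
    "earliest": "TRIM_HORIZON",
    "latest": "LATEST",
    "seqno-at": "AT_SEQUENCE_NUMBER",
    "seqno-after": "AFTER_SEQUENCE_NUMBER",
}


def shard_iterator_params(start: str = "earliest", seqno: Optional[str] = None) -> Dict[str, str]:
    """Translate start/seqno options into iterator parameters (hypothetical helper)."""
    if start not in START_TO_ITERATOR_TYPE:
        raise ValueError(f"Unknown start option: {start!r}")
    params = {"ShardIteratorType": START_TO_ITERATOR_TYPE[start]}
    if start.startswith("seqno-"):
        # Both sequence-number variants need an explicit position to start from.
        if not seqno:
            raise ValueError(f"start={start} requires the seqno option")
        params["SequenceNumber"] = seqno
    return params


print(shard_iterator_params())
print(shard_iterator_params("seqno-after", "49590338271490256608559692538361571095921575989136588898"))
```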
Idle Sleep¶
The idle-sleep option configures how long the event loop sleeps after running out of items to consume. The default value is 0.5.
Buffer Time¶
The buffer-time option configures the time to wait before flushing produced items to the wire. The default value is 0.5.
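To illustrate how batch-size and idle-sleep interact, here is a simplified polling loop over an in-memory queue. It is a sketch of the general technique, not the toolkit's event loop: the `consume` function, the `max_idle_rounds` stop condition, and the interpretation of idle-sleep as seconds are assumptions made for this example.

```python
import time
from typing import Callable, List


def consume(
    fetch: Callable[[int], List[int]],
    handle: Callable[[List[int]], None],
    batch_size: int = 100,
    idle_sleep: float = 0.5,
    max_idle_rounds: int = 3,  # hypothetical stop condition, for demonstration only
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Fetch up to batch_size items per round; sleep whenever nothing is available."""
    idle_rounds = 0
    total = 0
    while idle_rounds < max_idle_rounds:
        batch = fetch(batch_size)
        if not batch:
            # Out of items: back off for idle_sleep (assumed seconds) before polling again.
            idle_rounds += 1
            sleep(idle_sleep)
            continue
        idle_rounds = 0
        handle(batch)
        total += len(batch)
    return total


# In-memory stand-in for a stream holding 250 records.
queue = list(range(250))


def fetch(limit: int) -> List[int]:
    batch, queue[:] = queue[:limit], queue[limit:]
    return batch


batches: List[List[int]] = []
total = consume(fetch, batches.append, batch_size=100, idle_sleep=0.01, max_idle_rounds=2)
print(total, [len(batch) for batch in batches])
```

A larger batch size reduces the number of round trips for the same amount of data, while a shorter idle sleep makes the loop react faster to new items at the cost of more frequent polling.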
Variants¶
CrateDB Cloud¶
When transferring data to CrateDB Cloud, the target URL has the following shape.
export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true'
LocalStack¶
In order to exercise data transfers exclusively on your workstation, you can use LocalStack to run DynamoDB and Kinesis service surrogates locally. See also the Get started with Kinesis on LocalStack tutorial.
To address a Kinesis Data Stream on LocalStack, use a command of the following shape. See Credentials for accessing LocalStack AWS API for further information.
ctk load table kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1
Tip
LocalStack is a cloud service emulator that runs in a single container on your laptop or in your CI environment. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider.
In order to invoke LocalStack on your workstation, you can use this Docker command.
docker run \
--rm -it \
-p 127.0.0.1:4566:4566 \
-p 127.0.0.1:4510-4559:4510-4559 \
-v /var/run/docker.sock:/var/run/docker.sock \
localstack/localstack:latest