hengfengli/kcpy
Python library for consuming Kinesis Data Streams.
Kinesis Consumer in Python
A Kinesis consumer written purely in Python. It is a lightweight wrapper
on top of boto3, the AWS Python library. You can also
consume records from Kinesis Data Streams (KDS) via:
- Lambda function: I have a demo, kinesis-lambda-sqs-demo,
showing how to consume records in a serverless, real-time way.
- Kinesis Firehose: an AWS managed service that can easily save records
into different sinks, such as S3, Elasticsearch, and Redshift.
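For comparison with the Lambda route, here is a minimal sketch of a Lambda handler attached to a Kinesis stream. It relies on the event shape Lambda uses for Kinesis sources (records under event["Records"], each payload base64-encoded in record["kinesis"]["data"]). The function name handler and the returned dict layout are illustrative assumptions; this is not the code from kinesis-lambda-sqs-demo.

```python
import base64
from typing import Any, Dict, List


def handler(event: Dict[str, Any], context: Any) -> List[Dict[str, Any]]:
    """Hypothetical Lambda handler for a Kinesis event source mapping."""
    decoded = []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        decoded.append({
            "PartitionKey": kinesis["partitionKey"],
            "SequenceNumber": kinesis["sequenceNumber"],
            # Lambda delivers each Kinesis payload base64-encoded.
            "Data": base64.b64decode(kinesis["data"]),
        })
    # A real function would forward these records (for example to SQS);
    # returning them keeps the sketch easy to inspect.
    return decoded


if __name__ == "__main__":
    sample_event = {"Records": [{"kinesis": {
        "partitionKey": "Jessica Walter",
        "sequenceNumber": "1",
        "data": base64.b64encode(b"Jessica Walter").decode("ascii"),
    }}]}
    print(handler(sample_event, None))
```

The decoded dicts use the same keys as the StreamConsumer output shown under Getting started, which makes the two approaches easy to compare.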
Installation
Install the package via pip:
pip install kcpy
Getting started
from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name')
for record in consumer:
    print(record)
The output would look like:
{
'ApproximateArrivalTimestamp': datetime.datetime(2018, 11, 13, 11, 57, 55, 117807),
'Data': b'Jessica Walter',
'PartitionKey': 'Jessica Walter',
'SequenceNumber': '1'
}
Or, you can consume stream data with checkpointing:
from kcpy import StreamConsumer
consumer = StreamConsumer('my_stream_name', consumer_name='my_consumer', checkpoint=True)
for record in consumer:
    print(record)
Checkpointing
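The diagram below shows the checkpointing schema: for every shard of a stream, each consumer (identified by consumer_name) keeps its own checkpointed sequence number (seq_no), so several consumers can read the same shard independently.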
                                                            producer
[stream_1]                                                      |
+---------------+---+---+---+---+---+---+---+---+               |
|    shard_1    | 1 | 2 | 3 | 4 | 5 | 6 | 7 |...| <-------------+
+---------------+---+---+---+---+---+---+---+---+               |
|    shard_2    | 1 | 2 | 3 | 4 | 5 |...| <---------------------+
+---------------+---+---+---+---+---+---+---+---+---+           |
|    shard_3    | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |...| <---------+
+---------------+---+---+---+---+---+---+---+---+---+
                              ^           ^
                              |           |
                         consumer_1   consumer_2
                              |           |
                              |           +-------------+
                              |                         |
        +---------------------+                         |
        |                                               |
        v                                               |
+---------------+-------------+----------+--------+     |
| consumer_name | stream_name | shard_id | seq_no |     |
+---------------+-------------+----------+--------+     |
| consumer_1    | stream_1    | shard_1  | 5      |     |
| consumer_1    | stream_1    | shard_2  | 15     |     |
| consumer_1    | stream_1    | ...      | 15     |     |
| consumer_1    | stream_1    | shard_N  | XX     |     |
| consumer_2    | stream_1    | shard_1  | 6      | <---+
+---------------+-------------+----------+--------+
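The checkpoint table above maps directly onto a relational schema. Here is a minimal sketch using Python's built-in sqlite3 module (the 0.1.5 changelog entry mentions a simple sqlite storage for checkpoints), keyed on (consumer_name, stream_name, shard_id). The class SqliteCheckpointStore and its save/load methods are hypothetical and are not kcpy's actual implementation.

```python
import sqlite3
from typing import Optional


class SqliteCheckpointStore:
    """Hypothetical checkpoint store mirroring the table in the diagram.

    One row per (consumer_name, stream_name, shard_id); seq_no holds the
    checkpointed sequence number for that consumer on that shard.
    """

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS checkpoints (
                consumer_name TEXT NOT NULL,
                stream_name   TEXT NOT NULL,
                shard_id      TEXT NOT NULL,
                seq_no        TEXT NOT NULL,
                PRIMARY KEY (consumer_name, stream_name, shard_id)
            )
            """
        )

    def save(self, consumer: str, stream: str, shard: str, seq_no: str) -> None:
        # INSERT OR REPLACE overwrites any existing row with the same key;
        # "with self.conn" commits the transaction on success.
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
                (consumer, stream, shard, seq_no),
            )

    def load(self, consumer: str, stream: str, shard: str) -> Optional[str]:
        # Returns None when this consumer has no checkpoint for the shard yet.
        row = self.conn.execute(
            "SELECT seq_no FROM checkpoints"
            " WHERE consumer_name = ? AND stream_name = ? AND shard_id = ?",
            (consumer, stream, shard),
        ).fetchone()
        return row[0] if row else None
```

Because the primary key spans all three columns, consumer_1 and consumer_2 can hold different positions in shard_1, as in the last row of the table. Sequence numbers are stored as text because Kinesis sequence numbers are long numeric strings.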
Features
- Read records from a stream with multiple shards
- Save a checkpoint for each shard consumer of a stream
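Reading a stream with multiple shards comes down to reading each shard with the boto3 Kinesis calls get_shard_iterator and get_records. The sketch below assumes client behaves like boto3.client("kinesis"); read_shard, poll_interval and max_empty_polls are hypothetical names, not kcpy's API. Passing a checkpointed sequence number switches the iterator type from TRIM_HORIZON to AFTER_SEQUENCE_NUMBER, so reading resumes right after that record.

```python
import time
from typing import Any, Dict, Iterator, Optional


def read_shard(client: Any, stream_name: str, shard_id: str,
               last_seq_no: Optional[str] = None,
               poll_interval: float = 1.0,
               max_empty_polls: Optional[int] = None) -> Iterator[Dict[str, Any]]:
    """Yield records from one shard, resuming after last_seq_no if given."""
    if last_seq_no is None:
        # No checkpoint yet: start at the oldest record still retained.
        resp = client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON")
    else:
        # Resume immediately after the checkpointed record.
        resp = client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id,
            ShardIteratorType="AFTER_SEQUENCE_NUMBER",
            StartingSequenceNumber=last_seq_no)
    iterator = resp["ShardIterator"]
    empty_polls = 0
    while iterator is not None:
        out = client.get_records(ShardIterator=iterator, Limit=100)
        records = out["Records"]
        yield from records
        empty_polls = 0 if records else empty_polls + 1
        if max_empty_polls is not None and empty_polls >= max_empty_polls:
            return  # give up after too many consecutive empty polls
        # NextShardIterator is missing or None once a closed shard is drained.
        iterator = out.get("NextShardIterator")
        if iterator is not None:
            time.sleep(poll_interval)  # stay under per-shard GetRecords limits
```

With a real client this could be called as read_shard(boto3.client("kinesis"), "my_stream_name", "shardId-000000000000"); a multi-shard consumer would run one such loop per shard ID returned by list_shards, saving the SequenceNumber of each processed record as its checkpoint.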
Todo
- Add type checking with mypy
- Add tox to automate testing across multiple environments
- Add the config for Travis CI
- Support other storage solutions (MySQL, DynamoDB, Redis, etc.) for checkpointing
- Rebalance when the number of shards changes
- Allow kcpy to run on multiple machines
Changelog
0.1.7
- Add Travis CI config and remove Python 3.5.
0.1.6
- Fix some issues in setup.py.
0.1.5
- Add consumer checkpointing with a simple sqlite storage solution.
0.1.4
- Pass AWS configurations directly into the boto3 client.
0.1.3
- Update the README.
0.1.2
- Add markdown support for long description.
0.1.1
- Add a long description.
0.1.0
- First version of kcpy.
License
Copyright (c) 2018 Hengfeng Li. It is free software, and may
be redistributed under the terms specified in the LICENSE file.
