This repository has been archived by the owner on Nov 22, 2021. It is now read-only.

kinsumer

A high-level Amazon Kinesis Streams consumer.

Features

  • Automatically detects shard count changes
  • Checkpoint (sequence number) persistence can be customized
  • Checkpointer implementations are provided for memory and file storage
  • In-memory bucket for temporarily holding records
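
To illustrate what file-backed checkpoint persistence involves, here is a minimal, standalone sketch. It is not kinsumer's actual Checkpointer interface: the class name `FileCheckpointStore` and its `get`/`checkpoint` methods are hypothetical, chosen only to show the idea of mapping each shard ID to the last processed sequence number and saving that mapping to disk.

```python
import json
import os
import tempfile
from typing import Dict, Optional


class FileCheckpointStore:
    """Hypothetical store: shard ID -> last processed sequence number.

    Illustrative only; this is not kinsumer's Checkpointer API.
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self._sequences: Dict[str, str] = {}
        # Reload earlier checkpoints so a restarted consumer can resume.
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as fp:
                self._sequences = json.load(fp)

    def get(self, shard_id: str) -> Optional[str]:
        """Return the saved sequence number, or None for an unseen shard."""
        return self._sequences.get(shard_id)

    def checkpoint(self, shard_id: str, sequence_number: str) -> None:
        """Record progress for a shard and persist the whole mapping."""
        self._sequences[shard_id] = sequence_number
        # Write to a temporary file, then rename it over the target, so a
        # crash mid-write cannot leave a truncated checkpoint file behind.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as fp:
            json.dump(self._sequences, fp)
        os.replace(tmp_path, self.path)
```

A memory-only variant would keep the same two methods and drop the file handling; a Redis-backed one could store the same mapping in a hash.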

Usage

from kinsumer import Consumer

# Settings are defined at module level and loaded by from_object() below.
STREAM_REGION = 'ap-south-1'
STREAM_NAME = 'my-stream'
consumer = Consumer(__name__)
consumer.config.from_object(__name__)

@consumer.transform
def transform(data, shard_id, last_sequence_number, last_arrival_timestamp):
    """Transform the consumed data and return the result."""
    return data

@consumer.after_consume
def after(data, shard_id, last_sequence_number, last_arrival_timestamp):
    """Do something with the transformed data."""

if __name__ == '__main__':
    consumer.process()
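
As a sketch of what a transform body might do, the function below decodes a JSON payload and tags it with the shard it came from. It keeps the same four parameters as the `transform` hook above but is written standalone, without the decorator. It assumes that `data` is a single record payload holding a JSON object as bytes or text; the README does not state the payload type, so treat that as an assumption. The name `decode_json_record` and the `_shard_id` key are hypothetical.

```python
import json


def decode_json_record(data, shard_id, last_sequence_number, last_arrival_timestamp):
    """Decode one JSON payload and annotate it with its shard ID.

    Assumption: `data` is bytes or str containing a single JSON object.
    """
    if isinstance(data, (bytes, bytearray)):
        data = data.decode("utf-8")
    record = json.loads(data)
    # Keep the shard ID with the record so later steps can trace its origin.
    record["_shard_id"] = shard_id
    return record
```

If the assumption holds, the same body could be placed in a function decorated with `@consumer.transform`.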

Author and license

kinsumer is written by Ungi Kim and licensed under the MIT license. The source code is available on GitHub:

$ git clone git@github.com:ungikim/kinsumer.git

Missing features

  • Redis Checkpointer
  • Consumer Heartbeat

(Contributions would be appreciated!)