Kafka-node-wrapper is a custom wrapper built around the npm module kafka-node. It simplifies the boilerplate involved in setting up kafka-node so that development effort can stay focused on the application itself. It follows a configure-and-use strategy, where the developer only needs to set up the configuration of the client and the producer/consumer. It supports:
- Consumer Groups
- High Level Producer
- SSL-based connections on Kafka 0.9+
- SASL/PLAIN-based connections on Kafka 0.10+
Kafka can be installed and set up by following the official Kafka Guide.
Example
```javascript
const kafka = require('node-kafka-wrapper');

function sample() {
  kafka.producer.sendMessage(client_options, producer_options, ['sample_message'], 'sample_topic', () => console.log('Done'));
  kafka.consumer.subscribe(client_options, consumer_options, ['sample_topic'], (message) => {
    // Do some operations
  });
}

sample();
```
Methods
- `sendMessage`: behaves as a producer to send data to the broker
- `subscribe`: behaves as a consumer group to subscribe to messages
Connects to the broker as a HighLevelProducer to publish data to a topic. The message can either be a string or keyed message.
The syntax for the producer is as follows.
```javascript
const kafka = require('node-kafka-wrapper');
kafka.producer.sendMessage(client_options, producer_options, [string_message], topic_name, callback);
```
Connects to the broker as a ConsumerGroup to subscribe to data from a topic.
The syntax for the consumer is as follows.
```javascript
const kafka = require('node-kafka-wrapper');
kafka.consumer.subscribe(client_options, consumer_options, ['topic1', 'topic2', 'topic3'], callback);
```
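The callback receives each message in the shape kafka-node delivers to consumers: a plain object with `topic`, `value`, `offset`, `partition`, and `key` fields. A minimal handler might look like the following sketch (the JSON payload and stubbed message are illustrative assumptions, not part of the wrapper's API):

```javascript
// Sketch of a message handler to pass as the subscribe callback.
// The message fields follow kafka-node's documented message shape.
function handleMessage(message) {
  // message.value arrives as a utf8 string unless encoding: 'buffer' is set
  const payload = JSON.parse(message.value);
  console.log(`${message.topic}[${message.partition}]@${message.offset}`);
  return payload;
}

// Exercise the handler with a stubbed message object
const result = handleMessage({
  topic: 'sample_topic',
  value: '{"id": 1}',
  offset: 42,
  partition: 0,
  key: null,
});
```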
All client and producer/consumer options compatible with the kafka-node library are accepted.
Example
```javascript
{
  kafkaHost: 'www.kafka-domain.com',
  ssl: true,
  requestTimeout: 100000,
  sslOptions: {
    host: 'www.kafka-domain.com',
    ca: ['CA certificate'],
  },
}
```
- `kafkaHost`: a string of kafka broker/host combinations delimited by commas, for example `kafka-1.us-east-1.myapp.com:9093,kafka-2.us-east-1.myapp.com:9093,kafka-3.us-east-1.myapp.com:9093` (default: `localhost:9092`)
- `connectTimeout`: time in ms to wait for a successful connection before moving to the next host (default: `10000`)
- `requestTimeout`: time in ms for a kafka request to time out (default: `30000`)
- `autoConnect`: automatically connect when `KafkaClient` is instantiated; otherwise you need to call `connect` manually (default: `true`)
- `connectRetryOptions`: object hash that applies to the initial connection; see the retry module for these options
- `idleConnection`: allows the broker to disconnect an idle connection from a client (otherwise the client keeps the connection after being disconnected). The value is the elapsed time in ms without any data written to the TCP socket (default: 5 minutes)
- `reconnectOnIdle`: when the connection is closed due to client idling, the client will attempt to auto-reconnect (default: `true`)
- `maxAsyncRequests`: maximum number of async operations at a time toward the kafka cluster (default: `10`)
- `sslOptions`: object, options to be passed to the tls broker sockets, e.g. `{ rejectUnauthorized: false }` (Kafka 0.9+)
- `sasl`: object, SASL authentication configuration (only SASL/PLAIN is currently supported), e.g. `{ mechanism: 'plain', username: 'foo', password: 'bar' }` (Kafka 0.10+)
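Putting the options above together, a client configuration for a SASL/PLAIN connection over SSL might look like the following sketch. The hostnames and credentials are placeholders, and the `toKafkaHost` helper is a hypothetical convenience for building the comma-delimited `kafkaHost` string, not part of the wrapper:

```javascript
// Build the comma-delimited kafkaHost string from a list of brokers
function toKafkaHost(brokers) {
  return brokers.map(({ host, port }) => `${host}:${port}`).join(',');
}

// Hypothetical client options for a SASL/PLAIN connection over SSL
const client_options = {
  kafkaHost: toKafkaHost([
    { host: 'kafka-1.example.com', port: 9093 },
    { host: 'kafka-2.example.com', port: 9093 },
  ]),
  connectTimeout: 10000,
  requestTimeout: 30000,
  ssl: true,
  sasl: { mechanism: 'plain', username: 'foo', password: 'bar' },
};
```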
Example producer options:

```javascript
{
  // Number of acks required before a message is considered acknowledged, default 1
  requireAcks: 1,
  // The amount of time in milliseconds to wait for all acks before timing out, default 100ms
  ackTimeoutMs: 100,
  // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
  partitionerType: 2
}
```
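When messages carry keys, the keyed partitioner (type 3) routes messages with the same key to the same partition. A sketch of such a configuration (the comments reflect kafka-node's documented option meanings; the values are illustrative):

```javascript
// Producer options selecting the keyed partitioner (type 3), so that
// messages with the same key land on the same partition
const producer_options = {
  requireAcks: 1,    // wait for one broker ack per message
  ackTimeoutMs: 100, // give up waiting for acks after 100 ms
  partitionerType: 3 // keyed partitioner
};
```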
Example consumer group options:

```javascript
var options = {
  kafkaHost: 'broker:9092', // connect directly to kafka broker (instantiates a KafkaClient)
  batch: undefined, // put client batch settings if you need them
  ssl: true, // optional (defaults to false) or tls options hash
  groupId: 'ExampleTestGroup',
  sessionTimeout: 15000,
  // An array of partition assignment protocols ordered by preference.
  // 'roundrobin' or 'range' string for built-ins (see below to pass in a custom assignment protocol)
  protocol: ['roundrobin'],
  encoding: 'utf8', // default is utf8, use 'buffer' for binary data
  // Offsets to use for new groups; other options are 'earliest' or 'none' ('none' will emit an error if no offsets were saved)
  // equivalent to the Java client's auto.offset.reset
  fromOffset: 'latest', // default
  commitOffsetsOnFirstJoin: true, // on the very first time this consumer group subscribes to a topic, record the offset returned in fromOffset (latest/earliest)
  // how to recover from an OutOfRangeOffset error (where the saved offset is past server retention); accepts the same values as fromOffset
  outOfRangeOffset: 'earliest', // default
  // Callback to allow consumers with autoCommit false a chance to commit before a rebalance finishes
  // isAlreadyMember will be false on the first connection, and true on rebalances triggered after that
  onRebalance: (isAlreadyMember, callback) => { callback(); } // or null
};
```
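The `onRebalance` hook is the natural place to flush uncommitted offsets when `autoCommit` is disabled. A sketch of such a hook follows; `pendingOffsets` and `commitPendingOffsets` are hypothetical stand-ins for whatever offset-tracking and commit routine the application uses:

```javascript
// Hypothetical pending-offset store maintained by the application
let pendingOffsets = [{ topic: 'sample_topic', partition: 0, offset: 7 }];

// Stand-in for the application's commit routine
function commitPendingOffsets(done) {
  pendingOffsets = []; // pretend the offsets were committed
  done();
}

// onRebalance hook: commit before the rebalance completes, but only
// on an actual rebalance (isAlreadyMember), not on the first join
const onRebalance = (isAlreadyMember, callback) => {
  if (isAlreadyMember) {
    commitPendingOffsets(callback);
  } else {
    callback();
  }
};

// Exercise the hook the way the consumer group would on a rebalance
let rebalanceDone = false;
onRebalance(true, () => { rebalanceDone = true; });
```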
This is just a wrapper to simplify the implementation of kafka-node.