KafkaJS: the simplest way to use Kafka with Node.js
I am writing this based on my experience of using Confluent Kafka with Node.js. Let's first look at Kafka itself.
What is Apache Kafka?
Apache Kafka is a distributed data store optimized for ingesting and processing streaming data in real time. Streaming data is data that is continuously generated by thousands of data sources, which typically send their data records simultaneously. A streaming platform needs to handle this constant influx of data and process it sequentially and incrementally.
Kafka provides three main functions to its users:
- Publish and subscribe to streams of records
- Effectively store streams of records in the order in which records were generated
- Process streams of records in real-time
Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storage and analysis of both historical and real-time data.
Kafka messages are persisted on disk and replicated within the cluster to prevent data loss. Kafka has traditionally relied on the ZooKeeper synchronization service for cluster coordination, although newer releases can run without it. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.
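To make the "ordered stream of records" idea concrete, here is a toy sketch of a single append-only log with offsets. It is only a mental model under my own simplifications (the ToyLog class and its method names are made up for this post), not how Kafka is implemented.

```javascript
// Toy in-memory model of one ordered log. Illustration only.
class ToyLog {
  constructor() {
    this.records = []
  }

  // Publish: append a record and return its offset (its position in the log).
  append(value) {
    this.records.push(value)
    return this.records.length - 1
  }

  // Consume: read every record from a given offset onward, in order.
  readFrom(offset) {
    return this.records
      .slice(offset)
      .map((value, i) => ({ offset: offset + i, value }))
  }
}

const log = new ToyLog()
log.append('order-created')
log.append('order-paid')
log.append('order-shipped')

// A consumer that already processed offset 0 resumes at offset 1.
for (const record of log.readFrom(1)) {
  console.log(record.offset, record.value)
}
```

The point of the sketch is that records keep the order in which they were appended, and a reader only needs to remember an offset to pick up where it left off.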
In this post, we talk about how to connect your service to Kafka and build on it when the Kafka cluster already exists.
The picture is simple: we have a Kafka platform ready at https://confluent.cloud/. Confluent Cloud is a Kafka platform provider where we can buy the service and start using it, much like the managed solutions provided by AWS.
Now we can send (stream) messages to Kafka, and consumers can consume and react to those messages. What do we need for this? A client library and the Kafka connection details from https://confluent.cloud/.
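One way to keep those connection details out of the code is to read them from environment variables. The sketch below is a rough idea, not an official convention: buildKafkaConfig and all variable names except KAFKA_BOOTSTRAP_SERVER are names I made up, and the returned object simply mirrors the shape of the KafkaJS client options used later in this post.

```javascript
// Build a client config object from environment-style settings.
// The variable names are assumptions for this sketch.
function buildKafkaConfig(env) {
  const required = ['KAFKA_BOOTSTRAP_SERVER', 'KAFKA_API_KEY', 'KAFKA_API_SECRET']
  const missing = required.filter((name) => !env[name])
  if (missing.length > 0) {
    throw new Error(`Missing Kafka settings: ${missing.join(', ')}`)
  }
  return {
    clientId: env.KAFKA_CLIENT_ID || 'my-service', // hypothetical default
    brokers: env.KAFKA_BOOTSTRAP_SERVER.split(','), // comma-separated host:port list
    ssl: true,
    sasl: {
      mechanism: 'plain',
      username: env.KAFKA_API_KEY,
      password: env.KAFKA_API_SECRET
    }
  }
}

// Example with placeholder values; in a real service pass process.env instead.
const exampleConfig = buildKafkaConfig({
  KAFKA_BOOTSTRAP_SERVER: 'xxxxxxxxx.confluent.cloud:9092',
  KAFKA_API_KEY: 'xxxxxxxxxxx',
  KAFKA_API_SECRET: 'xxxxxxxxxx'
})
console.log(exampleConfig.brokers)
```

Failing fast on missing settings means a misconfigured service stops at startup with a readable message instead of failing later during the first connection attempt.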
Let's look at the different library options.
node-rdkafka
https://github.com/blizzard/node-rdkafka
I started with node-rdkafka and later moved to KafkaJS. Why?
- Node.js version compatibility can cause problems with node-rdkafka.
- Use the OS installation of librdkafka or build from source.
- If you use node-rdkafka, you are bound to encounter compatibility issues as you upgrade the library or versions of Node.js. I recommend you use a system installation of librdkafka and the BUILD_LIBRDKAFKA=0 flag to prevent the recompilation of the library on npm install.
- Configuring Kafka can be complicated.
(Source: https://rclayton.silvrback.com/thoughts-on-node-rdkafka-development)
A possible error looks like this:
UnhandledPromiseRejectionWarning: Error: Unsupported value "sasl_ssl" for configuration property "security.protocol": OpenSSL not available at build time
at Producer.Client (/Users/node_modules/node-rdkafka/lib/client.js:54:18)
at new Producer (/Users/node_modules/node-rdkafka/lib/producer.js:75:10)
Two things to check:
- Use a Node.js version that is compatible with node-rdkafka.
- Make sure OpenSSL is linked properly, because the error above means the library was built without OpenSSL support. With Homebrew, you can try linking OpenSSL and rebuilding the module:
brew link openssl --force
export LDFLAGS="-L/usr/local/opt/openssl@1.1/lib"
export CPPFLAGS="-I/usr/local/opt/openssl@1.1/include"
echo 'export PATH="/usr/local/opt/openssl@1.1/bin:$PATH"' >> ~/.zshrc
npm rebuild node-rdkafka
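After rebuilding, it can help to check which features the native library was actually compiled with. As far as I know, node-rdkafka exposes this as a features array on the module, but treat that as an assumption and verify it against the version you have installed. The missingFeatures helper and the sample list below are my own illustration, so the sketch runs without the native module.

```javascript
// Return the entries of `required` that are absent from `features`.
function missingFeatures(features, required) {
  const available = new Set(features)
  return required.filter((name) => !available.has(name))
}

// In a real project the list would come from the installed module, e.g.
//   const features = require('node-rdkafka').features
// A hard-coded sample (a build without SSL) is used here instead.
const sampleFeatures = ['gzip', 'snappy', 'sasl', 'regex', 'lz4']

const missing = missingFeatures(sampleFeatures, ['ssl', 'sasl'])
if (missing.length > 0) {
  console.log(`librdkafka build is missing: ${missing.join(', ')}`)
} else {
  console.log('SSL and SASL support are compiled in')
}
```

If ssl is reported as missing, the sasl_ssl security protocol cannot work until the library is rebuilt against OpenSSL.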
So the conclusion is that I am not a big fan of node-rdkafka.
Here is the better solution: KafkaJS
Getting Started · KafkaJS: https://kafka.js.org/docs/getting-started
Install KafkaJS using either yarn or npm:
yarn add kafkajs
npm install kafkajs
KafkaJS is a pure JavaScript client with no native bindings, so it avoids the build and Node.js version compatibility issues described above.
So with KafkaJS, many of these integration problems go away for developers.
const { Kafka, logLevel } = require('kafkajs')

// This creates a client instance configured to connect to the Kafka broker
// listed in `brokers`. Replace the placeholder host and credentials with the
// connection details of your own Confluent Cloud cluster.
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: ['xxxxxxxxx.confluent.cloud:9092'],
  ssl: true,
  logLevel: logLevel.WARN, // same as the numeric value 2
  sasl: {
    mechanism: 'plain',
    username: 'xxxxxxxxxxx',
    password: 'xxxxxxxxxx'
  }
})
const producer = kafka.producer()

// Log connection lifecycle events emitted by the producer
producer.on('producer.connect', () => {
  console.log(`KafkaProvider: connected`);
});
producer.on('producer.disconnect', () => {
  console.log(`KafkaProvider: disconnected`);
});
producer.on('producer.network.request_timeout', (payload) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});
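const run = async () => {
  // Placeholder inputs: in a real service these come from your application data
  const user_uuiD = 'xxxxxxxxxxx'
  const i = { supplier_id: 'xxxxxxxxxxx' }

  // Producing
  await producer.connect()
  await producer.send({
    topic: 'supplier-ratings',
    messages: [
      {
        // The message value is sent as bytes, so the event is serialized to JSON first
        value: Buffer.from(JSON.stringify(
          {
            "event_name": "QA",
            "external_id": user_uuiD,
            "payload": {
              "supplier_id": i.supplier_id,
              "assessment": {
                "performance": 7,
                "quality": 7,
                "communication": 7,
                "flexibility": 7,
                "cost": 7,
                "delivery": 6
              }
            },
            "metadata": {
              "user_uuid": "5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e"
            }
          }
        ))
      },
    ],
  })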
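  // Consuming
  // The groupId below is a placeholder; pick one that identifies your service
  const consumer = kafka.consumer({ groupId: 'test-group' })
  await consumer.connect()
  // Subscribe to the topic you produced to if you want to read the message sent above
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}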
run().catch(console.error)
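The producer above sends a JSON string wrapped in a Buffer, while the consumer only prints the raw string. A small pair of helpers can keep both sides symmetric. This is just a sketch: encodeEvent and decodeEvent are names I made up, and returning null for bad input is one possible choice among several.

```javascript
// Serialize an event object into the Buffer used as a message value.
function encodeEvent(event) {
  return Buffer.from(JSON.stringify(event))
}

// Parse a consumed message value back into an object.
// Returns null instead of throwing when the value is empty or not valid JSON.
function decodeEvent(message) {
  if (!message.value) return null
  try {
    return JSON.parse(message.value.toString())
  } catch (err) {
    return null
  }
}

// Round trip with a fake message shaped like the one eachMessage receives.
const fakeMessage = {
  offset: '0',
  value: encodeEvent({ event_name: 'QA', payload: { supplier_id: 'xxxxxxxxxxx' } })
}
const decoded = decodeEvent(fakeMessage)
console.log(decoded.event_name)
```

Inside eachMessage, calling decodeEvent(message) and skipping null results keeps one malformed message from crashing the consumer loop. Whether to skip, log, or forward such messages elsewhere depends on your service.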
References
- https://docs.confluent.io/5.5.1/kafka/introduction.html
- https://aws.amazon.com/msk/what-is-kafka/
- https://aws.amazon.com
- https://www.tutorialspoint.com/apache_kafka/apache_kafka_introduction.htm
- https://rclayton.silvrback.com/thoughts-on-node-rdkafka-development
- https://kafka.js.org/docs/getting-started