Real-time Machine Learning Application with Kafka

Kushagra Makharia
4 min read · Mar 29, 2024


Generated with Microsoft Copilot in Bing

In today’s data-driven landscape, organizations seek to extract valuable insights from their streaming data in real time. Enter Apache Kafka, a powerful distributed event streaming platform that acts as the central nervous system for data integration, processing, and communication. But what happens when we combine Kafka’s capabilities with the world of machine learning?

But wait, what exactly is Kafka?

Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. It combines two messaging models, queuing and publish-subscribe, so work can be spread across many consumers while the system stays distributed and highly scalable.

Kafka Terminology:

Before diving into Apache Kafka, familiarize yourself with these essential terms:

Message

A message is an array of bytes that a producer sends to a consumer.

Topics

Messages are grouped into topics, which are persisted on physical storage.

Producers

Producers publish messages to a topic.

Consumers

Consumers subscribe to one or more topics to receive the data published by producers, either from the beginning of the topic or in real time as new messages arrive.

Streams

A combination of consumer and producer. A stream typically consumes the messages published to one topic, processes them, and publishes the results to a different topic for downstream consumers.

Connector

Connectors are reusable producers or consumers that connect Kafka topics to existing data applications.
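To make these terms concrete, here is a minimal in-memory sketch of a topic as an append-only log. It is not how Kafka is implemented; `InMemoryBroker` and its methods are hypothetical names used only for illustration, and there is no networking, partitioning, or persistence.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a Kafka broker: one append-only list per topic."""

    def __init__(self):
        self.topics = defaultdict(list)

    def produce(self, topic, value: bytes) -> int:
        """Append a message and return its offset (its position in the log)."""
        log = self.topics[topic]
        log.append(value)
        return len(log) - 1

    def consume(self, topic, offset: int, max_messages: int = 10):
        """Read up to max_messages starting at offset; the log is left unchanged."""
        return self.topics[topic][offset:offset + max_messages]


broker = InMemoryBroker()
broker.produce("comm-line", b"hello")
broker.produce("comm-line", b"world")

# A consumer that starts from the beginning sees every stored message...
from_start = broker.consume("comm-line", offset=0)
# ...while one that starts at offset 1 only sees what came after.
from_one = broker.consume("comm-line", offset=1)
print(from_start, from_one)
```

Because reading does not remove messages, several consumers can read the same topic independently, each tracking its own offset.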

What is Machine Learning?

Machine learning is a subfield of artificial intelligence that gives computers the ability to learn without being explicitly programmed.

To see how Kafka can work with machine learning, let's walk through a code implementation.

We will use the confluent-kafka Python client for this example. Before running the code, you need a running Kafka server, together with a ZooKeeper server if your Kafka setup depends on one (recent Kafka versions can also run without ZooKeeper in KRaft mode).

kafka-producer.py

from confluent_kafka import Producer
import time
import socket

conf = {'bootstrap.servers': 'localhost:9092',
        'client.id': socket.gethostname()}

producer = Producer(conf)

Here, we pass the configuration to the Producer so that it can connect to the Kafka cluster.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

We can also register a callback that reports whether each message was delivered or failed.

while True:
    msg = input("Enter the message:")
    producer.produce('comm-line', key="text", value=msg, callback=acked)
    # Wait for delivery so that the acked callback actually runs
    producer.flush()

Here, we read input from the user in real time and publish it to the topic “comm-line”.
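One detail worth knowing: in confluent-kafka, `produce()` is asynchronous and only enqueues the message. Delivery callbacks such as `acked` run when the producer serves delivery reports, which happens inside `poll()` or `flush()`. The sketch below imitates that timing with a hypothetical `FakeProducer` class, so it runs without a broker; it is a simplified illustration, not the real client.

```python
class FakeProducer:
    """Hypothetical stand-in that mimics when delivery callbacks run."""

    def __init__(self):
        self._pending = []

    def produce(self, topic, value, callback=None):
        # Like the real client, this only enqueues the message.
        self._pending.append((topic, value, callback))

    def flush(self):
        # Delivery reports are served here (the real client also serves them in poll()).
        while self._pending:
            topic, value, callback = self._pending.pop(0)
            if callback is not None:
                callback(None, value)  # err=None signals a successful delivery


reports = []


def acked(err, msg):
    reports.append("failed" if err is not None else f"delivered: {msg}")


producer = FakeProducer()
producer.produce("comm-line", "hello", callback=acked)
reports_before_flush = list(reports)  # the callback has not run yet
producer.flush()
reports_after_flush = list(reports)   # now the delivery report has arrived
print(reports_before_flush, reports_after_flush)
```

With the real client, a producer that never calls `poll()` or `flush()` will not see its callbacks fire, so one of the two usually belongs in or after the produce loop.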

kafka-streamer.py

from confluent_kafka import Consumer, Producer
from transformers import pipeline
import socket

# Consumer
consumer_conf = {'bootstrap.servers': 'localhost:9092',
                 'group.id': 'streamer1',
                 'auto.offset.reset': 'smallest'}

consumer = Consumer(consumer_conf)
consumer.subscribe(['comm-line'])

# Producer
producer_conf = {'bootstrap.servers': 'localhost:9092',
                 'client.id': socket.gethostname()}

producer = Producer(producer_conf)

This initializes a consumer and a producer.

Next, we use the Hugging Face transformers library to load a text moderation model that flags toxic messages published by the producer.

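# Load the model once at startup instead of on every call
pipe = pipeline(model="KoalaAI/Text-Moderation")

# Map the model's label codes to readable category names
category_dict = {
    "S": "sexual",
    "H": "hate",
    "V": "violence",
    "HR": "harassment",
    "SH": "self harm",
    "S3": "sexual/minors",
    "H2": "hate/threatening",
    "V2": "violence/graphic",
    "OK": "Ok"
}

def predict_sentiment(text):
    # Kafka delivers values as bytes; decode them before classification
    if isinstance(text, bytes):
        text = text.decode("utf-8")
    print(text)
    res = pipe(str(text))
    return category_dict[res[0]['label']]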

Using this function, we receive messages from kafka-producer, filter them, and forward them to a new topic.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

while True:
    # Consume up to 10 messages from the subscribed topic
    messages = consumer.consume(num_messages=10, timeout=1.0)

    # Loop over the messages
    for message in messages:
        # Check for errors
        if message.error():
            print(f"Error: {message.error()}")
        else:
            # Kafka delivers the value as bytes, so decode it before classifying
            text = message.value().decode('utf-8')
            sentiment = predict_sentiment(text)
            if sentiment != 'Ok':
                text = f"Censored due to {sentiment} content"
            producer.produce('res-line', key="text", value=text, callback=acked)

    # Serve delivery callbacks for the messages produced above
    producer.poll(0)
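The moderation step in this loop can also be written as a small pure function, which makes it easy to try without Kafka or the model. The sketch below is an illustration under assumptions: `moderate` and `fake_classify` are hypothetical names, and `fake_classify` is a toy rule standing in for the Hugging Face pipeline, returning the same list-of-dicts shape with a `label` key.

```python
# Label codes and category names as used in kafka-streamer.py
CATEGORY_NAMES = {
    "S": "sexual",
    "H": "hate",
    "V": "violence",
    "HR": "harassment",
    "SH": "self harm",
    "S3": "sexual/minors",
    "H2": "hate/threatening",
    "V2": "violence/graphic",
    "OK": "Ok",
}


def moderate(raw_value: bytes, classify) -> str:
    """Decode a Kafka message value and censor it if the classifier flags it."""
    text = raw_value.decode("utf-8")
    label = classify(text)[0]["label"]
    category = CATEGORY_NAMES[label]
    if category != "Ok":
        return f"Censored due to {category} content"
    return text


def fake_classify(text):
    # Hypothetical rule for demonstration only; the real model is far more nuanced.
    label = "H" if "hate" in text.lower() else "OK"
    return [{"label": label, "score": 1.0}]


clean = moderate(b"good morning", fake_classify)
flagged = moderate(b"I hate you", fake_classify)
print(clean)
print(flagged)
```

Passing the classifier in as an argument keeps the Kafka loop thin: it only consumes, calls the function, and produces the result.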

Finally, the consumer shows the moderated output to the end user.

kafka-consumer.py

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'consumer1',
        'auto.offset.reset': 'smallest'}

consumer = Consumer(conf)

consumer.subscribe(['res-line'])

while True:
    # Consume up to 10 messages from the subscribed topic
    messages = consumer.consume(num_messages=10, timeout=1.0)

    # Loop over the messages
    for message in messages:
        # Check for errors
        if message.error():
            print(f"Error: {message.error()}")
        else:
            # Decode the bytes so the text prints without the b'...' wrapper
            print(f"Message value: {message.value().decode('utf-8')}")
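Both consumers set `auto.offset.reset` to `smallest`, which is an alias for `earliest`. The setting only matters when the consumer group has no committed offset yet; otherwise the consumer resumes where the group left off. The sketch below models that decision in plain Python. `starting_offset` is a hypothetical helper, not part of confluent-kafka, and it ignores the out-of-range case that also triggers a reset.

```python
def starting_offset(committed, policy, log_start, log_end):
    """Return the offset at which a consumer would begin reading a partition.

    committed: the group's last committed offset, or None if there is none.
    policy: the auto.offset.reset value.
    log_start / log_end: the first available offset and the next offset to be written.
    """
    if committed is not None:
        # An existing group resumes from its committed position.
        return committed
    if policy in ("earliest", "smallest"):
        return log_start  # read the topic from the beginning
    if policy in ("latest", "largest"):
        return log_end    # only read messages that arrive from now on
    raise ValueError(f"unsupported policy: {policy}")


new_group_from_start = starting_offset(None, "smallest", log_start=0, log_end=42)
new_group_live_only = starting_offset(None, "latest", log_start=0, log_end=42)
existing_group = starting_offset(17, "smallest", log_start=0, log_end=42)
print(new_group_from_start, new_group_live_only, existing_group)
```

This is why restarting the consumer under the same `group.id` does not replay old messages, while a new `group.id` with `smallest` reads the topic from the start.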

In this blog, we embarked on an exciting journey into the realm of Real-time Machine Learning with Kafka. Let’s recap the key takeaways:

Kafka’s Power: We explored how Apache Kafka serves as a robust distributed streaming platform. Its ability to handle real-time data feeds, scalability, and fault tolerance makes it a cornerstone for modern data architectures.

Integration with ML: We delved into the fusion of Kafka and machine learning. By seamlessly integrating ML models into Kafka applications, we unlock powerful capabilities for real-time predictive analytics.

Use Case: Our sample use case, a comment box that moderates messages as they arrive, demonstrated how Kafka enables real-time predictions. The same pattern applies to personalized recommendations, targeted ads, or fraud detection, with Kafka as the backbone for ML-driven insights.

Terminology: We familiarized ourselves with essential Kafka terminology: topics, producers, consumers, streams, and more. Understanding these concepts is crucial for building effective Kafka-based solutions.

Remember, the synergy between Kafka and ML empowers organizations to make informed decisions, detect anomalies, and enhance user experiences. As you embark on your own Kafka journey, keep exploring, experimenting, and innovating!

GitHub Repository:

https://github.com/KushagraMakharia/Comment-Box

Embrace Wonder, Chase the Extraordinary
— Kushagra
