Kafka

In another post we are going to explain how to use Kafka to retrieve streaming data and feed it into Spark Streaming. But first, we explain how to get started with Apache Kafka.

As every writer says when they first write about Kafka, Kafka was developed by LinkedIn.

With that out of the way, let's explain how to install Kafka, how to send messages to yourself from the command-line shell, and how to write a simple Python program that pulls messages off a topic.


First, you need to know two definitions. A program that writes messages to Kafka is called a Producer. A program that reads messages from Kafka is called a Consumer. Producers and consumers exchange messages through named streams called topics.
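To make the two roles concrete, here is a toy, in-memory model. The classes are hypothetical and are not Kafka's API; the sketch assumes a single-partition topic. It illustrates one property worth knowing early: a topic is an append-only log, each consumer tracks its own position (its offset), and reading does not remove messages, so two consumers can each see every message. Starting at offset 0 roughly corresponds to the console consumer's --from-beginning flag used later in this post.

```python
class ToyTopic:
    """Hypothetical stand-in for a single-partition Kafka topic: an append-only log."""

    def __init__(self):
        self.log = []

    def append(self, message):
        self.log.append(message)
        return len(self.log) - 1  # offset assigned to the new message


class ToyProducer:
    """Writes messages to a topic; it never reads."""

    def __init__(self, topic):
        self.topic = topic

    def send(self, message):
        return self.topic.append(message)


class ToyConsumer:
    """Reads messages from a topic, remembering its own offset."""

    def __init__(self, topic, from_beginning=True):
        self.topic = topic
        # Offset 0 mimics --from-beginning; otherwise only new messages are seen.
        self.position = 0 if from_beginning else len(topic.log)

    def poll(self):
        new_messages = self.topic.log[self.position:]
        self.position = len(self.topic.log)  # advance past what was just read
        return new_messages
```

Two ToyConsumer instances attached to the same ToyTopic both receive every message, and the topic's log is unchanged after they read it.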

Install Kafka

wget http://www-eu.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

Then unpack the archive somewhere (for example with tar -xzf kafka_2.10-0.9.0.1.tgz) and set $KAFKA_HOME to the extracted directory.
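Before starting anything, it is worth confirming that $KAFKA_HOME really points at the unpacked directory. The sketch below checks for the scripts and config files used in the rest of this post; missing_files is a hypothetical helper, and the list only covers the files this post relies on, not the whole distribution.

```python
import os
from pathlib import Path

# Files under $KAFKA_HOME that the commands in this post use.
REQUIRED = [
    "bin/zookeeper-server-start.sh",
    "bin/kafka-server-start.sh",
    "bin/kafka-console-producer.sh",
    "bin/kafka-console-consumer.sh",
    "config/zookeeper.properties",
    "config/server.properties",
]


def missing_files(kafka_home):
    """Return the REQUIRED relative paths that are not files under kafka_home."""
    home = Path(kafka_home)
    return [rel for rel in REQUIRED if not (home / rel).is_file()]


if __name__ == "__main__":
    home = os.environ.get("KAFKA_HOME")
    if not home:
        print("KAFKA_HOME is not set")
    else:
        missing = missing_files(home)
        print("OK" if not missing else f"Missing under {home}: {missing}")
```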

Start Kafka

Start ZooKeeper first, then Kafka. ZooKeeper ships with the Kafka download, so there is nothing extra to install. Each script runs in the foreground, so start them in separate terminal sessions.

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
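Because the broker needs ZooKeeper to be up before it starts, it can help to confirm each service is listening before moving on. This is a minimal sketch assuming the default ports (2181 for ZooKeeper, 9092 for the broker) from the stock config files; is_port_open and report are hypothetical helper names, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Default ports, assuming zookeeper.properties and server.properties are unchanged.
SERVICES = {"ZooKeeper": 2181, "Kafka broker": 9092}


def report(host="localhost"):
    """Print whether each service appears to be listening on its default port."""
    for name, port in SERVICES.items():
        state = "listening" if is_port_open(host, port) else "not reachable"
        print(f"{name} on {host}:{port}: {state}")


if __name__ == "__main__":
    report()
```

Run it once after starting ZooKeeper (only 2181 should be listening) and again after starting Kafka.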

Kafka Pingpong: Send Messages to Yourself

You can use these two commands to send messages to yourself. Run the producer in one terminal session and the consumer in another. Type a line into the producer, press Enter, and watch it appear in the consumer.

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HiMe

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic HiMe --from-beginning

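Notice that we did not have to create the topic HiMe: the broker setting auto.create.topics.enable defaults to true, so the topic is created the first time it is used. Also notice that we used the default ports, 9092 for the Kafka broker and 2181 for ZooKeeper.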
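The console producer can also be replaced by a few lines of Python. This sketch assumes kafka-python's KafkaProducer (installed in the next section), whose send(topic, value) method is asynchronous and whose flush() blocks until buffered messages are delivered. send_lines is a hypothetical helper; it takes the producer as a parameter so it also works with any object that has the same send()/flush() methods.

```python
def send_lines(producer, topic, lines):
    """Send each line to `topic` as a UTF-8 encoded message; return how many were sent.

    `producer` is expected to be a kafka-python KafkaProducer, or any object
    with compatible send(topic, value) and flush() methods.
    """
    count = 0
    for line in lines:
        # Kafka message values are bytes, so encode the text first.
        producer.send(topic, line.rstrip("\n").encode("utf-8"))
        count += 1
    # send() only queues messages; flush() waits until they have been delivered.
    producer.flush()
    return count
```

With the broker running, something like producer = KafkaProducer(bootstrap_servers='localhost:9092') followed by send_lines(producer, 'HiMe', ['hello', 'world']) should make both lines show up in the console consumer.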


Sample Python Kafka Consumer Program

Install the Kafka client for Python:

pip install kafka-python

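pip puts kafka-python where Python already looks, so it needs no path change. If you also plan to use PySpark for the Spark Streaming follow-up, add Spark's Python libraries to your Python path: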

export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip
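To confirm which packages your interpreter can actually import after these steps, the standard library's importlib.util.find_spec is enough. This is a sketch; check_modules is a hypothetical helper, and pyspark will only be found if the PYTHONPATH above points at a real Spark installation.

```python
import importlib.util


def check_modules(names):
    """Return {module_name: bool} telling whether each top-level module is importable."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


if __name__ == "__main__":
    for name, ok in check_modules(["kafka", "pyspark"]).items():
        print(f"{name}: {'found' if ok else 'NOT found on the Python path'}")
```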

Here is a sample program. It loops through the messages that you have added to the HiMe topic; use the console producer from the previous section to put messages there.

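from kafka import KafkaConsumer

# Connect to the local broker. With auto_offset_reset='earliest', when there is
# no committed offset to resume from, reading starts at the oldest message still
# in the topic (similar to the console consumer's --from-beginning).
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
consumer.subscribe(['HiMe'])

# Iterating blocks and yields each message as it arrives; press Ctrl+C to stop.
for message in consumer:
    print(message)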
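print(message) prints the whole ConsumerRecord, metadata included, and the payload arrives as raw bytes. A small formatter makes the output easier to read. This is a sketch: describe is a hypothetical helper that reads the topic, partition, offset and value attributes of kafka-python's ConsumerRecord, and it assumes the payload is UTF-8 text such as lines typed into the console producer.

```python
def describe(record):
    """Summarize a ConsumerRecord as 'topic[partition]@offset: value'."""
    value = record.value
    if isinstance(value, bytes):
        # Messages from the console producer arrive as raw bytes.
        value = value.decode("utf-8", errors="replace")
    return f"{record.topic}[{record.partition}]@{record.offset}: {value}"
```

Inside the loop, print(describe(message)) replaces print(message). Alternatively, KafkaConsumer accepts value_deserializer=lambda v: v.decode('utf-8') so that message.value is already a str; describe passes strings through unchanged.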

Zymr blogger

© 2019, Zymr, Inc. All Rights Reserved.