0%

Kafka in Dockerで分散メッセージングシステムを構築する

Apach KafkaLinkedInで開発された分散メッセージングシステムです。ArduinoからMQTTブローカーにpublishしたセンシングデータをKafkaでconsumeしてからRiemannSpark StreamingStormなどのリアルタイムストリーミング処理が目的です。

Docker Composeのインストール

KafkaのDockerイメージはwurstmeister/kafka-dockerを使います。Kafkaクラスタの構成管理にZookeeperが必要になります。このイメージはDocker Composeを使っているのでちょうど良い勉強になります。事前にインストールしておきます。

$ curl -L https://github.com/docker/compose/releases/download/1.2.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose; chmod +x /usr/local/bin/docker-compose

プロジェクトの作成

適当なディレクトリを作成して、リポジトリからgit cloneします。

$ cd ~/docker_apps
$ git clone https://github.com/wurstmeister/kafka-docker.git
$ cd kafka-docker

クラスタを起動する

Kafka Dockerの手順を読みながらクラスタの構築と簡単なテストまで行います。

docker-compose.yml

リポジトリにはクラスタ用と1台構成用のdocker-compose.ymlが用意されています。今回はブローカーを2台起動したいのでクラスタ用のdocker-compose.ymlを使います。YAMLのKAFKA_ADVERTISED_HOST_NAMEの値をDockerホストのIPアドレスに書き換えて実行します。

~/docker_apps/kafka-docker/docker-compose.yml
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181"
kafka:
build: .
ports:
- "9092"
links:
- zookeeper:zk
environment:
KAFKA_ADVERTISED_HOST_NAME: 10.3.0.165
volumes:
- /var/run/docker.sock:/var/run/docker.sock

最初にdocker-compose upするとkafkaのDockerイメージのビルドが始まります。必要なコンテナがすべて起動するまでしばらく待ちます。

$ docker-compose up

クラスタがすべて起動したら別シェルからKafkaブローカーを2台に変更します。

$ docker-compose scale kafka=2
Creating kafkadocker_kafka_2...
Starting kafkadocker_kafka_2...

psでコンテナの起動状況を確認します。

$ docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------
kafkadocker_kafka_2 /bin/sh -c start- Up 0.0.0.0:32789->9092/tcp
kafka.sh
kafkadocker_kafka_3 /bin/sh -c start- Up 0.0.0.0:32790->9092/tcp
kafka.sh
kafkadocker_zookeeper_1 /bin/sh -c Up 0.0.0.0:32788->2181/tcp
/usr/sbin/sshd ... , 22/tcp, 2888/tcp,
3888/tcp

Kafka Shell

Kafka Shellを起動する書式は以下です。

$ start-kafka-shell.sh <DOCKER_HOST_IP> <ZK_HOST:ZK_PORT>

ZK_HOST:PORTの値はdocker-compose psで確認した値を使います。

$ cd ~/docker_apps/kafka-docker/
$ ./start-kafka-shell.sh 10.3.0.165 10.3.0.165:32788

start-kafka-shell.shでは使い捨てのDockerコンテナを起動してbashを実行します。このコンテナを使いKafkaのproducerとconsumerのプロセスを起動します。

Kafkaコンテナの環境変数を確認しておきます。

$ echo $KAFKA_HOME
/opt/kafka_2.10-0.8.2.0
$ echo $ZK
10.3.0.165:32788

topicの作成

Kafka Shellを使いtopicを作成します。Kafkaのtopicはメッセージのカテゴリになります。このtopicはパーティションは4つ、メッセージのレプリカは2個の設定です。

$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \
--partitions 4 --zookeeper $ZK --replication-factor 2
Created topic "topic".

作成したtopicを確認します。

$ $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK
Topic:topic PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1

producerの送信

Kafka Shellの一つからproducerを実行します。コンソールは文字列の入力待ちになります。

$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \
--broker-list=`broker-list.sh`

consumerの受信

Kafka Shellを起動してconsumerを実行します。

$ ./start-kafka-shell.sh 10.3.0.165 10.3.0.165:32788
$ $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

入力待ちのproducerのコンソールに文字列をタイプすると、consumerのコンソールに表示されます。