Apach KafkaはLinkedInで開発された分散メッセージングシステムです。ArduinoからMQTTブローカーにpublishしたセンシングデータをKafkaでconsumeしてからRiemann、Spark Streaming、Stormなどのリアルタイムストリーミング処理が目的です。
Docker Composeのインストール
KafkaのDockerイメージはwurstmeister/kafka-dockerを使います。Kafkaクラスタの構成管理にZookeeperが必要になります。このイメージはDocker Composeを使っているのでちょうど良い勉強になります。事前にインストールしておきます。
$ curl -L https://github.com/docker/compose/releases/download/1.2.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose; chmod +x /usr/local/bin/docker-compose |
プロジェクトの作成
適当なディレクトリを作成して、リポジトリからgit clone
します。
$ cd ~/docker_apps |
クラスタを起動する
Kafka Dockerの手順を読みながらクラスタの構築と簡単なテストまで行います。
docker-compose.yml
リポジトリにはクラスタ用と1台構成用のdocker-compose.ymlが用意されています。今回はブローカーを2台起動したいのでクラスタ用のdocker-compose.ymlを使います。YAMLのKAFKA_ADVERTISED_HOST_NAME
の値をDockerホストのIPアドレスに書き換えて実行します。
zookeeper: |
最初にdocker-compose up
するとkafkaのDockerイメージのビルドが始まります。必要なコンテナがすべて起動するまでしばらく待ちます。
$ docker-compose up |
クラスタがすべて起動したら別シェルからKafkaブローカーを2台に変更します。
$ docker-compose scale kafka=2 |
ps
でコンテナの起動状況を確認します。
$ docker-compose ps |
Kafka Shell
Kafka Shellを起動する書式は以下です。
$ start-kafka-shell.sh <DOCKER_HOST_IP> <ZK_HOST:ZK_PORT> |
ZK_HOST:PORT
の値はdocker-compose ps
で確認した値を使います。
$ cd ~/docker_apps/kafka-docker/ |
start-kafka-shell.sh
では使い捨てのDockerコンテナを起動してbashを実行します。このコンテナを使いKafkaのproducerとconsumerのプロセスを起動します。
Kafkaコンテナの環境変数を確認しておきます。
$ echo $KAFKA_HOME |
topicの作成
Kafka Shellを使いtopicを作成します。Kafkaのtopicはメッセージのカテゴリになります。このtopicはパーティションは4つ、メッセージのレプリカは2個の設定です。
$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \ |
作成したtopicを確認します。
$ $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK |
producerの送信
Kafka Shellの一つからproducerを実行します。コンソールは文字列の入力待ちになります。
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \ |
consumerの受信
Kafka Shellを起動してconsumerを実行します。
$ ./start-kafka-shell.sh 10.3.0.165 10.3.0.165:32788 |
入力待ちのproducerのコンソールに文字列をタイプすると、consumerのコンソールに表示されます。