前回作成したKafkaクラスタをテストするために、簡単なNode.jsのproducerとconsumer用のコンテナを作成します。追加コンテナもKafkaとZooKeeperと同じdocker-compose.ymlに含めたかったのですが、うまく動かせませんでした。producerとconsumerのコンテナは通常のdocker run
コマンドで起動することにします。
kafka-node
Node.jsのKafkaクライアントはいくつかGitHubにあがっています。
今回のKafkaのバージョンは0.8.2.1です。Prozessは0.6のままので、0.8に対応しているkafka-nodを使うことにします。
プロジェクト
最初に作成したファイルのディレクトリ構造です。適当なディレクトリを作成します。
$ cd ~/docker_apps $ tree . ├── docker-compose.yml ├── kafka_consumer │ ├── Dockerfile │ ├── app.js │ └── package.json ├── kafka_docker │ ├── Dockerfile │ ├── LICENSE │ ├── README.md │ ├── broker-list.sh │ ├── docker-compose-single-broker.yml │ ├── docker-compose.yml │ ├── download-kafka.sh │ ├── start-kafka-shell.sh │ └── start-kafka.sh └── kafka_producer ├── Dockerfile ├── app.js └── package.json
|
kafka_dockerはgit clone
します。
$ cd ~/docker_apps $ git clone https://github.com/SOHU-Co/kafka-node.git
|
プログラム
docker-compose.ymlは前回と変わりません。ここにproducerとconsumerのコンテナも追加したいのですが、起動の順番が制御できず動作できませんでした。
docker-compose.yml
~/docker_apps/docker-compose.ymlzookeeper: image: wurstmeister/zookeeper ports: - "2181" kafka: build: ./kafka_docker ports: - "9092" links: - zookeeper:zk environment: KAFKA_ADVERTISED_HOST_NAME: 10.3.0.165 volumes: - /var/run/docker.sock:/var/run/docker.sock
|
Dockerfileとpackage.json
producerとconsumerのDockerfile、package.jsonは同じです。
package.jsonはnameとdescriptionを変更します。
~/docker_apps/kafka_producer/package.json{ "name": "kafka-node-producer-app", "description": "kafka-node-producer app", "version": "0.0.1", "private": true, "dependencies": { "kafka-node": "0.2.26" }, "scripts": {"start": "node app.js"} }
|
kafka_producer/app.js
High Levelのproducerのサンプルはhigh-level-producer.jsにあります。ZooKeeperのホストとポート番号は環境変数より取得します。
~/docker_apps/kafka_producer/app.js'use strict'; var kafka = require('kafka-node'), HighLevelProducer = kafka.HighLevelProducer, Client = kafka.Client, host = [process.env.ZK_PORT_2181_TCP_ADDR,':', process.env.ZK_PORT_2181_TCP_PORT], client = new Client(host.join('')), producer = new HighLevelProducer(client), count = 10, rets = 0;
producer.on('ready', function () { setInterval(send, 1000); });
producer.on('error', function (err) { console.log('error', err) });
function send() { var payloads = [ {topic: 'topic1', messages: ['hello','world']} ]; producer.send(payloads, function (err, data) { if (err) console.log(err); else console.log('send %d messages', ++rets); if (rets === count) process.exit(); }); }
|
kafka_consumer
High Levelのconsumerのサンプルもhigh-level-consumer.jsにあります。ZooKeeperの情報やオリジナルからtopic名を固定にするなど少し変更しています。
~/docker_apps/kafka_consumer/app.js'use strict'; var kafka = require('kafka-node'), HighLevelConsumer = kafka.HighLevelConsumer, Client = kafka.Client, host = [process.env.ZK_PORT_2181_TCP_ADDR,':', process.env.ZK_PORT_2181_TCP_PORT], client = new Client(host.join('')), topics = [ { topic: 'topic1' }], options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 }, consumer = new HighLevelConsumer(client, topics, options);
consumer.on('message', function (message) { console.log(message); });
consumer.on('error', function (err) { console.log('error', err); });
|
Dockerコンテナの起動
Docker ComposeからKafkaとZooKeeperのコンテナを起動します。
$ cd ~/docker_apps $ docker-compose up
|
producerのDockerイメージをビルドしてコンテナを起動します。メッセージは10回送信します。--links
フラグを追加してプログラムから環境変数を通してZooKeeperのIPアドレスとポート番号を取得できるようにします。ZooKeeperの名前はDocker Composeが自動的に設定しているのでdocker-compose ps
コマンドから名前を確認しておきます。
$ cd ~/docker_apps/kafka_producer $ docker build -t kafka_producer . $ docker run --rm --link dockerapps_zookeeper_1:zk kafka_producer ... send 9 messages send 10 messages
|
consumerのDockerイメージをビルドしてコンテナを起動します。producerは毎回メッセージを2つ送信しているのでconsumerには20個のメッセージが届きます。
$ cd ~/docker_apps/kafka_consumer $ docker build -t kafka_consumer . $ docker run --rm --link dockerapps_zookeeper_1:zk kafka_consumer ... { topic: 'topic1', value: 'hello', offset: 18, partition: 0, key: <Buffer > } { topic: 'topic1', value: 'world', offset: 19, partition: 0, key: <Buffer > }
|