PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 - Part 2: KafkaとLandoop

 前回はRaspberry Pi 3上でSensorTagから環境データを取得するPythonスクリプトを書きました。この環境データはKafkaを経由してストリーム処理する予定です。次にRaspberry Pi 3からメッセージを受け取るKafkaクラスタをクラウド上に構築していきます。

 KafkaクラスタはLandoopが開発しているfast-data-devのDockerイメージを使います。  

LandoopでKafkaクラスタを構築する

 LandoopはKafkaのソリューションを提供する会社です。特にKafka Connectを中心にしたストリームのデータパイプラインの開発に強い印象です。

Landoop 

 
 LandoopではKafka Topics UIKafka Connect UIKafka Topics UIなどのWebツールを開発しています。demoサイトからどのようなツールなのかを確認できます。このdemoサイトと同じ環境はfast-data-connect-clusterを使うと簡単に構築することができます。  

landoop-top.png

fast-data-connect-cluster

 fast-data-connect-clusterのリポジトリをcloneします。

$ git clone https://github.com/Landoop/fast-data-connect-cluster

 
 リポジトリに含まれるdocker-compose.ymlを少し変更してクラウド上で利用します。Debianの仮想マシンを用意してDockerとDocker Composeをインストールしました。利用するバージョンは以下です。

$ docker --version
Docker version 17.06.0-ce, build 02c1d87
$ docker-compose --version
docker-compose version 1.14.0, build c7bdf9e

 このdocker-compose.ymlにはLandoopのWebツールに加えてConfluent Open Sourceに含まれるKafkaSchema RegistryKafka REST ProxyKafka ConnectApache ZooKeeperが含まれます。一通りKafkaを使った開発に必要なコンテナが揃うのでとても便利です。

 現在Confluent Open SourceとKafkaのバージョンは以下になっています。  

  • Confluent Open Source: v3.2.2
  • Kafka v0.10.2.1

 docker-compose.ymlの主な変更点です。

  • ADV_HOST: Dockerが起動している仮想マシンのパブリックIPアドレスを指定します。
  • ports: リモートから接続するためにZooKeeperやKafkaクラスタのポートを公開します。
docker-compose.yml
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=210.xxx.xxx.xxx
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
connect-node-1:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-2:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-3:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-ui:
image: landoop/kafka-connect-ui:latest
depends_on:
- connect-node-1
environment:
- CONNECT_URL=http://connect-node-1:8083
ports:
- 8000:8000

 docker-compose.ymlのディレクトリに移動してコンテナを起動します。

$ cd fast-data-connect-cluster
$ docker-compose up -d

動作確認

 Kafkaクライアントから簡単に動作確認をします。kafka-topicsコマンドでトピックを作成します。

$ docker-compose exec kafka-stack kafka-topics \
--create --topic test \
--zookeeper localhost:2181 \
--partitions 1 --replication-factor 1

 kafka-console-consumerコマンドを実行します。メッセージがトピックに届くとこのシェルに表示されます。

$ docker-compose exec kafka-stack \
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic test

 別のシェルからkafka-console-producerコマンドを実行します。

$ docker-compose exec kafka-stack \
kafka-console-producer \
--broker-list localhost:9092 \
--topic test

 コマンドは待機状態になるので適当なメッセージをシェルに入力します。同じメッセージがkafka-console-consumerのシェルにも表示されます。

hello world

 Kafka Topics UIのページではトピックの一覧とメッセージの中身を確認することができます。
 
landoop-topic.png

SensorTagの環境データをKafkaに送信する

kafka-python

 PythonのKafkaクライアントにはkafka-pythonconfluent-kafka-pythonがあります。APIが微妙に違うので間違えないようにします。今回はkafka-pythonをインストールして使います。

$ sudo pip install kafka-python

 前回書いたSensorTagのデータを取得するPythonのコードにKafkaへメッセージを送信するproducerを追加します。

json_producer_sensortag_kafka.py
from bluepy.sensortag import SensorTag
import sys
import time
import json
import calendar
from kafka import KafkaProducer
def main():
argvs = sys.argv
argc = len(argvs)
if (argc != 2):
print 'Usage: # python {0} bd_address'.format(argvs[0])
quit()
bid = argvs[1]
print('Connecting to ' + bid)
timeout = 10.0
tag = SensorTag(bid)
tag.IRtemperature.enable()
tag.humidity.enable()
time.sleep(1.0)
producer = KafkaProducer(bootstrap_servers='210.xxx.xxx.xxx:9092')
while True:
tAmb, tObj = tag.IRtemperature.read()
humidity, rh = tag.humidity.read()
value = {
"bid" : bid,
"time" : calendar.timegm(time.gmtime()),
"ambient": tAmb,
"objecttemp": tObj,
"humidity": humidity,
"rh": rh
}
msg = json.dumps(value).encode("utf-8")
producer.send('sensortag', msg)
producer.flush()
print(msg)
time.sleep(timeout)
tag.disconnect()
del tag
if __name__ == '__main__':
main()

 SensorTagのBDアドレスを引数にしてPythonスクリプトを実行します。

$ python json_producer_sensortag_kafka.py B0:B4:48:BE:5E:00

 10秒間隔で取得した環境データをJSON文字列に整形してKafkaクラスタに送信します。

Connecting to B0:B4:48:BE:5E:00
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464133, "humidity": 26.8096923828125, "objecttemp": 22.0625, "ambient": 26.59375, "rh": 68.829345703125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464143, "humidity": 26.86004638671875, "objecttemp": 22.40625, "ambient": 26.65625, "rh": 68.927001953125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464153, "humidity": 26.92047119140625, "objecttemp": 22.71875, "ambient": 26.71875, "rh": 68.95751953125}

 Kafka Topics UIの画面にもSensorTagの環境データが表示されました。

landoop-sensortag.png