0%

PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 - Part 6: JupyterからPySpark Streamingのウィンドウ集計をする

 ようやくタイトルのコードを実行する準備ができました。SensorTagのデータをKafkaに送信してPySpark Streamingのウィンドウ集計します。JupyterをPythonのインタラクティブな実行環境に使います。

準備

 これまでに用意したPythonスクリプトとKafka、Sparkのクラスタを使います。

Notebook

 JupyterのNotebookにインタラクティブにコードを実行して確認していきます。以下のパラグラフはそれぞれセルに相当します。WebブラウザからJupyterを開き右上のNewボタンからPython 3を選択します。

  • http://<仮想マシンのパブリックIPアドレス>:8888

PYSPARK_SUBMIT_ARGS

 ScalaのJarファイルはバージョンの指定がかなり厳密です。Spark Streaming + Kafka Integration Guide によるとSpark StreamingからKafkaに接続するためのJarファイルは2つあります。Kafkaのバージョンが0.8.2.1以上のspark-streaming-kafka-0-8と、0.10以上のspark-streaming-kafka-0-10です。今回利用しているKafkaのバージョンは0.10.2.1ですがPythonをサポートしているspark-streaming-kafka-0-8を使います。

 パッケージ名はspark-streaming-kafka-<Kafkaのバージョン>_<Scalaのバージョン>:<Sparkのバージョン>のようにそれぞれのバージョンを表しています。

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'

マジックコマンド

 kafka-pythonパッケージをインストールします。Jupyterのマジックコマンドを使います。

!pip install kafka-python

import

 Spark StreamingとKafkaに必要なパッケージをimportします。

import json
import pytz
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from kafka import KafkaProducer

Sparkのコンテキスト

 Spark 2.xになってからSparkのコンテキストがわかりにくくなっていますが、SparkSession.builderをエントリポイントにします。StreamingContextは1分のバッチ間隔で作成します。

spark = (
SparkSession
.builder
.getOrCreate()
)

sc = spark.sparkContext
ssc = StreamingContext(sc, 60)

Kafka Producer

Kafkaブローカーのリスト、入力と出力のトピック名を定義します。ウインドウ集計した結果はKafkaのトピックに出力するためproducerも作成します。

brokers = "<Kafka BrokerのIPアドレス>:9092"
sourceTopic = "sensortag"
sinkTopic = "sensortag-sink"

producer = KafkaProducer(bootstrap_servers=brokers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))

ウィンドウ集計

 今回のスクリプトのメインの処理をする関数です。KafkaのJSONを変換したRDDにStructTypeのスキーマを適用してDataFrameを作成します。DataFrameのウインドウ集計関数を使い2分ウインドウで周囲温度(ambient)と湿度(rh)の平均値を計算します。

 またDataFrameは処理の途中でタイムゾーンを削除しているためウィンドウ集計の結果がわかりやすいようにAsia/Tokyoタイムゾーンをつけています。

def windowAverage(rdd):
schema = StructType([
StructField('ambient', DoubleType(), True),
StructField('bid', StringType(), True),
StructField('humidity', DoubleType(), True),
StructField('objecttemp', DoubleType(), True),
StructField('rh', DoubleType(), True),
StructField('time', TimestampType(), True),
])

streamingInputDF = spark.createDataFrame(
rdd, schema=schema
)

print('1分バッチのDataFrame')
streamingInputDF.show(truncate=False)

averageDF = (
streamingInputDF
.groupBy(
streamingInputDF.bid,
window("time", "2 minute"))
.avg("ambient","rh")
)

sinkRDD = averageDF.rdd.map(lambda x: {'bid': x[0],
'time': pytz.utc.localize(x[1]['end']).astimezone(pytz.timezone('Asia/Tokyo')).isoformat(),
'ambient': x[2],
'rh': x[3]})
if not sinkRDD.isEmpty():
print('2分ウィンドウの平均値')
sinkList = sinkRDD.collect()
print(sinkList)

for sink in sinkList:
producer.send(sinkTopic, sink)

Kafkaのストリーム作成

 Kafka BrokerのIPアドレスとRaspberry Pi 3からSensorTagのJSON文字列を送信するトピックを指定します。JSON文字列を1行ずつデシリアライズしてpyspark.sql.Rowを作成します。timeフィールドはUNIXタイムスタンプからPythonのdatetimeに変換しタイムゾーンを削除します。

kafkaStream = KafkaUtils.createDirectStream(
ssc, [sourceTopic], {"metadata.broker.list":brokers})

rowStream = (
kafkaStream
.map(lambda line: json.loads(line[1]))
.map(lambda x: Row(
ambient=x['ambient'],
bid=x['bid'],
humidity=x['humidity'],
objecttemp=x['objecttemp'],
rh=x['rh'],
time=datetime.fromtimestamp(x['time']).replace(tzinfo=None),
)
)
)

rowStream.foreachRDD(windowAverage)

StreamingContextの開始

 最後にStreamingContextを開始してプログラムが停止するまで待機します。

ssc.start()
ssc.awaitTermination()

スクリプトの実行

Raspberry Pi 3からPart 2で書いたPythonスクリプトを実行します。

出力結果

 以下の様な出力が表示されます。DataFrameの出力ではタイムゾーンがないのに対し、ウィンドウ集計の結果にはタイムゾーンが付与されています。

1分バッチのDataFrame
+--------+-----------------+-----------------+----------+---------------+---------------------+
|ambient |bid |humidity |objecttemp|rh |time |
+--------+-----------------+-----------------+----------+---------------+---------------------+
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.96875 |75.714111328125|2017-08-01 23:44:03.0|
|28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.90625 |75.714111328125|2017-08-01 23:44:13.0|
|28.75 |B0:B4:48:BD:DA:03|28.72314453125 |22.875 |75.616455078125|2017-08-01 23:44:23.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.15625 |75.616455078125|2017-08-01 23:44:34.0|
|28.75 |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125 |75.616455078125|2017-08-01 23:44:44.0|
|28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.125 |75.616455078125|2017-08-01 23:44:55.0|
+--------+-----------------+-----------------+----------+---------------+---------------------+

2分ウィンドウの平均値
[{'bid': 'B0:B4:48:BD:DA:03', 'time': '2017-08-02T08:46:00+09:00', 'ambient': 28.760416666666668, 'rh': 75.64900716145833}]