ようやくタイトルのコードを実行する準備ができました。SensorTagのデータをKafkaに送信してPySpark Streamingのウィンドウ集計します。JupyterをPythonのインタラクティブな実行環境に使います。
準備 これまでに用意したPythonスクリプトとKafka、Sparkのクラスタを使います。
Notebook JupyterのNotebookにインタラクティブにコードを実行して確認していきます。以下のパラグラフはそれぞれセルに相当します。WebブラウザからJupyterを開き右上のNew
ボタンからPython 3
を選択します。
http://<仮想マシンのパブリックIPアドレス>:8888
PYSPARK_SUBMIT_ARGS ScalaのJarファイルはバージョンの指定がかなり厳密です。Spark Streaming + Kafka Integration Guide
によるとSpark StreamingからKafkaに接続するためのJarファイルは2つあります。Kafkaのバージョンが0.8.2.1
以上のspark-streaming-kafka-0-8 と、0.10
以上のspark-streaming-kafka-0-10 です。今回利用しているKafkaのバージョンは0.10.2.1
ですがPythonをサポートしているspark-streaming-kafka-0-8
を使います。
パッケージ名はspark-streaming-kafka-<Kafkaのバージョン>
_<Scalaのバージョン>
:<Sparkのバージョン>
のようにそれぞれのバージョンを表しています。
import osos.environ['PYSPARK_SUBMIT_ARGS' ] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 pyspark-shell'
マジックコマンド kafka-python パッケージをインストールします。Jupyterのマジックコマンド を使います。
!pip install kafka-python
import Spark StreamingとKafkaに必要なパッケージをimportします。
import jsonimport pytzfrom datetime import datetimefrom pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtilsfrom pyspark.sql import SparkSession, Rowfrom pyspark.sql.functions import *from pyspark.sql.types import *from kafka import KafkaProducer
Sparkのコンテキスト Spark 2.xになってからSparkのコンテキストがわかりにくくなっていますが、SparkSession.builderをエントリポイントにします。StreamingContextは1分のバッチ間隔で作成します。
spark = ( SparkSession .builder .getOrCreate() ) sc = spark.sparkContext ssc = StreamingContext(sc, 60 )
Kafka Producer Kafkaブローカーのリスト、入力と出力のトピック名を定義します。ウインドウ集計した結果はKafkaのトピックに出力するためproducerも作成します。
brokers = "<Kafka BrokerのIPアドレス>:9092" sourceTopic = "sensortag" sinkTopic = "sensortag-sink" producer = KafkaProducer(bootstrap_servers=brokers, value_serializer=lambda v: json.dumps(v).encode('utf-8' ))
ウィンドウ集計 今回のスクリプトのメインの処理をする関数です。KafkaのJSONを変換したRDDにStructTypeのスキーマを適用してDataFrameを作成します。DataFrameのウインドウ集計関数を使い2分ウインドウで周囲温度(ambient)と湿度(rh)の平均値を計算します。
またDataFrameは処理の途中でタイムゾーンを削除しているためウィンドウ集計の結果がわかりやすいようにAsia/Tokyo
タイムゾーンをつけています。
def windowAverage (rdd ): schema = StructType([ StructField('ambient' , DoubleType(), True ), StructField('bid' , StringType(), True ), StructField('humidity' , DoubleType(), True ), StructField('objecttemp' , DoubleType(), True ), StructField('rh' , DoubleType(), True ), StructField('time' , TimestampType(), True ), ]) streamingInputDF = spark.createDataFrame( rdd, schema=schema ) print('1分バッチのDataFrame' ) streamingInputDF.show(truncate=False ) averageDF = ( streamingInputDF .groupBy( streamingInputDF.bid, window("time" , "2 minute" )) .avg("ambient" ,"rh" ) ) sinkRDD = averageDF.rdd.map (lambda x: {'bid' : x[0 ], 'time' : pytz.utc.localize(x[1 ]['end' ]).astimezone(pytz.timezone('Asia/Tokyo' )).isoformat(), 'ambient' : x[2 ], 'rh' : x[3 ]}) if not sinkRDD.isEmpty(): print('2分ウィンドウの平均値' ) sinkList = sinkRDD.collect() print(sinkList) for sink in sinkList: producer.send(sinkTopic, sink)
Kafkaのストリーム作成 Kafka BrokerのIPアドレスとRaspberry Pi 3からSensorTagのJSON文字列を送信するトピックを指定します。JSON文字列を1行ずつデシリアライズしてpyspark.sql.Row を作成します。time
フィールドはUNIXタイムスタンプからPythonのdatetimeに変換しタイムゾーンを削除します。
kafkaStream = KafkaUtils.createDirectStream( ssc, [sourceTopic], {"metadata.broker.list" :brokers}) rowStream = ( kafkaStream .map (lambda line: json.loads(line[1 ])) .map (lambda x: Row( ambient=x['ambient' ], bid=x['bid' ], humidity=x['humidity' ], objecttemp=x['objecttemp' ], rh=x['rh' ], time=datetime.fromtimestamp(x['time' ]).replace(tzinfo=None ), ) ) ) rowStream.foreachRDD(windowAverage)
StreamingContextの開始 最後にStreamingContextを開始してプログラムが停止するまで待機します。
ssc.start() ssc.awaitTermination()
スクリプトの実行 Raspberry Pi 3からPart 2 で書いたPythonスクリプトを実行します。
出力結果 以下の様な出力が表示されます。DataFrameの出力ではタイムゾーンがないのに対し、ウィンドウ集計の結果にはタイムゾーンが付与されています。
1分バッチのDataFrame +--------+-----------------+-----------------+----------+---------------+---------------------+ |ambient |bid |humidity |objecttemp|rh |time | +--------+-----------------+-----------------+----------+---------------+---------------------+ |28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.96875 |75.714111328125|2017-08-01 23:44:03.0| |28.78125|B0:B4:48:BD:DA:03|28.72314453125 |22.90625 |75.714111328125|2017-08-01 23:44:13.0| |28.75 |B0:B4:48:BD:DA:03|28.72314453125 |22.875 |75.616455078125|2017-08-01 23:44:23.0| |28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.15625 |75.616455078125|2017-08-01 23:44:34.0| |28.75 |B0:B4:48:BD:DA:03|28.7030029296875 |23.03125 |75.616455078125|2017-08-01 23:44:44.0| |28.75 |B0:B4:48:BD:DA:03|28.69293212890625|23.125 |75.616455078125|2017-08-01 23:44:55.0| +--------+-----------------+-----------------+----------+---------------+---------------------+ 2分ウィンドウの平均値 [{'bid' : 'B0:B4:48:BD:DA:03' , 'time' : '2017-08-02T08:46:00+09:00' , 'ambient' : 28.760416666666668, 'rh' : 75.64900716145833}]