PySpark StreamingでSensorTagのデータをJupyterを動作環境にしてウィンドウ集計を試しました。ストリーム処理のフレームワークは他にもいくつかありますが次はKafka Streamsを使ってみます。Sparkと違いこちらはクラスタではなくライブラリです。現在のところ開発言語は公式にはJavaのみサポートしています。
Java環境
Ubuntu 16.04に構築したEclimをMavenでコードを書いていきます。
プロジェクト
プロジェクトのディレクトリに以下のファイルを作成します。完全なコードはこちらのリポジトリにあります。
$ tree |
App.java
いくつかのパートにわけてコードの説明をします。
定数
トピック名などはpom.xmlに定義した環境変数から取得します。WINDOWS_MINUTES
はウィンドウ集計をする間隔です。COMMIT_MINUTES
は後述するようにKafkaがキャッシュを自動的にコミットする間隔です。ここでは分で指定します。
public class App { |
シリアライゼーション
レコードのシリアライザとデシリアライザを作成します。Kafka Streamsアプリでは処理の中間結果をトピックに保存してフローを実装していきます。レコードをトピックから読むときのデシリアライザ、書くときのシリアライザの2つをまとめてSerDeを定義します。SerDeはトピックのキーと値の型ごとに必要です。
sensorSumSerde
SenroSum
はカスタムで作成した周囲温度 (ambient)とウィンドウ集計の状態を保持するクラスです。stringSerde
デフォルトのString用のSerDeです。今回メッセージのキーはすべてString
です。doubleSerde
デフォルトのdouble用のSerDeです。SensorTagの周囲温度 (ambient)はdouble
でウィンドウ集計します。
public static void main(String[] args) throws Exception { |
KStreamの作成
最初にKStreamBuilderのstream()
を呼びKStreamを作成します。トピックのキーは文字列、値はJsonNodeのSerDeを指定します。
final KStreamBuilder builder = new KStreamBuilder(); |
KGroupedStreamを作成
SensorTagのメッセージはRaspberry Pi 3からJSON文字列でKafkaのトピックに届きます。
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1502152524, 'humidity': 27.26287841796875, 'objecttemp': 21.1875, 'ambient': 27.03125, 'rh': 75.311279296875} |
KStreamのレコードはキーと値を持つKeyValueオブジェクトです。例では周囲温度 (ambient)の平均値だけウィンドウ集計するためmap()
を呼びキーと周囲温度のペアだけ持つ新しいKStreamを作成します。
次にgroupByKey()
を呼びキーでグループ化してKGroupedStreamを作成します。レコードはキーは文字列、値は周囲温度のdouble
になっているのでそれぞれのSerDeを指定します。
final KGroupedStream<String, Double> sensors = |
KStramからKTableを作成
KGroupedStreamのaggregate()
を呼びKTableを作成します。KTableはキーごとに指定されたウィンドウ間隔でレコードの合計値とレコード数の状態を保持します。
aggregate()
の第1引数のInitializerではストリームの集約で使うアグリゲータの初期化を行います。ここでウィンドウ集計の状態を保持するSensorSum
の初期化を行います。第2引数でアグリゲータを実装します。現在のレコードのキーと値、1つ前のレコード処理で作成したSensorSum
が渡されます。データの到着ごとに合計値とレコード数を加算して新しいSensorSum
を返します。第3引数は2分ウィンドウのTimeWindowsを定義します。第4引数はSensorSum
のSerDe、第5引数は状態を保持するトピック名を渡します。
final KTable<Windowed<String>, SensorSum> sensorAgg = |
KTableからKStramを作成
KTableのmapValues()
で平均値を計算します。合計値をレコード数で除算した平均値はDouble
レコードの新しいKTableです。ここからtoStream()
を呼びKStreamを作成します。レコードはタイムスタンプ、キー、平均値のJSON文字列にフォーマットしてストリームに出力します。タイムスタンプは異なるシステム間でデータ交換がしやすいようにISO 8601にしています。最後に指定したトピックへレコードを保存して終了です。
final DateTimeFormatter fmt = |
Kafka Streamsの開始
KafkaStreamsを設定オブジェクトとビルダーから作成してKafka Streamsアプリを開始します。またSIGTERM
でKafka Streamを停止するようにシャットダウンフックに登録しておきます。
final StreamsConfig config = new StreamsConfig(getProperties()); |
Kafka Streamsの設定とタイムアウトについて
環境変数などからKafka Streamsの設定で使うProperties
を作成します。
private static Properties getProperties() { |
COMMIT_INTERVAL_MS_CONFIG
最初はStreamsConfig.COMMIT_INTERVAL_MS_CONFIGは変更していませんでした。レコードをトピック保存する前にKStreamのmap()でログを出力しています。2分ウィンドウ間隔の集計結果を最後に1回だけ出力をさせたかったのですが、4-5回不特定に重複する結果になりました。
{"time": "2017-08-08T10:34:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.343750} |
以下の記事を参考にすると、これはKTableの変更履歴 (changelog stream)という特徴から期待される動作のようです。KTableにウィンドウ集計の最終結果という状態はなく、キャッシュに更新された値は一定の間隔でコミットされます。KStreamへtoStream()
したあとにtransform()
やprocess()
を使いレコードの重複を除去するコードを自分で実装する必要があるようです。
レコードの重複を完全に除去することはできませんがStreamsConfig.COMMIT_INTERVAL_MS_CONFIG
の値を大きくすることでキャッシュがコミットされる回数を減らすことができます。デフォルト値は30秒が指定されています。
- How to send final kafka-streams aggregation result of a time windowed KTable?
- Immutable Record with Kafka Stream
- Kafka KStreams - processing timeouts
- Kafka Streams for Stream processing A few words about how Kafka works.
- Memory management
その他のクラス
モデル (SensorSum.java)、シリアライザ (SensorSumSerializer.java)、デシリアライザ (SensorSumDeserializer)のクラスを用意します。シリアライザはserialize()
を実装してSensorSum
のプロパティをバイト配列に変換します。byteバッファに周囲温度合計値のDouble
の8バイトと、レコード数のInteger
の4バイト分を割り当て使います。
public byte[] serialize(String topic, SensorSum data) { |
実行
Exec Maven PluginからKafka Streamsを実行します。
$ mvn clean install exec:exec@json |
ウィンドウ間隔が2分、キャッシュのコミット間隔を3分に指定してみました。やはり何回か重複した出力はありますが重複した出力を減らすことができました。
{"time": "2017-08-08T11:32:00+09:00", "bid": "B0:B4:48:BE:5E:00", "ambient": 27.414773} |