Apache Flink, like Kafka, is written in Scala. As is common in the Scala ecosystem, development moves aggressively without placing much weight on backward compatibility, so information you find online goes stale quickly and APIs tend to end up Deprecated or PublicEvolving, which makes it a somewhat hard project for beginners to get into. Good learning articles were hard to come by, but for how to write windowed aggregation of sensor data I referred to THE RISE OF BIG DATA STREAMING.
To get a feel for the API, here are the setup lines from the WordCount example in the Flink documentation:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
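The snippet stops short of doing anything with the data; a minimal completion, following the same WordCount sample in the Flink documentation, tokenizes the lines, groups by word, sums the counts, and prints the result:

val counts = text
  .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.print()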
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.{Date, Properties}

import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.config.ConfigFactory
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema, SimpleStringSchema}
import org.apache.flink.util.Collector

import scala.util.parsing.json.JSONObject

object App {
  val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME

  // Settings come from application.conf via Typesafe Config
  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("app.bootstrap-servers")
  val groupId = conf.getString("app.group-id")
  val sourceTopic = conf.getString("app.source-topic")
  val sinkTopic = conf.getString("app.sink-topic")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)

    // Use event time so windows are based on the sensor timestamps
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka 0.10 source that deserializes each record into a Jackson ObjectNode
    val source = new FlinkKafkaConsumer010[ObjectNode](
      sourceTopic, new JSONDeserializationSchema(), props)

    val events = env.addSource(source).name("events")

    // Take the epoch-second "time" field as the event timestamp and
    // tolerate events arriving up to 10 seconds out of order
    val timestamped = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(10)) {
        override def extractTimestamp(element: ObjectNode): Long =
          element.get("time").asLong * 1000
      })

    timestamped
      .map { v =>
        val key = v.get("bid").asText
        val ambient = v.get("ambient").asDouble
        (key, ambient)
      }
      .keyBy(v => v._1)
      .timeWindow(Time.seconds(60))
      // Pre-aggregate with Aggregate, then emit the per-window average
      .aggregate(new Aggregate(),
        (key: String,
         window: TimeWindow,
         input: Iterable[Accumulator],
         out: Collector[Accumulator]) => {
          val in = input.iterator.next()
          out.collect(Accumulator(window.getEnd, key, in.sum / in.count, in.count))
        })
      .map { v =>
        // Format the window end time as an ISO 8601 string and build the JSON output
        val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
        val time = fmt.format(zdt)
        val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
        val retval = JSONObject(json).toString()
        println(retval)
        retval
      }
      .addSink(new FlinkKafkaProducer010[String](
        bootstrapServers, sinkTopic, new SimpleStringSchema))
      .name("kafka")

    env.execute()
  }
}
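The configuration keys above are read with Typesafe Config; a minimal src/main/resources/application.conf matching them might look like this (all values illustrative):

app {
  bootstrap-servers = "localhost:9092"
  group-id = "flink-sensor-group"
  source-topic = "sensor"
  sink-topic = "sensor-ambient"
}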
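The Accumulator case class and the Aggregate pre-aggregator are not shown in this excerpt. A minimal sketch consistent with how they are used above, assuming Flink 1.4+ where AggregateFunction.add returns the new accumulator, could look like this:

import org.apache.flink.api.common.functions.AggregateFunction

// Field names inferred from the window function above: time is the window
// end, bid the sensor key, sum the running total, count the element count
case class Accumulator(time: Long, bid: String, sum: Double, count: Long)

class Aggregate extends AggregateFunction[(String, Double), Accumulator, Accumulator] {
  override def createAccumulator(): Accumulator = Accumulator(0L, "", 0.0, 0L)

  override def add(value: (String, Double), acc: Accumulator): Accumulator =
    acc.copy(bid = value._1, sum = acc.sum + value._2, count = acc.count + 1)

  override def getResult(acc: Accumulator): Accumulator = acc

  override def merge(a: Accumulator, b: Accumulator): Accumulator =
    a.copy(sum = a.sum + b.sum, count = a.count + b.count)
}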
Let's walk through main() in order. The first step is the Kafka connection setup: because the broker we connect to runs Kafka 0.10, we use FlinkKafkaConsumer010. The sensor data arriving from the Raspberry Pi 3 is JSON in the following format.
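The exact payload is not shown in this excerpt, but given the fields the job reads (time as epoch seconds, bid, and ambient), a record looks roughly like this (values illustrative):

{"time": 1503442000, "bid": "xx:xx:xx:xx:xx:xx", "ambient": 26.5}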
At the other end of the pipeline, each aggregated result is converted back into a JSON string before being written to the sink topic:

.map { v =>
  val zdt = new Date(v.time).toInstant().atZone(ZoneId.systemDefault())
  val time = fmt.format(zdt)
  val json = Map("time" -> time, "bid" -> v.bid, "ambient" -> v.sum)
  val retval = JSONObject(json).toString()
  println(retval)
  retval
}
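Because this map also calls println, each record is echoed to stdout as it is produced; a line looks roughly like the following (values illustrative):

{"time" : "2017-08-23T10:01:00+09:00", "bid" : "xx:xx:xx:xx:xx:xx", "ambient" : 26.5}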