KafkaからTreasure DataにブリッジするDocker Compose

 td-agentコンテナとKafka Consumerコンテナを使いKafkaからTreasure DataへブリッジするDocker Composeサービスを起動します。別のポストではPySpark Streamingのウィンドウ集計した結果をKafkaのトピックに出力するコードを書きました。このストリーム処理はデータパイプラインの前処理やエンリッチメントに相当します。後続にビッグデータのバッチ処理を想定してTreasure Dataに保存します。

Docker Compose

 最初に今回作成するプロジェクトのディレクトリ構成です。

$ tree -a
.
├── docker-compose.yml
├── .env
├── .gitignore
├── kafka-bridge
│   ├── Dockerfile
│   ├── fluentd-consumer.properties
│   └── log4j.properties
└── td-agent2
├── Dockerfile
└── td-agent.conf
2 directories, 9 files

docker-compose.yml

 td-agentとKafka ConsumerサービスはそれぞれDockefileを書いてビルドします。Kafkaはlandoop/fast-data-devを利用します。Confluent Open Sourceを同梱しているためKafkaとZooKeeperも起動します。

docker-compose.yml
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=<仮想マシンのパブリックIPアドレス>
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
td-agent2:
build: ./td-agent2
env_file:
- ./.env
ports:
- 24224:24224
kafka-bridge:
build: ./kafka-bridge
depends_on:
- td-agent

.env

 Treasure Dataの接続情報は環境変数ファイルの.envに記述しDocker Composeから読み込みます。

TD_API_KEY=<YOUR API KEY>
TD_ENDPOINT=<TD ENDPOINT>

td-agent2

 td-agentのDockerイメージを作成します。

Dockerfile

 Overview of Server-Side Agent (td-agent)のインストール手順に従います。install-ubuntu-xenial-td-agent2.shの中ではsudoも必要です。

FROM ubuntu:xenial
RUN apt-get update && apt-get install sudo curl -y
RUN curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent2.sh | sh
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
ADD td-agent.conf /etc/td-agent/
EXPOSE 24224
CMD ["/usr/sbin/td-agent"]

td-agent.conf

 td-agent.confは環境変数を参照することができます。Treasure Dataへの接続情報を.envファイルから取得します。

<match td.*.*>
@type tdlog
endpoint "#{ENV['TD_ENDPOINT']}"
apikey "#{ENV['TD_API_KEY']}"
auto_create_table
buffer_type file
buffer_path /var/log/td-agent/buffer/td
use_ssl true
num_threads 8
</match>
<source>
@type forward
</source>

kafka-fluentd-consumer

 KafkaからTreasure Dataへのブリッジにはkafka-fluentd-consumerのJarを利用します。

Dockerfile

コンパイル済のkafka-fluentd-consumer-0.3.1-all.jarをダウンロードします。

FROM java:8-jre
ARG KAFKA_FLUENTD_CONSUMER_VERSION=0.3.1
WORKDIR /app
RUN wget -q -O kafka-fluentd-consumer-all.jar https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v$KAFKA_FLUENTD_CONSUMER_VERSION/kafka-fluentd-consumer-$KAFKA_FLUENTD_CONSUMER_VERSION-all.jar
ADD log4j.properties .
ADD fluentd-consumer.properties .
CMD ["java", "-Dlog4j.configuration=file:///app/log4j.properties", "-jar", "kafka-fluentd-consumer-all.jar", "fluentd-consumer.properties"]

fluentd-consumer.properties

 デフォルトの設定から以下を変更します。fluentd.connectzookeeper.connectはdocker-compose.ymlを使う場合はそれぞれサービス名を指定します。

  • fluentd.connect=:24224
  • fluentd.tag.prefix=td.<データベース名>.
  • fluentd.consumer.topics=<トピック名>
  • zookeeper.connect=:2181
  • group.id=<コンシューマグループ名>
# Fluentd instance destinations.
fluentd.connect=td-agent2:24224
# Dynamic event tag with topic name.
fluentd.tag.prefix=td.sensortag_dev.
# Consumed topics.
fluentd.consumer.topics=sensortag-sink
# The number of threads per consumer streams
fluentd.consumer.threads=1
# The path for backup un-flushed events during shutdown.
fluentd.consumer.backup.dir=/tmp/fluentd-consumer-backup/
# Kafka Consumer related parameters
zookeeper.connect=kafka-stack:2181
group.id=my-sensortag-sink-group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000

log4j.properties

 log4j.propertiesはデフォルトのまま使います。

# log4j logging configuration.
# This is based on Pinterest's secor
# root logger.
log4j.rootLogger=DEBUG, ROLLINGFILE
log4j.appender.ROLLINGFILE = org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=/tmp/fluentd-consumer.log
# keep log files up to 1G
log4j.appender.ROLLINGFILE.MaxFileSize=20MB
log4j.appender.ROLLINGFILE.MaxBackupIndex=50
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n

動作確認

 Docker Composeのサービスを起動します。

$ docker-compose up -d

 td-agentのバージョンを確認します。

$ docker-compose exec td-agent2 td-agent --version
td-agent 0.12.35

 Spark Streamingを使ったウィンドウ集計のサンプルのようにfluentd.consumer.topicsに指定したtopicへJSONフォーマットでデータを送信します。

 テストとしてkafka-console-producerから直接JSONを送信してみます。

$ docker-compose exec kafka-stack kafka-console-producer \
--broker-list localhost:9092 \
--topic sensortag-sink

 コマンドを実行後の待機状態でJSON文字列を入力します。

{"bid": "B0:B4:48:BD:DA:03", "time": 1501654353, "humidity": 27.152099609375, "objecttemp": 21.6875, "ambient": 27.09375, "rh": 78.4423828125}

 td-agentはファイルバッファを作成してデフォルトでは5分間隔でTreasure Dataへデータがアップロードします。

$ docker-compose exec td-agent2 ls /var/log/td-agent/buffer
td.sensortag_dev.sensortag_sink.b555bf24951c65554.log

treasuredata.png