Update 2014-10-09
: IronMQの非同期処理をNewRelicで監視 - Part7: mackerel-agentをFabricでインストール
IronMQを使いワーカーへのタスクのqueueを分散環境から行いたいのが目的です。シリーズのタイトルにCeleryとしていますが今回は使いません。CeleryはPythonのプログラムからdelayメソッドでタスクの非同期実行をしますが、分散環境にPythonの実行環境がなかったり、他システムとの連携を考えると、IronMQにREST API
でメッセージをqueueできると便利です。
Celeryのメッセージフォーマット
Celeryのメッセージフォーマットの仕様によるとJSON書式は以下です。
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", |
このメッセージをqueueするとタスクを実行できそうなのですが、IronMQをブローカーにしたときの使い方がよくわかりませんでした。
そのうち理解できてきたら、Celeryも試してみます。
今回はRackspaceのDeveloper Blog
のUsing IronMQ for Delayed Processing and Increasing Scaleというポストを参考に、IronMQとPythonのメインループだけで実装してみようと思います。
IronMQにはREST API
があるので、curlでもメッセージが送れます。キューをPUTする場合は以下のようになります。
OAuthのTOKEN
とPROJECT_ID
をIronMQの画面にログインして取得しておきます。
$ curl -H "Content-Type: application/json" -H "Authorization: OAuth ***" -d '{"messages":[{"body":"hello world!"}]}' "https://mq-aws-us-east-1.iron.io/1/projects/***/queues/my_queue/messages" |
このqueueをワーカーからpollingしてpull queue
します。push queue
のワーカーを作る場合は、IronMQからHTTP/HTTPSでコールバック用のエンドポイントを公開しないといけないので、社内用途では使えません。
テスト用に簡単なPythonプログラムを書きました。main loop
でpollingしてqueueにメッセージがPUTされたらGETして処理をするようにします。
プロジェクト
IronMQのライブラリをpipインストールします。
$ sudo pip install iron_mq |
作成したPythonスクリプトは一つです。
$ tree ~/python_apps/spike_nocelery/ |
Fabricのインストール
Fabricをpipインストールします。サンプルプログラムはqueueからメッセージを取得できたら、リモートサーバーにSSH接続をして
任意のコマンドを実行するシナリオです。
直接Paramikoを使ってSSH接続をしたり、ファイル転送をしても良いですが、Fabricにはリモートサーバー管理の自動化用の関数が揃っています。PythonのプログラムからAPIを実行できると、サーバー管理がプログラマブルになってとても便利です。
$ sudo pip install fabric |
Pythonのメインループ
SSH接続するサーバーは、CentOSなので簡単に/etc/redhat-release
をcatしてみます。
メインループでqueueにメッセージがあるかpollingして、取得できたら標準出力します。
実際のプロジェクトではSSH接続して実行するコマンドの引数を、分散環境からメッセージで渡します。
処理が終わったら忘れずにqueueからメッセージを削除します。
# -*- coding: utf-8 -*- |
起動テスト
テスト用にフォアグラウンドでプログラムを実行します。
$ cd ~/python_apps/spike_nocelery/ |
curlでメッセージをqueueにPUTします。今回はワーカーをキックするだけなので、メッセージの内容に意味はありません。
$ curl -H "Content-Type: application/json" -H "Authorization: OAuth ***" -d '{"messages":[{"body":"hello world!"}]}' "https://mq-aws-us-east-1.iron.io/1/projects/***/queues/my_queue/messages" |
ワーカーの標準出力に、リモートホストにSSH接続してcatを実行した結果が表示されます。
{u'messages': [{u'body': u'hello world!', u'reserved_count': 1, u'push_status': {}, u'id': u'6051713210758384919', u'timeout': 60}]} |
runitでデモナイズ
runitの起動スクリプトを用意します。
|
runitに付属しているsvlogdコマンドを使ったログの設定をします。
|
runitを有効にするため、一度コンテナをrestart します。
$ docker restart a05a1d |
サービスのログをtailします。
$ sudo tail -f /var/log/spike/current |
メッセージをqueueにPUTします。
$ curl -H "Content-Type: application/json" -H "Authorization: OAuth ***" -d '{"messages":[{"body":"hello world!"}]}' "https://mq-aws-us-east-1.iron.io/1/projects/***/queues/my_queue/messages" |
ログにも同様にcatの結果が出力されます。
2014-08-26_03:42:23.60780 {u'messages': [{u'body': u'hello world!', u'reserved_count': 1, u'push_status': {}, u'id': u'6051714293089978639', u'timeout': 60}]} |