先週末から急激にトラフィックが増えて既存の集計バッチ、サーバでは処理できなくなってしまっていたので今週頭から急遽EMRに移すことを決意しました。
構成
以前の構成はこんな感じ。
+----[S3] [ADサーバ]N-----N[LogProxy]N--+ +----[集計サーバ]
ADサーバからLogProxyに対してラウンドロビンでログを転送してLogProxyから集計サーバ(単体サーバ)に集めるというもの。バックアップ先としてS3を利用しています。ログの転送にはもちろんfluentdを使っています。fluentdが無かったらと思うとゾッとします。
ログの流量が多くなってしまい遅延するようになったり、Log Proxy側で集計サーバに流している分のログが溢れてしまうということも見受けられました。仮にログが正常に集計サーバに集まったとしても集計処理が長くなってしまい、時間別の集計も日々延びており、1日分の再集計に関してはアホかってくらい時間が掛かるようになってしまっていました。
そういうこともあってEMRに移ることにしました。
全体的な構成自体は変わっておらず、LogProxyからS3へアップしたログをEMRに食わすというもの。ただ、全てS3に移すと気軽にログを見たい時に不便なので、EMRで処理しなくても大丈夫な量のログに関しては今まで集計サーバで、多いものだけEMRで処理するようにしています。また、EMRで処理させるログに関しても、傾向を見たり統計データを算出するのにサンプリング後(1/100とか)を集計サーバに流すようにしています。
S3へはYYYY/MM/DD/HH/hoge.${ADサーバのHOST}.${LogProxyサーバのHOST.PORT}.logのように時間まで階層化されてその中に各種ログが入っています。
hive or streaming?
EMRでjava,hive,streamingなどどれを使おうか迷った挙句、調査をする時間もなかったので以前少し触ったことがあるstreamingにしました。処理が簡単なものはhiveを使った方がmapper/reducerのスクリプトを書かなくてくてメンテ性も上がるので様子を見ながらstreamingで実装したスクリプトをhiveで再実装しようかと思っています。
ジョブの登録と取り込み
cronなどから定期的にジョブを登録するための仕組みと、処理が終わったジョブを既存DB(mysql)へ取り込むための仕組みが必要になります。
ジョブの登録
ジョブの登録はelastic-mapreduce-rubyクライアントを使えば出来るんですが、設定ファイルからデフォルト値のロードとか色々既存アプリと共通化したかったのでperlから呼ぶことに。。。こんな感じで登録出来る。
perl job_register.pl --func='hoge_func' --from='2013-07-11 00:00:00' --to='2013-07-12 00:00:00' |
するとこんな感じの引数でelastic-mapreduce-rubyを呼び出します。
elastic-mapreduce --create --stream \ --enable-debugging \ --mapper s3n://bucket/path/to/mapper \ --reducer s3n://bucket/path/to/reducer \ --log-uri s3n://bucket/path/to/debug/YYYY/MM/DD/hoge_func_HHMMSS --output s3n://bucket/path/to/output/YYYY/MM/DD/hoge_func_HHMMSS --master-instance-type m1.small \ --slave-instance-type m1.small \ --num-instances 3 \ --input s3n://bucket/path/to/logs/2013/07/11/*/impression.*.log.gz,s3n://bucket/path/to/logs/2013/07/11/*/click.*.log.gz |
instance-type、num-instancesなど各種パラメータに関してはデフォルト値は設定ファイルに、必要に応じてjob_register.plの引数で上書きすることが出来るようにしています。これで、時間帯毎にインスタンス数を変えたりするのも楽です。
取り込み
先ほどのスクリプトから登録されたジョブIDはジョブキューに登録され、監視用workerでステータスを監視して、正常終了していたら各処理(hoge_func等)個別の取り込みバッチを実行するというものを実装しています。ステータスが処理中の場合は一定時間後に再度チェック。異常終了、キャンセルの場合はジョブキューから削除。という感じです。
処理個別の取り込みバッチに関してはworkerからしか起動できないと開発時などめんどくさいのでスクリプト単体でも実行できるようにしています。
まとめ
1日分の集計が20時間弱かかっていたのが1時間未満で終わるようになったりとスケールするって素晴らしい!こんなことならさっさとEMRに移しておけば良かった。