タケユー・ウェブ日報

Ruby on Rails や Flutter といったWeb・モバイルアプリ技術を武器にお客様のビジネス立ち上げを支援する、タケユー・ウェブ株式会社の技術ブログです。

MovableTypeでジョブキューによる非同期処理を行いたい

たとえばメルマガ配信は大変時間のかかる処理。 ただでさえSMTPが遅い上に大量にある

  • コールバック内で送信までやるべきでない
  • 中断した場合再開できる必要がある

そこでジョブキュー。MovableTypeは標準でこの仕組みを備えている。

ジョブキュー

AmazonSQSとかResqueとかみたいなのぽい

MTにもジョブキューの仕組みはある

公開キューのコードを読んでどうやっているのか調べてみる

MT::PublishOption::ASYNC() 検索

lib/MT/WeblogPublisher.pm

    # From here on, we're committed to publishing this file via TheSchwartz
    return 1 if $throttle->{type} != MT::PublishOption::ASYNC();

    return 1 if $args{force};    # if async, but force is used, publish

    require MT::TheSchwartz;
    require TheSchwartz::Job;
    my $job = TheSchwartz::Job->new();
    $job->funcname('MT::Worker::Publish');
    $job->uniqkey( $fi->id );

    my $priority = 0;

    my $at = $fi->archive_type || '';

    # Default priority assignment....
    if ( ( $at eq 'Individual' ) || ( $at eq 'Page' ) ) {
        require MT::TemplateMap;
        my $map = MT::TemplateMap->load( $fi->templatemap_id );

        # Individual/Page archive pages that are the 'permalink' pages
        # should have highest build priority.
        if ( $map && $map->is_preferred ) {
            $priority = 10;
        }
        else {
            $priority = 5;
        }
    }
    elsif ( $at eq 'index' ) {

        # Index pages are second in priority, if they are named 'index'
        # or 'default'
        if ( $fi->file_path =~ m!index|default|atom|feed!i ) {
            $priority = 8;
        }
        else {
            $priority = 9;
        }
    }
    elsif ( $at =~ m/Category|Author/ ) {
        $priority = 1;
    }
    elsif ( $at =~ m/Yearly/ ) {
        $priority = 1;
    }
    elsif ( $at =~ m/Monthly/ ) {
        $priority = 2;
    }
    elsif ( $at =~ m/Weekly/ ) {
        $priority = 3;
    }
    elsif ( $at =~ m/Daily/ ) {
        $priority = 4;
    }

    $job->priority($priority);
    $job->coalesce( ( $fi->blog_id || 0 ) . ':'
            . $$ . ':'
            . $priority . ':'
            . ( time - ( time % 10 ) ) );

    MT::TheSchwartz->insert($job);

TheSchwartz

perlのジョブキューシステムTheSchwartzを使っていることがわかった。

https://movabletype.org/documentation/developer/schwartz-workers.html

TheSchwartz::Job

ジョブを処理するワーカーとか、処理に必要なパラメータとかをセット

MT::TheSchwartz

lib/MT/TheSchwartz.pm

TheSchwartzクライアント(TheSchwartz)のサブクラス。データベース設定やワーカークラスの情報をMT設定から取り出してよしなにしてくれる。

A subclass of C, a job queue system. The MT subclass is responsible for configuring TheSchwartz to work with the MT database configuration and supplies TheSchwartz worker classes from the MT registry.

今回もTheSchwartzを使えば、「配信ジョブ」を処理できそう

TheSchwartz調査

Workerの書き方

MailMug::Worker

https://github.com/takeyuweb/mt-plugin-MailMug/blob/trunk/plugins/MailMug/lib/MailMug/Worker.pm

中断した場合再開できる必要がある

ジョブがエラーや強制終了で止まっても、「終了」を指示するまで、一定時間後に再実行されるようにする

sub retry_delay { 30 }
sub max_retries { 10 }

のようにWorkerメソッドをオーバーライドすることでfailedの時に再実行できるっぽい。

再実行するべきでないエラーの場合、

長時間かかるジョブ

TheSchwartz で時間が掛かる job を実行するときは grab_for に注意 http://d.hatena.ne.jp/sfujiwara/20080731/1217484591

sub grab_for { 60 * 10 }

参考記事にもあるように、基本的にはあまり長くならないように設計する。

また、途中エラー等でfailedできなかった場合、再実行させる必要があるので、その点でもあまり長い値を指定するのは好ましくないと思う。

並列化

送信先をいくつかのグループにわけて複数のジョブとして登録

prefork等でワーカーを並列実行できるようにするか、ワーカーを増やせば並列実行できそう

あとはググれ!