はちゅにっき

こっちのブログはまったり更新

Perl でプロセスの待ち合わせをする

たとえば以下のようなプログラムを Perl で書いて

#!/usr/bin/env perl
use strict;
use warnings;
use feature qw/ say /;
use Parallel::ForkManager;
use Time::HiRes ( );
 
my $process = 10;
my $pm = Parallel::ForkManager->new($process);
 
for (1..$process) {
    if ($pm->start) {
        Time::HiRes::sleep(0.2);  # fork に時間がかかることを想定
        next;
    }

    # 子プロセスの処理開始時刻を表示
    say "[$$] ", Time::HiRes::time( );
 
    $pm->finish;
}
 
$pm->wait_all_children;

これを実行してみると

[3107] 1381815422.89502
[3108] 1381815423.09763
[3109] 1381815423.30142
[3110] 1381815423.50445
[3111] 1381815423.70717
[3112] 1381815423.91102
[3113] 1381815424.1142
[3114] 1381815424.31746
[3115] 1381815424.52011
[3116] 1381815424.72334

当然、親プロセスが 0.2 秒休んでいるため、子プロセスの処理開始時刻は約0.2秒間隔のバラバラに。
今回は sleep しているので当然ですが、fork って結構時間がかかる処理なので特別 sleep を入れなくても、子プロセスの処理開始時刻はバラバラになりがち。
普通はこれでも全然かまわないんですが、fork が完了するのを待って子プロセスを一斉に開始させたい!とか、fork 完了後に親プロセスが何らかの処理を行ない、その後一斉に子プロセスを開始させたい!とか、特定の条件が揃うまで子プロセスの実行を停止させておきたい!なんてときには、プロセスの "待ち合わせ" が必要になります。
滅多にそんなことは発生しないんだろうけれど、なんか今回はそういう要件があってだな。。。
というわけで、今回は手っ取り早くセマフォを使ってプロセスの待ち合わせをしてみることに。


Perlセマフォというと、semget / semctl / semop といった低レイヤーのインタフェースもあるようですが、めんどくさかったので CPAN に相談して IPC::Semaphore を使うことにしました。

IPC-Semaphore
http://search.cpan.org/~mhx/IPC-SysV/lib/IPC/Semaphore.pm

と言うわけでセマフォを導入した版のプログラムがこちら。

#!/usr/bin/env perl
use strict;
use warnings;
use feature qw/ say /;
use IPC::Semaphore;
use IPC::SysV qw/ IPC_PRIVATE IPC_CREAT S_IWUSR SEM_UNDO /;
use Parallel::ForkManager;
use Time::HiRes ( );
 
my $process = 10;
my $pm  = Parallel::ForkManager->new($process);

# 大きさ 1 のセマフォをつくる
my $sem = IPC::Semaphore->new(IPC_PRIVATE, 1, IPC_CREAT | S_IWUSR);

# 0番目のセマフォに 0 をセット (誰もロックを取得できない)
$sem->setval(0, 0);
 
for (1..$process) {
    if ($pm->start) {
        Time::HiRes::sleep(0.2);  # fork に時間がかかることを想定
        next;
    }

    # 0番目からロックを獲得できるまで WAIT する
    $sem->op(0, -1, SEM_UNDO);

    # 子プロセスの処理開始時刻を表示
    say "[$$] ", Time::HiRes::time( );
 
    # 子プロセス死亡でセマフォはアンロック
    $pm->finish;
}
 
# 0番目のセマフォにプロセス分の値をセット
#   => すべての子プロセスがロック取得可能に
$sem->setval(0, $process);
$pm->wait_all_children;

# セマフォ削除
$sem->remove;

これを実行してみると

[3218] 1381815496.42006
[3216] 1381815496.42017
[3217] 1381815496.42216
[3220] 1381815496.42278
[3219] 1381815496.42684
[3222] 1381815496.42698
[3224] 1381815496.4317
[3221] 1381815496.43173
[3223] 1381815496.43479
[3225] 1381815496.45572

当然すべての子プロセスが完全に同時刻に実行はできませんが、ほぼバラつきなく実行され、プロセス同士がきちんと待ち合わせていることが分かります。
今回は親プロセスが、セマフォに子プロセスの数と同じ値をセットしたため、すべての子プロセスがほぼ同時に実行されましたが、たとえばこれを半分だけにして

# ... (略)
for (1..$process) {
    # ... (略)

    $sem->op(0, -1, SEM_UNDO);
    say "[$$] ", Time::HiRes::time( );
    Time::HiRes::sleep(0.5);  # 0.5 秒休んでみる

    $pm->finish;
}

$sem->setval(0, $process/2);  # 子プロセスの数の半分をセット
$pm->wait_all_children;
$sem->remove;

実行すると

[19428] 1381998363.44075
[19426] 1381998363.44078
[19430] 1381998363.44125
[19427] 1381998363.44129
[19429] 1381998363.44166
[19431] 1381998363.94839  # <=ロック取得できず 0.5 秒 wait している
[19432] 1381998363.95549
[19433] 1381998363.96007
[19434] 1381998363.96224
[19435] 1381998363.96389

と、予想通りの待ち合わせをしてくれます。
今回は大きさ1のセマフォを作成し、そこにプロセス数分の値をセットしましたが、プロセス数分のセマフォを確保して、プロセスごとに取得するセマフォを分ければ、任意の子プロセスとの待ち合わせ。なんてことも実現できるんじゃないかと思います。

もちろん、待ち合わせの話しに限ったことではなく Perlセマフォを使いたくなったときにもご利用ください。
セマフォなんて使う機会は少ないと思いますが、忘れないように一応メモ。