👩‍👩‍👧‍👧

PHPでマルチプロセス処理を実現するには

2024/06/01に公開

こんにちはみなさん

PHPはそもそもシングルプロセスシングルスレッドの同期処理をメインにしたプログラミング言語ですが、一昔前、なんとか非同期処理をいろいろな方法で実現できないかを試したことがあります。
あるのですが、結局はまあ、同期処理であまり困らんでしょってなって、だんだん忘れ去っていたのですが、最近ふと思い出したようにマルチプロセス処理みたいなの、PHPでどうやってやるんだっけなぁってなったので、いい機会なので、思い出しがてら記事化してみようかなと思った次第。

テストの速度改善とか、いろいろできそうだなって思ったので、つい

PHPでマルチプロセス処理する

pcntl

PHPでマルチプロセス処理を作るのに最も簡単な方法としては、pcntlモジュールを使うのがいいでしょう。
私のようにローカルでdockerを使って検証をしている場合は、以下のコマンドでインストールできます。

docker-php-ext-configure pcntl --enable-pcntl && docker-php-ext-install pcntl

この上で、最も簡単な子プロセスを使った処理を書いてみましょう

$pid = pcntl_fork();
if ($pid === 0) {
    sleep(1);
    echo 'chiled process!!!', "\n";
    sleep(1);
} else if ($pid === -1) {
    throw new RuntimeException('プロセスの作成に失敗した模様');
} else {
    echo 'parent process!!!', "\n";
    sleep(2);
}
echo 'process end!!', "\n";

こいつを実行すると、こんな結果が出てきます。

# php parallel.php 
parent process!!!
chiled process!!!
process end!!
process end!!

ここで、pcntl_forkというのは、「現在実行中の自身のプロセスをコピーして新しいプロセスを作る」(=プロセスをforkする)関数になっています。
forkされたプロセスはそのまま実行されるわけで、つまりはコピー元のプロセスとコピー先のプロセスの二つが並列で以降のスクリプトを実行することになるわけです。

forkのイメージ

親プロセスと子プロセスの違いは、pcntl_fork関数の返す値$pidにあり、親プロセスの場合は子プロセスのIDが、子プロセスの場合は0が入ります。
子プロセスの生成に失敗した場合、$pidには-1が入ります。
これにより、上記の分岐が発生していたわけです。

forkされたプロセスとメモリ

forkされたプロセスとメモリについて検証してみましょう。
以下のコードは親プロセスと子プロセスでスカラ変数、配列、オブジェクトそれぞれに違う変更を行っています。

$greeting = 'こんにちは!';
$obj = new stdClass;
$obj->name = '私は';
$arr = ['name' => '私は'];

$pid = pcntl_fork();

if ($pid === 0) {
    $greeting .= '子プロセスさん!';
    $obj->name .= '子プロセス';
    $arr['name'] .= '子プロセス';
    sleep(2);
} else if ($pid === -1) {
    throw new RuntimeException('プロセスの作成に失敗した模様');
} else {
    sleep(1);
    $greeting .= '親プロセスさん!';
    $obj->name .= '親プロセス';
    $arr['name'] .= '親プロセス';
}

echo 'スカラ: ' . $greeting . "\n";
echo 'オブジェクト: ' . $obj->name . "\n";
echo '配列: ' . $arr['name'] . "\n";

これの結果はどうなるでしょうか?

# php parallel.php 
スカラ: こんにちは!親プロセスさん!
オブジェクト: 私は親プロセス
配列: 私は親プロセス
## いったんここでプロセスが終了し、その後、次のものが出力される
スカラ: こんにちは!子プロセスさん!
オブジェクト: 私は子プロセス
配列: 私は子プロセス

ここでわかるのは、プロセスがforkされたとき、メモリ上に展開しているデータも同じようにコピーされ、親と子のメモリが共有されないことです。
メモリが共有されないので、シングルトンを好き勝手に使っても親と子で影響が出ることはないのですが、forkした時点で親プロセスのメモリにでかいのが乗っていると、子プロセスにも同じようにでかいのが乗るのには注意したいところです。

マルチプロセス処理の設計

重たい処理を別のプロセスに投げる

時間がかかる処理がある場合、その処理を別プロセスで実行しつつ、現在のプロセスはさっさと終わらせたい場合に、時間のかかる関数をラッピングして別プロセスで実行してもらうようなクラスがあると便利です。
例えばこんなものを作ります。

use Closure;
use RuntimeException;

class Task
{
    public function __construct(private Closure $func)
    {
    }

    public function run()
    {
        $exec = $this->func;
        $pid = pcntl_fork();
        if ($pid === 0) {
            $exec();
            exit(0);
        } else if ($pid === -1) {
            new RuntimeException('fail to fork');
        }
    }
}

こんなクラスを作ったうえで、以下のように使えばよい感じです。

echo '時間のかかる処理が必要だ' . "\n";
$task = new Task(function () {
    sleep(1);
    echo '時間がかかってます' . "\n";
    sleep(1);
    echo '処理が完了しました' . "\n";
});
$task->run();
echo '時間のかかる処理は別でやってもらっているので、自分はここで終わり' . "\n";

すると、出力はこんな感じになります。

# php parallel.php 
時間のかかる処理が必要だ
時間のかかる処理は別でやってもらっているので、自分はここで終わり
## ここでいったんプロセスが終了している
時間がかかってます
処理が完了しました

処理が終了するまで待つ

上で定義したTaskクラスに次のメソッドを用意します。

    public function run()
    {
        // ...略
        $this->processId = $pid;
    }

    public function wait(): int
    {
        pcntl_waitpid($this->processId, $status);
        return $status;
    }

この上で、次のコードを実行します。

echo '時間のかかる処理が必要だ' . "\n";
$task = new Task(function () {
    sleep(1);
    echo '時間がかかってます' . "\n";
    sleep(1);
    echo '処理が完了しました' . "\n";
});
$task->run();
$task->wait();
echo 'すべての処理が完了したので終了だ' . "\n";

これの結果は以下の通りです。

# php parallel.php 
時間のかかる処理が必要だ
時間がかかってます
処理が完了しました
すべての処理が完了したので終了だ

これにより、子プロセスの処理が完了するまで待つという処理を追加することができます。
しかし、せっかく子プロセスに投げているのに、わざわざ処理を待つのはどういうことかというと、複数の処理を並列で実行したいからです。

複数の処理を並列で実行し、全部終わるまで待つ

重たい処理をいったんいくつかのプロセスに分担してやらせつつ、処理が全部終わったら次に進むというのを書きたいことはあるでしょう。例えば、外部APIへの問い合わせや重たいSQLの実行、リモートファイルの移動などCPUの処理はたいして使わないものの、時間だけはかかる処理などを子プロセスにやらせて、全部終わったら自身も終わるといった感じです。

この処理を実現するために、まずは複数の処理を管理するリストクラスを作成します。

<?php
namespace Niisan\MultiTask;

use Closure;

class TaskList
{
    /** @var Task[] */
    private array $taskList = [];

    public function __construct(Closure|Task ...$tasks)
    {
        $this->add(...$tasks);
    }

    /**
     * タスクを追加する
     * クロージャーを渡した場合は、タスクにしたうえで格納する
     *
     * @param Closure|Task ...$tasks
     * @return void
     */
    public function add(Closure|Task ...$tasks)
    {
        foreach ($tasks as $task) {
            $item = ($task instanceof Task) ? $task : new Task($task);
            $this->taskList[] = $item;
        }
    }

    /**
     * 自身の持つタスクを実行し、すべて完了するまで待つ
     *
     * @return void
     */
    public function execSync()
    {
        if (count($this->taskList) === 0) {
            return;
        }

        foreach ($this->taskList as $task) {
            $task->run();
        }

        foreach ($this->taskList as $task) {
            $task->wait();
        }
    }
}

これはクロージャーもしくはTaskクラスをリストに保持し、execSyncを実行すると保持したクラスをすべて実行します。
保持したTaskインスタンスを全部実行したら、あとはすべての処理が終わるまで待ちます。waitの部分は、処理が終わっていれば即座に返ってくるので、実質的に最後の処理が終わったらこのループは抜けます。
このクラスを定義したうえで、次の実装を実行してみます。

echo '並列処理に何秒かかるかな' . "\n";
$start = microtime(true);
$func = function ($name, $wait) {
    sleep($wait);
    echo 'Hello!! ' . $name . "\n";
};

$taskList = new TaskList();
$taskList->add(fn() => $func('Yamada', 2));
$taskList->add(fn() => $func('Ito', 1));
$taskList->add(fn() => $func('Kobayashi', 4));
$taskList->add(fn() => $func('Ichikawa', 3));
$taskList->execSync();
$execTime = round(microtime(true) - $start, 1);
echo "完了までに{$execTime}秒かかったよ" . "\n";

結果はこんな感じ

# php parallel.php 
並列処理に何秒かかるかな
Hello!! Ito
Hello!! Yamada
Hello!! Ichikawa
Hello!! Kobayashi
完了までに4秒かかったよ

同期的に実行すれば10秒かかる処理なので、これを4秒に抑えることができるというのがこの並列処理の利点になります。
一方、先に述べた通り、メモリもコピーされるので、丸々太ったプロセスからforkするのは危険なように思うので、並列個数には注意しましょう。

まとめ

ということで、PHPでマルチプロセス処理を作ってみました。
PHPのマニュアルを見れば、大体わかるので、実装するのはそんなに難しくはないと思います。
しかしながら、どちらかというと昔見ていたライブラリとかがなくなってたりとかして、時の流れを感じてしまいました。
今回はこんなところです。

Discussion