[PHP]stream_socket_pairでプロセス間通信を実装する|双方向パイプの作成から実践活用まで完全ガイド

PHP

はじめに

PHPで親プロセスと子プロセスが通信したいとき、ファイルや共有メモリを使う方法もありますが、より洗練されたアプローチが ソケットペア です。

stream_socket_pair は、相互に接続された2つのソケットストリームを一度に作成する関数です。片方に書いたデータがもう片方から読めるという双方向パイプとして機能し、pcntl_fork と組み合わせることでプロセス間の効率的な通信チャネルを構築できます。

この記事では、ソケットペアの仕組みから実践的なIPC(プロセス間通信)実装まで、クラスを用いた具体例とともに丁寧に解説します。


stream_socket_pair とは

項目内容
関数名stream_socket_pair
PHPバージョンPHP 5.1.0以降
カテゴリストリーム関数
返り値array(2要素のストリーム配列)、false(失敗時)

構文

stream_socket_pair(int $domain, int $type, int $protocol): array|false

パラメータ

パラメータ説明
$domainintソケットドメイン(下表参照)
$typeint通信タイプ(下表参照)
$protocolintプロトコル(通常 0

ドメイン定数

定数説明対応OS
STREAM_PF_UNIXUNIXドメインソケットLinux / macOS
STREAM_PF_INETIPv4 インターネットソケット全OS
STREAM_PF_INET6IPv6 インターネットソケット全OS

タイプ定数

定数説明
STREAM_SOCK_STREAMストリーム型(TCP相当、信頼性あり)
STREAM_SOCK_DGRAMデータグラム型(UDP相当、高速)
STREAM_SOCK_RAWrawソケット

返り値

成功時は 2要素の配列 [$socket0, $socket1] を返します。どちらか一方に書いたデータを、もう一方から読むことができます。


ソケットペアの仕組み

【stream_socket_pair の動作イメージ】

stream_socket_pair() 呼び出し後:

  $pair[0] ←───────────────→ $pair[1]
            双方向ストリーム

  $pair[0] に fwrite("Hello") すると
  $pair[1] から fread() で "Hello" が読める
  
  逆も同様:
  $pair[1] に fwrite("World") すると
  $pair[0] から fread() で "World" が読める

【fork() と組み合わせた場合】

  fork() 前に pair を作成
  
  親プロセス                      子プロセス
  $pair[0] を使う ←─────────→ $pair[1] を使う
  $pair[1] を閉じる              $pair[0] を閉じる
  
  → 親子間の双方向通信チャネルが完成

基本的な使い方

<?php
// ソケットペアを作成
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

if ($pair === false) {
    die("ソケットペアの作成に失敗しました");
}

[$socket0, $socket1] = $pair;

// $socket0 に書いたデータを $socket1 から読む
fwrite($socket0, "Hello from socket0\n");
$data = fgets($socket1);
echo "socket1 が受信: " . trim($data) . PHP_EOL;

// 逆方向も同様
fwrite($socket1, "Hello from socket1\n");
$data = fgets($socket0);
echo "socket0 が受信: " . trim($data) . PHP_EOL;

fclose($socket0);
fclose($socket1);

出力:

socket1 が受信: Hello from socket0
socket0 が受信: Hello from socket1

実践例(クラスを使った実装)

例1:ソケットペアをメッセージパイプとして使う

コルーチン的にデータを受け渡すシンプルなパイプクラスです。pcntl_fork なしでも同一プロセス内での通信バッファとして利用できます。

<?php

class SocketPipe
{
    private $end0;
    private $end1;
    private int $messageCount = 0;

    public function __construct(int $domain = STREAM_PF_UNIX)
    {
        $pair = stream_socket_pair($domain, STREAM_SOCK_STREAM, 0);

        if ($pair === false) {
            throw new RuntimeException("ソケットペアの作成に失敗しました");
        }

        [$this->end0, $this->end1] = $pair;

        stream_set_timeout($this->end0, 3);
        stream_set_timeout($this->end1, 3);
    }

    /**
     * end0 → end1 方向に書き込む
     */
    public function send(string $message): int
    {
        $data    = $message . "\n";
        $written = fwrite($this->end0, $data);
        $this->messageCount++;
        return $written ?: 0;
    }

    /**
     * end1 側から読み取る
     */
    public function receive(): string
    {
        $line = fgets($this->end1, 65536);
        $meta = stream_get_meta_data($this->end1);

        if ($meta['timed_out']) {
            throw new RuntimeException("受信タイムアウト");
        }

        return $line !== false ? rtrim($line) : '';
    }

    /**
     * end1 → end0 方向に書き込む(逆方向)
     */
    public function reply(string $message): int
    {
        $data    = $message . "\n";
        $written = fwrite($this->end1, $data);
        return $written ?: 0;
    }

    /**
     * end0 側から読み取る(逆方向)
     */
    public function readReply(): string
    {
        $line = fgets($this->end0, 65536);
        $meta = stream_get_meta_data($this->end0);

        if ($meta['timed_out']) {
            throw new RuntimeException("返信タイムアウト");
        }

        return $line !== false ? rtrim($line) : '';
    }

    public function getEnds(): array
    {
        return [$this->end0, $this->end1];
    }

    public function getMessageCount(): int
    {
        return $this->messageCount;
    }

    public function close(): void
    {
        if (is_resource($this->end0)) fclose($this->end0);
        if (is_resource($this->end1)) fclose($this->end1);
    }
}

// 使用例:双方向メッセージングのデモ
$pipe = new SocketPipe();

$messages = [
    "ping",
    "データ転送テスト",
    json_encode(['type' => 'command', 'action' => 'status']),
];

foreach ($messages as $msg) {
    $pipe->send($msg);
    $received = $pipe->receive();
    echo "送信: {$msg}" . PHP_EOL;
    echo "受信: {$received}" . PHP_EOL;

    // 逆方向で返信
    $pipe->reply("ACK: {$received}");
    $ack = $pipe->readReply();
    echo "ACK : {$ack}" . PHP_EOL . PHP_EOL;
}

echo "総メッセージ数: " . $pipe->getMessageCount() . PHP_EOL;
$pipe->close();

出力例:

送信: ping
受信: ping
ACK : ACK: ping

送信: データ転送テスト
受信: データ転送テスト
ACK : ACK: データ転送テスト

送信: {"type":"command","action":"status"}
受信: {"type":"command","action":"status"}
ACK : ACK: {"type":"command","action":"status"}

総メッセージ数: 3

例2:pcntl_fork と組み合わせた親子プロセス間通信

fork 後に不要な端を閉じて、親子間の専用チャネルを確立するパターンです。

<?php

class ForkChannel
{
    private $parentEnd;
    private $childEnd;
    private bool $isParent;
    private int  $childPid = 0;

    private function __construct(resource $parentEnd, resource $childEnd, bool $isParent, int $pid = 0)
    {
        $this->parentEnd = $parentEnd;
        $this->childEnd  = $childEnd;
        $this->isParent  = $isParent;
        $this->childPid  = $pid;
    }

    /**
     * ソケットペアを作成してforkし、親用・子用のチャネルを返す
     */
    public static function create(): self
    {
        $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
        if ($pair === false) {
            throw new RuntimeException("ソケットペア作成失敗");
        }

        [$end0, $end1] = $pair;
        $pid = pcntl_fork();

        if ($pid === -1) {
            fclose($end0);
            fclose($end1);
            throw new RuntimeException("fork失敗");
        }

        if ($pid > 0) {
            // 親プロセス:end1 を閉じて end0 を使う
            fclose($end1);
            stream_set_timeout($end0, 10);
            return new self($end0, $end0, true, $pid);
        } else {
            // 子プロセス:end0 を閉じて end1 を使う
            fclose($end0);
            stream_set_timeout($end1, 10);
            return new self($end1, $end1, false);
        }
    }

    public function isParent(): bool { return $this->isParent; }
    public function isChild(): bool  { return !$this->isParent; }
    public function getChildPid(): int { return $this->childPid; }

    public function send(string $message): void
    {
        $socket = $this->isParent ? $this->parentEnd : $this->childEnd;
        fwrite($socket, $message . "\n");
    }

    public function receive(): string
    {
        $socket = $this->isParent ? $this->parentEnd : $this->childEnd;
        $line   = fgets($socket, 65536);
        return $line !== false ? rtrim($line) : '';
    }

    public function close(): void
    {
        $socket = $this->isParent ? $this->parentEnd : $this->childEnd;
        if (is_resource($socket)) {
            fclose($socket);
        }
    }

    public function waitChild(): int
    {
        if ($this->isParent && $this->childPid > 0) {
            pcntl_waitpid($this->childPid, $status);
            return pcntl_wexitstatus($status);
        }
        return -1;
    }
}

// 使用例
if (!function_exists('pcntl_fork')) {
    echo "pcntl 拡張が必要です(CLI環境でお試しください)" . PHP_EOL;
    exit;
}

$channel = ForkChannel::create();

if ($channel->isParent()) {
    // 親プロセス
    echo "[親] PID: " . getmypid() . " / 子PID: " . $channel->getChildPid() . PHP_EOL;

    // 子にタスクを送信
    $tasks = ['task:compute', 'task:fetch', 'task:cleanup'];
    foreach ($tasks as $task) {
        echo "[親] 送信: {$task}" . PHP_EOL;
        $channel->send($task);

        $result = $channel->receive();
        echo "[親] 受信: {$result}" . PHP_EOL;
    }

    $channel->send('exit');
    $exitCode = $channel->waitChild();
    echo "[親] 子プロセス終了(コード: {$exitCode})" . PHP_EOL;
    $channel->close();

} else {
    // 子プロセス
    echo "[子] PID: " . getmypid() . PHP_EOL;

    while (true) {
        $task = $channel->receive();
        if ($task === 'exit' || $task === '') break;

        echo "[子] タスク受信: {$task}" . PHP_EOL;
        // 擬似的な処理
        usleep(10000);
        $channel->send("done:{$task}");
    }

    $channel->close();
    exit(0);
}

出力例(親プロセス側):

[親] PID: 12345 / 子PID: 12346
[子] PID: 12346
[親] 送信: task:compute
[子] タスク受信: task:compute
[親] 受信: done:task:compute
[親] 送信: task:fetch
[子] タスク受信: task:fetch
[親] 受信: done:task:fetch
[親] 送信: task:cleanup
[子] タスク受信: task:cleanup
[親] 受信: done:task:cleanup
[親] 子プロセス終了(コード: 0)

例3:ノンブロッキングで非同期メッセージキューを実装する

ソケットペアをバッファとして使い、ノンブロッキングで非同期的にメッセージを処理します。

<?php

class AsyncMessageQueue
{
    private $writeEnd;
    private $readEnd;
    private int $enqueued = 0;
    private int $dequeued = 0;

    public function __construct()
    {
        $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
        if ($pair === false) {
            throw new RuntimeException("ソケットペア作成失敗");
        }

        [$this->writeEnd, $this->readEnd] = $pair;

        // 読み取り側をノンブロッキングに設定
        stream_set_blocking($this->readEnd, false);
    }

    /**
     * メッセージをエンキューする
     */
    public function enqueue(mixed $data): void
    {
        $payload = json_encode($data) . "\n";
        fwrite($this->writeEnd, $payload);
        $this->enqueued++;
    }

    /**
     * メッセージをデキューする(なければ null)
     */
    public function dequeue(): mixed
    {
        $line = fgets($this->readEnd, 65536);
        if ($line === false || $line === '') {
            return null;
        }
        $this->dequeued++;
        return json_decode(rtrim($line), true);
    }

    /**
     * 利用可能なすべてのメッセージをデキューする
     */
    public function drainAll(): array
    {
        $messages = [];
        while (($msg = $this->dequeue()) !== null) {
            $messages[] = $msg;
        }
        return $messages;
    }

    /**
     * メッセージが利用可能かチェックする(stream_select 使用)
     */
    public function hasMessages(int $timeoutUsec = 0): bool
    {
        $read   = [$this->readEnd];
        $write  = $except = null;
        $result = stream_select($read, $write, $except, 0, $timeoutUsec);
        return $result > 0;
    }

    public function stats(): array
    {
        return [
            'enqueued' => $this->enqueued,
            'dequeued' => $this->dequeued,
            'pending'  => $this->enqueued - $this->dequeued,
        ];
    }

    public function close(): void
    {
        if (is_resource($this->writeEnd)) fclose($this->writeEnd);
        if (is_resource($this->readEnd))  fclose($this->readEnd);
    }
}

// 使用例
$queue = new AsyncMessageQueue();

// メッセージをエンキュー
$events = [
    ['type' => 'user.login',   'user_id' => 1001, 'ip' => '192.168.1.10'],
    ['type' => 'page.view',    'user_id' => 1001, 'path' => '/dashboard'],
    ['type' => 'user.logout',  'user_id' => 1001],
    ['type' => 'user.login',   'user_id' => 1002, 'ip' => '10.0.0.5'],
    ['type' => 'api.request',  'endpoint' => '/api/v1/data', 'latency_ms' => 42],
];

foreach ($events as $event) {
    $queue->enqueue($event);
}

echo "エンキュー完了: " . $queue->stats()['enqueued'] . " 件" . PHP_EOL . PHP_EOL;

// 処理ループ(メッセージがあれば処理)
if ($queue->hasMessages()) {
    $messages = $queue->drainAll();

    echo "=== メッセージ処理 ===" . PHP_EOL;
    foreach ($messages as $msg) {
        $typeLabel = str_pad($msg['type'], 16);
        $detail    = match(true) {
            isset($msg['user_id']) && isset($msg['ip']) => "ユーザー#{$msg['user_id']} from {$msg['ip']}",
            isset($msg['user_id']) && isset($msg['path']) => "ユーザー#{$msg['user_id']} → {$msg['path']}",
            isset($msg['user_id']) => "ユーザー#{$msg['user_id']}",
            isset($msg['endpoint']) => "{$msg['endpoint']} ({$msg['latency_ms']}ms)",
            default => json_encode($msg),
        };
        echo "  [{$typeLabel}] {$detail}" . PHP_EOL;
    }
}

echo PHP_EOL;
$stats = $queue->stats();
echo "エンキュー: {$stats['enqueued']} / デキュー: {$stats['dequeued']} / 残: {$stats['pending']}" . PHP_EOL;

$queue->close();

出力例:

エンキュー完了: 5 件

=== メッセージ処理 ===
  [user.login     ] ユーザー#1001 from 192.168.1.10
  [page.view      ] ユーザー#1001 → /dashboard
  [user.logout    ] ユーザー#1001
  [user.login     ] ユーザー#1002 from 10.0.0.5
  [api.request    ] /api/v1/data (42ms)

エンキュー: 5 / デキュー: 5 / 残: 0

例4:ドメイン別のソケットペアを比較する

STREAM_PF_UNIX / STREAM_PF_INET / STREAM_PF_INET6 の挙動と性能を比較します。

<?php

class SocketPairBenchmark
{
    public static function run(int $domain, string $label, int $iterations = 10000): array
    {
        $pair = stream_socket_pair($domain, STREAM_SOCK_STREAM, 0);
        if ($pair === false) {
            return ['label' => $label, 'error' => '作成失敗'];
        }

        [$s0, $s1] = $pair;

        stream_set_blocking($s0, true);
        stream_set_blocking($s1, true);

        $payload   = "benchmark_message\n";
        $startTime = microtime(true);

        for ($i = 0; $i < $iterations; $i++) {
            fwrite($s0, $payload);
            fgets($s1);
        }

        $elapsed = microtime(true) - $startTime;

        // アドレス情報取得
        $localName = stream_socket_get_name($s0, false);

        fclose($s0);
        fclose($s1);

        return [
            'label'       => $label,
            'domain'      => $domain,
            'iterations'  => $iterations,
            'elapsed_ms'  => round($elapsed * 1000, 2),
            'msg_per_sec' => round($iterations / $elapsed),
            'local_name'  => $localName ?: 'N/A',
        ];
    }
}

// 各ドメインで計測
$domains = [
    [STREAM_PF_UNIX,  'STREAM_PF_UNIX(UNIXドメイン)'],
    [STREAM_PF_INET,  'STREAM_PF_INET(IPv4)'],
    [STREAM_PF_INET6, 'STREAM_PF_INET6(IPv6)'],
];

echo str_pad("ドメイン",                 30)
   . str_pad("処理時間",                 14)
   . str_pad("メッセージ/秒",            18)
   . "アドレス例" . PHP_EOL;
echo str_repeat('-', 80) . PHP_EOL;

foreach ($domains as [$domain, $label]) {
    $result = SocketPairBenchmark::run($domain, $label);

    if (isset($result['error'])) {
        echo str_pad($label, 30) . $result['error'] . PHP_EOL;
        continue;
    }

    echo str_pad($result['label'],          30)
       . str_pad("{$result['elapsed_ms']} ms", 14)
       . str_pad(number_format($result['msg_per_sec']), 18)
       . $result['local_name'] . PHP_EOL;
}

出力例:

ドメイン                      処理時間      メッセージ/秒     アドレス例
--------------------------------------------------------------------------------
STREAM_PF_UNIX(UNIXドメイン)123.45 ms     81,000            
STREAM_PF_INET(IPv4)        287.32 ms     34,800            127.0.0.1:54321
STREAM_PF_INET6(IPv6)       301.18 ms     33,200            [::1]:54322

STREAM_PF_UNIX はネットワークスタックを経由しないため最も高速です。


例5:シリアライズしたオブジェクトをソケットペア経由で受け渡す

構造化データのIPC転送パターンです。JSONエンコード/デコードで型安全に送受信します。

<?php

class TypedSocketChannel
{
    private $sendEnd;
    private $recvEnd;
    private string $delimiter;

    public function __construct(string $delimiter = "\n")
    {
        $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
        if ($pair === false) {
            throw new RuntimeException("ソケットペア作成失敗");
        }

        [$this->sendEnd, $this->recvEnd] = $pair;
        $this->delimiter = $delimiter;

        stream_set_timeout($this->sendEnd, 5);
        stream_set_timeout($this->recvEnd, 5);
    }

    /**
     * 任意のデータをシリアライズして送信する
     */
    public function write(mixed $data): void
    {
        $payload = json_encode([
            'type'    => gettype($data),
            'class'   => is_object($data) ? get_class($data) : null,
            'payload' => $data,
            'ts'      => microtime(true),
        ]) . $this->delimiter;

        if (fwrite($this->sendEnd, $payload) === false) {
            throw new RuntimeException("書き込み失敗");
        }
    }

    /**
     * データを受信してデシリアライズする
     */
    public function read(): mixed
    {
        $line = fgets($this->recvEnd, 1_048_576); // 最大1MB
        $meta = stream_get_meta_data($this->recvEnd);

        if ($meta['timed_out']) {
            throw new RuntimeException("受信タイムアウト");
        }

        if ($line === false) {
            throw new RuntimeException("受信失敗");
        }

        $envelope = json_decode(rtrim($line), true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new RuntimeException("JSONデコード失敗: " . json_last_error_msg());
        }

        return $envelope['payload'];
    }

    public function getEnds(): array
    {
        return [$this->sendEnd, $this->recvEnd];
    }

    public function close(): void
    {
        if (is_resource($this->sendEnd)) fclose($this->sendEnd);
        if (is_resource($this->recvEnd)) fclose($this->recvEnd);
    }
}

// 使用例:さまざまな型のデータを転送
$channel = new TypedSocketChannel();

$testData = [
    "文字列メッセージ",
    42,
    3.14,
    true,
    ['items' => ['apple', 'banana', 'cherry'], 'count' => 3],
    ['user' => ['id' => 1, 'name' => '田中太郎', 'roles' => ['admin', 'editor']]],
    range(1, 5),
];

echo "=== 型安全なソケット転送テスト ===" . PHP_EOL;
echo str_pad("送信データ",          30) . str_pad("型",     10) . "受信データ" . PHP_EOL;
echo str_repeat('-', 64) . PHP_EOL;

foreach ($testData as $data) {
    $channel->write($data);
    $received   = $channel->read();

    $sendStr = is_array($data)  ? json_encode($data,     JSON_UNESCAPED_UNICODE)
             : var_export($data, true);
    $recvStr = is_array($received) ? json_encode($received, JSON_UNESCAPED_UNICODE)
             : var_export($received, true);

    $match = $data === $received ? '✓' : '✗';

    echo str_pad(mb_substr($sendStr, 0, 28), 30)
       . str_pad(gettype($data), 10)
       . "{$match} " . mb_substr($recvStr, 0, 28) . PHP_EOL;
}

$channel->close();

出力例:

=== 型安全なソケット転送テスト ===
送信データ                    型        受信データ
----------------------------------------------------------------
'文字列メッセージ'            string    ✓ '文字列メッセージ'
42                            integer   ✓ 42
3.14                          double    ✓ 3.14
true                          boolean   ✓ true
{"items":["apple","banana"... array     ✓ {"items":["apple","banana"...
{"user":{"id":1,"name":"田... array     ✓ {"user":{"id":1,"name":"田...
[1,2,3,4,5]                   array     ✓ [1,2,3,4,5]

例6:ワーカープールでタスクを並列処理する

複数の子プロセスをソケットペアで管理し、タスクを効率的に分散処理します。

<?php

class WorkerPool
{
    private array $workers    = [];  // [pid => ['socket' => resource, 'busy' => bool]]
    private int   $workerCount;
    private int   $completed  = 0;

    public function __construct(int $workerCount = 3)
    {
        if (!function_exists('pcntl_fork')) {
            throw new RuntimeException("pcntl拡張が必要です");
        }

        $this->workerCount = $workerCount;
        $this->spawnWorkers();
    }

    private function spawnWorkers(): void
    {
        for ($i = 0; $i < $this->workerCount; $i++) {
            $pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
            if ($pair === false) {
                throw new RuntimeException("ソケットペア作成失敗");
            }

            [$parentSocket, $childSocket] = $pair;
            $pid = pcntl_fork();

            if ($pid === -1) {
                throw new RuntimeException("fork失敗");
            }

            if ($pid > 0) {
                // 親:子ソケットを閉じて親ソケットを保持
                fclose($childSocket);
                stream_set_timeout($parentSocket, 30);
                stream_set_blocking($parentSocket, false);

                $this->workers[$pid] = [
                    'socket' => $parentSocket,
                    'busy'   => false,
                    'worker_id' => $i + 1,
                ];
            } else {
                // 子:親ソケットを閉じてワーカーループ開始
                fclose($parentSocket);
                stream_set_blocking($childSocket, true);
                stream_set_timeout($childSocket, 30);

                $this->workerLoop($childSocket, $i + 1);
                exit(0);
            }
        }
    }

    private function workerLoop(resource $socket, int $workerId): void
    {
        while (true) {
            $line = fgets($socket, 65536);
            if ($line === false || trim($line) === 'exit') break;

            $task = json_decode(rtrim($line), true);
            if (!$task) continue;

            // 擬似的なタスク処理(CPU処理のシミュレーション)
            $result = $this->processTask($task, $workerId);

            fwrite($socket, json_encode($result) . "\n");
        }
    }

    private function processTask(array $task, int $workerId): array
    {
        usleep(rand(5000, 20000)); // 5〜20ms の処理時間をシミュレート
        return [
            'task_id'   => $task['id'],
            'input'     => $task['value'],
            'result'    => $task['value'] * $task['value'],
            'worker_id' => $workerId,
            'pid'       => getmypid(),
        ];
    }

    public function dispatch(array $tasks): array
    {
        $results    = [];
        $pending    = $tasks;
        $inFlight   = 0;

        while (!empty($pending) || $inFlight > 0) {
            // 空きワーカーにタスクを割り当て
            foreach ($this->workers as $pid => &$worker) {
                if (!$worker['busy'] && !empty($pending)) {
                    $task = array_shift($pending);
                    fwrite($worker['socket'], json_encode($task) . "\n");
                    $worker['busy'] = true;
                    $inFlight++;
                }
            }
            unset($worker);

            // stream_select で完了を検知
            $readSockets = [];
            $pidMap = [];
            foreach ($this->workers as $pid => $worker) {
                if ($worker['busy']) {
                    $id           = (int) $worker['socket'];
                    $readSockets[$id] = $worker['socket'];
                    $pidMap[$id]      = $pid;
                }
            }

            if (empty($readSockets)) break;

            $read   = array_values($readSockets);
            $write  = $except = null;
            $changed = stream_select($read, $write, $except, 1, 0);

            if ($changed > 0) {
                foreach ($read as $sock) {
                    $id  = (int) $sock;
                    $pid = $pidMap[$id];

                    $line = fgets($sock, 65536);
                    if ($line !== false && $line !== '') {
                        $result    = json_decode(rtrim($line), true);
                        $results[] = $result;
                        $this->workers[$pid]['busy'] = false;
                        $inFlight--;
                        $this->completed++;
                    }
                }
            }
        }

        return $results;
    }

    public function shutdown(): void
    {
        foreach ($this->workers as $pid => $worker) {
            fwrite($worker['socket'], "exit\n");
            fclose($worker['socket']);
            pcntl_waitpid($pid, $status);
        }
        $this->workers = [];
    }

    public function getCompleted(): int
    {
        return $this->completed;
    }
}

// 使用例
if (!function_exists('pcntl_fork')) {
    echo "pcntl 拡張が必要です" . PHP_EOL;
    exit;
}

$pool = new WorkerPool(workerCount: 3);

// 10個のタスクを分散処理
$tasks = array_map(
    fn($i) => ['id' => $i, 'value' => $i * 2],
    range(1, 10)
);

$startTime = microtime(true);
$results   = $pool->dispatch($tasks);
$elapsed   = round((microtime(true) - $startTime) * 1000, 1);

// 結果をタスクID順にソート
usort($results, fn($a, $b) => $a['task_id'] <=> $b['task_id']);

echo "=== ワーカープール実行結果 ===" . PHP_EOL;
echo str_pad("タスクID", 10) . str_pad("入力値", 8) . str_pad("結果", 10) . "担当ワーカー" . PHP_EOL;
echo str_repeat('-', 44) . PHP_EOL;

foreach ($results as $r) {
    echo str_pad($r['task_id'],  10)
       . str_pad($r['input'],    8)
       . str_pad($r['result'],   10)
       . "Worker#{$r['worker_id']} (PID:{$r['pid']})" . PHP_EOL;
}

echo PHP_EOL;
echo "処理件数 : {$pool->getCompleted()} タスク" . PHP_EOL;
echo "処理時間 : {$elapsed} ms({$pool->workerCount}並列)" . PHP_EOL;

$pool->shutdown();

出力例:

=== ワーカープール実行結果 ===
タスクID  入力値  結果      担当ワーカー
--------------------------------------------
1         2       4         Worker#1 (PID:12347)
2         4       16        Worker#2 (PID:12348)
3         6       36        Worker#3 (PID:12349)
4         8       64        Worker#1 (PID:12347)
5         10      100       Worker#2 (PID:12348)
...
10        20      400       Worker#3 (PID:12349)

処理件数 : 10 タスク
処理時間 : 87.3 ms(3並列)

関連する関数との比較

関数役割特徴
stream_socket_pair双方向ソケットペアを作成fork前に作成してIPCに使う
stream_socket_server + stream_socket_clientサーバー/クライアント型の接続ネットワーク越しの通信も可能
proc_open外部プロセスとパイプで通信単方向パイプ(stdin/stdout/stderr)
socket_create_pair同等機能(ソケット拡張)socket_* APIが必要

stream_socket_pair vs proc_open パイプ

// proc_open:外部コマンドとの単方向パイプ
$proc  = proc_open('php worker.php', [
    0 => ['pipe', 'r'],  // stdin(書き込み専用)
    1 => ['pipe', 'w'],  // stdout(読み取り専用)
], $pipes);
// → stdin と stdout は別々のパイプ(半二重)

// stream_socket_pair:双方向の全二重チャネル
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
// → $pair[0] と $pair[1] が双方向(全二重)
観点stream_socket_pairproc_open パイプ
通信方向双方向(全二重)単方向(半二重)
プロセス種別PHPプロセス間外部コマンドとも可
バッファリングOSのソケットバッファOSのパイプバッファ
デッドロックリスク低(双方向)高(正しく管理しないと)

よくある注意点・落とし穴

1. fork後は不要な端を必ず閉じる

両端を両プロセスが持ったままだと、fgets がEOFを検知できずにデッドロックします。

$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
$pid  = pcntl_fork();

if ($pid > 0) {
    fclose($pair[1]); // 親:$pair[1] を閉じる(重要)
    // $pair[0] を使う
} else {
    fclose($pair[0]); // 子:$pair[0] を閉じる(重要)
    // $pair[1] を使う
}

2. Windows では STREAM_PF_UNIX が使えない

Windowsでは STREAM_PF_UNIX がサポートされていません。STREAM_PF_INET を代わりに使います。

$domain = PHP_OS_FAMILY === 'Windows' ? STREAM_PF_INET : STREAM_PF_UNIX;
$pair   = stream_socket_pair($domain, STREAM_SOCK_STREAM, 0);

3. 大きなデータはフレーミングが必要

fgets は改行まで、fread は指定バイト数しか読まないため、大きなデータを送る場合は長さプレフィックスや区切り文字でフレーミングします。

// 送信側:長さプレフィックスを付ける
$data   = json_encode($largeObject);
$frame  = pack('N', strlen($data)) . $data; // 4バイトの長さ + データ
fwrite($socket, $frame);

// 受信側:長さを読んでから本文を読む
$lenBytes = fread($socket, 4);
$length   = unpack('N', $lenBytes)[1];
$data     = '';
while (strlen($data) < $length) {
    $data .= fread($socket, $length - strlen($data));
}

4. STREAM_SOCK_DGRAM はデータグラム単位での読み書きになる

STREAM_SOCK_STREAM はバイトストリームですが、STREAM_SOCK_DGRAM はメッセージ単位です。fgets は使えず fread でデータグラムを一括読み取りします。


まとめ

項目内容
関数名stream_socket_pair(int $domain, int $type, int $protocol): array|false
主な用途pcntl_fork と組み合わせたプロセス間通信(IPC)
返り値[$socket0, $socket1](双方向に接続済み)
推奨ドメインSTREAM_PF_UNIX(Linux/macOS)、Windows では STREAM_PF_INET
fork後の注意不要な端を必ず fclose する(デッドロック防止)
Windows対応STREAM_PF_UNIX 非対応(STREAM_PF_INET を使う)
PHP バージョンPHP 5.1.0 以上

stream_socket_pair は、PHPでのプロセス間通信に最適な双方向ソケットチャネルを提供する関数です。proc_open のパイプが半二重なのに対し、ソケットペアは全二重のため、複雑な双方向プロトコルもシンプルに実装できます。

pcntl_fork と組み合わせて親子プロセス間のデータ授受を行う場面や、同一プロセス内の非同期メッセージバッファとして活用するなど、さまざまなユースケースに対応できます。ぜひ並列処理が必要なPHPアプリケーションで試してみてください。

タイトルとURLをコピーしました