[PHP]stream_select完全ガイド|複数ストリームを同時監視してノンブロッキングI/Oを実現する多重化関数

PHP

はじめに

PHPで複数のソケットやファイルディスクリプタを同時に扱う場合、単純にループで fread() を繰り返すと、どれか1つのストリームでデータ待ちが発生した瞬間に全体がブロックされてしまいます。

stream_select() は、複数のストリームを同時に監視し、読み取り・書き込み・例外のいずれかが可能になったストリームだけを返す関数です。Unix の select() システムコールをラップしており、PHPでノンブロッキングな多重I/Oを実現するための中心的な関数です。


関数の基本情報

項目内容
関数名stream_select()
対応バージョンPHP 4.3.0 以降
返り値int(変化したストリーム数)/ false(エラー時)
カテゴリストリーム関数

構文

stream_select(
    ?array &$read,
    ?array &$write,
    ?array &$except,
    ?int   $seconds,
    int    $microseconds = 0
): int|false

パラメータ

パラメータ説明
&$read?array読み取り可能になるまで監視するストリームの配列(参照渡し)
&$write?array書き込み可能になるまで監視するストリームの配列(参照渡し)
&$except?array例外(OOB データなど)を監視するストリームの配列(参照渡し)
$seconds?intタイムアウト秒数。0 で即時リターン(ノンブロッキング)、null で無制限待機
$microsecondsintタイムアウトのマイクロ秒部分(デフォルト 0

返り値

  • int:変化したストリームの総数(タイムアウトなら 0
  • false:エラーが発生した場合

重要: $read$write$except参照渡しです。関数呼び出し後、各配列には「実際に変化が起きたストリームのみ」が残ります(変化がなかったものは除去されます)。


基本的な使い方

<?php

// 2つのストリームを同時に監視する
$fp1 = fopen('php://memory', 'r+');
$fp2 = fopen('php://memory', 'r+');

fwrite($fp1, "stream1 data");
fwrite($fp2, "stream2 data");
rewind($fp1);
rewind($fp2);

$read   = [$fp1, $fp2];
$write  = null;
$except = null;

// 最大1秒待機して読み取り可能なストリームを取得
$changed = stream_select($read, $write, $except, 1);

if ($changed === false) {
    echo "エラーが発生しました" . PHP_EOL;
} elseif ($changed === 0) {
    echo "タイムアウト:読み取り可能なストリームなし" . PHP_EOL;
} else {
    echo "{$changed} 個のストリームが読み取り可能" . PHP_EOL;
    foreach ($read as $fp) {
        echo fread($fp, 1024) . PHP_EOL;
    }
}

fclose($fp1);
fclose($fp2);

stream_select() の動作フロー

呼び出し前:
  $read = [stream1, stream2, stream3]  ← 監視対象

           ↓ stream_select() が実行される

監視期間中:
  - stream2 にデータが到着
  - stream3 にデータが到着
  - stream1 はまだデータなし
  - タイムアウト or 変化検出で返却

呼び出し後:
  $read = [stream2, stream3]           ← 変化したものだけ残る
  戻り値 = 2                           ← 変化したストリーム数

実践的なクラスベースの活用例


例1:マルチストリームリーダー(MultiStreamReader)

複数のストリームを stream_select() で同時監視し、読み取り可能になった順にデータを収集する汎用リーダーです。

<?php

class StreamReadResult
{
    public function __construct(
        public readonly int    $index,
        public readonly string $data,
        public readonly float  $receivedAt,
    ) {}
}

class MultiStreamReader
{
    /** @var resource[] */
    private array $streams = [];
    private int   $timeoutSec;
    private int   $timeoutUsec;
    private int   $bufferSize;

    public function __construct(
        int $timeoutSec  = 5,
        int $timeoutUsec = 0,
        int $bufferSize  = 8192
    ) {
        $this->timeoutSec  = $timeoutSec;
        $this->timeoutUsec = $timeoutUsec;
        $this->bufferSize  = $bufferSize;
    }

    /**
     * 監視対象ストリームを追加する
     */
    public function add($stream): int
    {
        $this->streams[] = $stream;
        return count($this->streams) - 1;
    }

    /**
     * 全ストリームを監視し、読み取り可能になったものからデータを収集する
     *
     * @return StreamReadResult[]
     */
    public function readAll(): array
    {
        $results    = [];
        $remaining  = $this->streams;
        $indexMap   = array_flip(array_map('intval', array_keys($this->streams)));

        while (!empty($remaining)) {
            $read   = $remaining;
            $write  = null;
            $except = null;

            $changed = stream_select($read, $write, $except, $this->timeoutSec, $this->timeoutUsec);

            if ($changed === false) {
                throw new \RuntimeException('stream_select() でエラーが発生しました');
            }

            if ($changed === 0) {
                break; // タイムアウト
            }

            foreach ($read as $stream) {
                $data = '';
                while (!feof($stream)) {
                    $chunk = fread($stream, $this->bufferSize);
                    if ($chunk === false || $chunk === '') break;
                    $data .= $chunk;
                }

                // 元のインデックスを特定
                $idx = array_search($stream, $this->streams, true);
                $results[] = new StreamReadResult(
                    index:      $idx !== false ? (int)$idx : -1,
                    data:       $data,
                    receivedAt: microtime(true),
                );

                // 完了したストリームを remaining から除去
                $remaining = array_filter(
                    $remaining,
                    fn($s) => $s !== $stream
                );
            }
        }

        return $results;
    }
}

// 使用例
$reader = new MultiStreamReader(timeoutSec: 2);

// 3つのメモリストリームにデータを書き込む
$streams = [];
foreach (['Alpha', 'Beta', 'Gamma'] as $name) {
    $fp = fopen('php://memory', 'r+');
    fwrite($fp, "データ from {$name}");
    rewind($fp);
    $streams[] = $fp;
    $reader->add($fp);
}

$results = $reader->readAll();

foreach ($results as $result) {
    echo "ストリーム[{$result->index}]: {$result->data}" . PHP_EOL;
}

array_map('fclose', $streams);

// 出力:
// ストリーム[0]: データ from Alpha
// ストリーム[1]: データ from Beta
// ストリーム[2]: データ from Gamma

例2:ノンブロッキングTCPサーバー(NonBlockingServer)

stream_select() を使って、複数のクライアント接続を1つのプロセス・1スレッドで並行処理するTCPサーバーです。

<?php

class ClientConnection
{
    public string $buffer    = '';
    public float  $connectedAt;

    public function __construct(
        public readonly $socket,
        public readonly string $address,
    ) {
        $this->connectedAt = microtime(true);
    }
}

class NonBlockingServer
{
    private $serverSocket;
    /** @var ClientConnection[] */
    private array $clients = [];
    private bool  $running = false;
    private int   $maxClients;

    /** @var callable|null */
    private $onConnect;
    /** @var callable|null */
    private $onMessage;
    /** @var callable|null */
    private $onDisconnect;

    public function __construct(
        private string $host       = '127.0.0.1',
        private int    $port       = 8080,
        int            $maxClients = 100
    ) {
        $this->maxClients = $maxClients;
    }

    public function onConnect(callable $cb): void    { $this->onConnect    = $cb; }
    public function onMessage(callable $cb): void    { $this->onMessage    = $cb; }
    public function onDisconnect(callable $cb): void { $this->onDisconnect = $cb; }

    /**
     * サーバーを起動してイベントループを開始する
     */
    public function start(): void
    {
        $this->serverSocket = stream_socket_server(
            "tcp://{$this->host}:{$this->port}",
            $errno,
            $errstr
        );

        if (!$this->serverSocket) {
            throw new \RuntimeException("サーバー起動失敗: ({$errno}) {$errstr}");
        }

        stream_set_blocking($this->serverSocket, false);
        $this->running = true;

        echo "サーバー起動: tcp://{$this->host}:{$this->port}" . PHP_EOL;

        $this->eventLoop();
    }

    private function eventLoop(): void
    {
        while ($this->running) {
            // 監視リスト:サーバーソケット + 全クライアント
            $read = array_merge(
                [$this->serverSocket],
                array_map(fn($c) => $c->socket, $this->clients)
            );
            $write  = null;
            $except = null;

            $changed = stream_select($read, $write, $except, 0, 200_000); // 200ms

            if ($changed === false) break;
            if ($changed === 0)     continue; // タイムアウト、次のループへ

            foreach ($read as $socket) {
                if ($socket === $this->serverSocket) {
                    // 新しい接続を受け付ける
                    $this->acceptClient();
                } else {
                    // 既存クライアントからのデータを処理
                    $this->handleClient($socket);
                }
            }
        }

        fclose($this->serverSocket);
    }

    private function acceptClient(): void
    {
        if (count($this->clients) >= $this->maxClients) return;

        $clientSocket = stream_socket_accept($this->serverSocket, 0, $address);
        if (!$clientSocket) return;

        stream_set_blocking($clientSocket, false);
        $client = new ClientConnection($clientSocket, $address);
        $this->clients[] = $client;

        if ($this->onConnect) {
            ($this->onConnect)($client);
        }
    }

    private function handleClient($socket): void
    {
        $client = $this->findClient($socket);
        if (!$client) return;

        $data = fread($socket, 4096);

        if ($data === false || feof($socket)) {
            $this->disconnectClient($client);
            return;
        }

        $client->buffer .= $data;

        // 改行でメッセージを区切る
        while (($pos = strpos($client->buffer, "\n")) !== false) {
            $message        = substr($client->buffer, 0, $pos);
            $client->buffer = substr($client->buffer, $pos + 1);

            if ($this->onMessage) {
                ($this->onMessage)($client, trim($message));
            }
        }
    }

    private function disconnectClient(ClientConnection $client): void
    {
        if ($this->onDisconnect) {
            ($this->onDisconnect)($client);
        }
        fclose($client->socket);
        $this->clients = array_filter(
            $this->clients,
            fn($c) => $c !== $client
        );
    }

    private function findClient($socket): ?ClientConnection
    {
        foreach ($this->clients as $client) {
            if ($client->socket === $socket) return $client;
        }
        return null;
    }

    public function stop(): void { $this->running = false; }
    public function clientCount(): int { return count($this->clients); }
}

// 使用例(実際のサーバー起動はコメントアウト)
$server = new NonBlockingServer('127.0.0.1', 8080);

$server->onConnect(fn($c) =>
    echo "接続: {$c->address}" . PHP_EOL
);
$server->onMessage(fn($c, $msg) =>
    fwrite($c->socket, "Echo: {$msg}\n")
);
$server->onDisconnect(fn($c) =>
    echo "切断: {$c->address}" . PHP_EOL
);

echo "NonBlockingServer 設定完了(実際の起動は \$server->start())" . PHP_EOL;
// $server->start(); // 実際の起動はここで

例3:並行プロセス出力コレクター(ParallelProcessCollector)

proc_open() で複数のコマンドを並列実行し、stream_select() で各プロセスの stdout/stderr を同時に非同期収集するクラスです。

<?php

class ProcessResult
{
    public string $stdout   = '';
    public string $stderr   = '';
    public ?int   $exitCode = null;
    public float  $startedAt;
    public ?float $finishedAt = null;

    public function __construct(public readonly string $command)
    {
        $this->startedAt = microtime(true);
    }

    public function elapsedMs(): float
    {
        $end = $this->finishedAt ?? microtime(true);
        return ($end - $this->startedAt) * 1000;
    }
}

class ParallelProcessCollector
{
    private int $timeoutSec;

    public function __construct(int $timeoutSec = 30)
    {
        $this->timeoutSec = $timeoutSec;
    }

    /**
     * 複数コマンドを並列実行して全出力を収集する
     *
     * @param  string[] $commands
     * @return ProcessResult[]
     */
    public function runAll(array $commands): array
    {
        $processes = [];
        $results   = [];
        $pipes     = [];

        // 全コマンドを起動
        foreach ($commands as $i => $cmd) {
            $result = new ProcessResult($cmd);
            $spec   = [
                0 => ['pipe', 'r'],  // stdin
                1 => ['pipe', 'w'],  // stdout
                2 => ['pipe', 'w'],  // stderr
            ];

            $proc = proc_open($cmd, $spec, $pipe);
            if ($proc === false) continue;

            fclose($pipe[0]); // stdin は不要
            stream_set_blocking($pipe[1], false);
            stream_set_blocking($pipe[2], false);

            $processes[$i] = $proc;
            $pipes[$i]     = ['stdout' => $pipe[1], 'stderr' => $pipe[2]];
            $results[$i]   = $result;
        }

        $deadline = microtime(true) + $this->timeoutSec;

        // 全パイプが閉じるまでループ
        while (!empty($pipes) && microtime(true) < $deadline) {
            $read   = [];
            $idxMap = [];

            foreach ($pipes as $i => $pipe) {
                $read[] = $pipe['stdout'];
                $idxMap[(int)$pipe['stdout']] = ['i' => $i, 'type' => 'stdout'];
                $read[] = $pipe['stderr'];
                $idxMap[(int)$pipe['stderr']] = ['i' => $i, 'type' => 'stderr'];
            }

            $write  = null;
            $except = null;
            $changed = stream_select($read, $write, $except, 0, 100_000);

            if ($changed === false || $changed === 0) continue;

            foreach ($read as $stream) {
                $key  = (int)$stream;
                if (!isset($idxMap[$key])) continue;

                $i    = $idxMap[$key]['i'];
                $type = $idxMap[$key]['type'];

                $data = fread($stream, 4096);
                if ($data !== false && $data !== '') {
                    $results[$i]->{$type} .= $data;
                }
            }

            // 終了したプロセスをクリーンアップ
            foreach ($pipes as $i => $pipe) {
                if (feof($pipe['stdout']) && feof($pipe['stderr'])) {
                    fclose($pipe['stdout']);
                    fclose($pipe['stderr']);
                    $status = proc_get_status($processes[$i]);
                    $results[$i]->exitCode    = $status['exitcode'] ?? -1;
                    $results[$i]->finishedAt  = microtime(true);
                    proc_close($processes[$i]);
                    unset($pipes[$i]);
                }
            }
        }

        return $results;
    }
}

// 使用例
$collector = new ParallelProcessCollector(timeoutSec: 10);

$results = $collector->runAll([
    'echo "Hello from command 1"',
    'echo "Hello from command 2" && sleep 0',
    'php -r "echo PHP_VERSION;"',
]);

foreach ($results as $i => $result) {
    echo "=== コマンド {$i}: {$result->command} ===" . PHP_EOL;
    echo "  stdout  : " . trim($result->stdout)  . PHP_EOL;
    echo "  stderr  : " . trim($result->stderr)  . PHP_EOL;
    echo "  exitCode: " . $result->exitCode       . PHP_EOL;
    echo sprintf("  elapsed : %.2f ms\n", $result->elapsedMs());
}

// 出力例:
// === コマンド 0: echo "Hello from command 1" ===
//   stdout  : Hello from command 1
//   exitCode: 0
//   elapsed : 5.23 ms

例4:タイムアウト付きストリームポーリング(StreamPoller)

stream_select()$seconds = 0(即時リターン)を使ったノンブロッキングポーリングと、タイムアウト付きブロッキング待機を切り替えられるポーラークラスです。

<?php

class PollResult
{
    /** @param resource[] $readable */
    public function __construct(
        public readonly array $readable,
        public readonly array $writable,
        public readonly bool  $timedOut,
        public readonly float $elapsedMs,
    ) {}

    public function hasReadable(): bool  { return !empty($this->readable); }
    public function hasWritable(): bool  { return !empty($this->writable); }
    public function isEmpty(): bool      { return $this->timedOut || (!$this->hasReadable() && !$this->hasWritable()); }
}

class StreamPoller
{
    /**
     * ノンブロッキングポーリング($seconds = 0 で即時リターン)
     *
     * @param resource[] $streams
     */
    public function pollNow(array $streams): PollResult
    {
        return $this->poll($streams, 0, 0);
    }

    /**
     * 指定時間だけ待機してストリームの変化を確認する
     *
     * @param resource[] $streams
     */
    public function pollWait(array $streams, int $timeoutMs = 1000): PollResult
    {
        $sec  = intdiv($timeoutMs, 1000);
        $usec = ($timeoutMs % 1000) * 1000;
        return $this->poll($streams, $sec, $usec);
    }

    /**
     * ストリームに読み取り可能データが現れるまでビジーウェイトする
     *
     * @param resource[] $streams
     * @param int        $maxWaitMs  最大待機ミリ秒
     * @param int        $intervalMs ポーリング間隔ミリ秒
     */
    public function busyWait(array $streams, int $maxWaitMs = 5000, int $intervalMs = 10): PollResult
    {
        $deadline = microtime(true) + ($maxWaitMs / 1000);

        while (microtime(true) < $deadline) {
            $result = $this->pollNow($streams);
            if ($result->hasReadable()) return $result;
            usleep($intervalMs * 1000);
        }

        return new PollResult([], [], true, $maxWaitMs);
    }

    /**
     * コールバックが true を返すまでポーリングを繰り返す
     *
     * @param resource[]    $streams
     * @param callable      $callback `callable(PollResult): bool` → true で停止
     */
    public function pollUntil(array $streams, callable $callback, int $timeoutMs = 100, int $maxIterations = 100): void
    {
        for ($i = 0; $i < $maxIterations; $i++) {
            $result = $this->pollWait($streams, $timeoutMs);
            if ($callback($result)) break;
        }
    }

    private function poll(array $streams, int $sec, int $usec): PollResult
    {
        $start  = microtime(true);
        $read   = $streams;
        $write  = null;
        $except = null;

        $changed = stream_select($read, $write, $except, $sec, $usec);
        $elapsed = (microtime(true) - $start) * 1000;

        if ($changed === false) {
            throw new \RuntimeException('stream_select() が失敗しました');
        }

        return new PollResult(
            readable:  $read ?? [],
            writable:  [],
            timedOut:  $changed === 0,
            elapsedMs: $elapsed,
        );
    }
}

// 使用例
$poller = new StreamPoller();

// メモリストリームを準備
$fp1 = fopen('php://memory', 'r+');
$fp2 = fopen('php://memory', 'r+');
fwrite($fp1, "ready");
rewind($fp1);
// fp2 は空(読み取り不可)

// ノンブロッキングポーリング
$result = $poller->pollNow([$fp1, $fp2]);
echo "即時ポーリング:" . PHP_EOL;
echo "  読み取り可能数: " . count($result->readable) . PHP_EOL;
echo "  タイムアウト: "   . ($result->timedOut ? 'YES' : 'NO') . PHP_EOL;

// 待機付きポーリング
$result = $poller->pollWait([$fp1, $fp2], timeoutMs: 500);
echo PHP_EOL . "500ms待機ポーリング:" . PHP_EOL;
echo "  読み取り可能数: " . count($result->readable) . PHP_EOL;
echo sprintf("  経過時間: %.2f ms\n", $result->elapsedMs);

// pollUntil の例
$poller->pollUntil([$fp1], function (PollResult $r): bool {
    if ($r->hasReadable()) {
        echo "データ受信!ストリーム数: " . count($r->readable) . PHP_EOL;
        return true;
    }
    return false;
}, timeoutMs: 100);

fclose($fp1);
fclose($fp2);

例5:チャットルームサーバー(ChatRoomServer)

stream_select() を使って、複数クライアント間でメッセージをブロードキャストするシンプルなチャットルームサーバーです。

<?php

class ChatMessage
{
    public function __construct(
        public readonly string $from,
        public readonly string $body,
        public readonly float  $timestamp,
    ) {}

    public function format(): string
    {
        return sprintf("[%s] %s: %s\n", date('H:i:s', (int)$this->timestamp), $this->from, $this->body);
    }
}

class ChatRoomServer
{
    private $serverSocket;
    /** @var array<string, resource> クライアントID => ソケット */
    private array $sockets  = [];
    /** @var array<string, string> クライアントID => ニックネーム */
    private array $nicks    = [];
    /** @var ChatMessage[] */
    private array $history  = [];
    private bool  $running  = false;
    private int   $maxHistory;

    public function __construct(
        private string $host       = '127.0.0.1',
        private int    $port       = 9000,
        int            $maxHistory = 50
    ) {
        $this->maxHistory = $maxHistory;
    }

    public function start(): void
    {
        $this->serverSocket = stream_socket_server(
            "tcp://{$this->host}:{$this->port}",
            $errno, $errstr
        );
        if (!$this->serverSocket) {
            throw new \RuntimeException("起動失敗: ({$errno}) {$errstr}");
        }
        stream_set_blocking($this->serverSocket, false);
        $this->running = true;

        echo "チャットサーバー起動: tcp://{$this->host}:{$this->port}" . PHP_EOL;
        $this->loop();
    }

    private function loop(): void
    {
        while ($this->running) {
            $read   = array_merge([$this->serverSocket], array_values($this->sockets));
            $write  = null;
            $except = null;

            if (stream_select($read, $write, $except, 1) === false) break;

            foreach ($read as $socket) {
                if ($socket === $this->serverSocket) {
                    $this->accept();
                } else {
                    $this->receive($socket);
                }
            }
        }
        foreach ($this->sockets as $socket) fclose($socket);
        fclose($this->serverSocket);
    }

    private function accept(): void
    {
        $client = stream_socket_accept($this->serverSocket, 0, $addr);
        if (!$client) return;

        $id = uniqid('client_');
        stream_set_blocking($client, false);
        $this->sockets[$id] = $client;
        $this->nicks[$id]   = 'Guest_' . substr($id, -4);

        fwrite($client, "ようこそ!ニックネームを設定するには /nick <名前> と入力してください\n");

        // 最近の履歴を送信
        foreach (array_slice($this->history, -10) as $msg) {
            fwrite($client, $msg->format());
        }

        $this->broadcast("{$this->nicks[$id]} が入室しました", 'System', except: $id);
    }

    private function receive($socket): void
    {
        $id = array_search($socket, $this->sockets, true);
        if ($id === false) return;

        $data = fread($socket, 1024);

        if ($data === false || feof($socket)) {
            $nick = $this->nicks[$id];
            fclose($socket);
            unset($this->sockets[$id], $this->nicks[$id]);
            $this->broadcast("{$nick} が退室しました", 'System');
            return;
        }

        $text = trim($data);
        if ($text === '') return;

        // コマンド処理
        if (str_starts_with($text, '/nick ')) {
            $newNick = substr($text, 6);
            $oldNick = $this->nicks[$id];
            $this->nicks[$id] = htmlspecialchars($newNick, ENT_QUOTES);
            $this->broadcast("{$oldNick} が {$this->nicks[$id]} に改名しました", 'System');
            return;
        }

        $this->broadcast($text, $this->nicks[$id], except: null, from: $id);
    }

    private function broadcast(string $body, string $from, ?string $except = null, ?string $from_id = null): void
    {
        $msg = new ChatMessage($from, $body, microtime(true));
        $this->history[] = $msg;
        if (count($this->history) > $this->maxHistory) {
            array_shift($this->history);
        }

        foreach ($this->sockets as $id => $socket) {
            if ($id === $except) continue;
            fwrite($socket, $msg->format());
        }
    }

    public function stop(): void { $this->running = false; }
}

// 使用例
$chat = new ChatRoomServer('127.0.0.1', 9000);
echo "ChatRoomServer 設定完了(\$chat->start() で起動)" . PHP_EOL;
// $chat->start();

例6:ストリーム多重化パイプライン(StreamMuxPipeline)

複数の入力ストリームを stream_select() で順次読み取り、変換処理を施して単一の出力ストリームに集約するパイプラインです。

<?php

class StreamMuxPipeline
{
    /** @var resource[] */
    private array    $inputs = [];
    /** @var callable[] 変換関数 stream => string */
    private array    $transformers = [];
    private int      $bufferSize;
    private int      $timeoutMs;

    public function __construct(int $bufferSize = 4096, int $timeoutMs = 1000)
    {
        $this->bufferSize = $bufferSize;
        $this->timeoutMs  = $timeoutMs;
    }

    /**
     * 入力ストリームを追加する(オプションで変換関数を指定)
     */
    public function addInput($stream, ?callable $transformer = null): void
    {
        $idx = count($this->inputs);
        $this->inputs[$idx]       = $stream;
        $this->transformers[$idx] = $transformer ?? fn($data) => $data;
    }

    /**
     * 全入力を多重化して出力ストリームに書き込む
     */
    public function muxTo($output): int
    {
        $totalBytes = 0;
        $remaining  = $this->inputs;
        $transformers = $this->transformers;

        $sec  = intdiv($this->timeoutMs, 1000);
        $usec = ($this->timeoutMs % 1000) * 1000;

        while (!empty($remaining)) {
            $read   = array_values($remaining);
            $write  = null;
            $except = null;

            $changed = stream_select($read, $write, $except, $sec, $usec);

            if ($changed === false) throw new \RuntimeException('stream_select エラー');
            if ($changed === 0)     break;

            foreach ($read as $stream) {
                // 元のインデックスを特定
                $idx = array_search($stream, $this->inputs, true);

                $data = fread($stream, $this->bufferSize);
                if ($data === false || $data === '') {
                    // ストリーム終了
                    unset($remaining[$idx]);
                    continue;
                }

                // 変換を適用して出力
                $transformed  = ($transformers[$idx])($data);
                $written       = fwrite($output, $transformed);
                $totalBytes   += $written !== false ? $written : 0;
            }

            // EOF になったストリームを除去
            foreach ($remaining as $idx => $stream) {
                if (feof($stream)) unset($remaining[$idx]);
            }
        }

        return $totalBytes;
    }
}

// 使用例
$pipeline = new StreamMuxPipeline(bufferSize: 256);

// 3つの入力ストリーム(それぞれ異なる変換を適用)
$s1 = fopen('php://memory', 'r+');
fwrite($s1, "hello world\n");
rewind($s1);
$pipeline->addInput($s1, fn($d) => strtoupper($d));       // 大文字変換

$s2 = fopen('php://memory', 'r+');
fwrite($s2, "foo bar baz\n");
rewind($s2);
$pipeline->addInput($s2, fn($d) => str_rot13($d));        // ROT13 変換

$s3 = fopen('php://memory', 'r+');
fwrite($s3, "1234567890\n");
rewind($s3);
$pipeline->addInput($s3, fn($d) => strrev($d));           // 逆順変換

// 出力先
$output = fopen('php://memory', 'w+');
$bytes  = $pipeline->muxTo($output);

echo "合計書き込み: {$bytes} バイト" . PHP_EOL;
echo stream_get_contents($output, offset: 0) . PHP_EOL;

fclose($s1); fclose($s2); fclose($s3); fclose($output);

// 出力例(到着順に依存):
// 合計書き込み: 36 バイト
// HELLO WORLD
// sbb one onm
// 0987654321

例7:タイムアウトウォッチドッグ(StreamWatchdog)

stream_select() の戻り値(0 = タイムアウト)を使って、一定時間データが来ないストリームを検出し、自動的に再接続やアラートを発生させるウォッチドッグです。

<?php

class WatchdogEvent
{
    const TIMEOUT    = 'timeout';
    const RECOVERED  = 'recovered';
    const DATA       = 'data';
    const ERROR      = 'error';

    public function __construct(
        public readonly string $type,
        public readonly int    $streamIndex,
        public readonly mixed  $payload,
        public readonly float  $timestamp,
    ) {}
}

class StreamWatchdog
{
    /** @var resource[] */
    private array $streams;

    /** @var float[] ストリームインデックス => 最終受信時刻 */
    private array $lastSeen = [];

    /** @var bool[] ストリームインデックス => タイムアウト状態か */
    private array $timedOut = [];

    private int      $idleTimeoutMs;
    private int      $pollIntervalMs;
    private bool     $running = false;

    /** @var callable|null */
    private $onEvent;

    /**
     * @param resource[] $streams
     */
    public function __construct(
        array $streams,
        int   $idleTimeoutMs  = 5000,
        int   $pollIntervalMs = 500
    ) {
        $this->streams        = $streams;
        $this->idleTimeoutMs  = $idleTimeoutMs;
        $this->pollIntervalMs = $pollIntervalMs;

        $now = microtime(true);
        foreach (array_keys($streams) as $idx) {
            $this->lastSeen[$idx] = $now;
            $this->timedOut[$idx] = false;
        }
    }

    public function onEvent(callable $cb): void { $this->onEvent = $cb; }

    /**
     * 監視ループを開始する
     */
    public function watch(int $maxIterations = 100): void
    {
        $this->running = true;
        $sec  = intdiv($this->pollIntervalMs, 1000);
        $usec = ($this->pollIntervalMs % 1000) * 1000;

        for ($iter = 0; $iter < $maxIterations && $this->running; $iter++) {
            $read   = array_values($this->streams);
            $write  = null;
            $except = null;

            $changed = stream_select($read, $write, $except, $sec, $usec);

            if ($changed === false) {
                $this->emit(new WatchdogEvent(WatchdogEvent::ERROR, -1, 'stream_select failed', microtime(true)));
                break;
            }

            $now = microtime(true);

            // データが来たストリームの最終受信時刻を更新
            foreach ($read as $stream) {
                $idx = array_search($stream, $this->streams, true);
                if ($idx === false) continue;

                $data = fread($stream, 4096);
                $this->lastSeen[$idx] = $now;

                // タイムアウト状態から回復
                if ($this->timedOut[$idx]) {
                    $this->timedOut[$idx] = false;
                    $this->emit(new WatchdogEvent(WatchdogEvent::RECOVERED, (int)$idx, $data, $now));
                } else {
                    $this->emit(new WatchdogEvent(WatchdogEvent::DATA, (int)$idx, $data, $now));
                }
            }

            // アイドルタイムアウトチェック
            foreach ($this->streams as $idx => $stream) {
                $idle = ($now - $this->lastSeen[$idx]) * 1000;
                if ($idle >= $this->idleTimeoutMs && !$this->timedOut[$idx]) {
                    $this->timedOut[$idx] = true;
                    $this->emit(new WatchdogEvent(
                        WatchdogEvent::TIMEOUT, (int)$idx,
                        "アイドル {$idle}ms(閾値: {$this->idleTimeoutMs}ms)",
                        $now
                    ));
                }
            }
        }
    }

    private function emit(WatchdogEvent $event): void
    {
        if ($this->onEvent) {
            ($this->onEvent)($event);
        }
    }

    public function stop(): void { $this->running = false; }
}

// 使用例
$s1 = fopen('php://memory', 'r+');
$s2 = fopen('php://memory', 'r+');

// s1 にはデータあり、s2 はアイドル状態
fwrite($s1, "active data");
rewind($s1);

$watchdog = new StreamWatchdog(
    streams:        [$s1, $s2],
    idleTimeoutMs:  100,  // テスト用に短く
    pollIntervalMs: 50
);

$watchdog->onEvent(function (WatchdogEvent $e) {
    $labels = [
        WatchdogEvent::DATA      => 'DATA',
        WatchdogEvent::TIMEOUT   => 'TIMEOUT',
        WatchdogEvent::RECOVERED => 'RECOVERED',
        WatchdogEvent::ERROR     => 'ERROR',
    ];
    echo "[{$labels[$e->type]}] stream[{$e->streamIndex}]: "
        . (is_string($e->payload) ? substr($e->payload, 0, 50) : json_encode($e->payload))
        . PHP_EOL;
});

$watchdog->watch(maxIterations: 5);

fclose($s1);
fclose($s2);

// 出力例:
// [DATA] stream[0]: active data
// [TIMEOUT] stream[1]: アイドル 102ms(閾値: 100ms)

関連する関数との比較

関数役割
stream_select()複数ストリームを同時監視して変化があったものを返す(多重I/O)
stream_set_blocking()ストリームをブロッキング / ノンブロッキングモードに切り替える
stream_set_timeout()個別ストリームの読み取りタイムアウトを設定する
stream_get_meta_data()ストリームのタイムアウト・EOF・ブロッキング状態を確認する
socket_select()socket_create() で作成したソケットリソースの多重監視(stream_select と類似だが別系統)

注意点とベストプラクティス

1. 配列は参照渡しで上書きされる

stream_select()$read$write$except書き換えます。元のストリームリストを保持したい場合は事前にコピーを取りましょう。

$allStreams = [$fp1, $fp2, $fp3];

// ループのたびにコピーして渡す
while (true) {
    $read   = $allStreams; // ← コピー!
    $write  = null;
    $except = null;
    $n = stream_select($read, $write, $except, 1);
    // $read は変化したものだけになる
    // $allStreams は変化しない
}

2. $seconds = null は無限待機

null を渡すとデータが来るまで永遠にブロックします。サーバーループでは 0(即時)または適切な秒数を指定しましょう。

// 無限待機(注意して使う)
stream_select($read, $write, $except, null);

// 1秒タイムアウト(推奨)
stream_select($read, $write, $except, 1);

// 即時リターン(ノンブロッキング)
stream_select($read, $write, $except, 0, 0);

3. Windows では $seconds = 0 が正常動作しない場合がある

Windows 環境では $seconds = 0 のノンブロッキング動作に制限があります。クロスプラットフォームな実装では小さい値(0, 200000 など)を使うか、Unix 環境を前提にしましょう。

4. 空配列の渡し方

$write$except を監視しない場合は null または空配列を渡します。ただし null の方が意図が明確でパフォーマンスも良好です。

$write  = null; // 推奨
$except = null;
stream_select($read, $write, $except, 1);

まとめ

ポイント内容
基本動作複数ストリームを同時監視し、変化が起きたものだけを配列に残して返す
引数$read$write$except(参照渡し)、$seconds$microseconds(タイムアウト)
返り値変化したストリーム数(0 = タイムアウト、false = エラー)
配列の扱い呼び出し前にコピーを取る。元リストは毎ループ保持する必要がある
タイムアウトnull = 無限待機、0 = 即時リターン、正数 = 秒指定
活用シーン多重TCPサーバー・並列プロセス出力収集・チャットサーバー・ウォッチドッグ・ストリームMux

stream_select() は PHPで本格的なノンブロッキングI/Oを実現するための中核関数です。イベントループ・多重接続サーバー・並列処理など、シングルプロセスで高いI/O並行性が求められるあらゆる場面で活用できます。

タイトルとURLをコピーしました