はじめに
PHPで複数のソケットやファイルディスクリプタを同時に扱う場合、単純にループで fread() を繰り返すと、どれか1つのストリームでデータ待ちが発生した瞬間に全体がブロックされてしまいます。
stream_select() は、複数のストリームを同時に監視し、読み取り・書き込み・例外のいずれかが可能になったストリームだけを返す関数です。Unix の select() システムコールをラップしており、PHPでノンブロッキングな多重I/Oを実現するための中心的な関数です。
関数の基本情報
| 項目 | 内容 |
|---|---|
| 関数名 | stream_select() |
| 対応バージョン | PHP 4.3.0 以降 |
| 返り値 | int(変化したストリーム数)/ false(エラー時) |
| カテゴリ | ストリーム関数 |
構文
stream_select(
?array &$read,
?array &$write,
?array &$except,
?int $seconds,
int $microseconds = 0
): int|false
パラメータ
| パラメータ | 型 | 説明 |
|---|---|---|
&$read | ?array | 読み取り可能になるまで監視するストリームの配列(参照渡し) |
&$write | ?array | 書き込み可能になるまで監視するストリームの配列(参照渡し) |
&$except | ?array | 例外(OOB データなど)を監視するストリームの配列(参照渡し) |
$seconds | ?int | タイムアウト秒数。0 で即時リターン(ノンブロッキング)、null で無制限待機 |
$microseconds | int | タイムアウトのマイクロ秒部分(デフォルト 0) |
返り値
int:変化したストリームの総数(タイムアウトなら0)false:エラーが発生した場合
重要:
$read・$write・$exceptは参照渡しです。関数呼び出し後、各配列には「実際に変化が起きたストリームのみ」が残ります(変化がなかったものは除去されます)。
基本的な使い方
<?php
// 2つのストリームを同時に監視する
$fp1 = fopen('php://memory', 'r+');
$fp2 = fopen('php://memory', 'r+');
fwrite($fp1, "stream1 data");
fwrite($fp2, "stream2 data");
rewind($fp1);
rewind($fp2);
$read = [$fp1, $fp2];
$write = null;
$except = null;
// 最大1秒待機して読み取り可能なストリームを取得
$changed = stream_select($read, $write, $except, 1);
if ($changed === false) {
echo "エラーが発生しました" . PHP_EOL;
} elseif ($changed === 0) {
echo "タイムアウト:読み取り可能なストリームなし" . PHP_EOL;
} else {
echo "{$changed} 個のストリームが読み取り可能" . PHP_EOL;
foreach ($read as $fp) {
echo fread($fp, 1024) . PHP_EOL;
}
}
fclose($fp1);
fclose($fp2);
stream_select() の動作フロー
呼び出し前:
$read = [stream1, stream2, stream3] ← 監視対象
↓ stream_select() が実行される
監視期間中:
- stream2 にデータが到着
- stream3 にデータが到着
- stream1 はまだデータなし
- タイムアウト or 変化検出で返却
呼び出し後:
$read = [stream2, stream3] ← 変化したものだけ残る
戻り値 = 2 ← 変化したストリーム数
実践的なクラスベースの活用例
例1:マルチストリームリーダー(MultiStreamReader)
複数のストリームを stream_select() で同時監視し、読み取り可能になった順にデータを収集する汎用リーダーです。
<?php
class StreamReadResult
{
public function __construct(
public readonly int $index,
public readonly string $data,
public readonly float $receivedAt,
) {}
}
class MultiStreamReader
{
/** @var resource[] */
private array $streams = [];
private int $timeoutSec;
private int $timeoutUsec;
private int $bufferSize;
public function __construct(
int $timeoutSec = 5,
int $timeoutUsec = 0,
int $bufferSize = 8192
) {
$this->timeoutSec = $timeoutSec;
$this->timeoutUsec = $timeoutUsec;
$this->bufferSize = $bufferSize;
}
/**
* 監視対象ストリームを追加する
*/
public function add($stream): int
{
$this->streams[] = $stream;
return count($this->streams) - 1;
}
/**
* 全ストリームを監視し、読み取り可能になったものからデータを収集する
*
* @return StreamReadResult[]
*/
public function readAll(): array
{
$results = [];
$remaining = $this->streams;
$indexMap = array_flip(array_map('intval', array_keys($this->streams)));
while (!empty($remaining)) {
$read = $remaining;
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, $this->timeoutSec, $this->timeoutUsec);
if ($changed === false) {
throw new \RuntimeException('stream_select() でエラーが発生しました');
}
if ($changed === 0) {
break; // タイムアウト
}
foreach ($read as $stream) {
$data = '';
while (!feof($stream)) {
$chunk = fread($stream, $this->bufferSize);
if ($chunk === false || $chunk === '') break;
$data .= $chunk;
}
// 元のインデックスを特定
$idx = array_search($stream, $this->streams, true);
$results[] = new StreamReadResult(
index: $idx !== false ? (int)$idx : -1,
data: $data,
receivedAt: microtime(true),
);
// 完了したストリームを remaining から除去
$remaining = array_filter(
$remaining,
fn($s) => $s !== $stream
);
}
}
return $results;
}
}
// 使用例
$reader = new MultiStreamReader(timeoutSec: 2);
// 3つのメモリストリームにデータを書き込む
$streams = [];
foreach (['Alpha', 'Beta', 'Gamma'] as $name) {
$fp = fopen('php://memory', 'r+');
fwrite($fp, "データ from {$name}");
rewind($fp);
$streams[] = $fp;
$reader->add($fp);
}
$results = $reader->readAll();
foreach ($results as $result) {
echo "ストリーム[{$result->index}]: {$result->data}" . PHP_EOL;
}
array_map('fclose', $streams);
// 出力:
// ストリーム[0]: データ from Alpha
// ストリーム[1]: データ from Beta
// ストリーム[2]: データ from Gamma
例2:ノンブロッキングTCPサーバー(NonBlockingServer)
stream_select() を使って、複数のクライアント接続を1つのプロセス・1スレッドで並行処理するTCPサーバーです。
<?php
class ClientConnection
{
public string $buffer = '';
public float $connectedAt;
public function __construct(
public readonly $socket,
public readonly string $address,
) {
$this->connectedAt = microtime(true);
}
}
class NonBlockingServer
{
private $serverSocket;
/** @var ClientConnection[] */
private array $clients = [];
private bool $running = false;
private int $maxClients;
/** @var callable|null */
private $onConnect;
/** @var callable|null */
private $onMessage;
/** @var callable|null */
private $onDisconnect;
public function __construct(
private string $host = '127.0.0.1',
private int $port = 8080,
int $maxClients = 100
) {
$this->maxClients = $maxClients;
}
public function onConnect(callable $cb): void { $this->onConnect = $cb; }
public function onMessage(callable $cb): void { $this->onMessage = $cb; }
public function onDisconnect(callable $cb): void { $this->onDisconnect = $cb; }
/**
* サーバーを起動してイベントループを開始する
*/
public function start(): void
{
$this->serverSocket = stream_socket_server(
"tcp://{$this->host}:{$this->port}",
$errno,
$errstr
);
if (!$this->serverSocket) {
throw new \RuntimeException("サーバー起動失敗: ({$errno}) {$errstr}");
}
stream_set_blocking($this->serverSocket, false);
$this->running = true;
echo "サーバー起動: tcp://{$this->host}:{$this->port}" . PHP_EOL;
$this->eventLoop();
}
private function eventLoop(): void
{
while ($this->running) {
// 監視リスト:サーバーソケット + 全クライアント
$read = array_merge(
[$this->serverSocket],
array_map(fn($c) => $c->socket, $this->clients)
);
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, 0, 200_000); // 200ms
if ($changed === false) break;
if ($changed === 0) continue; // タイムアウト、次のループへ
foreach ($read as $socket) {
if ($socket === $this->serverSocket) {
// 新しい接続を受け付ける
$this->acceptClient();
} else {
// 既存クライアントからのデータを処理
$this->handleClient($socket);
}
}
}
fclose($this->serverSocket);
}
private function acceptClient(): void
{
if (count($this->clients) >= $this->maxClients) return;
$clientSocket = stream_socket_accept($this->serverSocket, 0, $address);
if (!$clientSocket) return;
stream_set_blocking($clientSocket, false);
$client = new ClientConnection($clientSocket, $address);
$this->clients[] = $client;
if ($this->onConnect) {
($this->onConnect)($client);
}
}
private function handleClient($socket): void
{
$client = $this->findClient($socket);
if (!$client) return;
$data = fread($socket, 4096);
if ($data === false || feof($socket)) {
$this->disconnectClient($client);
return;
}
$client->buffer .= $data;
// 改行でメッセージを区切る
while (($pos = strpos($client->buffer, "\n")) !== false) {
$message = substr($client->buffer, 0, $pos);
$client->buffer = substr($client->buffer, $pos + 1);
if ($this->onMessage) {
($this->onMessage)($client, trim($message));
}
}
}
private function disconnectClient(ClientConnection $client): void
{
if ($this->onDisconnect) {
($this->onDisconnect)($client);
}
fclose($client->socket);
$this->clients = array_filter(
$this->clients,
fn($c) => $c !== $client
);
}
private function findClient($socket): ?ClientConnection
{
foreach ($this->clients as $client) {
if ($client->socket === $socket) return $client;
}
return null;
}
public function stop(): void { $this->running = false; }
public function clientCount(): int { return count($this->clients); }
}
// 使用例(実際のサーバー起動はコメントアウト)
$server = new NonBlockingServer('127.0.0.1', 8080);
$server->onConnect(fn($c) =>
echo "接続: {$c->address}" . PHP_EOL
);
$server->onMessage(fn($c, $msg) =>
fwrite($c->socket, "Echo: {$msg}\n")
);
$server->onDisconnect(fn($c) =>
echo "切断: {$c->address}" . PHP_EOL
);
echo "NonBlockingServer 設定完了(実際の起動は \$server->start())" . PHP_EOL;
// $server->start(); // 実際の起動はここで
例3:並行プロセス出力コレクター(ParallelProcessCollector)
proc_open() で複数のコマンドを並列実行し、stream_select() で各プロセスの stdout/stderr を同時に非同期収集するクラスです。
<?php
class ProcessResult
{
public string $stdout = '';
public string $stderr = '';
public ?int $exitCode = null;
public float $startedAt;
public ?float $finishedAt = null;
public function __construct(public readonly string $command)
{
$this->startedAt = microtime(true);
}
public function elapsedMs(): float
{
$end = $this->finishedAt ?? microtime(true);
return ($end - $this->startedAt) * 1000;
}
}
class ParallelProcessCollector
{
private int $timeoutSec;
public function __construct(int $timeoutSec = 30)
{
$this->timeoutSec = $timeoutSec;
}
/**
* 複数コマンドを並列実行して全出力を収集する
*
* @param string[] $commands
* @return ProcessResult[]
*/
public function runAll(array $commands): array
{
$processes = [];
$results = [];
$pipes = [];
// 全コマンドを起動
foreach ($commands as $i => $cmd) {
$result = new ProcessResult($cmd);
$spec = [
0 => ['pipe', 'r'], // stdin
1 => ['pipe', 'w'], // stdout
2 => ['pipe', 'w'], // stderr
];
$proc = proc_open($cmd, $spec, $pipe);
if ($proc === false) continue;
fclose($pipe[0]); // stdin は不要
stream_set_blocking($pipe[1], false);
stream_set_blocking($pipe[2], false);
$processes[$i] = $proc;
$pipes[$i] = ['stdout' => $pipe[1], 'stderr' => $pipe[2]];
$results[$i] = $result;
}
$deadline = microtime(true) + $this->timeoutSec;
// 全パイプが閉じるまでループ
while (!empty($pipes) && microtime(true) < $deadline) {
$read = [];
$idxMap = [];
foreach ($pipes as $i => $pipe) {
$read[] = $pipe['stdout'];
$idxMap[(int)$pipe['stdout']] = ['i' => $i, 'type' => 'stdout'];
$read[] = $pipe['stderr'];
$idxMap[(int)$pipe['stderr']] = ['i' => $i, 'type' => 'stderr'];
}
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, 0, 100_000);
if ($changed === false || $changed === 0) continue;
foreach ($read as $stream) {
$key = (int)$stream;
if (!isset($idxMap[$key])) continue;
$i = $idxMap[$key]['i'];
$type = $idxMap[$key]['type'];
$data = fread($stream, 4096);
if ($data !== false && $data !== '') {
$results[$i]->{$type} .= $data;
}
}
// 終了したプロセスをクリーンアップ
foreach ($pipes as $i => $pipe) {
if (feof($pipe['stdout']) && feof($pipe['stderr'])) {
fclose($pipe['stdout']);
fclose($pipe['stderr']);
$status = proc_get_status($processes[$i]);
$results[$i]->exitCode = $status['exitcode'] ?? -1;
$results[$i]->finishedAt = microtime(true);
proc_close($processes[$i]);
unset($pipes[$i]);
}
}
}
return $results;
}
}
// 使用例
$collector = new ParallelProcessCollector(timeoutSec: 10);
$results = $collector->runAll([
'echo "Hello from command 1"',
'echo "Hello from command 2" && sleep 0',
'php -r "echo PHP_VERSION;"',
]);
foreach ($results as $i => $result) {
echo "=== コマンド {$i}: {$result->command} ===" . PHP_EOL;
echo " stdout : " . trim($result->stdout) . PHP_EOL;
echo " stderr : " . trim($result->stderr) . PHP_EOL;
echo " exitCode: " . $result->exitCode . PHP_EOL;
echo sprintf(" elapsed : %.2f ms\n", $result->elapsedMs());
}
// 出力例:
// === コマンド 0: echo "Hello from command 1" ===
// stdout : Hello from command 1
// exitCode: 0
// elapsed : 5.23 ms
例4:タイムアウト付きストリームポーリング(StreamPoller)
stream_select() の $seconds = 0(即時リターン)を使ったノンブロッキングポーリングと、タイムアウト付きブロッキング待機を切り替えられるポーラークラスです。
<?php
class PollResult
{
/** @param resource[] $readable */
public function __construct(
public readonly array $readable,
public readonly array $writable,
public readonly bool $timedOut,
public readonly float $elapsedMs,
) {}
public function hasReadable(): bool { return !empty($this->readable); }
public function hasWritable(): bool { return !empty($this->writable); }
public function isEmpty(): bool { return $this->timedOut || (!$this->hasReadable() && !$this->hasWritable()); }
}
class StreamPoller
{
/**
* ノンブロッキングポーリング($seconds = 0 で即時リターン)
*
* @param resource[] $streams
*/
public function pollNow(array $streams): PollResult
{
return $this->poll($streams, 0, 0);
}
/**
* 指定時間だけ待機してストリームの変化を確認する
*
* @param resource[] $streams
*/
public function pollWait(array $streams, int $timeoutMs = 1000): PollResult
{
$sec = intdiv($timeoutMs, 1000);
$usec = ($timeoutMs % 1000) * 1000;
return $this->poll($streams, $sec, $usec);
}
/**
* ストリームに読み取り可能データが現れるまでビジーウェイトする
*
* @param resource[] $streams
* @param int $maxWaitMs 最大待機ミリ秒
* @param int $intervalMs ポーリング間隔ミリ秒
*/
public function busyWait(array $streams, int $maxWaitMs = 5000, int $intervalMs = 10): PollResult
{
$deadline = microtime(true) + ($maxWaitMs / 1000);
while (microtime(true) < $deadline) {
$result = $this->pollNow($streams);
if ($result->hasReadable()) return $result;
usleep($intervalMs * 1000);
}
return new PollResult([], [], true, $maxWaitMs);
}
/**
* コールバックが true を返すまでポーリングを繰り返す
*
* @param resource[] $streams
* @param callable $callback `callable(PollResult): bool` → true で停止
*/
public function pollUntil(array $streams, callable $callback, int $timeoutMs = 100, int $maxIterations = 100): void
{
for ($i = 0; $i < $maxIterations; $i++) {
$result = $this->pollWait($streams, $timeoutMs);
if ($callback($result)) break;
}
}
private function poll(array $streams, int $sec, int $usec): PollResult
{
$start = microtime(true);
$read = $streams;
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, $sec, $usec);
$elapsed = (microtime(true) - $start) * 1000;
if ($changed === false) {
throw new \RuntimeException('stream_select() が失敗しました');
}
return new PollResult(
readable: $read ?? [],
writable: [],
timedOut: $changed === 0,
elapsedMs: $elapsed,
);
}
}
// 使用例
$poller = new StreamPoller();
// メモリストリームを準備
$fp1 = fopen('php://memory', 'r+');
$fp2 = fopen('php://memory', 'r+');
fwrite($fp1, "ready");
rewind($fp1);
// fp2 は空(読み取り不可)
// ノンブロッキングポーリング
$result = $poller->pollNow([$fp1, $fp2]);
echo "即時ポーリング:" . PHP_EOL;
echo " 読み取り可能数: " . count($result->readable) . PHP_EOL;
echo " タイムアウト: " . ($result->timedOut ? 'YES' : 'NO') . PHP_EOL;
// 待機付きポーリング
$result = $poller->pollWait([$fp1, $fp2], timeoutMs: 500);
echo PHP_EOL . "500ms待機ポーリング:" . PHP_EOL;
echo " 読み取り可能数: " . count($result->readable) . PHP_EOL;
echo sprintf(" 経過時間: %.2f ms\n", $result->elapsedMs);
// pollUntil の例
$poller->pollUntil([$fp1], function (PollResult $r): bool {
if ($r->hasReadable()) {
echo "データ受信!ストリーム数: " . count($r->readable) . PHP_EOL;
return true;
}
return false;
}, timeoutMs: 100);
fclose($fp1);
fclose($fp2);
例5:チャットルームサーバー(ChatRoomServer)
stream_select() を使って、複数クライアント間でメッセージをブロードキャストするシンプルなチャットルームサーバーです。
<?php
class ChatMessage
{
public function __construct(
public readonly string $from,
public readonly string $body,
public readonly float $timestamp,
) {}
public function format(): string
{
return sprintf("[%s] %s: %s\n", date('H:i:s', (int)$this->timestamp), $this->from, $this->body);
}
}
class ChatRoomServer
{
private $serverSocket;
/** @var array<string, resource> クライアントID => ソケット */
private array $sockets = [];
/** @var array<string, string> クライアントID => ニックネーム */
private array $nicks = [];
/** @var ChatMessage[] */
private array $history = [];
private bool $running = false;
private int $maxHistory;
public function __construct(
private string $host = '127.0.0.1',
private int $port = 9000,
int $maxHistory = 50
) {
$this->maxHistory = $maxHistory;
}
public function start(): void
{
$this->serverSocket = stream_socket_server(
"tcp://{$this->host}:{$this->port}",
$errno, $errstr
);
if (!$this->serverSocket) {
throw new \RuntimeException("起動失敗: ({$errno}) {$errstr}");
}
stream_set_blocking($this->serverSocket, false);
$this->running = true;
echo "チャットサーバー起動: tcp://{$this->host}:{$this->port}" . PHP_EOL;
$this->loop();
}
private function loop(): void
{
while ($this->running) {
$read = array_merge([$this->serverSocket], array_values($this->sockets));
$write = null;
$except = null;
if (stream_select($read, $write, $except, 1) === false) break;
foreach ($read as $socket) {
if ($socket === $this->serverSocket) {
$this->accept();
} else {
$this->receive($socket);
}
}
}
foreach ($this->sockets as $socket) fclose($socket);
fclose($this->serverSocket);
}
private function accept(): void
{
$client = stream_socket_accept($this->serverSocket, 0, $addr);
if (!$client) return;
$id = uniqid('client_');
stream_set_blocking($client, false);
$this->sockets[$id] = $client;
$this->nicks[$id] = 'Guest_' . substr($id, -4);
fwrite($client, "ようこそ!ニックネームを設定するには /nick <名前> と入力してください\n");
// 最近の履歴を送信
foreach (array_slice($this->history, -10) as $msg) {
fwrite($client, $msg->format());
}
$this->broadcast("{$this->nicks[$id]} が入室しました", 'System', except: $id);
}
private function receive($socket): void
{
$id = array_search($socket, $this->sockets, true);
if ($id === false) return;
$data = fread($socket, 1024);
if ($data === false || feof($socket)) {
$nick = $this->nicks[$id];
fclose($socket);
unset($this->sockets[$id], $this->nicks[$id]);
$this->broadcast("{$nick} が退室しました", 'System');
return;
}
$text = trim($data);
if ($text === '') return;
// コマンド処理
if (str_starts_with($text, '/nick ')) {
$newNick = substr($text, 6);
$oldNick = $this->nicks[$id];
$this->nicks[$id] = htmlspecialchars($newNick, ENT_QUOTES);
$this->broadcast("{$oldNick} が {$this->nicks[$id]} に改名しました", 'System');
return;
}
$this->broadcast($text, $this->nicks[$id], except: null, from: $id);
}
private function broadcast(string $body, string $from, ?string $except = null, ?string $from_id = null): void
{
$msg = new ChatMessage($from, $body, microtime(true));
$this->history[] = $msg;
if (count($this->history) > $this->maxHistory) {
array_shift($this->history);
}
foreach ($this->sockets as $id => $socket) {
if ($id === $except) continue;
fwrite($socket, $msg->format());
}
}
public function stop(): void { $this->running = false; }
}
// 使用例
$chat = new ChatRoomServer('127.0.0.1', 9000);
echo "ChatRoomServer 設定完了(\$chat->start() で起動)" . PHP_EOL;
// $chat->start();
例6:ストリーム多重化パイプライン(StreamMuxPipeline)
複数の入力ストリームを stream_select() で順次読み取り、変換処理を施して単一の出力ストリームに集約するパイプラインです。
<?php
class StreamMuxPipeline
{
/** @var resource[] */
private array $inputs = [];
/** @var callable[] 変換関数 stream => string */
private array $transformers = [];
private int $bufferSize;
private int $timeoutMs;
public function __construct(int $bufferSize = 4096, int $timeoutMs = 1000)
{
$this->bufferSize = $bufferSize;
$this->timeoutMs = $timeoutMs;
}
/**
* 入力ストリームを追加する(オプションで変換関数を指定)
*/
public function addInput($stream, ?callable $transformer = null): void
{
$idx = count($this->inputs);
$this->inputs[$idx] = $stream;
$this->transformers[$idx] = $transformer ?? fn($data) => $data;
}
/**
* 全入力を多重化して出力ストリームに書き込む
*/
public function muxTo($output): int
{
$totalBytes = 0;
$remaining = $this->inputs;
$transformers = $this->transformers;
$sec = intdiv($this->timeoutMs, 1000);
$usec = ($this->timeoutMs % 1000) * 1000;
while (!empty($remaining)) {
$read = array_values($remaining);
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, $sec, $usec);
if ($changed === false) throw new \RuntimeException('stream_select エラー');
if ($changed === 0) break;
foreach ($read as $stream) {
// 元のインデックスを特定
$idx = array_search($stream, $this->inputs, true);
$data = fread($stream, $this->bufferSize);
if ($data === false || $data === '') {
// ストリーム終了
unset($remaining[$idx]);
continue;
}
// 変換を適用して出力
$transformed = ($transformers[$idx])($data);
$written = fwrite($output, $transformed);
$totalBytes += $written !== false ? $written : 0;
}
// EOF になったストリームを除去
foreach ($remaining as $idx => $stream) {
if (feof($stream)) unset($remaining[$idx]);
}
}
return $totalBytes;
}
}
// 使用例
$pipeline = new StreamMuxPipeline(bufferSize: 256);
// 3つの入力ストリーム(それぞれ異なる変換を適用)
$s1 = fopen('php://memory', 'r+');
fwrite($s1, "hello world\n");
rewind($s1);
$pipeline->addInput($s1, fn($d) => strtoupper($d)); // 大文字変換
$s2 = fopen('php://memory', 'r+');
fwrite($s2, "foo bar baz\n");
rewind($s2);
$pipeline->addInput($s2, fn($d) => str_rot13($d)); // ROT13 変換
$s3 = fopen('php://memory', 'r+');
fwrite($s3, "1234567890\n");
rewind($s3);
$pipeline->addInput($s3, fn($d) => strrev($d)); // 逆順変換
// 出力先
$output = fopen('php://memory', 'w+');
$bytes = $pipeline->muxTo($output);
echo "合計書き込み: {$bytes} バイト" . PHP_EOL;
echo stream_get_contents($output, offset: 0) . PHP_EOL;
fclose($s1); fclose($s2); fclose($s3); fclose($output);
// 出力例(到着順に依存):
// 合計書き込み: 36 バイト
// HELLO WORLD
// sbb one onm
// 0987654321
例7:タイムアウトウォッチドッグ(StreamWatchdog)
stream_select() の戻り値(0 = タイムアウト)を使って、一定時間データが来ないストリームを検出し、自動的に再接続やアラートを発生させるウォッチドッグです。
<?php
class WatchdogEvent
{
const TIMEOUT = 'timeout';
const RECOVERED = 'recovered';
const DATA = 'data';
const ERROR = 'error';
public function __construct(
public readonly string $type,
public readonly int $streamIndex,
public readonly mixed $payload,
public readonly float $timestamp,
) {}
}
class StreamWatchdog
{
/** @var resource[] */
private array $streams;
/** @var float[] ストリームインデックス => 最終受信時刻 */
private array $lastSeen = [];
/** @var bool[] ストリームインデックス => タイムアウト状態か */
private array $timedOut = [];
private int $idleTimeoutMs;
private int $pollIntervalMs;
private bool $running = false;
/** @var callable|null */
private $onEvent;
/**
* @param resource[] $streams
*/
public function __construct(
array $streams,
int $idleTimeoutMs = 5000,
int $pollIntervalMs = 500
) {
$this->streams = $streams;
$this->idleTimeoutMs = $idleTimeoutMs;
$this->pollIntervalMs = $pollIntervalMs;
$now = microtime(true);
foreach (array_keys($streams) as $idx) {
$this->lastSeen[$idx] = $now;
$this->timedOut[$idx] = false;
}
}
public function onEvent(callable $cb): void { $this->onEvent = $cb; }
/**
* 監視ループを開始する
*/
public function watch(int $maxIterations = 100): void
{
$this->running = true;
$sec = intdiv($this->pollIntervalMs, 1000);
$usec = ($this->pollIntervalMs % 1000) * 1000;
for ($iter = 0; $iter < $maxIterations && $this->running; $iter++) {
$read = array_values($this->streams);
$write = null;
$except = null;
$changed = stream_select($read, $write, $except, $sec, $usec);
if ($changed === false) {
$this->emit(new WatchdogEvent(WatchdogEvent::ERROR, -1, 'stream_select failed', microtime(true)));
break;
}
$now = microtime(true);
// データが来たストリームの最終受信時刻を更新
foreach ($read as $stream) {
$idx = array_search($stream, $this->streams, true);
if ($idx === false) continue;
$data = fread($stream, 4096);
$this->lastSeen[$idx] = $now;
// タイムアウト状態から回復
if ($this->timedOut[$idx]) {
$this->timedOut[$idx] = false;
$this->emit(new WatchdogEvent(WatchdogEvent::RECOVERED, (int)$idx, $data, $now));
} else {
$this->emit(new WatchdogEvent(WatchdogEvent::DATA, (int)$idx, $data, $now));
}
}
// アイドルタイムアウトチェック
foreach ($this->streams as $idx => $stream) {
$idle = ($now - $this->lastSeen[$idx]) * 1000;
if ($idle >= $this->idleTimeoutMs && !$this->timedOut[$idx]) {
$this->timedOut[$idx] = true;
$this->emit(new WatchdogEvent(
WatchdogEvent::TIMEOUT, (int)$idx,
"アイドル {$idle}ms(閾値: {$this->idleTimeoutMs}ms)",
$now
));
}
}
}
}
private function emit(WatchdogEvent $event): void
{
if ($this->onEvent) {
($this->onEvent)($event);
}
}
public function stop(): void { $this->running = false; }
}
// 使用例
$s1 = fopen('php://memory', 'r+');
$s2 = fopen('php://memory', 'r+');
// s1 にはデータあり、s2 はアイドル状態
fwrite($s1, "active data");
rewind($s1);
$watchdog = new StreamWatchdog(
streams: [$s1, $s2],
idleTimeoutMs: 100, // テスト用に短く
pollIntervalMs: 50
);
$watchdog->onEvent(function (WatchdogEvent $e) {
$labels = [
WatchdogEvent::DATA => 'DATA',
WatchdogEvent::TIMEOUT => 'TIMEOUT',
WatchdogEvent::RECOVERED => 'RECOVERED',
WatchdogEvent::ERROR => 'ERROR',
];
echo "[{$labels[$e->type]}] stream[{$e->streamIndex}]: "
. (is_string($e->payload) ? substr($e->payload, 0, 50) : json_encode($e->payload))
. PHP_EOL;
});
$watchdog->watch(maxIterations: 5);
fclose($s1);
fclose($s2);
// 出力例:
// [DATA] stream[0]: active data
// [TIMEOUT] stream[1]: アイドル 102ms(閾値: 100ms)
関連する関数との比較
| 関数 | 役割 |
|---|---|
stream_select() | 複数ストリームを同時監視して変化があったものを返す(多重I/O) |
stream_set_blocking() | ストリームをブロッキング / ノンブロッキングモードに切り替える |
stream_set_timeout() | 個別ストリームの読み取りタイムアウトを設定する |
stream_get_meta_data() | ストリームのタイムアウト・EOF・ブロッキング状態を確認する |
socket_select() | socket_create() で作成したソケットリソースの多重監視(stream_select と類似だが別系統) |
注意点とベストプラクティス
1. 配列は参照渡しで上書きされる
stream_select() は $read・$write・$except を書き換えます。元のストリームリストを保持したい場合は事前にコピーを取りましょう。
$allStreams = [$fp1, $fp2, $fp3];
// ループのたびにコピーして渡す
while (true) {
$read = $allStreams; // ← コピー!
$write = null;
$except = null;
$n = stream_select($read, $write, $except, 1);
// $read は変化したものだけになる
// $allStreams は変化しない
}
2. $seconds = null は無限待機
null を渡すとデータが来るまで永遠にブロックします。サーバーループでは 0(即時)または適切な秒数を指定しましょう。
// 無限待機(注意して使う)
stream_select($read, $write, $except, null);
// 1秒タイムアウト(推奨)
stream_select($read, $write, $except, 1);
// 即時リターン(ノンブロッキング)
stream_select($read, $write, $except, 0, 0);
3. Windows では $seconds = 0 が正常動作しない場合がある
Windows 環境では $seconds = 0 のノンブロッキング動作に制限があります。クロスプラットフォームな実装では小さい値(0, 200000 など)を使うか、Unix 環境を前提にしましょう。
4. 空配列の渡し方
$write や $except を監視しない場合は null または空配列を渡します。ただし null の方が意図が明確でパフォーマンスも良好です。
$write = null; // 推奨
$except = null;
stream_select($read, $write, $except, 1);
まとめ
| ポイント | 内容 |
|---|---|
| 基本動作 | 複数ストリームを同時監視し、変化が起きたものだけを配列に残して返す |
| 引数 | $read・$write・$except(参照渡し)、$seconds・$microseconds(タイムアウト) |
| 返り値 | 変化したストリーム数(0 = タイムアウト、false = エラー) |
| 配列の扱い | 呼び出し前にコピーを取る。元リストは毎ループ保持する必要がある |
| タイムアウト | null = 無限待機、0 = 即時リターン、正数 = 秒指定 |
| 活用シーン | 多重TCPサーバー・並列プロセス出力収集・チャットサーバー・ウォッチドッグ・ストリームMux |
stream_select() は PHPで本格的なノンブロッキングI/Oを実現するための中核関数です。イベントループ・多重接続サーバー・並列処理など、シングルプロセスで高いI/O並行性が求められるあらゆる場面で活用できます。
