[PHP]stream_set_blocking完全解説|ノンブロッキングI/Oで高速ストリーム処理を実現する方法

PHP

はじめに

PHPでファイルやネットワーク通信を扱う際、「処理が終わるまで次の行に進めない」という経験はないでしょうか。これはブロッキングI/Oと呼ばれる動作で、PHPのストリームはデフォルトでこのモードになっています。

stream_set_blocking は、ストリームの読み書き動作を ブロッキング(同期)ノンブロッキング(非同期) で切り替えるための関数です。ネットワークソケット、パイプ、プロセス間通信など、複数のストリームを同時に扱う場面で威力を発揮します。

この記事では、基本的な使い方から実践的なユースケースまで、クラスを用いた具体例とともに丁寧に解説します。


stream_set_blocking とは

項目内容
関数名stream_set_blocking
PHPバージョンPHP 4.3.0以降
カテゴリストリーム関数
返り値bool(成功時 true、失敗時 false

構文

stream_set_blocking(resource $stream, bool $enable): bool

パラメータ

パラメータ説明
$streamresource対象のストリームリソース
$enablebooltrue でブロッキングモード、false でノンブロッキングモード

返り値

  • 成功時:true
  • 失敗時:false

ブロッキングとノンブロッキングの違い

【ブロッキングモード(デフォルト)】
 fread() → データが来るまでずっと待つ → 次の処理へ
 └── シンプルだが、データが来ない間は何もできない

【ノンブロッキングモード】
 fread() → データがなければ即座に false/空を返す → 次の処理へ
 └── データが無くても処理を継続できる

基本的な使い方

<?php
// ファイルストリームをノンブロッキングに設定
$stream = fopen('some_file.txt', 'r');

// ノンブロッキングモードへ切り替え
stream_set_blocking($stream, false);

$data = fread($stream, 1024);
// データがなければ即座に '' または false が返る

fclose($stream);

実践例(クラスを使った実装)

例1:ノンブロッキングで複数ストリームを同時監視する

複数のファイルやパイプを同時に読み込みたい場合の基本パターンです。

<?php

class MultiStreamReader
{
    private array $streams = [];
    private array $buffers = [];

    public function addStream(string $label, resource $stream): void
    {
        stream_set_blocking($stream, false);
        $this->streams[$label] = $stream;
        $this->buffers[$label] = '';
    }

    public function poll(int $maxIterations = 100): array
    {
        $results = [];

        for ($i = 0; $i < $maxIterations; $i++) {
            $active = false;

            foreach ($this->streams as $label => $stream) {
                if (feof($stream)) {
                    continue;
                }

                $chunk = fread($stream, 512);
                if ($chunk !== false && $chunk !== '') {
                    $this->buffers[$label] .= $chunk;
                    $active = true;
                }
            }

            if (!$active) {
                usleep(1000); // 1ms 待機してCPU使用率を抑える
            }

            // すべてのストリームがEOFなら終了
            $allDone = array_reduce(
                array_keys($this->streams),
                fn($carry, $label) => $carry && feof($this->streams[$label]),
                true
            );

            if ($allDone) {
                break;
            }
        }

        foreach ($this->buffers as $label => $buf) {
            $results[$label] = $buf;
        }

        return $results;
    }

    public function close(): void
    {
        foreach ($this->streams as $stream) {
            fclose($stream);
        }
        $this->streams = [];
        $this->buffers = [];
    }
}

// 使用例(実際のパイプやFIFOで動作)
$reader = new MultiStreamReader();

// プロセスの出力を並列監視する例
$p1 = popen('echo "Hello from process 1"', 'r');
$p2 = popen('echo "Hello from process 2"', 'r');

$reader->addStream('process1', $p1);
$reader->addStream('process2', $p2);

$results = $reader->poll();

foreach ($results as $label => $output) {
    echo "[{$label}]: " . trim($output) . PHP_EOL;
}

$reader->close();

出力例:

[process1]: Hello from process 1
[process2]: Hello from process 2

例2:外部コマンドの stdout/stderr を分離して取得する

proc_open と組み合わせることで、標準出力とエラー出力を分けてリアルタイムに取得できます。

<?php

class CommandRunner
{
    private string $command;

    public function __construct(string $command)
    {
        $this->command = $command;
    }

    public function run(): array
    {
        $descriptors = [
            0 => ['pipe', 'r'],           // stdin
            1 => ['pipe', 'w'],           // stdout
            2 => ['pipe', 'w'],           // stderr
        ];

        $process = proc_open($this->command, $descriptors, $pipes);

        if (!is_resource($process)) {
            throw new RuntimeException("プロセスの起動に失敗しました: {$this->command}");
        }

        fclose($pipes[0]); // stdin は使わないので閉じる

        // stdout / stderr をノンブロッキングに設定
        stream_set_blocking($pipes[1], false);
        stream_set_blocking($pipes[2], false);

        $stdout = '';
        $stderr = '';

        // どちらかのパイプが開いている間、読み続ける
        while (!feof($pipes[1]) || !feof($pipes[2])) {
            $chunk = fread($pipes[1], 4096);
            if ($chunk !== false && $chunk !== '') {
                $stdout .= $chunk;
            }

            $chunk = fread($pipes[2], 4096);
            if ($chunk !== false && $chunk !== '') {
                $stderr .= $chunk;
            }

            usleep(500);
        }

        fclose($pipes[1]);
        fclose($pipes[2]);

        $exitCode = proc_close($process);

        return [
            'stdout'    => $stdout,
            'stderr'    => $stderr,
            'exit_code' => $exitCode,
        ];
    }
}

// 使用例
$runner = new CommandRunner('php -r "echo \"成功\"; fwrite(STDERR, \"警告メッセージ\");"');
$result = $runner->run();

echo "STDOUT : " . $result['stdout']    . PHP_EOL;
echo "STDERR : " . $result['stderr']    . PHP_EOL;
echo "終了コード: " . $result['exit_code'] . PHP_EOL;

出力例:

STDOUT : 成功
STDERR : 警告メッセージ
終了コード: 0

例3:TCPソケットのノンブロッキング読み取り(タイムアウト処理付き)

ネットワークソケットでよく使われるパターンです。一定時間内にデータが来なければタイムアウトとして扱います。

<?php

class NonBlockingSocketClient
{
    private $socket;
    private float $timeoutSeconds;

    public function __construct(string $host, int $port, float $timeoutSeconds = 5.0)
    {
        $this->timeoutSeconds = $timeoutSeconds;

        $this->socket = fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);

        if (!$this->socket) {
            throw new RuntimeException("接続失敗: [{$errno}] {$errstr}");
        }

        // ノンブロッキングモードに設定
        stream_set_blocking($this->socket, false);
    }

    public function send(string $data): void
    {
        fwrite($this->socket, $data);
    }

    public function receive(): ?string
    {
        $response  = '';
        $startTime = microtime(true);

        while (true) {
            $chunk = fread($this->socket, 4096);

            if ($chunk !== false && $chunk !== '') {
                $response .= $chunk;
            }

            if (feof($this->socket)) {
                break;
            }

            // タイムアウト判定
            if ((microtime(true) - $startTime) >= $this->timeoutSeconds) {
                throw new RuntimeException("受信タイムアウト({$this->timeoutSeconds}秒)");
            }

            usleep(1000);
        }

        return $response !== '' ? $response : null;
    }

    public function close(): void
    {
        if (is_resource($this->socket)) {
            fclose($this->socket);
        }
    }
}

// 使用例(ローカルにHTTPサーバーがある場合)
try {
    $client = new NonBlockingSocketClient('example.com', 80, 5.0);
    $client->send("GET / HTTP/1.0\r\nHost: example.com\r\n\r\n");
    $response = $client->receive();
    echo "レスポンス受信: " . strlen($response ?? '') . " バイト" . PHP_EOL;
    $client->close();
} catch (RuntimeException $e) {
    echo "エラー: " . $e->getMessage() . PHP_EOL;
}

例4:stream_select と組み合わせた効率的な多重化

stream_select と組み合わせることで、CPUを無駄に消費しない効率的な多重I/Oが実現できます。

<?php

class StreamMultiplexer
{
    private array $streams   = [];
    private array $labels    = [];
    private array $results   = [];
    private float $timeout;

    public function __construct(float $timeout = 10.0)
    {
        $this->timeout = $timeout;
    }

    public function add(string $label, resource $stream): void
    {
        stream_set_blocking($stream, false);
        $key = (int) $stream;
        $this->streams[$key] = $stream;
        $this->labels[$key]  = $label;
        $this->results[$label] = '';
    }

    public function readAll(): array
    {
        $deadline = microtime(true) + $this->timeout;

        while (!empty($this->streams)) {
            $read   = array_values($this->streams);
            $write  = null;
            $except = null;

            // stream_select でデータが来たストリームだけを検知
            $changed = stream_select($read, $write, $except, 0, 50000); // 50ms

            if ($changed === false) {
                break;
            }

            foreach ($read as $stream) {
                $key   = (int) $stream;
                $label = $this->labels[$key];

                $chunk = fread($stream, 4096);
                if ($chunk !== false && $chunk !== '') {
                    $this->results[$label] .= $chunk;
                }

                if (feof($stream)) {
                    fclose($stream);
                    unset($this->streams[$key]);
                }
            }

            if (microtime(true) >= $deadline) {
                break;
            }
        }

        return $this->results;
    }
}

// 使用例
$mux = new StreamMultiplexer(5.0);

$mux->add('cmd_a', popen('echo "結果A"', 'r'));
$mux->add('cmd_b', popen('echo "結果B"', 'r'));

$results = $mux->readAll();
foreach ($results as $label => $output) {
    echo "[{$label}]: " . trim($output) . PHP_EOL;
}

出力例:

[cmd_a]: 結果A
[cmd_b]: 結果B

例5:ブロッキングモードの切り替えをロガーで記録するデコレーター

本番環境でモード切り替えのタイミングを監視・デバッグするためのパターンです。

<?php

class StreamBlockingDecorator
{
    private $stream;
    private string $name;
    private bool $currentMode;
    private array $log = [];

    public function __construct(resource $stream, string $name, bool $initialBlocking = true)
    {
        $this->stream      = $stream;
        $this->name        = $name;
        $this->currentMode = $initialBlocking;

        stream_set_blocking($stream, $initialBlocking);
        $this->record($initialBlocking, 'コンストラクタ');
    }

    public function setBlocking(bool $enable, string $reason = ''): bool
    {
        $result = stream_set_blocking($this->stream, $enable);

        if ($result) {
            $this->currentMode = $enable;
            $this->record($enable, $reason);
        } else {
            $this->log[] = [
                'time'   => date('H:i:s'),
                'stream' => $this->name,
                'mode'   => null,
                'reason' => "モード変更失敗: {$reason}",
            ];
        }

        return $result;
    }

    public function getStream(): resource
    {
        return $this->stream;
    }

    public function isBlocking(): bool
    {
        return $this->currentMode;
    }

    public function getLog(): array
    {
        return $this->log;
    }

    public function printLog(): void
    {
        foreach ($this->log as $entry) {
            $mode = $entry['mode'] === null ? '不明'
                  : ($entry['mode'] ? 'BLOCKING' : 'NON-BLOCKING');
            $reason = $entry['reason'] ? " ({$entry['reason']})" : '';
            echo "[{$entry['time']}] {$entry['stream']}: {$mode}{$reason}" . PHP_EOL;
        }
    }

    private function record(bool $mode, string $reason): void
    {
        $this->log[] = [
            'time'   => date('H:i:s'),
            'stream' => $this->name,
            'mode'   => $mode,
            'reason' => $reason,
        ];
    }
}

// 使用例
$raw = fopen('php://temp', 'r+');
$decorated = new StreamBlockingDecorator($raw, 'temp_stream', true);

$decorated->setBlocking(false, 'ノンブロッキング読み取り開始');
fwrite($decorated->getStream(), "テストデータ");
rewind($decorated->getStream());
$data = fread($decorated->getStream(), 100);

$decorated->setBlocking(true, '処理完了後にブロッキングへ戻す');
fclose($decorated->getStream());

echo "読み取ったデータ: {$data}" . PHP_EOL;
$decorated->printLog();

出力例:

読み取ったデータ: テストデータ
[12:00:00] temp_stream: BLOCKING (コンストラクタ)
[12:00:00] temp_stream: NON-BLOCKING (ノンブロッキング読み取り開始)
[12:00:00] temp_stream: BLOCKING (処理完了後にブロッキングへ戻す)

例6:パイプラインを使ったデータ変換チェーン

複数のコマンドをパイプで繋ぎ、各ストリームをノンブロッキングで読み取るパターンです。

<?php

class PipelineProcessor
{
    /**
     * コマンドの配列を受け取り、各コマンドの出力をノンブロッキングで取得する
     *
     * @param string[] $commands
     * @return array<string, string>
     */
    public function run(array $commands): array
    {
        $outputs = [];

        foreach ($commands as $name => $cmd) {
            $pipe = popen($cmd, 'r');
            if (!is_resource($pipe)) {
                $outputs[$name] = "[エラー: コマンド起動失敗]";
                continue;
            }

            stream_set_blocking($pipe, false);

            $buffer  = '';
            $timeout = microtime(true) + 5.0;

            while (!feof($pipe)) {
                $chunk = fread($pipe, 4096);
                if ($chunk !== false && $chunk !== '') {
                    $buffer .= $chunk;
                }

                if (microtime(true) > $timeout) {
                    $buffer .= PHP_EOL . "[タイムアウト]";
                    break;
                }

                usleep(500);
            }

            pclose($pipe);
            $outputs[$name] = trim($buffer);
        }

        return $outputs;
    }
}

// 使用例
$processor = new PipelineProcessor();

$commands = [
    'date'     => 'date "+%Y-%m-%d"',
    'hostname' => 'hostname',
    'php_ver'  => 'php -r "echo PHP_VERSION;"',
];

$results = $processor->run($commands);

foreach ($results as $name => $output) {
    echo str_pad("[{$name}]", 12) . ": {$output}" . PHP_EOL;
}

出力例:

[date]      : 2026-05-25
[hostname]  : my-server
[php_ver]   : 8.3.0

関連する関数との比較

関数役割
stream_set_blockingブロッキング/ノンブロッキングの切り替え
stream_select複数ストリームのI/O多重化(変化を待つ)
stream_set_timeoutブロッキングモード時のタイムアウト設定
stream_set_read_buffer読み取りバッファサイズの設定
stream_set_write_buffer書き込みバッファサイズの設定
socket_set_nonblockソケット拡張のノンブロッキング設定(別拡張)

stream_set_blocking vs stream_select

// stream_set_blocking: 1つのストリームの動作を変える
stream_set_blocking($stream, false);
$data = fread($stream, 1024); // すぐ返る

// stream_select: 複数ストリームを効率よく監視する(変化まで待機)
$read = [$stream1, $stream2];
$write = $except = null;
stream_select($read, $write, $except, 5); // 最大5秒待ち、変化したものを返す

使い分けの目安:

  • ストリームが1〜2本 → stream_set_blocking でシンプルに
  • ストリームが3本以上 → stream_select と組み合わせて効率的に

よくある注意点・落とし穴

1. ローカルファイルへの効果は限定的

stream_set_blocking はネットワークソケットやパイプで特に効果を発揮します。ローカルファイルは通常OSがキャッシュするため、ノンブロッキングに設定しても体感差が出にくいことがあります。

// ローカルファイルではあまり意味がないケースも
$fp = fopen('/tmp/test.txt', 'r');
stream_set_blocking($fp, false); // ← ローカルファイルは常にデータがある

2. false と空文字の区別

ノンブロッキングモードでデータが無い場合、freadfalse ではなく 空文字列 '' を返します。

$chunk = fread($stream, 1024);

if ($chunk === false) {
    echo "エラーまたはEOF";
} elseif ($chunk === '') {
    echo "まだデータがない(待機中)";
} else {
    echo "データ取得: " . strlen($chunk) . " バイト";
}

3. Windows環境での制限

Windows環境では、ファイルストリームに対して stream_set_blockingfalse にしても無視される場合があります(OSの制約)。ソケットやパイプでは動作します。

4. ブロッキングモードに戻す

ノンブロッキング処理が終わったら、必要に応じてブロッキングモードに戻すことで、以降の通常読み取りが安全になります。

stream_set_blocking($stream, false); // ノンブロッキングで処理
// ... 処理 ...
stream_set_blocking($stream, true);  // 元に戻す

まとめ

項目内容
関数名stream_set_blocking(resource $stream, bool $enable): bool
主な用途ソケット・パイプのノンブロッキングI/O
代表的な組み合わせproc_openpopenfsockopenstream_select
注意点ローカルファイルへの効果は限定的、Windows制限あり
PHP バージョンPHP 4.3.0 以上

stream_set_blocking は、複数のプロセスやネットワーク接続を同時に扱う PHPアプリケーションで非常に重要な関数です。特に proc_openstream_select と組み合わせることで、デッドロックを回避しながら効率的な並行処理が実現できます。

ノンブロッキングI/Oを正しく活用することで、「1つの処理を待ち続ける」ボトルネックから解放され、レスポンシブなシステムを構築できます。ぜひ実際のプロジェクトで試してみてください。

タイトルとURLをコピーしました