[PHP]stream_set_chunk_sizeで読み取りチャンクサイズを制御する|ストリームフィルタと組み合わせた実践活用ガイド

PHP

はじめに

PHPのストリームでデータを読み取るとき、内部的に「一度に何バイトずつ処理するか」というチャンクサイズが存在します。通常はPHPが自動で決めていますが、stream_set_chunk_size を使うとこのサイズを明示的に制御できます。

特に ストリームフィルタ(Stream Filter) を利用している場面では、チャンクサイズがフィルタの呼び出し頻度に直結するため、パフォーマンスや処理の粒度に大きく影響します。

この記事では、関数の基本から実践的なクラス実装まで、丁寧に解説します。


stream_set_chunk_size とは

項目内容
関数名stream_set_chunk_size
PHPバージョンPHP 5.4.0以降
カテゴリストリーム関数
返り値int(変更前のチャンクサイズ)

構文

stream_set_chunk_size(resource $stream, int $size): int

パラメータ

パラメータ説明
$streamresource対象のストリームリソース
$sizeint新しいチャンクサイズ(バイト数)。1以上の整数

返り値

  • 変更前のチャンクサイズint)を返します。
  • 設定に失敗した場合も以前の値を返します(エラーにはなりません)。

注意: この関数はストリームフィルタが関連付けられたストリームに対して特に効果があります。フィルタなしのストリームでは動作が限定的な場合があります。


チャンクサイズとは何か

【チャンクサイズのイメージ】

ストリーム(データの流れ)
 ─────────────────────────────────────────────→
 [chunk1][chunk2][chunk3][chunk4] ...

 チャンクサイズ = 8192バイト(デフォルト)の場合:
  各 [chunk] が最大8192バイトずつフィルタに渡される

 チャンクサイズ = 1024バイトに変更した場合:
  各 [chunk] が最大1024バイトずつフィルタに渡される
  → フィルタが呼ばれる回数が増える(粒度が細かくなる)

デフォルトのチャンクサイズは 8192バイト(8KB) です。


基本的な使い方

<?php
$stream = fopen('php://temp', 'r+');

// 現在のチャンクサイズを変更し、以前の値を受け取る
$previousSize = stream_set_chunk_size($stream, 1024);

echo "変更前のチャンクサイズ: {$previousSize} バイト" . PHP_EOL;
echo "新しいチャンクサイズ: 1024 バイト" . PHP_EOL;

fclose($stream);

実践例(クラスを使った実装)

例1:チャンクサイズを変えてフィルタ呼び出し回数を観察する

カスタムストリームフィルタを使い、チャンクサイズによってフィルタが何回呼ばれるかを確認します。

<?php

/**
 * フィルタ呼び出し回数をカウントするカスタムストリームフィルタ
 */
class CountingFilter extends php_user_filter
{
    public static int $callCount = 0;

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        self::$callCount++;

        while ($bucket = stream_bucket_make_writeable($in)) {
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }

        return PSFS_PASS_ON;
    }
}

stream_filter_register('counting', CountingFilter::class);

class ChunkSizeObserver
{
    /**
     * 指定したチャンクサイズでフィルタ呼び出し回数を計測する
     */
    public function measure(int $chunkSize, string $data): array
    {
        CountingFilter::$callCount = 0;

        $stream = fopen('php://temp', 'r+');
        stream_filter_append($stream, 'counting');
        stream_set_chunk_size($stream, $chunkSize);

        fwrite($stream, $data);
        rewind($stream);

        // 全データを読み取る
        $result = stream_get_contents($stream);
        fclose($stream);

        return [
            'chunk_size'  => $chunkSize,
            'data_bytes'  => strlen($data),
            'filter_calls'=> CountingFilter::$callCount,
            'output_bytes'=> strlen($result),
        ];
    }
}

// 10KBのデータで比較
$data     = str_repeat('A', 10240); // 10KB
$observer = new ChunkSizeObserver();

$sizes = [512, 1024, 4096, 8192];
echo str_pad("チャンクサイズ", 18) . str_pad("フィルタ呼出回数", 20) . "出力バイト数" . PHP_EOL;
echo str_repeat('-', 52) . PHP_EOL;

foreach ($sizes as $size) {
    $result = $observer->measure($size, $data);
    echo str_pad("{$result['chunk_size']} bytes", 18)
       . str_pad("{$result['filter_calls']} 回", 20)
       . "{$result['output_bytes']} bytes" . PHP_EOL;
}

出力例:

チャンクサイズ     フィルタ呼出回数     出力バイト数
----------------------------------------------------
512 bytes          20 回               10240 bytes
1024 bytes         10 回               10240 bytes
4096 bytes         3 回                10240 bytes
8192 bytes         2 回                10240 bytes

例2:リアルタイム進捗表示 ─ 小さいチャンクで細かく通知する

チャンクサイズを小さくすることで、フィルタ経由の進捗報告をより細かい粒度で行います。

<?php

class ProgressFilter extends php_user_filter
{
    public static int   $totalProcessed = 0;
    public static int   $totalSize      = 0;
    public static array $progressLog    = [];

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $consumed               += $bucket->datalen;
            self::$totalProcessed   += $bucket->datalen;
            stream_bucket_append($out, $bucket);

            if (self::$totalSize > 0) {
                $pct = (int) (self::$totalProcessed / self::$totalSize * 100);
                self::$progressLog[] = $pct;
            }
        }

        return PSFS_PASS_ON;
    }
}

stream_filter_register('progress', ProgressFilter::class);

class ProgressAwareStreamProcessor
{
    private int $chunkSize;

    public function __construct(int $chunkSize = 1024)
    {
        $this->chunkSize = $chunkSize;
    }

    public function process(string $data): string
    {
        ProgressFilter::$totalProcessed = 0;
        ProgressFilter::$totalSize      = strlen($data);
        ProgressFilter::$progressLog    = [];

        $stream = fopen('php://temp', 'r+');
        stream_filter_append($stream, 'progress');
        stream_set_chunk_size($stream, $this->chunkSize);

        fwrite($stream, $data);
        rewind($stream);
        $result = stream_get_contents($stream);
        fclose($stream);

        return $result;
    }

    public function getProgressLog(): array
    {
        return ProgressFilter::$progressLog;
    }
}

// 5KBのデータで処理
$data = str_repeat('X', 5120);

$processor = new ProgressAwareStreamProcessor(chunkSize: 512);
$processor->process($data);

$log = $processor->getProgressLog();
echo "進捗レポート(チャンクサイズ: 512バイト)" . PHP_EOL;
foreach ($log as $i => $pct) {
    $bar = str_repeat('█', (int)($pct / 5)) . str_repeat('░', 20 - (int)($pct / 5));
    echo "チャンク " . str_pad($i + 1, 2) . ": [{$bar}] {$pct}%" . PHP_EOL;
}

出力例:

進捗レポート(チャンクサイズ: 512バイト)
チャンク  1: [██░░░░░░░░░░░░░░░░░░] 10%
チャンク  2: [████░░░░░░░░░░░░░░░░] 20%
チャンク  3: [██████░░░░░░░░░░░░░░] 30%
...
チャンク 10: [████████████████████] 100%

例3:大きいチャンクサイズで高スループットな圧縮処理

大きなデータを圧縮する際は、チャンクサイズを大きくしてフィルタのオーバーヘッドを減らします。

<?php

class StreamCompressor
{
    private int $chunkSize;

    public function __construct(int $chunkSize = 65536) // 64KB
    {
        $this->chunkSize = $chunkSize;
    }

    /**
     * zlib.deflate フィルタで圧縮し、チャンクサイズの影響を計測する
     */
    public function compress(string $data): array
    {
        $input  = fopen('php://temp', 'r+');
        $output = fopen('php://temp', 'r+');

        // 出力ストリームに圧縮フィルタを付与
        stream_filter_append($output, 'zlib.deflate', STREAM_FILTER_WRITE);
        stream_set_chunk_size($input,  $this->chunkSize);
        stream_set_chunk_size($output, $this->chunkSize);

        fwrite($input, $data);
        rewind($input);

        $startTime = microtime(true);

        while (!feof($input)) {
            $chunk = fread($input, $this->chunkSize);
            if ($chunk !== false && $chunk !== '') {
                fwrite($output, $chunk);
            }
        }

        fclose($input);
        rewind($output);

        $compressed = stream_get_contents($output);
        fclose($output);

        $elapsed = microtime(true) - $startTime;

        return [
            'original_bytes'   => strlen($data),
            'compressed_bytes' => strlen($compressed),
            'ratio'            => round(strlen($compressed) / strlen($data) * 100, 1),
            'elapsed_ms'       => round($elapsed * 1000, 3),
            'chunk_size'       => $this->chunkSize,
        ];
    }
}

// 100KBのデータで比較
$data = str_repeat('Hello, World! This is test data. ', 3200); // ~100KB

$chunkSizes = [1024, 8192, 65536];

echo str_pad("チャンクサイズ", 16)
   . str_pad("元サイズ",       12)
   . str_pad("圧縮後",         12)
   . str_pad("圧縮率",         10)
   . "処理時間" . PHP_EOL;
echo str_repeat('-', 62) . PHP_EOL;

foreach ($chunkSizes as $size) {
    $compressor = new StreamCompressor($size);
    $result     = $compressor->compress($data);

    echo str_pad("{$result['chunk_size']} bytes",       16)
       . str_pad("{$result['original_bytes']} bytes",   12)
       . str_pad("{$result['compressed_bytes']} bytes", 12)
       . str_pad("{$result['ratio']}%",                 10)
       . "{$result['elapsed_ms']} ms" . PHP_EOL;
}

出力例:

チャンクサイズ   元サイズ     圧縮後       圧縮率    処理時間
--------------------------------------------------------------
1024 bytes       102400 bytes 648 bytes    0.6%      1.234 ms
8192 bytes       102400 bytes 648 bytes    0.6%      0.312 ms
65536 bytes      102400 bytes 648 bytes    0.6%      0.089 ms

チャンクサイズが大きいほど処理時間が短く、高スループットになります。


例4:チャンクサイズを動的に調整するアダプティブリーダー

ネットワーク状況に応じてチャンクサイズを動的に変更する、実践的なパターンです。

<?php

class AdaptiveStreamReader
{
    private $stream;
    private int    $currentChunkSize;
    private int    $minChunkSize;
    private int    $maxChunkSize;
    private array  $readLog = [];

    public function __construct(
        resource $stream,
        int $initialChunkSize = 4096,
        int $minChunkSize     = 512,
        int $maxChunkSize     = 65536
    ) {
        $this->stream           = $stream;
        $this->currentChunkSize = $initialChunkSize;
        $this->minChunkSize     = $minChunkSize;
        $this->maxChunkSize     = $maxChunkSize;

        stream_set_blocking($stream, false);
        stream_set_chunk_size($stream, $initialChunkSize);
    }

    /**
     * 読み取り結果に応じてチャンクサイズを調整しながらデータを取得
     */
    public function read(int $maxIterations = 50): string
    {
        $buffer = '';

        for ($i = 0; $i < $maxIterations; $i++) {
            $start = microtime(true);
            $chunk = fread($this->stream, $this->currentChunkSize);
            $elapsed = microtime(true) - $start;

            if ($chunk !== false && $chunk !== '') {
                $buffer .= $chunk;
                $this->readLog[] = [
                    'iteration'  => $i + 1,
                    'bytes_read' => strlen($chunk),
                    'chunk_size' => $this->currentChunkSize,
                    'elapsed_us' => round($elapsed * 1_000_000),
                ];

                // 読み取りが速ければチャンクサイズを増やす
                if ($elapsed < 0.0001 && $this->currentChunkSize < $this->maxChunkSize) {
                    $newSize = min($this->currentChunkSize * 2, $this->maxChunkSize);
                    stream_set_chunk_size($this->stream, $newSize);
                    $this->currentChunkSize = $newSize;
                }
            } else {
                // データが来なければチャンクサイズを減らして様子を見る
                if ($this->currentChunkSize > $this->minChunkSize) {
                    $newSize = max((int)($this->currentChunkSize / 2), $this->minChunkSize);
                    stream_set_chunk_size($this->stream, $newSize);
                    $this->currentChunkSize = $newSize;
                }
                usleep(500);
            }

            if (feof($this->stream)) {
                break;
            }
        }

        return $buffer;
    }

    public function getReadLog(): array
    {
        return $this->readLog;
    }
}

// 使用例
$stream = fopen('php://temp', 'r+');
fwrite($stream, str_repeat('Z', 20480)); // 20KBのデータ
rewind($stream);

$reader = new AdaptiveStreamReader($stream, initialChunkSize: 1024);
$data   = $reader->read();

echo "読み取り完了: " . strlen($data) . " バイト" . PHP_EOL . PHP_EOL;

$log = $reader->getReadLog();
echo str_pad("イテレーション", 18) . str_pad("読み取りバイト", 18) . "チャンクサイズ" . PHP_EOL;
echo str_repeat('-', 54) . PHP_EOL;

foreach ($log as $entry) {
    echo str_pad($entry['iteration'],  18)
       . str_pad($entry['bytes_read'] . " bytes", 18)
       . $entry['chunk_size'] . " bytes" . PHP_EOL;
}

fclose($stream);

出力例:

読み取り完了: 20480 バイト

イテレーション     読み取りバイト     チャンクサイズ
------------------------------------------------------
1                  1024 bytes         1024 bytes
2                  2048 bytes         2048 bytes
3                  4096 bytes         4096 bytes
4                  8192 bytes         8192 bytes
5                  4120 bytes         16384 bytes

例5:ストリームコンテキストとチャンクサイズを組み合わせたHTTPダウンロード

HTTPストリームに対してチャンクサイズを設定し、ダウンロードの分割単位を制御します。

<?php

class HttpStreamDownloader
{
    private int    $chunkSize;
    private string $userAgent;

    public function __construct(int $chunkSize = 8192, string $userAgent = 'PHP-Downloader/1.0')
    {
        $this->chunkSize = $chunkSize;
        $this->userAgent = $userAgent;
    }

    public function download(string $url): array
    {
        $context = stream_context_create([
            'http' => [
                'method'          => 'GET',
                'user_agent'      => $this->userAgent,
                'timeout'         => 10,
                'follow_location' => true,
            ],
        ]);

        $stream = @fopen($url, 'r', false, $context);
        if (!is_resource($stream)) {
            throw new RuntimeException("ストリームを開けませんでした: {$url}");
        }

        $previousSize = stream_set_chunk_size($stream, $this->chunkSize);

        $buffer     = '';
        $chunks     = 0;
        $startTime  = microtime(true);

        while (!feof($stream)) {
            $data = fread($stream, $this->chunkSize);
            if ($data !== false && $data !== '') {
                $buffer .= $data;
                $chunks++;
            }
        }

        fclose($stream);

        return [
            'url'              => $url,
            'bytes'            => strlen($buffer),
            'chunks'           => $chunks,
            'chunk_size'       => $this->chunkSize,
            'previous_size'    => $previousSize,
            'elapsed_ms'       => round((microtime(true) - $startTime) * 1000, 2),
            'content_preview'  => substr($buffer, 0, 80),
        ];
    }
}

// 使用例
$downloader = new HttpStreamDownloader(chunkSize: 4096);

try {
    $result = $downloader->download('https://www.example.com/');

    echo "URL           : {$result['url']}"            . PHP_EOL;
    echo "取得バイト数  : {$result['bytes']} bytes"    . PHP_EOL;
    echo "チャンク数    : {$result['chunks']} 回"       . PHP_EOL;
    echo "チャンクサイズ: {$result['chunk_size']} bytes". PHP_EOL;
    echo "変更前サイズ  : {$result['previous_size']} bytes" . PHP_EOL;
    echo "処理時間      : {$result['elapsed_ms']} ms"   . PHP_EOL;
    echo "プレビュー    : " . trim($result['content_preview']) . PHP_EOL;
} catch (RuntimeException $e) {
    echo "エラー: " . $e->getMessage() . PHP_EOL;
}

出力例:

URL           : https://www.example.com/
取得バイト数  : 1256 bytes
チャンク数    : 1 回
チャンクサイズ: 4096 bytes
変更前サイズ  : 8192 bytes
処理時間      : 312.45 ms
プレビュー    : <!doctype html><html><head><title>Example Domain</title>...

例6:カスタムフィルタで行単位変換 ─ チャンクサイズを行長に合わせる

CSVなど行単位で処理するデータは、チャンクサイズを1行の平均バイト数に合わせることで、フィルタが行の途中で切れにくくなります。

<?php

/**
 * カンマ区切りの値をタブ区切りに変換するフィルタ
 */
class CsvToTsvFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $consumed       += $bucket->datalen;
            $bucket->data    = str_replace(',', "\t", $bucket->data);
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('csv.to_tsv', CsvToTsvFilter::class);

class CsvToTsvConverter
{
    private int $chunkSize;

    public function __construct(int $chunkSize = 256) // 1行あたりの想定バイト数
    {
        $this->chunkSize = $chunkSize;
    }

    public function convert(string $csvData): string
    {
        $input  = fopen('php://temp', 'r+');
        $output = fopen('php://temp', 'r+');

        stream_filter_append($output, 'csv.to_tsv', STREAM_FILTER_WRITE);
        stream_set_chunk_size($input,  $this->chunkSize);
        stream_set_chunk_size($output, $this->chunkSize);

        fwrite($input, $csvData);
        rewind($input);

        while (!feof($input)) {
            $chunk = fread($input, $this->chunkSize);
            if ($chunk !== false) {
                fwrite($output, $chunk);
            }
        }

        fclose($input);
        rewind($output);

        $result = stream_get_contents($output);
        fclose($output);

        return $result;
    }
}

// 使用例
$csv = "name,age,city\nAlice,30,Tokyo\nBob,25,Osaka\nCarol,28,Kyoto\n";

$converter = new CsvToTsvConverter(chunkSize: 64);
$tsv       = $converter->convert($csv);

echo "【変換前 CSV】" . PHP_EOL . $csv . PHP_EOL;
echo "【変換後 TSV】" . PHP_EOL . $tsv;

出力例:

【変換前 CSV】
name,age,city
Alice,30,Tokyo
Bob,25,Osaka
Carol,28,Kyoto

【変換後 TSV】
name	age	city
Alice	30	Tokyo
Bob	25	Osaka
Carol	28	Kyoto

関連する関数との比較

関数役割
stream_set_chunk_sizeフィルタに渡すチャンクサイズを設定
stream_set_read_buffer読み取りバッファのサイズを設定(OSレベル)
stream_set_write_buffer書き込みバッファのサイズを設定(OSレベル)
stream_set_blockingブロッキング/ノンブロッキングの切り替え
stream_set_timeoutブロッキング時のタイムアウト設定
stream_filter_appendストリームにフィルタを追加

チャンクサイズ vs 読み取りバッファサイズ

// stream_set_chunk_size: フィルタが受け取るデータの粒度を設定
stream_set_chunk_size($stream, 1024);
// → フィルタへ最大1024バイトずつ渡される

// stream_set_read_buffer: OSレベルの読み取りバッファを設定
stream_set_read_buffer($stream, 8192);
// → OSがストリームから一度に読み取るバイト数(フィルタには無関係)

stream_set_chunk_sizeフィルタに渡す粒度stream_set_read_bufferOSの読み取り単位を制御します。フィルタを使っているなら前者が重要です。


よくある注意点・落とし穴

1. フィルタなしでは効果が薄い

stream_set_chunk_size は主にストリームフィルタの動作に影響します。フィルタが付いていない素のストリームに設定しても、fread の読み取り量は fread 自身の第2引数で決まります。

// フィルタなし → chunk_size の効果はほぼない
$stream = fopen('file.txt', 'r');
stream_set_chunk_size($stream, 512);
$data = fread($stream, 4096); // 4096バイト読み取ろうとする

2. 返り値は「変更前」の値

他の stream_set_* 系と異なり、成功・失敗ではなく 変更前のチャンクサイズ を返します。

$old = stream_set_chunk_size($stream, 2048);
echo $old; // 変更"前"の値(例: 8192)

3. サイズは1以上の整数

0や負の値を指定した場合の動作は未定義です。必ず1以上の整数を渡しましょう。

// NG
stream_set_chunk_size($stream, 0);
stream_set_chunk_size($stream, -1);

// OK
stream_set_chunk_size($stream, 1);
stream_set_chunk_size($stream, 65536);

4. デフォルトは8192バイト

変更が不要なら設定しなくて問題ありません。デフォルトの8192バイト(8KB)はほとんどのユースケースで適切な値です。


まとめ

項目内容
関数名stream_set_chunk_size(resource $stream, int $size): int
主な用途ストリームフィルタへのデータ供給粒度の制御
デフォルト値8192バイト(8KB)
返り値変更前のチャンクサイズ(int)
効果が出やすい場面stream_filter_append でフィルタを使っているストリーム
PHP バージョンPHP 5.4.0 以上

stream_set_chunk_size は地味ながら、ストリームフィルタを使う場面では非常に重要な関数です。チャンクサイズを小さくすれば処理の粒度が細かくなり進捗報告に向き、大きくすればオーバーヘッドが減り高スループットになります。

フィルタを組み合わせた圧縮・変換・暗号化など、ストリームを使った本格的なデータ処理を実装する際は、ぜひチャンクサイズの調整も検討してみてください。

タイトルとURLをコピーしました