[PHP]stream_filter_register完全ガイド|カスタムストリームフィルターを自作してデータ変換を自在に操る

PHP

はじめに

PHPでファイルの読み書きやネットワーク通信を行う際、「読み取ったデータを自動的に変換したい」「書き込む前に暗号化や圧縮を挟みたい」と思ったことはないでしょうか。

stream_filter_register() は、ストリームにアタッチできる独自のフィルタークラスを登録する関数です。一度登録しておけば、stream_filter_append()fopen() のコンテキストオプションと組み合わせることで、読み書きのたびにデータが自動的にフィルター処理されます。

組み込みフィルター(string.rot13zlib.deflate など)では対応できない独自処理を、シームレスにストリームへ組み込めるのが最大の魅力です。


関数の基本情報

項目内容
関数名stream_filter_register()
対応バージョンPHP 5.0.0 以降
返り値bool(成功時 true、失敗時 false
カテゴリストリーム関数

構文

stream_filter_register(string $filter_name, string $class): bool

パラメータ

パラメータ説明
$filter_namestringフィルターの名前。fopen()stream_filter_append() で指定する識別子
$classstringphp_user_filter を継承したクラス名(文字列で渡す)

返り値

  • true:登録成功
  • false:同名フィルターがすでに登録済み、またはクラスが存在しない場合

カスタムフィルタークラスの基本構造

stream_filter_register() を使うには、まず php_user_filter を継承したクラスを定義する必要があります。実装すべきメソッドは以下のとおりです。

class MyFilter extends php_user_filter
{
    /**
     * データバケットにフィルター処理を適用する
     *
     * @param resource $in       入力バケットブリゲード
     * @param resource $out      出力バケットブリゲード
     * @param int      $consumed 処理済みバイト数(参照渡し)
     * @param bool     $closing  ストリームが閉じられようとしているか
     * @return int PSFS_PASS_ON / PSFS_FEED_ME / PSFS_ERR_FATAL
     */
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            // $bucket->data にフィルター処理を施す
            $bucket->data = strtoupper($bucket->data);
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('my.uppercase', MyFilter::class);

filter() の戻り値定数

定数意味
PSFS_PASS_ON出力バケットにデータを渡す(通常はこれを返す)
PSFS_FEED_MEデータが足りないため、もっと入力が必要
PSFS_ERR_FATAL回復不能なエラーが発生した

基本的な使い方

<?php

class UpperCaseFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $bucket->data = strtoupper($bucket->data);
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('str.uppercase', UpperCaseFilter::class);

$fp = fopen('php://memory', 'w+');
stream_filter_append($fp, 'str.uppercase');

fwrite($fp, 'hello, stream filter!');
rewind($fp);

echo fread($fp, 1024);
// 出力: HELLO, STREAM FILTER!

fclose($fp);

実践的なクラスベースの活用例


例1:ログ整形フィルター(LogFormatterFilter)

アプリケーションのログファイルへの書き込み時に、タイムスタンプとプレフィックスを自動付与するフィルターです。

<?php

class LogFormatterFilter extends php_user_filter
{
    private string $prefix;

    public function onCreate(): bool
    {
        // params プロパティでフィルター名のサフィックスを取得
        $this->prefix = $this->params ?? 'INFO';
        return true;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $lines = explode("\n", $bucket->data);
            $formatted = [];
            foreach ($lines as $line) {
                if ($line === '') continue;
                $timestamp = date('Y-m-d H:i:s');
                $formatted[] = "[{$timestamp}] [{$this->prefix}] {$line}";
            }
            $bucket->data = implode("\n", $formatted) . "\n";
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('log.formatter', LogFormatterFilter::class);

class Logger
{
    private $fp;

    public function __construct(string $filePath, string $level = 'INFO')
    {
        $this->fp = fopen($filePath, 'a');
        // $params にログレベルを渡す
        stream_filter_append($this->fp, 'log.formatter', STREAM_FILTER_WRITE, $level);
    }

    public function write(string $message): void
    {
        fwrite($this->fp, $message);
    }

    public function __destruct()
    {
        fclose($this->fp);
    }
}

// 使用例
$logger = new Logger('/tmp/app.log', 'ERROR');
$logger->write("データベース接続に失敗しました");
$logger->write("再試行します");

// /tmp/app.log に以下のように書き込まれる:
// [2025-05-13 10:00:00] [ERROR] データベース接続に失敗しました
// [2025-05-13 10:00:00] [ERROR] 再試行します

ポイント: onCreate() をオーバーライドすることで、フィルターのパラメータ($this->params)を初期化時に受け取れます。


例2:CSVサニタイズフィルター(CsvSanitizerFilter)

外部から受け取ったCSVファイルを読み込む際に、各フィールドのトリム・改行除去・不正文字削除をストリーム上でリアルタイムに行うフィルターです。

<?php

class CsvSanitizerFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $lines = explode("\n", $bucket->data);
            $sanitized = [];

            foreach ($lines as $line) {
                if (trim($line) === '') continue;

                // CSVの各フィールドをパース→サニタイズ→再構築
                $fields = str_getcsv($line);
                $fields = array_map(function (string $field): string {
                    $field = trim($field);                          // 前後の空白除去
                    $field = str_replace(["\r", "\n"], '', $field); // 改行除去
                    $field = preg_replace('/[^\x20-\x7E\x{3040}-\x{9FAF}]/u', '', $field); // 制御文字除去
                    return $field;
                }, $fields);

                $sanitized[] = implode(',', array_map(
                    fn($f) => '"' . str_replace('"', '""', $f) . '"',
                    $fields
                ));
            }

            $bucket->data = implode("\n", $sanitized) . "\n";
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('csv.sanitizer', CsvSanitizerFilter::class);

class CsvImporter
{
    public function import(string $filePath): array
    {
        $fp = fopen($filePath, 'r');
        stream_filter_append($fp, 'csv.sanitizer', STREAM_FILTER_READ);

        $rows = [];
        while (($line = fgets($fp)) !== false) {
            $row = str_getcsv(trim($line));
            if (!empty(array_filter($row))) {
                $rows[] = $row;
            }
        }
        fclose($fp);
        return $rows;
    }
}

// 使用例
$importer = new CsvImporter();
$data = $importer->import('/tmp/input.csv');
foreach ($data as $row) {
    echo implode(' | ', $row) . PHP_EOL;
}

例3:Base64エンコード/デコードフィルター(Base64Filter)

メール添付ファイルや API レスポンスのバイナリデータを、ストリーム上で透過的に Base64 変換するフィルターです。

<?php

class Base64Filter extends php_user_filter
{
    private string $mode;
    private string $buffer = '';

    public function onCreate(): bool
    {
        $this->mode   = $this->params === 'decode' ? 'decode' : 'encode';
        $this->buffer = '';
        return true;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $this->buffer .= $bucket->data;
            $consumed += $bucket->datalen;
        }

        // ストリーム終端のみ処理(バッファリングして一括変換)
        if ($closing && $this->buffer !== '') {
            $result = $this->mode === 'encode'
                ? base64_encode($this->buffer)
                : base64_decode($this->buffer);

            $bucket = stream_bucket_new($this->stream, $result);
            stream_bucket_append($out, $bucket);
            $this->buffer = '';
        }

        return PSFS_PASS_ON;
    }
}

stream_filter_register('base64.transform', Base64Filter::class);

class FileEncoder
{
    /**
     * ファイルをBase64エンコードして別ファイルに書き出す
     */
    public function encode(string $src, string $dst): void
    {
        $in  = fopen($src, 'rb');
        $out = fopen($dst, 'wb');

        stream_filter_append($in, 'base64.transform', STREAM_FILTER_READ, 'encode');
        stream_copy_to_stream($in, $out);

        fclose($in);
        fclose($out);
    }

    /**
     * Base64エンコードされたファイルをデコードして書き出す
     */
    public function decode(string $src, string $dst): void
    {
        $in  = fopen($src, 'rb');
        $out = fopen($dst, 'wb');

        stream_filter_append($in, 'base64.transform', STREAM_FILTER_READ, 'decode');
        stream_copy_to_stream($in, $out);

        fclose($in);
        fclose($out);
    }
}

$encoder = new FileEncoder();
$encoder->encode('/tmp/image.png', '/tmp/image.b64');
$encoder->decode('/tmp/image.b64', '/tmp/image_restored.png');
echo "エンコード・デコード完了" . PHP_EOL;

例4:文字コード変換フィルター(EncodingConvertFilter)

Shift_JIS や EUC-JP の legacy ファイルを読み込む際に、UTF-8 へリアルタイム変換するフィルターです。

<?php

class EncodingConvertFilter extends php_user_filter
{
    private string $fromEncoding;
    private string $toEncoding;

    public function onCreate(): bool
    {
        // "SJIS-win:UTF-8" 形式で from:to を受け取る
        $parts = explode(':', $this->params ?? 'SJIS-win:UTF-8', 2);
        $this->fromEncoding = $parts[0] ?? 'SJIS-win';
        $this->toEncoding   = $parts[1] ?? 'UTF-8';
        return true;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $converted = mb_convert_encoding(
                $bucket->data,
                $this->toEncoding,
                $this->fromEncoding
            );
            $bucket->data = $converted;
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('encoding.convert', EncodingConvertFilter::class);

class LegacyFileReader
{
    public function read(string $filePath, string $encoding = 'SJIS-win'): string
    {
        $fp = fopen($filePath, 'rb');
        stream_filter_append(
            $fp,
            'encoding.convert',
            STREAM_FILTER_READ,
            "{$encoding}:UTF-8"
        );

        $content = stream_get_contents($fp);
        fclose($fp);
        return $content;
    }
}

// 使用例
$reader = new LegacyFileReader();
$content = $reader->read('/tmp/legacy_sjis.txt', 'SJIS-win');
echo $content; // UTF-8として正常に出力される

例5:JSONストリームラッパーフィルター(JsonStreamFilter)

複数行の JSON オブジェクトが連続して流れてくる NDJSON(Newline Delimited JSON)ストリームを、1行ずつパースしながら変換するフィルターです。

<?php

class JsonStreamFilter extends php_user_filter
{
    private string $buffer = '';

    public function onCreate(): bool
    {
        $this->buffer = '';
        return true;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $this->buffer .= $bucket->data;
            $consumed += $bucket->datalen;
        }

        // 行単位でJSONをパース・変換
        $lines     = explode("\n", $this->buffer);
        $remaining = array_pop($lines); // 未完の行はバッファに残す

        $output = '';
        foreach ($lines as $line) {
            $line = trim($line);
            if ($line === '') continue;

            $decoded = json_decode($line, true);
            if ($decoded === null) continue;

            // フィールド名をスネークケース→キャメルケースに変換
            $transformed = $this->toCamelCase($decoded);
            $output .= json_encode($transformed, JSON_UNESCAPED_UNICODE) . "\n";
        }

        $this->buffer = $remaining ?? '';

        if ($output !== '') {
            $bucket = stream_bucket_new($this->stream, $output);
            stream_bucket_append($out, $bucket);
        }

        return PSFS_PASS_ON;
    }

    private function toCamelCase(array $data): array
    {
        $result = [];
        foreach ($data as $key => $value) {
            $camelKey = lcfirst(str_replace('_', '', ucwords($key, '_')));
            $result[$camelKey] = is_array($value) ? $this->toCamelCase($value) : $value;
        }
        return $result;
    }
}

stream_filter_register('json.camelcase', JsonStreamFilter::class);

// 使用例: NDJSON ファイルを変換して読み込む
$ndjson = '{"user_id":1,"first_name":"太郎","last_name":"山田"}' . "\n"
        . '{"user_id":2,"first_name":"花子","last_name":"鈴木"}' . "\n";

$fp = fopen('php://memory', 'w+');
fwrite($fp, $ndjson);
rewind($fp);

stream_filter_append($fp, 'json.camelcase', STREAM_FILTER_READ);

while (($line = fgets($fp)) !== false) {
    echo $line;
}
fclose($fp);

// 出力:
// {"userId":1,"firstName":"太郎","lastName":"山田"}
// {"userId":2,"firstName":"花子","lastName":"鈴木"}

例6:レート制限フィルター(ThrottleFilter)

大容量ファイルのダウンロードや転送速度を意図的に制限したいケースで使う、スループット調整フィルターです。

<?php

class ThrottleFilter extends php_user_filter
{
    private int   $bytesPerSecond;
    private float $lastTime;
    private int   $bytesSent = 0;

    public function onCreate(): bool
    {
        // params でバイト/秒を指定(デフォルト: 1MB/s)
        $this->bytesPerSecond = (int)($this->params ?? 1_048_576);
        $this->lastTime       = microtime(true);
        $this->bytesSent      = 0;
        return true;
    }

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $this->bytesSent += $bucket->datalen;
            $consumed        += $bucket->datalen;

            // 送信量に応じたスリープ時間を計算
            $elapsed  = microtime(true) - $this->lastTime;
            $expected = $this->bytesSent / $this->bytesPerSecond;

            if ($expected > $elapsed) {
                $sleepMicro = (int)(($expected - $elapsed) * 1_000_000);
                usleep($sleepMicro);
            }

            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

stream_filter_register('io.throttle', ThrottleFilter::class);

class ThrottledFileServer
{
    /**
     * 転送速度を制限してファイルを出力する
     *
     * @param string $filePath       送信するファイルパス
     * @param int    $bytesPerSecond 最大転送速度(バイト/秒)
     */
    public function serve(string $filePath, int $bytesPerSecond = 512_000): void
    {
        $fp = fopen($filePath, 'rb');
        stream_filter_append($fp, 'io.throttle', STREAM_FILTER_READ, $bytesPerSecond);

        header('Content-Type: application/octet-stream');
        header('Content-Disposition: attachment; filename="' . basename($filePath) . '"');

        fpassthru($fp);
        fclose($fp);
    }
}

// 使用例(Webリクエスト内で):
// $server = new ThrottledFileServer();
// $server->serve('/var/www/downloads/large_file.zip', 256_000); // 256KB/s
echo "ThrottleFilter 登録完了" . PHP_EOL;

例7:マルチフィルターチェーン管理クラス(StreamFilterChain)

複数のカスタムフィルターを順序立ててストリームに適用し、パイプライン処理を実現するユーティリティクラスです。

<?php

// --- フィルター定義 ---

class TrimLinesFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $lines = explode("\n", $bucket->data);
            $bucket->data = implode("\n", array_map('trim', $lines));
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

class RemoveEmptyLinesFilter extends php_user_filter
{
    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $lines = explode("\n", $bucket->data);
            $lines = array_filter($lines, fn($l) => trim($l) !== '');
            $bucket->data = implode("\n", $lines) . "\n";
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

class NumberLinesFilter extends php_user_filter
{
    private int $lineNumber = 1;

    public function filter($in, $out, &$consumed, bool $closing): int
    {
        while ($bucket = stream_bucket_make_writeable($in)) {
            $lines = explode("\n", rtrim($bucket->data, "\n"));
            $numbered = [];
            foreach ($lines as $line) {
                if ($line !== '') {
                    $numbered[] = sprintf('%4d: %s', $this->lineNumber++, $line);
                }
            }
            $bucket->data = implode("\n", $numbered) . "\n";
            $consumed += $bucket->datalen;
            stream_bucket_append($out, $bucket);
        }
        return PSFS_PASS_ON;
    }
}

// --- フィルター登録 ---
stream_filter_register('text.trim_lines',        TrimLinesFilter::class);
stream_filter_register('text.remove_empty',      RemoveEmptyLinesFilter::class);
stream_filter_register('text.number_lines',      NumberLinesFilter::class);

// --- チェーン管理クラス ---

class StreamFilterChain
{
    /** @var array{name: string, mode: int, params: mixed}[] */
    private array $filters = [];

    public function pipe(string $filterName, int $mode = STREAM_FILTER_ALL, mixed $params = null): static
    {
        $this->filters[] = ['name' => $filterName, 'mode' => $mode, 'params' => $params];
        return $this;
    }

    public function apply($stream): array
    {
        $handles = [];
        foreach ($this->filters as $filter) {
            $handles[] = stream_filter_append(
                $stream,
                $filter['name'],
                $filter['mode'],
                $filter['params']
            );
        }
        return $handles;
    }
}

// --- 使用例 ---

$rawText = "   PHP Stream Filter  \n"
         . "\n"
         . "  行1のデータ    \n"
         . "     \n"
         . "行2のデータ\n"
         . "  行3のデータ   \n";

$fp = fopen('php://memory', 'w+');
fwrite($fp, $rawText);
rewind($fp);

$chain = (new StreamFilterChain())
    ->pipe('text.trim_lines',   STREAM_FILTER_READ)
    ->pipe('text.remove_empty', STREAM_FILTER_READ)
    ->pipe('text.number_lines', STREAM_FILTER_READ);

$chain->apply($fp);

echo fread($fp, 4096);
fclose($fp);

// 出力:
//    1: PHP Stream Filter
//    2: 行1のデータ
//    3: 行2のデータ
//    4: 行3のデータ

関連する関数との比較

関数役割
stream_filter_register()カスタムフィルタークラスをPHPのフィルターシステムに登録
stream_filter_append()登録済みフィルターをストリームの末尾に追加
stream_filter_prepend()登録済みフィルターをストリームの先頭に追加
stream_filter_remove()ストリームからフィルターを取り外す
stream_get_filters()現在利用可能な全フィルター名を一覧取得
stream_wrapper_register()カスタムストリームラッパー(プロトコル)を登録(フィルターより上位概念)

フィルターとラッパーの違い

  • フィルター(filter):既存のストリーム上を流れるデータを「変換・加工」する
  • ラッパー(wrapper)fopen('myproto://...') のように新しいストリームプロトコル自体を定義する

注意点とベストプラクティス

1. 同名フィルターの再登録はできない

stream_filter_register() は同じ名前で2回呼ぶと false を返します。名前の衝突を避けるために、vendor.category.name のようなドット区切りの命名規則を使いましょう。

// 良い命名例
stream_filter_register('myapp.text.trim',       TrimFilter::class);
stream_filter_register('myapp.crypto.aes256',   AesFilter::class);
stream_filter_register('myapp.compress.brotli', BrotliFilter::class);

2. バケット処理の注意

stream_bucket_make_writeable() は入力バケットブリゲードからバケットを取り出します。取り出したバケットは必ず stream_bucket_append() で出力に渡すか、消費済みとして処理する必要があります。

3. バッファリングが必要なフィルター

Base64 変換のように「全データ揃ってから処理」する場合、$closing === true のタイミングで出力するパターンが有効です。ただし、メモリ使用量に注意が必要です。

4. onCreate() と onClose() の活用

  • onCreate(): フィルターがストリームにアタッチされた際の初期化
  • onClose(): フィルターがデタッチされる際のクリーンアップ(リソース解放など)

まとめ

ポイント内容
登録stream_filter_register('名前', クラス名) でフィルターをシステムに登録
実装php_user_filter を継承し filter() メソッドで変換ロジックを記述
適用stream_filter_append() でストリームにアタッチ
戻り値PSFS_PASS_ON / PSFS_FEED_ME / PSFS_ERR_FATAL
初期化onCreate() でパラメータを受け取り、フィルター設定を初期化
活用ログ整形・文字コード変換・CSV加工・Base64変換・スロットリングなど幅広く使える

stream_filter_register() をマスターすると、I/Oをまたぐ変換処理を一箇所に集約でき、コードの見通しが大きく向上します。ストリームを扱うあらゆる場面に透過的に処理を挟み込める強力な仕組みをぜひ活用してください。

タイトルとURLをコピーしました