はじめに
PHPでファイルの読み書きやネットワーク通信を行う際、「読み取ったデータを自動的に変換したい」「書き込む前に暗号化や圧縮を挟みたい」と思ったことはないでしょうか。
stream_filter_register() は、ストリームにアタッチできる独自のフィルタークラスを登録する関数です。一度登録しておけば、stream_filter_append() や fopen() のコンテキストオプションと組み合わせることで、読み書きのたびにデータが自動的にフィルター処理されます。
組み込みフィルター(string.rot13、zlib.deflate など)では対応できない独自処理を、シームレスにストリームへ組み込めるのが最大の魅力です。
関数の基本情報
| 項目 | 内容 |
|---|---|
| 関数名 | stream_filter_register() |
| 対応バージョン | PHP 5.0.0 以降 |
| 返り値 | bool(成功時 true、失敗時 false) |
| カテゴリ | ストリーム関数 |
構文
stream_filter_register(string $filter_name, string $class): bool
パラメータ
| パラメータ | 型 | 説明 |
|---|---|---|
$filter_name | string | フィルターの名前。fopen() や stream_filter_append() で指定する識別子 |
$class | string | php_user_filter を継承したクラス名(文字列で渡す) |
返り値
true:登録成功false:同名フィルターがすでに登録済み、またはクラスが存在しない場合
カスタムフィルタークラスの基本構造
stream_filter_register() を使うには、まず php_user_filter を継承したクラスを定義する必要があります。実装すべきメソッドは以下のとおりです。
class MyFilter extends php_user_filter
{
/**
* データバケットにフィルター処理を適用する
*
* @param resource $in 入力バケットブリゲード
* @param resource $out 出力バケットブリゲード
* @param int $consumed 処理済みバイト数(参照渡し)
* @param bool $closing ストリームが閉じられようとしているか
* @return int PSFS_PASS_ON / PSFS_FEED_ME / PSFS_ERR_FATAL
*/
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
// $bucket->data にフィルター処理を施す
$bucket->data = strtoupper($bucket->data);
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('my.uppercase', MyFilter::class);
filter() の戻り値定数
| 定数 | 意味 |
|---|---|
PSFS_PASS_ON | 出力バケットにデータを渡す(通常はこれを返す) |
PSFS_FEED_ME | データが足りないため、もっと入力が必要 |
PSFS_ERR_FATAL | 回復不能なエラーが発生した |
基本的な使い方
<?php
class UpperCaseFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$bucket->data = strtoupper($bucket->data);
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('str.uppercase', UpperCaseFilter::class);
$fp = fopen('php://memory', 'w+');
stream_filter_append($fp, 'str.uppercase');
fwrite($fp, 'hello, stream filter!');
rewind($fp);
echo fread($fp, 1024);
// 出力: HELLO, STREAM FILTER!
fclose($fp);
実践的なクラスベースの活用例
例1:ログ整形フィルター(LogFormatterFilter)
アプリケーションのログファイルへの書き込み時に、タイムスタンプとプレフィックスを自動付与するフィルターです。
<?php
class LogFormatterFilter extends php_user_filter
{
private string $prefix;
public function onCreate(): bool
{
// params プロパティでフィルター名のサフィックスを取得
$this->prefix = $this->params ?? 'INFO';
return true;
}
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$lines = explode("\n", $bucket->data);
$formatted = [];
foreach ($lines as $line) {
if ($line === '') continue;
$timestamp = date('Y-m-d H:i:s');
$formatted[] = "[{$timestamp}] [{$this->prefix}] {$line}";
}
$bucket->data = implode("\n", $formatted) . "\n";
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('log.formatter', LogFormatterFilter::class);
class Logger
{
private $fp;
public function __construct(string $filePath, string $level = 'INFO')
{
$this->fp = fopen($filePath, 'a');
// $params にログレベルを渡す
stream_filter_append($this->fp, 'log.formatter', STREAM_FILTER_WRITE, $level);
}
public function write(string $message): void
{
fwrite($this->fp, $message);
}
public function __destruct()
{
fclose($this->fp);
}
}
// 使用例
$logger = new Logger('/tmp/app.log', 'ERROR');
$logger->write("データベース接続に失敗しました");
$logger->write("再試行します");
// /tmp/app.log に以下のように書き込まれる:
// [2025-05-13 10:00:00] [ERROR] データベース接続に失敗しました
// [2025-05-13 10:00:00] [ERROR] 再試行します
ポイント: onCreate() をオーバーライドすることで、フィルターのパラメータ($this->params)を初期化時に受け取れます。
例2:CSVサニタイズフィルター(CsvSanitizerFilter)
外部から受け取ったCSVファイルを読み込む際に、各フィールドのトリム・改行除去・不正文字削除をストリーム上でリアルタイムに行うフィルターです。
<?php
class CsvSanitizerFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$lines = explode("\n", $bucket->data);
$sanitized = [];
foreach ($lines as $line) {
if (trim($line) === '') continue;
// CSVの各フィールドをパース→サニタイズ→再構築
$fields = str_getcsv($line);
$fields = array_map(function (string $field): string {
$field = trim($field); // 前後の空白除去
$field = str_replace(["\r", "\n"], '', $field); // 改行除去
$field = preg_replace('/[^\x20-\x7E\x{3040}-\x{9FAF}]/u', '', $field); // 制御文字除去
return $field;
}, $fields);
$sanitized[] = implode(',', array_map(
fn($f) => '"' . str_replace('"', '""', $f) . '"',
$fields
));
}
$bucket->data = implode("\n", $sanitized) . "\n";
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('csv.sanitizer', CsvSanitizerFilter::class);
class CsvImporter
{
public function import(string $filePath): array
{
$fp = fopen($filePath, 'r');
stream_filter_append($fp, 'csv.sanitizer', STREAM_FILTER_READ);
$rows = [];
while (($line = fgets($fp)) !== false) {
$row = str_getcsv(trim($line));
if (!empty(array_filter($row))) {
$rows[] = $row;
}
}
fclose($fp);
return $rows;
}
}
// 使用例
$importer = new CsvImporter();
$data = $importer->import('/tmp/input.csv');
foreach ($data as $row) {
echo implode(' | ', $row) . PHP_EOL;
}
例3:Base64エンコード/デコードフィルター(Base64Filter)
メール添付ファイルや API レスポンスのバイナリデータを、ストリーム上で透過的に Base64 変換するフィルターです。
<?php
class Base64Filter extends php_user_filter
{
private string $mode;
private string $buffer = '';
public function onCreate(): bool
{
$this->mode = $this->params === 'decode' ? 'decode' : 'encode';
$this->buffer = '';
return true;
}
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$this->buffer .= $bucket->data;
$consumed += $bucket->datalen;
}
// ストリーム終端のみ処理(バッファリングして一括変換)
if ($closing && $this->buffer !== '') {
$result = $this->mode === 'encode'
? base64_encode($this->buffer)
: base64_decode($this->buffer);
$bucket = stream_bucket_new($this->stream, $result);
stream_bucket_append($out, $bucket);
$this->buffer = '';
}
return PSFS_PASS_ON;
}
}
stream_filter_register('base64.transform', Base64Filter::class);
class FileEncoder
{
/**
* ファイルをBase64エンコードして別ファイルに書き出す
*/
public function encode(string $src, string $dst): void
{
$in = fopen($src, 'rb');
$out = fopen($dst, 'wb');
stream_filter_append($in, 'base64.transform', STREAM_FILTER_READ, 'encode');
stream_copy_to_stream($in, $out);
fclose($in);
fclose($out);
}
/**
* Base64エンコードされたファイルをデコードして書き出す
*/
public function decode(string $src, string $dst): void
{
$in = fopen($src, 'rb');
$out = fopen($dst, 'wb');
stream_filter_append($in, 'base64.transform', STREAM_FILTER_READ, 'decode');
stream_copy_to_stream($in, $out);
fclose($in);
fclose($out);
}
}
$encoder = new FileEncoder();
$encoder->encode('/tmp/image.png', '/tmp/image.b64');
$encoder->decode('/tmp/image.b64', '/tmp/image_restored.png');
echo "エンコード・デコード完了" . PHP_EOL;
例4:文字コード変換フィルター(EncodingConvertFilter)
Shift_JIS や EUC-JP の legacy ファイルを読み込む際に、UTF-8 へリアルタイム変換するフィルターです。
<?php
class EncodingConvertFilter extends php_user_filter
{
private string $fromEncoding;
private string $toEncoding;
public function onCreate(): bool
{
// "SJIS-win:UTF-8" 形式で from:to を受け取る
$parts = explode(':', $this->params ?? 'SJIS-win:UTF-8', 2);
$this->fromEncoding = $parts[0] ?? 'SJIS-win';
$this->toEncoding = $parts[1] ?? 'UTF-8';
return true;
}
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$converted = mb_convert_encoding(
$bucket->data,
$this->toEncoding,
$this->fromEncoding
);
$bucket->data = $converted;
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('encoding.convert', EncodingConvertFilter::class);
class LegacyFileReader
{
public function read(string $filePath, string $encoding = 'SJIS-win'): string
{
$fp = fopen($filePath, 'rb');
stream_filter_append(
$fp,
'encoding.convert',
STREAM_FILTER_READ,
"{$encoding}:UTF-8"
);
$content = stream_get_contents($fp);
fclose($fp);
return $content;
}
}
// 使用例
$reader = new LegacyFileReader();
$content = $reader->read('/tmp/legacy_sjis.txt', 'SJIS-win');
echo $content; // UTF-8として正常に出力される
例5:JSONストリームラッパーフィルター(JsonStreamFilter)
複数行の JSON オブジェクトが連続して流れてくる NDJSON(Newline Delimited JSON)ストリームを、1行ずつパースしながら変換するフィルターです。
<?php
class JsonStreamFilter extends php_user_filter
{
private string $buffer = '';
public function onCreate(): bool
{
$this->buffer = '';
return true;
}
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$this->buffer .= $bucket->data;
$consumed += $bucket->datalen;
}
// 行単位でJSONをパース・変換
$lines = explode("\n", $this->buffer);
$remaining = array_pop($lines); // 未完の行はバッファに残す
$output = '';
foreach ($lines as $line) {
$line = trim($line);
if ($line === '') continue;
$decoded = json_decode($line, true);
if ($decoded === null) continue;
// フィールド名をスネークケース→キャメルケースに変換
$transformed = $this->toCamelCase($decoded);
$output .= json_encode($transformed, JSON_UNESCAPED_UNICODE) . "\n";
}
$this->buffer = $remaining ?? '';
if ($output !== '') {
$bucket = stream_bucket_new($this->stream, $output);
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
private function toCamelCase(array $data): array
{
$result = [];
foreach ($data as $key => $value) {
$camelKey = lcfirst(str_replace('_', '', ucwords($key, '_')));
$result[$camelKey] = is_array($value) ? $this->toCamelCase($value) : $value;
}
return $result;
}
}
stream_filter_register('json.camelcase', JsonStreamFilter::class);
// 使用例: NDJSON ファイルを変換して読み込む
$ndjson = '{"user_id":1,"first_name":"太郎","last_name":"山田"}' . "\n"
. '{"user_id":2,"first_name":"花子","last_name":"鈴木"}' . "\n";
$fp = fopen('php://memory', 'w+');
fwrite($fp, $ndjson);
rewind($fp);
stream_filter_append($fp, 'json.camelcase', STREAM_FILTER_READ);
while (($line = fgets($fp)) !== false) {
echo $line;
}
fclose($fp);
// 出力:
// {"userId":1,"firstName":"太郎","lastName":"山田"}
// {"userId":2,"firstName":"花子","lastName":"鈴木"}
例6:レート制限フィルター(ThrottleFilter)
大容量ファイルのダウンロードや転送速度を意図的に制限したいケースで使う、スループット調整フィルターです。
<?php
class ThrottleFilter extends php_user_filter
{
private int $bytesPerSecond;
private float $lastTime;
private int $bytesSent = 0;
public function onCreate(): bool
{
// params でバイト/秒を指定(デフォルト: 1MB/s)
$this->bytesPerSecond = (int)($this->params ?? 1_048_576);
$this->lastTime = microtime(true);
$this->bytesSent = 0;
return true;
}
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$this->bytesSent += $bucket->datalen;
$consumed += $bucket->datalen;
// 送信量に応じたスリープ時間を計算
$elapsed = microtime(true) - $this->lastTime;
$expected = $this->bytesSent / $this->bytesPerSecond;
if ($expected > $elapsed) {
$sleepMicro = (int)(($expected - $elapsed) * 1_000_000);
usleep($sleepMicro);
}
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
stream_filter_register('io.throttle', ThrottleFilter::class);
class ThrottledFileServer
{
/**
* 転送速度を制限してファイルを出力する
*
* @param string $filePath 送信するファイルパス
* @param int $bytesPerSecond 最大転送速度(バイト/秒)
*/
public function serve(string $filePath, int $bytesPerSecond = 512_000): void
{
$fp = fopen($filePath, 'rb');
stream_filter_append($fp, 'io.throttle', STREAM_FILTER_READ, $bytesPerSecond);
header('Content-Type: application/octet-stream');
header('Content-Disposition: attachment; filename="' . basename($filePath) . '"');
fpassthru($fp);
fclose($fp);
}
}
// 使用例(Webリクエスト内で):
// $server = new ThrottledFileServer();
// $server->serve('/var/www/downloads/large_file.zip', 256_000); // 256KB/s
echo "ThrottleFilter 登録完了" . PHP_EOL;
例7:マルチフィルターチェーン管理クラス(StreamFilterChain)
複数のカスタムフィルターを順序立ててストリームに適用し、パイプライン処理を実現するユーティリティクラスです。
<?php
// --- フィルター定義 ---
class TrimLinesFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$lines = explode("\n", $bucket->data);
$bucket->data = implode("\n", array_map('trim', $lines));
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
class RemoveEmptyLinesFilter extends php_user_filter
{
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$lines = explode("\n", $bucket->data);
$lines = array_filter($lines, fn($l) => trim($l) !== '');
$bucket->data = implode("\n", $lines) . "\n";
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
class NumberLinesFilter extends php_user_filter
{
private int $lineNumber = 1;
public function filter($in, $out, &$consumed, bool $closing): int
{
while ($bucket = stream_bucket_make_writeable($in)) {
$lines = explode("\n", rtrim($bucket->data, "\n"));
$numbered = [];
foreach ($lines as $line) {
if ($line !== '') {
$numbered[] = sprintf('%4d: %s', $this->lineNumber++, $line);
}
}
$bucket->data = implode("\n", $numbered) . "\n";
$consumed += $bucket->datalen;
stream_bucket_append($out, $bucket);
}
return PSFS_PASS_ON;
}
}
// --- フィルター登録 ---
stream_filter_register('text.trim_lines', TrimLinesFilter::class);
stream_filter_register('text.remove_empty', RemoveEmptyLinesFilter::class);
stream_filter_register('text.number_lines', NumberLinesFilter::class);
// --- チェーン管理クラス ---
class StreamFilterChain
{
/** @var array{name: string, mode: int, params: mixed}[] */
private array $filters = [];
public function pipe(string $filterName, int $mode = STREAM_FILTER_ALL, mixed $params = null): static
{
$this->filters[] = ['name' => $filterName, 'mode' => $mode, 'params' => $params];
return $this;
}
public function apply($stream): array
{
$handles = [];
foreach ($this->filters as $filter) {
$handles[] = stream_filter_append(
$stream,
$filter['name'],
$filter['mode'],
$filter['params']
);
}
return $handles;
}
}
// --- 使用例 ---
$rawText = " PHP Stream Filter \n"
. "\n"
. " 行1のデータ \n"
. " \n"
. "行2のデータ\n"
. " 行3のデータ \n";
$fp = fopen('php://memory', 'w+');
fwrite($fp, $rawText);
rewind($fp);
$chain = (new StreamFilterChain())
->pipe('text.trim_lines', STREAM_FILTER_READ)
->pipe('text.remove_empty', STREAM_FILTER_READ)
->pipe('text.number_lines', STREAM_FILTER_READ);
$chain->apply($fp);
echo fread($fp, 4096);
fclose($fp);
// 出力:
// 1: PHP Stream Filter
// 2: 行1のデータ
// 3: 行2のデータ
// 4: 行3のデータ
関連する関数との比較
| 関数 | 役割 |
|---|---|
stream_filter_register() | カスタムフィルタークラスをPHPのフィルターシステムに登録 |
stream_filter_append() | 登録済みフィルターをストリームの末尾に追加 |
stream_filter_prepend() | 登録済みフィルターをストリームの先頭に追加 |
stream_filter_remove() | ストリームからフィルターを取り外す |
stream_get_filters() | 現在利用可能な全フィルター名を一覧取得 |
stream_wrapper_register() | カスタムストリームラッパー(プロトコル)を登録(フィルターより上位概念) |
フィルターとラッパーの違い
- フィルター(filter):既存のストリーム上を流れるデータを「変換・加工」する
- ラッパー(wrapper):
fopen('myproto://...')のように新しいストリームプロトコル自体を定義する
注意点とベストプラクティス
1. 同名フィルターの再登録はできない
stream_filter_register() は同じ名前で2回呼ぶと false を返します。名前の衝突を避けるために、vendor.category.name のようなドット区切りの命名規則を使いましょう。
// 良い命名例
stream_filter_register('myapp.text.trim', TrimFilter::class);
stream_filter_register('myapp.crypto.aes256', AesFilter::class);
stream_filter_register('myapp.compress.brotli', BrotliFilter::class);
2. バケット処理の注意
stream_bucket_make_writeable() は入力バケットブリゲードからバケットを取り出します。取り出したバケットは必ず stream_bucket_append() で出力に渡すか、消費済みとして処理する必要があります。
3. バッファリングが必要なフィルター
Base64 変換のように「全データ揃ってから処理」する場合、$closing === true のタイミングで出力するパターンが有効です。ただし、メモリ使用量に注意が必要です。
4. onCreate() と onClose() の活用
onCreate(): フィルターがストリームにアタッチされた際の初期化onClose(): フィルターがデタッチされる際のクリーンアップ(リソース解放など)
まとめ
| ポイント | 内容 |
|---|---|
| 登録 | stream_filter_register('名前', クラス名) でフィルターをシステムに登録 |
| 実装 | php_user_filter を継承し filter() メソッドで変換ロジックを記述 |
| 適用 | stream_filter_append() でストリームにアタッチ |
| 戻り値 | PSFS_PASS_ON / PSFS_FEED_ME / PSFS_ERR_FATAL |
| 初期化 | onCreate() でパラメータを受け取り、フィルター設定を初期化 |
| 活用 | ログ整形・文字コード変換・CSV加工・Base64変換・スロットリングなど幅広く使える |
stream_filter_register() をマスターすると、I/Oをまたぐ変換処理を一箇所に集約でき、コードの見通しが大きく向上します。ストリームを扱うあらゆる場面に透過的に処理を挟み込める強力な仕組みをぜひ活用してください。
