[PHP]posix_mkfifo関数とは?名前付きパイプを作成する方法を徹底解説

PHP

こんにちは!今回はPHPのPOSIX関数の中から、posix_mkfifo関数について詳しく解説していきます。プロセス間通信(IPC)の手法の一つである「名前付きパイプ(FIFO)」の作成方法を、実践的なサンプルコードと共にお伝えします。

posix_mkfifo関数とは?

posix_mkfifoは、**名前付きパイプ(Named Pipe/FIFO)**を作成する関数です。FIFOは「First In, First Out」の略で、複数のプロセス間でデータをやり取りするための特殊なファイルです。

基本構文

posix_mkfifo(string $filename, int $permissions): bool
  • 第1引数: FIFOのパス名
  • 第2引数: パーミッション(8進数)
  • 戻り値: 成功時にtrue、失敗時にfalse

名前付きパイプ(FIFO)とは?

FIFOは、ファイルシステム上に存在する特殊なファイルで、プロセス間の通信に使用されます。

通常のパイプとの違い

<?php
// 通常のパイプ(無名パイプ)
// - 親子プロセス間でのみ使用可能
// - コマンドラインの | 演算子
// 例: cat file.txt | grep "pattern"

// 名前付きパイプ(FIFO)
// - 無関係なプロセス間でも使用可能
// - ファイルシステム上に実在
// - ファイル名でアクセス可能
posix_mkfifo('/tmp/myfifo', 0666);
?>

FIFOの特徴

ファイルシステム上に存在: lsコマンドで確認可能 ✅ FIFO順序: 書き込んだ順にデータが読み出される ✅ ブロッキング動作: 読み手/書き手が揃うまで待機 ✅ 複数プロセス対応: 無関係なプロセス間で通信可能 ✅ 一方向通信: データは一方向のみに流れる

実践的な使い方

例1: 基本的なFIFOの作成と使用

<?php
$fifo_path = '/tmp/myfifo';

// FIFOを作成
if (!file_exists($fifo_path)) {
    if (posix_mkfifo($fifo_path, 0666)) {
        echo "FIFOを作成しました: {$fifo_path}\n";
    } else {
        die("FIFOの作成に失敗しました\n");
    }
}

// プロセスをフォーク
$pid = pcntl_fork();

if ($pid == -1) {
    die("フォークに失敗しました\n");
} elseif ($pid == 0) {
    // 子プロセス: データを書き込む
    echo "子プロセス: FIFOにデータを書き込みます\n";
    
    $fp = fopen($fifo_path, 'w');
    if ($fp) {
        fwrite($fp, "Hello from child process!\n");
        fwrite($fp, "これは2行目のメッセージです\n");
        fwrite($fp, "終了\n");
        fclose($fp);
        echo "子プロセス: 書き込み完了\n";
    }
    exit(0);
} else {
    // 親プロセス: データを読み取る
    echo "親プロセス: FIFOからデータを読み取ります\n";
    
    $fp = fopen($fifo_path, 'r');
    if ($fp) {
        while (!feof($fp)) {
            $line = fgets($fp);
            if ($line) {
                echo "受信: {$line}";
            }
        }
        fclose($fp);
        echo "親プロセス: 読み取り完了\n";
    }
    
    // 子プロセスの終了を待つ
    pcntl_wait($status);
    
    // FIFOを削除
    unlink($fifo_path);
}
?>

例2: ロギングシステムの実装

<?php
class FIFOLogger {
    private $fifo_path;
    private $writer_pid = null;
    
    public function __construct($fifo_path = '/tmp/app.log.fifo') {
        $this->fifo_path = $fifo_path;
    }
    
    public function startLogWriter($log_file) {
        // FIFOを作成
        if (!file_exists($this->fifo_path)) {
            if (!posix_mkfifo($this->fifo_path, 0666)) {
                throw new Exception("FIFOの作成に失敗しました");
            }
        }
        
        // ログライタープロセスをフォーク
        $pid = pcntl_fork();
        
        if ($pid == -1) {
            throw new Exception("フォークに失敗しました");
        } elseif ($pid == 0) {
            // 子プロセス: ログライター
            $this->runLogWriter($log_file);
            exit(0);
        } else {
            // 親プロセス
            $this->writer_pid = $pid;
            sleep(1); // ライターの準備を待つ
            return $pid;
        }
    }
    
    private function runLogWriter($log_file) {
        echo "ログライター起動 (PID: " . posix_getpid() . ")\n";
        
        $fifo = fopen($this->fifo_path, 'r');
        $log = fopen($log_file, 'a');
        
        if (!$fifo || !$log) {
            die("ファイルオープンに失敗しました\n");
        }
        
        // ノンブロッキングモードに設定
        stream_set_blocking($fifo, false);
        
        while (true) {
            $line = fgets($fifo);
            
            if ($line !== false) {
                fwrite($log, $line);
                fflush($log);
            } else {
                usleep(100000); // 0.1秒待機
            }
        }
        
        fclose($fifo);
        fclose($log);
    }
    
    public function log($level, $message) {
        $timestamp = date('Y-m-d H:i:s');
        $pid = posix_getpid();
        $log_line = "[{$timestamp}] [{$level}] [PID:{$pid}] {$message}\n";
        
        // FIFOに書き込み(ノンブロッキング)
        $fp = fopen($this->fifo_path, 'w');
        if ($fp) {
            fwrite($fp, $log_line);
            fclose($fp);
        }
    }
    
    public function stopLogWriter() {
        if ($this->writer_pid) {
            posix_kill($this->writer_pid, SIGTERM);
            pcntl_wait($status);
            $this->writer_pid = null;
        }
        
        if (file_exists($this->fifo_path)) {
            unlink($this->fifo_path);
        }
    }
}

// 使用例
$logger = new FIFOLogger();
$logger->startLogWriter('/tmp/application.log');

// メインアプリケーション
for ($i = 1; $i <= 5; $i++) {
    $logger->log('INFO', "処理 {$i} を実行中");
    sleep(1);
}

$logger->log('INFO', 'アプリケーションを終了します');
sleep(1);

$logger->stopLogWriter();
echo "ログライターを停止しました\n";
?>

例3: タスクキューシステム

<?php
class TaskQueue {
    private $fifo_path;
    private $worker_pids = [];
    private $running = true;
    
    public function __construct($fifo_path = '/tmp/taskqueue.fifo') {
        $this->fifo_path = $fifo_path;
        
        // FIFOを作成
        if (!file_exists($fifo_path)) {
            if (!posix_mkfifo($fifo_path, 0666)) {
                throw new Exception("FIFOの作成に失敗しました");
            }
        }
    }
    
    public function startWorkers($num_workers = 3) {
        for ($i = 0; $i < $num_workers; $i++) {
            $pid = pcntl_fork();
            
            if ($pid == 0) {
                // ワーカープロセス
                $this->workerProcess($i);
                exit(0);
            } else {
                $this->worker_pids[] = $pid;
                echo "ワーカー {$i} を起動しました (PID: {$pid})\n";
            }
        }
    }
    
    private function workerProcess($worker_id) {
        echo "ワーカー {$worker_id} が待機中...\n";
        
        while (true) {
            // FIFOからタスクを読み取る
            $fp = fopen($this->fifo_path, 'r');
            if (!$fp) {
                sleep(1);
                continue;
            }
            
            $task = fgets($fp);
            fclose($fp);
            
            if ($task) {
                $task = trim($task);
                echo "ワーカー {$worker_id}: タスクを受信 - {$task}\n";
                
                // タスクを実行
                $this->executeTask($task, $worker_id);
            }
        }
    }
    
    private function executeTask($task, $worker_id) {
        // タスクの処理をシミュレート
        $data = json_decode($task, true);
        
        if ($data) {
            echo "ワーカー {$worker_id}: {$data['type']} を処理中...\n";
            sleep(rand(1, 3)); // 処理時間をシミュレート
            echo "ワーカー {$worker_id}: {$data['type']} 完了\n";
        }
    }
    
    public function addTask($type, $data = []) {
        $task = json_encode([
            'type' => $type,
            'data' => $data,
            'timestamp' => time()
        ]);
        
        // FIFOにタスクを書き込む
        $fp = fopen($this->fifo_path, 'w');
        if ($fp) {
            fwrite($fp, $task . "\n");
            fclose($fp);
            echo "タスクを追加: {$type}\n";
            return true;
        }
        return false;
    }
    
    public function stopWorkers() {
        foreach ($this->worker_pids as $pid) {
            posix_kill($pid, SIGTERM);
        }
        
        foreach ($this->worker_pids as $pid) {
            pcntl_wait($status);
        }
        
        if (file_exists($this->fifo_path)) {
            unlink($this->fifo_path);
        }
        
        echo "すべてのワーカーを停止しました\n";
    }
}

// 使用例
$queue = new TaskQueue();
$queue->startWorkers(3);

sleep(2); // ワーカーの準備を待つ

// タスクを追加
$queue->addTask('email', ['to' => 'user@example.com']);
$queue->addTask('resize_image', ['file' => 'photo.jpg']);
$queue->addTask('generate_report', ['month' => 'November']);
$queue->addTask('backup_database', []);
$queue->addTask('send_notification', ['user_id' => 123]);

// タスクの処理を待つ
sleep(10);

$queue->stopWorkers();
?>

例4: プロセス間メッセージング

<?php
class IPCMessenger {
    private $fifo_in;
    private $fifo_out;
    
    public function __construct($process_name) {
        $this->fifo_in = "/tmp/ipc_{$process_name}_in.fifo";
        $this->fifo_out = "/tmp/ipc_{$process_name}_out.fifo";
        
        // 入力FIFOを作成
        if (!file_exists($this->fifo_in)) {
            posix_mkfifo($this->fifo_in, 0666);
        }
        
        // 出力FIFOを作成
        if (!file_exists($this->fifo_out)) {
            posix_mkfifo($this->fifo_out, 0666);
        }
    }
    
    public function send($message, $target_process) {
        $target_fifo = "/tmp/ipc_{$target_process}_in.fifo";
        
        if (!file_exists($target_fifo)) {
            throw new Exception("ターゲットプロセスのFIFOが見つかりません");
        }
        
        $fp = fopen($target_fifo, 'w');
        if ($fp) {
            fwrite($fp, json_encode([
                'from' => basename($this->fifo_in, '_in.fifo'),
                'message' => $message,
                'timestamp' => time()
            ]) . "\n");
            fclose($fp);
            return true;
        }
        return false;
    }
    
    public function receive($timeout = 5) {
        $fp = fopen($this->fifo_in, 'r');
        if (!$fp) {
            return null;
        }
        
        // タイムアウト設定
        stream_set_blocking($fp, false);
        stream_set_timeout($fp, $timeout);
        
        $message = fgets($fp);
        fclose($fp);
        
        if ($message) {
            return json_decode(trim($message), true);
        }
        return null;
    }
    
    public function startListener($callback) {
        echo "メッセージリスナー起動\n";
        
        while (true) {
            $message = $this->receive(1);
            
            if ($message) {
                echo "メッセージ受信: " . $message['message'] . 
                     " (from: " . $message['from'] . ")\n";
                
                if ($callback) {
                    $callback($message);
                }
            }
            
            usleep(100000); // 0.1秒待機
        }
    }
    
    public function cleanup() {
        if (file_exists($this->fifo_in)) {
            unlink($this->fifo_in);
        }
        if (file_exists($this->fifo_out)) {
            unlink($this->fifo_out);
        }
    }
}

// 使用例: 2つのプロセス間でメッセージング
$pid = pcntl_fork();

if ($pid == 0) {
    // 子プロセス: プロセスB
    $messenger = new IPCMessenger('processB');
    sleep(1);
    
    // プロセスAにメッセージを送信
    $messenger->send('Hello from Process B!', 'processA');
    sleep(1);
    $messenger->send('How are you?', 'processA');
    
    sleep(3);
    $messenger->cleanup();
    exit(0);
} else {
    // 親プロセス: プロセスA
    $messenger = new IPCMessenger('processA');
    
    // メッセージを受信
    for ($i = 0; $i < 2; $i++) {
        $message = $messenger->receive(5);
        if ($message) {
            echo "受信: {$message['message']}\n";
        }
    }
    
    // 返信を送信
    $messenger->send('Hello Process B! I am fine!', 'processB');
    
    pcntl_wait($status);
    $messenger->cleanup();
}
?>

例5: モニタリングシステム

<?php
class SystemMonitor {
    private $fifo_path = '/tmp/sysmon.fifo';
    private $monitor_pid = null;
    
    public function startMonitor() {
        if (!file_exists($this->fifo_path)) {
            posix_mkfifo($this->fifo_path, 0666);
        }
        
        $pid = pcntl_fork();
        
        if ($pid == 0) {
            $this->monitorProcess();
            exit(0);
        } else {
            $this->monitor_pid = $pid;
            return $pid;
        }
    }
    
    private function monitorProcess() {
        echo "モニタープロセス起動 (PID: " . posix_getpid() . ")\n";
        
        $fp = fopen($this->fifo_path, 'r');
        stream_set_blocking($fp, false);
        
        $metrics = [];
        
        while (true) {
            $line = fgets($fp);
            
            if ($line) {
                $data = json_decode(trim($line), true);
                
                if ($data) {
                    $this->processMetric($data, $metrics);
                }
            }
            
            usleep(100000); // 0.1秒
        }
        
        fclose($fp);
    }
    
    private function processMetric($data, &$metrics) {
        $type = $data['type'];
        $value = $data['value'];
        
        if (!isset($metrics[$type])) {
            $metrics[$type] = [];
        }
        
        $metrics[$type][] = $value;
        
        // 統計情報を計算
        $avg = array_sum($metrics[$type]) / count($metrics[$type]);
        $max = max($metrics[$type]);
        $min = min($metrics[$type]);
        
        echo sprintf(
            "[%s] %s: %.2f (avg: %.2f, max: %.2f, min: %.2f)\n",
            date('H:i:s'),
            $type,
            $value,
            $avg,
            $max,
            $min
        );
        
        // アラート判定
        if ($type === 'cpu' && $value > 80) {
            echo "⚠️  警告: CPU使用率が高い ({$value}%)\n";
        }
        
        if ($type === 'memory' && $value > 90) {
            echo "🚨 アラート: メモリ使用率が危険水準 ({$value}%)\n";
        }
    }
    
    public function reportMetric($type, $value) {
        $data = json_encode([
            'type' => $type,
            'value' => $value,
            'timestamp' => time()
        ]);
        
        $fp = fopen($this->fifo_path, 'w');
        if ($fp) {
            fwrite($fp, $data . "\n");
            fclose($fp);
        }
    }
    
    public function stopMonitor() {
        if ($this->monitor_pid) {
            posix_kill($this->monitor_pid, SIGTERM);
            pcntl_wait($status);
        }
        
        if (file_exists($this->fifo_path)) {
            unlink($this->fifo_path);
        }
    }
}

// 使用例
$monitor = new SystemMonitor();
$monitor->startMonitor();

sleep(1);

// メトリクスを送信
for ($i = 0; $i < 20; $i++) {
    $cpu = rand(30, 95);
    $memory = rand(40, 95);
    $disk = rand(20, 70);
    
    $monitor->reportMetric('cpu', $cpu);
    $monitor->reportMetric('memory', $memory);
    $monitor->reportMetric('disk', $disk);
    
    sleep(1);
}

$monitor->stopMonitor();
echo "モニタリングを停止しました\n";
?>

注意点とトラブルシューティング

ブロッキング動作

<?php
// FIFOは読み手と書き手が揃うまでブロックする
$fifo = '/tmp/test.fifo';
posix_mkfifo($fifo, 0666);

// これは読み手がいないとブロックする
$fp = fopen($fifo, 'w'); // ここで待機...

// ノンブロッキングモードで開く
$fp = fopen($fifo, 'w+'); // 読み書き両用モードで開く
// または
$fp = fopen($fifo, 'w');
stream_set_blocking($fp, false); // ノンブロッキングに設定
?>

パーミッションの問題

<?php
// umaskを考慮したパーミッション設定
$old_umask = umask(0);
posix_mkfifo('/tmp/myfifo', 0666); // 実際は 0666 & ~umask
umask($old_umask);

// または明示的に chmod
posix_mkfifo('/tmp/myfifo', 0666);
chmod('/tmp/myfifo', 0666);
?>

既存FIFOのチェック

<?php
function createFIFOSafely($path, $perms = 0666) {
    if (file_exists($path)) {
        // ファイルタイプを確認
        $type = filetype($path);
        
        if ($type === 'fifo') {
            echo "FIFOは既に存在します\n";
            return true;
        } else {
            throw new Exception("パスは既に存在しますが、FIFOではありません ({$type})");
        }
    }
    
    if (!posix_mkfifo($path, $perms)) {
        $error = posix_get_last_error();
        $error_msg = posix_strerror($error);
        throw new Exception("FIFO作成失敗: {$error_msg}");
    }
    
    return true;
}

// 使用例
try {
    createFIFOSafely('/tmp/myfifo');
} catch (Exception $e) {
    echo "エラー: " . $e->getMessage() . "\n";
}
?>

クリーンアップの重要性

<?php
class FIFOManager {
    private $fifos = [];
    
    public function create($path, $perms = 0666) {
        if (posix_mkfifo($path, $perms)) {
            $this->fifos[] = $path;
            return true;
        }
        return false;
    }
    
    public function cleanup() {
        foreach ($this->fifos as $fifo) {
            if (file_exists($fifo)) {
                unlink($fifo);
                echo "FIFOを削除: {$fifo}\n";
            }
        }
        $this->fifos = [];
    }
    
    public function __destruct() {
        $this->cleanup();
    }
}

// 使用例
$manager = new FIFOManager();
$manager->create('/tmp/fifo1');
$manager->create('/tmp/fifo2');

// スクリプト終了時に自動的にクリーンアップされる
?>

Windows環境での制限

<?php
if (!function_exists('posix_mkfifo')) {
    die("posix_mkfifoはこの環境では使用できません\n");
}

// または代替手段を提供
function createIPC($path) {
    if (function_exists('posix_mkfifo')) {
        return posix_mkfifo($path, 0666);
    }
    
    // Windows環境では別の方法を使用
    // 例: ファイルベースのキュー、ソケット通信など
    return false;
}
?>

関連する便利な関数と技術

ファイルタイプの確認

<?php
$path = '/tmp/myfifo';

// ファイルタイプを確認
$type = filetype($path);
echo "ファイルタイプ: {$type}\n"; // "fifo"

// FIFOかどうかチェック
if (is_file($path) && filetype($path) === 'fifo') {
    echo "これはFIFOです\n";
}
?>

他のIPC手法との比較

<?php
// 1. FIFO (名前付きパイプ)
posix_mkfifo('/tmp/myfifo', 0666);

// 2. Unix Domain Socket
// より高度な双方向通信が可能

// 3. System V IPC (メッセージキュー)
// msg_get_queue(), msg_send(), msg_receive()

// 4. 共有メモリ
// shmop_open(), shmop_write(), shmop_read()

// 5. セマフォ
// sem_get(), sem_acquire(), sem_release()
?>

まとめ

posix_mkfifo関数は以下の特徴があります。

✅ 名前付きパイプ(FIFO)を作成 ✅ プロセス間通信(IPC)に最適 ✅ ファイルシステムベースの通信 ✅ シンプルなFIFO順序のデータ転送 ✅ ログシステムやタスクキューの実装に便利

FIFOは、複数のプロセスが協調して動作するシステムを構築する際の基本的なツールです。ログ収集、タスクキュー、メッセージング、モニタリングなど、様々な場面で活用できます。

ベストプラクティス

  1. 常にクリーンアップ: 使用後はFIFOを削除
  2. ノンブロッキング: 適切にノンブロッキングモードを使用
  3. エラーハンドリング: FIFO操作の失敗に対応
  4. パーミッション管理: 適切なアクセス権を設定
  5. デッドロック回避: 読み書きの順序に注意

実用的なヒント

  • 大量データには不向き: FIFOはバッファサイズに制限がある
  • 永続化不可: データは読み取られると消える
  • 一方向通信: 双方向通信には2つのFIFOが必要
  • Unix/Linux専用: Windowsでは使用不可

効率的なプロセス間通信を! この記事が役に立ったら、ぜひシェアしてください。PHPのシステムプログラミングについて、他にも知りたいことがあればコメントで教えてくださいね。

タイトルとURLをコピーしました