[PHP]マルチプロセス処理を実現!pcntl_fork関数の完全ガイド

PHP

はじめに

PHPで大量のデータを処理する際、1つのプロセスでは時間がかかりすぎることがあります。そんな時、複数のプロセスを同時に実行できれば、処理時間を大幅に短縮できます。

pcntl_fork 関数は、現在のプロセスを複製(フォーク)して子プロセスを作成する、UNIXライクなシステムプログラミングの基本機能です。この記事では、PHPでマルチプロセス処理を実装するための重要な関数について、基礎から実践まで詳しく解説します。

pcntl_fork関数とは

pcntl_fork は、現在実行中のプロセスを複製して、新しい子プロセスを作成する関数です。フォーク後は、親プロセスと子プロセスが独立して並行実行されます。

基本構文

int pcntl_fork(void)

パラメータ

この関数はパラメータを取りません。

戻り値

  • 親プロセス: 子プロセスのPID(正の整数)を返す
  • 子プロセス: 0を返す
  • エラー時: -1を返す

重要な前提条件

<?php
// 環境チェック
if (php_sapi_name() !== 'cli') {
    die("この機能はCLI環境でのみ利用可能です。\n");
}

if (!function_exists('pcntl_fork')) {
    die("pcntl拡張が利用できません。\n");
}

if (stripos(PHP_OS, 'WIN') === 0) {
    die("pcntl_forkはWindows環境では動作しません。\n");
}

echo "✓ 環境チェック完了\n";
echo "OS: " . PHP_OS . "\n";
echo "PHP Version: " . PHP_VERSION . "\n\n";
?>

フォークの仕組み

<?php
/**
 * pcntl_forkの基本的な動作を理解する
 */

echo "フォーク前: 1つのプロセス(PID: " . getmypid() . ")\n\n";

$pid = pcntl_fork();

if ($pid === -1) {
    // フォーク失敗
    die("フォークに失敗しました\n");
    
} elseif ($pid) {
    // 親プロセス
    echo "[親] PID: " . getmypid() . " | 子プロセスのPID: {$pid}\n";
    echo "[親] 親プロセスの処理を実行中...\n";
    sleep(2);
    echo "[親] 処理完了\n";
    
    // 子プロセスの終了を待つ
    pcntl_wait($status);
    echo "[親] 子プロセスが終了しました\n";
    
} else {
    // 子プロセス($pid === 0)
    echo "[子] PID: " . getmypid() . " | 親プロセスのPID: " . posix_getppid() . "\n";
    echo "[子] 子プロセスの処理を実行中...\n";
    sleep(1);
    echo "[子] 処理完了\n";
    exit(0); // 子プロセスを明示的に終了
}

echo "[親] プログラム終了\n";
?>

基本的な使用例

シンプルな子プロセスの作成

<?php
function createChildProcess($child_id) {
    $pid = pcntl_fork();
    
    if ($pid === -1) {
        die("子プロセス{$child_id}の作成に失敗しました\n");
    } elseif ($pid) {
        // 親プロセス
        return $pid;
    } else {
        // 子プロセス
        echo "[子{$child_id}] 開始 (PID: " . getmypid() . ")\n";
        
        // 何か処理を実行
        $sleep_time = rand(1, 3);
        sleep($sleep_time);
        
        echo "[子{$child_id}] 完了 ({$sleep_time}秒)\n";
        exit(0);
    }
}

// 3つの子プロセスを作成
echo "=== 複数の子プロセス作成 ===\n\n";

$children = [];
for ($i = 1; $i <= 3; $i++) {
    $pid = createChildProcess($i);
    $children[] = $pid;
    echo "[親] 子プロセス{$i}を作成しました (PID: {$pid})\n";
}

echo "\n[親] すべての子プロセスの終了を待機中...\n\n";

// すべての子プロセスが終了するまで待つ
foreach ($children as $pid) {
    pcntl_waitpid($pid, $status);
    echo "[親] 子プロセス {$pid} が終了しました\n";
}

echo "\n[親] すべての処理が完了しました\n";
?>

変数のスコープとコピー

<?php
/**
 * フォーク時の変数のコピー動作
 */

$shared_before_fork = "フォーク前の値";
$counter = 0;

echo "フォーク前:\n";
echo "  shared_before_fork: {$shared_before_fork}\n";
echo "  counter: {$counter}\n\n";

$pid = pcntl_fork();

if ($pid === -1) {
    die("フォーク失敗\n");
} elseif ($pid) {
    // 親プロセス
    $shared_before_fork = "親プロセスで変更";
    $counter = 100;
    
    sleep(1); // 子プロセスの処理を待つ
    
    echo "[親] 変数の状態:\n";
    echo "  shared_before_fork: {$shared_before_fork}\n";
    echo "  counter: {$counter}\n\n";
    
    pcntl_wait($status);
    
} else {
    // 子プロセス
    $shared_before_fork = "子プロセスで変更";
    $counter = 200;
    
    echo "[子] 変数の状態:\n";
    echo "  shared_before_fork: {$shared_before_fork}\n";
    echo "  counter: {$counter}\n\n";
    
    exit(0);
}

echo "重要: 親と子のメモリ空間は独立しています\n";
?>

実践的な活用例

1. 並列データ処理

<?php
class ParallelProcessor {
    private $max_workers;
    private $workers = [];
    
    public function __construct($max_workers = 4) {
        $this->max_workers = $max_workers;
    }
    
    /**
     * データを並列処理
     */
    public function process(array $data, callable $callback) {
        $chunks = array_chunk($data, ceil(count($data) / $this->max_workers));
        
        echo "データを{$this->max_workers}個のワーカーに分割しました\n";
        echo "チャンク数: " . count($chunks) . "\n";
        echo "各チャンクのサイズ: " . implode(', ', array_map('count', $chunks)) . "\n\n";
        
        foreach ($chunks as $index => $chunk) {
            $pid = pcntl_fork();
            
            if ($pid === -1) {
                die("ワーカー{$index}の起動に失敗しました\n");
            } elseif ($pid) {
                // 親プロセス
                $this->workers[$pid] = [
                    'id' => $index,
                    'pid' => $pid,
                    'chunk_size' => count($chunk),
                    'start_time' => time()
                ];
                echo "[親] ワーカー{$index}を起動 (PID: {$pid}, データ数: " . count($chunk) . ")\n";
            } else {
                // 子プロセス
                $this->workerProcess($index, $chunk, $callback);
                exit(0);
            }
        }
        
        echo "\n[親] すべてのワーカーの完了を待機中...\n\n";
        
        // すべてのワーカーの終了を待つ
        $this->waitForWorkers();
    }
    
    /**
     * ワーカープロセスの処理
     */
    private function workerProcess($worker_id, array $data, callable $callback) {
        echo "[ワーカー{$worker_id}] 処理開始 (PID: " . getmypid() . ", データ数: " . count($data) . ")\n";
        
        $start_time = microtime(true);
        $processed = 0;
        
        foreach ($data as $item) {
            call_user_func($callback, $item);
            $processed++;
        }
        
        $elapsed = microtime(true) - $start_time;
        
        echo "[ワーカー{$worker_id}] 完了: {$processed}件処理 (所要時間: " . 
             number_format($elapsed, 2) . "秒)\n";
    }
    
    /**
     * すべてのワーカーの終了を待つ
     */
    private function waitForWorkers() {
        $completed = 0;
        $total_time = 0;
        
        while (count($this->workers) > 0) {
            $pid = pcntl_wait($status);
            
            if (isset($this->workers[$pid])) {
                $worker = $this->workers[$pid];
                $duration = time() - $worker['start_time'];
                $total_time += $duration;
                
                echo "[親] ワーカー{$worker['id']} (PID: {$pid}) が終了 (実行時間: {$duration}秒)\n";
                
                unset($this->workers[$pid]);
                $completed++;
            }
        }
        
        echo "\n[親] すべてのワーカーが完了しました\n";
        echo "完了したワーカー数: {$completed}\n";
        echo "合計実行時間: {$total_time}秒\n";
    }
}

// 使用例
$processor = new ParallelProcessor(4);

// 処理対象のデータ
$data = range(1, 20);

echo "=== 並列データ処理の実行 ===\n\n";

$start_time = microtime(true);

$processor->process($data, function($item) {
    // 各アイテムの処理(重い処理をシミュレート)
    usleep(500000); // 0.5秒
    // 実際の処理: データベース更新、API呼び出しなど
});

$total_elapsed = microtime(true) - $start_time;

echo "\n総処理時間: " . number_format($total_elapsed, 2) . "秒\n";
echo "(シリアル処理の場合: " . (count($data) * 0.5) . "秒かかるところ)\n";
?>

2. ワーカープールの実装

<?php
class WorkerPool {
    private $max_workers;
    private $workers = [];
    private $tasks = [];
    private $results = [];
    
    public function __construct($max_workers = 5) {
        $this->max_workers = $max_workers;
        
        // シグナルハンドラ設定
        pcntl_async_signals(true);
        pcntl_signal(SIGCHLD, [$this, 'handleChildExit']);
    }
    
    /**
     * タスクをプールに追加
     */
    public function addTask($task_id, callable $callback, array $args = []) {
        $this->tasks[] = [
            'id' => $task_id,
            'callback' => $callback,
            'args' => $args,
            'status' => 'pending'
        ];
    }
    
    /**
     * プールを実行
     */
    public function run() {
        echo "ワーカープールを開始します\n";
        echo "最大ワーカー数: {$this->max_workers}\n";
        echo "タスク数: " . count($this->tasks) . "\n\n";
        
        $task_index = 0;
        
        while ($task_index < count($this->tasks) || count($this->workers) > 0) {
            // 空きワーカーがあればタスクを割り当て
            while (count($this->workers) < $this->max_workers && $task_index < count($this->tasks)) {
                $task = $this->tasks[$task_index];
                $this->spawnWorker($task);
                $task_index++;
            }
            
            // 少し待機
            usleep(100000); // 0.1秒
        }
        
        echo "\nすべてのタスクが完了しました\n";
        $this->displayResults();
    }
    
    /**
     * ワーカーを起動
     */
    private function spawnWorker($task) {
        $pid = pcntl_fork();
        
        if ($pid === -1) {
            die("ワーカーの起動に失敗しました\n");
        } elseif ($pid) {
            // 親プロセス
            $this->workers[$pid] = [
                'task_id' => $task['id'],
                'pid' => $pid,
                'start_time' => microtime(true)
            ];
            echo "[ワーカー起動] タスク: {$task['id']}, PID: {$pid}\n";
        } else {
            // 子プロセス
            $result = $this->executeTask($task);
            
            // 結果を共有メモリやファイルに保存
            $this->saveResult($task['id'], $result);
            
            exit(0);
        }
    }
    
    /**
     * タスクを実行
     */
    private function executeTask($task) {
        $start = microtime(true);
        
        try {
            $result = call_user_func_array($task['callback'], $task['args']);
            $elapsed = microtime(true) - $start;
            
            return [
                'success' => true,
                'result' => $result,
                'elapsed' => $elapsed
            ];
        } catch (Exception $e) {
            return [
                'success' => false,
                'error' => $e->getMessage(),
                'elapsed' => microtime(true) - $start
            ];
        }
    }
    
    /**
     * 結果を保存
     */
    private function saveResult($task_id, $result) {
        $result_file = "/tmp/worker_result_{$task_id}_" . getmypid() . ".json";
        file_put_contents($result_file, json_encode($result));
    }
    
    /**
     * 子プロセス終了ハンドラ
     */
    public function handleChildExit($signo) {
        while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
            if (isset($this->workers[$pid])) {
                $worker = $this->workers[$pid];
                $duration = microtime(true) - $worker['start_time'];
                
                echo "[ワーカー終了] タスク: {$worker['task_id']}, PID: {$pid}, " .
                     "実行時間: " . number_format($duration, 2) . "秒\n";
                
                // 結果を読み込み
                $result_file = "/tmp/worker_result_{$worker['task_id']}_{$pid}.json";
                if (file_exists($result_file)) {
                    $result = json_decode(file_get_contents($result_file), true);
                    $this->results[$worker['task_id']] = $result;
                    unlink($result_file);
                }
                
                unset($this->workers[$pid]);
            }
        }
    }
    
    /**
     * 結果を表示
     */
    private function displayResults() {
        echo "\n=== 処理結果 ===\n";
        
        $total_time = 0;
        $success_count = 0;
        
        foreach ($this->results as $task_id => $result) {
            $status = $result['success'] ? '✓' : '✗';
            $total_time += $result['elapsed'];
            
            if ($result['success']) {
                $success_count++;
                echo "[{$status}] タスク {$task_id}: " . 
                     number_format($result['elapsed'], 2) . "秒\n";
            } else {
                echo "[{$status}] タスク {$task_id}: エラー - {$result['error']}\n";
            }
        }
        
        echo "\n成功: {$success_count} / " . count($this->results) . "\n";
        echo "合計処理時間: " . number_format($total_time, 2) . "秒\n";
    }
}

// 使用例
$pool = new WorkerPool(3);

// タスクを追加
for ($i = 1; $i <= 10; $i++) {
    $pool->addTask("task_{$i}", function($task_num) {
        // 重い処理をシミュレート
        $sleep_time = rand(1, 3);
        sleep($sleep_time);
        return "タスク{$task_num}の結果({$sleep_time}秒)";
    }, [$i]);
}

// 実行
$pool->run();
?>

3. マスター・ワーカーパターン

<?php
class MasterWorkerPattern {
    private $num_workers;
    private $workers = [];
    private $jobs = [];
    private $results = [];
    
    public function __construct($num_workers = 4) {
        $this->num_workers = $num_workers;
        
        // シグナルハンドラ
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
        pcntl_signal(SIGINT, [$this, 'shutdown']);
    }
    
    /**
     * ジョブを追加
     */
    public function addJob($job_data) {
        $this->jobs[] = $job_data;
    }
    
    /**
     * マスタープロセスを開始
     */
    public function start() {
        echo "=== マスター・ワーカーパターン ===\n";
        echo "マスターPID: " . getmypid() . "\n";
        echo "ワーカー数: {$this->num_workers}\n";
        echo "ジョブ数: " . count($this->jobs) . "\n\n";
        
        // ワーカーを起動
        for ($i = 0; $i < $this->num_workers; $i++) {
            $this->spawnWorker($i);
        }
        
        // ジョブをワーカーに配布
        $this->distributeJobs();
        
        // すべてのワーカーの終了を待つ
        $this->waitForWorkers();
        
        echo "\nマスタープロセス終了\n";
    }
    
    /**
     * ワーカーを起動
     */
    private function spawnWorker($worker_id) {
        // パイプを作成(親→子への通信用)
        $pipe = [];
        if (!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pipe)) {
            die("パイプの作成に失敗しました\n");
        }
        
        $pid = pcntl_fork();
        
        if ($pid === -1) {
            die("ワーカー{$worker_id}の起動に失敗しました\n");
        } elseif ($pid) {
            // 親プロセス(マスター)
            socket_close($pipe[1]); // 子プロセス側を閉じる
            
            $this->workers[$pid] = [
                'id' => $worker_id,
                'pid' => $pid,
                'socket' => $pipe[0],
                'jobs_processed' => 0
            ];
            
            echo "[マスター] ワーカー{$worker_id}を起動 (PID: {$pid})\n";
        } else {
            // 子プロセス(ワーカー)
            socket_close($pipe[0]); // 親プロセス側を閉じる
            $this->workerProcess($worker_id, $pipe[1]);
            exit(0);
        }
    }
    
    /**
     * ジョブを配布
     */
    private function distributeJobs() {
        echo "\n[マスター] ジョブを配布開始\n";
        
        foreach ($this->jobs as $index => $job) {
            // ラウンドロビン方式でワーカーを選択
            $worker_pids = array_keys($this->workers);
            $selected_pid = $worker_pids[$index % count($worker_pids)];
            
            $worker = $this->workers[$selected_pid];
            $job_data = json_encode($job) . "\n";
            
            socket_write($worker['socket'], $job_data, strlen($job_data));
            $this->workers[$selected_pid]['jobs_processed']++;
            
            echo "[マスター] ジョブ{$index}をワーカー{$worker['id']}に送信\n";
        }
        
        // 終了シグナルを送信
        foreach ($this->workers as $pid => $worker) {
            socket_write($worker['socket'], "EXIT\n", 5);
            socket_close($worker['socket']);
        }
        
        echo "[マスター] すべてのジョブを配布しました\n\n";
    }
    
    /**
     * ワーカープロセス
     */
    private function workerProcess($worker_id, $socket) {
        echo "[ワーカー{$worker_id}] 起動 (PID: " . getmypid() . ")\n";
        
        while (true) {
            $data = socket_read($socket, 1024);
            
            if ($data === false || trim($data) === 'EXIT') {
                break;
            }
            
            $job = json_decode(trim($data), true);
            
            if ($job) {
                echo "[ワーカー{$worker_id}] ジョブ処理中: {$job['name']}\n";
                
                // ジョブを処理
                sleep(rand(1, 2)); // 処理をシミュレート
                
                echo "[ワーカー{$worker_id}] ジョブ完了: {$job['name']}\n";
            }
        }
        
        socket_close($socket);
        echo "[ワーカー{$worker_id}] 終了\n";
    }
    
    /**
     * すべてのワーカーの終了を待つ
     */
    private function waitForWorkers() {
        echo "[マスター] ワーカーの終了を待機中...\n";
        
        foreach ($this->workers as $pid => $worker) {
            pcntl_waitpid($pid, $status);
            echo "[マスター] ワーカー{$worker['id']} (PID: {$pid}) 終了 " .
                 "(処理ジョブ数: {$worker['jobs_processed']})\n";
        }
    }
    
    /**
     * シャットダウンハンドラ
     */
    public function shutdown($signo) {
        echo "\n[マスター] シャットダウンシグナルを受信\n";
        
        // すべてのワーカーに終了シグナルを送信
        foreach ($this->workers as $pid => $worker) {
            posix_kill($pid, SIGTERM);
        }
        
        exit(0);
    }
}

// 使用例
$master = new MasterWorkerPattern(3);

// ジョブを追加
for ($i = 1; $i <= 9; $i++) {
    $master->addJob([
        'name' => "Job_{$i}",
        'data' => "データ{$i}"
    ]);
}

// 実行
$master->start();
?>

4. フォークボム対策とリソース管理

<?php
class SafeForkManager {
    private $max_children = 10;
    private $children = [];
    private $fork_count = 0;
    private $max_fork_rate = 5; // 秒あたりの最大フォーク数
    private $fork_times = [];
    
    public function __construct($max_children = 10, $max_fork_rate = 5) {
        $this->max_children = $max_children;
        $this->max_fork_rate = $max_fork_rate;
        
        echo "SafeForkManager初期化\n";
        echo "  最大子プロセス数: {$this->max_children}\n";
        echo "  最大フォーク率: {$this->max_fork_rate}/秒\n\n";
    }
    
    /**
     * 安全に子プロセスを作成
     */
    public function safeFork() {
        // 1. 子プロセス数の制限チェック
        if (count($this->children) >= $this->max_children) {
            echo "⚠️  警告: 最大子プロセス数に達しました\n";
            return false;
        }
        
        // 2. フォーク率の制限チェック
        if (!$this->checkForkRate()) {
            echo "⚠️  警告: フォーク率が制限を超えています\n";
            sleep(1);
            return $this->safeFork(); // リトライ
        }
        
        // 3. システムリソースチェック
        if (!$this->checkSystemResources()) {
            echo "⚠️  警告: システムリソースが不足しています\n";
            return false;
        }
        
        // 4. フォーク実行
        $pid = pcntl_fork();
        
        if ($pid === -1) {
            echo "✗ エラー: フォークに失敗しました\n";
            return false;
        } elseif ($pid) {
            // 親プロセス
            $this->children[$pid] = [
                'pid' => $pid,
                'start_time' => time(),
                'fork_time' => microtime(true)
            ];
            $this->fork_count++;
            $this->fork_times[] = time();
            
            echo "✓ 子プロセス作成 (PID: {$pid}, 総数: " . count($this->children) . ")\n";
            return $pid;
        } else {
            // 子プロセス
            return 0;
        }
    }
    
    /**
     * フォーク率をチェック
     */
    private function checkForkRate() {
        $now = time();
        
        // 古いエントリを削除(1秒以上前)
        $this->fork_times = array_filter($this->fork_times, function($time) use ($now) {
            return ($now - $time) < 1;
        });
        
        return count($this->fork_times) < $this->max_fork_rate;
    }
    
    /**
     * システムリソースをチェック
     */
    private function checkSystemResources() {
        // メモリ使用量チェック
        $memory_usage = memory_get_usage(true);
        $memory_limit = $this->parseMemoryLimit(ini_get('memory_limit'));
        
        if ($memory_limit > 0) {
            $memory_percent = ($memory_usage / $memory_limit) * 100;
            
            if ($memory_percent > 80) {
                echo "⚠️  メモリ使用率: " . number_format($memory_percent, 1) . "%\n";
                return false;
            }
        }
        
        // プロセス数チェック(簡易版)
        if (function_exists('sys_getloadavg')) {
            $load = sys_getloadavg();
            $cpu_count = $this->getCpuCount();
            
            if ($load[0] > $cpu_count * 2) {
                echo "⚠️  システム負荷が高すぎます: " . number_format($load[0], 2) . "\n";
                return false;
            }
        }
        
        return true;
    }
    
    /**
     * メモリ制限をパース
     */
    private function parseMemoryLimit($limit) {
        if ($limit === '-1') {
            return -1;
        }
        
        $unit = strtoupper(substr($limit, -1));
        $value = (int)$limit;
        
        switch ($unit) {
            case 'G': return $value * 1024 * 1024 * 1024;
            case 'M': return $value * 1024 * 1024;
            case 'K': return $value * 1024;
            default: return $value;
        }
    }
    
    /**
     * CPU数を取得
     */
    private function getCpuCount() {
        if (stripos(PHP_OS, 'WIN') === 0) {
            return (int)getenv('NUMBER_OF_PROCESSORS') ?: 1;
        } else {
            $cpuinfo = @file_get_contents('/proc/cpuinfo');
            if ($cpuinfo) {
                return substr_count($cpuinfo, 'processor');
            }
        }
        return 1;
    }
    
    /**
     * 子プロセスを回収
     */
    public function reapChildren($blocking = false) {
        $flag = $blocking ? 0 : WNOHANG;
        
        while (($pid = pcntl_waitpid(-1, $status, $flag)) > 0) {
            if (isset($this->children[$pid])) {
                $uptime = time() - $this->children[$pid]['start_time'];
                echo "✓ 子プロセス終了 (PID: {$pid}, 稼働時間: {$uptime}秒)\n";
                unset($this->children[$pid]);
            }
        }
    }
    
    /**
     * すべての子プロセスを待つ
     */
    public function waitAll() {
        echo "\nすべての子プロセスの終了を待機中...\n";
        
        while (count($this->children) > 0) {
            $this->reapChildren(true);
        }
        
        echo "すべての子プロセスが終了しました\n";
        echo "総フォーク数: {$this->fork_count}\n";
    }
    
    /**
     * 統計情報を表示
     */
    public function displayStats() {
        echo "\n=== フォーク統計 ===\n";
        echo "現在の子プロセス数: " . count($this->children) . " / {$this->max_children}\n";
        echo "総フォーク数: {$this->fork_count}\n";
        echo "メモリ使用量: " . number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
        
        if (function_exists('sys_getloadavg')) {
            $load = sys_getloadavg();
            echo "システム負荷: " . implode(', ', array_map(function($v) {
                return number_format($v, 2);
            }, $load)) . "\n";
        }
        
        echo "==================\n\n";
    }
}

// 使用例
$manager = new SafeForkManager(5, 3);

echo "=== 安全なフォーク管理のデモ ===\n\n";

for ($i = 1; $i <= 8; $i++) {
    $pid = $manager->safeFork();
    
    if ($pid === 0) {
        // 子プロセス
        echo "[子{$i}] 処理開始 (PID: " . getmypid() . ")\n";
        sleep(rand(2, 4));
        echo "[子{$i}] 処理完了\n";
        exit(0);
    } elseif ($pid === false) {
        echo "子プロセス{$i}の作成をスキップしました\n";
    }
    
    // 定期的に終了した子プロセスを回収
    $manager->reapChildren();
    
    // 統計表示
    if ($i % 3 === 0) {
        $manager->displayStats();
    }
    
    usleep(500000); // 0.5秒待機
}

$manager->waitAll();
$manager->displayStats();
?>

重要な注意点とベストプラクティス

ゾンビプロセスの防止

<?php
/**
 * ゾンビプロセスを防ぐ方法
 */

echo "=== ゾンビプロセス対策 ===\n\n";

// 方法1: pcntl_waitを使う
function preventZombiesWithWait() {
    echo "方法1: pcntl_wait使用\n";
    
    $pid = pcntl_fork();
    
    if ($pid === 0) {
        echo "[子] 処理実行中...\n";
        sleep(1);
        exit(0);
    } else {
        echo "[親] 子プロセス (PID: {$pid}) を待機\n";
        pcntl_wait($status); // 子プロセスの終了を待つ
        echo "[親] 子プロセス終了(ゾンビにならない)\n";
    }
}

// 方法2: SIGCHLDシグナルハンドラを使う
function preventZombiesWithSignal() {
    echo "\n方法2: SIGCHLDシグナルハンドラ使用\n";
    
    pcntl_async_signals(true);
    
    pcntl_signal(SIGCHLD, function($signo) {
        while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
            echo "[親] 子プロセス {$pid} を回収(ゾンビ防止)\n";
        }
    });
    
    for ($i = 1; $i <= 3; $i++) {
        $pid = pcntl_fork();
        
        if ($pid === 0) {
            echo "[子{$i}] 処理実行\n";
            sleep(1);
            exit(0);
        } else {
            echo "[親] 子プロセス{$i} (PID: {$pid}) を起動\n";
        }
    }
    
    echo "[親] メイン処理継続中...\n";
    sleep(3);
    echo "[親] 完了\n";
}

// 方法3: SIG_IGNを設定
function preventZombiesWithIgnore() {
    echo "\n方法3: SIGCHLD無視(自動回収)\n";
    
    // この方法は一部のシステムでのみ動作
    pcntl_signal(SIGCHLD, SIG_IGN);
    
    $pid = pcntl_fork();
    
    if ($pid === 0) {
        echo "[子] 処理実行\n";
        sleep(1);
        exit(0);
    } else {
        echo "[親] 子プロセスは自動的に回収されます\n";
        sleep(2);
    }
}

preventZombiesWithWait();
preventZombiesWithSignal();
// preventZombiesWithIgnore(); // 環境によって動作が異なる
?>

メモリとリソースの管理

<?php
/**
 * フォーク時のメモリ管理
 */

class MemoryEfficientFork {
    /**
     * メモリ効率的なフォーク
     */
    public static function demonstrateMemoryBehavior() {
        echo "=== フォーク時のメモリ動作 ===\n\n";
        
        // 大きなデータを作成
        $large_data = str_repeat('x', 10 * 1024 * 1024); // 10MB
        
        echo "フォーク前のメモリ使用量: " . 
             number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
        
        $pid = pcntl_fork();
        
        if ($pid === 0) {
            // 子プロセス
            echo "[子] メモリ使用量: " . 
                 number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
            
            // Copy-on-Write: 変更するまでメモリはコピーされない
            echo "[子] データを変更します...\n";
            $large_data[0] = 'y'; // これでメモリがコピーされる
            
            echo "[子] 変更後のメモリ使用量: " . 
                 number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
            
            exit(0);
        } else {
            // 親プロセス
            sleep(1);
            pcntl_wait($status);
            
            echo "[親] 子プロセス終了後のメモリ使用量: " . 
                 number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
        }
    }
    
    /**
     * リソースのクリーンアップ
     */
    public static function demonstrateResourceCleanup() {
        echo "\n=== リソースのクリーンアップ ===\n\n";
        
        // データベース接続をシミュレート
        $db_connection = ['host' => 'localhost', 'connected' => true];
        
        $pid = pcntl_fork();
        
        if ($pid === 0) {
            // 子プロセス
            echo "[子] データベース接続を再確立します\n";
            
            // 重要: 子プロセスでは親の接続を使わず、新しい接続を作る
            $db_connection = null; // 親の接続を破棄
            $db_connection = ['host' => 'localhost', 'connected' => true, 'pid' => getmypid()];
            
            echo "[子] 処理実行中(独自の接続)\n";
            sleep(1);
            
            // クリーンアップ
            echo "[子] 接続をクローズ\n";
            $db_connection = null;
            
            exit(0);
        } else {
            // 親プロセス
            pcntl_wait($status);
            echo "[親] 親プロセスの接続は影響を受けていません\n";
        }
    }
}

MemoryEfficientFork::demonstrateMemoryBehavior();
MemoryEfficientFork::demonstrateResourceCleanup();
?>

エラーハンドリングとデバッグ

<?php
/**
 * フォークのエラーハンドリング
 */

class ForkErrorHandler {
    /**
     * 包括的なエラーハンドリング
     */
    public static function safeForkWithErrorHandling($callback) {
        // フォーク前のチェック
        if (!function_exists('pcntl_fork')) {
            throw new RuntimeException("pcntl_fork関数が利用できません");
        }
        
        if (php_sapi_name() !== 'cli') {
            throw new RuntimeException("CLI環境が必要です");
        }
        
        try {
            $pid = pcntl_fork();
            
            if ($pid === -1) {
                // フォーク失敗
                $error = pcntl_strerror(pcntl_get_last_error());
                throw new RuntimeException("フォーク失敗: {$error}");
                
            } elseif ($pid) {
                // 親プロセス
                return [
                    'success' => true,
                    'pid' => $pid,
                    'is_parent' => true
                ];
                
            } else {
                // 子プロセス
                try {
                    $result = call_user_func($callback);
                    exit(0); // 正常終了
                } catch (Exception $e) {
                    error_log("[子プロセス] エラー: " . $e->getMessage());
                    exit(1); // エラー終了
                }
            }
        } catch (Exception $e) {
            error_log("フォークエラー: " . $e->getMessage());
            return [
                'success' => false,
                'error' => $e->getMessage()
            ];
        }
    }
    
    /**
     * 子プロセスの終了ステータスを確認
     */
    public static function checkChildStatus($pid) {
        $status = 0;
        $result = pcntl_waitpid($pid, $status);
        
        if ($result === -1) {
            echo "エラー: 子プロセスの待機に失敗\n";
            return null;
        }
        
        $info = [
            'pid' => $pid,
            'exited' => pcntl_wifexited($status),
            'exit_code' => pcntl_wexitstatus($status),
            'signaled' => pcntl_wifsignaled($status),
            'stopped' => pcntl_wifstopped($status)
        ];
        
        if ($info['signaled']) {
            $info['term_signal'] = pcntl_wtermsig($status);
        }
        
        if ($info['stopped']) {
            $info['stop_signal'] = pcntl_wstopsig($status);
        }
        
        return $info;
    }
    
    /**
     * デバッグ情報の表示
     */
    public static function displayProcessInfo() {
        echo "=== プロセス情報 ===\n";
        echo "PID: " . getmypid() . "\n";
        echo "親PID: " . posix_getppid() . "\n";
        echo "UID: " . posix_getuid() . "\n";
        echo "GID: " . posix_getgid() . "\n";
        echo "プロセスグループID: " . posix_getpgid(getmypid()) . "\n";
        echo "==================\n";
    }
}

// 使用例
echo "=== エラーハンドリングのデモ ===\n\n";

// 正常なフォーク
$result = ForkErrorHandler::safeForkWithErrorHandling(function() {
    echo "[子] 正常に処理を実行\n";
    sleep(1);
});

if ($result['success'] && isset($result['is_parent'])) {
    echo "[親] 子プロセス (PID: {$result['pid']}) を作成しました\n";
    
    $status = ForkErrorHandler::checkChildStatus($result['pid']);
    
    if ($status['exited']) {
        echo "[親] 子プロセスは正常終了しました(終了コード: {$status['exit_code']})\n";
    }
}

echo "\n";
ForkErrorHandler::displayProcessInfo();
?>

パフォーマンス最適化

<?php
/**
 * フォークのパフォーマンス最適化
 */

class ForkPerformanceOptimization {
    /**
     * 最適なワーカー数を決定
     */
    public static function determineOptimalWorkers() {
        $cpu_count = self::getCpuCount();
        
        // CPU数に基づいて最適なワーカー数を決定
        // CPU-boundタスク: CPU数と同じ
        // I/O-boundタスク: CPU数の2〜4倍
        
        echo "CPU数: {$cpu_count}\n";
        echo "推奨ワーカー数:\n";
        echo "  CPU-boundタスク: {$cpu_count}\n";
        echo "  I/O-boundタスク: " . ($cpu_count * 2) . " 〜 " . ($cpu_count * 4) . "\n";
        
        return $cpu_count;
    }
    
    /**
     * CPU数を取得
     */
    private static function getCpuCount() {
        if (stripos(PHP_OS, 'WIN') === 0) {
            return (int)getenv('NUMBER_OF_PROCESSORS') ?: 1;
        }
        
        $cpuinfo = @file_get_contents('/proc/cpuinfo');
        if ($cpuinfo) {
            return substr_count($cpuinfo, 'processor');
        }
        
        return 1;
    }
    
    /**
     * ベンチマーク: シリアル vs 並列処理
     */
    public static function benchmark($data, $num_workers = 4) {
        echo "\n=== パフォーマンスベンチマーク ===\n";
        echo "データ数: " . count($data) . "\n";
        echo "ワーカー数: {$num_workers}\n\n";
        
        // シリアル処理
        $start = microtime(true);
        foreach ($data as $item) {
            self::processItem($item);
        }
        $serial_time = microtime(true) - $start;
        
        echo "シリアル処理時間: " . number_format($serial_time, 2) . "秒\n";
        
        // 並列処理
        $start = microtime(true);
        self::parallelProcess($data, $num_workers);
        $parallel_time = microtime(true) - $start;
        
        echo "並列処理時間: " . number_format($parallel_time, 2) . "秒\n";
        
        $speedup = $serial_time / $parallel_time;
        echo "速度向上: " . number_format($speedup, 2) . "倍\n";
        echo "効率: " . number_format(($speedup / $num_workers) * 100, 1) . "%\n";
    }
    
    /**
     * アイテムを処理
     */
    private static function processItem($item) {
        usleep(100000); // 0.1秒の処理をシミュレート
    }
    
    /**
     * 並列処理
     */
    private static function parallelProcess($data, $num_workers) {
        $chunks = array_chunk($data, ceil(count($data) / $num_workers));
        $pids = [];
        
        foreach ($chunks as $chunk) {
            $pid = pcntl_fork();
            
            if ($pid === 0) {
                foreach ($chunk as $item) {
                    self::processItem($item);
                }
                exit(0);
            } else {
                $pids[] = $pid;
            }
        }
        
        foreach ($pids as $pid) {
            pcntl_waitpid($pid, $status);
        }
    }
}

// ベンチマーク実行
$optimal_workers = ForkPerformanceOptimization::determineOptimalWorkers();
$test_data = range(1, 20);
ForkPerformanceOptimization::benchmark($test_data, $optimal_workers);
?>

まとめ

pcntl_fork 関数は、PHPでマルチプロセス処理を実現する強力な機能です。

主な利点:

  • 並列処理: 複数の処理を同時実行してパフォーマンス向上
  • リソース分離: 各プロセスが独立したメモリ空間を持つ
  • 安定性: 1つのプロセスがクラッシュしても他に影響しない

主な用途:

  • 大量データの並列処理
  • ワーカープールの実装
  • バックグラウンドジョブの実行
  • 負荷分散処理

重要な注意点:

  • CLI環境専用(Web環境では使用不可)
  • UNIX/Linux系OS専用(Windowsでは動作しない)
  • ゾンビプロセス対策が必須
  • リソース管理が重要
  • Copy-on-Writeの理解が必要
  • 子プロセスで必ずexit()を呼ぶ

ベストプラクティス:

  • SIGCHLDハンドラでゾンビプロセスを防止
  • 最大プロセス数を制限
  • メモリとCPUリソースを監視
  • エラーハンドリングを適切に実装
  • データベース接続などのリソースを子プロセスで再作成

適切に使用することで、PHPでも高度な並列処理が可能になり、大規模データ処理やバッチ処理のパフォーマンスを劇的に向上させることができます。


マルチプロセス処理は強力ですが、適切な管理が不可欠です。この記事を参考に、安全で効率的な並列処理システムを構築してください。

タイトルとURLをコピーしました