はじめに
PHPで大量のデータを処理する際、1つのプロセスでは時間がかかりすぎることがあります。そんな時、複数のプロセスを同時に実行できれば、処理時間を大幅に短縮できます。
pcntl_fork
関数は、現在のプロセスを複製(フォーク)して子プロセスを作成する、UNIXライクなシステムプログラミングの基本機能です。この記事では、PHPでマルチプロセス処理を実装するための重要な関数について、基礎から実践まで詳しく解説します。
pcntl_fork関数とは
pcntl_fork
は、現在実行中のプロセスを複製して、新しい子プロセスを作成する関数です。フォーク後は、親プロセスと子プロセスが独立して並行実行されます。
基本構文
int pcntl_fork(void)
パラメータ
この関数はパラメータを取りません。
戻り値
- 親プロセス: 子プロセスのPID(正の整数)を返す
- 子プロセス: 0を返す
- エラー時: -1を返す
重要な前提条件
<?php
// 環境チェック
if (php_sapi_name() !== 'cli') {
die("この機能はCLI環境でのみ利用可能です。\n");
}
if (!function_exists('pcntl_fork')) {
die("pcntl拡張が利用できません。\n");
}
if (stripos(PHP_OS, 'WIN') === 0) {
die("pcntl_forkはWindows環境では動作しません。\n");
}
echo "✓ 環境チェック完了\n";
echo "OS: " . PHP_OS . "\n";
echo "PHP Version: " . PHP_VERSION . "\n\n";
?>
フォークの仕組み
<?php
/**
* pcntl_forkの基本的な動作を理解する
*/
echo "フォーク前: 1つのプロセス(PID: " . getmypid() . ")\n\n";
$pid = pcntl_fork();
if ($pid === -1) {
// フォーク失敗
die("フォークに失敗しました\n");
} elseif ($pid) {
// 親プロセス
echo "[親] PID: " . getmypid() . " | 子プロセスのPID: {$pid}\n";
echo "[親] 親プロセスの処理を実行中...\n";
sleep(2);
echo "[親] 処理完了\n";
// 子プロセスの終了を待つ
pcntl_wait($status);
echo "[親] 子プロセスが終了しました\n";
} else {
// 子プロセス($pid === 0)
echo "[子] PID: " . getmypid() . " | 親プロセスのPID: " . posix_getppid() . "\n";
echo "[子] 子プロセスの処理を実行中...\n";
sleep(1);
echo "[子] 処理完了\n";
exit(0); // 子プロセスを明示的に終了
}
echo "[親] プログラム終了\n";
?>
基本的な使用例
シンプルな子プロセスの作成
<?php
function createChildProcess($child_id) {
$pid = pcntl_fork();
if ($pid === -1) {
die("子プロセス{$child_id}の作成に失敗しました\n");
} elseif ($pid) {
// 親プロセス
return $pid;
} else {
// 子プロセス
echo "[子{$child_id}] 開始 (PID: " . getmypid() . ")\n";
// 何か処理を実行
$sleep_time = rand(1, 3);
sleep($sleep_time);
echo "[子{$child_id}] 完了 ({$sleep_time}秒)\n";
exit(0);
}
}
// 3つの子プロセスを作成
echo "=== 複数の子プロセス作成 ===\n\n";
$children = [];
for ($i = 1; $i <= 3; $i++) {
$pid = createChildProcess($i);
$children[] = $pid;
echo "[親] 子プロセス{$i}を作成しました (PID: {$pid})\n";
}
echo "\n[親] すべての子プロセスの終了を待機中...\n\n";
// すべての子プロセスが終了するまで待つ
foreach ($children as $pid) {
pcntl_waitpid($pid, $status);
echo "[親] 子プロセス {$pid} が終了しました\n";
}
echo "\n[親] すべての処理が完了しました\n";
?>
変数のスコープとコピー
<?php
/**
* フォーク時の変数のコピー動作
*/
$shared_before_fork = "フォーク前の値";
$counter = 0;
echo "フォーク前:\n";
echo " shared_before_fork: {$shared_before_fork}\n";
echo " counter: {$counter}\n\n";
$pid = pcntl_fork();
if ($pid === -1) {
die("フォーク失敗\n");
} elseif ($pid) {
// 親プロセス
$shared_before_fork = "親プロセスで変更";
$counter = 100;
sleep(1); // 子プロセスの処理を待つ
echo "[親] 変数の状態:\n";
echo " shared_before_fork: {$shared_before_fork}\n";
echo " counter: {$counter}\n\n";
pcntl_wait($status);
} else {
// 子プロセス
$shared_before_fork = "子プロセスで変更";
$counter = 200;
echo "[子] 変数の状態:\n";
echo " shared_before_fork: {$shared_before_fork}\n";
echo " counter: {$counter}\n\n";
exit(0);
}
echo "重要: 親と子のメモリ空間は独立しています\n";
?>
実践的な活用例
1. 並列データ処理
<?php
class ParallelProcessor {
private $max_workers;
private $workers = [];
public function __construct($max_workers = 4) {
$this->max_workers = $max_workers;
}
/**
* データを並列処理
*/
public function process(array $data, callable $callback) {
$chunks = array_chunk($data, ceil(count($data) / $this->max_workers));
echo "データを{$this->max_workers}個のワーカーに分割しました\n";
echo "チャンク数: " . count($chunks) . "\n";
echo "各チャンクのサイズ: " . implode(', ', array_map('count', $chunks)) . "\n\n";
foreach ($chunks as $index => $chunk) {
$pid = pcntl_fork();
if ($pid === -1) {
die("ワーカー{$index}の起動に失敗しました\n");
} elseif ($pid) {
// 親プロセス
$this->workers[$pid] = [
'id' => $index,
'pid' => $pid,
'chunk_size' => count($chunk),
'start_time' => time()
];
echo "[親] ワーカー{$index}を起動 (PID: {$pid}, データ数: " . count($chunk) . ")\n";
} else {
// 子プロセス
$this->workerProcess($index, $chunk, $callback);
exit(0);
}
}
echo "\n[親] すべてのワーカーの完了を待機中...\n\n";
// すべてのワーカーの終了を待つ
$this->waitForWorkers();
}
/**
* ワーカープロセスの処理
*/
private function workerProcess($worker_id, array $data, callable $callback) {
echo "[ワーカー{$worker_id}] 処理開始 (PID: " . getmypid() . ", データ数: " . count($data) . ")\n";
$start_time = microtime(true);
$processed = 0;
foreach ($data as $item) {
call_user_func($callback, $item);
$processed++;
}
$elapsed = microtime(true) - $start_time;
echo "[ワーカー{$worker_id}] 完了: {$processed}件処理 (所要時間: " .
number_format($elapsed, 2) . "秒)\n";
}
/**
* すべてのワーカーの終了を待つ
*/
private function waitForWorkers() {
$completed = 0;
$total_time = 0;
while (count($this->workers) > 0) {
$pid = pcntl_wait($status);
if (isset($this->workers[$pid])) {
$worker = $this->workers[$pid];
$duration = time() - $worker['start_time'];
$total_time += $duration;
echo "[親] ワーカー{$worker['id']} (PID: {$pid}) が終了 (実行時間: {$duration}秒)\n";
unset($this->workers[$pid]);
$completed++;
}
}
echo "\n[親] すべてのワーカーが完了しました\n";
echo "完了したワーカー数: {$completed}\n";
echo "合計実行時間: {$total_time}秒\n";
}
}
// 使用例
$processor = new ParallelProcessor(4);
// 処理対象のデータ
$data = range(1, 20);
echo "=== 並列データ処理の実行 ===\n\n";
$start_time = microtime(true);
$processor->process($data, function($item) {
// 各アイテムの処理(重い処理をシミュレート)
usleep(500000); // 0.5秒
// 実際の処理: データベース更新、API呼び出しなど
});
$total_elapsed = microtime(true) - $start_time;
echo "\n総処理時間: " . number_format($total_elapsed, 2) . "秒\n";
echo "(シリアル処理の場合: " . (count($data) * 0.5) . "秒かかるところ)\n";
?>
2. ワーカープールの実装
<?php
class WorkerPool {
private $max_workers;
private $workers = [];
private $tasks = [];
private $results = [];
public function __construct($max_workers = 5) {
$this->max_workers = $max_workers;
// シグナルハンドラ設定
pcntl_async_signals(true);
pcntl_signal(SIGCHLD, [$this, 'handleChildExit']);
}
/**
* タスクをプールに追加
*/
public function addTask($task_id, callable $callback, array $args = []) {
$this->tasks[] = [
'id' => $task_id,
'callback' => $callback,
'args' => $args,
'status' => 'pending'
];
}
/**
* プールを実行
*/
public function run() {
echo "ワーカープールを開始します\n";
echo "最大ワーカー数: {$this->max_workers}\n";
echo "タスク数: " . count($this->tasks) . "\n\n";
$task_index = 0;
while ($task_index < count($this->tasks) || count($this->workers) > 0) {
// 空きワーカーがあればタスクを割り当て
while (count($this->workers) < $this->max_workers && $task_index < count($this->tasks)) {
$task = $this->tasks[$task_index];
$this->spawnWorker($task);
$task_index++;
}
// 少し待機
usleep(100000); // 0.1秒
}
echo "\nすべてのタスクが完了しました\n";
$this->displayResults();
}
/**
* ワーカーを起動
*/
private function spawnWorker($task) {
$pid = pcntl_fork();
if ($pid === -1) {
die("ワーカーの起動に失敗しました\n");
} elseif ($pid) {
// 親プロセス
$this->workers[$pid] = [
'task_id' => $task['id'],
'pid' => $pid,
'start_time' => microtime(true)
];
echo "[ワーカー起動] タスク: {$task['id']}, PID: {$pid}\n";
} else {
// 子プロセス
$result = $this->executeTask($task);
// 結果を共有メモリやファイルに保存
$this->saveResult($task['id'], $result);
exit(0);
}
}
/**
* タスクを実行
*/
private function executeTask($task) {
$start = microtime(true);
try {
$result = call_user_func_array($task['callback'], $task['args']);
$elapsed = microtime(true) - $start;
return [
'success' => true,
'result' => $result,
'elapsed' => $elapsed
];
} catch (Exception $e) {
return [
'success' => false,
'error' => $e->getMessage(),
'elapsed' => microtime(true) - $start
];
}
}
/**
* 結果を保存
*/
private function saveResult($task_id, $result) {
$result_file = "/tmp/worker_result_{$task_id}_" . getmypid() . ".json";
file_put_contents($result_file, json_encode($result));
}
/**
* 子プロセス終了ハンドラ
*/
public function handleChildExit($signo) {
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
if (isset($this->workers[$pid])) {
$worker = $this->workers[$pid];
$duration = microtime(true) - $worker['start_time'];
echo "[ワーカー終了] タスク: {$worker['task_id']}, PID: {$pid}, " .
"実行時間: " . number_format($duration, 2) . "秒\n";
// 結果を読み込み
$result_file = "/tmp/worker_result_{$worker['task_id']}_{$pid}.json";
if (file_exists($result_file)) {
$result = json_decode(file_get_contents($result_file), true);
$this->results[$worker['task_id']] = $result;
unlink($result_file);
}
unset($this->workers[$pid]);
}
}
}
/**
* 結果を表示
*/
private function displayResults() {
echo "\n=== 処理結果 ===\n";
$total_time = 0;
$success_count = 0;
foreach ($this->results as $task_id => $result) {
$status = $result['success'] ? '✓' : '✗';
$total_time += $result['elapsed'];
if ($result['success']) {
$success_count++;
echo "[{$status}] タスク {$task_id}: " .
number_format($result['elapsed'], 2) . "秒\n";
} else {
echo "[{$status}] タスク {$task_id}: エラー - {$result['error']}\n";
}
}
echo "\n成功: {$success_count} / " . count($this->results) . "\n";
echo "合計処理時間: " . number_format($total_time, 2) . "秒\n";
}
}
// 使用例
$pool = new WorkerPool(3);
// タスクを追加
for ($i = 1; $i <= 10; $i++) {
$pool->addTask("task_{$i}", function($task_num) {
// 重い処理をシミュレート
$sleep_time = rand(1, 3);
sleep($sleep_time);
return "タスク{$task_num}の結果({$sleep_time}秒)";
}, [$i]);
}
// 実行
$pool->run();
?>
3. マスター・ワーカーパターン
<?php
class MasterWorkerPattern {
private $num_workers;
private $workers = [];
private $jobs = [];
private $results = [];
public function __construct($num_workers = 4) {
$this->num_workers = $num_workers;
// シグナルハンドラ
pcntl_async_signals(true);
pcntl_signal(SIGTERM, [$this, 'shutdown']);
pcntl_signal(SIGINT, [$this, 'shutdown']);
}
/**
* ジョブを追加
*/
public function addJob($job_data) {
$this->jobs[] = $job_data;
}
/**
* マスタープロセスを開始
*/
public function start() {
echo "=== マスター・ワーカーパターン ===\n";
echo "マスターPID: " . getmypid() . "\n";
echo "ワーカー数: {$this->num_workers}\n";
echo "ジョブ数: " . count($this->jobs) . "\n\n";
// ワーカーを起動
for ($i = 0; $i < $this->num_workers; $i++) {
$this->spawnWorker($i);
}
// ジョブをワーカーに配布
$this->distributeJobs();
// すべてのワーカーの終了を待つ
$this->waitForWorkers();
echo "\nマスタープロセス終了\n";
}
/**
* ワーカーを起動
*/
private function spawnWorker($worker_id) {
// パイプを作成(親→子への通信用)
$pipe = [];
if (!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pipe)) {
die("パイプの作成に失敗しました\n");
}
$pid = pcntl_fork();
if ($pid === -1) {
die("ワーカー{$worker_id}の起動に失敗しました\n");
} elseif ($pid) {
// 親プロセス(マスター)
socket_close($pipe[1]); // 子プロセス側を閉じる
$this->workers[$pid] = [
'id' => $worker_id,
'pid' => $pid,
'socket' => $pipe[0],
'jobs_processed' => 0
];
echo "[マスター] ワーカー{$worker_id}を起動 (PID: {$pid})\n";
} else {
// 子プロセス(ワーカー)
socket_close($pipe[0]); // 親プロセス側を閉じる
$this->workerProcess($worker_id, $pipe[1]);
exit(0);
}
}
/**
* ジョブを配布
*/
private function distributeJobs() {
echo "\n[マスター] ジョブを配布開始\n";
foreach ($this->jobs as $index => $job) {
// ラウンドロビン方式でワーカーを選択
$worker_pids = array_keys($this->workers);
$selected_pid = $worker_pids[$index % count($worker_pids)];
$worker = $this->workers[$selected_pid];
$job_data = json_encode($job) . "\n";
socket_write($worker['socket'], $job_data, strlen($job_data));
$this->workers[$selected_pid]['jobs_processed']++;
echo "[マスター] ジョブ{$index}をワーカー{$worker['id']}に送信\n";
}
// 終了シグナルを送信
foreach ($this->workers as $pid => $worker) {
socket_write($worker['socket'], "EXIT\n", 5);
socket_close($worker['socket']);
}
echo "[マスター] すべてのジョブを配布しました\n\n";
}
/**
* ワーカープロセス
*/
private function workerProcess($worker_id, $socket) {
echo "[ワーカー{$worker_id}] 起動 (PID: " . getmypid() . ")\n";
while (true) {
$data = socket_read($socket, 1024);
if ($data === false || trim($data) === 'EXIT') {
break;
}
$job = json_decode(trim($data), true);
if ($job) {
echo "[ワーカー{$worker_id}] ジョブ処理中: {$job['name']}\n";
// ジョブを処理
sleep(rand(1, 2)); // 処理をシミュレート
echo "[ワーカー{$worker_id}] ジョブ完了: {$job['name']}\n";
}
}
socket_close($socket);
echo "[ワーカー{$worker_id}] 終了\n";
}
/**
* すべてのワーカーの終了を待つ
*/
private function waitForWorkers() {
echo "[マスター] ワーカーの終了を待機中...\n";
foreach ($this->workers as $pid => $worker) {
pcntl_waitpid($pid, $status);
echo "[マスター] ワーカー{$worker['id']} (PID: {$pid}) 終了 " .
"(処理ジョブ数: {$worker['jobs_processed']})\n";
}
}
/**
* シャットダウンハンドラ
*/
public function shutdown($signo) {
echo "\n[マスター] シャットダウンシグナルを受信\n";
// すべてのワーカーに終了シグナルを送信
foreach ($this->workers as $pid => $worker) {
posix_kill($pid, SIGTERM);
}
exit(0);
}
}
// 使用例
$master = new MasterWorkerPattern(3);
// ジョブを追加
for ($i = 1; $i <= 9; $i++) {
$master->addJob([
'name' => "Job_{$i}",
'data' => "データ{$i}"
]);
}
// 実行
$master->start();
?>
4. フォークボム対策とリソース管理
<?php
class SafeForkManager {
private $max_children = 10;
private $children = [];
private $fork_count = 0;
private $max_fork_rate = 5; // 秒あたりの最大フォーク数
private $fork_times = [];
public function __construct($max_children = 10, $max_fork_rate = 5) {
$this->max_children = $max_children;
$this->max_fork_rate = $max_fork_rate;
echo "SafeForkManager初期化\n";
echo " 最大子プロセス数: {$this->max_children}\n";
echo " 最大フォーク率: {$this->max_fork_rate}/秒\n\n";
}
/**
* 安全に子プロセスを作成
*/
public function safeFork() {
// 1. 子プロセス数の制限チェック
if (count($this->children) >= $this->max_children) {
echo "⚠️ 警告: 最大子プロセス数に達しました\n";
return false;
}
// 2. フォーク率の制限チェック
if (!$this->checkForkRate()) {
echo "⚠️ 警告: フォーク率が制限を超えています\n";
sleep(1);
return $this->safeFork(); // リトライ
}
// 3. システムリソースチェック
if (!$this->checkSystemResources()) {
echo "⚠️ 警告: システムリソースが不足しています\n";
return false;
}
// 4. フォーク実行
$pid = pcntl_fork();
if ($pid === -1) {
echo "✗ エラー: フォークに失敗しました\n";
return false;
} elseif ($pid) {
// 親プロセス
$this->children[$pid] = [
'pid' => $pid,
'start_time' => time(),
'fork_time' => microtime(true)
];
$this->fork_count++;
$this->fork_times[] = time();
echo "✓ 子プロセス作成 (PID: {$pid}, 総数: " . count($this->children) . ")\n";
return $pid;
} else {
// 子プロセス
return 0;
}
}
/**
* フォーク率をチェック
*/
private function checkForkRate() {
$now = time();
// 古いエントリを削除(1秒以上前)
$this->fork_times = array_filter($this->fork_times, function($time) use ($now) {
return ($now - $time) < 1;
});
return count($this->fork_times) < $this->max_fork_rate;
}
/**
* システムリソースをチェック
*/
private function checkSystemResources() {
// メモリ使用量チェック
$memory_usage = memory_get_usage(true);
$memory_limit = $this->parseMemoryLimit(ini_get('memory_limit'));
if ($memory_limit > 0) {
$memory_percent = ($memory_usage / $memory_limit) * 100;
if ($memory_percent > 80) {
echo "⚠️ メモリ使用率: " . number_format($memory_percent, 1) . "%\n";
return false;
}
}
// プロセス数チェック(簡易版)
if (function_exists('sys_getloadavg')) {
$load = sys_getloadavg();
$cpu_count = $this->getCpuCount();
if ($load[0] > $cpu_count * 2) {
echo "⚠️ システム負荷が高すぎます: " . number_format($load[0], 2) . "\n";
return false;
}
}
return true;
}
/**
* メモリ制限をパース
*/
private function parseMemoryLimit($limit) {
if ($limit === '-1') {
return -1;
}
$unit = strtoupper(substr($limit, -1));
$value = (int)$limit;
switch ($unit) {
case 'G': return $value * 1024 * 1024 * 1024;
case 'M': return $value * 1024 * 1024;
case 'K': return $value * 1024;
default: return $value;
}
}
/**
* CPU数を取得
*/
private function getCpuCount() {
if (stripos(PHP_OS, 'WIN') === 0) {
return (int)getenv('NUMBER_OF_PROCESSORS') ?: 1;
} else {
$cpuinfo = @file_get_contents('/proc/cpuinfo');
if ($cpuinfo) {
return substr_count($cpuinfo, 'processor');
}
}
return 1;
}
/**
* 子プロセスを回収
*/
public function reapChildren($blocking = false) {
$flag = $blocking ? 0 : WNOHANG;
while (($pid = pcntl_waitpid(-1, $status, $flag)) > 0) {
if (isset($this->children[$pid])) {
$uptime = time() - $this->children[$pid]['start_time'];
echo "✓ 子プロセス終了 (PID: {$pid}, 稼働時間: {$uptime}秒)\n";
unset($this->children[$pid]);
}
}
}
/**
* すべての子プロセスを待つ
*/
public function waitAll() {
echo "\nすべての子プロセスの終了を待機中...\n";
while (count($this->children) > 0) {
$this->reapChildren(true);
}
echo "すべての子プロセスが終了しました\n";
echo "総フォーク数: {$this->fork_count}\n";
}
/**
* 統計情報を表示
*/
public function displayStats() {
echo "\n=== フォーク統計 ===\n";
echo "現在の子プロセス数: " . count($this->children) . " / {$this->max_children}\n";
echo "総フォーク数: {$this->fork_count}\n";
echo "メモリ使用量: " . number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
if (function_exists('sys_getloadavg')) {
$load = sys_getloadavg();
echo "システム負荷: " . implode(', ', array_map(function($v) {
return number_format($v, 2);
}, $load)) . "\n";
}
echo "==================\n\n";
}
}
// 使用例
$manager = new SafeForkManager(5, 3);
echo "=== 安全なフォーク管理のデモ ===\n\n";
for ($i = 1; $i <= 8; $i++) {
$pid = $manager->safeFork();
if ($pid === 0) {
// 子プロセス
echo "[子{$i}] 処理開始 (PID: " . getmypid() . ")\n";
sleep(rand(2, 4));
echo "[子{$i}] 処理完了\n";
exit(0);
} elseif ($pid === false) {
echo "子プロセス{$i}の作成をスキップしました\n";
}
// 定期的に終了した子プロセスを回収
$manager->reapChildren();
// 統計表示
if ($i % 3 === 0) {
$manager->displayStats();
}
usleep(500000); // 0.5秒待機
}
$manager->waitAll();
$manager->displayStats();
?>
重要な注意点とベストプラクティス
ゾンビプロセスの防止
<?php
/**
* ゾンビプロセスを防ぐ方法
*/
echo "=== ゾンビプロセス対策 ===\n\n";
// 方法1: pcntl_waitを使う
function preventZombiesWithWait() {
echo "方法1: pcntl_wait使用\n";
$pid = pcntl_fork();
if ($pid === 0) {
echo "[子] 処理実行中...\n";
sleep(1);
exit(0);
} else {
echo "[親] 子プロセス (PID: {$pid}) を待機\n";
pcntl_wait($status); // 子プロセスの終了を待つ
echo "[親] 子プロセス終了(ゾンビにならない)\n";
}
}
// 方法2: SIGCHLDシグナルハンドラを使う
function preventZombiesWithSignal() {
echo "\n方法2: SIGCHLDシグナルハンドラ使用\n";
pcntl_async_signals(true);
pcntl_signal(SIGCHLD, function($signo) {
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
echo "[親] 子プロセス {$pid} を回収(ゾンビ防止)\n";
}
});
for ($i = 1; $i <= 3; $i++) {
$pid = pcntl_fork();
if ($pid === 0) {
echo "[子{$i}] 処理実行\n";
sleep(1);
exit(0);
} else {
echo "[親] 子プロセス{$i} (PID: {$pid}) を起動\n";
}
}
echo "[親] メイン処理継続中...\n";
sleep(3);
echo "[親] 完了\n";
}
// 方法3: SIG_IGNを設定
function preventZombiesWithIgnore() {
echo "\n方法3: SIGCHLD無視(自動回収)\n";
// この方法は一部のシステムでのみ動作
pcntl_signal(SIGCHLD, SIG_IGN);
$pid = pcntl_fork();
if ($pid === 0) {
echo "[子] 処理実行\n";
sleep(1);
exit(0);
} else {
echo "[親] 子プロセスは自動的に回収されます\n";
sleep(2);
}
}
preventZombiesWithWait();
preventZombiesWithSignal();
// preventZombiesWithIgnore(); // 環境によって動作が異なる
?>
メモリとリソースの管理
<?php
/**
* フォーク時のメモリ管理
*/
class MemoryEfficientFork {
/**
* メモリ効率的なフォーク
*/
public static function demonstrateMemoryBehavior() {
echo "=== フォーク時のメモリ動作 ===\n\n";
// 大きなデータを作成
$large_data = str_repeat('x', 10 * 1024 * 1024); // 10MB
echo "フォーク前のメモリ使用量: " .
number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
$pid = pcntl_fork();
if ($pid === 0) {
// 子プロセス
echo "[子] メモリ使用量: " .
number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
// Copy-on-Write: 変更するまでメモリはコピーされない
echo "[子] データを変更します...\n";
$large_data[0] = 'y'; // これでメモリがコピーされる
echo "[子] 変更後のメモリ使用量: " .
number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
exit(0);
} else {
// 親プロセス
sleep(1);
pcntl_wait($status);
echo "[親] 子プロセス終了後のメモリ使用量: " .
number_format(memory_get_usage(true) / 1024 / 1024, 2) . " MB\n";
}
}
/**
* リソースのクリーンアップ
*/
public static function demonstrateResourceCleanup() {
echo "\n=== リソースのクリーンアップ ===\n\n";
// データベース接続をシミュレート
$db_connection = ['host' => 'localhost', 'connected' => true];
$pid = pcntl_fork();
if ($pid === 0) {
// 子プロセス
echo "[子] データベース接続を再確立します\n";
// 重要: 子プロセスでは親の接続を使わず、新しい接続を作る
$db_connection = null; // 親の接続を破棄
$db_connection = ['host' => 'localhost', 'connected' => true, 'pid' => getmypid()];
echo "[子] 処理実行中(独自の接続)\n";
sleep(1);
// クリーンアップ
echo "[子] 接続をクローズ\n";
$db_connection = null;
exit(0);
} else {
// 親プロセス
pcntl_wait($status);
echo "[親] 親プロセスの接続は影響を受けていません\n";
}
}
}
MemoryEfficientFork::demonstrateMemoryBehavior();
MemoryEfficientFork::demonstrateResourceCleanup();
?>
エラーハンドリングとデバッグ
<?php
/**
* フォークのエラーハンドリング
*/
class ForkErrorHandler {
/**
* 包括的なエラーハンドリング
*/
public static function safeForkWithErrorHandling($callback) {
// フォーク前のチェック
if (!function_exists('pcntl_fork')) {
throw new RuntimeException("pcntl_fork関数が利用できません");
}
if (php_sapi_name() !== 'cli') {
throw new RuntimeException("CLI環境が必要です");
}
try {
$pid = pcntl_fork();
if ($pid === -1) {
// フォーク失敗
$error = pcntl_strerror(pcntl_get_last_error());
throw new RuntimeException("フォーク失敗: {$error}");
} elseif ($pid) {
// 親プロセス
return [
'success' => true,
'pid' => $pid,
'is_parent' => true
];
} else {
// 子プロセス
try {
$result = call_user_func($callback);
exit(0); // 正常終了
} catch (Exception $e) {
error_log("[子プロセス] エラー: " . $e->getMessage());
exit(1); // エラー終了
}
}
} catch (Exception $e) {
error_log("フォークエラー: " . $e->getMessage());
return [
'success' => false,
'error' => $e->getMessage()
];
}
}
/**
* 子プロセスの終了ステータスを確認
*/
public static function checkChildStatus($pid) {
$status = 0;
$result = pcntl_waitpid($pid, $status);
if ($result === -1) {
echo "エラー: 子プロセスの待機に失敗\n";
return null;
}
$info = [
'pid' => $pid,
'exited' => pcntl_wifexited($status),
'exit_code' => pcntl_wexitstatus($status),
'signaled' => pcntl_wifsignaled($status),
'stopped' => pcntl_wifstopped($status)
];
if ($info['signaled']) {
$info['term_signal'] = pcntl_wtermsig($status);
}
if ($info['stopped']) {
$info['stop_signal'] = pcntl_wstopsig($status);
}
return $info;
}
/**
* デバッグ情報の表示
*/
public static function displayProcessInfo() {
echo "=== プロセス情報 ===\n";
echo "PID: " . getmypid() . "\n";
echo "親PID: " . posix_getppid() . "\n";
echo "UID: " . posix_getuid() . "\n";
echo "GID: " . posix_getgid() . "\n";
echo "プロセスグループID: " . posix_getpgid(getmypid()) . "\n";
echo "==================\n";
}
}
// 使用例
echo "=== エラーハンドリングのデモ ===\n\n";
// 正常なフォーク
$result = ForkErrorHandler::safeForkWithErrorHandling(function() {
echo "[子] 正常に処理を実行\n";
sleep(1);
});
if ($result['success'] && isset($result['is_parent'])) {
echo "[親] 子プロセス (PID: {$result['pid']}) を作成しました\n";
$status = ForkErrorHandler::checkChildStatus($result['pid']);
if ($status['exited']) {
echo "[親] 子プロセスは正常終了しました(終了コード: {$status['exit_code']})\n";
}
}
echo "\n";
ForkErrorHandler::displayProcessInfo();
?>
パフォーマンス最適化
<?php
/**
* フォークのパフォーマンス最適化
*/
class ForkPerformanceOptimization {
/**
* 最適なワーカー数を決定
*/
public static function determineOptimalWorkers() {
$cpu_count = self::getCpuCount();
// CPU数に基づいて最適なワーカー数を決定
// CPU-boundタスク: CPU数と同じ
// I/O-boundタスク: CPU数の2〜4倍
echo "CPU数: {$cpu_count}\n";
echo "推奨ワーカー数:\n";
echo " CPU-boundタスク: {$cpu_count}\n";
echo " I/O-boundタスク: " . ($cpu_count * 2) . " 〜 " . ($cpu_count * 4) . "\n";
return $cpu_count;
}
/**
* CPU数を取得
*/
private static function getCpuCount() {
if (stripos(PHP_OS, 'WIN') === 0) {
return (int)getenv('NUMBER_OF_PROCESSORS') ?: 1;
}
$cpuinfo = @file_get_contents('/proc/cpuinfo');
if ($cpuinfo) {
return substr_count($cpuinfo, 'processor');
}
return 1;
}
/**
* ベンチマーク: シリアル vs 並列処理
*/
public static function benchmark($data, $num_workers = 4) {
echo "\n=== パフォーマンスベンチマーク ===\n";
echo "データ数: " . count($data) . "\n";
echo "ワーカー数: {$num_workers}\n\n";
// シリアル処理
$start = microtime(true);
foreach ($data as $item) {
self::processItem($item);
}
$serial_time = microtime(true) - $start;
echo "シリアル処理時間: " . number_format($serial_time, 2) . "秒\n";
// 並列処理
$start = microtime(true);
self::parallelProcess($data, $num_workers);
$parallel_time = microtime(true) - $start;
echo "並列処理時間: " . number_format($parallel_time, 2) . "秒\n";
$speedup = $serial_time / $parallel_time;
echo "速度向上: " . number_format($speedup, 2) . "倍\n";
echo "効率: " . number_format(($speedup / $num_workers) * 100, 1) . "%\n";
}
/**
* アイテムを処理
*/
private static function processItem($item) {
usleep(100000); // 0.1秒の処理をシミュレート
}
/**
* 並列処理
*/
private static function parallelProcess($data, $num_workers) {
$chunks = array_chunk($data, ceil(count($data) / $num_workers));
$pids = [];
foreach ($chunks as $chunk) {
$pid = pcntl_fork();
if ($pid === 0) {
foreach ($chunk as $item) {
self::processItem($item);
}
exit(0);
} else {
$pids[] = $pid;
}
}
foreach ($pids as $pid) {
pcntl_waitpid($pid, $status);
}
}
}
// ベンチマーク実行
$optimal_workers = ForkPerformanceOptimization::determineOptimalWorkers();
$test_data = range(1, 20);
ForkPerformanceOptimization::benchmark($test_data, $optimal_workers);
?>
まとめ
pcntl_fork
関数は、PHPでマルチプロセス処理を実現する強力な機能です。
主な利点:
- 並列処理: 複数の処理を同時実行してパフォーマンス向上
- リソース分離: 各プロセスが独立したメモリ空間を持つ
- 安定性: 1つのプロセスがクラッシュしても他に影響しない
主な用途:
- 大量データの並列処理
- ワーカープールの実装
- バックグラウンドジョブの実行
- 負荷分散処理
重要な注意点:
- CLI環境専用(Web環境では使用不可)
- UNIX/Linux系OS専用(Windowsでは動作しない)
- ゾンビプロセス対策が必須
- リソース管理が重要
- Copy-on-Writeの理解が必要
- 子プロセスで必ず
exit()
を呼ぶ
ベストプラクティス:
- SIGCHLDハンドラでゾンビプロセスを防止
- 最大プロセス数を制限
- メモリとCPUリソースを監視
- エラーハンドリングを適切に実装
- データベース接続などのリソースを子プロセスで再作成
適切に使用することで、PHPでも高度な並列処理が可能になり、大規模データ処理やバッチ処理のパフォーマンスを劇的に向上させることができます。
マルチプロセス処理は強力ですが、適切な管理が不可欠です。この記事を参考に、安全で効率的な並列処理システムを構築してください。