[PHP]stream_socket_sendtoでUDPデータグラムを送信する|接続なし送信から高度な活用パターンまで実践ガイド

PHP

はじめに

PHPでUDP通信を行う際、fwrite でも書き込みは可能ですが、送信先アドレスをその場で指定できないため、複数の宛先へ柔軟に送り分けることができません。

stream_socket_sendto は、送信先アドレスをその都度指定してデータグラムを送信できる関数です。1つのソケットから複数の宛先へ送信できるため、ブロードキャスト・マルチキャスト・ラウンドロビン配信など、UDPならではの柔軟な通信パターンを実現できます。

この記事では、基本的な使い方から実践的なクラス実装まで、stream_socket_recvfrom との対比も交えながら丁寧に解説します。


stream_socket_sendto とは

項目内容
関数名stream_socket_sendto
PHPバージョンPHP 5.1.0以降
カテゴリストリーム関数
返り値int(送信バイト数)、false(失敗時)

構文

stream_socket_sendto(
    resource $socket,
    string   $data,
    int      $flags   = 0,
    string   $address = ''
): int|false

パラメータ

パラメータ説明
$socketresource送信に使うストリームソケット
$datastring送信するデータ
$flagsintフラグ(下表参照)
$addressstring送信先アドレス(例:"192.168.1.1:8080")。省略時は接続済みソケットの相手先

フラグ定数

定数説明
00通常の送信(デフォルト)
STREAM_OOB1帯域外データ(OOB)として送信

返り値

意味
int実際に送信されたバイト数
false送信失敗

注意: 返り値が 0 の場合も送信失敗の可能性があります。=== false で明示的にチェックしてください。


fwrite との違い

【fwrite】
  接続済みソケット専用。送信先は接続時に確定。

  $socket = stream_socket_client('tcp://example.com:80', ...);
  fwrite($socket, $data);   // 宛先は固定(example.com:80)

【stream_socket_sendto】
  1つのソケットから任意の宛先に送信可能。

  $socket = stream_socket_client('udp://0.0.0.0:0', ...);  // 送信専用ソケット

  stream_socket_sendto($socket, $data1, 0, '192.168.1.1:8080');  // 宛先A
  stream_socket_sendto($socket, $data2, 0, '192.168.1.2:8080');  // 宛先B
  stream_socket_sendto($socket, $data3, 0, '192.168.1.3:8080');  // 宛先C
  // → 1つのソケットで複数宛先に送り分けできる

基本的な使い方

<?php
// 送信用UDPソケットを作成
$socket = stream_socket_client(
    'udp://127.0.0.1:0',   // ポート0 = OSが空きポートを割り当て
    $errno,
    $errstr,
    1.0
);

if ($socket === false) {
    die("ソケット作成失敗 [{$errno}]: {$errstr}");
}

// 任意の宛先に送信
$data  = "Hello, UDP!\n";
$bytes = stream_socket_sendto($socket, $data, 0, '127.0.0.1:8080');

if ($bytes === false) {
    echo "送信失敗" . PHP_EOL;
} else {
    echo "送信成功: {$bytes} バイト → 127.0.0.1:8080" . PHP_EOL;
}

fclose($socket);

実践例(クラスを使った実装)

例1:シンプルなUDPクライアント送信クラス

送信・エラーハンドリング・送信ログを備えた基本クラスです。

<?php

class UdpSender
{
    private $socket;
    private string $defaultTarget;
    private array  $sendLog = [];
    private int    $totalBytes = 0;

    public function __construct(string $defaultTarget = '')
    {
        $this->defaultTarget = $defaultTarget;

        // ポート0でバインド(OSが空きポートを割り当て)
        $this->socket = stream_socket_client(
            'udp://0.0.0.0:0',
            $errno,
            $errstr,
            1.0
        );

        if ($this->socket === false) {
            throw new RuntimeException("UDPソケット作成失敗 [{$errno}]: {$errstr}");
        }
    }

    /**
     * 指定アドレスにデータを送信する
     */
    public function send(string $data, string $target = ''): int
    {
        $address = $target ?: $this->defaultTarget;

        if ($address === '') {
            throw new InvalidArgumentException("送信先アドレスを指定してください");
        }

        $bytes = stream_socket_sendto($this->socket, $data, 0, $address);

        if ($bytes === false) {
            throw new RuntimeException("送信失敗 → {$address}");
        }

        $this->totalBytes += $bytes;
        $this->sendLog[] = [
            'time'    => date('H:i:s'),
            'target'  => $address,
            'bytes'   => $bytes,
            'preview' => mb_substr($data, 0, 40),
        ];

        return $bytes;
    }

    /**
     * 複数の宛先に同じデータを送信する
     */
    public function multicast(string $data, array $targets): array
    {
        $results = [];
        foreach ($targets as $target) {
            try {
                $results[$target] = $this->send($data, $target);
            } catch (RuntimeException $e) {
                $results[$target] = false;
            }
        }
        return $results;
    }

    public function getLocalAddress(): string
    {
        return stream_socket_get_name($this->socket, false) ?: 'unknown';
    }

    public function getTotalBytes(): int
    {
        return $this->totalBytes;
    }

    public function printLog(): void
    {
        echo "=== 送信ログ ===" . PHP_EOL;
        echo str_pad("時刻",     10)
           . str_pad("送信先",   24)
           . str_pad("バイト数",  10)
           . "データ" . PHP_EOL;
        echo str_repeat('-', 60) . PHP_EOL;

        foreach ($this->sendLog as $entry) {
            echo str_pad($entry['time'],   10)
               . str_pad($entry['target'], 24)
               . str_pad($entry['bytes'],  10)
               . $entry['preview'] . PHP_EOL;
        }

        echo "合計送信: {$this->totalBytes} bytes / "
           . count($this->sendLog) . " パケット" . PHP_EOL;
    }

    public function close(): void
    {
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// 使用例
$sender = new UdpSender(defaultTarget: '127.0.0.1:19100');

echo "ローカルアドレス: " . $sender->getLocalAddress() . PHP_EOL . PHP_EOL;

// 単一送信
$sender->send("ping\n");
$sender->send(json_encode(['type' => 'heartbeat', 'ts' => time()]) . "\n");

// 複数宛先に同報
$results = $sender->multicast(
    "broadcast message\n",
    ['127.0.0.1:19100', '127.0.0.1:19101', '127.0.0.1:19102']
);

foreach ($results as $target => $bytes) {
    $status = $bytes !== false ? "{$bytes} bytes" : "失敗";
    echo "  → {$target}: {$status}" . PHP_EOL;
}

echo PHP_EOL;
$sender->printLog();
$sender->close();

出力例:

ローカルアドレス: 0.0.0.0:54401

  → 127.0.0.1:19100: 18 bytes
  → 127.0.0.1:19101: 18 bytes
  → 127.0.0.1:19102: 18 bytes

=== 送信ログ ===
時刻      送信先                  バイト数  データ
------------------------------------------------------------
12:00:01  127.0.0.1:19100         5         ping
12:00:01  127.0.0.1:19100         42        {"type":"heartbeat","ts":1716...
12:00:01  127.0.0.1:19100         18        broadcast message
12:00:01  127.0.0.1:19101         18        broadcast message
12:00:01  127.0.0.1:19102         18        broadcast message
合計送信: 101 bytes / 5 パケット

例2:stream_socket_recvfrom と組み合わせたリクエスト・レスポンス型UDPクライアント

送信後に応答を待つ、問い合わせ型UDPクライアントのパターンです。

<?php

class UdpRequestClient
{
    private $socket;
    private float $timeout;

    public function __construct(float $timeout = 3.0)
    {
        $this->timeout = $timeout;

        $this->socket = stream_socket_client(
            'udp://0.0.0.0:0',
            $errno, $errstr, 1.0
        );

        if ($this->socket === false) {
            throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
        }

        stream_set_blocking($this->socket, false);
    }

    /**
     * データを送信して応答を受け取る
     */
    public function request(string $data, string $server, int $retries = 3): ?array
    {
        for ($attempt = 1; $attempt <= $retries; $attempt++) {
            // 送信
            $sent = stream_socket_sendto($this->socket, $data, 0, $server);

            if ($sent === false) {
                continue;
            }

            // 応答待機
            $response = $this->waitResponse($this->timeout);

            if ($response !== null) {
                $response['attempt']  = $attempt;
                $response['sent_bytes'] = $sent;
                return $response;
            }

            echo "試行 {$attempt}/{$retries}: 応答なし(再送)" . PHP_EOL;
        }

        return null;
    }

    private function waitResponse(float $timeoutSec): ?array
    {
        $deadline = microtime(true) + $timeoutSec;

        while (microtime(true) < $deadline) {
            $read    = [$this->socket];
            $write   = $except = null;
            $remaining = $deadline - microtime(true);
            $sec     = (int) $remaining;
            $usec    = (int) (($remaining - $sec) * 1_000_000);

            $changed = stream_select($read, $write, $except, $sec, $usec);

            if ($changed > 0) {
                $startTime = microtime(true);
                $data      = stream_socket_recvfrom($this->socket, 65535, 0, $from);

                if ($data !== false) {
                    return [
                        'data'       => $data,
                        'from'       => $from,
                        'latency_ms' => round((microtime(true) - $startTime) * 1000, 3),
                    ];
                }
            }
        }

        return null;
    }

    public function close(): void
    {
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// UDPエコーサーバーをセットアップ(デモ用)
$server = stream_socket_server('udp://127.0.0.1:19103', $e, $es, STREAM_SERVER_BIND);
stream_set_blocking($server, false);

// クライアントからリクエスト
$client = new UdpRequestClient(timeout: 1.0);

$queries = ['ping', 'hello', json_encode(['cmd' => 'status'])];

foreach ($queries as $query) {
    // サーバー側で受信してエコーバック(デモ用インライン処理)
    stream_socket_sendto(
        stream_socket_client('udp://127.0.0.1:19103', $e2, $es2, 1),
        '', 0, '127.0.0.1:19103'
    ); // ダミー送信でサーバーを起こす

    // 実際のリクエスト
    $bytes = stream_socket_sendto(
        $client->request('dummy', '127.0.0.1:19103') ? $client->socket ?? null : null,
        $query . "\n", 0, '127.0.0.1:19103'
    ) ?? 0;

    // サーバー側でエコー処理
    $recv = stream_socket_recvfrom($server, 1024, 0, $sender);
    if ($recv !== false && trim($recv) !== '') {
        stream_socket_sendto($server, $recv, 0, $sender);
    }
}

$client->close();
fclose($server);

echo "UdpRequestClient デモ完了" . PHP_EOL;

例3:ラウンドロビンでUDPロードバランサーを実装する

受信したデータグラムを複数のバックエンドへ順番に振り分けます。

<?php

class UdpLoadBalancer
{
    private $frontSocket;
    private array  $backends;
    private int    $currentIndex = 0;
    private array  $stats;
    private $backendSocket;

    public function __construct(string $listenAddress, array $backends)
    {
        $this->backends = $backends;
        $this->stats    = array_fill_keys($backends, ['sent' => 0, 'bytes' => 0]);

        // フロントエンドソケット(受信側)
        $this->frontSocket = stream_socket_server(
            "udp://{$listenAddress}",
            $errno, $errstr,
            STREAM_SERVER_BIND
        );

        if (!$this->frontSocket) {
            throw new RuntimeException("フロント起動失敗 [{$errno}]: {$errstr}");
        }

        // バックエンド送信用ソケット
        $this->backendSocket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);

        stream_set_blocking($this->frontSocket, false);
        echo "UDPロードバランサー起動: {$listenAddress}" . PHP_EOL;
        echo "バックエンド: " . implode(', ', $backends) . PHP_EOL;
    }

    /**
     * パケットを受信してラウンドロビンで転送する
     */
    public function process(int $maxPackets = 20, float $timeoutSec = 3.0): void
    {
        $deadline  = microtime(true) + $timeoutSec;
        $processed = 0;

        while ($processed < $maxPackets && microtime(true) < $deadline) {
            $read    = [$this->frontSocket];
            $write   = $except = null;
            $changed = stream_select($read, $write, $except, 0, 100_000);

            if (!$changed) continue;

            // フロントエンドで受信
            $data = stream_socket_recvfrom($this->frontSocket, 65535, 0, $client);
            if ($data === false || $data === '') continue;

            // ラウンドロビンで次のバックエンドを選択
            $backend  = $this->selectBackend();

            // バックエンドに転送
            $sent = stream_socket_sendto($this->backendSocket, $data, 0, $backend);

            if ($sent !== false) {
                $this->stats[$backend]['sent']++;
                $this->stats[$backend]['bytes'] += $sent;
                $processed++;

                echo "  [{$processed}] {$client} → {$backend} ({$sent}B)" . PHP_EOL;
            }
        }
    }

    private function selectBackend(): string
    {
        $backend = $this->backends[$this->currentIndex];
        $this->currentIndex = ($this->currentIndex + 1) % count($this->backends);
        return $backend;
    }

    public function printStats(): void
    {
        echo PHP_EOL . "=== バックエンド統計 ===" . PHP_EOL;
        echo str_pad("バックエンド",   22)
           . str_pad("転送数",         10)
           . "転送バイト数" . PHP_EOL;
        echo str_repeat('-', 44) . PHP_EOL;

        foreach ($this->stats as $backend => $stat) {
            echo str_pad($backend,          22)
               . str_pad($stat['sent'],     10)
               . $stat['bytes'] . " bytes" . PHP_EOL;
        }
    }

    public function close(): void
    {
        if (is_resource($this->frontSocket))   fclose($this->frontSocket);
        if (is_resource($this->backendSocket)) fclose($this->backendSocket);
    }
}

// 使用例
$lb = new UdpLoadBalancer('127.0.0.1:19104', [
    '127.0.0.1:19110',
    '127.0.0.1:19111',
    '127.0.0.1:19112',
]);

// テスト用クライアントから9パケット送信
$client = stream_socket_client('udp://0.0.0.0:0', $e, $es, 1);
for ($i = 1; $i <= 9; $i++) {
    stream_socket_sendto($client, "packet#{$i}\n", 0, '127.0.0.1:19104');
    usleep(500);
}
fclose($client);

$lb->process(maxPackets: 9, timeoutSec: 2.0);
$lb->printStats();
$lb->close();

出力例:

UDPロードバランサー起動: 127.0.0.1:19104
バックエンド: 127.0.0.1:19110, 127.0.0.1:19111, 127.0.0.1:19112
  [1] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
  [2] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
  [3] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)
  [4] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
  [5] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
  [6] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)
  [7] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
  [8] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
  [9] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)

=== バックエンド統計 ===
バックエンド           転送数    転送バイト数
--------------------------------------------
127.0.0.1:19110        3         27 bytes
127.0.0.1:19111        3         27 bytes
127.0.0.1:19112        3         27 bytes

例4:大量データを分割して送信するフラグメンテーションセンダー

UDPの最大ペイロードを超えるデータをフラグメント分割して送信します。

<?php

class UdpFragmentSender
{
    private $socket;
    private int   $mtu;
    private int   $headerSize = 12; // fragment_id(4) + seq(2) + total(2) + offset(4)

    public function __construct(int $mtu = 1400)
    {
        $this->mtu    = $mtu;
        $this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);

        if ($this->socket === false) {
            throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
        }
    }

    /**
     * データを MTU サイズに分割して送信する
     */
    public function sendLarge(string $data, string $target): array
    {
        $chunkSize    = $this->mtu - $this->headerSize;
        $totalLen     = strlen($data);
        $totalChunks  = (int) ceil($totalLen / $chunkSize);
        $fragmentId   = random_int(0, PHP_INT_MAX);

        $sent         = 0;
        $packetsSent  = 0;
        $startTime    = microtime(true);

        for ($seq = 0; $seq < $totalChunks; $seq++) {
            $offset  = $seq * $chunkSize;
            $chunk   = substr($data, $offset, $chunkSize);

            // ヘッダー: fragment_id(4B) + seq(2B) + total(2B) + offset(4B)
            $header  = pack('NnnN', $fragmentId, $seq, $totalChunks, $offset);
            $packet  = $header . $chunk;

            $bytes   = stream_socket_sendto($this->socket, $packet, 0, $target);

            if ($bytes === false) {
                throw new RuntimeException("送信失敗(seq={$seq})");
            }

            $sent        += $bytes;
            $packetsSent++;
        }

        return [
            'original_bytes' => $totalLen,
            'sent_bytes'     => $sent,
            'packets'        => $packetsSent,
            'fragment_id'    => $fragmentId,
            'elapsed_ms'     => round((microtime(true) - $startTime) * 1000, 3),
            'mtu'            => $this->mtu,
        ];
    }

    public function close(): void
    {
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// 受信側:フラグメントを組み立てる
class UdpFragmentReceiver
{
    private $socket;
    private array $fragments = [];   // fragment_id => [seq => data]
    private array $totals    = [];   // fragment_id => total chunks

    public function __construct(string $address)
    {
        $this->socket = stream_socket_server(
            "udp://{$address}", $errno, $errstr, STREAM_SERVER_BIND
        );
        stream_set_blocking($this->socket, false);
    }

    public function collectAndAssemble(int $expectedPackets, float $timeout = 3.0): ?string
    {
        $deadline  = microtime(true) + $timeout;
        $collected = 0;

        while ($collected < $expectedPackets && microtime(true) < $deadline) {
            $read    = [$this->socket];
            $write   = $except = null;
            if (!stream_select($read, $write, $except, 0, 100_000)) continue;

            $packet = stream_socket_recvfrom($this->socket, 65535, 0, $sender);
            if ($packet === false || strlen($packet) < 12) continue;

            // ヘッダー解析
            $header = unpack('NfragmentId/nseq/ntotal/Noffset', substr($packet, 0, 12));
            $data   = substr($packet, 12);

            $fid    = $header['fragmentId'];
            $seq    = $header['seq'];
            $total  = $header['total'];

            $this->fragments[$fid][$seq] = $data;
            $this->totals[$fid]          = $total;
            $collected++;
        }

        // 組み立て
        foreach ($this->fragments as $fid => $seqMap) {
            $total = $this->totals[$fid];
            if (count($seqMap) < $total) continue;

            ksort($seqMap);
            return implode('', $seqMap);
        }

        return null;
    }

    public function close(): void
    {
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// 使用例
$receiver = new UdpFragmentReceiver('127.0.0.1:19105');
$sender   = new UdpFragmentSender(mtu: 200);

// 1KBのデータを送信
$original = str_repeat('PHP_UDP_FRAGMENT_', 60); // ~1020B
$result   = $sender->sendLarge($original, '127.0.0.1:19105');

echo "=== フラグメント送信 ===" . PHP_EOL;
echo "元データ    : {$result['original_bytes']} bytes" . PHP_EOL;
echo "送信バイト  : {$result['sent_bytes']} bytes(ヘッダー込み)" . PHP_EOL;
echo "分割パケット: {$result['packets']} 個" . PHP_EOL;
echo "MTU         : {$result['mtu']} bytes" . PHP_EOL;
echo "送信時間    : {$result['elapsed_ms']} ms" . PHP_EOL;

$assembled = $receiver->collectAndAssemble($result['packets'], 2.0);

echo PHP_EOL . "=== フラグメント受信・組み立て ===" . PHP_EOL;
if ($assembled !== null) {
    echo "組み立て成功: " . strlen($assembled) . " bytes" . PHP_EOL;
    echo "データ一致  : " . ($assembled === $original ? '✓' : '✗') . PHP_EOL;
} else {
    echo "組み立て失敗(フラグメント不足)" . PHP_EOL;
}

$sender->close();
$receiver->close();

出力例:

=== フラグメント送信 ===
元データ    : 1020 bytes
送信バイト  : 1152 bytes(ヘッダー込み)
分割パケット: 6 個
MTU         : 200 bytes
送信時間    : 0.412 ms

=== フラグメント受信・組み立て ===
組み立て成功: 1020 bytes
データ一致  : ✓

例5:UDP送信レートリミッターで帯域を制御する

stream_socket_sendto を使いつつ、1秒あたりの送信パケット数を制限します。

<?php

class RateLimitedUdpSender
{
    private $socket;
    private int    $maxPacketsPerSec;
    private float  $windowStart;
    private int    $windowCount  = 0;
    private int    $totalSent    = 0;
    private int    $totalDropped = 0;
    private array  $history      = [];

    public function __construct(int $maxPacketsPerSec = 100)
    {
        $this->maxPacketsPerSec = $maxPacketsPerSec;
        $this->windowStart      = microtime(true);

        $this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
        if ($this->socket === false) {
            throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
        }
    }

    /**
     * レート制限付きで送信する
     * 上限を超えた場合は送信せず false を返す
     */
    public function send(string $data, string $target, bool $dropOnLimit = true): int|false
    {
        $now = microtime(true);

        // ウィンドウをリセット(1秒ごと)
        if ($now - $this->windowStart >= 1.0) {
            $this->history[] = [
                'window'  => date('H:i:s', (int) $this->windowStart),
                'sent'    => $this->windowCount,
            ];
            $this->windowStart = $now;
            $this->windowCount = 0;
        }

        // レート超過
        if ($this->windowCount >= $this->maxPacketsPerSec) {
            $this->totalDropped++;

            if ($dropOnLimit) {
                return false; // ドロップ
            }

            // ブロックして待機
            $wait = 1.0 - ($now - $this->windowStart);
            if ($wait > 0) usleep((int)($wait * 1_000_000));
            $this->windowStart = microtime(true);
            $this->windowCount = 0;
        }

        $bytes = stream_socket_sendto($this->socket, $data, 0, $target);

        if ($bytes !== false) {
            $this->windowCount++;
            $this->totalSent++;
        }

        return $bytes;
    }

    public function printStats(): void
    {
        echo "=== 送信レートリミッター統計 ===" . PHP_EOL;
        echo "制限         : {$this->maxPacketsPerSec} パケット/秒" . PHP_EOL;
        echo "送信成功     : {$this->totalSent}"     . PHP_EOL;
        echo "ドロップ     : {$this->totalDropped}"  . PHP_EOL;
        echo "ドロップ率   : " . round($this->totalDropped / max(1, $this->totalSent + $this->totalDropped) * 100, 1) . "%" . PHP_EOL;

        if (!empty($this->history)) {
            echo PHP_EOL . "ウィンドウ別送信数:" . PHP_EOL;
            foreach ($this->history as $entry) {
                $bar = str_repeat('█', min($entry['sent'], 50));
                echo "  [{$entry['window']}] {$bar} {$entry['sent']}" . PHP_EOL;
            }
        }
    }

    public function close(): void
    {
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// 使用例:200パケットを50/秒制限で送信
$rateSender = new RateLimitedUdpSender(maxPacketsPerSec: 50);

$sent    = 0;
$dropped = 0;

for ($i = 0; $i < 120; $i++) {
    $payload = "pkt#{$i}\n";
    $result  = $rateSender->send($payload, '127.0.0.1:19106', dropOnLimit: true);

    if ($result !== false) {
        $sent++;
    } else {
        $dropped++;
    }
}

echo "送信試行: 120 パケット" . PHP_EOL;
echo "送信成功: {$sent}" . PHP_EOL;
echo "ドロップ: {$dropped}" . PHP_EOL . PHP_EOL;

$rateSender->printStats();
$rateSender->close();

出力例:

送信試行: 120 パケット
送信成功: 50
ドロップ: 70

=== 送信レートリミッター統計 ===
制限         : 50 パケット/秒
送信成功     : 50
ドロップ     : 70
ドロップ率   : 58.3%

例6:UDPメトリクスコレクター ─ StatsD プロトコルで送信する

StatsD 互換のフォーマットでメトリクスを UDP 送信します。APM・監視システムとの連携に使われるパターンです。

<?php

class StatsDClient
{
    private $socket;
    private string $host;
    private int    $port;
    private string $prefix;
    private float  $sampleRate;
    private array  $buffer  = [];
    private int    $bufferMax;

    public function __construct(
        string $host       = '127.0.0.1',
        int    $port       = 8125,
        string $prefix     = '',
        float  $sampleRate = 1.0,
        int    $bufferMax  = 10
    ) {
        $this->host       = $host;
        $this->port       = $port;
        $this->prefix     = $prefix ? rtrim($prefix, '.') . '.' : '';
        $this->sampleRate = $sampleRate;
        $this->bufferMax  = $bufferMax;

        $this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
        if ($this->socket === false) {
            throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
        }
    }

    /** カウンターをインクリメント */
    public function increment(string $metric, int $value = 1): void
    {
        $this->send("{$metric}:{$value}|c");
    }

    /** ゲージ値を記録 */
    public function gauge(string $metric, float $value): void
    {
        $this->send("{$metric}:{$value}|g");
    }

    /** タイミング(ミリ秒)を記録 */
    public function timing(string $metric, float $ms): void
    {
        $this->send("{$metric}:{$ms}|ms");
    }

    /** セット(ユニーク数)を記録 */
    public function set(string $metric, string $value): void
    {
        $this->send("{$metric}:{$value}|s");
    }

    /** 実行時間を計測してタイミングを記録するクロージャラッパー */
    public function time(string $metric, callable $callback): mixed
    {
        $start  = microtime(true);
        $result = $callback();
        $this->timing($metric, round((microtime(true) - $start) * 1000, 3));
        return $result;
    }

    /** バッファをフラッシュして一括送信 */
    public function flush(): int
    {
        if (empty($this->buffer)) return 0;

        // StatsD は複数メトリクスを改行区切りで一括送信できる
        $payload = implode("\n", $this->buffer) . "\n";
        $this->buffer = [];

        $target = "{$this->host}:{$this->port}";
        $bytes  = stream_socket_sendto($this->socket, $payload, 0, $target);

        return $bytes !== false ? $bytes : 0;
    }

    private function send(string $metric): void
    {
        // サンプリングレートの適用
        if ($this->sampleRate < 1.0 && (mt_rand() / mt_getrandmax()) > $this->sampleRate) {
            return;
        }

        $rate   = $this->sampleRate < 1.0 ? "|@{$this->sampleRate}" : '';
        $this->buffer[] = "{$this->prefix}{$metric}{$rate}";

        if (count($this->buffer) >= $this->bufferMax) {
            $this->flush();
        }
    }

    public function close(): void
    {
        $this->flush();
        if (is_resource($this->socket)) fclose($this->socket);
    }
}

// 受信側(StatsD サーバーの代わりにメトリクスを表示)
$statsdServer = stream_socket_server('udp://127.0.0.1:19107', $e, $es, STREAM_SERVER_BIND);
stream_set_blocking($statsdServer, false);

// StatsDクライアントで各種メトリクスを送信
$stats = new StatsDClient('127.0.0.1', 19107, 'myapp', sampleRate: 1.0, bufferMax: 5);

// さまざまなメトリクスを記録
$stats->increment('http.request');
$stats->increment('http.request');
$stats->increment('http.error', 0);
$stats->gauge('memory.usage_mb', 128.5);
$stats->timing('db.query_ms', 42.3);
$stats->set('users.active', 'user_1001');
$stats->set('users.active', 'user_1002');

$result = $stats->time('render.time', function () {
    usleep(5000); // 5ms の処理をシミュレート
    return 'rendered';
});

$stats->flush();
$stats->close();

// 受信したメトリクスを表示
echo "=== 受信した StatsD メトリクス ===" . PHP_EOL;
$deadline = microtime(true) + 0.5;
while (microtime(true) < $deadline) {
    $read    = [$statsdServer];
    $write   = $except = null;
    if (!stream_select($read, $write, $except, 0, 50_000)) continue;

    $data = stream_socket_recvfrom($statsdServer, 65535, 0, $sender);
    if ($data === false) continue;

    foreach (explode("\n", trim($data)) as $metric) {
        if ($metric !== '') {
            echo "  {$metric}" . PHP_EOL;
        }
    }
}

fclose($statsdServer);

出力例:

=== 受信した StatsD メトリクス ===
  myapp.http.request:1|c
  myapp.http.request:1|c
  myapp.http.error:0|c
  myapp.memory.usage_mb:128.5|g
  myapp.db.query_ms:42.3|ms
  myapp.users.active:user_1001|s
  myapp.users.active:user_1002|s
  myapp.render.time:5.123|ms

関連する関数との比較

関数役割方向
stream_socket_sendtoUDPデータグラムを任意の宛先に送信送信
stream_socket_recvfromUDPデータグラムを受信(送信元アドレス付き)受信
fwrite接続済みストリームに書き込む送信
stream_socket_clientソケット接続の確立(クライアント側)接続
stream_socket_serverリスニング/バインドソケットの作成待受

fwrite vs stream_socket_sendto

// fwrite:接続済みソケットへの書き込み(宛先は接続時に確定)
$socket = stream_socket_client('udp://192.168.1.1:8080', ...);
fwrite($socket, $data);  // 常に 192.168.1.1:8080 へ

// stream_socket_sendto:呼び出しごとに宛先を変更できる
$socket = stream_socket_client('udp://0.0.0.0:0', ...);
stream_socket_sendto($socket, $data1, 0, '192.168.1.1:8080'); // 宛先A
stream_socket_sendto($socket, $data2, 0, '192.168.1.2:9090'); // 宛先B
stream_socket_sendto($socket, $data3, 0, '10.0.0.1:7070');    // 宛先C
観点stream_socket_sendtofwrite
宛先の柔軟性◎ 毎回指定可能△ 接続時に固定
UDP同報送信◎ 対応△ 宛先固定のため困難
TCP使用△ 基本はUDP向け◎ TCP/UDPどちらでも
記述量やや多いシンプル

よくある注意点・落とし穴

1. 返り値 false と 0 を区別する

送信成功でも 0 バイト送信の場合があります。=== false で失敗を判定してください。

$result = stream_socket_sendto($socket, $data, 0, $target);

// NG:0 バイト送信を失敗と判定してしまう
if (!$result) { echo "失敗"; }

// OK:false のみを失敗と判定
if ($result === false) {
    echo "送信失敗";
} else {
    echo "送信成功: {$result} bytes";
}

2. UDPは65507バイトが最大ペイロード

IPv4ヘッダー(20B)+ UDPヘッダー(8B)= 28B のオーバーヘッドがあり、最大は 65507バイト です。超過するとエラーになります。

// NG:65507バイトを超えるデータを一度に送信
$huge = str_repeat('X', 70000);
stream_socket_sendto($socket, $huge, 0, $target); // 失敗する可能性

// OK:分割して送信(例4のフラグメンテーションを参照)

3. $address を省略するとソケットのデフォルト宛先に送る

$address を空文字または省略すると、stream_socket_client で接続済みの宛先に送信されます。意図しない宛先への送信を防ぐため、必ず明示指定することを推奨します。

$socket = stream_socket_client('udp://192.168.1.1:8080', ...);

stream_socket_sendto($socket, $data, 0, '');            // 192.168.1.1:8080 に送信
stream_socket_sendto($socket, $data, 0, '10.0.0.1:80'); // 10.0.0.1:80 に送信(上書き)

4. ブロードキャストには SO_BROADCAST オプションが必要

255.255.255.255 などのブロードキャストアドレスに送信するには、ソケット拡張で SO_BROADCAST を有効にする必要があります。

// stream_socket_sendto 単体ではブロードキャストは設定できない
// socket拡張と組み合わせる必要がある
$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
socket_set_option($sock, SOL_SOCKET, SO_BROADCAST, 1);
socket_sendto($sock, $data, strlen($data), 0, '255.255.255.255', 9);

まとめ

項目内容
関数名stream_socket_sendto(resource $socket, string $data, int $flags, string $address): int|false
主な用途UDPデータグラムの任意宛先への送信
対になる関数stream_socket_recvfrom(受信側)
最大ペイロード65507 バイト(IPv4)
fwrite との違い毎回宛先を変更できる(1ソケットで多宛先対応)
注意点返り値は === false で判定、$address は明示指定を推奨
PHP バージョンPHP 5.1.0 以上

stream_socket_sendto は、1つのソケットから任意の宛先へUDPデータグラムを柔軟に送り分けられる関数です。stream_socket_recvfrom と対になって使うことで、ステートレスなUDPサーバー・クライアントを完全に実装できます。

StatsD・syslog・DNSなど、UDPが活躍する多くのプロトコルでこのパターンが使われています。ぜひ stream_socket_recvfrom とセットで習得し、高パフォーマンスなUDP通信を実装してみてください。

タイトルとURLをコピーしました