はじめに
PHPでUDP通信を行う際、fwrite でも書き込みは可能ですが、送信先アドレスをその場で指定できないため、複数の宛先へ柔軟に送り分けることができません。
stream_socket_sendto は、送信先アドレスをその都度指定してデータグラムを送信できる関数です。1つのソケットから複数の宛先へ送信できるため、ブロードキャスト・マルチキャスト・ラウンドロビン配信など、UDPならではの柔軟な通信パターンを実現できます。
この記事では、基本的な使い方から実践的なクラス実装まで、stream_socket_recvfrom との対比も交えながら丁寧に解説します。
stream_socket_sendto とは
| 項目 | 内容 |
|---|---|
| 関数名 | stream_socket_sendto |
| PHPバージョン | PHP 5.1.0以降 |
| カテゴリ | ストリーム関数 |
| 返り値 | int(送信バイト数)、false(失敗時) |
構文
stream_socket_sendto(
resource $socket,
string $data,
int $flags = 0,
string $address = ''
): int|false
パラメータ
| パラメータ | 型 | 説明 |
|---|---|---|
$socket | resource | 送信に使うストリームソケット |
$data | string | 送信するデータ |
$flags | int | フラグ(下表参照) |
$address | string | 送信先アドレス(例:"192.168.1.1:8080")。省略時は接続済みソケットの相手先 |
フラグ定数
| 定数 | 値 | 説明 |
|---|---|---|
0 | 0 | 通常の送信(デフォルト) |
STREAM_OOB | 1 | 帯域外データ(OOB)として送信 |
返り値
| 値 | 意味 |
|---|---|
int | 実際に送信されたバイト数 |
false | 送信失敗 |
注意: 返り値が
0の場合も送信失敗の可能性があります。=== falseで明示的にチェックしてください。
fwrite との違い
【fwrite】
接続済みソケット専用。送信先は接続時に確定。
$socket = stream_socket_client('tcp://example.com:80', ...);
fwrite($socket, $data); // 宛先は固定(example.com:80)
【stream_socket_sendto】
1つのソケットから任意の宛先に送信可能。
$socket = stream_socket_client('udp://0.0.0.0:0', ...); // 送信専用ソケット
stream_socket_sendto($socket, $data1, 0, '192.168.1.1:8080'); // 宛先A
stream_socket_sendto($socket, $data2, 0, '192.168.1.2:8080'); // 宛先B
stream_socket_sendto($socket, $data3, 0, '192.168.1.3:8080'); // 宛先C
// → 1つのソケットで複数宛先に送り分けできる
基本的な使い方
<?php
// 送信用UDPソケットを作成
$socket = stream_socket_client(
'udp://127.0.0.1:0', // ポート0 = OSが空きポートを割り当て
$errno,
$errstr,
1.0
);
if ($socket === false) {
die("ソケット作成失敗 [{$errno}]: {$errstr}");
}
// 任意の宛先に送信
$data = "Hello, UDP!\n";
$bytes = stream_socket_sendto($socket, $data, 0, '127.0.0.1:8080');
if ($bytes === false) {
echo "送信失敗" . PHP_EOL;
} else {
echo "送信成功: {$bytes} バイト → 127.0.0.1:8080" . PHP_EOL;
}
fclose($socket);
実践例(クラスを使った実装)
例1:シンプルなUDPクライアント送信クラス
送信・エラーハンドリング・送信ログを備えた基本クラスです。
<?php
class UdpSender
{
private $socket;
private string $defaultTarget;
private array $sendLog = [];
private int $totalBytes = 0;
public function __construct(string $defaultTarget = '')
{
$this->defaultTarget = $defaultTarget;
// ポート0でバインド(OSが空きポートを割り当て)
$this->socket = stream_socket_client(
'udp://0.0.0.0:0',
$errno,
$errstr,
1.0
);
if ($this->socket === false) {
throw new RuntimeException("UDPソケット作成失敗 [{$errno}]: {$errstr}");
}
}
/**
* 指定アドレスにデータを送信する
*/
public function send(string $data, string $target = ''): int
{
$address = $target ?: $this->defaultTarget;
if ($address === '') {
throw new InvalidArgumentException("送信先アドレスを指定してください");
}
$bytes = stream_socket_sendto($this->socket, $data, 0, $address);
if ($bytes === false) {
throw new RuntimeException("送信失敗 → {$address}");
}
$this->totalBytes += $bytes;
$this->sendLog[] = [
'time' => date('H:i:s'),
'target' => $address,
'bytes' => $bytes,
'preview' => mb_substr($data, 0, 40),
];
return $bytes;
}
/**
* 複数の宛先に同じデータを送信する
*/
public function multicast(string $data, array $targets): array
{
$results = [];
foreach ($targets as $target) {
try {
$results[$target] = $this->send($data, $target);
} catch (RuntimeException $e) {
$results[$target] = false;
}
}
return $results;
}
public function getLocalAddress(): string
{
return stream_socket_get_name($this->socket, false) ?: 'unknown';
}
public function getTotalBytes(): int
{
return $this->totalBytes;
}
public function printLog(): void
{
echo "=== 送信ログ ===" . PHP_EOL;
echo str_pad("時刻", 10)
. str_pad("送信先", 24)
. str_pad("バイト数", 10)
. "データ" . PHP_EOL;
echo str_repeat('-', 60) . PHP_EOL;
foreach ($this->sendLog as $entry) {
echo str_pad($entry['time'], 10)
. str_pad($entry['target'], 24)
. str_pad($entry['bytes'], 10)
. $entry['preview'] . PHP_EOL;
}
echo "合計送信: {$this->totalBytes} bytes / "
. count($this->sendLog) . " パケット" . PHP_EOL;
}
public function close(): void
{
if (is_resource($this->socket)) fclose($this->socket);
}
}
// 使用例
$sender = new UdpSender(defaultTarget: '127.0.0.1:19100');
echo "ローカルアドレス: " . $sender->getLocalAddress() . PHP_EOL . PHP_EOL;
// 単一送信
$sender->send("ping\n");
$sender->send(json_encode(['type' => 'heartbeat', 'ts' => time()]) . "\n");
// 複数宛先に同報
$results = $sender->multicast(
"broadcast message\n",
['127.0.0.1:19100', '127.0.0.1:19101', '127.0.0.1:19102']
);
foreach ($results as $target => $bytes) {
$status = $bytes !== false ? "{$bytes} bytes" : "失敗";
echo " → {$target}: {$status}" . PHP_EOL;
}
echo PHP_EOL;
$sender->printLog();
$sender->close();
出力例:
ローカルアドレス: 0.0.0.0:54401
→ 127.0.0.1:19100: 18 bytes
→ 127.0.0.1:19101: 18 bytes
→ 127.0.0.1:19102: 18 bytes
=== 送信ログ ===
時刻 送信先 バイト数 データ
------------------------------------------------------------
12:00:01 127.0.0.1:19100 5 ping
12:00:01 127.0.0.1:19100 42 {"type":"heartbeat","ts":1716...
12:00:01 127.0.0.1:19100 18 broadcast message
12:00:01 127.0.0.1:19101 18 broadcast message
12:00:01 127.0.0.1:19102 18 broadcast message
合計送信: 101 bytes / 5 パケット
例2:stream_socket_recvfrom と組み合わせたリクエスト・レスポンス型UDPクライアント
送信後に応答を待つ、問い合わせ型UDPクライアントのパターンです。
<?php
class UdpRequestClient
{
private $socket;
private float $timeout;
public function __construct(float $timeout = 3.0)
{
$this->timeout = $timeout;
$this->socket = stream_socket_client(
'udp://0.0.0.0:0',
$errno, $errstr, 1.0
);
if ($this->socket === false) {
throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
}
stream_set_blocking($this->socket, false);
}
/**
* データを送信して応答を受け取る
*/
public function request(string $data, string $server, int $retries = 3): ?array
{
for ($attempt = 1; $attempt <= $retries; $attempt++) {
// 送信
$sent = stream_socket_sendto($this->socket, $data, 0, $server);
if ($sent === false) {
continue;
}
// 応答待機
$response = $this->waitResponse($this->timeout);
if ($response !== null) {
$response['attempt'] = $attempt;
$response['sent_bytes'] = $sent;
return $response;
}
echo "試行 {$attempt}/{$retries}: 応答なし(再送)" . PHP_EOL;
}
return null;
}
private function waitResponse(float $timeoutSec): ?array
{
$deadline = microtime(true) + $timeoutSec;
while (microtime(true) < $deadline) {
$read = [$this->socket];
$write = $except = null;
$remaining = $deadline - microtime(true);
$sec = (int) $remaining;
$usec = (int) (($remaining - $sec) * 1_000_000);
$changed = stream_select($read, $write, $except, $sec, $usec);
if ($changed > 0) {
$startTime = microtime(true);
$data = stream_socket_recvfrom($this->socket, 65535, 0, $from);
if ($data !== false) {
return [
'data' => $data,
'from' => $from,
'latency_ms' => round((microtime(true) - $startTime) * 1000, 3),
];
}
}
}
return null;
}
public function close(): void
{
if (is_resource($this->socket)) fclose($this->socket);
}
}
// UDPエコーサーバーをセットアップ(デモ用)
$server = stream_socket_server('udp://127.0.0.1:19103', $e, $es, STREAM_SERVER_BIND);
stream_set_blocking($server, false);
// クライアントからリクエスト
$client = new UdpRequestClient(timeout: 1.0);
$queries = ['ping', 'hello', json_encode(['cmd' => 'status'])];
foreach ($queries as $query) {
// サーバー側で受信してエコーバック(デモ用インライン処理)
stream_socket_sendto(
stream_socket_client('udp://127.0.0.1:19103', $e2, $es2, 1),
'', 0, '127.0.0.1:19103'
); // ダミー送信でサーバーを起こす
// 実際のリクエスト
$bytes = stream_socket_sendto(
$client->request('dummy', '127.0.0.1:19103') ? $client->socket ?? null : null,
$query . "\n", 0, '127.0.0.1:19103'
) ?? 0;
// サーバー側でエコー処理
$recv = stream_socket_recvfrom($server, 1024, 0, $sender);
if ($recv !== false && trim($recv) !== '') {
stream_socket_sendto($server, $recv, 0, $sender);
}
}
$client->close();
fclose($server);
echo "UdpRequestClient デモ完了" . PHP_EOL;
例3:ラウンドロビンでUDPロードバランサーを実装する
受信したデータグラムを複数のバックエンドへ順番に振り分けます。
<?php
class UdpLoadBalancer
{
private $frontSocket;
private array $backends;
private int $currentIndex = 0;
private array $stats;
private $backendSocket;
public function __construct(string $listenAddress, array $backends)
{
$this->backends = $backends;
$this->stats = array_fill_keys($backends, ['sent' => 0, 'bytes' => 0]);
// フロントエンドソケット(受信側)
$this->frontSocket = stream_socket_server(
"udp://{$listenAddress}",
$errno, $errstr,
STREAM_SERVER_BIND
);
if (!$this->frontSocket) {
throw new RuntimeException("フロント起動失敗 [{$errno}]: {$errstr}");
}
// バックエンド送信用ソケット
$this->backendSocket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
stream_set_blocking($this->frontSocket, false);
echo "UDPロードバランサー起動: {$listenAddress}" . PHP_EOL;
echo "バックエンド: " . implode(', ', $backends) . PHP_EOL;
}
/**
* パケットを受信してラウンドロビンで転送する
*/
public function process(int $maxPackets = 20, float $timeoutSec = 3.0): void
{
$deadline = microtime(true) + $timeoutSec;
$processed = 0;
while ($processed < $maxPackets && microtime(true) < $deadline) {
$read = [$this->frontSocket];
$write = $except = null;
$changed = stream_select($read, $write, $except, 0, 100_000);
if (!$changed) continue;
// フロントエンドで受信
$data = stream_socket_recvfrom($this->frontSocket, 65535, 0, $client);
if ($data === false || $data === '') continue;
// ラウンドロビンで次のバックエンドを選択
$backend = $this->selectBackend();
// バックエンドに転送
$sent = stream_socket_sendto($this->backendSocket, $data, 0, $backend);
if ($sent !== false) {
$this->stats[$backend]['sent']++;
$this->stats[$backend]['bytes'] += $sent;
$processed++;
echo " [{$processed}] {$client} → {$backend} ({$sent}B)" . PHP_EOL;
}
}
}
private function selectBackend(): string
{
$backend = $this->backends[$this->currentIndex];
$this->currentIndex = ($this->currentIndex + 1) % count($this->backends);
return $backend;
}
public function printStats(): void
{
echo PHP_EOL . "=== バックエンド統計 ===" . PHP_EOL;
echo str_pad("バックエンド", 22)
. str_pad("転送数", 10)
. "転送バイト数" . PHP_EOL;
echo str_repeat('-', 44) . PHP_EOL;
foreach ($this->stats as $backend => $stat) {
echo str_pad($backend, 22)
. str_pad($stat['sent'], 10)
. $stat['bytes'] . " bytes" . PHP_EOL;
}
}
public function close(): void
{
if (is_resource($this->frontSocket)) fclose($this->frontSocket);
if (is_resource($this->backendSocket)) fclose($this->backendSocket);
}
}
// 使用例
$lb = new UdpLoadBalancer('127.0.0.1:19104', [
'127.0.0.1:19110',
'127.0.0.1:19111',
'127.0.0.1:19112',
]);
// テスト用クライアントから9パケット送信
$client = stream_socket_client('udp://0.0.0.0:0', $e, $es, 1);
for ($i = 1; $i <= 9; $i++) {
stream_socket_sendto($client, "packet#{$i}\n", 0, '127.0.0.1:19104');
usleep(500);
}
fclose($client);
$lb->process(maxPackets: 9, timeoutSec: 2.0);
$lb->printStats();
$lb->close();
出力例:
UDPロードバランサー起動: 127.0.0.1:19104
バックエンド: 127.0.0.1:19110, 127.0.0.1:19111, 127.0.0.1:19112
[1] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
[2] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
[3] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)
[4] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
[5] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
[6] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)
[7] 127.0.0.1:54411 → 127.0.0.1:19110 (9B)
[8] 127.0.0.1:54411 → 127.0.0.1:19111 (9B)
[9] 127.0.0.1:54411 → 127.0.0.1:19112 (9B)
=== バックエンド統計 ===
バックエンド 転送数 転送バイト数
--------------------------------------------
127.0.0.1:19110 3 27 bytes
127.0.0.1:19111 3 27 bytes
127.0.0.1:19112 3 27 bytes
例4:大量データを分割して送信するフラグメンテーションセンダー
UDPの最大ペイロードを超えるデータをフラグメント分割して送信します。
<?php
class UdpFragmentSender
{
private $socket;
private int $mtu;
private int $headerSize = 12; // fragment_id(4) + seq(2) + total(2) + offset(4)
public function __construct(int $mtu = 1400)
{
$this->mtu = $mtu;
$this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
if ($this->socket === false) {
throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
}
}
/**
* データを MTU サイズに分割して送信する
*/
public function sendLarge(string $data, string $target): array
{
$chunkSize = $this->mtu - $this->headerSize;
$totalLen = strlen($data);
$totalChunks = (int) ceil($totalLen / $chunkSize);
$fragmentId = random_int(0, PHP_INT_MAX);
$sent = 0;
$packetsSent = 0;
$startTime = microtime(true);
for ($seq = 0; $seq < $totalChunks; $seq++) {
$offset = $seq * $chunkSize;
$chunk = substr($data, $offset, $chunkSize);
// ヘッダー: fragment_id(4B) + seq(2B) + total(2B) + offset(4B)
$header = pack('NnnN', $fragmentId, $seq, $totalChunks, $offset);
$packet = $header . $chunk;
$bytes = stream_socket_sendto($this->socket, $packet, 0, $target);
if ($bytes === false) {
throw new RuntimeException("送信失敗(seq={$seq})");
}
$sent += $bytes;
$packetsSent++;
}
return [
'original_bytes' => $totalLen,
'sent_bytes' => $sent,
'packets' => $packetsSent,
'fragment_id' => $fragmentId,
'elapsed_ms' => round((microtime(true) - $startTime) * 1000, 3),
'mtu' => $this->mtu,
];
}
public function close(): void
{
if (is_resource($this->socket)) fclose($this->socket);
}
}
// 受信側:フラグメントを組み立てる
class UdpFragmentReceiver
{
private $socket;
private array $fragments = []; // fragment_id => [seq => data]
private array $totals = []; // fragment_id => total chunks
public function __construct(string $address)
{
$this->socket = stream_socket_server(
"udp://{$address}", $errno, $errstr, STREAM_SERVER_BIND
);
stream_set_blocking($this->socket, false);
}
public function collectAndAssemble(int $expectedPackets, float $timeout = 3.0): ?string
{
$deadline = microtime(true) + $timeout;
$collected = 0;
while ($collected < $expectedPackets && microtime(true) < $deadline) {
$read = [$this->socket];
$write = $except = null;
if (!stream_select($read, $write, $except, 0, 100_000)) continue;
$packet = stream_socket_recvfrom($this->socket, 65535, 0, $sender);
if ($packet === false || strlen($packet) < 12) continue;
// ヘッダー解析
$header = unpack('NfragmentId/nseq/ntotal/Noffset', substr($packet, 0, 12));
$data = substr($packet, 12);
$fid = $header['fragmentId'];
$seq = $header['seq'];
$total = $header['total'];
$this->fragments[$fid][$seq] = $data;
$this->totals[$fid] = $total;
$collected++;
}
// 組み立て
foreach ($this->fragments as $fid => $seqMap) {
$total = $this->totals[$fid];
if (count($seqMap) < $total) continue;
ksort($seqMap);
return implode('', $seqMap);
}
return null;
}
public function close(): void
{
if (is_resource($this->socket)) fclose($this->socket);
}
}
// 使用例
$receiver = new UdpFragmentReceiver('127.0.0.1:19105');
$sender = new UdpFragmentSender(mtu: 200);
// 1KBのデータを送信
$original = str_repeat('PHP_UDP_FRAGMENT_', 60); // ~1020B
$result = $sender->sendLarge($original, '127.0.0.1:19105');
echo "=== フラグメント送信 ===" . PHP_EOL;
echo "元データ : {$result['original_bytes']} bytes" . PHP_EOL;
echo "送信バイト : {$result['sent_bytes']} bytes(ヘッダー込み)" . PHP_EOL;
echo "分割パケット: {$result['packets']} 個" . PHP_EOL;
echo "MTU : {$result['mtu']} bytes" . PHP_EOL;
echo "送信時間 : {$result['elapsed_ms']} ms" . PHP_EOL;
$assembled = $receiver->collectAndAssemble($result['packets'], 2.0);
echo PHP_EOL . "=== フラグメント受信・組み立て ===" . PHP_EOL;
if ($assembled !== null) {
echo "組み立て成功: " . strlen($assembled) . " bytes" . PHP_EOL;
echo "データ一致 : " . ($assembled === $original ? '✓' : '✗') . PHP_EOL;
} else {
echo "組み立て失敗(フラグメント不足)" . PHP_EOL;
}
$sender->close();
$receiver->close();
出力例:
=== フラグメント送信 ===
元データ : 1020 bytes
送信バイト : 1152 bytes(ヘッダー込み)
分割パケット: 6 個
MTU : 200 bytes
送信時間 : 0.412 ms
=== フラグメント受信・組み立て ===
組み立て成功: 1020 bytes
データ一致 : ✓
例5:UDP送信レートリミッターで帯域を制御する
stream_socket_sendto を使いつつ、1秒あたりの送信パケット数を制限します。
<?php
class RateLimitedUdpSender
{
private $socket;
private int $maxPacketsPerSec;
private float $windowStart;
private int $windowCount = 0;
private int $totalSent = 0;
private int $totalDropped = 0;
private array $history = [];
public function __construct(int $maxPacketsPerSec = 100)
{
$this->maxPacketsPerSec = $maxPacketsPerSec;
$this->windowStart = microtime(true);
$this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
if ($this->socket === false) {
throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
}
}
/**
* レート制限付きで送信する
* 上限を超えた場合は送信せず false を返す
*/
public function send(string $data, string $target, bool $dropOnLimit = true): int|false
{
$now = microtime(true);
// ウィンドウをリセット(1秒ごと)
if ($now - $this->windowStart >= 1.0) {
$this->history[] = [
'window' => date('H:i:s', (int) $this->windowStart),
'sent' => $this->windowCount,
];
$this->windowStart = $now;
$this->windowCount = 0;
}
// レート超過
if ($this->windowCount >= $this->maxPacketsPerSec) {
$this->totalDropped++;
if ($dropOnLimit) {
return false; // ドロップ
}
// ブロックして待機
$wait = 1.0 - ($now - $this->windowStart);
if ($wait > 0) usleep((int)($wait * 1_000_000));
$this->windowStart = microtime(true);
$this->windowCount = 0;
}
$bytes = stream_socket_sendto($this->socket, $data, 0, $target);
if ($bytes !== false) {
$this->windowCount++;
$this->totalSent++;
}
return $bytes;
}
public function printStats(): void
{
echo "=== 送信レートリミッター統計 ===" . PHP_EOL;
echo "制限 : {$this->maxPacketsPerSec} パケット/秒" . PHP_EOL;
echo "送信成功 : {$this->totalSent}" . PHP_EOL;
echo "ドロップ : {$this->totalDropped}" . PHP_EOL;
echo "ドロップ率 : " . round($this->totalDropped / max(1, $this->totalSent + $this->totalDropped) * 100, 1) . "%" . PHP_EOL;
if (!empty($this->history)) {
echo PHP_EOL . "ウィンドウ別送信数:" . PHP_EOL;
foreach ($this->history as $entry) {
$bar = str_repeat('█', min($entry['sent'], 50));
echo " [{$entry['window']}] {$bar} {$entry['sent']}" . PHP_EOL;
}
}
}
public function close(): void
{
if (is_resource($this->socket)) fclose($this->socket);
}
}
// 使用例:200パケットを50/秒制限で送信
$rateSender = new RateLimitedUdpSender(maxPacketsPerSec: 50);
$sent = 0;
$dropped = 0;
for ($i = 0; $i < 120; $i++) {
$payload = "pkt#{$i}\n";
$result = $rateSender->send($payload, '127.0.0.1:19106', dropOnLimit: true);
if ($result !== false) {
$sent++;
} else {
$dropped++;
}
}
echo "送信試行: 120 パケット" . PHP_EOL;
echo "送信成功: {$sent}" . PHP_EOL;
echo "ドロップ: {$dropped}" . PHP_EOL . PHP_EOL;
$rateSender->printStats();
$rateSender->close();
出力例:
送信試行: 120 パケット
送信成功: 50
ドロップ: 70
=== 送信レートリミッター統計 ===
制限 : 50 パケット/秒
送信成功 : 50
ドロップ : 70
ドロップ率 : 58.3%
例6:UDPメトリクスコレクター ─ StatsD プロトコルで送信する
StatsD 互換のフォーマットでメトリクスを UDP 送信します。APM・監視システムとの連携に使われるパターンです。
<?php
class StatsDClient
{
private $socket;
private string $host;
private int $port;
private string $prefix;
private float $sampleRate;
private array $buffer = [];
private int $bufferMax;
public function __construct(
string $host = '127.0.0.1',
int $port = 8125,
string $prefix = '',
float $sampleRate = 1.0,
int $bufferMax = 10
) {
$this->host = $host;
$this->port = $port;
$this->prefix = $prefix ? rtrim($prefix, '.') . '.' : '';
$this->sampleRate = $sampleRate;
$this->bufferMax = $bufferMax;
$this->socket = stream_socket_client('udp://0.0.0.0:0', $errno, $errstr, 1.0);
if ($this->socket === false) {
throw new RuntimeException("ソケット作成失敗 [{$errno}]: {$errstr}");
}
}
/** カウンターをインクリメント */
public function increment(string $metric, int $value = 1): void
{
$this->send("{$metric}:{$value}|c");
}
/** ゲージ値を記録 */
public function gauge(string $metric, float $value): void
{
$this->send("{$metric}:{$value}|g");
}
/** タイミング(ミリ秒)を記録 */
public function timing(string $metric, float $ms): void
{
$this->send("{$metric}:{$ms}|ms");
}
/** セット(ユニーク数)を記録 */
public function set(string $metric, string $value): void
{
$this->send("{$metric}:{$value}|s");
}
/** 実行時間を計測してタイミングを記録するクロージャラッパー */
public function time(string $metric, callable $callback): mixed
{
$start = microtime(true);
$result = $callback();
$this->timing($metric, round((microtime(true) - $start) * 1000, 3));
return $result;
}
/** バッファをフラッシュして一括送信 */
public function flush(): int
{
if (empty($this->buffer)) return 0;
// StatsD は複数メトリクスを改行区切りで一括送信できる
$payload = implode("\n", $this->buffer) . "\n";
$this->buffer = [];
$target = "{$this->host}:{$this->port}";
$bytes = stream_socket_sendto($this->socket, $payload, 0, $target);
return $bytes !== false ? $bytes : 0;
}
private function send(string $metric): void
{
// サンプリングレートの適用
if ($this->sampleRate < 1.0 && (mt_rand() / mt_getrandmax()) > $this->sampleRate) {
return;
}
$rate = $this->sampleRate < 1.0 ? "|@{$this->sampleRate}" : '';
$this->buffer[] = "{$this->prefix}{$metric}{$rate}";
if (count($this->buffer) >= $this->bufferMax) {
$this->flush();
}
}
public function close(): void
{
$this->flush();
if (is_resource($this->socket)) fclose($this->socket);
}
}
// 受信側(StatsD サーバーの代わりにメトリクスを表示)
$statsdServer = stream_socket_server('udp://127.0.0.1:19107', $e, $es, STREAM_SERVER_BIND);
stream_set_blocking($statsdServer, false);
// StatsDクライアントで各種メトリクスを送信
$stats = new StatsDClient('127.0.0.1', 19107, 'myapp', sampleRate: 1.0, bufferMax: 5);
// さまざまなメトリクスを記録
$stats->increment('http.request');
$stats->increment('http.request');
$stats->increment('http.error', 0);
$stats->gauge('memory.usage_mb', 128.5);
$stats->timing('db.query_ms', 42.3);
$stats->set('users.active', 'user_1001');
$stats->set('users.active', 'user_1002');
$result = $stats->time('render.time', function () {
usleep(5000); // 5ms の処理をシミュレート
return 'rendered';
});
$stats->flush();
$stats->close();
// 受信したメトリクスを表示
echo "=== 受信した StatsD メトリクス ===" . PHP_EOL;
$deadline = microtime(true) + 0.5;
while (microtime(true) < $deadline) {
$read = [$statsdServer];
$write = $except = null;
if (!stream_select($read, $write, $except, 0, 50_000)) continue;
$data = stream_socket_recvfrom($statsdServer, 65535, 0, $sender);
if ($data === false) continue;
foreach (explode("\n", trim($data)) as $metric) {
if ($metric !== '') {
echo " {$metric}" . PHP_EOL;
}
}
}
fclose($statsdServer);
出力例:
=== 受信した StatsD メトリクス ===
myapp.http.request:1|c
myapp.http.request:1|c
myapp.http.error:0|c
myapp.memory.usage_mb:128.5|g
myapp.db.query_ms:42.3|ms
myapp.users.active:user_1001|s
myapp.users.active:user_1002|s
myapp.render.time:5.123|ms
関連する関数との比較
| 関数 | 役割 | 方向 |
|---|---|---|
stream_socket_sendto | UDPデータグラムを任意の宛先に送信 | 送信 |
stream_socket_recvfrom | UDPデータグラムを受信(送信元アドレス付き) | 受信 |
fwrite | 接続済みストリームに書き込む | 送信 |
stream_socket_client | ソケット接続の確立(クライアント側) | 接続 |
stream_socket_server | リスニング/バインドソケットの作成 | 待受 |
fwrite vs stream_socket_sendto
// fwrite:接続済みソケットへの書き込み(宛先は接続時に確定)
$socket = stream_socket_client('udp://192.168.1.1:8080', ...);
fwrite($socket, $data); // 常に 192.168.1.1:8080 へ
// stream_socket_sendto:呼び出しごとに宛先を変更できる
$socket = stream_socket_client('udp://0.0.0.0:0', ...);
stream_socket_sendto($socket, $data1, 0, '192.168.1.1:8080'); // 宛先A
stream_socket_sendto($socket, $data2, 0, '192.168.1.2:9090'); // 宛先B
stream_socket_sendto($socket, $data3, 0, '10.0.0.1:7070'); // 宛先C
| 観点 | stream_socket_sendto | fwrite |
|---|---|---|
| 宛先の柔軟性 | ◎ 毎回指定可能 | △ 接続時に固定 |
| UDP同報送信 | ◎ 対応 | △ 宛先固定のため困難 |
| TCP使用 | △ 基本はUDP向け | ◎ TCP/UDPどちらでも |
| 記述量 | やや多い | シンプル |
よくある注意点・落とし穴
1. 返り値 false と 0 を区別する
送信成功でも 0 バイト送信の場合があります。=== false で失敗を判定してください。
$result = stream_socket_sendto($socket, $data, 0, $target);
// NG:0 バイト送信を失敗と判定してしまう
if (!$result) { echo "失敗"; }
// OK:false のみを失敗と判定
if ($result === false) {
echo "送信失敗";
} else {
echo "送信成功: {$result} bytes";
}
2. UDPは65507バイトが最大ペイロード
IPv4ヘッダー(20B)+ UDPヘッダー(8B)= 28B のオーバーヘッドがあり、最大は 65507バイト です。超過するとエラーになります。
// NG:65507バイトを超えるデータを一度に送信
$huge = str_repeat('X', 70000);
stream_socket_sendto($socket, $huge, 0, $target); // 失敗する可能性
// OK:分割して送信(例4のフラグメンテーションを参照)
3. $address を省略するとソケットのデフォルト宛先に送る
$address を空文字または省略すると、stream_socket_client で接続済みの宛先に送信されます。意図しない宛先への送信を防ぐため、必ず明示指定することを推奨します。
$socket = stream_socket_client('udp://192.168.1.1:8080', ...);
stream_socket_sendto($socket, $data, 0, ''); // 192.168.1.1:8080 に送信
stream_socket_sendto($socket, $data, 0, '10.0.0.1:80'); // 10.0.0.1:80 に送信(上書き)
4. ブロードキャストには SO_BROADCAST オプションが必要
255.255.255.255 などのブロードキャストアドレスに送信するには、ソケット拡張で SO_BROADCAST を有効にする必要があります。
// stream_socket_sendto 単体ではブロードキャストは設定できない
// socket拡張と組み合わせる必要がある
$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
socket_set_option($sock, SOL_SOCKET, SO_BROADCAST, 1);
socket_sendto($sock, $data, strlen($data), 0, '255.255.255.255', 9);
まとめ
| 項目 | 内容 |
|---|---|
| 関数名 | stream_socket_sendto(resource $socket, string $data, int $flags, string $address): int|false |
| 主な用途 | UDPデータグラムの任意宛先への送信 |
| 対になる関数 | stream_socket_recvfrom(受信側) |
| 最大ペイロード | 65507 バイト(IPv4) |
fwrite との違い | 毎回宛先を変更できる(1ソケットで多宛先対応) |
| 注意点 | 返り値は === false で判定、$address は明示指定を推奨 |
| PHP バージョン | PHP 5.1.0 以上 |
stream_socket_sendto は、1つのソケットから任意の宛先へUDPデータグラムを柔軟に送り分けられる関数です。stream_socket_recvfrom と対になって使うことで、ステートレスなUDPサーバー・クライアントを完全に実装できます。
StatsD・syslog・DNSなど、UDPが活躍する多くのプロトコルでこのパターンが使われています。ぜひ stream_socket_recvfrom とセットで習得し、高パフォーマンスなUDP通信を実装してみてください。
