
ゼロコピー&超低レイテンシのプロセス間通信フレームワーク「iceoryx2」を使ってみる
Introduction
iceoryx2は、
プロセス間通信を可能な限り高速化するために設計されたプロセス間通信(IPC)ライブラリです。
Rustで実装されています。
メッセージキューのように高速で使いやすいのが特徴です。
iceoryx2?
iceoryx2は、Eclipse Foundationが提供する高性能なプロセス間通信(IPC)ライブラリです。
初代iceoryx.ioの後継としてC++からRustに書き直され、
より安全かつハイパフォーマンスになっています。
iceoryx2は共有メモリを使用し、プロセス間でデータのコピーを一切行わず(ゼロコピー通信)、
待機時間なしの並列処理もでき、低レイテンシでの処理が可能です。
また、実行時のサービス検出が可能であり中央管理プロセスも不要で手軽に使用できます。
Pub/Subパターンで配信するため、
従来のやり方と比べて、圧倒的に高速で効率的な通信が可能になります。
iceoryx(初代)との違いは以下。
特徴 | iceoryx | iceoryx2 |
---|---|---|
言語 | C++ | Rust |
安全性 | 手動管理 | コンパイル時保証 |
API | 複雑 | シンプル |
依存関係 | 多い | 最小限 |
なお、現在はpub/subのイベント駆動通信だけですが
今後はクラサバ通信やstreaming通信も開発予定とのことです。
↓はgithubにのっているサンプルです。
Subscriberでは以下のように
指定したサービス名でサービスを取得してsubscriber作成後、
データを待っています。
//subscriber.rs
use core::time::Duration;
use iceoryx2::prelude::*;
const CYCLE_TIME: Duration = Duration::from_secs(1);
fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = NodeBuilder::new().create::<ipc::Service>()?;
let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
.publish_subscribe::<usize>()
.open_or_create()?;
// サブスクライバー作成
let subscriber = service.subscriber_builder().create()?;
while node.wait(CYCLE_TIME).is_ok() {
while let Some(sample) = subscriber.receive()? {
println!("received: {:?}", *sample);
}
}
Ok(())
}
Publisherでは指定したサービス名でサービスを取得してpublisher作成後、
データを送信しています。
//publisher.rs
use core::time::Duration;
use iceoryx2::prelude::*;
const CYCLE_TIME: Duration = Duration::from_secs(1);
fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = NodeBuilder::new().create::<ipc::Service>()?;
let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
.publish_subscribe::<usize>()
.open_or_create()?;
// パブリッシャー作成
let publisher = service.publisher_builder().create()?;
// メインループ: 指定したサイクル時間ごとに実行
while node.wait(CYCLE_TIME).is_ok() {
// 未初期化のサンプルを借りる
let sample = publisher.loan_uninit()?;
// サンプルにペイロードを書き込む
let sample = sample.write_payload(1234);
// サンプルを送信
sample.send()?;
}
Ok(())
}
このように簡単に使うことができます。
iceoryx2の特徴であるゼロコピー通信ですが、
従来のIPCの場合、データコピーが以下のように実行されます。
// データが2回コピーされる
Process A → Kernel → Process B
しかしiceoryx2の場合、共有メモリを介してやりとりするので
コピーが発生せず、高速にデータの受け渡しができます。
//データのコピーは0回
Process A → 共有メモリ ← Process B
以下のようにPublisherは共有メモリ領域を借り、
そこにデータを書き込みます。
Subscriberは共有メモリへの参照を取得してデータをうけとるため
コピーは発生しません。
// Publisher
let publisher = service.publisher().create()?;
let mut sample = publisher.loan()?; // 共有メモリ領域を借りる
*sample = MyData { value: 42 }; // 直接共有メモリに書き込む
sample.send()?; // メタデータのみ送信
// Subscriber
let subscriber = service.subscriber().create()?;
while let Some(sample) = subscriber.receive()? {
// sampleは共有メモリへの参照
// データのコピーは発生しない
println!("Value: {}", sample.value);
}
Environment
- MacBook Pro (14-inch, M3, 2023)
- OS : MacOS 14.5
- Rust : 1.83.0
※OrbStackのubuntuで実行
Try
では、iceoryx2を使った通信を試してみます。
速度の比較対象として、Unix Domain Socketを使った通信も実装します。
cargoでプロジェクトを作成したら以下のようにCargo.tomlを記述。
_copyがついているほうがUnix Domain Socketの通信プログラムです。
# Cargo.toml
・・・
[[bin]]
name = "sender"
path = "src/sender.rs"
[[bin]]
name = "receiver"
path = "src/receiver.rs"
[[bin]]
name = "sender_copy"
path = "src/sender_copy.rs"
[[bin]]
name = "receiver_copy"
path = "src/receiver_copy.rs"
[dependencies]
iceoryx2 = "0.6.1"
まずはUnix Domain Socket版から。
受信用・送信用プログラムを以下のように実装。
//receiver_copy.rs
use std::io::Read;
use std::os::unix::net::UnixListener;
use std::time::Instant;
// バッファサイズ
const BUFFER_SIZE: usize = 2000;
const MESSAGE_SIZE: usize = 1_048_576;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// 既存のソケットファイルを削除
let _ = std::fs::remove_file("/tmp/benchmark_socket");
// Unix Domain Socketのリスナーを作成
let listener = UnixListener::bind("/tmp/benchmark_socket")?;
println!("通常IPC Receiver: 接続待機中...");
let (mut stream, _) = listener.accept()?;
println!("接続確立");
let mut buffer = vec![0u8; MESSAGE_SIZE];
// システムの初期化とキャッシュの準備
println!("ウォームアップ中...");
for i in 0..10 {
stream.read_exact(&mut buffer)?;
assert_eq!(
buffer[0],
buffer[MESSAGE_SIZE - 1],
"ウォームアップ中にデータ不整合: メッセージ {}", i
);
}
// 計測開始
let iterations = 1000;
println!("ベンチマーク開始: {} messages × 1MB", iterations);
let start = Instant::now();
// メインのベンチマークループ
for i in 0..iterations {
// データを受信
stream.read_exact(&mut buffer)?;
// データの整合性チェック(最初と最後のバイトが一致することを確認)
assert_eq!(
buffer[0],
buffer[MESSAGE_SIZE - 1],
"データ不整合: メッセージ {}", i
);
if i % 100 == 0 {
println!("受信中: {}/{}", i, iterations);
}
}
// 計測結果の計算と表示
let elapsed = start.elapsed();
let throughput = (MESSAGE_SIZE * iterations) as f64 / elapsed.as_secs_f64() / 1_000_000_000.0;
println!("\n=== 通常IPC 受信結果 ===");
println!("受信完了: {} messages", iterations);
println!("合計時間: {:?}", elapsed);
println!("平均時間/msg: {:?}", elapsed / iterations as u32);
println!("スループット: {:.2} GB/s", throughput);
Ok(())
}
//sender_copy.rs
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::time::Instant;
// バッファサイズ
const BUFFER_SIZE: usize = 2000;
const MESSAGE_SIZE: usize = 1_048_576;
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("通常IPC Sender: 接続試行中...");
let mut stream = None;
for i in 0..30 {
match UnixStream::connect("/tmp/benchmark_socket") {
Ok(s) => {
stream = Some(s);
println!("接続成功");
break;
}
Err(_) => {
if i % 5 == 0 {
println!("接続待機中... ({}秒)", i);
}
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
}
let mut stream = stream.expect("接続失敗: receiverが起動していることを確認してください");
let mut data = vec![0u8; MESSAGE_SIZE];
//システムの初期化とキャッシュの準備
println!("ウォームアップ中...");
for i in 0..10 {
data[0] = i as u8;
data[MESSAGE_SIZE - 1] = i as u8;
stream.write_all(&data)?;
}
stream.flush()?;
// 計測開始
let iterations = 1000;
println!("ベンチマーク開始: {} messages × 1MB", iterations);
let start = Instant::now();
for i in 0..iterations {
// バッファを再利用してデータを生成
data[0] = i as u8;
data[MESSAGE_SIZE - 1] = i as u8;
// データを送信
stream.write_all(&data)?;
if i % 100 == 0 {
println!("送信中: {}/{}", i, iterations);
}
}
// 最後に1回だけフラッシュ
stream.flush()?;
// 計測結果の計算と表示
let elapsed = start.elapsed();
let throughput = (MESSAGE_SIZE * iterations) as f64 / elapsed.as_secs_f64() / 1_000_000_000.0;
println!("\n=== 通常IPC 送信結果 ===");
println!("送信完了: {} messages", iterations);
println!("合計時間: {:?}", elapsed);
println!("平均時間/msg: {:?}", elapsed / iterations as u32);
println!("スループット: {:.2} GB/s", throughput);
Ok(())
}
実行してみます。
何度か実行した結果、70ms-90msくらいでした。
#Receiver
通常IPC Receiver: 接続待機中...
=== 通常IPC 受信結果 ===
受信完了: 1000 messages
合計時間: 73.471521ms
平均時間/msg: 73.471µs
スループット: 14.27 GB/s
#Sender
通常IPC Sender: 接続試行中...
=== 通常IPC 送信結果 ===
送信完了: 1000 messages
合計時間: 73.662853ms
平均時間/msg: 73.662µs
スループット: 14.23 GB/s
次はiceoryx2の処理を計測してみます。
//receiver.rs
use iceoryx2::prelude::*;
use iceoryx2::config::Config;
use std::time::Instant;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// iceoryx2の設定とノードの初期化
let mut config = Config::default();
let node = NodeBuilder::new()
.config(&config)
.create::<ipc::Service>()?;
// サービスとサブスクライバーの設定
let service = node
.service_builder(&"Benchmark".try_into()?)
.publish_subscribe::<[u8; 1_048_576]>()
.subscriber_max_buffer_size(2000) // バッファサイズ設定
.history_size(1000) // 履歴サイズ設定
.open_or_create()?;
let subscriber = service
.subscriber_builder()
.buffer_size(2000)
.create()?;
println!("iceoryx2 Receiver: 受信待機中...");
let mut count = 0;
let mut start = None;
let target_count = 1000;
let mut warmup_count = 0;
loop {
// バッチ受信
while let Some(sample) = subscriber.receive()? {
if warmup_count < 10 {
warmup_count += 1;
continue;
}
// 最初のメッセージ受信時に計測開始
if start.is_none() {
start = Some(Instant::now());
println!("受信開始");
}
// データの整合性チェック
let data = sample.payload();
assert_eq!(data[0], data[1_048_575], "データ不整合");
count += 1;
if count % 100 == 0 {
println!("受信中: {}/{}", count, target_count);
}
if count >= target_count {
let elapsed = start.unwrap().elapsed();
let throughput = (1_048_576 * count) as f64 / elapsed.as_secs_f64() / 1_000_000_000.0;
println!("\n=== iceoryx2 受信結果 ===");
println!("受信完了: {} messages", count);
println!("合計時間: {:?}", elapsed);
println!("平均時間/msg: {:?}", elapsed / count);
println!("スループット: {:.2} GB/s", throughput);
return Ok(());
}
}
//CPUのスピンループを最適化
std::hint::spin_loop();
}
}
receiverでは、iceoryx2の設定を行いノードとサービスを初期化します。
サービスの設定(メッセージ、バッファサイズなど)をしたあとサブスクライバーを作成し、
(メッセージがきたら)受信を開始します。
※計測は、最初のメッセージ受信時から開始され、1000メッセージ受信完了まで継続
送信側のプログラムです。
//sender.rs
use iceoryx2::prelude::*;
use iceoryx2::config::Config;
use std::time::Instant;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// iceoryx2の設定とノードの初期化
let mut config = Config::default();
let node = NodeBuilder::new()
.config(&config)
.create::<ipc::Service>()?;
// サービスとパブリッシャーの設定
let service = node
.service_builder(&"Benchmark".try_into()?)
.publish_subscribe::<[u8; 1_048_576]>()
.subscriber_max_buffer_size(2000) // バッファサイズを2000メッセージ分に設定
.history_size(1000) // 履歴サイズを1000メッセージ分に設定
.open_or_create()?;
let publisher = service
.publisher_builder()
.max_loaned_samples(100) // 最大100サンプルをローン可能に設定
.create()?;
println!("iceoryx2 Sender: 送信開始...");
std::thread::sleep(std::time::Duration::from_secs(1));
// ウォームアップ: システムの初期化とキャッシュの準備
println!("ウォームアップ中...");
for i in 0..10 {
let mut sample = publisher.loan_uninit()?;
unsafe {
let data_ptr = sample.payload_mut().as_mut_ptr() as *mut [u8; 1_048_576];
let data = &mut *data_ptr;
data[0] = i as u8;
data[1_048_575] = i as u8;
}
let sample = unsafe { sample.assume_init() };
sample.send()?;
}
// ベンチマークの設定
let iterations = 1000;
println!("ベンチマーク開始: {} messages × 1MB", iterations);
// 最初のメッセージ送信の準備
let mut sample = publisher.loan_uninit()?;
unsafe {
let data_ptr = sample.payload_mut().as_mut_ptr() as *mut [u8; 1_048_576];
let data = &mut *data_ptr;
data[0] = 0;
data[1_048_575] = 0;
}
let sample = unsafe { sample.assume_init() };
// 計測開始(最初のメッセージ送信前)
let start = Instant::now();
// 最初のメッセージを送信
sample.send()?;
// 残りのメッセージ送信
for i in 1..iterations {
// サンプルの取得と初期化
let mut sample = publisher.loan_uninit()?;
unsafe {
let data_ptr = sample.payload_mut().as_mut_ptr() as *mut [u8; 1_048_576];
let data = &mut *data_ptr;
data[0] = i as u8;
data[1_048_575] = i as u8;
}
// サンプルの送信
let sample = unsafe { sample.assume_init() };
sample.send()?;
// 進捗表示(100メッセージごと)
if i % 100 == 0 {
println!("送信中: {}/{}", i, iterations);
}
}
// 計測結果の計算と表示
let elapsed = start.elapsed();
let throughput = (1_048_576 * iterations) as f64 / elapsed.as_secs_f64() / 1_000_000_000.0;
println!("\n=== iceoryx2 送信結果 ===");
println!("送信完了: {} messages", iterations);
println!("合計時間: {:?}", elapsed);
println!("平均時間/msg: {:?}", elapsed / iterations);
println!("スループット: {:.2} GB/s", throughput);
std::thread::sleep(std::time::Duration::from_secs(1));
Ok(())
}
sender.rsはiceoryx2のノードとサービスを初期化後に送信処理を実施します。
パブリッシャーを作成し、システムの初期化とキャッシュの準備を行った後に
1000メッセージの送信をします。
そもそも同期処理と非同期処理だから
単純な比較をしてもしょうがないのですが、
さきほどより速いことがわかります。
#Receiver
iceoryx2 Receiver: 受信待機中...
=== iceoryx2 受信結果 ===
受信完了: 1000 messages
合計時間: 11.045864ms
平均時間/msg: 11.045µs
スループット: 94.93 GB/s
#Sender
iceoryx2 Sender: 送信開始...
=== iceoryx2 送信結果 ===
送信完了: 1000 messages
合計時間: 12.397655ms
平均時間/msg: 12.397µs
スループット: 84.58 GB/s
Summary
今回はRustの高速IPCライブラリ、iceoryx2を使ってみました。
これを使用すれば、高速な配信が可能になりそうです。
リアルタイム性が重要なシステムなどで採用を検討してもよいかもしれません。