Serverless CacheサービスのMomentoがPub/Sub機能を実装します
Introduction
サーバレスキャッシュサービスMomentoの情報です。
最近機能追加がいろいろとされてますが、
(Preview版ながら)Pub/Sub機能が使えるようになりました。
今回はCLIおよびRust用SDKを使って
Pub/Sub機能を試してみます。
※2023年2月現在Pub/Sub機能はPreviewです
Environment
今回試した環境は以下のとおりです。
- MacBook Pro (13-inch, M1, 2020)
- OS : MacOS 12.4
- Rust : 1.66.1
- Momento CLI : v0.22.1
DevIOのMomento関連記事はここにあるので、
これらもご確認ください。
Setup
例によってMomentoのセットアップです。
このあたりを参考に、
MomentoのCLIインストールと認証トークンを取得しましょう。
認証トークンを取得したらコンソールでCLI用・プログラム用に
トークンをセットしておきます。
# CLI用 % momento configure Please paste your Momento auth token. (If you do not have an auth token, use `momento account` to generate one.) Windows users: if CTRL-V does not work, try right-click or SHIFT-INSERT to paste. Token [****]:<認証トークン> ・・・ #プログラム用 % export MOMENTO_AUTH_TOKEN = <取得した認証トークン>
Pub/Sub using CLI
ではMomento CLIでPub/Subの動作確認をしてみましょう。
% momento --version momento 0.29.0
Pub/Sub用のキャッシュを新たに作成します。
% momento cache create --name my_pubsub_cache
momento topic subscribeコマンドでSubscriberを起動します。
さきほど作成したキャッシュを指定し、任意の名前のトピック名を
指定してSubscribeます。
% momento topic subscribe --cache my_pubsub_cache my_topic
別のコンソールを起動し、↑で指定したキャッシュ・トピックに対してpublishします。
% momento topic publish --cache my_pubsub_cache my_topic 'hello' --verbose ・・・
Subscriberをみると、ちゃんとメッセージを受信できてます。
% momento topic subscribe --cache my_pubsub_cache my_topic hello #受信したメッセージ
CLIでtopicの動作確認ができたので、次はRustSDKで
PublisherとSubsriberを実装してみます。
Implement Pub/Sub using Rust
CLIで動作確認ができたので、
Rust SDKでも実装してみましょう。
Cargoで新規プロジェクトを作成します。
% cargo new momento_pubsub % cd momento_pubsub
Cargo.tomlを下記のように修正します。
MomentoライブラリやTokioなど、必要ライブラリの設定と、
binセクションでPublisherとSubscriber、
それぞれのプログラムを起動できるようにします。
# Cargo.toml ・・・ [dependencies] momento = "0.22.1" tokio = { version = "1.21.2", features = ["full"] } anyhow = "1.0.66" [[bin]] name = "sub" src = "src/bin/sub.rs" [[bin]] name = "pub" src = "src/bin/pub.rs"
まずSubscriber(src/bin/sub.rs)の実装をしましょう。
キャッシュはさきほど作成したものを使うか、新しくcreateします。
// src/bin/sub.rs use anyhow::self; use std::env; use momento::preview::topics::*; #[tokio::main] async fn main() -> anyhow::Result<()> { let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap(); let cache_name = "<Your demo Cache name for Topic>"; let topic_name = "my_topic"; let client = TopicClient::connect(auth_token, None, Some("pubsub-demo")).expect("could not connect"); let mut subscription = client .subscribe(cache_name.to_string(), topic_name.to_string(), None) .await.expect("subscribe rpc failed"); while let Some(item) = subscription.item().await? { match item { SubscriptionItem::Value(value) => match value.kind { ValueKind::Text(text) => println!("{text}"), ValueKind::Binary(binary) => { println!("{{\"kind\": \"binary\", \"length\": {}}}", binary.len()) } }, SubscriptionItem::Discontinuity(discontinuity) => { println!("{discontinuity:?}") } } } Ok(()) }
TopicClient::connectでクライアントを作成し、
キャッシュ名とトピック名を指定してsubscribe関数を実行します。
これでデータ待受の準備ができました、
なお、現状ではsubscribe実行後に60秒データがこないと
タイムアウトしてしまいます。
このあたりは今後改善される予定になってます。
次はPublisherの実装(src/bin/pub.rs)です。
TopicClient::connectでクライアントを作成し、
publish関数(もしくはpublish_mut)でデータのpublishを行います。
// src/bin/pub.rs use anyhow::self; use std::env; use momento::preview::topics::*; #[tokio::main] async fn main() -> anyhow::Result<()> { let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap(); let cache_name = "demotopics"; let topic_name = "my_topic"; let client = TopicClient::connect(auth_token, None, Some("pubsub-demo"))?; client.publish(cache_name.to_string(), topic_name.to_string(), "Hello from Momento Publisher".to_string()).await?; println!("publish done."); Ok(()) }
では動作確認してみましょう。
まずはSubscriberを実行します。
% cargo run --bin sub
別ウィンドウでPublisherを実行します。
% cargo run --bin pub publish done
Subscriberのコンソールを確認すると、
メッセージを受信できているのがわかります。
% cargo run --bin sub hello from publisher
Summary
今回はMomentoの新機能であるPub/Sub機能を試してみました。
まだプレビュー版ですが、基本的な動作確認はできます。
ユーザー側で管理の必要がなく、
スケールも自動で大規模なシステムに対応も可能ですし、
完全従量課金($0.15/GB)になっていますので、
Cache機能 & Pub/Sub機能としてご検討ください。
なお、Momentoについてのお問い合わせはこちらです。
お気軽にお問い合わせください。
Momentoセミナーのお知らせ
2023年2月21日(火) 16:00からMomentoのセミナーを開催します。
興味があるかたはぜひご参加ください。