Introduction
サーバレスキャッシュサービスMomentoの情報です。
最近機能追加がいろいろとされてますが、
(Preview版ながら)Pub/Sub機能が使えるようになりました。
今回はCLIおよびRust用SDKを使って
Pub/Sub機能を試してみます。
※2023年2月現在Pub/Sub機能はPreviewです
Environment
今回試した環境は以下のとおりです。
- MacBook Pro (13-inch, M1, 2020)
- OS : MacOS 12.4
- Rust : 1.66.1
- Momento CLI : v0.22.1
DevIOのMomento関連記事はここにあるので、
これらもご確認ください。
Setup
例によってMomentoのセットアップです。
このあたりを参考に、
MomentoのCLIインストールと認証トークンを取得しましょう。
認証トークンを取得したらコンソールでCLI用・プログラム用に
トークンをセットしておきます。
# CLI用
% momento configure
Please paste your Momento auth token.
(If you do not have an auth token, use `momento account` to generate one.)
Windows users: if CTRL-V does not work, try right-click or SHIFT-INSERT to paste.
Token [****]:<認証トークン>
・・・
#プログラム用
% export MOMENTO_AUTH_TOKEN = <取得した認証トークン>
Pub/Sub using CLI
ではMomento CLIでPub/Subの動作確認をしてみましょう。
% momento --version
momento 0.29.0
Pub/Sub用のキャッシュを新たに作成します。
% momento cache create --name my_pubsub_cache
momento topic subscribeコマンドでSubscriberを起動します。
さきほど作成したキャッシュを指定し、任意の名前のトピック名を
指定してSubscribeます。
% momento topic subscribe --cache my_pubsub_cache my_topic
別のコンソールを起動し、↑で指定したキャッシュ・トピックに対してpublishします。
% momento topic publish --cache my_pubsub_cache my_topic 'hello' --verbose
・・・
Subscriberをみると、ちゃんとメッセージを受信できてます。
% momento topic subscribe --cache my_pubsub_cache my_topic
hello #受信したメッセージ
CLIでtopicの動作確認ができたので、次はRustSDKで
PublisherとSubsriberを実装してみます。
Implement Pub/Sub using Rust
CLIで動作確認ができたので、
Rust SDKでも実装してみましょう。
Cargoで新規プロジェクトを作成します。
% cargo new momento_pubsub
% cd momento_pubsub
Cargo.tomlを下記のように修正します。
MomentoライブラリやTokioなど、必要ライブラリの設定と、
binセクションでPublisherとSubscriber、
それぞれのプログラムを起動できるようにします。
# Cargo.toml
・・・
[dependencies]
momento = "0.22.1"
tokio = { version = "1.21.2", features = ["full"] }
anyhow = "1.0.66"
[[bin]]
name = "sub"
src = "src/bin/sub.rs"
[[bin]]
name = "pub"
src = "src/bin/pub.rs"
まずSubscriber(src/bin/sub.rs)の実装をしましょう。
キャッシュはさきほど作成したものを使うか、新しくcreateします。
// src/bin/sub.rs
use anyhow::self;
use std::env;
use momento::preview::topics::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap();
let cache_name = "<Your demo Cache name for Topic>";
let topic_name = "my_topic";
let client = TopicClient::connect(auth_token,
None, Some("pubsub-demo")).expect("could not connect");
let mut subscription = client
.subscribe(cache_name.to_string(), topic_name.to_string(), None)
.await.expect("subscribe rpc failed");
while let Some(item) = subscription.item().await? {
match item {
SubscriptionItem::Value(value) => match value.kind {
ValueKind::Text(text) => println!("{text}"),
ValueKind::Binary(binary) => {
println!("{{\"kind\": \"binary\", \"length\": {}}}", binary.len())
}
},
SubscriptionItem::Discontinuity(discontinuity) => {
println!("{discontinuity:?}")
}
}
}
Ok(())
}
TopicClient::connectでクライアントを作成し、
キャッシュ名とトピック名を指定してsubscribe関数を実行します。
これでデータ待受の準備ができました、
なお、現状ではsubscribe実行後に60秒データがこないと
タイムアウトしてしまいます。
このあたりは今後改善される予定になってます。
次はPublisherの実装(src/bin/pub.rs)です。
TopicClient::connectでクライアントを作成し、
publish関数(もしくはpublish_mut)でデータのpublishを行います。
// src/bin/pub.rs
use anyhow::self;
use std::env;
use momento::preview::topics::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap();
let cache_name = "demotopics";
let topic_name = "my_topic";
let client = TopicClient::connect(auth_token, None, Some("pubsub-demo"))?;
client.publish(cache_name.to_string(), topic_name.to_string(),
"Hello from Momento Publisher".to_string()).await?;
println!("publish done.");
Ok(())
}
では動作確認してみましょう。
まずはSubscriberを実行します。
% cargo run --bin sub
別ウィンドウでPublisherを実行します。
% cargo run --bin pub
publish done
Subscriberのコンソールを確認すると、
メッセージを受信できているのがわかります。
% cargo run --bin sub
hello from publisher
Summary
今回はMomentoの新機能であるPub/Sub機能を試してみました。
まだプレビュー版ですが、基本的な動作確認はできます。
ユーザー側で管理の必要がなく、
スケールも自動で大規模なシステムに対応も可能ですし、
完全従量課金($0.15/GB)になっていますので、
Cache機能 & Pub/Sub機能としてご検討ください。
なお、Momentoについてのお問い合わせはこちらです。
お気軽にお問い合わせください。
Momentoセミナーのお知らせ
2023年2月21日(火) 16:00からMomentoのセミナーを開催します。
興味があるかたはぜひご参加ください。