Serverless CacheサービスのMomentoがPub/Sub機能を実装します

2023.02.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Introduction

サーバレスキャッシュサービスMomentoの情報です。
最近機能追加がいろいろとされてますが、
(Preview版ながら)Pub/Sub機能が使えるようになりました。
今回はCLIおよびRust用SDKを使って
Pub/Sub機能を試してみます。

※2023年2月現在Pub/Sub機能はPreviewです

Environment

今回試した環境は以下のとおりです。

  • MacBook Pro (13-inch, M1, 2020)
  • OS : MacOS 12.4
  • Rust : 1.66.1
  • Momento CLI : v0.22.1

DevIOのMomento関連記事はここにあるので、
これらもご確認ください。

Setup

例によってMomentoのセットアップです。
このあたりを参考に、
MomentoのCLIインストールと認証トークンを取得しましょう。

認証トークンを取得したらコンソールでCLI用・プログラム用に
トークンをセットしておきます。

# CLI用
% momento configure
Please paste your Momento auth token.  
(If you do not have an auth token, use `momento account` to generate one.)
Windows users: if CTRL-V does not work, try right-click or SHIFT-INSERT to paste.

Token [****]:<認証トークン>

 ・・・

#プログラム用
% export MOMENTO_AUTH_TOKEN = <取得した認証トークン>

Pub/Sub using CLI

ではMomento CLIでPub/Subの動作確認をしてみましょう。

% momento --version
momento 0.29.0

Pub/Sub用のキャッシュを新たに作成します。

% momento cache create --name my_pubsub_cache

momento topic subscribeコマンドでSubscriberを起動します。
さきほど作成したキャッシュを指定し、任意の名前のトピック名を
指定してSubscribeます。

% momento topic subscribe --cache my_pubsub_cache my_topic

別のコンソールを起動し、↑で指定したキャッシュ・トピックに対してpublishします。

% momento topic publish --cache my_pubsub_cache my_topic 'hello' --verbose
・・・

Subscriberをみると、ちゃんとメッセージを受信できてます。

% momento topic subscribe --cache my_pubsub_cache my_topic

hello #受信したメッセージ

CLIでtopicの動作確認ができたので、次はRustSDKで
PublisherとSubsriberを実装してみます。

Implement Pub/Sub using Rust

CLIで動作確認ができたので、
Rust SDKでも実装してみましょう。

Cargoで新規プロジェクトを作成します。

% cargo new momento_pubsub
% cd momento_pubsub

Cargo.tomlを下記のように修正します。
MomentoライブラリやTokioなど、必要ライブラリの設定と、
binセクションでPublisherとSubscriber、
それぞれのプログラムを起動できるようにします。

# Cargo.toml

・・・

[dependencies]
momento = "0.22.1"
tokio = { version = "1.21.2", features = ["full"] }
anyhow = "1.0.66"

[[bin]]
name = "sub"
src  = "src/bin/sub.rs"

[[bin]]
name = "pub"
src  = "src/bin/pub.rs"

まずSubscriber(src/bin/sub.rs)の実装をしましょう。
キャッシュはさきほど作成したものを使うか、新しくcreateします。

// src/bin/sub.rs
use anyhow::self;
use std::env;

use momento::preview::topics::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {

    let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap();
    let cache_name = "<Your demo Cache name for Topic>";
    let topic_name = "my_topic";

    let client = TopicClient::connect(auth_token, 
        None, Some("pubsub-demo")).expect("could not connect");
        
    let mut subscription = client
        .subscribe(cache_name.to_string(), topic_name.to_string(), None)
        .await.expect("subscribe rpc failed");

        
    while let Some(item) = subscription.item().await? {
        match item {
            SubscriptionItem::Value(value) => match value.kind {
                ValueKind::Text(text) => println!("{text}"),
                ValueKind::Binary(binary) => {
                    println!("{{\"kind\": \"binary\", \"length\": {}}}", binary.len())
                }
            },
            SubscriptionItem::Discontinuity(discontinuity) => {
                println!("{discontinuity:?}")
            }
        }
    }    
 
   Ok(())
}

TopicClient::connectでクライアントを作成し、
キャッシュ名とトピック名を指定してsubscribe関数を実行します。
これでデータ待受の準備ができました、

なお、現状ではsubscribe実行後に60秒データがこないと
タイムアウトしてしまいます。
このあたりは今後改善される予定になってます。

次はPublisherの実装(src/bin/pub.rs)です。
TopicClient::connectでクライアントを作成し、
publish関数(もしくはpublish_mut)でデータのpublishを行います。

// src/bin/pub.rs

use anyhow::self;
use std::env;

use momento::preview::topics::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let auth_token = env::var("MOMENTO_AUTH_TOKEN").unwrap();
    let cache_name = "demotopics";
    let topic_name = "my_topic";

    let client = TopicClient::connect(auth_token, None, Some("pubsub-demo"))?;
    client.publish(cache_name.to_string(), topic_name.to_string(),
    "Hello from Momento Publisher".to_string()).await?;
    
    println!("publish done.");

    Ok(())
}

では動作確認してみましょう。
まずはSubscriberを実行します。

% cargo run --bin sub

別ウィンドウでPublisherを実行します。

% cargo run --bin pub

publish done

Subscriberのコンソールを確認すると、
メッセージを受信できているのがわかります。

% cargo run --bin sub

hello from publisher

Summary

今回はMomentoの新機能であるPub/Sub機能を試してみました。
まだプレビュー版ですが、基本的な動作確認はできます。
ユーザー側で管理の必要がなく、
スケールも自動で大規模なシステムに対応も可能ですし、
完全従量課金($0.15/GB)になっていますので、
Cache機能 & Pub/Sub機能としてご検討ください。

なお、Momentoについてのお問い合わせはこちらです。
お気軽にお問い合わせください。

Momentoセミナーのお知らせ

2023年2月21日(火) 16:00からMomentoのセミナーを開催します。
興味があるかたはぜひご参加ください。