[Rust] Axumを使ってMomentoにアクセスする

2022.10.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Introduction

この記事では、AWS Lambdaから
キャッシュサービスであるMomentoにアクセスしてみました。
Momentoはサーバレスアーキテクチャとの相性がとてもよいですが、
普通のWebアプリにも簡単に組み込むことができます。

本稿ではRustのWebフレームワークであるAxumを使ってMomentoにアクセスしてみます。

Momento?

ここでも解説しているとおり、
Momentoは、クラウドネイティブなサーバーレスキャッシュサービスです。
設定や管理はすべてMomentoが行い、
最適化やスケールを考慮する必要はありません。
常に最適な構成を提供してくれて、
料金も転送量のみ(送受信1GBあたり0.15USD)です。

プログラムに組み込むことも簡単で、各種SDKが提供されています。
今回はMomentoのCrateをAxumで使います。

Axum?

Axumはactix-webと並んで人気の
Webアプリケーションフレームワークです。
非同期ランタイムのTokioと同じチームが開発しています。

Axumについてはドキュメントなどをご確認ください。

Environment

Momentoはすでに設定済み(認証トークン取得済み)とします。
トークン取得についてはここなどをご確認ください。

  • OS : MacOS 12.4
  • Rust : 1.62.1

Create Application

Axumで雛形作成

最初はcargoでプロジェクトを生成します。

% cargo new axum-example && cd axum-example

Cargo.tomlの依存ライブラリを下記のように指定。

[dependencies]
axum = "0.5.16"
tokio = { version = "1.0", features = ["full"] }
momento = "0.7.4"
serde = "1.0.145"
serde_json = "1.0.86"

次に、src/main.rsでルーティング設定やハンドラーの仮実装をします。
/postにキャッシュ名、キー名、値をJSONフォーマットでPOSTすると
Momentoにキャッシュを登録します。

/get/<キャッシュ名>/キー名
でGETするとMomentoからキャッシュ値の取得をします。

まずはリクエストとレスポンス用の構造体を定義します。

//パスパラメータ用
#[derive(Deserialize)]
struct Params {
    cache_name: String,
    key: String,
}

//JSONリクエストボディ用
#[derive(Debug, Deserialize)]
struct InputBody {
    cache_name: String,
    key: String,
    value: String,
}

//レスポンス用
#[derive(Debug, Serialize)]
struct ResponseBody {
    message: String,
}

GET用とPOST用のハンドラーも定義します。

//GET用
async fn handler_get(
    Path(Params { cache_name, key }): Path<Params>,
) -> impl IntoResponse {
    println!("{:?}", cache_name);
    println!("{:?}", key);

    (
        StatusCode::OK,
        Json(json!({"message":"response!"})),
    )
    
}

//POST用
async fn handler_post(
    Json(input): Json<InputBody>,
) -> impl IntoResponse {

    let cache_name = input.cache_name;
    let key = input.key;
    let value = input.value;

    println!("{:?}", cache_name);
    println!("{:?}", key);
    println!("{:?}", value);

    (
        StatusCode::CREATED,
        Json(json!({"message":"cache set ok!"})),
    )
}

そしてmain関数の定義です。
main関数では、さきほどのハンドラーとURLパスを関連付け、
ルーティング定義をしてサーバを起動します。

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/get/:cache_name/:key", get(handler_get))
        .route("/post", post(handler_post));

    // run it
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    println!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

cargo run でアプリを起動してみます。

% cargo run -p axum-example
    Finished dev [unoptimized + debuginfo] target(s) in 0.27s
     Running `target/debug/axum-example`
listening on 127.0.0.1:3000

この時点でもcurlなどでアクセスすれば結果を取得できます。
では次に各ハンドラーでMomentoにアクセスするように実装します。

Momentoの組み込み

Momentoにアクセスするためのクライアント(SimpleCacheClient)は
Arcを使用してMutexをラップし、
所有権を複数のスレッド間で共有できるようにします。
そして、状態を保持するために使用できるExtension機能を使って再利用します。   

struct State {
    client: SimpleCacheClient,
}

type SharedState = Arc<Mutex<State>>;

なお、現在rcであるAxumのversion 0.6では、 ここにあるように、
extract::Stateをつかってもう少しシンプルに記述できるみたいです。

get/postハンドラーも修正します。
引数にExtensionを受け取るようにして、
そこからMomentoクライアントを取り出して使用します。

async fn handler_get(
    Extension(state): Extension<SharedState>,
    Path(Params { cache_name, key }): Path<Params>,
) -> impl IntoResponse {
    let client = &mut state.lock().await.client;

    println!("{:?}", cache_name);
    println!("{:?}", key);

    let result: MomentoGetResponse = client.get(&cache_name, key.clone()).await.unwrap();

    let msg = format!(
        "key:{} , value:{}",
        key,
        String::from_utf8(result.value).unwrap()
    );
    let response = ResponseBody { message: msg };
    (
        StatusCode::OK,
        Json(response),
    )
    
}

async fn handler_post(
    Extension(state): Extension<SharedState>,
    Json(input): Json<InputBody>,
) -> impl IntoResponse {
    let client = &mut state.lock().await.client;

    let cache_name = input.cache_name;
    let key = input.key;
    let value = input.value;

    println!("{:?}", cache_name);
    println!("{:?}", key);
    println!("{:?}", value);

    client
        .set(&cache_name, key.clone(), value.clone(), None)
        .await
        .unwrap();

    (
        StatusCode::CREATED,
        Json(json!({"message":"cache set ok!"})),
    )
}

main関数も修正します。
init_momento_client関数でMomentoクライアントの初期化を行い、
ExtensionでMomentoクライアントを設定します。

async fn init_momento_client() -> Result<SimpleCacheClient, ()> {
    let env_token = env::var("token").expect("token is undefined.");
    let ttl = 60;

    match SimpleCacheClientBuilder::new(env_token, NonZeroU64::new(ttl).unwrap()) {
        Ok(client) => Ok(client.build()),
        Err(e) => panic!("Error : {:?}", e),
    }
}

#[tokio::main]
async fn main() {
    // build our application with a route
    let moment_cli = init_momento_client().await.unwrap();
    let state = Arc::new(Mutex::new(State { client: moment_cli }));

    let app = Router::new()
        .route("/get/:cache_name/:key", get(handler_get))
        .route("/post", post(handler_post))
        .layer(Extension(state));
・・・

}

最終的なmain.rsは下記のようになります。

use axum::{
    extract::Extension, extract::Path, http::StatusCode, response::IntoResponse, routing::get,
    routing::post, Json, Router,
};
use std::env;
use std::net::SocketAddr;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::Mutex;

use serde::{Deserialize, Serialize};
use serde_json::json;

use momento::response::cache_get_response::MomentoGetResponse;
use momento::simple_cache_client::{SimpleCacheClient, SimpleCacheClientBuilder};

#[derive(Deserialize)]
struct Params {
    cache_name: String,
    key: String,
}

#[derive(Debug, Deserialize)]
struct InputBody {
    cache_name: String,
    key: String,
    value: String,
}

#[derive(Debug, Serialize)]
struct ResponseBody {
    message: String,
}

struct State {
    client: SimpleCacheClient,
}

type SharedState = Arc<Mutex<State>>;

async fn init_momento_client() -> Result<SimpleCacheClient, ()> {
    let env_token = env::var("token").expect("token is undefined.");
    let ttl = 60;

    match SimpleCacheClientBuilder::new(env_token, NonZeroU64::new(ttl).unwrap()) {
        Ok(client) => Ok(client.build()),
        Err(e) => panic!("Error : {:?}", e),
    }
}

async fn handler_get(
    Extension(state): Extension<SharedState>,
    Path(Params { cache_name, key }): Path<Params>,
) -> impl IntoResponse {

    let client = &mut state.lock().await.client;

    println!("{:?}", cache_name);
    println!("{:?}", key);

    let result: MomentoGetResponse = client.get(&cache_name, key.clone()).await.unwrap();

    let msg = format!(
        "key:{} , value:{}",
        key,
        String::from_utf8(result.value).unwrap()
    );
    let response = ResponseBody { message: msg };
    (
        StatusCode::OK,
        Json(response),
    )

}

async fn handler_post(
    Extension(state): Extension<SharedState>,
    Json(input): Json<InputBody>,
) -> impl IntoResponse {

    let client = &mut state.lock().await.client;

    let cache_name = input.cache_name;
    let key = input.key;
    let value = input.value;

    println!("{:?}", cache_name);
    println!("{:?}", key);
    println!("{:?}", value);

    client
        .set(&cache_name, key.clone(), value.clone(), None)
        .await
        .unwrap();

    (
        StatusCode::CREATED,
        Json(json!({"message":"cache set ok!"})),
    )
}

#[tokio::main]
async fn main() {

    // build our application with a route
    let moment_cli = init_momento_client().await.unwrap();
    let state = Arc::new(Mutex::new(State { client: moment_cli }));

    let app = Router::new()
        .route("/get/:cache_name/:key", get(handler_get))
        .route("/post", post(handler_post))
        .layer(Extension(state));

    // run it
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    println!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

動作確認

では動作確認してみましょう。
token環境変数に認証トークンをセットします。

% export token=<Momentoの認証トークン>

アプリを起動します。

% cargo run -p axum-example
    Finished dev [unoptimized + debuginfo] target(s) in 0.27s
     Running `target/debug/axum-example`
listening on 127.0.0.1:3000

サーバが起動したらcurlを使ってキャッシュデータの登録と取得を実行してみましょう。
Momentoにアクセスできていることがわかります。

# Create Cache Data
% curl -X POST -H "Content-Type:application/json" http://127.0.0.1:3000/post -d '{"cache_name":"default-cache","key":"mm_key","value":"mm_value"}'

{"message":"cache set ok!"}%   

# Get Cache Data
% curl http://127.0.0.1:3000/get/default-cache/mm_key
{"message":"key:mm_key , value:mm_value"}%

Summary

今回はAxumからMomentoにアクセスしてみました。
ExtensionをつかってMomentoクライアントを使い回す以外は
普通にMomentoへアクセスするのと変わりはありません。
AWS Lambdaでも通常のWebアプリでも簡単にMomentoを使うことができます。

Momentoセミナーのお知らせ

2022年11月11日(金) 16:00からMomentoのセミナーを開催します。
興味があるかたはぜひご参加ください。

References