Spring Boot + Amazon MQ(ActiveMQ)でメッセージの送受信を実装する

Amazon MQでApache ActiveMQを構築し、Spring Bootアプリケーションから利用する手順を記載します。また、ActiveMQでファイルオーバーが発生した際のアプリケーション側の動作について検証します。
2021.06.03

コンサル部のとばち(@toda_kk)です。

Amazon MQでApache ActiveMQを構築し、Spring Bootアプリケーションから利用する方法について記載します。

意外とまとまった情報が見つからなかったのですが、Amazon MQを利用する場合でも通常のActiveMQと使い勝手は大きくは変わりません。

Amazon MQとAmazon SQSの違いについて

AWSでキューメッセージのサービスといえば、まずAmazon SQSが思い浮かぶでしょう。

SQSはフルマネージドなサービスであり、メッセージブローカーの設定を行う必要がなく、またメッセージのリクエスト数に応じて自動でスケーリングするため、構築や運用が容易です。

ただ、SQSが選択できないケースもあるかと思います。

例えば、メッセージブローカーとしてApache ActiveMQを利用している既存のアプリケーションをAWSに移行したい場合、アプリケーションに手を加えずに済むのでActiveMQのまま使いたいといったケースが考えられます。

また、SQSではマネージドであるがゆえにいくつか制約事項があります。詳細は下記ページをご参照ください。

上記のような事情によりAmazon MQを選択するケースがあります。

Amazon MQは、Apache ActiveMQおよびRabbitMQと互換性のあるメッセージブローカーサービスです。EC2などのサーバーにミドルウェアをインストールすることなく利用できるマネージドサービスです。SQSと異なり、メッセージブローカーの設定を自前で行う必要があり、またスケーリングなど運用面での手間があります。

MQとSQSの違いや使い分けについては、AWS公式のドキュメントでは以下のように説明されています。

Q: Amazon SQS と Amazon MQ の違いは何ですか?

既存のアプリケーションで処理しているメッセージング機能をクラウドにすばやく簡単に移したい場合、Amazon MQ の使用をお勧めします。業界標準の API とプロトコルがサポートされているため、どのような標準に準拠したメッセージブローカーからでも、アプリケーション内のメッセージングコードを書き換えることなく Amazon MQ に切り替えられます。クラウド上でまったく新しいアプリケーションを構築される場合は、Amazon SQS と Amazon SNS のご検討をお勧めします。Amazon SQS と SNS は、ほぼ無制限にスケーリングでき、シンプルで使いやすい API を提供する、軽量な完全マネージド型のメッセージキューサービスおよびトピックサービスです。

Q: Amazon MQ と、自分で管理する Amazon EC2 の ActiveMQ や RabbitMQ はどのように使い分ければよいですか?

メッセージブローカーと基盤となるインフラストラクチャをどこまで詳細に管理したいかによって選択は異なります。Amazon MQ はマネージド型のメッセージブローカーサービスであり、高可用性と耐久性を備えた基盤となるインフラストラクチャのセットアップ、モニタリング、メンテナンス、プロビジョンなどの運用を処理します。運用オーバーヘッドと関連コストの削減を希望する場合、Amazon MQ が適しています。機能や設定のカスタマイズを細かくコントロールしたい場合やカスタムプラグインを使用したい場合には、メッセージブローカーを Amazon EC2 に直接インストールして実行することを検討できます。

Spring Initializarでプロジェクト作成

まず、Spring Bootアプリケーションを用意します。Spring Initializarを使ってライブラリなどが用意された形でプロジェクトを作成してみます。

ActiveMQを扱うためのライブラリがSpring公式で提供されているので、Dependenciesとして指定しておきます。

Spring Initializr

上図の通り、バージョンは Java 11 および Spring Boot 2.5.0 を利用しています。また、他のライブラリは検証用に入れてます。

Amazon MQ(ActiveMQ)の作成

次に、Amazon MQでActiveMQを作成します。設定内容は下記の通りです。後ほどフェイルオーバーの検証もしたいため、デプロイモードをアクティブ/スタンバイにして作成しました。

Amazon MQ

ActiveMQのバージョンは 5.15.15 を利用しています。

今回は検証のためパブリックアクセシビリティを有効にしていますが、本番運用の際は無効にしてVPC内からのみアクセス許可する方が望ましいでしょう。

また、CloudWatch Logsへの全般ログや監査ログの出力も、必要に応じて有効にするかどうか検討しましょう。

ActiveMQを利用したメッセージの送受信

Spring公式ライブラリであるspring-boot-starter-activemqでは、エンドポイントや認証情報を下記のように指定できます。

application.yml

spring:
  activemq:
    broker-url: failover:(ssl://xxxxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617,ssl://xxxxxxxxxx-2.mq.ap-northeast-1.amazonaws.com:61617)
    user: user
    password: password

その他の設定項目については公式ドキュメントをご参照ください。

続いて、メッセージングの処理を実装します。今回はSpring公式で提供されているガイドをほとんどそのまま参考にしました。

ソースコード

Application.Java

package com.example.demo;

import com.example.demo.hello.Email;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Timer;
import java.util.TimerTask;

@SpringBootApplication
@EnableJms
public class Application {

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        var converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    public static void main(String[] args) {
        var context = SpringApplication.run(Application.class, args);
        var jmsTemplate = context.getBean(JmsTemplate.class);

        sendQueue(jmsTemplate);
    }

    private static void sendQueue(JmsTemplate jmsTemplate) {
        var sendingTask = new TimerTask() {
            public void run() {
                var zonedDateTime = ZonedDateTime.now();
                var dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
                var now = zonedDateTime.format(dateTimeFormatter);

                System.out.println("Sending an email message.");
                jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", now));
            }
        };
        var timer = new Timer();
        timer.schedule(sendingTask, 0, 10000);
    }
}

Email.Java

package com.example.demo.hello;

public class Email {

    private String to;
    private String body;

    public Email() {
    }

    public Email(String to, String body) {
        this.to = to;
        this.body = body;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return String.format("Email{to=%s, body=%s}", getTo(), getBody());
    }

}

Receiver.Java

package com.example.demo.hello;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @JmsListener(destination = "mailbox", containerFactory = "myFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

一部だけ内容を変更しており、メッセージの内容として現在日時を送信するように実装しています。ActiveMQでフェイルオーバーが発生した際に、キューが欠損せず正常に処理が実行されるかを確認するためです。

この実装でSpring Bootを起動すると下記のようなログが出力されます。

アプリケーションログ出力

2021-06-01 15:04:07.776  INFO 53464 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.activemq.util.IntrospectionSupport (file:/Users/tobachi/.gradle/caches/modules-2/files-2.1/org.apache.activemq/activemq-client/5.16.2/xxxxxxxxxx/activemq-client-5.16.2.jar) to method sun.security.ssl.SSLSocketImpl.setHost(java.lang.String)
WARNING: Please consider reporting this to the maintainers of org.apache.activemq.util.IntrospectionSupport
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-06-01 15:04:08.452  INFO 53464 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to ssl://xxxxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617
2021-06-01 15:04:08.522  INFO 53464 --- [  restartedMain] com.example.demo.Application             : Started Application in 2.962 seconds (JVM running for 3.784)
2021-06-01 15:04:08.525  INFO 53464 --- [  restartedMain] o.s.b.a.ApplicationAvailabilityBean      : Application availability state LivenessState changed to CORRECT
2021-06-01 15:04:08.527  INFO 53464 --- [  restartedMain] o.s.b.a.ApplicationAvailabilityBean      : Application availability state ReadinessState changed to ACCEPTING_TRAFFIC
2021-06-01 15:04:09.056  INFO 53464 --- [on(1)-127.0.0.1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-06-01 15:04:09.057  INFO 53464 --- [on(1)-127.0.0.1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-06-01 15:04:09.057  INFO 53464 --- [on(1)-127.0.0.1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:04:18.532}>
Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:04:28.537}>
Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:04:38.542}>
Sending an email message.

メッセージの送信と受信がしっかり実行されていることがわかります。

ActiveMQのフェイルオーバー

今回はブローカーをアクティブ/スタンバイ構成で作成しました。

この構成だとブローカーは2つ作成されますが、アクティブになる(アクセスできる)のはどちらか一方のみとなります。アクティブなブローカーが何らかの原因でダウンしたとき、スタンバイ状態のブローカーがアクティブとなる仕組みです。

また、ブローカーごとにURIが用意されており、アクティブなブローカーのURIのみが有効になっているような状態です。

上述した application.yml ファイルのように、ActiveMQではエンドポイントを指定する際にfailover:uri1,...,uriNといった形式でURIを記述できるようになっています。この形式で記述しておくことで、ActiveMQのアクティブなブローカーがダウンした際に、キューの欠損が発生しないようにアプリケーション側でフェイルオーバーを待つように対応できます。

では、実際にフェイルオーバーしたときの動きを確認してみます。Amazon MQでは、マネジメントコンソールから「再起動」を実行することで、フェイルオーバーを実現できます。AWS公式のチュートリアルが提供されているので、参考にしてみてください。

フェイルオーバーが発生すると、アプリケーション側で下記のようなログが出力されます。

フェイルオーバー時のログ出力

Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:07:28.615}>
Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:07:38.620}>
2021-06-01 15:07:48.365  WARN 53464 --- [.73.97.75:61617] o.a.a.t.failover.FailoverTransport       : Transport (ssl://xxxxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617) failed, attempting to automatically reconnect

java.io.EOFException: null
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397) ~[na:na]
	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.16.2.jar:5.16.2]
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.16.2.jar:5.16.2]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-06-01 15:07:48.404  WARN 53464 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport       : Failed to connect to [ssl://xxxxxxxxxx-2.mq.ap-northeast-1.amazonaws.com:61617, ssl://xxxxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617] after: 1 attempt(s) with Connection refused (Connection refused), continuing to retry.
Sending an email message.
2021-06-01 15:07:59.229  INFO 53464 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to ssl://xxxxxxxxxx-2.mq.ap-northeast-1.amazonaws.com:61617
Received <Email{to=info@example.com, body=2021/06/01 15:07:48.624}>
Sending an email message.
Received <Email{to=info@example.com, body=2021/06/01 15:07:59.318}>

ブローカーがダウンしたことを検知し自動的に再接続していることがわかります。また、受信したキューの内容を確認すると、キューの欠損がないままで送受信の処理ができているようです。

SQSではスケーリングをAWS側で管理してくれるのであまり意識する必要がないですが、ActiveMQでもフェイルオーバーの仕組みがちゃんと用意されています。Spring公式のライブラリでもしっかり対応されているため、ある程度の可用性が担保できそうです。

その他の参考ドキュメント

Spring Bootを利用しないJavaコードの場合は、AWS公式ドキュメントの内容が参考になるでしょう。

以上、コンサル部のとばち(@toda_kk)でした。