Amazon SQSとJMS実装のActiveMQをJDNIで切り替えてみる

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

SQSとJMS

最近はクラウドで分散キューのサービスといえば、Amazon SQSが一般的かと思いますが、ちょっと前までは自前でキューを作ったりしていましたよね。そこで良く使ったのがJMS(Javaメッセージングサービス)実装のActiveMQだったりします。今回は、SQSとJMSを連携させるライブラリを見つけたので使ってみたいと思います。

JMS実装のActiveMQの動作確認

まずは、JMS実装のActiveMQの動作確認してみます。Apache ActiveMQのページからダウンロードして起動しましょう。

Apache ActiveMQ

早速、Javaからメッセージを送ってみましょう。

package cm;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MQ2SQSv1 {

  private static String QUEUE_NAME = "queue-sample";
  private static String MESSAGE_BODY = "Hello JMS World";

  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory connectionFactory = 
      new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue(QUEUE_NAME);

    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    TextMessage message = session.createTextMessage(MESSAGE_BODY);
    producer.send(message);
    System.out.println("Sent message: " + message.getJMSMessageID());
    session.close();
    connection.close();
  }
}

動作確認はブラウザから行うことが出来ます。http://localhost:8161/ からアクセスしてみましょう。

screenshot 2014-12-31 1.36.29

ここまでは簡単ですね。

Amazon SQS Java Messaging Libraryを使ってみる

Amazon SQS Java Messaging Libraryは、JMSの記述を使ってSQSとやりとりするライブラリです。以下のURLからダウロードしてjarをビルドします。

github : Amazon SQS Java Messaging Library

開発はEclipse上で行います。AWS Toolkit for Eclipseを入れておくと便利です。

AWS Toolkit for Eclipse

または、AWS SDK for Javaを使ってもOKです。

AWS SDK for Java

それでは、JMSの記述を使ってSQSに接続してみましょう。

package cm;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

public class MQ2SQSv2 {

  private static String QUEUE_NAME = "queue-sample";
  private static String MESSAGE_BODY = "Hello JMS World";

  public static void main(String[] args) throws Exception {
    SQSConnectionFactory connectionFactory = 
      SQSConnectionFactory.builder()
        .withRegion(Region.getRegion(Regions.AP_NORTHEAST_1))
        .withAWSCredentialsProvider(new ProfileCredentialsProvider())
        .build();
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue(QUEUE_NAME);

    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    TextMessage message = session.createTextMessage(MESSAGE_BODY);
    producer.send(message);
    System.out.println("Sent message: " + message.getJMSMessageID());
    session.close();
    connection.close();
  }
}

実行結果をAWS管理コンソールで確認してみましょう。

screenshot 2014-12-31 1.51.57

イイ感じで飛ばせましたね!ここでひとつモヤモヤしました。ソースに実装クラスをベタ書きしてしまっているのです。JavaEE環境では、ライブラリをJDNIに登録して、プロパティファイルで実装クラスを切り替えることが一般的です。例えば、JDBCドライバとか、コネクションプーリングとか、LDAPとかです。もうちょっと突っ込んでみましょう。

JNDI経由でJMSを呼び出す

まずは基本形からということで、JNDI経由でJMSの実装クラスを読み込んでメッセージを送りたいと思います。

package cm;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class MQ2SQSv3 {

  private static String MESSAGE_BODY = "Hello JMS World";

  public static void main(String[] args) throws Exception {
    Context context = new InitialContext();
    System.out.println(context.getEnvironment());
    ConnectionFactory connectionFactory 
      = (ConnectionFactory) context.lookup("ConnectionFactory");
    System.out.println(connectionFactory);
    
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = (Destination)context.lookup("MyQueue");
    
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    TextMessage message = session.createTextMessage(MESSAGE_BODY);
    producer.send(message);
    System.out.println("Sent message: " + message.getJMSMessageID());
    session.close();
    connection.close();
  }
}

次に、jndi.propertiesを用意してクラスパスが通る場所においておきます。中身は以下の様な感じです。

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.MyQueue = queue-sample

実装するとちゃんとActiveMQにメッセージを送っていました。さて次は、JNDIからJMS経由でSQSを呼びましょう。

JNDI経由でSQSを呼ぶ

ここハマりました。ActiveMQInitialContextFactoryクラスでは、ActiveMQConnectionFactoryクラスをベタ書きで呼んでいるため、SQSConnectionFactoryを指定できません。そこで、SQSInitialContextFactoryクラスを実装しました。

package cm;

import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;

import javax.jms.Queue;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;

import org.apache.activemq.jndi.ReadOnlyContext;

import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSQueueDestination;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

public class SQSInitialContextFactory implements InitialContextFactory {

  private static final String[] defaultConnectionFactoryNames = { "ConnectionFactory", "QueueConnectionFactory" };

  private String connectionPrefix = "connection.";
  private String queuePrefix = "queue.";
  private String queueNamePostfix = ".name";
  private String queueUrlPostfix = ".url";

  public Context getInitialContext(Hashtable environment) throws NamingException {
    Map data = new HashMap();
    String[] names = getConnectionFactoryNames(environment);
    for (int i = 0; i < names.length; i++) {
      SQSConnectionFactory factory = null;
      String name = names[i];
      try {
        factory = createConnectionFactory(name, environment);
      } catch (Exception e) {
        throw new NamingException("Invalid broker URL");
      }

      data.put(name, factory);
    }
    createQueues(data, environment);
    return createContext(environment, data);
  }

  public String getQueuePrefix() {
    return queuePrefix;
  }

  public void setQueuePrefix(String queuePrefix) {
    this.queuePrefix = queuePrefix;
  }

  protected ReadOnlyContext createContext(Hashtable environment, Map data) {
    return new ReadOnlyContext(environment, data);
  }

  protected SQSConnectionFactory createConnectionFactory(String name, Hashtable environment) 
    throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException {
    Hashtable temp = new Hashtable(environment);
    String prefix = connectionPrefix + name + ".";
    for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
      Map.Entry entry = (Map.Entry) iter.next();
      String key = (String) entry.getKey();
      if (key.startsWith(prefix)) {
        temp.remove(key);
        key = key.substring(prefix.length());
        temp.put(key, entry.getValue());
      }
    }
    return createConnectionFactory(temp);
  }

  protected String[] getConnectionFactoryNames(Map environment) {
    String factoryNames = (String) environment.get("connectionFactoryNames");
    if (factoryNames != null) {
      List list = new ArrayList();
      for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) {
        list.add(enumeration.nextToken().trim());
      }
      int size = list.size();
      if (size > 0) {
        String[] answer = new String[size];
        list.toArray(answer);
        return answer;
      }
    }
    return defaultConnectionFactoryNames;
  }

  protected void createQueues(Map data, Hashtable environment) {
    String name = null;
    String url = null;
    String jndiName = null;
    for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
      Map.Entry entry = (Map.Entry) iter.next();
      String key = entry.getKey().toString();
      if (key.startsWith(queuePrefix) && key.endsWith(queueNamePostfix)) {
        name = entry.getValue().toString();
        jndiName = key.substring(queuePrefix.length()).substring(0, name.length() - queueNamePostfix.length());
        // System.out.println(jndiName);
        // System.out.println(name);
      }
      if (key.startsWith(queuePrefix) && key.endsWith(queueUrlPostfix)) {
        url = entry.getValue().toString();
        // System.out.println(url);
      }
    }
    data.put(jndiName, createQueue(name, url));
  }

  protected Queue createQueue(String name, String url) {
    return new SQSQueueDestination(name, url);
  }

  protected SQSConnectionFactory createConnectionFactory(Hashtable environment) 
    throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException {
    String regionName = "us-east-1";
    AWSCredentialsProvider credentialProvider = null;
    String providerName = "com.amazonaws.auth.profile.ProfileCredentialsProvider";
    for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
      Map.Entry entry = (Map.Entry) iter.next();
      String key = entry.getKey().toString();
      if (key.startsWith("Region")) {
        regionName = entry.getValue().toString();
        // System.out.println(regionName);
      }
      if (key.startsWith("AwsCredentialsProvider")) {
        Class clazz = Class.forName(entry.getValue().toString());
        credentialProvider = (AWSCredentialsProvider) clazz.newInstance();
        // System.out.println(credentialProvider);
      }
    }
    Regions region = Regions.fromName(regionName);

    SQSConnectionFactory answer 
    = SQSConnectionFactory.builder()
      .withRegion(Region.getRegion(region))
      .withAWSCredentialsProvider(credentialProvider).build();
    Properties properties = new Properties();
    properties.putAll(environment);
    return answer;
  }

  public String getConnectionPrefix() {
    return connectionPrefix;
  }

  public void setConnectionPrefix(String connectionPrefix) {
    this.connectionPrefix = connectionPrefix;
  }

}

そして、jndi.propertiesファイルの中身は以下です。先ほど自作したSQSInitialContextFactoryを指定して、キューの名前とURLを指定、そして、リージョンとクレデンシャル形式を指定しています。

java.naming.factory.initial = cm.SQSInitialContextFactory
queue.MyQueue.name = queue-sample
queue.MyQueue.url = https://sqs.ap-northeast-1.amazonaws.com/771293814336/queue-sample
Region=ap-northeast-1
AwsCredentialsProvider=com.amazonaws.auth.profile.ProfileCredentialsProvider

まとめ

ソースコードは同じで、設定ファイル(jndi.properties)によってActiveMQとSQSを切り替えることができました!

参考資料

Using JMS with Amazon SQS

ActiveMQ - Connectivity > Containers > JNDI Support