Amazon SQSとJMS実装のActiveMQをJDNIで切り替えてみる
SQSとJMS
最近はクラウドで分散キューのサービスといえば、Amazon SQSが一般的かと思いますが、ちょっと前までは自前でキューを作ったりしていましたよね。そこで良く使ったのがJMS(Javaメッセージングサービス)実装のActiveMQだったりします。今回は、SQSとJMSを連携させるライブラリを見つけたので使ってみたいと思います。
JMS実装のActiveMQの動作確認
まずは、JMS実装のActiveMQの動作確認してみます。Apache ActiveMQのページからダウンロードして起動しましょう。
早速、Javaからメッセージを送ってみましょう。
package cm; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class MQ2SQSv1 { private static String QUEUE_NAME = "queue-sample"; private static String MESSAGE_BODY = "Hello JMS World"; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(MESSAGE_BODY); producer.send(message); System.out.println("Sent message: " + message.getJMSMessageID()); session.close(); connection.close(); } }
動作確認はブラウザから行うことが出来ます。http://localhost:8161/ からアクセスしてみましょう。
ここまでは簡単ですね。
Amazon SQS Java Messaging Libraryを使ってみる
Amazon SQS Java Messaging Libraryは、JMSの記述を使ってSQSとやりとりするライブラリです。以下のURLからダウロードしてjarをビルドします。
github : Amazon SQS Java Messaging Library
開発はEclipse上で行います。AWS Toolkit for Eclipseを入れておくと便利です。
または、AWS SDK for Javaを使ってもOKです。
それでは、JMSの記述を使ってSQSに接続してみましょう。
package cm; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import com.amazon.sqs.javamessaging.SQSConnectionFactory; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; public class MQ2SQSv2 { private static String QUEUE_NAME = "queue-sample"; private static String MESSAGE_BODY = "Hello JMS World"; public static void main(String[] args) throws Exception { SQSConnectionFactory connectionFactory = SQSConnectionFactory.builder() .withRegion(Region.getRegion(Regions.AP_NORTHEAST_1)) .withAWSCredentialsProvider(new ProfileCredentialsProvider()) .build(); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(MESSAGE_BODY); producer.send(message); System.out.println("Sent message: " + message.getJMSMessageID()); session.close(); connection.close(); } }
実行結果をAWS管理コンソールで確認してみましょう。
イイ感じで飛ばせましたね!ここでひとつモヤモヤしました。ソースに実装クラスをベタ書きしてしまっているのです。JavaEE環境では、ライブラリをJDNIに登録して、プロパティファイルで実装クラスを切り替えることが一般的です。例えば、JDBCドライバとか、コネクションプーリングとか、LDAPとかです。もうちょっと突っ込んでみましょう。
JNDI経由でJMSを呼び出す
まずは基本形からということで、JNDI経由でJMSの実装クラスを読み込んでメッセージを送りたいと思います。
package cm; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; public class MQ2SQSv3 { private static String MESSAGE_BODY = "Hello JMS World"; public static void main(String[] args) throws Exception { Context context = new InitialContext(); System.out.println(context.getEnvironment()); ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory"); System.out.println(connectionFactory); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination)context.lookup("MyQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(MESSAGE_BODY); producer.send(message); System.out.println("Sent message: " + message.getJMSMessageID()); session.close(); connection.close(); } }
次に、jndi.propertiesを用意してクラスパスが通る場所においておきます。中身は以下の様な感じです。
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url = tcp://localhost:61616 queue.MyQueue = queue-sample
実装するとちゃんとActiveMQにメッセージを送っていました。さて次は、JNDIからJMS経由でSQSを呼びましょう。
JNDI経由でSQSを呼ぶ
ここハマりました。ActiveMQInitialContextFactoryクラスでは、ActiveMQConnectionFactoryクラスをベタ書きで呼んでいるため、SQSConnectionFactoryを指定できません。そこで、SQSInitialContextFactoryクラスを実装しました。
package cm; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.StringTokenizer; import javax.jms.Queue; import javax.naming.Context; import javax.naming.NamingException; import javax.naming.spi.InitialContextFactory; import org.apache.activemq.jndi.ReadOnlyContext; import com.amazon.sqs.javamessaging.SQSConnectionFactory; import com.amazon.sqs.javamessaging.SQSQueueDestination; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; public class SQSInitialContextFactory implements InitialContextFactory { private static final String[] defaultConnectionFactoryNames = { "ConnectionFactory", "QueueConnectionFactory" }; private String connectionPrefix = "connection."; private String queuePrefix = "queue."; private String queueNamePostfix = ".name"; private String queueUrlPostfix = ".url"; public Context getInitialContext(Hashtable environment) throws NamingException { Map data = new HashMap(); String[] names = getConnectionFactoryNames(environment); for (int i = 0; i < names.length; i++) { SQSConnectionFactory factory = null; String name = names[i]; try { factory = createConnectionFactory(name, environment); } catch (Exception e) { throw new NamingException("Invalid broker URL"); } data.put(name, factory); } createQueues(data, environment); return createContext(environment, data); } public String getQueuePrefix() { return queuePrefix; } public void setQueuePrefix(String queuePrefix) { this.queuePrefix = queuePrefix; } protected ReadOnlyContext createContext(Hashtable environment, Map data) { return new ReadOnlyContext(environment, data); } protected SQSConnectionFactory createConnectionFactory(String name, Hashtable environment) throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException { Hashtable temp = new Hashtable(environment); String prefix = connectionPrefix + name + "."; for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) { Map.Entry entry = (Map.Entry) iter.next(); String key = (String) entry.getKey(); if (key.startsWith(prefix)) { temp.remove(key); key = key.substring(prefix.length()); temp.put(key, entry.getValue()); } } return createConnectionFactory(temp); } protected String[] getConnectionFactoryNames(Map environment) { String factoryNames = (String) environment.get("connectionFactoryNames"); if (factoryNames != null) { List list = new ArrayList(); for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) { list.add(enumeration.nextToken().trim()); } int size = list.size(); if (size > 0) { String[] answer = new String[size]; list.toArray(answer); return answer; } } return defaultConnectionFactoryNames; } protected void createQueues(Map data, Hashtable environment) { String name = null; String url = null; String jndiName = null; for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) { Map.Entry entry = (Map.Entry) iter.next(); String key = entry.getKey().toString(); if (key.startsWith(queuePrefix) && key.endsWith(queueNamePostfix)) { name = entry.getValue().toString(); jndiName = key.substring(queuePrefix.length()).substring(0, name.length() - queueNamePostfix.length()); // System.out.println(jndiName); // System.out.println(name); } if (key.startsWith(queuePrefix) && key.endsWith(queueUrlPostfix)) { url = entry.getValue().toString(); // System.out.println(url); } } data.put(jndiName, createQueue(name, url)); } protected Queue createQueue(String name, String url) { return new SQSQueueDestination(name, url); } protected SQSConnectionFactory createConnectionFactory(Hashtable environment) throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException { String regionName = "us-east-1"; AWSCredentialsProvider credentialProvider = null; String providerName = "com.amazonaws.auth.profile.ProfileCredentialsProvider"; for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) { Map.Entry entry = (Map.Entry) iter.next(); String key = entry.getKey().toString(); if (key.startsWith("Region")) { regionName = entry.getValue().toString(); // System.out.println(regionName); } if (key.startsWith("AwsCredentialsProvider")) { Class clazz = Class.forName(entry.getValue().toString()); credentialProvider = (AWSCredentialsProvider) clazz.newInstance(); // System.out.println(credentialProvider); } } Regions region = Regions.fromName(regionName); SQSConnectionFactory answer = SQSConnectionFactory.builder() .withRegion(Region.getRegion(region)) .withAWSCredentialsProvider(credentialProvider).build(); Properties properties = new Properties(); properties.putAll(environment); return answer; } public String getConnectionPrefix() { return connectionPrefix; } public void setConnectionPrefix(String connectionPrefix) { this.connectionPrefix = connectionPrefix; } }
そして、jndi.propertiesファイルの中身は以下です。先ほど自作したSQSInitialContextFactoryを指定して、キューの名前とURLを指定、そして、リージョンとクレデンシャル形式を指定しています。
java.naming.factory.initial = cm.SQSInitialContextFactory queue.MyQueue.name = queue-sample queue.MyQueue.url = https://sqs.ap-northeast-1.amazonaws.com/771293814336/queue-sample Region=ap-northeast-1 AwsCredentialsProvider=com.amazonaws.auth.profile.ProfileCredentialsProvider
まとめ
ソースコードは同じで、設定ファイル(jndi.properties)によってActiveMQとSQSを切り替えることができました!