この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
SQSとJMS
最近はクラウドで分散キューのサービスといえば、Amazon SQSが一般的かと思いますが、ちょっと前までは自前でキューを作ったりしていましたよね。そこで良く使ったのがJMS(Javaメッセージングサービス)実装のActiveMQだったりします。今回は、SQSとJMSを連携させるライブラリを見つけたので使ってみたいと思います。
JMS実装のActiveMQの動作確認
まずは、JMS実装のActiveMQの動作確認してみます。Apache ActiveMQのページからダウンロードして起動しましょう。
早速、Javaからメッセージを送ってみましょう。
package cm;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQ2SQSv1 {
private static String QUEUE_NAME = "queue-sample";
private static String MESSAGE_BODY = "Hello JMS World";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage(MESSAGE_BODY);
producer.send(message);
System.out.println("Sent message: " + message.getJMSMessageID());
session.close();
connection.close();
}
}
動作確認はブラウザから行うことが出来ます。http://localhost:8161/ からアクセスしてみましょう。
ここまでは簡単ですね。
Amazon SQS Java Messaging Libraryを使ってみる
Amazon SQS Java Messaging Libraryは、JMSの記述を使ってSQSとやりとりするライブラリです。以下のURLからダウロードしてjarをビルドします。
github : Amazon SQS Java Messaging Library
開発はEclipse上で行います。AWS Toolkit for Eclipseを入れておくと便利です。
または、AWS SDK for Javaを使ってもOKです。
それでは、JMSの記述を使ってSQSに接続してみましょう。
package cm;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
public class MQ2SQSv2 {
private static String QUEUE_NAME = "queue-sample";
private static String MESSAGE_BODY = "Hello JMS World";
public static void main(String[] args) throws Exception {
SQSConnectionFactory connectionFactory =
SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.AP_NORTHEAST_1))
.withAWSCredentialsProvider(new ProfileCredentialsProvider())
.build();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage(MESSAGE_BODY);
producer.send(message);
System.out.println("Sent message: " + message.getJMSMessageID());
session.close();
connection.close();
}
}
実行結果をAWS管理コンソールで確認してみましょう。
イイ感じで飛ばせましたね!ここでひとつモヤモヤしました。ソースに実装クラスをベタ書きしてしまっているのです。JavaEE環境では、ライブラリをJDNIに登録して、プロパティファイルで実装クラスを切り替えることが一般的です。例えば、JDBCドライバとか、コネクションプーリングとか、LDAPとかです。もうちょっと突っ込んでみましょう。
JNDI経由でJMSを呼び出す
まずは基本形からということで、JNDI経由でJMSの実装クラスを読み込んでメッセージを送りたいと思います。
package cm;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
public class MQ2SQSv3 {
private static String MESSAGE_BODY = "Hello JMS World";
public static void main(String[] args) throws Exception {
Context context = new InitialContext();
System.out.println(context.getEnvironment());
ConnectionFactory connectionFactory
= (ConnectionFactory) context.lookup("ConnectionFactory");
System.out.println(connectionFactory);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = (Destination)context.lookup("MyQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage(MESSAGE_BODY);
producer.send(message);
System.out.println("Sent message: " + message.getJMSMessageID());
session.close();
connection.close();
}
}
次に、jndi.propertiesを用意してクラスパスが通る場所においておきます。中身は以下の様な感じです。
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.MyQueue = queue-sample
実装するとちゃんとActiveMQにメッセージを送っていました。さて次は、JNDIからJMS経由でSQSを呼びましょう。
JNDI経由でSQSを呼ぶ
ここハマりました。ActiveMQInitialContextFactoryクラスでは、ActiveMQConnectionFactoryクラスをベタ書きで呼んでいるため、SQSConnectionFactoryを指定できません。そこで、SQSInitialContextFactoryクラスを実装しました。
package cm;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import javax.jms.Queue;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import org.apache.activemq.jndi.ReadOnlyContext;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazon.sqs.javamessaging.SQSQueueDestination;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
public class SQSInitialContextFactory implements InitialContextFactory {
private static final String[] defaultConnectionFactoryNames = { "ConnectionFactory", "QueueConnectionFactory" };
private String connectionPrefix = "connection.";
private String queuePrefix = "queue.";
private String queueNamePostfix = ".name";
private String queueUrlPostfix = ".url";
public Context getInitialContext(Hashtable environment) throws NamingException {
Map data = new HashMap();
String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++) {
SQSConnectionFactory factory = null;
String name = names[i];
try {
factory = createConnectionFactory(name, environment);
} catch (Exception e) {
throw new NamingException("Invalid broker URL");
}
data.put(name, factory);
}
createQueues(data, environment);
return createContext(environment, data);
}
public String getQueuePrefix() {
return queuePrefix;
}
public void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
protected ReadOnlyContext createContext(Hashtable environment, Map data) {
return new ReadOnlyContext(environment, data);
}
protected SQSConnectionFactory createConnectionFactory(String name, Hashtable environment)
throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException {
Hashtable temp = new Hashtable(environment);
String prefix = connectionPrefix + name + ".";
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
String key = (String) entry.getKey();
if (key.startsWith(prefix)) {
temp.remove(key);
key = key.substring(prefix.length());
temp.put(key, entry.getValue());
}
}
return createConnectionFactory(temp);
}
protected String[] getConnectionFactoryNames(Map environment) {
String factoryNames = (String) environment.get("connectionFactoryNames");
if (factoryNames != null) {
List list = new ArrayList();
for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens();) {
list.add(enumeration.nextToken().trim());
}
int size = list.size();
if (size > 0) {
String[] answer = new String[size];
list.toArray(answer);
return answer;
}
}
return defaultConnectionFactoryNames;
}
protected void createQueues(Map data, Hashtable environment) {
String name = null;
String url = null;
String jndiName = null;
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
String key = entry.getKey().toString();
if (key.startsWith(queuePrefix) && key.endsWith(queueNamePostfix)) {
name = entry.getValue().toString();
jndiName = key.substring(queuePrefix.length()).substring(0, name.length() - queueNamePostfix.length());
// System.out.println(jndiName);
// System.out.println(name);
}
if (key.startsWith(queuePrefix) && key.endsWith(queueUrlPostfix)) {
url = entry.getValue().toString();
// System.out.println(url);
}
}
data.put(jndiName, createQueue(name, url));
}
protected Queue createQueue(String name, String url) {
return new SQSQueueDestination(name, url);
}
protected SQSConnectionFactory createConnectionFactory(Hashtable environment)
throws URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException {
String regionName = "us-east-1";
AWSCredentialsProvider credentialProvider = null;
String providerName = "com.amazonaws.auth.profile.ProfileCredentialsProvider";
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
String key = entry.getKey().toString();
if (key.startsWith("Region")) {
regionName = entry.getValue().toString();
// System.out.println(regionName);
}
if (key.startsWith("AwsCredentialsProvider")) {
Class clazz = Class.forName(entry.getValue().toString());
credentialProvider = (AWSCredentialsProvider) clazz.newInstance();
// System.out.println(credentialProvider);
}
}
Regions region = Regions.fromName(regionName);
SQSConnectionFactory answer
= SQSConnectionFactory.builder()
.withRegion(Region.getRegion(region))
.withAWSCredentialsProvider(credentialProvider).build();
Properties properties = new Properties();
properties.putAll(environment);
return answer;
}
public String getConnectionPrefix() {
return connectionPrefix;
}
public void setConnectionPrefix(String connectionPrefix) {
this.connectionPrefix = connectionPrefix;
}
}
そして、jndi.propertiesファイルの中身は以下です。先ほど自作したSQSInitialContextFactoryを指定して、キューの名前とURLを指定、そして、リージョンとクレデンシャル形式を指定しています。
java.naming.factory.initial = cm.SQSInitialContextFactory
queue.MyQueue.name = queue-sample
queue.MyQueue.url = https://sqs.ap-northeast-1.amazonaws.com/771293814336/queue-sample
Region=ap-northeast-1
AwsCredentialsProvider=com.amazonaws.auth.profile.ProfileCredentialsProvider
まとめ
ソースコードは同じで、設定ファイル(jndi.properties)によってActiveMQとSQSを切り替えることができました!