持続メッセージを発行/購読メッセージングで使用するには、永続的サブスクライバを使用する必要があります。永続的サブスクライバは、クラッシュが発生してもサーバがサブスクライブを記憶しているように、持続名でJMSサーバに登録されます。永続的サブスクライバは、いつでも切断できます。切断中でも、サーバは、サブスクライバの代わりにメッセージを収集します。サブスクライバは、JMSサーバに再接続すると、最後に接続していたときからトピックに送信されたすべてのメッセージ(メッセージの期限により異なります)を受信します。
永続的サブスクライバは、切断時でもリソースを消費するため、アプリケーションにより永続的サブスクライバを適切に管理する必要があります。アプリケーションで永続的サブスクライバを「忘れてしまった」場合、他のサブスクライバの速度が落ち、結果的にサーバリソースを消費することになります。
このアプリケーションのサンプルプログラムは4つあります。
1 初期サブスクライブの作成
永続的サブスクライバには、永続的サブスクライバがサーバに初めて接続するときの「初期サブスクライブ」という概念があります。永続的サブスクライバは、常に同じ方法で接続しますが、最初の接続では、サーバがサブスクライブを認識できるポイントがマークされます。初期サブスクライブ後、JMSは、永続的サブスクライバの今後の配信のために、トピックに送信されたすべてのメッセージを保存します。一時メッセージはサーバクラッシュが発生すると失われますが、この機能を使用すると永続的サブスクライバに持続メッセージが確実に配信されます。
次のサンプルプログラムでは、初期サブスクライブを作成する方法を示します。
通常の非永続的サブスクライバと比較すると、永続的サブスクライバでは2つの初期手順を実行する必要があります。package durable; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; public class Subscribe { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx. | lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | topicConn.setClientID("durable"); | | // create a topic session | TopicSession topicSession = topicConn.createTopicSession(false, | Session.AUTO_ACKNOWLEDGE); | | // create a topic subscriber | TopicSubscriber topicSubscriber; | topicSubscriber = topicSession.createDurableSubscriber(topic, "mySub"); | | System.out.println("created durable subscriber mySub"); | | // close the topic connection | topicConn.close(); } }
- 接続にClientIDを設定する: ClientIDは、永続的サブスクライバの名前の一部です。任意のセッションを作成する前に、接続にClientIDを設定する必要があります。
- トピックセッションの
createDurableSubscriber
ファクトリメソッドを使用する: このメソッドには、永続的サブスクライバの名前を持つ2つめのパラメータが必要です。2 メッセージの発行
永続的サブスクライバへのメッセージの発行は、非永続的サブスクライバの場合と同じ方法で行われます。 パブリッシャアプリケーションの説明については、他のpub/subの例を参照してください。上のパブリッシャは、オプションのコマンドライン引数を取り、送信されるメッセージの数を示すだけです。このメッセージの数は、ヘッダプロパティ「id」がメッセージの数に設定されたトピックにパブリッシュされます。デフォルトでは、100メッセージがパブリッシュされます。package durable; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; public class Publish { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx. | lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | | // create a topic session | TopicSession topicSession = topicConn.createTopicSession(false, | Session.AUTO_ACKNOWLEDGE); | | // create a topic publisher | TopicPublisher topicPublisher = topicSession.createPublisher(topic); | | // create a simple message | Message message = topicSession.createMessage(); | | // get the number of messages to publish | int numMsgs = (args.length > 0) ? new Integer(args[0]).intValue() : 100; | | // publish the messages | for (int i=0; i < numMsgs; i++) { | | message.setIntProperty("id", i); | | topicPublisher.publish(message); | } | | System.out.println("published " + numMsgs + " on " + topic); | | // close the topic connection | topicConn.close(); } }3 メッセージの消費
メッセージコンシューマは、非永続的サブスクライバに類似しています。ただし、前に説明したとおり、永続的サブスクライバでは接続ClientIDを設定し、createDurableSubscriber
メソッドを使用してサブスクライバを登録する必要があります。サブスクライブがすでに登録されている場合、サブスクライバは、再接続されます。永続的サブスクライバのメッセージ消費のソースコードは、次のとおりです。
着信メッセージの「id」ヘッダプロパティを参照して、永続的サブスクライバは、メッセージの受信範囲を出力します。サブスクライバはAUTO_ACKNOWLEDGEを使用するため、package durable; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; public class Consume { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx. | lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | topicConn.setClientID("durable"); | | // create a topic session | TopicSession topicSession = topicConn.createTopicSession(false, | Session.AUTO_ACKNOWLEDGE); | | // create a topic subscriber | TopicSubscriber topicSubscriber; | topicSubscriber = topicSession.createDurableSubscriber(topic, "mySub"); | | // start the connection | topicConn.start(); | | // get the number of messages to receive | int numMsgs = (args.length > 0) ? new Integer(args[0]).intValue() : 100; | | int firstID = 0, lastID = 0; | | // receive the message | for (int i=0; i < numMsgs; i++) { | | Message message = topicSubscriber.receive(); | | if (i == 0) firstID = message.getIntProperty("id"); | | if (i == numMsgs - 1) lastID = message.getIntProperty("id"); | } | | System.out.println("consumed messages " + firstID + "-" + lastID); | | // close the topic connection | topicConn.close(); } }receive
メソッドが返される直前にセッションによりメッセージが自動的に確認されます。名前の同じ2つの永続的サブスクライバが同時に実行することはできません。接続ClientIDとサブスクライバの名前が同じ永続的サブスクライバがすでにアクティブな場合、
createDurableSubscriber
メソッドによりJMSExceptionがスローされます。4 永続的サブスクライバの登録解除
永続的サブスクライバは、トピックセッションのunsubscribe
メソッドを使用して登録解除されます。前に説明したとおり、システムリソースの浪費を避けるため、永続的サブスクライバを常に整理し、登録解除することは重要です。永続的サブスクライバの登録解除のソースコードは、次のとおりです。
永続的サブスクライバを登録解除する場合、ClientIDは、最初にサブスクライバを作成するときに使用されたものと同じでなければなりません。接続に正しいClientIDが使用する限り、永続的サブスクライバの登録解除には任意のセッションを使用できます。package durable; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; public class Unsubscribe { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx. | lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | topicConn.setClientID("durable"); | | // create a topic session | TopicSession topicSession = topicConn.createTopicSession(false, | Session.AUTO_ACKNOWLEDGE); | | // unsubscribe the durable topic subscriber | topicSession.unsubscribe("mySub"); | | System.out.println("unsubscribed mySub"); | | // close the topic connection | topicConn.close(); } }
Copyright © 2000-2003, Novell, Inc.All rights reserved. |