По принципу работы топики похожи на очереди, однако их главной особенностью является множество конечных точек для обработчиков – они называются подписками (Subscriptions), и такая модель называется publish/subscribe messaging communication – каждый потребитель-обработчик “подписывается” на подписку. Каждая подписка имеет доступ ко всем добавленным сообщениям, при этом механизм фильтрации позволяет им определить, доступно ли сообщение через подписку или нет, и если доступно, то обработчик забирает сообщение из подписки аналогичным очередям образом. Таким образом, можно реализовать распространение одного и того же сообщения на несколько подписок. Механизм подписок использует принцип работы FIFO и представлен на рис.3.
Рис.3. Принцип работы подписок (Subscription) Service Bus
Топики и очереди позволяют разрабатывать более слабо связанные приложения, с временной независимостью (когда отправитель и обработчик сообщения не обязаны быть в он-лайн одновременно), балансировкой нагрузки.
Для того, чтобы начать работу с Service Bus в Windows Azure, необходимо создать собственно именованный сервис (пространство имен), которое будет предоставлять контейнер для Service Bus.
1) Зайдите на портал управления Windows Azure (http://windows.azure.com)
2) В левом нижнем углу портала управления нажмите Service Bus, Access Control & Caching, после чего нажмите на Service Bus и нажмите в левом верхнем углу портала управления кнопку New. (рис.4).
Рис.4. Интерфейс портала управления Windows Azure, управление Service Bus
3) В появившемся диалоговом окне Create a new Service Namespace введите значение Namespace и нажмите Check Availability – в этот момент введенное значение будет проверено на уникальность в пределах платформы(рис.5).
Рис 5.Интерфейс портала управления Windows Azure, создание контейнера Service Bus
4) Выберите соответствующие значения региона расположения вашего контейнера (лучше, если это будет тот же регион, в котором вы используете ваше приложение) и нажмите Create Namespace. Контейнер создан, дождитесь появления статуса Active (рис.6).
Рис 6.Интерфейс портала управления Windows Azure, список контейнеров Service Bus
Для управления вашей Service Bus вам необходимо получить определенные данные. В панели слева нажмите на Service Bus и выберите созданный только что контейнер (рис.7).
Рис 7.Интерфейс портала управления Windows Azure, список контейнеров Service Bus
В правой панели свойств вы увидите необходимые вам данные – в том числе Default Key. Нажмите на кнопку View рядом с Default Key для отображения ключа доступа.
Если вы хотите, то можете отсюда же создать топик и в дальнейшем использовать имя созданного топика в своем приложении. Для этого необходимо выбрать ваш контейнер и нажать кнопку New Topic. В появившемся диалоговом окне введите необходимые данные (например, время жизни сообщения) и нажмите Ok.
Всё, на этом настройка Service Bus закончена. Можно переходить к приложению.
Как и в прошлых статьях, приведу лишь общий кусок Java-кода, который готовы к использованию, с комментариями. Хочу обратить ваше внимание на использование новой функциональности (в прошлой части всё было гораздо проще). В данном коде используется дополнительный метод convertStreamToString(InputStream is), который преобразовывает поток в строку. Кроме этого, показано два метода добавления контента в сообщение – как напрямую (в конструкторе), так и с помощью метода setBody().
import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.io.StringWriter; import java.io.Writer; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Calendar; import java.util.List; import com.microsoft.windowsazure.services.serviceBus.*; import com.microsoft.windowsazure.services.serviceBus.models.*; import com.microsoft.windowsazure.services.core.*; import javax.xml.datatype.*; import javax.xml.datatype.DatatypeConstants.Field; public class TestServiceBusRelay { // Все операции по управлению вашим контейнером Service Bus выполняются с // помощью класса ServiceBusContract, при создании которого конструктору // передаётся // конфигурация вашего контейнера - название контейнера, имя issuer и ключ. // Все эти данные можно получить из панели свойств Properties вашего // контейнера Service Bus на портале Windows Azure. // Далее можно воспользоваться функциональностью класса ServiceBusService, // предоставляющего управление очередями - создание, удаление и т.д. В // данном методе создаётся контейнер Service Bus с именем ahrimansb. private static ServiceBusContract createServiceBus(String issuer, String key) { Configuration config = ServiceBusConfiguration .configureWithWrapAuthentication("ahrimansb", issuer, key); ServiceBusContract service = ServiceBusService.create(config); return service; } // В данном методе используется функциональность класса TopicInfo, с помощью // которой в данном случае определяется // максимальный размер очереди в мегабайтах (указывается максимальный размер // в 5Гб). С ��омощью методов TopicInfo можно настраивать различные параметры // ваших очередей, в т.ч. Time To Live для сообщений, максимальный размер и // многое другое. private static void createTopic(String name, ServiceBusContract service) { long queueSize = 5120; TopicInfo topicInfo = new TopicInfo(name); try { topicInfo.setMaxSizeInMegabytes(queueSize); CreateTopicResult result = service.createTopic(topicInfo); } catch (ServiceException e) { System.out.print("ServiceException: " + e.getMessage()); } } // В данном методе используется фильтр по умолчанию MatchAll (поэтому нет // никаких дополнительных указаний значения фильтра). // При использовании фильтра по умолчанию все сообщения, которые поступают в // топик, помещаются в подписку-очередь. private static void createSubscriptionWithFilterMatchAll( String subscriptionInfoName, String topicName, ServiceBusContract service) { try { SubscriptionInfo subInfo = new SubscriptionInfo( subscriptionInfoName); CreateSubscriptionResult result = service.createSubscription( topicName, subInfo); } catch (ServiceException e) { System.out.print("ServiceException: " + e.getMessage()); } } // В данном методе создаётся ещё одна подписка с SQL-фильтром сообщений // (SqlFilter). При этом // в качестве условия используется сравнение некоторого custom-свойства // MessageSequenceId. // После создания двух подписок сообщения, поступающие в соответствующий // топик, буду уходить - // все в первую подписку, и только удовлетворяющие условиям фильтра - во // вторую, таким образом распределяясь по обработчикам. Естественно, // что гораздо оптимальнее создать несколько подписок с разными фильтрами и // распределять сообщения по определенному условию. private static void createSubscriptionWithFilter( String subscriptionInfoName, String topicName, ServiceBusContract service) { SubscriptionInfo subInfo = new SubscriptionInfo(subscriptionInfoName); try { CreateSubscriptionResult result = service.createSubscription( topicName, subInfo); } catch (ServiceException e) { e.printStackTrace(); } RuleInfo ruleInfo = new RuleInfo(); ruleInfo = ruleInfo.withSqlExpressionFilter("MessageNumber > 4"); try { CreateRuleResult ruleResult = service.createRule(topicName, "subscriptioninfoname2", ruleInfo); } catch (ServiceException e) { e.printStackTrace(); } } // Для отображения сообщения в объектной модели существует класс // BrokeredMessage, объекты которого содержат набор методов для управления // сообщением, // набор параметров и набор данных. В набор данных можно передать любой // сериализуемый объект. Очереди Service Bus имеют ограничение на размер // сообщения в 256 мегабайт(заголовок, содержащий свойства - 64 мегабайт), // однако ограничений как таковых на количество хранимых в очереди сообщений // нет, кроме задаваемого вами ограничения-максимального размера очереди. В // данном случае мы также указываем custom-свойство MessageSequenceId, // которое будет использоваться для фильтра private static void putMessageToTopic(String topicName, ServiceBusContract service, BrokeredMessage message) { try { message.setProperty("MessageNumber", "6"); service.sendTopicMessage(topicName, message); } catch (ServiceException e) { System.out.print("ServiceException: " + e.getMessage()); } } // Метод для добавления множества сообщений в топик. private static void putMessagesToTopic(String topicName, ServiceBusContract service, List<BrokeredMessage> messages) { try { for (BrokeredMessage message : messages) { service.sendTopicMessage(topicName, message); } } catch (ServiceException e) { e.printStackTrace(); } } // Аналогично сервису очередей хранилища Windows Azure, вы можете // использовать два метода для извлечения сообщений из очередей топиков - // получение и удаление (ReceiveAndDelete) и "подсматривание" - получение // сообщение, но не удаление его из очереди топика (PeekLock). При // использовании // метода ReceiveAndDelete при получении запроса на извлечение // сообщения, очередь помечает это сообщение как "потреблённое". // При использовании метода PeekLock процесс получения дробится на два этапа // - когда Service Bus получает запрос на извлечение сообщения, он находит // это сообщение, помечает его как locked (в этот момент другие обработчики // перестают видеть это сообщение) и возвращает его приложению. После // окончания // обработки приложением сообщения закрывается второй этап процесса с // помощью вызова метода Delete полученного сообщения. После этого сообщение // помечается как удаленное. // Типичным паттерном опроса очереди на наличие новых сообщений является // использование бесконечного цикла while. В данном методе очередь // опрашивается постоянно. Если вы хотите // ограничить выполнение каким-либо количеством полученных сообщений, вам // необходимо реализовать логику с использованием break. private static void getMessageFromTopic(String topicName, String subscriptionName, ServiceBusContract service) throws ServiceException { ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT; opts.setReceiveMode(ReceiveMode.PEEK_LOCK); while (true) { ReceiveSubscriptionMessageResult result = service .receiveSubscriptionMessage(topicName, subscriptionName, opts); BrokeredMessage message = result.getValue(); if (message != null && message.getMessageId() != null) { try { System.out .println("Начало работы по опросу очереди подписки с именем:" + subscriptionName); System.out.println("Сообщение: " + convertStreamToString(message.getBody())); System.out.println("ID сообщения: " + message.getMessageId()); System.out .println("Если вы задали какое-то свойство, его можно получить с помощью метода getProperty(): " + message.getProperty("MessageNumber")); System.out.println("Сообщение прочитано - удалено."); service.deleteMessage(message); } catch (Exception ex) { // если было выброшено исключение, сообщение будет // разблокировано для других обработчиков System.out.println("Исключение!"); service.unlockMessage(message); } } else { System.out .println("Больше нет сообщений, но топик продолжает опрашиваться."); } } } private static void deleteTopic(String topicName, ServiceBusContract service) { try { service.deleteTopic(topicName); } catch (ServiceException e) { e.printStackTrace(); } } private static void deleteSubscription(String subscriptionName, String subscriptionInfoName, ServiceBusContract service) { try { service.deleteSubscription(subscriptionName, subscriptionInfoName); } catch (ServiceException e) { e.printStackTrace(); } } public static void main(String args[]) throws FileNotFoundException { ServiceBusContract service = createServiceBus("owner", "WqmNgDFb1mgicPy7eHzx5sLklBS1Qwb8QjIujhFk8P4="); createTopic("mytopic", service); createSubscriptionWithFilterMatchAll("subscriptioninfoname1", "mytopic", service); createSubscriptionWithFilter("subscriptioninfoname2", "mytopic", service); InputStream input = new FileInputStream("c:\\1.txt"); BrokeredMessage message = new BrokeredMessage("Our message."); message.setBody(input); ArrayList<BrokeredMessage> messages = new ArrayList<BrokeredMessage>(); for (int i = 0; i < 5; i++) { BrokeredMessage msg = new BrokeredMessage("Message text: " + i); msg.setProperty("MessageNumber", i); messages.add(msg); } putMessageToTopic("mytopic", service, message); putMessagesToTopic("mytopic", service, messages); try { getMessageFromTopic("mytopic", "subscriptioninfoname1", service); getMessageFromTopic("mytopic", "mysubscriptionwithfilter", service); } catch (ServiceException e) { e.printStackTrace(); } } public static String convertStreamToString(InputStream is) throws IOException { if (is != null) { Writer writer = new StringWriter(); char[] buffer = new char[1024]; try { Reader reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); int n; while ((n = reader.read(buffer)) != -1) { writer.write(buffer, 0, n); } } finally { is.close(); } return writer.toString(); } else { return ""; } } }
Далее просто запустите ваше приложение. Для этого нажмите ALT+Shift+X либо нажмите соответствующую кнопку в меню в Eclipse (рис. 8).
Рис.8. Запуск простого Java-проекта.
В консоли вы должны увидеть результат.
Обратите внимание, что, если вы получаете исключение ниже, вам необходимо реализовать проверку на существование топика и подписки перед тем, как их создавать – это исключение значит, что такие сущности уже есть в контейнере.
04.04.2012 11:57:06 com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor processCatch WARNING: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict at com.sun.jersey.api.client.WebResource.handle(WebResource.java:676) at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74) at com.sun.jersey.api.client.WebResource$Builder.put(WebResource.java:533) at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusRestProxy.createQueue(ServiceBusRestProxy.java:265) at com.microsoft.windowsazure.services.serviceBus.implementation.ServiceBusExceptionProcessor.createQueue(ServiceBusExceptionProcessor.java:188) at TestServiceBusRelay.createQueue(TestServiceBusRelay.java:46) at TestServiceBusRelay.main(TestServiceBusRelay.java:135) ServiceException: com.sun.jersey.api.client.UniformInterfaceException: PUT https://ahrimansb.servicebus.windows.net/myqueue returned a response status of 409 Conflict Response Body: <Error><Code>409</Code><Detail>Conflict.TrackingId:59992d2d-e47f-40a0-935e-a16ea910d5f2_2,TimeStamp:4/4/2012 4:57:08 AM</Detail></Error>Сообщение: com.microsoft.windowsazure.services.serviceBus.models.BrokeredMessage@878c4c
В этом случае вы можете также просто удалить топик или подписку с помощью портала управления Windows Azure.