1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
//use this setting when deploying to Windows Azure <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://[your namespace].servicebus.windows.net;SharedSecretIssuer=owner;SharedSecretValue=[your secret]" /> public static string getLocalServiceBusConnectionString() { var ServerFQDN = System.Net.Dns.GetHostEntry(string.Empty).HostName; var ServiceNamespace = "ServiceBusDefaultNamespace"; var HttpPort = 9355; var TcpPort = 9354; var connBuilder = new ServiceBusConnectionStringBuilder(); connBuilder.ManagementPort = HttpPort; connBuilder.RuntimePort = TcpPort; connBuilder.Endpoints.Add(new UriBuilder() { Scheme = "sb", Host = ServerFQDN, Path = ServiceNamespace }.Uri); connBuilder.StsEndpoints.Add(new UriBuilder() { Scheme = "https", Host = ServerFQDN, Port = HttpPort, Path = ServiceNamespace }.Uri); return connBuilder.ToString(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
public void CreateServiceBusTopicAndSubscriptions(NamespaceManager namespaceManager) { #region Configure and create Service Bus Topic var serviceBusTestTopic = new TopicDescription(TopicName); serviceBusTestTopic.MaxSizeInMegabytes = 5120; serviceBusTestTopic.DefaultMessageTimeToLive = new TimeSpan(0, 1, 0); if (!namespaceManager.TopicExists(TopicName)) { namespaceManager.CreateTopic(serviceBusTestTopic); } #endregion #region Create filters and subsctiptions //create filters var messagesFilter_Web = new SqlFilter("MessageOrigin = 'Web'"); var messagesFilter_Mobile = new SqlFilter("MessageOrigin = 'Mobile'"); var messagesFilter_Service = new SqlFilter("MessageOrigin = 'Service'"); if (!namespaceManager.SubscriptionExists(TopicName, "WebMessages")) { namespaceManager.CreateSubscription(TopicName, "WebMessages", messagesFilter_Web); } if (!namespaceManager.SubscriptionExists(TopicName, "MobileMessages")) { namespaceManager.CreateSubscription(TopicName, "MobileMessages", messagesFilter_Mobile); } if (!namespaceManager.SubscriptionExists(TopicName, "WCfServiceMessages")) { namespaceManager.CreateSubscription(TopicName, "WCfServiceMessages", messagesFilter_Service); } #endregion }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
public override void Run() { Parallel.ForEach(SubscriptionClients, currentSubscrtiption => { while (!IsStopped) { #region Receive messages try { // Receive the message var receivedMessage = currentSubscrtiption.Receive(); if (receivedMessage != null) { var messageFrom = receivedMessage.Properties["MessageOrigin"].ToString(); switch (messageFrom) { case "Web": //send it to web processing logic break; case "Mobile": //send it to mobile processing logic break; case "Service": //send it to service processing logic break; default: break; } // Process the message Trace.WriteLine(Environment.NewLine + "--------------------------" + Environment.NewLine); Trace.WriteLine(string.Format("{0} message content: {1}", messageFrom, receivedMessage.GetBody<string>())); receivedMessage.Complete(); } } catch (MessagingException e) { if (!e.IsTransient) { Trace.WriteLine(e.Message); throw; } Thread.Sleep(10000); } catch (OperationCanceledException e) { if (!IsStopped) { Trace.WriteLine(e.Message); throw; } } #endregion } }); }