Wednesday, February 25, 2015

Code for Service Bus using C# in VS Azure

Azure Service Bus is a generic, cloud-based messaging system. It connects apps running on Azure, on on-premises systems, or both. It offers four communication mechanisms, each of which connects applications in a different way.
  • Queues allow one-directional communication. Each queue acts as an intermediary (broker) that stores sent messages until they are received. Each message is received by a single recipient.
  • Topics provide one-directional communication using subscriptions; a single topic can have multiple subscriptions. Like a queue, a topic acts as a broker, but each subscription can optionally use a filter to receive only messages that match specific criteria.
  • Relays provide bi-directional communication. Unlike queues and topics, a relay doesn't store messages. Instead, it just passes them on to the destination application.
  • Event Hubs provide a highly scalable ingestion system that can process millions of events per second, enabling applications to process and analyze the massive amounts of data produced by connected devices and applications.
We can use one or more instances of these four communication mechanisms, depending on our requirements.

Reference - Azure Service Bus.

Creating a Service Bus Namespace
  1. Log on to the Azure portal.
  2. Click the New button. Select Hybrid Integration, then select Service Bus.
  3. Click the Create button at the bottom.
  4. Provide the requested inputs and click OK.

Using Service Bus Queue
Creating a Service Bus Queue using the Management Portal
  1. Once the namespace is created, select it and choose the Queues option.
  2. Click Create New Queue. Select Quick Create and provide the inputs.
  3. Click Create A New Queue.
Creating a Service Bus Queue programmatically
  1. Open Visual Studio and create a new console application.
  2. In Solution Explorer, right-click References, then click Manage NuGet Packages.
  3. Select the Microsoft Azure Service Bus item. Click Install.
  4. Open the App.config file. Under appSettings you can see that a key for the connection string has been added, as below.
    <appSettings>
      <!-- Service Bus specific app settings for messaging connections -->
      <add key="Microsoft.ServiceBus.ConnectionString"
           value="Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]"/>
    </appSettings>
  5. Go to the Management portal. Click the Configuration Information button at the bottom.
  6. Copy the connection string and use it to replace the value of the Microsoft.ServiceBus.ConnectionString key in the App.config file.
  7. Write the code below in the program.cs file.

    Add the following using directives.
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using Microsoft.WindowsAzure;

Add the following code to the Main() method:
  string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

  var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

  // Create the queue only if it does not already exist.
  if (!namespaceManager.QueueExists("azservicequeue"))
  {
      namespaceManager.CreateQueue("azservicequeue");
  }
This creates the "azservicequeue" Service Bus queue.

Send messages to a queue
  string connStr = "[connection string]";
  MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connStr);
  MessageSender testQueueSender = factory.CreateMessageSender("azservicequeue");
  BrokeredMessage message = new BrokeredMessage("This is sample message.");
  testQueueSender.Send(message);
Retrieve messages from a queue
  string connStr = "[connection string]";
  QueueClient Client = QueueClient.CreateFromConnectionString(connStr, "azservicequeue");
  // Configure the callback options.
  OnMessageOptions options = new OnMessageOptions();
  options.AutoComplete = false;
  options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
  Client.OnMessage((message) =>
  {
      try
      {
          Console.WriteLine("Message: " + message.GetBody<string>());
          // Remove message from queue.
          message.Complete();
      }
      catch (Exception)
      {
          // Indicates a problem, unlock message in queue.
          message.Abandon();
      }
  }, options);
Messages can be retrieved in two different modes: ReceiveAndDelete or PeekLock.

By default, a message is retrieved in PeekLock mode, which means the message is locked on the queue so that no other consumer can retrieve it. Once the retrieval code has finished processing, it calls Complete on the message. This notifies Service Bus that the message has been handled and can be permanently removed from the queue. Likewise, if something goes wrong during processing and an exception is thrown, the code calls Abandon on the message, notifying Service Bus that the message could not be processed and can be retrieved by another consumer.

Reference - Get started with Service Bus Queues.

Using Service Bus Topics
Creating a Service Bus Topic
  1. Open Visual Studio and create a new console application.
  2. Add the NuGet package for Service Bus and configure the “Microsoft.ServiceBus.ConnectionString” setting as we did in the previous application.
  3. Write the code below in the program.cs file.
    string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

    var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

    // Create the topic only if it does not already exist.
    if (!namespaceManager.TopicExists("azservicetopic"))
    {
        namespaceManager.CreateTopic("azservicetopic");
    }
Creating a subscription with the default (MatchAll) filter

The MatchAll filter is the default filter that is used if no filter is specified when a new subscription is created.
  string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

  var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

  if (!namespaceManager.SubscriptionExists("azservicetopic", "AllMessages"))
  {
      namespaceManager.CreateSubscription("azservicetopic", "AllMessages");
  }
Creating subscriptions with filters

The following example creates a subscription named HighProperty with a SqlFilter object that only selects messages that have a custom MessageNumber property greater than 3.
  SqlFilter highMessagesFilter = new SqlFilter("MessageNumber > 3");

  namespaceManager.CreateSubscription("azservicetopic", "HighProperty", highMessagesFilter);
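To make the effect of the two subscriptions concrete, here is a small in-memory model of how a topic fans messages out. It is only a sketch and does not call the Service Bus API: the class name TopicFilterSketch and the Deliver method are made up for illustration, and the lambda n > 3 merely mimics what the SqlFilter above would select.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TopicFilterSketch
{
    // In-memory stand-in for a topic: each subscription is a name plus a predicate
    // over the message's MessageNumber property. This is NOT the Service Bus API.
    public static Dictionary<string, List<int>> Deliver(IEnumerable<int> messageNumbers)
    {
        var subscriptions = new Dictionary<string, Func<int, bool>>
        {
            { "AllMessages", n => true },   // default MatchAll behaviour
            { "HighProperty", n => n > 3 }  // mimics SqlFilter("MessageNumber > 3")
        };

        var delivered = subscriptions.Keys.ToDictionary(name => name, name => new List<int>());
        foreach (var number in messageNumbers)
        {
            // Every subscription whose filter matches gets its own copy of the message.
            foreach (var subscription in subscriptions)
            {
                if (subscription.Value(number))
                {
                    delivered[subscription.Key].Add(number);
                }
            }
        }
        return delivered;
    }

    public static void Main()
    {
        var delivered = Deliver(Enumerable.Range(0, 6)); // MessageNumber 0..5
        foreach (var pair in delivered)
        {
            Console.WriteLine(pair.Key + ": " + string.Join(", ", pair.Value));
        }
    }
}
```

In this model AllMessages receives every message, while HighProperty receives only those numbered 4 and 5.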
Send messages to a topic
  TopicClient Client = TopicClient.CreateFromConnectionString(connectionString, "azservicetopic");

  BrokeredMessage message = new BrokeredMessage("This is sample message ");
  Client.Send(message);
We can also add custom properties to a BrokeredMessage object.

An example is included in the sample code.
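As a rough sketch of what setting a custom property might look like, the snippet below sends five messages and tags each with a MessageNumber value through the BrokeredMessage.Properties dictionary. The class name, the loop bound and the message text are assumptions made for illustration; the topic name and configuration key are the ones used earlier in this post. It needs the Service Bus NuGet package and a valid connection string to run.

```csharp
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;

public static class SendWithProperties
{
    public static void Main()
    {
        string connectionString =
            CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        TopicClient client =
            TopicClient.CreateFromConnectionString(connectionString, "azservicetopic");

        // Loop bound and message text are illustrative only.
        for (int i = 0; i < 5; i++)
        {
            BrokeredMessage message = new BrokeredMessage("Test message " + i);
            // Custom property that a subscription filter can match on.
            message.Properties["MessageNumber"] = i;
            client.Send(message);
        }

        client.Close();
    }
}
```

With the values 0 to 4 used here, a subscription filtered on MessageNumber > 3 would be expected to receive only the last message.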

Receive messages from a subscription
  SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, "azservicetopic", "HighProperty");
  // Configure the callback options.
  OnMessageOptions options = new OnMessageOptions();
  options.AutoComplete = false;
  options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
  Client.OnMessage((message) =>
  {
      try
      {
          // Process message from subscription.
          Console.WriteLine("High Property Messages");
          Console.WriteLine("Message: " + message.GetBody<string>());
          Console.WriteLine("Message Number: " +
              message.Properties["MessageNumber"]);
          // Remove message from subscription.
          message.Complete();
      }
      catch (Exception)
      {
          // Indicates a problem, unlock message in subscription.
          message.Abandon();
      }
  }, options);

Tuesday, February 10, 2015

Working with Queues using Visual Studio C# code in Azure

Writing the Consumer Application

We now need to write the application that will represent our robot. It will continually check the queue for any messages that have been sent to it, and presumably execute them somehow.
  1. In Visual Studio, click File > Add > New Project.
  2. Select Console Application, set its name to Consumer and hit OK.
  3. Add references to the Microsoft.WindowsAzure.StorageClient and System.Configuration assemblies as you did for the Producer solution.
  4. Add an app.config file to the Consumer project and add the same appSettings element to this file as you did for the Producer solution.
Now open program.cs for the consumer solution if it isn’t already open. Initially, this application needs to do the same configuration and queue setup as the producer application, so our first additions replicate those made in the Starting the Sample section.
namespace Consumer
{
    using System;
    using System.Linq;
    using Microsoft.WindowsAzure.StorageClient;
    using Microsoft.WindowsAzure;
    using System.Configuration;
    using System.Threading;

    public static class Program
    {
        private static void Main(string[] args)
        {
            // Tell the storage client to read its settings from app.config.
            CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) =>
            {
                configSetter(ConfigurationManager.AppSettings[configName]);
            });

            var storageAccount = CloudStorageAccount.FromConfigurationSetting("DataConnectionString");
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference("robotcommands");
            queue.CreateIfNotExist();
        }
    }
}
Now to move on to the guts of our consumer application. The consumer of the queue will connect to the queue just like the message producer code. Once you have a reference to the queue you can call GetMessage(). A consumer will normally do this from within a polling loop that never ends. An example of this type of loop, without all of the error checking that you would normally include, is below.
In this while loop we will get the next message on the queue. If the queue is empty, the GetMessage() method will return a null. If we get a null then we want to sleep for some period of time. In this example we are sleeping for five seconds before we poll again. Sometimes you might sleep a shorter period of time (speeding up the poll loop and fetching messages more aggressively), and sometimes you might want to slow the poll loop down. We will look at how to do this later in this article.
The pattern you should follow in this loop is:
  1. Get Message
    • If no message available, sleep for five seconds
  2. Process the Message
  3. Delete the Message
The code that will do this is as follows. Add it to the Main() method after the call to queue.CreateIfNotExist().
            CloudQueueMessage newMessage = null;
            double secondsToDelay = 5;
            Console.WriteLine("Will start reading the command queue, and output them to the screen.");
            Console.WriteLine(string.Format("Polling queue every {0} second(s).", secondsToDelay.ToString()));
            while (true)
            {
                newMessage = queue.GetMessage();
                if (newMessage != null)
                {
                    Console.WriteLine(string.Format("Received Command: {0}", newMessage.AsString));
                    queue.DeleteMessage(newMessage);
                }

                else
                    Thread.Sleep(TimeSpan.FromSeconds(secondsToDelay));
            }
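The fixed five-second sleep could be replaced with a delay that adapts to how busy the queue is. The sketch below shows one common option, doubling the delay after each empty poll and resetting it as soon as a message arrives. It is an assumption on our part rather than the approach this article goes on to describe; the PollingBackoff class, the NextDelay method and the 1 and 30 second bounds are all illustrative.

```csharp
using System;

public static class PollingBackoff
{
    // Returns the delay to use before the next poll.
    // An empty poll doubles the delay (capped at max); a successful poll resets it to min.
    public static TimeSpan NextDelay(TimeSpan current, bool gotMessage, TimeSpan min, TimeSpan max)
    {
        if (gotMessage)
        {
            return min;
        }

        double doubled = current.TotalSeconds * 2;
        return TimeSpan.FromSeconds(Math.Min(doubled, max.TotalSeconds));
    }

    public static void Main()
    {
        TimeSpan min = TimeSpan.FromSeconds(1);
        TimeSpan max = TimeSpan.FromSeconds(30);
        TimeSpan delay = min;

        // Simulated poll results: true = a message was returned, false = the queue was empty.
        bool[] polls = { false, false, false, true, false };
        foreach (bool gotMessage in polls)
        {
            delay = NextDelay(delay, gotMessage, min, max);
            Console.WriteLine("gotMessage=" + gotMessage + ", next delay=" + delay.TotalSeconds + "s");
        }
    }
}
```

In the polling loop, the value returned by NextDelay would take the place of secondsToDelay in the Thread.Sleep call.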
If there is a message found we will then want to process it. This is whatever work you have for that message to do. Messages generally follow what is called the Work Ticket pattern. This means that the message includes key data for the work to be done, but not the real data that is needed. This keeps the message light and easy to move around. In this case the message is just simple commands for the robot to process.
After the work is completed we want to remove the message from the queue so that it is not processed again. This is accomplished with the DeleteMessage() method. In order to do this we need to pass in the original message, because the service needs to know the message id and the pop receipt (more on this in The Message Lifecycle section) to perform the delete. And then the loop continues on with its polling and processing.

Running the Sample

You should now have a Visual Studio solution that has two projects in it: a console application called Producer that generates robot commands and submits them to your queue, and a second console application called Consumer that plays the role of the robot, consuming messages from the queue.
We need to run both of these console applications at the same time, which you can’t normally do with F5 in Visual Studio. The trick to running both is to right-click each project name, select the Debug menu, and then select ‘Start new instance’. It doesn’t matter which one you start first.
Make sure the storage emulator from the Windows Azure SDK is already running before you start the applications. After you start them you will have two console windows open, one for each application. Use the Producer application to start creating messages to be sent to the queue. Here is what it looks like.
CmdProducer and CmdConsumer

The Message Lifecycle

The prior section mentioned something called a pop receipt. The pop receipt is an important part of the lifecycle of a message in the queue. When a message is grabbed from the top of the queue it is not actually removed from the queue. This doesn’t happen until DeleteMessage is called later. The message stays on the queue but is marked invisible. Every time a message is retrieved from the queue, the consumer can determine how long this timeout of invisibility should be, based on their processing logic. This defaults to 30 seconds, but can be as long as two hours. The consumer is also given a unique pop receipt for that get operation. Once a message is marked as invisible, and the time out clock starts ticking, there isn’t a way to end it quicker. You must wait for the full timeout to expire.
When the consumer comes back, within the timeout window, with the proper receipt id, the message can then be deleted.
If the consumer does not try to delete the message within the timeout window, the message will become visible again, at the position it had in the queue to begin with. Perhaps during this window of time the server processing the message crashed, or something untoward happened. The queue remains reliable by marking the message as visible again so another consumer can pick the message up and have a chance to process it. In this way a message can never be lost, which is critical when using a queuing system. No one wants to lose the $50,000 order for pencils that just came in from your best customer.
This does lead us to one small problem. Let’s say our message was picked up by server A, but server A never returned to delete it, and the message timed out. The message then became visible again, and our second server, server B, finds the message, picks it up and processes it. When it picks up the message it receives a new pop receipt, making the pop receipt originally given to server A invalid.
During this time, we find out that server A didn’t actually crash, it just took longer to process the message than we predicted with the timeout window. It comes back after all of its hard work and tries to delete the message with its old pop receipt. Because the old pop receipt is invalid server A will receive an exception telling it that the message has been picked up by another processor.
This failure recovery process rarely happens, and it is there for your protection. But it can lead to a message being picked up more than once. Each message has a property, DequeueCount, that tells you how many times this message has been picked up for processing. In our example above, when server A first received the message, the DequeueCount would be 1. When server B picked up the message, after server A’s tardiness, the DequeueCount would be 2. In this way you can detect a poison message and route it to a repair and resubmit process. A poison message is a message that is somehow continually failing to be processed correctly. This is usually caused by some data in the contents that causes the processing code to fail. Since the processing fails, the message’s timeout expires and it reappears on the queue. The repair and resubmit process is sometimes a queue that is managed by a human, or written out to Blob storage, or some other mechanism that allows the system to keep on processing messages without being stuck in an infinite loop on one message. You need to choose a threshold for DequeueCount and check for it yourself. For example:
if (newMessage.DequeueCount > 5)
{
   routePoisonMessage(newMessage);
}
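One way to picture what a routePoisonMessage-style step might do is the self-contained sketch below. It does not use the storage client: the PoisonMessageSketch class, the Handle method and the in-memory list standing in for a repair and resubmit store are all hypothetical, and the threshold of 5 is simply the value from the check above.

```csharp
using System;
using System.Collections.Generic;

public static class PoisonMessageSketch
{
    public const int MaxDequeueCount = 5; // assumed threshold, matching the check above

    // Returns true if the message was processed, false if it was set aside as poison.
    public static bool Handle(string body, int dequeueCount,
                              Action<string> process, List<string> poisonStore)
    {
        if (dequeueCount > MaxDequeueCount)
        {
            // Stand-in for a repair-and-resubmit queue or a blob written for later review.
            poisonStore.Add(body);
            return false;
        }

        process(body);
        return true;
    }

    public static void Main()
    {
        var poison = new List<string>();
        Handle("Turn Left", 1, b => Console.WriteLine("Executing: " + b), poison);
        Handle("Bad Command", 6, b => Console.WriteLine("Executing: " + b), poison);
        Console.WriteLine("Poison messages set aside: " + poison.Count);
    }
}
```

In a real consumer the message would still need to be deleted from the original queue after it has been set aside, otherwise it would keep reappearing.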

Word of the Day: Idempotent

Since a message can actually be picked up more than once, we have to keep in mind that the queue service guarantees that a message will be delivered, AT LEAST ONCE.
This means you need to make sure that the ‘do work here’ code is idempotent in nature. Idempotent means that a process can be repeated without changing the outcome. For example, if the ATM was not idempotent when I deposited $10, and there was a failure leading to the processing of my deposit more than once, I would end up with more than ten dollars in my account. If the ATM was idempotent, then even if the deposit transaction is processed ten times, I still get only ten dollars deposited into my account.
There are several ways to make your processing code idempotent. Most often you should build it into the nature of the backend systems that consume the messages. In our robot example we wouldn’t want the robot to execute a single ‘Turn Left’ command twice because it accidentally handled the same message twice. In this scenario we might track the message id of each message processed, and check that list before we execute a command to make sure we haven’t already processed it.
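A minimal sketch of the message id tracking idea follows. The IdempotentProcessor class and its TryProcess method are hypothetical names, and the set of processed ids lives only in memory; a real consumer would presumably need a durable store that survives restarts and is shared between servers.

```csharp
using System;
using System.Collections.Generic;

public class IdempotentProcessor
{
    // Ids of messages whose work has already completed (in memory only, for illustration).
    private readonly HashSet<string> processedIds = new HashSet<string>();

    // Runs the work only if this message id has not been processed before.
    // Returns true if the work ran, false if the message was a duplicate.
    public bool TryProcess(string messageId, Action work)
    {
        if (processedIds.Contains(messageId))
        {
            return false;
        }

        work();
        // Record the id only after the work succeeds, so a failed attempt can be retried.
        processedIds.Add(messageId);
        return true;
    }
}

public static class IdempotentDemo
{
    public static void Main()
    {
        var processor = new IdempotentProcessor();
        int turns = 0;

        // The same message is delivered twice; the command should run only once.
        processor.TryProcess("msg-1", () => turns++);
        processor.TryProcess("msg-1", () => turns++);

        Console.WriteLine("Turn Left executed " + turns + " time(s)");
    }
}
```

Recording the id after the work completes is a deliberate choice here: if the work throws, the id is not stored and a later delivery of the same message can try again.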

When Queues are Useful