Tuesday, February 10, 2015

Working with Queue using Visual Studio C# code in azure

Writing the Consumer Application

We now need to write the application that will represent our robot. It will continually check the queue for any messages that have been sent to it, and assumedly execute them somehow.
  1. In Visual Studio, click File > Add > New Project.
  2. Select Console Application, set its name to Consumer and hit OK.
  3. Add references to the Microsoft.WindowsAzure.StorageClient and System.configuration assemblies as you did for the Producer solution.
  4. Add an app.config file to the Consumer project and add the same appSettings element to this file as you did for the Producer solution.
Now open program.cs for the consumer solution if it isn’t already open. Initially, this application needs to do the same configuration and queue setup as the producer application, so our first additions replicate those made in the Starting the Sample section.
namespace Consumer

{

    using System;
    using System.Linq;
    using Microsoft.WindowsAzure.StorageClient;
    using Microsoft.WindowsAzure;
    using System.Configuration;
    using System.Threading;
    public static class Program
    {
       private static void Main(string[] args)
        {
            CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) =>
            {
                configSetter(ConfigurationManager.AppSettings[configName]);
            });

            var storageAccount = CloudStorageAccount.FromConfigurationSetting("DataConnectionString");
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference("robotcommands");
            queue.CreateIfNotExist();
        }
    }
}
Now to move on to the guts of our consumer application. The consumer of the queue will connect to the queue just like the message producer code. Once you have a reference to the queue you can callGetMessage(). A consumer will normally do this from within a polling loop that will never end. An example of this type of loop, without all of the error checking that you would normally include, is below.
In this while loop we will get the next message on the queue. If the queue is empty, the GetMessage() method will return a null. If we get a null then we want to sleep for some period of time. In this example we are sleeping for five seconds before we poll again. Sometimes you might sleep a shorter period of time (speeding up the poll loop and fetching messages more aggressively), and sometimes you might want to slow the poll loop down. We will look at how to do this later in this article.
The pattern you should follow in this loop is:
  1. Get Message
    • If no message available, sleep for five seconds
  2. Process the Message
  3. Delete the Message
The code that will do this is as follows. Add it to the Main() method after the call to queue.CreateIfNotExist().
            CloudQueueMessage newMessage = null;
            double secondsToDelay = 5;
            Console.WriteLine("Will start reading the command queue, and output them to the screen.");
            Console.WriteLine(string.Format("Polling queue every {0} second(s).", secondsToDelay.ToString()));
            while (true)
            {
                newMessage = queue.GetMessage();
                if (newMessage != null)
                {
                    Console.WriteLine(string.Format("Received Command: {0}", newMessage.AsString));
                    queue.DeleteMessage(newMessage);
                }

                else
                    Thread.Sleep(TimeSpan.FromSeconds(secondsToDelay));
            }
If there is a message found we will then want to process it. This is whatever work you have for that message to do. Messages generally follow what is called the Work Ticket pattern. This means that the message includes key data for the work to be done, but not the real data that is needed. This keeps the message light and easy to move around. In this case the message is just simple commands for the robot to process.
After the work is completed we want to remove the message from the queue so that it is not processed again. This is accomplished with the DeleteMessage() method. In order to do this we need to pass in the original message, because the service needs to know the message id and the pop receipt (more on this in The Message Lifecycle section) to perform the delete. And then the loop continues on with its polling and processing.

Running the Sample

You should now have a Visual Studio solution that has two projects in it. A console application called Producer that will generate robot commands and submit them to your queue. You will also have a second console application called Consumer that plays the role of the robot, consuming messages from the queue.
We need to run both of these console applications at the same time, which you can’t normally do with f5 in Visual Studio. The trick to running both is to right click on each project name, select the debug menu, and then select ‘start new instance’. It doesn’t matter which one you start first.
After you do this you will have two DOS windows open, one for each application. Use the Producer application to start creating messages to be sent to the queue. Here is what it looks like. Make sure the storage emulator from the Windows Azure SDK is already running before you start the applications.
CmdProducer and CmdConsumer

The Message Lifecycle

The prior section mentioned something called a pop receipt. The pop receipt is an important part of the lifecycle of a message in the queue. When a message is grabbed from the top of the queue it is not actually removed from the queue. This doesn’t happen until DeleteMessage is called later. The message stays on the queue but is marked invisible. Every time a message is retrieved from the queue, the consumer can determine how long this timeout of invisibility should be, based on their processing logic. This defaults to 30 seconds, but can be as long as two hours. The consumer is also given a unique pop receipt for that get operation. Once a message is marked as invisible, and the time out clock starts ticking, there isn’t a way to end it quicker. You must wait for the full timeout to expire.
When the consumer comes back, within the timeout window, with the proper receipt id, the message can then be deleted.
If the consumer does not try to delete the message within the timeout window, the message will become visible again, at the position it had in the queue to begin with. Perhaps during this window of time the server processing the message crashed, or something untoward happened. The queue remains reliable by marking the message as visible again so another consumer can pick the message up and have a chance to process it. In this way a message can never be lost, which is critical when using a queuing system. No one wants to lose the $50,000 order for pencils that just came in from your best customer.
This does lead us to one small problem. Let’s say our message was picked up by server A, but server A never returned to delete it, and the message timed out. The message then became visible again, and our second server, server B, finds the message, picks it up and processes it. When it picks up the message it receives a new pop receipt, making the pop receipt originally given to server A invalid.
During this time, we find out that server A didn’t actually crash, it just took longer to process the message than we predicted with the timeout window. It comes back after all of its hard work and tries to delete the message with its old pop receipt. Because the old pop receipt is invalid server A will receive an exception telling it that the message has been picked up by another processor.
This failure recovery process rarely happens, and it is there for your protection. But it can lead to a message being picked up more than once. Each message has a property, DequeueCount, that tells you how many times this message has been picked up for processing. In our example above, when server A first received the message, the dequeuecount would be 0. When server B picked up the message, after server A’s tardiness, the dequeuecount would be 1. In this way you can detect a poison message and route it to a repair and resubmit process. A poison message is a message that is somehow continually failing to be processed correctly. This is usually caused by some data in the contents that causes the processing code to fail. Since the processing fails, the messages timeout expires and it reappears on the queue. The repair and resubmit process is sometimes a queue that is managed by a human, or written out to Blob storage, or some other mechanism that allows the system to keep on processing messages without being stuck in an infinite loop on one message. You need to check for and set a threshold for this dequeuecount for yourself. For example:
if (newMessage.DequeueCount > 5)

{

   routePoisonMessage(newMessage);

}

Word of the Day: Idempotent

Since a message can actually be picked up more than once, we have to keep in mind that the queue service guarantees that a message will be delivered, AT LEAST ONCE.
This means you need to make sure that the ‘do work here’ code is idempotent in nature. Idempotent means that a process can be repeated without changing the outcome. For example, if the ATM was not idempotent when I deposited $10, and there was a failure leading to the processing of my deposit more than once, I would end up with more than ten dollars in my account. If the ATM was idempotent, then even if the deposit transaction is processed ten times, I still get only ten dollars deposited into my account.
You need to make sure that your processing code is idempotent. There are several ways to do this. Most usually you should just build it into the nature of the backend systems that are consuming the messages. In our robot example we wouldn’t want the robot to execute a single ‘Turn Left’ command twice because it is accidentally handling the same message twice. In this scenario we might track the message id of each message processed, and check that list before we execute a command to make sure we haven’t processed it.

When Queues are Useful

No comments:

Post a Comment