Sunday, December 23, 2012

Windows Azure 1.8 SDK– Queue to Queue Transfers

I was recently watching an episode of the Cloud Cover show where Abhishek Lal was talking about some of the recent features that are available in SDK 1.8.  One of the features that stood out for me was Queue to Queue transfers. A Queue to Queue transfer allows for a publisher to push content to a Queue.  Next, the Queue that just received the message can now push it out to another Queue automatically.

This new capability supports a couple design patterns:

  • Fan In – Where you have multiple systems publishing messages and you want to reduce the receiving endpoint surface down to a smaller number.
  • Fan Out – Where you have a single source message that you want to disseminate to more parties

The focus of this post is the Fan In scenario.  The diagram below describes the messaging pattern that we would like to enforce.  In this case we have 4 publishers.  Let’s pretend these are retailers who are now requesting more supply of a particular product.  If we want isolation between publishers then we would create a queue for each Publisher.  However, if we are interested in order delivery we now have race conditions that exist on the Receiver side.  Since this is a BizTalk Blog, BizTalk is acting as my Receiver.  Since we have 4 queues with 4 unique URIs this translates into 4 BizTalk Receive Locations (the blue RL boxes below).  We do not have any control over when and how those messages are received.  In my opinion this problem gets worse if we are building our own .Net client that is checking each queue looking for new messages.  Even if we are trying to be “fair” about the way in which we check the queues for new messages we don’t have any assurances of in order delivery.

image

Let’s make our lives easier and let the Service Bus maintain the order of messages through Queue to Queue  transfers and have a single endpoint that we need to consume from.  It will also simplify our BizTalk configuration as we will only need 1 Receive Location.

image

 

Solution

Within Visual Studio I am going to create a Solution that has 3 C# console applications.

image

QueueToQueuePublisher Project

The core of the overall solution is the QueueToQueuePublisher project.  Within it there are two classes:

  • DataContracts.cs -  contains our class that we will use as our PurchaseOrder
  • Program.cs -  is where we will create our Queues and establish our Queue to Queue forwarding.

image

DataContracts Class

If we further examine the DataContracts class we will discover the following object:

namespace QueueToQueuePublisher
{
    public class PurchaseOrder
    {
        public string ProductID { get; set; }
        public int QuantityRequired { get; set; }
        public string CompanyID { get; set; }
    }
}

 

Program Class

In Program.cs things get a little more interesting


using System;
using System.Collections.Generic;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace QueueToQueuePublisher
{
    public class Program
    {
        const string CommonQueueName = "CommonQueue";
        const string PubAQueueName = "PubAQueue";
        const string PubBQueueName = "PubBQueue";
        const string ServiceNamespace = "<your_namespace>";
        const string IssuerName = "owner";
        const string IssuerKey ="<your_key>";

        static void Main(string[] args)
        {

            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            try
            {
                //*************************************************************************************************
                //                                   Management Operations
                //**************************************************************************************************         
                NamespaceManager namespaceClient = new NamespaceManager(serviceUri, credentials);
                if (namespaceClient == null)
                {
                    Console.WriteLine("\nUnexpected Error: NamespaceManager is NULL");
                    return;
                }

                Console.WriteLine("\nCreating Queue '{0}'...", Program.CommonQueueName);

                // Create Queue if it doesn't exist.
                //This Queue must exist prior to another
                //Queue forwarding messages to it
                if (!namespaceClient.QueueExists(Program.CommonQueueName))
                {
                    namespaceClient.CreateQueue(Program.CommonQueueName);
                }

                // Create Publisher A's Queue if it doesn't exist
                if (!namespaceClient.QueueExists(Program.PubAQueueName))
                {
                    QueueDescription qd = new QueueDescription(Program.PubAQueueName);

                    //This is where we establish our forwarding
                    qd.ForwardTo = Program.CommonQueueName;
                    namespaceClient.CreateQueue(qd);
                }

                // Create Publisher B's Queue if it doesn't exist
                if (!namespaceClient.QueueExists(Program.PubBQueueName))
                {
                     QueueDescription qd = new QueueDescription(Program.PubBQueueName);
                    {
                         //This is where we establish our forwarding

                         qd.ForwardTo = Program.CommonQueueName;
                         namespaceClient.CreateQueue(qd);
                    };
                  
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }
    }
}

 

Within this class, the purpose is to:

  1. Create the Common Queue, if it does not already exist.
  2. If Publisher A’s Queue does not exist, create a new Queue Description and include the ForwardTo directive that will forward messages from the Publisher A Queue to the Common Queue. We will then use this Queue Description to create the Publisher A Queue.
  3. If Publisher B’s Queue does not exist, create a new Queue Description and include the ForwardTo directive that will forward messages from the Publisher B Queue to the Common Queue. We will then use this Queue Description to create the Publisher B Queue.

The Program.cs class that is part of this project only needs to run once in order to setup and configure our queues.

 

Publisher A Project

The purpose of this Project is very simple.  We want to create an instance of a Purchase Order and publish this message to our Publisher A Queue.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using QueueToQueuePublisher;
using System.Runtime.Serialization;


namespace PublisherA
{
    class Program
    {

            const string SendQueueName = "pubaqueue";
            const string ServiceNamespace = "<your_namespace>";
            const string IssuerName ="owner";
            const string IssuerKey = "<your_key>";
      
               
       static void Main(string[] args)
        {

          

     
            //***************************************************************************************************
            //                                   Get Credentials
            //***************************************************************************************************          
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider  (Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            try
            {
                PurchaseOrder po = new PurchaseOrder();

                po.CompanyID = "PublisherA";
                po.ProductID = "A1234";
                po.QuantityRequired = 300;

                factory = MessagingFactory.Create(serviceUri, credentials);

                QueueClient myQueueClient = factory.CreateQueueClient(Program.SendQueueName);

                BrokeredMessage message = new BrokeredMessage(po, new DataContractSerializer(typeof(PurchaseOrder)));
                Console.WriteLine("Publisher A sending message");
                myQueueClient.Send(message);


            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

        }
    }
}

Publisher B Project

This project is pretty much a carbon copy of the Publisher A Project with the difference being that we are going to send messages to our Publisher B Queue instead of the Publisher A Queue.  I have included this project for completeness.

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using QueueToQueuePublisher;
using System.Runtime.Serialization;


namespace PublisherA
{
    class Program
    {

        const string SendQueueName = "pubbqueue";
        const string ServiceNamespace = "<your_namespace>";
        const string IssuerName = "owner";
        const string IssuerKey = "<your_key>";


        static void Main(string[] args)
        {

 


            //***************************************************************************************************
            //                                   Get Credentials
            //***************************************************************************************************          
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            try
            {
                PurchaseOrder po = new PurchaseOrder();

                po.CompanyID = "PublisherB";
                po.ProductID = "B1234";
                po.QuantityRequired = 300;

                factory = MessagingFactory.Create(serviceUri, credentials);

                QueueClient myQueueClient = factory.CreateQueueClient(Program.SendQueueName);


                BrokeredMessage message = new BrokeredMessage(po, new DataContractSerializer(typeof(PurchaseOrder)));
                Console.WriteLine("Publisher B sending message");
                myQueueClient.Send(message);


            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

        }
    }
}

BizTalk

From a BizTalk perspective we are going to keep this very simple.  We will simply create a Send Port Subscription and write a copy of any message that is retrieved from the Common Service Bus Queue to disk.

In order to configure a Send Port Subscription we need to first create a Receive Port and Receive Location.  This Receive Location will connect to our ServiceBus Namespace and will be looking for messages form the CommonQueue only.  As you may recall the messages that are sent to the individual Publisher A and B Queues will be forwarded to this Common Queue. 

Also note, since I am not deploying any Schemas we want to use the PassThruReceive pipeline.  If you specify XMLReceive then BizTalk will be looking for a schema that doesn’t exist.

image

Our Send Port will consist of using the FILE Adapter to write our messages to the local file system.

image

In order for our Send Port Subscription to work we do need to create a Filter based upon our Receive Port Name.

image

At this point we can enable our BizTalk Application.

 

Testing

In order to test our application we do need to make sure that the QueueToQueuePublisher console application is run.  This will create our common queue and our two publisher queues.  After running this application we should see the following within our namespace.

image

If we want to test our Queue To Queue forwarding we can simply create a test message in our pubaqueue and then receive the test message from our commonqueue.

image

image

image

Now that our Queue configuration has been verified we can run an instance of our PublisherA console application.

image

If we check our file folder that our send port is writing to we should see a new file has been written.

image

We can now perform the same actions with PublisherB.

image

image

Conclusion

As you can see the Queue to Queue forwarding is a pretty neat feature.  We can use it for Fan In, as in our scenario, messaging scenarios that forces Service Bus to worry about in order delivery and simplifies a Receiver’s endpoint surface.  Arguably it creates more configuration in the cloud so there may be a bit of a trade off in that regard.

Currently the only way to configure the ForwardTo property is through the Management APIs.  There is currently no GUI that allows you to take care of this.  But, without having any private knowledge, I am sure that this is something that Microsoft will address in a future update.

Something else to be aware of is that BizTalk has no understanding of ForwardTo.  Nor would any other client of the Service Bus.  This “configuration” is occurring outside of client configurations which is the way it should be.  Any perceived complexities that exist should be abstracted away from systems that are communicating with Service Bus.