I am, of course, talking about a pattern in high scale web application design, Queues. More specifically the Queue service on the Windows Azure cloud computing platform.

So when do you use Queues?

Its common to use thread synchronization objects in a web application to serialize access to a resource, but this thread synchronization approach doesn't work so well when you have multiple applications, over multiple servers competing for exclusive access to a global resource.

The first scenario I've found which suits the Queue service is in a user driven content ranking system. I suspect I'll find many more places to use the Queue services as I continue to learn more about developing and designing applications on the cloud platform.

In the web application I'm building I want users to be able to rate dynamic content and I also want to record the number of views. This is a very common concept which allows the users to provide feedback which can then be shared with other users and used to dynamically rank the content.

I can start by implementing some tables, one to store views and one to store votes. All I need to do is insert a row into the corresponding table for every view or vote. I can then query the tables to get all the views and votes for each content item. While this will work, it won't scale. Imagine querying a YouTube video view table! It's certainly not the sort of task you could do while the user is waiting for a page to render.

You may be able to resolve this problem with a background process, using some thread locks, to periodically calculate and cache this information on each web server. While I may end up using some caching on each web server, storing all the pages and their counts on each web server may not be feasible. Another problem with this is all the web servers are doing the same data analysis, which doesn't seem a efficient use of resources.

A nice solution for this type of problem is to use the Queue service and a table for the summarized data. Instead of adding page views and votes to the original tables, they can be added to a queue. A worker role can then be implemented to periodically check this queue, potentially doing some validation (or other work) on each item, and then update the reference table. This could be scaled even further, if required, by having multiple worker roles processing items from queue.

A not entirely desirable effect of using this pattern is that votes and pages views are not immediately registered when a user votes or visits a page. I guess this is more desirable than your users not being able to view pages when a single server web server reaches capacity or users getting page time-outs as you process billions of table rows while they wait.

Continuing a theme in my recent posts, I'm going to have a play with the service using IronPython.

I've been using this little helper module which just imports the StorageClient assembly and makes it easy to create development StorageAccountInfo objects as the have the development credentials as default values.

import clr
clr.AddReference("StorageClient.dll")
from Microsoft.Samples.ServiceHosting.StorageClient import *
from System import Uri

class Account():

   def __init__(self):
      self.endPointUri = Uri("http://127.0.0.1:10002/")
      self.accountName = 'devstoreaccount1'
      self.accountSharedKey = '...'

   def GetStorageAccountInfo(self):
      return StorageAccountInfo(self.endPointUri, None, self.accountName,self.accountSharedKey)

class BlobAccount(Account):

   def __init__(self):
      Account.__init__(self)
      self.endPointUri = Uri("http://127.0.0.1:10000/")

class QueueAccount(Account):

   def __init__(self):
      Account.__init__(self)
      self.endPointUri = Uri("http://127.0.0.1:10001/")

It's then pretty easy to create a queue and add an item to it

from AzureQueueHelper import *

# create a connection to the development storage service
queueStorage = QueueStorage.Create(QueueAccount().GetStorageAccountInfo())

# create a queue by name
messageQueue = queueStorage.GetQueue("test1")

# this will create the queue if it does't exist, but will do nothing if it already exists
messageQueue.CreateQueue()

# create and send a message
msg = Message("testString")
messageQueue.PutMessage(msg)

Then get it back in another process

from AzureQueueHelper import *

# create a connection to the development storage service
queueStorage = QueueStorage.Create(QueueAccount().GetStorageAccountInfo())

# create a queue by name
messageQueue = queueStorage.GetQueue("test1")

# this will create the queue if it does't exist, but will do nothing if it already exists
messageQueue.CreateQueue()

# the is the time you have exclusive access to the queue item
msg = messageQueue.GetMessage(5)

if msg:

   print msg.ContentAsString()

   # message must be explicitly removed from the queue
   messageQueue.DeleteMessage(msg)

The asynchronous methods can be used to create a simple little event based queue processing server.

from AzureQueueHelper import *
from System.Threading import *

# auto reset event to keep it alive
are = AutoResetEvent(False)

# create a connection to the development storage service
queueStorage = QueueStorage.Create(QueueAccount().GetStorageAccountInfo())

# create a queue by name
messageQueue = queueStorage.GetQueue("test1")

# this will create the queue if it does't exist, but will do nothing if it already exists
messageQueue.CreateQueue()

# handle message from the queue
def MessageReceieved(sender, e):
   print e.Message.ContentAsString()
   messageQueue.DeleteMessage(e.Message)

# setup polling, should keep reading untill the queue is empty, then poll for 5 seconds
messageQueue.MessageReceived += MessageReceieved
messageQueue.PollInterval = 10000;
messageQueue.StartReceiving();

# repeat until are is set (forever in this example)
are.WaitOne()

Its really amazing how little is required to extended this little server to the worker process discussed in this post using just the table services I've previously posted about.

Comments

Comment