Memcache lockless queue implementation (v2)

Jonathan Ricketson

The new Google App Engine 1.3.1 SDK adds a method on memcache called “grab_tail”. This great little method removes the need for my lockless queue to manage the read counter. We still need to manage the write counter, though, because there doesn’t seem to be a way to add items to a memcache namespace with an arbitrary name.

I have also added a way to roll back if processing of queue items encounters an error. Earlier I was doing the reads non-destructively, so rollback was just a matter of resetting the read counter to an appropriate point; now that the reads go through grab_tail (which is destructive), we need to re-queue the messages instead.
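Because grab_tail consumes messages as it reads them, the rollback really is just a re-write: everything grabbed in the failed batch goes back on the queue under fresh write counters. Here is a minimal sketch of that destructive-read-plus-requeue pattern, with a plain Python list standing in for memcache (the SimpleQueue and drain names are mine, purely for illustration):

```python
class SimpleQueue(object):
    """Toy stand-in for the memcache-backed queue, for illustration only."""

    def __init__(self):
        self._items = []

    def write_multi(self, messages):
        self._items.extend(messages)

    def grab_tail(self, item_count):
        # destructive read: the grabbed items are removed from the queue
        grabbed = self._items[:item_count]
        self._items = self._items[item_count:]
        return grabbed

def drain(queue, process):
    msgs = queue.grab_tail(item_count=100)
    try:
        for msg in msgs:
            process(msg)
    except Exception:
        # rollback: re-queue everything we grabbed, then re-raise
        queue.write_multi(msgs)
        raise
```

Note that the rollback re-queues the whole batch, including any messages that were processed before the failure, so this gives at-least-once delivery: consumers need to tolerate duplicates.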

<rant>

In this implementation, the method that gets the next write counter, __nextCounter, is far more complicated than it should be because of an outstanding defect in memcache. The defect is probably quite simple to fix, but I reported it almost six months ago and it hasn’t even been acknowledged. I really do wonder about the value of an issue tracker to a community that doesn’t use it. If the Google team is not going to use the issue tracker, it would be far better to acknowledge that and get rid of it completely.

</rant>

Here it is:

from google.appengine.api import memcache
import logging
 
class Queue(object):
    itemPrefix="queueItem"
    writeCounter="writeCounter"
    def __init__(self, queueName):
        self.name = queueName
 
    def write(self, msg):
        counter = self.__nextWriteCounter()[0]
        msgKey = self.itemPrefix + str(counter)
        if not memcache.add(msgKey, msg, namespace=self.name):
            raise QueueException("msg key already existed: %s" % msgKey)
        logging.debug("wrote to %s:%s" % (self.name, msgKey))
 
    def writeMulti(self, messages):
        if len(messages) == 0:
            return
        mapping = {}
        counters = self.__nextWriteCounter(len(messages))
        for msg, counter in zip(messages, counters):
            mapping[self.itemPrefix + str(counter)] = msg
 
        # add_multi returns the keys it failed to add; retry those
        # messages under fresh counters (not the keys themselves)
        failedKeys = memcache.add_multi(mapping, namespace=self.name)
        if failedKeys:
            self.writeMulti([mapping[key] for key in failedKeys])
 
    def read(self):
        result = memcache.grab_tail(item_count=1, namespace=self.name)
        if len(result) > 0:
            return result[0]
        else:
            return None
 
    def readMulti(self, maxItems=100):
        return memcache.grab_tail(item_count=maxItems, namespace=self.name)
 
    def requeueMessages(self, messages):
        self.writeMulti(messages)
 
    def __currentWriteCounter(self):
        return self.__currentCounter(self.writeCounter, 0)
    def __nextWriteCounter(self, howmany=1):
        return self.__nextCounter(self.writeCounter, howmany=howmany)
 
    def __currentCounter(self, key, default):
        # counters live in their own namespace (self.name + "c"), matching
        # __nextCounter, so that grab_tail on the queue namespace never
        # consumes the counter itself
        counter = memcache.get(key, namespace=self.name + "c")
        if counter is None:
            memcache.set(key, default, namespace=self.name + "c")
            counter = default
        return counter
 
    def __nextCounter(self, key, howmany):
        counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
        if counter is None:
            # incr fails on a key that doesn't exist yet, so try to create it
            if not memcache.add(key, howmany, namespace=self.name + "c"):
                # another thread got in first with the add; incr the key it made
                counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
                if counter is None:
                    raise QueueException("could not increment counter: %s" % key)
            else:
                counter = howmany
        # return the block of counter values this caller now owns
        return range(counter - (howmany - 1), counter + 1)
 
class QueueException(Exception):
    """Raised when the queue cannot write a message or advance a counter."""

and to use it:

clicksAndViewsQueue = queue.Queue(clientViews.QUEUE_NAME)

def processMessages(request):
    msgs = []
    try:
        msgs = clicksAndViewsQueue.readMulti(maxItems=100)
        if len(msgs) > 0:
            for msg in msgs:
                processMessage(msg)
 
            taskqueue.addTask(url=viewUrl, queueName=taskqueue.BACKGROUND_QUEUE)
    except Exception:
        clicksAndViewsQueue.requeueMessages(msgs)
        raise

This entry was posted on February 8th, 2010.