Memcache lockless queue implementation (v2)
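The new Google App Engine 1.3.1 SDK adds a method to memcache called “grab_tail”, which removes items from the tail of a namespace's LRU list and returns them. It's a great little method that removes the need for my lockless queue to manage a read counter at all. We still need to manage the write counter, though, because there doesn't seem to be a way to add an item to a memcache namespace without supplying a unique key for it ourselves. The write counter lives in its own namespace (the queue name plus "c") so that grab_tail never pops it off as if it were a message.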
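To make that division of labour concrete, here is a toy, in-process model. It is not the App Engine API: ToyNamespaceCache and ToyQueue are hypothetical names, insertion order stands in for memcache's LRU order (queue items are written once and never read in place), and eviction is not modelled. Writes still need a counter to mint unique keys; reads simply pop from the tail and need no counter.

```python
from collections import OrderedDict


class ToyNamespaceCache(object):
    """Hypothetical in-memory stand-in for one memcache namespace.

    Insertion order doubles as LRU order here, because queue items
    are written once and never read in place.
    """

    def __init__(self):
        self._items = OrderedDict()

    def add(self, key, value):
        # Like memcache.add: refuse to overwrite an existing key.
        if key in self._items:
            return False
        self._items[key] = value
        return True

    def grab_tail(self, item_count):
        # Pop up to item_count values from the least recently used end.
        values = []
        while self._items and len(values) < item_count:
            _, value = self._items.popitem(last=False)
            values.append(value)
        return values


class ToyQueue(object):
    """Writes need a counter to mint unique keys; reads need none."""

    def __init__(self):
        self.items = ToyNamespaceCache()
        self.write_counter = 0  # kept outside the item namespace

    def write(self, msg):
        self.write_counter += 1
        key = "queueItem%d" % self.write_counter
        if not self.items.add(key, msg):
            raise RuntimeError("msg key already existed: %s" % key)

    def read_multi(self, max_items=100):
        return self.items.grab_tail(max_items)


q = ToyQueue()
for m in ["a", "b", "c"]:
    q.write(m)
print(q.read_multi(2))  # ['a', 'b']
print(q.read_multi(2))  # ['c']
```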
I have also added a way to roll back when your processing of queue items hits an error. The earlier version read non-destructively, so rolling back just meant resetting the read counter to an appropriate point; now that grab_tail removes items as it reads them, we have to re-queue the messages instead.
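The consequence of destructive reads can be sketched with another toy model. ToyDestructiveQueue and process_batch are hypothetical names, and a deque stands in for memcache. Because the batch is gone from the queue the moment it is read, the only way to undo a failed batch is to write it back, and that puts back messages that were already handled before the failure too (so processing is at-least-once).

```python
from collections import deque


class ToyDestructiveQueue(object):
    """Hypothetical in-memory queue whose reads remove items, like grab_tail."""

    def __init__(self, messages=()):
        self._items = deque(messages)

    def read_multi(self, max_items=100):
        batch = []
        while self._items and len(batch) < max_items:
            batch.append(self._items.popleft())
        return batch

    def requeue_messages(self, messages):
        # There is no read counter to rewind, so rolling back means
        # writing the messages back onto the queue.
        self._items.extend(messages)

    def __len__(self):
        return len(self._items)


def process_batch(queue, handle):
    msgs = []
    try:
        msgs = queue.read_multi(max_items=100)
        for msg in msgs:
            handle(msg)
    except Exception:
        # Re-queue the whole batch, including messages already handled
        # before the failure, then let the error propagate.
        queue.requeue_messages(msgs)
        raise
    return msgs


def flaky(msg):
    if msg == "bad":
        raise ValueError(msg)


q = ToyDestructiveQueue(["ok1", "bad", "ok2"])
try:
    process_batch(q, flaky)
except ValueError:
    pass
print(len(q))  # 3: the whole batch is back on the queue
```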
<rant>
In this implementation, the method that gets the next write counter, “__nextCounter”, is far more complicated than it should be because of an outstanding defect in memcache. The fix is probably quite simple, but I reported it almost six months ago and it still hasn't even been acknowledged. I really do wonder what value an issue tracker has for a community that doesn't use it. If the Google team are not going to use the issue tracker, it would be far better to acknowledge that and get rid of it completely.
</rant>
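Whatever the underlying defect is, the incr / add / incr sequence that __nextCounter goes through can be sketched against a toy store. ToyCounterStore and next_counters are hypothetical names; the store only mimics two behaviours the queue relies on (incr returns None for a missing key, and add fails if the key already exists), with a lock making each call atomic. Reserving a block of howmany values in one incr means the returned number is the top of the block.

```python
import threading


class ToyCounterStore(object):
    """Hypothetical stand-in for the counter namespace."""

    def __init__(self):
        self._values = {}
        self._lock = threading.Lock()  # each call is atomic, like a memcache op

    def incr(self, key, delta=1):
        with self._lock:
            if key not in self._values:
                return None  # missing key: no automatic initialisation
            self._values[key] += delta
            return self._values[key]

    def add(self, key, value):
        with self._lock:
            if key in self._values:
                return False
            self._values[key] = value
            return True


def next_counters(store, key, howmany=1):
    """Reserve howmany consecutive counter values and return them."""
    counter = store.incr(key, delta=howmany)
    if counter is None:
        # Key missing: try to create it already holding our whole block.
        if store.add(key, howmany):
            counter = howmany
        else:
            # Another writer created it between our incr and add; retry.
            counter = store.incr(key, delta=howmany)
            if counter is None:
                raise RuntimeError("could not increment counter: %s" % key)
    # counter is the top of our block: return the howmany values ending there.
    return list(range(counter - (howmany - 1), counter + 1))


store = ToyCounterStore()
print(next_counters(store, "writeCounter"))     # [1]
print(next_counters(store, "writeCounter", 3))  # [2, 3, 4]
```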
Here it is:

```python
from google.appengine.api import memcache
import logging


class Queue(object):
    itemPrefix = "queueItem"
    writeCounter = "writeCounter"

    def __init__(self, queueName):
        self.name = queueName

    def write(self, msg):
        counter = self.__nextWriteCounter()[0]
        msgKey = self.itemPrefix + str(counter)
        if not memcache.add(msgKey, msg, namespace=self.name):
            raise QueueException("msg key already existed: %s" % msgKey)
        logging.debug("wrote to %s:%s" % (self.name, msgKey))

    def writeMulti(self, messages):
        if len(messages) == 0:
            return
        mapping = {}
        counters = self.__nextWriteCounter(len(messages))
        for msg, counter in zip(messages, counters):
            mapping[self.itemPrefix + str(counter)] = msg
        # add_multi returns the keys it could not store; retry the messages
        # behind those keys (with fresh counters), not the keys themselves.
        failedKeys = memcache.add_multi(mapping, namespace=self.name)
        self.writeMulti([mapping[key] for key in failedKeys])

    def read(self):
        # grab_tail is destructive: the item is removed as it is read.
        result = memcache.grab_tail(item_count=1, namespace=self.name)
        if len(result) > 0:
            return result[0]
        else:
            return None

    def readMulti(self, maxItems=100):
        return memcache.grab_tail(item_count=maxItems, namespace=self.name)

    def requeueMessages(self, messages):
        self.writeMulti(messages)

    def __currentWriteCounter(self):
        return self.__currentCounter(self.writeCounter, 0)

    def __nextWriteCounter(self, howmany=1):
        return self.__nextCounter(self.writeCounter, howmany=howmany)

    def __currentCounter(self, key, default):
        # Counters live in the name + "c" namespace, out of grab_tail's reach.
        counter = memcache.get(key, namespace=self.name + "c")
        if not counter:
            memcache.set(key, default, namespace=self.name + "c")
            counter = default
        return counter

    def __nextCounter(self, key, howmany):
        counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
        if counter is None:
            if not memcache.add(key, howmany, namespace=self.name + "c"):
                # handles the case where another thread got in first with the add
                counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
                if counter is None:
                    raise QueueException("could not increment counter: %s" % key)
            else:
                counter = howmany
        # counter is the top of the reserved block; return all howmany values.
        return range(counter - (howmany - 1), counter + 1)


class QueueException(Exception):
    """Raised when the queue cannot store a message or reserve a counter."""
```
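memcache.add_multi reports failure by returning the keys it did not store, so a retry has to map those keys back to their messages. The sketch below shows that retry loop in isolation, assuming a hypothetical ToyBatchStore in place of memcache and a simplified ToyBatchWriter whose counter is a plain attribute rather than a memcache value. A pre-existing key forces one collision.

```python
class ToyBatchStore(object):
    """Hypothetical stand-in: add_multi returns the keys it did NOT store."""

    def __init__(self):
        self.values = {}

    def add_multi(self, mapping):
        failed = []
        for key, value in mapping.items():
            if key in self.values:
                failed.append(key)  # like add: never overwrite
            else:
                self.values[key] = value
        return failed


class ToyBatchWriter(object):
    item_prefix = "queueItem"

    def __init__(self, store):
        self.store = store
        self.counter = 0  # simplified: a local counter, not a memcache one

    def _next_counters(self, howmany):
        self.counter += howmany
        return range(self.counter - (howmany - 1), self.counter + 1)

    def write_multi(self, messages):
        if not messages:
            return
        mapping = {}
        for msg, counter in zip(messages, self._next_counters(len(messages))):
            mapping[self.item_prefix + str(counter)] = msg
        failed_keys = self.store.add_multi(mapping)
        # Retry the messages behind the failed keys, each with a fresh counter.
        self.write_multi([mapping[key] for key in failed_keys])


store = ToyBatchStore()
store.values["queueItem2"] = "stale"  # simulate a key collision
writer = ToyBatchWriter(store)
writer.write_multi(["a", "b", "c"])
print(sorted(store.values.items()))
# [('queueItem1', 'a'), ('queueItem2', 'stale'), ('queueItem3', 'c'), ('queueItem4', 'b')]
```

Passing the failed keys straight back into write_multi would instead store the key strings themselves as new messages.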
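And to use it:

```python
import queue  # the Queue module above

# clientViews, taskqueue, viewUrl and processMessage are the application's
# own names; they are not defined in this snippet.
clicksAndViewsQueue = queue.Queue(clientViews.QUEUE_NAME)

def processMessages(request):
    msgs = []
    try:
        msgs = clicksAndViewsQueue.readMulti(maxItems=100)
        if len(msgs) > 0:
            for msg in msgs:
                processMessage(msg)
            taskqueue.addTask(url=viewUrl, queueName=taskqueue.BACKGROUND_QUEUE)
    except Exception:
        # grab_tail has already removed these items, so put the whole batch
        # back (including any already processed) before re-raising.
        clicksAndViewsQueue.requeueMessages(msgs)
        raise
```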