Feb 8 2010

Memcache lockless queue implementation (v2)

Jonathan

The new Google App Engine 1.3.1 SDK adds a method to memcache called “grab_tail”. This is a great little method that removes the need for my lockless queue to manage the read counter. We still need to manage the write counter though, because there doesn’t seem to be a way to add items to a memcache namespace with an arbitrary name.

I have also added a way to roll back if your processing of queue items encounters an error. Earlier the reads were non-destructive, so I just needed to reset the read counter to an appropriate point. Now that grab_tail is destructive, the messages have to be re-queued instead.
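
The read-then-requeue idea can be tried outside App Engine. The sketch below is only an illustration: FakeQueue is a hypothetical in-memory stand-in (not the memcache API) whose read_multi is destructive in the same way as grab_tail, and process_batch is an assumed helper name.

```python
class FakeQueue(object):
    """Hypothetical in-memory stand-in for the memcache-backed queue."""

    def __init__(self):
        self._items = []

    def write_multi(self, messages):
        self._items.extend(messages)

    def read_multi(self, max_items=100):
        # Destructive read, like grab_tail: returned items leave the queue.
        batch = self._items[:max_items]
        del self._items[:max_items]
        return batch

    def __len__(self):
        return len(self._items)


def process_batch(queue, handler, max_items=100):
    """Read a batch and put it back if the handler raises."""
    batch = queue.read_multi(max_items)
    try:
        for msg in batch:
            handler(msg)
    except Exception:
        # The read already removed the messages, so roll back by re-queuing.
        queue.write_multi(batch)
        raise
    return len(batch)
```

Note that the whole batch goes back on the queue, including any messages handled before the failure, so the handler needs to tolerate seeing a message twice.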

<rant>

In this implementation, the method that gets the next write counter, “__nextCounter”, is far more complicated than it should be because of an outstanding defect in memcache. This defect is probably quite simple to fix, but in the almost 6 months since I reported it, it hasn’t even been acknowledged. I do really wonder about the value of an issue tracker to a community that doesn’t use it. If the Google team is not going to use the issue tracker, it would be far better to acknowledge that and get rid of it completely.

</rant>

Here it is:

from google.appengine.api import memcache
import logging
 
class Queue(object):
    itemPrefix="queueItem"
    writeCounter="writeCounter"
    def __init__(self, queueName):
        self.name = queueName
 
    def write(self, msg):
        counter = self.__nextWriteCounter()[0]
        msgKey = self.itemPrefix + str(counter)
        if not memcache.add(msgKey, msg, namespace=self.name):
            raise QueueException("msg key already existed: %s" % msgKey)
        logging.debug("wrote to %s:%s" % (self.name, msgKey))
 
    def writeMulti(self, messages):
        if len(messages) == 0:
            return
        mapping={}
        counters = self.__nextWriteCounter(len(messages))
        for msg,counter in zip(messages,counters):
            mapping[self.itemPrefix + str(counter)]=msg
 
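        # add_multi returns the keys that could not be added; retry the
        # messages for those keys (not the keys themselves) with new counters
        failedKeys = memcache.add_multi(mapping, namespace=self.name)
        self.writeMulti([mapping[key] for key in failedKeys])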
 
    def read(self):
        result = memcache.grab_tail(item_count=1, namespace=self.name)
        if len(result) > 0:
            return result[0]
        else:
            return None
 
    def readMulti(self, maxItems=100):
        return memcache.grab_tail(item_count=maxItems, namespace=self.name)
 
    def requeueMessages(self, messages):
        self.writeMulti(messages)
 
    def __currentWriteCounter(self):
        return self.__currentCounter(self.writeCounter, 0)
    def __nextWriteCounter(self,howmany=1):
        return self.__nextCounter(self.writeCounter, howmany=howmany)
 
    def __currentCounter(self, key, default):
        # use the same counter namespace (name + "c") as __nextCounter
        counter = memcache.get(key, namespace=self.name+"c")
        if counter is None:
            memcache.set(key, default, namespace=self.name+"c")
            counter = default
        return counter
 
    def __nextCounter(self, key, howmany):
        counter = memcache.incr(key, namespace=self.name+"c", delta=howmany)
        if counter is None:
            if not memcache.add(key, howmany, namespace=self.name+"c"):
                # handles the case where another thread got in first with the add
                counter = memcache.incr(key, namespace=self.name+"c",delta=howmany)
                if counter is None:
                    raise QueueException("could not increment counter: %s" % key)
            else:
                counter = howmany
        return range(counter-(howmany-1),counter+1)
 
class QueueException(Exception):
    """Raised when a queue operation fails."""

and to use it:

clicksAndViewsQueue = queue.Queue(clientViews.QUEUE_NAME)
def processMessages(request):
    msgs=[]
    try:
        msgs = clicksAndViewsQueue.readMulti(maxItems=100)
        if len(msgs) > 0:
            for msg in msgs:
                processMessage(msg)
 
            taskqueue.addTask(url=viewUrl, queueName=taskqueue.BACKGROUND_QUEUE)
    except Exception:
        # readMulti is destructive, so put the batch back before re-raising
        clicksAndViewsQueue.requeueMessages(msgs)
        raise
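
The counter allocation in __nextCounter can be exercised without App Engine. The following is a minimal sketch, assuming a hypothetical FakeMemcache class that only imitates the return values the queue relies on: incr returns None when the key is missing, and add returns False when the key already exists. next_counters is an assumed name for illustration.

```python
class FakeMemcache(object):
    """Hypothetical in-memory stand-in mimicking incr/add return values."""

    def __init__(self):
        self._data = {}

    def incr(self, key, delta=1):
        # Mimics incr on a missing key: nothing to increment, so None.
        if key not in self._data:
            return None
        self._data[key] += delta
        return self._data[key]

    def add(self, key, value):
        # Mimics add: only succeeds when the key does not exist yet.
        if key in self._data:
            return False
        self._data[key] = value
        return True


def next_counters(cache, key, howmany=1):
    """Reserve `howmany` consecutive counter values and return them as a list."""
    counter = cache.incr(key, delta=howmany)
    if counter is None:
        if cache.add(key, howmany):
            counter = howmany
        else:
            # Another writer created the key between our incr and our add.
            counter = cache.incr(key, delta=howmany)
            if counter is None:
                raise RuntimeError("could not increment counter: %s" % key)
    return list(range(counter - howmany + 1, counter + 1))
```

Each caller gets a block of counters that no other caller can receive, which is what lets writers proceed without a lock. If the SDK version in use supports the initial_value argument to memcache.incr, the add fallback may not be needed at all.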

Oct 14 2009

Money database property for Google App Engine

Jonathan

Calculating money is a tricky thing. Your calculations have to be ultra-precise, so you can’t store amounts as floats, where $1.33 might actually be stored as 1.3299999999. That is no good for calculations… But equally it is no good storing amounts in cents either: what is 133c / 2?
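
Both problems show up in a few lines of plain Python. This is only an illustration using the standard library; the variable names are made up for the example.

```python
from decimal import Decimal

# Binary floats cannot represent most decimal fractions exactly.
float_total = 0.1 + 0.2
float_is_exact = (float_total == 0.3)

# Whole cents stay exact until a division produces a fraction of a cent.
half_of_133_cents = 133 / 2.0   # 66.5, not a whole number of cents
truncated_half = 133 // 2       # 66, half a cent silently dropped

# Decimal keeps the arithmetic exact when built from strings.
decimal_total = Decimal("0.10") + Decimal("0.20")
decimal_is_exact = (decimal_total == Decimal("0.30"))
```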

Python has a Decimal datatype, but this requires serialisation to String. Nick Johnson from the Google App Engine team wrote a post about how to write a Decimal property for Google App Engine. Unfortunately, because this serialises to String, any sorting you do is String sorting: 100 comes before 11, which comes before 20. Bummer. I played around with storing numbers with a bunch of leading zeroes, i.e. 0000000010, but that starts to feel a bit hacky.

So I wrote a Money database property that has 6 places of precision, and works as a normal Python numeric type. I haven’t implemented all of those methods, just the ones that I needed. I would be happy to take feedback though.
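
The sorting problem, and why scaled integers avoid it, can be seen in plain Python. This is a small sketch rather than part of the property itself; SCALE is an assumed name for the six-decimal-place factor.

```python
amounts = [100, 11, 20]

# Sorting the string form compares character by character.
as_strings = sorted(str(a) for a in amounts)

# Zero padding repairs the order but needs a fixed width chosen up front.
padded = sorted("%010d" % a for a in amounts)

# Integers scaled by a fixed factor sort numerically with no padding.
SCALE = 10 ** 6  # assumption: six decimal places of precision
scaled = sorted(a * SCALE for a in amounts)
```

Here as_strings comes out as ['100', '11', '20'], while padded and scaled are both in numeric order.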

To use:

class Transaction(db.Model):
    dateOccurred = db.DateProperty(auto_now_add=True)
    description = db.StringProperty(required=True)
    amount = MoneyProperty(required=True)
 
t = Transaction(description="I got some money", amount=Money(10.34))

And the implementation:

from google.appengine.ext import db
 
class Money(object):
    # amounts are stored as integers scaled by 10^6 (six decimal places)
    multiple = 1000000.0
    def __init__(self, val, multiply=True):
        # round before converting, so float noise cannot drop the last unit
        if multiply:
            self._intVal = int(round(float(val) * self.multiple))
        else:
            self._intVal = int(round(val))
 
    def format(self, places=2):
        return "%.*f" % (places, float(self))
 
    def __float__(self):
        return self._intVal / self.multiple
 
    def __repr__(self):
        return "%.06f" % (self._intVal / self.multiple)
 
    def __mul__(self, other):
        if isinstance(other, Money):
            return Money((self._intVal * other._intVal) / self.multiple, False)
        else:
            return Money(self._intVal * other, False)
 
    __rmul__ = __mul__
 
    def __add__(self, other):
        if isinstance(other, Money):
            return Money(self._intVal + other._intVal, False)
        else:
            return Money(self._intVal + (other * self.multiple), False)
 
    __radd__ = __add__
 
    def __cmp__(self, other):
        # __cmp__ should return an int, so use cmp() rather than subtracting
        if other is None or other == "":
            return 1
        elif not isinstance(other, Money):
            return cmp(self._intVal, other * self.multiple)
        return cmp(self._intVal, other._intVal)
 
    def __sub__(self, other):
        # works for Money and for plain numbers
        return self + (-other)
 
    def __rsub__(self, other):
        # other - self, where other is a plain number
        return (-self) + other
 
    def __div__(self, other):
        if isinstance(other, Money):
            return Money((self._intVal * self.multiple) / other._intVal, False)
        else:
            return Money(self._intVal / other, False)
 
    def __rdiv__(self, other):
        if isinstance(other, Money):
            return Money((other._intVal * self.multiple) / self._intVal, False)
        else:
            return Money((other * self.multiple * self.multiple) / self._intVal, False)
 
    def __neg__(self):
        return Money(self._intVal * -1, False)
 
class MoneyProperty(db.Property):
    data_type = Money
 
    def get_value_for_datastore(self, model_instance):
        value = super(MoneyProperty, self).get_value_for_datastore(model_instance)
        if value is None:
            return None
        elif isinstance(value, Money):
            return value._intVal
        else:
            return Money(value)._intVal
 
    def make_value_from_datastore(self, value):
        # the datastore holds the scaled integer, so do not multiply again
        if value is None:
            return None
        else:
            return Money(value, False)
 
    def empty(self, value):
        return value is None
 
    def get_value_for_form(self, instance):
        value = super(MoneyProperty, self).get_value_for_form(instance)
        if not value:
            return None
        if isinstance(value, Money):
            return float(value)
        return value
 
    def make_value_from_form(self, value):
        if not value:
            return None
        # convert raw form input to Money; pass Money values through unchanged
        if not isinstance(value, Money):
            return Money(value)
        return value

Jun 28 2009

Google App Engine version migration

Jonathan

When writing an application on Google App Engine, you will inevitably deploy a version to production that does not have the final set of features. This is (of course) unavoidable. For version 2, creating new features will (probably) also mean refactoring the data model: at least adding new fields, and potentially renaming fields or creating composed entities. So when version 2 goes into production, the data that your users created in version 1 will need to be migrated to the new data model.

There is currently nothing like Rails Migrations for App Engine, so pretty much everything has to be done yourself.

The solution that I outline here is an automated way of iterating over all entities in a list of Model classes. It loads the entities into the v2 model classes and then calls your code to modify them (and create any new objects that might be required). It will not work when the v2 model is missing fields that need to be read in order to migrate to the new state; that case would need to be written using the underlying Entity classes and Query (I haven’t needed to do that yet). This is really more of a recipe that can be modified to your purposes than a generic drop-in migration tool.

The idea came from code copied from a Google demo.

Here is the file: migrate.py

Directions for use (these directions are for Django, but if you are using the basic handler it would look very similar):

1. Rename migrate.py to migrateXToY.py (for your X and your Y)

2. Modify your urls.py to contain:

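patterns("migrateXToY",
                       (r'^migrate/migrateXToY$', 'migrateXToY'),
                       # the group name was lost from the original post; "modelName" is a
                       # placeholder and must match the argument name of migrateModel
                       (r'^worker/migrateXToY/(?P<modelName>[^/]+)$', 'migrateModel'),
)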

3. Create a migrateXToY method in migrateXToY.py like the following (migrate1To2 in this example; modelsToMigrate is the list of model names that will be processed):

def migrate1To2(request):
    modelsToMigrate = ['User', 'Foo', 'Bar', 'BarDetail', 'Image', 'Report']
    for modelName in modelsToMigrate:
        taskqueue.add(url='/worker/migrateXToY/%s' % modelName)
    return HttpResponse("created all tasks for processing")

4. Create a FooWorker class like the following. The migrateItem method receives each item that is being migrated; it should return the item after processing and should not put it, because puts are batched for efficiency:

class FooWorker(MigrationWorker):
    kind=Foo
    kindName="Foo"
    def migrateItem(self, item):
        logging.info("processing %s %s" % (self.kindName, item.key()))
        #whatever processing you need to do
        item.cancelled=False
        return item
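
migrate.py itself is not reproduced here, so the following is only a guess at the shape of the batching, not the actual MigrationWorker code. The names migrate_in_batches, put_batch and batch_size are assumptions. On App Engine, put_batch could be db.put, which accepts a list of entities.

```python
def migrate_in_batches(items, migrate_item, put_batch, batch_size=50):
    """Apply migrate_item to every item and save the results in batches."""
    pending = []
    migrated = 0
    for item in items:
        # migrate_item returns the modified item without saving it
        pending.append(migrate_item(item))
        if len(pending) >= batch_size:
            put_batch(pending)
            migrated += len(pending)
            pending = []
    if pending:
        # save whatever is left over from the last partial batch
        put_batch(pending)
        migrated += len(pending)
    return migrated
```

Saving in batches means one datastore call per batch_size items rather than one per item, which is why migrateItem should not call put itself.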

Jun 24 2009

Memcache lockless queue implementation

Jonathan

For an application that I am writing on Google App Engine, I needed a way to store jobs and then process them all at once. I found the idea for a Memcached lockless queue and created an implementation of it: queue.py

To write to it (I do this from a view where I want to store some stats about the data that was shown in the view):

thisQueue=queue.Queue(QUEUE_NAME)
def method():
    thisQueue.write(data)

and then, later on, to read from it, I created a cron job that runs frequently:

thisQueue=queue.Queue(QUEUE_NAME)
def cronMethod():
    msg = thisQueue.read()
    while msg:
        processMessage(msg)
        msg = thisQueue.read()

If you are using this on Google App Engine, you should also be careful that you don’t run out of time to execute. If you get a DeadlineExceededError after you have read, but before you have finished processing, then the message might get lost.
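
One way to reduce that risk is to stop reading before the deadline arrives. The sketch below is a generic illustration, not part of queue.py: read, process and requeue are assumed callables (requeue might simply write the message back), and the 20 second budget is an arbitrary assumption.

```python
import time


def drain_with_budget(read, process, requeue, budget_seconds=20.0,
                      clock=time.time):
    """Process messages until the queue is empty or the time budget runs out."""
    deadline = clock() + budget_seconds
    processed = 0
    msg = read()
    while msg is not None:
        if clock() >= deadline:
            # Out of time: put the unprocessed message back rather than lose it.
            requeue(msg)
            break
        process(msg)
        processed += 1
        msg = read()
    return processed
```

The time check only runs between messages, so the budget has to leave enough headroom for the slowest single call to process.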