Feb 8, 2010 | Jonathan
The new Google App Engine 1.3.1 SDK adds a method to memcache called “grab_tail”. This handy little method means my lockless queue no longer has to manage the read counter. We still need to manage the write counter, though, because there doesn’t seem to be a way to add items to a memcache namespace without giving each one its own unique key.
I have also added a way to roll back when processing the queue items fails. Earlier I was doing the reads non-destructively, so I only needed to reset the read counter to an appropriate point. Now, because grab_tail is destructive, we need to re-queue the messages.
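The difference between resetting a counter and re-queueing is easiest to see in a toy model. The sketch below is not the App Engine API: `ToyNamespace` and `process_batch` are hypothetical names, and the toy `grab_tail` simply removes the lowest-numbered keys first (which items the real `grab_tail` picks is not modelled). It shows a destructive batch read followed by writing the whole batch back when a message fails.

```python
class ToyNamespace(object):
    """In-memory stand-in for one memcache namespace (hypothetical, not the App Engine API)."""

    def __init__(self):
        self._items = {}   # key -> message
        self._next = 1     # write counter used to build keys

    def add(self, msg):
        key = "queueItem%d" % self._next
        self._next += 1
        self._items[key] = msg

    def grab_tail(self, item_count):
        # Destructive read: up to item_count messages are removed and returned.
        # This toy takes the lowest-numbered keys first.
        keys = sorted(self._items, key=lambda k: int(k[len("queueItem"):]))[:item_count]
        return [self._items.pop(k) for k in keys]

    def __len__(self):
        return len(self._items)


def process_batch(ns, handle, max_items=100):
    """Grab a batch, handle each message, and re-queue the whole batch on failure."""
    msgs = ns.grab_tail(max_items)
    try:
        for msg in msgs:
            handle(msg)
    except Exception:
        # grab_tail already removed the messages, so write them back
        # (under new keys) before letting the error propagate.
        for msg in msgs:
            ns.add(msg)
        raise
    return len(msgs)
```

In this model a failure on the second message puts all three messages back, including the first one, which had already been handled. Because the whole batch is re-queued, whatever processes the messages should tolerate seeing one twice.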
<rant>
In this implementation, the method that gets the next write counter, “__nextCounter”, is far more complicated than it should be because of an outstanding defect in memcache. The defect is probably quite simple to fix, but although I reported it almost 6 months ago, it hasn’t even been acknowledged. I really do wonder about the value of an issue tracker to a community that doesn’t use it. If the Google team is not going to use the issue tracker, it would be far better to acknowledge that and get rid of it completely.
</rant>
Here it is:
```python
from google.appengine.api import memcache
import logging


class Queue(object):
    itemPrefix = "queueItem"
    writeCounter = "writeCounter"

    def __init__(self, queueName):
        self.name = queueName

    def write(self, msg):
        counter = self.__nextWriteCounter()[0]
        msgKey = self.itemPrefix + str(counter)
        if not memcache.add(msgKey, msg, namespace=self.name):
            raise QueueException("msg key already existed: %s" % msgKey)
        logging.debug("wrote to %s:%s" % (self.name, msgKey))

    def writeMulti(self, messages):
        if len(messages) == 0:
            return
        mapping = {}
        counters = self.__nextWriteCounter(len(messages))
        for msg, counter in zip(messages, counters):
            mapping[self.itemPrefix + str(counter)] = msg
        # add_multi returns the keys that were NOT added;
        # retry those messages with fresh counters.
        failedKeys = memcache.add_multi(mapping, namespace=self.name)
        if failedKeys:
            self.writeMulti([mapping[key] for key in failedKeys])

    def read(self):
        result = memcache.grab_tail(item_count=1, namespace=self.name)
        if len(result) > 0:
            return result[0]
        else:
            return None

    def readMulti(self, maxItems=100):
        return memcache.grab_tail(item_count=maxItems, namespace=self.name)

    def requeueMessages(self, messages):
        self.writeMulti(messages)

    def __currentWriteCounter(self):
        return self.__currentCounter(self.writeCounter, 0)

    def __nextWriteCounter(self, howmany=1):
        return self.__nextCounter(self.writeCounter, howmany=howmany)

    def __currentCounter(self, key, default):
        # use the same counter namespace (queue name + "c") as __nextCounter
        counter = memcache.get(key, namespace=self.name + "c")
        if counter is None:
            memcache.set(key, default, namespace=self.name + "c")
            counter = default
        return counter

    def __nextCounter(self, key, howmany):
        counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
        if counter is None:
            if not memcache.add(key, howmany, namespace=self.name + "c"):
                # another thread got in first with the add, so increment instead
                counter = memcache.incr(key, namespace=self.name + "c", delta=howmany)
                if counter is None:
                    raise QueueException("could not increment counter: %s" % key)
            else:
                counter = howmany
        # incr returns the value after adding, so the reserved block ends at counter
        return range(counter - (howmany - 1), counter + 1)


class QueueException(Exception):
    """Raised when the queue cannot write a message or increment its counter."""
```
And to use it:
```python
import queue  # the queue.py module above

clicksAndViewsQueue = queue.Queue(clientViews.QUEUE_NAME)

def processMessages(request):
    msgs = []
    try:
        msgs = clicksAndViewsQueue.readMulti(maxItems=100)
        if len(msgs) > 0:
            for msg in msgs:
                processMessage(msg)
            taskqueue.addTask(url=viewUrl, queueName=taskqueue.BACKGROUND_QUEUE)
    except Exception:
        # grab_tail already removed the messages, so put them back before re-raising
        clicksAndViewsQueue.requeueMessages(msgs)
        raise
```
Posted in Development, google-app-engine, python
Oct 14, 2009 | Jonathan
Calculating money is a tricky thing. Your calculations have to be exact, so you can't store amounts as floats, where $1.33 might actually be stored as 1.3299999999. That is no good for calculations. But storing amounts in cents is no good either: what is 133c / 2?
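Here are both problems in plain Python, along with the exact result the standard `decimal` module gives. The helper names are hypothetical and only here for illustration.

```python
from decimal import Decimal


def float_sum(a, b):
    # Binary floating point: most decimal fractions are only approximated.
    return a + b


def halve_cents(cents):
    # Integer cents: halving an odd amount leaves a remainder to deal with.
    return divmod(cents, 2)


def halve_decimal(amount):
    # Decimal keeps the exact decimal result.
    return Decimal(amount) / 2


print(repr(float_sum(0.1, 0.2)))   # 0.30000000000000004
print(halve_cents(133))            # (66, 1): 66c each, 1c left over
print(halve_decimal("1.33"))       # 0.665
```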
Python has a Decimal data type, but storing it requires serialising to a string. Nick Johnson from the Google App Engine team wrote a post about how to write a Decimal property for Google App Engine. Unfortunately, because that property serialises to a string, any sorting is string sorting: 100 comes before 11, which comes before 20. Bummer. I played around with storing numbers with a bunch of leading zeroes, e.g. 0000000010, but that starts to feel a bit hacky.
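A quick illustration of the sorting problem. It uses plain Python lists in place of datastore queries and assumes the same character-by-character ordering of strings that the post describes. It also shows one more way the zero-padding workaround breaks: negative amounts.

```python
amounts = [100, 11, 20]

# Serialised as plain strings, values sort character by character.
as_strings = sorted(str(a) for a in amounts)          # ['100', '11', '20']

# Stored as integers, they sort numerically.
as_numbers = sorted(amounts)                           # [11, 20, 100]

# Zero-padding restores numeric order for non-negative values...
zero_padded = sorted("%010d" % a for a in amounts)     # ['0000000011', '0000000020', '0000000100']

# ...but not for negative ones: -5 sorts before -50.
negatives = sorted("%010d" % a for a in [-5, -50])     # ['-000000005', '-000000050']
```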
So I wrote a Money database property that stores amounts with 6 decimal places of precision and behaves like a normal Python numeric type. I haven’t implemented all of the numeric special methods, just the ones I needed. I would be happy to take feedback, though.
To use:
```python
from google.appengine.ext import db

class Transaction(db.Model):
    dateOccurred = db.DateProperty(auto_now_add=True)
    description = db.StringProperty(required=True)
    amount = MoneyProperty(required=True)

t = Transaction(description="I got some money", amount=Money(10.34))
```
```python
from google.appengine.ext import db


class Money(object):
    """A fixed-point amount stored as an integer number of millionths."""
    multiple = 1000000.0

    def __init__(self, val, multiply=True):
        # round to the nearest millionth rather than truncating, so float
        # error in the last place cannot knock a unit off the value
        if multiply:
            self._intVal = int(round(float(val) * self.multiple))
        else:
            self._intVal = int(round(val))

    def format(self, places=2):
        return "%.*f" % (places, float(self))

    def __float__(self):
        return self._intVal / self.multiple

    def __repr__(self):
        return "%.06f" % (self._intVal / self.multiple)

    def __mul__(self, other):
        if isinstance(other, Money):
            # both operands are scaled, so divide one scale factor back out
            return Money((self._intVal * other._intVal) / self.multiple, False)
        else:
            return Money(self._intVal * other, False)
    __rmul__ = __mul__

    def __add__(self, other):
        if isinstance(other, Money):
            return Money(self._intVal + other._intVal, False)
        else:
            return Money(self._intVal + (other * self.multiple), False)
    __radd__ = __add__

    def __sub__(self, other):
        # works for Money and plain numbers alike
        return self + (-other)

    def __rsub__(self, other):
        # other - self, where other is a plain number
        return (-self) + other

    def __cmp__(self, other):
        if isinstance(other, Money):
            return cmp(self._intVal, other._intVal)
        elif other is None or other == "":
            # a missing or blank value compares as smaller than any amount
            return 1
        else:
            return cmp(self._intVal, other * self.multiple)

    def __div__(self, other):
        if isinstance(other, Money):
            # the ratio of two scaled values is unscaled, so scale it back up
            return Money((self._intVal * self.multiple) / other._intVal, False)
        else:
            return Money(self._intVal / float(other), False)

    def __rdiv__(self, other):
        if isinstance(other, Money):
            return Money((other._intVal * self.multiple) / self._intVal, False)
        else:
            return Money((other * self.multiple * self.multiple) / self._intVal, False)

    def __neg__(self):
        return Money(self._intVal * -1, False)


class MoneyProperty(db.Property):
    """Stores a Money value in the datastore as an integer, so it sorts numerically."""
    data_type = Money

    def get_value_for_datastore(self, model_instance):
        value = super(MoneyProperty, self).get_value_for_datastore(model_instance)
        if value is None:
            return None
        elif isinstance(value, Money):
            return value._intVal
        else:
            return Money(value)._intVal

    def make_value_from_datastore(self, value):
        if value is None:
            return None
        else:
            return Money(value, False)

    def empty(self, value):
        return value is None

    def get_value_for_form(self, instance):
        value = super(MoneyProperty, self).get_value_for_form(instance)
        if not value:
            return None
        if isinstance(value, Money):
            return float(value)
        return value

    def make_value_from_form(self, value):
        if not value:
            return None
        if not isinstance(value, Money):
            # form input arrives as a string or a number
            value = Money(value)
        return value
```
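`Money` keeps each amount as an integer number of millionths. Multiplying two scaled amounts therefore gives a result scaled twice, and dividing them gives one that is not scaled at all. That is why `__mul__` divides by `multiple` and `__div__` multiplies by it. The sketch below does the same bookkeeping with plain integers. The helper names are hypothetical, and unlike `Money` it parses through `Decimal` rather than float and rounds halves away from zero. Both are choices of this sketch, not of the class above.

```python
from decimal import Decimal, ROUND_HALF_UP

SCALE = 10 ** 6  # six decimal places, matching Money.multiple


def to_units(value):
    """Parse a decimal string into an integer count of millionths (hypothetical helper)."""
    return int((Decimal(value) * SCALE).to_integral_value(rounding=ROUND_HALF_UP))


def div_round(n, d):
    """Integer division n / d, rounded to the nearest integer, halves away from zero."""
    q, r = divmod(abs(n), abs(d))
    if 2 * r >= abs(d):
        q += 1
    return -q if (n < 0) != (d < 0) else q


def mul(a, b):
    # (a / S) * (b / S) = (a * b / S) / S, so one factor of S is divided back out.
    return div_round(a * b, SCALE)


def div(a, b):
    # (a / S) / (b / S) = a / b; multiplying by S first keeps six places.
    return div_round(a * SCALE, b)


def show(units):
    """Format a count of millionths as a decimal string."""
    if units < 0:
        return "-" + show(-units)
    return "%d.%06d" % divmod(units, SCALE)
```

For example, `show(div(to_units("1"), to_units("3")))` gives `"0.333333"`. Because the scaled value is an ordinary integer, it is also what `MoneyProperty` writes to the datastore, so stored amounts sort numerically.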
Posted in Development, google-app-engine, python
Jun 28, 2009 | Jonathan
When you write an application on Google App Engine, you are inevitably going to deploy a version to production that does not have the final set of features. Then, when you create new features for version 2, you will (probably) also have to refactor the data model: at the very least adding new fields, and potentially renaming fields or creating composed entities. So when you put version 2 into production, the data your users created in version 1 will need to be migrated to the new data model.
Nothing like Rails Migrations exists yet, so you have to do pretty much everything yourself.
The solution I outline here is an automated way of processing all entities in a list of Model classes. It loads the entities into the v2 model classes and then calls your code to modify them (and to create any new objects that might be required). It will not work when the v2 model is missing fields that need to be read in order to migrate to the new state; that case would need to be written using the underlying Entity classes and Query (I haven’t needed to do that yet). This is really more of a recipe to adapt to your own purposes than a generic drop-in migration tool.
The idea came from code in a Google demo.
Here is the file: migrate.py
Directions for use (these directions are for Django, but if you are using the basic handler it would look very similar):
1. Rename migrate.py to migrateXToY.py (for your X and your Y)
2. Modify your urls.py to contain:
patterns("migrateXToY",
(r'^migrate/migrateXToY$', 'migrateXToY'),
(r'^worker/migrateXToY/(?P[^/]+)$', 'migrateModel'),
)
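In the worker pattern, Python's named-group syntax is `(?P<name>...)`. Django passes each named group to the view as a keyword argument, which is how `migrateModel` can receive the model name from the URL. Here is a small check of such a pattern using the standard `re` module. The group name `kind_name` is a hypothetical choice.

```python
import re

# "kind_name" is a hypothetical group name; Django would call the view as
# migrateModel(request, kind_name="User") for the path below.
WORKER_PATTERN = re.compile(r'^worker/migrateXToY/(?P<kind_name>[^/]+)$')


def kind_from_path(path):
    """Return the model name captured from a worker URL path, or None."""
    match = WORKER_PATTERN.match(path)
    if match is None:
        return None
    return match.group('kind_name')
```

`kind_from_path('worker/migrateXToY/User')` returns `'User'`. The `[^/]+` part rejects empty names and paths with extra segments.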
3. Create a migrateXToY method in migrateXToY.py like the following (modelsToMigrate is the list of model names that will be processed):
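```python
from django.http import HttpResponse
from google.appengine.api.labs import taskqueue

def migrateXToY(request):
    modelsToMigrate = ['User', 'Foo', 'Bar', 'BarDetail', 'Image', 'Report']
    # one task per model; each task is handled by the migrateModel worker URL
    for modelName in modelsToMigrate:
        taskqueue.add(url='/worker/migrateXToY/%s' % modelName)
    return HttpResponse("created all tasks for processing")
```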
4. Create a FooWorker class like the following. Its migrateItem method receives each item being migrated and should return the item after processing without putting it; for efficiency, the puts are batched.
```python
import logging

class FooWorker(MigrationWorker):
    kind = Foo
    kindName = "Foo"

    def migrateItem(self, item):
        logging.info("processing %s %s" % (self.kindName, item.key()))
        # whatever processing you need to do
        item.cancelled = False
        return item
```
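The `MigrationWorker` base class lives in migrate.py, which is not reproduced here. The sketch below is a hypothetical, App Engine-free version of the pattern step 4 describes: call `migrateItem` on every entity and save the results in batches instead of one put per entity. On App Engine, `put_batch` could be `db.put`, which accepts a list of entities. The `run` method, `batch_size` and `put_batch` are assumptions of this sketch.

```python
class MigrationWorker(object):
    """Hypothetical base class; not the actual contents of migrate.py."""
    batch_size = 100

    def migrateItem(self, item):
        # Subclasses modify the item and return it without saving it.
        raise NotImplementedError

    def run(self, items, put_batch):
        """Apply migrateItem to every item, saving the results in batches.

        `items` is any iterable of entities; `put_batch` saves a list of them.
        Returns the number of items migrated.
        """
        batch = []
        count = 0
        for item in items:
            batch.append(self.migrateItem(item))
            if len(batch) >= self.batch_size:
                put_batch(batch)
                count += len(batch)
                batch = []
        if batch:
            # save whatever is left over after the last full batch
            put_batch(batch)
            count += len(batch)
        return count
```

With `batch_size = 2`, five items are saved as batches of 2, 2 and 1. A larger batch means fewer datastore round-trips, but more work to redo if the task fails partway through a batch.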
Posted in google-app-engine, python
Jun 24, 2009 | Jonathan
An application I am writing on Google App Engine needed a way to store jobs and then process them all at once. I found the idea of a lockless queue built on Memcached and wrote an implementation of it: queue.py
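The lockless queue keeps a write counter and a read counter. A writer atomically increments the write counter and stores its message under a key built from the new value. A reader increments the read counter and fetches the matching key, so neither side needs a lock. Below is a single-threaded toy model with a dict standing in for memcache. All names are hypothetical, and the model ignores two readers racing past the emptiness check as well as memcache evicting items.

```python
class ToyCache(object):
    """Dict-backed stand-in for the few memcache calls the queue uses (hypothetical)."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        # Simplification: a missing counter starts at 0, so the first incr returns 1.
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value

    def delete(self, key):
        self.data.pop(key, None)


class ToyLocklessQueue(object):
    """Single-threaded model of a queue driven by a write counter and a read counter."""

    def __init__(self, cache, name):
        self.cache = cache
        self.writeKey = name + ":writeCounter"
        self.readKey = name + ":readCounter"
        self.itemPrefix = name + ":queueItem"

    def write(self, msg):
        # Each writer claims a unique slot by incrementing the write counter.
        slot = self.cache.incr(self.writeKey)
        self.cache.set(self.itemPrefix + str(slot), msg)

    def read(self):
        # Nothing to read if the reader has caught up with the writers.
        if (self.cache.get(self.readKey) or 0) >= (self.cache.get(self.writeKey) or 0):
            return None
        slot = self.cache.incr(self.readKey)
        key = self.itemPrefix + str(slot)
        msg = self.cache.get(key)
        self.cache.delete(key)
        return msg
```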
To write to it (I do this from a view where I want to store some stats about the data that was shown in the view):
```python
thisQueue = queue.Queue(QUEUE_NAME)

def method():
    thisQueue.write(data)
```
Then, to read from it later, I created a cron job that runs frequently:
```python
thisQueue = queue.Queue(QUEUE_NAME)

def cronMethod():
    msg = thisQueue.read()
    while msg:
        processMessage(msg)
        msg = thisQueue.read()
```
If you are using this on Google App Engine, you should also be careful not to run out of execution time. If you get a DeadlineExceededError after you have read a message but before you have finished processing it, that message might get lost.
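One way to reduce that risk is to give the read loop its own time budget, set well below the request deadline, and check it before each read. That way the job stops taking new messages while there is still time left to process the one in hand. The sketch below is one possible approach, not part of queue.py. `drain`, `budget_seconds` and the injectable `clock` are hypothetical names, and a single message that takes longer than the remaining margin can still be cut off.

```python
import time


def drain(read, process, budget_seconds, clock=time.monotonic):
    """Read and process messages until the queue is empty or the budget is spent.

    `read` returns the next message or None; `process` handles one message.
    The budget is checked before each read, so no new message is taken off
    the queue once the budget has run out.
    """
    deadline = clock() + budget_seconds
    handled = 0
    while clock() < deadline:
        msg = read()
        if msg is None:
            break
        process(msg)
        handled += 1
    return handled
```

A cron handler could call `drain(thisQueue.read, processMessage, 20)`, and any messages left over are picked up by the next cron run.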
Posted in google-app-engine, python