Google App Engine migration script (v2)

Jonathan Ricketson

After talking about the previous version of my migration script, I needed to make some significant changes to it. These changes support loading entities that do not validate against their current model version. For example, if you have added a new required field or renamed a field, you can make those changes using this script.

This uses the underlying Query and Entity types, so no Model constraints are enforced while you edit. Once you have made all the changes to the entity, it is loaded into the Model class so that defaults are applied and validation runs.

So the main features of this migration approach are:

  • Uses the task queue for paging over all specified Model classes
  • Sets defaults from the Model definition
  • Allows adding, removing and modifying fields through a dictionary-like object (the Entity)
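
The paging in the first bullet can be sketched without App Engine at all. This is only a simulation under stated assumptions: a sorted Python list stands in for the datastore keys, a deque stands in for the task queue, and `fetch_batch` and `run_migration` are hypothetical names that do not appear in the script.

```python
from collections import deque

ITEMS_TO_FETCH = 10  # same batch size the script uses


def fetch_batch(sorted_keys, start_key=None, limit=ITEMS_TO_FETCH):
    """Return up to `limit` keys strictly greater than `start_key`."""
    remaining = [k for k in sorted_keys if start_key is None or k > start_key]
    return remaining[:limit]


def run_migration(sorted_keys, process_item):
    """Simulate the task chain: each task handles one batch, then queues the next."""
    queue = deque([None])  # the first task has no start key
    tasks_run = 0
    while queue:
        start_key = queue.popleft()
        tasks_run += 1
        batch = fetch_batch(sorted_keys, start_key)
        if not batch:
            break  # nothing left, so the chain ends here
        for key in batch:
            process_item(key)
        queue.append(batch[-1])  # the next task resumes after the last key seen
    return tasks_run
```

With 25 keys this runs four tasks: two full batches, one partial batch, and a final task that finds nothing and stops.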

If you just want to apply any new defaults, you don’t need to do anything other than specify the model class in the list to migrate. Any classes without a MigrationWorker are loaded into their Model class and have defaults and validation applied through that mechanism.
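
The fallback for classes without their own worker comes down to a lookup by naming convention. Here is a minimal, standalone sketch of that idea. `BaseWorker`, `make_default_worker` and `find_worker` are hypothetical stand-ins, not names from the script, and the namespace is passed in explicitly rather than read from `globals()`.

```python
class BaseWorker(object):
    """Stand-in for the base migration worker (hypothetical, for illustration)."""
    kind = None


class ads_fooWorker(BaseWorker):
    """A hand-written worker, found by the '<kind>Worker' naming convention."""
    kind = "ads_foo"


def make_default_worker(kind_name):
    """Build a worker class on the fly for a kind that has no custom worker."""
    class DefaultWorker(BaseWorker):
        kind = kind_name
    return DefaultWorker


def find_worker(kind_name, namespace):
    """Return the custom worker if one is defined, else a generated default."""
    worker = namespace.get("%sWorker" % kind_name)
    if worker is None:
        worker = make_default_worker(kind_name)
    return worker
```

Calling `find_worker("ads_bar", {"ads_fooWorker": ads_fooWorker})` returns a generated class whose `kind` is `"ads_bar"`, while asking for `"ads_foo"` returns the hand-written class.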

If there are specific things that you want to do to the object then you need to create your own MigrationWorker.

class ads_fooWorker(MigrationWorker):
    kind = "ads_foo"
    def processItem(self, item):
        logging.info("processing %s %s" % (item.kind(), item.key()))
        logging.info(item)
        if "views" in item and isinstance(item['views'], list):
            item['views'] = sum(item['views'])
        elif "views" not in item:
            item['views'] = 0
        if "clicks" in item and isinstance(item['clicks'], list):
            item['clicks'] = sum(item['clicks'])
        elif "clicks" not in item:
            item['clicks'] = 0
        super(ads_fooWorker, self).processItem(item)

In this class we are migrating a model class that (because we are using AEP) is called ads_foo. The underlying Entity object that we are operating on is a dictionary-like object, so we can look for keys, add keys and ‘del’ keys. As you can see here, we are changing a field from a list of integers to a single integer. This would not be possible if the object were loaded into the Model class.
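
Renaming a field, the other case mentioned at the top, uses the same dictionary operations. A rough sketch, using a plain dict in place of the Entity; `rename_field` and the field name `hits` are made up for illustration:

```python
def rename_field(item, old_name, new_name):
    """Move a value from old_name to new_name on a dictionary-like entity."""
    if old_name in item:
        # Keep a value already stored under new_name rather than overwrite it.
        if new_name not in item:
            item[new_name] = item[old_name]
        del item[old_name]
    return item
```

Inside a worker, something like this would presumably be called from processItem before handing the item to the base class.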

The thing to be careful of (if using the task queue) is to check whether you have already made the changes, because you are not guaranteed that your task will run only once. So make sure that your MigrationWorker checks whether it needs to do its work.
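
One way to make that check explicit is to write each change so that a second run is a no-op. The sketch below assumes a plain dict in place of the Entity, and `collapse_counter` is a hypothetical helper rather than part of the script.

```python
def collapse_counter(item, field):
    """Turn a list-of-integers field into a single integer.

    Returns True if the item was changed, False if it was already migrated.
    """
    if field not in item:
        item[field] = 0
        return True
    if isinstance(item[field], list):
        item[field] = sum(item[field])
        return True
    return False  # already a single value, so leave it alone
```

Running it twice on the same item changes the item only the first time, which is the property you want when a task may be delivered more than once.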

The full code is here. To do my migration I usually add my MigrationWorkers directly to this script.

import logging
 
from django.http import HttpResponse
 
from google.appengine.api import datastore
from google.appengine.ext import db
from google.appengine.api.labs import taskqueue
from google.appengine.runtime import apiproxy_errors
 
"""migrate from version x to version y"""
def migrate(request):
    modelsToMigrate = ["ads_foo",
                     "ads_bar"]
    # one starting task per kind; each task queues its own follow-up
    for kind in modelsToMigrate:
        _addTask(url=MigrationWorker.worker_url % kind)
    return HttpResponse("created all tasks for processing")
 
#General Migration
def migrateModel(request, model_name):
    #create the worker class and tell it to work
    workerName = '%sWorker' % model_name
    if workerName not in globals():
        logging.info("no worker for %s" % model_name)
        Worker = generateWorker(model_name)
    else:
        Worker = globals()[workerName]
    if "start" in request.REQUEST:
        Worker(request.REQUEST['start']).work()
    else:
        Worker().work()
    return HttpResponse("ok")
 
class MigrationWorker(object):
    ITEMS_TO_FETCH = 10
    worker_url = "/worker/migrate/%s"
    def __init__(self, startKey=None):
        self.startKey = startKey
 
    def work(self):
        query = datastore.Query(self.kind)
        if self.startKey:
            query['__key__ >'] = db.Key(self.startKey)
        items = query.Get(self.ITEMS_TO_FETCH)
        if not items:
            logging.info('Finished migrating %s' % self.kind)
            return
 
        last_key = items[-1].key()
        for x in items:
            self.processItem(x)
 
        _addTask(url=self.worker_url % self.kind, params=dict(start=last_key))
        logging.info('Added another task to queue for %s starting at %s' %
                     (self.kind, last_key))
 
    """Override this method to do some work for each item
    """
    def processItem(self, item):
        logging.info("processing %s %s" % (item.kind(), item.key()))
        # load the raw entity into its Model class, then save the instance
        instance = db.class_for_kind(item.kind()).from_entity(item)
        instance.put()
 
def generateWorker(kind_name):
    class DynamicClass(MigrationWorker):
        kind = kind_name
    return DynamicClass
 
class ads_fooWorker(MigrationWorker):
    kind = "ads_foo"
    def processItem(self, item):
        logging.info("processing %s %s" % (item.kind(), item.key()))
        logging.info(item)
        if "views" in item and isinstance(item['views'], list):
            item['views'] = sum(item['views'])
        elif "views" not in item:
            item['views'] = 0
        if "clicks" in item and isinstance(item['clicks'], list):
            item['clicks'] = sum(item['clicks'])
        elif "clicks" not in item:
            item['clicks'] = 0
        super(ads_fooWorker, self).processItem(item)
 
def _addTask(url, params=None, queueName='default'):
    # avoid sharing one mutable default dict between calls
    params = params or {}
    try:
        logging.info("add task to %s [%s]" % (queueName, (url, params)))
        task = taskqueue.Task(url=url, params=params)
        task.add(queueName)
    except taskqueue.TransientError:
        logging.exception("adding Task failed with a TransientError")
        # transient failure, so try again
        _addTask(url, params, queueName)
    except apiproxy_errors.OverQuotaError:
        #but keep going
        logging.exception("adding Task failed with an OverQuotaError")

This entry was posted on September 11th, 2009.