--- a
+++ b/pyforge/flyway/runner.py
@@ -0,0 +1,150 @@
+import logging
+
+from .model import MigrationInfo
+from .migrate import Migration
+
+log = logging.getLogger(__name__)
+
+def run_migration(datastore, target_versions, dry_run):
+ '''Attempt to migrate the database to a specific set of required
+ modules & versions.'''
+ # Get the migration status of the db
+ session = MigrationInfo.__mongometa__.session
+ session.bind = datastore
+ info = MigrationInfo.m.get()
+ if info is None:
+ info = MigrationInfo.make({})
+ latest_versions = Migration.latest_versions()
+ for k,v in target_versions.iteritems():
+ cur = info.versions.get(k, -1)
+ islatest = ' (LATEST)' if v == latest_versions[k] else ''
+ log.info('Target %s=%s%s (current=%s)', k, v, islatest, cur)
+ # Create a migration plan
+ plan = list(plan_migration(session, info, target_versions))
+ # Execute (or print) the plan
+ for step in plan:
+ log.info('Migrate %r', step)
+ if dry_run: continue
+ step.apply(info.versions)
+ info.m.save()
+
+def reset_migration(datastore, dry_run):
+ '''Reset the state of the database to non-version-controlled WITHOUT migrating
+
+ This is equivalent to setting all the versions to -1.'''
+ session = MigrationInfo.__mongometa__.session
+ session.bind = datastore
+ log.info('Reset migrations')
+ if not dry_run:
+ MigrationInfo.m.remove()
+
+class MigrationStep(object):
+
+ def __init__(self, session, module, version, direction):
+ self.module = module
+ self.version = version
+ self.direction = direction
+ self.session = session
+ self.migration = Migration.get(module, version)(session)
+ if direction == 'up':
+ self.requires = dict(self.migration.requires())
+ self.postcondition = {module:version}
+ else:
+ self.requires = {module:version}
+ self.postcondition = {module:version-1}
+
+ def __repr__(self):
+ return '<%s on %s.%s>' % (self.direction, self.module, self.version)
+
+ def apply(self, state):
+ state.update(self.postcondition)
+ if self.direction == 'up':
+ self.migration.up()
+ else:
+ self.migration.down()
+
+ def unmet_requirements(self, state):
+ result = {}
+ for k,v in self.requires.iteritems():
+ if state.get(k, -1) != v: result[k] = v
+ return result
+
+ def precluded_by(self, other_step):
+ for k,v in self.requires.iteritems():
+ if other_step.postcondition.get(k, v) != v:
+ return True
+ return False
+
+ def add_requirements(self, steps, req):
+ if self.direction == 'down':
+ mod,ver = self.module, self.version-1
+ if (mod,ver) in steps: return
+ if req[mod] == ver: return
+ step = MigrationStep(self.session, mod, ver, 'down')
+ steps[mod,ver] = step
+ step.add_requirements(steps, req)
+ for mod, ver in self.requires.iteritems():
+ if (mod,ver) in steps: continue
+ if ver != -1:
+ step = MigrationStep(self.session, mod, ver, self.direction)
+ steps[mod,ver] = step
+ step.add_requirements(steps, req)
+
+def plan_migration(session, info, target_versions):
+ '''Create a migration plan based on the current DB state and the
+ target version set'''
+ # Determine all the (final) migrations that need to be run
+ steps = {}
+ for mod,req_ver in target_versions.iteritems():
+ cur_ver = info.versions.get(mod, -1)
+ if cur_ver < req_ver:
+ steps[mod,req_ver] = MigrationStep(session, mod, req_ver, 'up')
+ elif cur_ver > req_ver:
+ steps[mod,cur_ver] = MigrationStep(session, mod, cur_ver, 'down')
+ # Add the dependencies of all the migrations
+ current = dict(info.versions)
+ for step in steps.values():
+ step.add_requirements(steps, target_versions)
+ # Schedule migrations to be run
+ steps = sorted(steps.values(), key=lambda s:(s.version, s.module))
+ log.debug('Migrations to be run: %r', steps)
+ while steps:
+ step = _pop_step(steps, current)
+ log.info('State %s, step %s', current, step)
+ yield step
+ current.update(step.postcondition)
+
+def _pop_step(steps, current):
+ '''This method looks at all the available migration steps and the current
+ current versioning state and chooses a migration step to run next, removing it
+ from the list of available migration steps and returning it.
+ '''
+ # Find all "valid" migrations, i.e. migrations whose requirements() are met
+ valid = []
+ invalid = []
+ for s in steps:
+ if s.unmet_requirements(current):
+ invalid.append(s)
+ else:
+ valid.append(s)
+ # If there's only one valid migration, then it's the next one we'll run
+ if len(valid) == 1:
+ steps[:] = invalid
+ return valid[0]
+ # Find a migration that does not preclude other valid migrations
+ # from running
+ for i, step_a in enumerate(valid):
+ for j, step_b in enumerate(valid):
+ if i == j: continue # don't check against self
+ if step_b.precluded_by(step_a): break # conflict, step_a is not next
+ else:
+ # No conflicts found, step_a is the next step
+ # Remove step_a from the list of steps
+ steps[:] = [ s for s in steps if s is not step_a ]
+ return step_a
+ # No next step found, could be circular dependency. Log the error and raise
+ # a ValueError
+ log.error('Cannot find valid step at state %s', current)
+ for v in valid:
+ log.error(' %r', v)
+ raise ValueError, "Plan stuck"