Creo que hace algunas preguntas muy buenas que resaltan lo útil que puede ser SWF como servicio. En resumen, no le dice a sus servidores que coordinen el trabajo entre ellos. Su decisor orquesta todo esto por usted, con la ayuda del servicio SWF.
La implementación de su flujo de trabajo será de la siguiente manera:
- Registrar su flujo de trabajo y sus actividades con el servicio (único).
- Implemente el decisor y los trabajadores.
- Deje correr a sus trabajadores y decisores.
- Iniciar un nuevo flujo de trabajo.
Hay varias formas de introducir credenciales en el código de boto.swf. A los efectos de este ejercicio, recomiendo exportarlos al entorno antes de ejecutar el siguiente código:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1) Para registrar el dominio, el flujo de trabajo y las actividades, ejecute lo siguiente:
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) Implementar y ejecutar decisores y trabajadores.
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
Los trabajadores son mucho más simples, no necesita usar la herencia si no quiere.
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) Ejecute sus decisores y trabajadores. Su decisor y trabajadores pueden ejecutarse desde hosts separados o desde la misma máquina. Abra cuatro terminales y ejecute sus actores:
Primero tu decisor
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
Entonces, el trabajador A, podría hacer esto desde el servidor A:
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
Luego, el trabajador B, posiblemente del servidor B, pero si los ejecuta todos desde una computadora portátil, funcionará igual de bien:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) Finalmente, inicie el flujo de trabajo.
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
Vuelva atrás para ver qué sucede con sus actores. Es posible que se desconecten del servicio después de un minuto de inactividad. Si eso sucede, presione la flecha hacia arriba + Intro para volver a ingresar al ciclo de sondeo.
Ahora puede ir al panel SWF de su consola de administración de AWS, ver cómo están las ejecuciones y ver su historial. Alternativamente, puede consultarlo a través de la línea de comando.
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
Ese es solo un ejemplo de un flujo de trabajo con ejecución en serie de actividades, pero también es posible que el decisor programe y coordine la ejecución paralela de actividades.
Espero que esto al menos te ayude a empezar. Para un ejemplo un poco más complejo de un flujo de trabajo en serie, recomiendo mirar esto.
No tengo ningún código de ejemplo para compartir, pero definitivamente puede usar SWF para coordinar la ejecución de scripts en dos servidores. La idea principal con esto es crear tres piezas de código que hablen con SWF:
- Un componente que sabe qué secuencia de comandos ejecutar primero y qué hacer una vez que se termina de ejecutar la primera secuencia de comandos. Esto se denomina "decisivo" en términos de SWF.
- Dos componentes que entienden cómo ejecutar el script específico que desea ejecutar en cada máquina. Estos se denominan "trabajadores de actividad" en términos de SWF.
El primer componente, el decisor, llama a dos API de SWF:PollForDecisionTask y RespondDecisionTaskCompleted. La solicitud de encuesta le dará al componente decisorio el historial actual de un flujo de trabajo en ejecución, básicamente la información de estado "dónde estoy" para su ejecutor de secuencias de comandos. Usted escribe un código que analiza estos eventos y determina qué secuencia de comandos debe ejecutar. Estos "comandos" para ejecutar un script tendrían la forma de una programación de una tarea de actividad, que se devuelve como parte de la llamada a RespondDecisionTaskCompleted.
Los segundos componentes que escribe, los trabajadores de actividad, cada uno llama a dos API de SWF:PollForActivityTask y RespondActivityTaskCompleted. La solicitud de sondeo le dará al trabajador de la actividad una indicación de que debe ejecutar el script que conoce, lo que SWF llama una tarea de actividad. La información devuelta por la solicitud de sondeo a SWF puede incluir datos específicos de ejecución únicos que se enviaron a SWF como parte de la programación de la tarea de actividad. Cada uno de sus servidores sondeará de forma independiente SWF en busca de tareas de actividad para indicar la ejecución del script local en ese host. Una vez que el trabajador termina de ejecutar el script, vuelve a llamar a SWF a través de la API RespondActivityTaskCompleted.
La devolución de llamada de su trabajador de actividad a SWF da como resultado que se entregue un nuevo historial al componente decisorio que ya mencioné. Verá el historial, verá que el primer script está hecho y programará la ejecución del segundo. Una vez que ve que el segundo está hecho, puede "cerrar" el flujo de trabajo usando otro tipo de decisión.
Usted inicia todo el proceso de ejecución de los scripts en cada host llamando a la API StartWorkflowExecution. Esto crea el registro del proceso general en SWF y envía el primer historial al proceso decisorio para programar la ejecución del primer script en el primer host.
Esperemos que esto brinde un poco más de contexto sobre cómo lograr este tipo de flujo de trabajo utilizando SWF. Si aún no lo ha hecho, echaría un vistazo a la guía para desarrolladores en la página de SWF para obtener información adicional.