USAGE = """
## Example

For any existing app:

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args, **vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db, dict(demo1=demo1, demo2=demo2))

## run worker nodes with:

    cd web2py
    python gluon/scheduler.py -u sqlite://storage.sqlite \\
                              -f applications/myapp/databases/ \\
                              -t mytasks.py

(run python gluon/scheduler.py -h for the full list of options)

## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0

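## schedule a task from application code (a sketch; the values are placeholders
## but the fields match the scheduler_task table defined in this module)
db.scheduler_task.insert(application_name=request.application,
                         task_name='run demo1 once',
                         function_name='demo1',
                         args='[1, 2]',
                         vars='{"a": 3}',
                         repeats=1, period=60, timeout=60)
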
## Comments
"""

import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse

try:
    from gluon.contrib.simplejson import loads, dumps
except ImportError:
    from simplejson import loads, dumps

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon.utils import web2py_uuid

QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS

class Task(object):
    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logging.debug(' new task allocated: %s.%s' % (app, function))
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args  # json encoded list
        self.vars = vars  # json encoded dict
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function


class TaskReport(object):
    def __init__(self, status, result=None, output=None, tb=None):
        logging.debug(' new task report: %s' % status)
        if tb:
            logging.debug(' traceback: %s' % tb)
        else:
            logging.debug(' result: %s' % result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status


def demo_function(*argv, **kwargs):
    """ test function """
    for i in range(argv[0]):
        print 'click', i
        time.sleep(1)
    return 'done'


# simplejson decodes strings as unicode, but Python 2 keyword argument names
# must be plain str, so recursively re-encode decoded values as utf-8 bytes
def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist


def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict
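
# For example (illustrative): _decode_dict({u'a': [u'x', 1]}) returns
# {'a': ['x', 1]}, so the decoded vars can be passed safely as **keyword
# arguments to the task function.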


def executor(queue, task):
    """ the background process """
    logging.debug(' task started')
    # capture anything the task prints so it can be reported back as output
    stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
    try:
        if task.app:
            # run the task inside the web2py application environment
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env
            from gluon.dal import BaseAdapter
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            _env = env(task.app, import_models=True)
            logging.getLogger().setLevel(level)
            scheduler = current._scheduler
            scheduler_tasks = current._scheduler.tasks
            _function = scheduler_tasks[task.function]
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            # for testing only: no app context, evaluate the function by name
            # (e.g. demo_function)
            result = eval(task.function)(
                *_decode_list(loads(task.args)),
                **loads(task.vars, object_hook=_decode_dict))
        stdout, sys.stdout = sys.stdout, stdout
        queue.put(TaskReport(COMPLETED, result, stdout.getvalue()))
    except BaseException:
        sys.stdout = stdout
        tb = traceback.format_exc()
        queue.put(TaskReport(FAILED, tb=tb))

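
# A minimal sketch of how a worker process could drive executor(), based only on
# its signature and the TaskReport statuses above (an illustration, not the
# actual worker loop):
#
#     queue = multiprocessing.Queue()
#     p = multiprocessing.Process(target=executor, args=(queue, task))
#     p.start()
#     p.join(task.timeout)
#     if p.is_alive():
#         p.terminate()
#         report = TaskReport(TIMEOUT)
#     else:
#         report = queue.get()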


TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, INACTIVE, DISABLED)

class TYPE(object):
    """
    validator that checks whether the field contains valid json
    and, optionally, that the decoded object is of the given type
    """

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value):
        from gluon import current
        try:
            obj = loads(value)
        except (ValueError, TypeError):
            return (value, current.T('invalid json'))
        else:
            if isinstance(obj, self.myclass):
                if self.parse:
                    return (obj, None)
                else:
                    return (value, None)
            else:
                return (value, current.T('Not of type: %s') % self.myclass)
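
# Usage sketch (illustrative values): TYPE(dict)('{"a": 1}') returns
# ('{"a": 1}', None), TYPE(dict, parse=True)('{"a": 1}') returns ({'a': 1}, None),
# and TYPE(dict)('not json') returns the value together with an 'invalid json'
# error message.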


class Scheduler(MetaScheduler):
    def __init__(self, db, tasks={}, migrate=True,
                 worker_name=None, group_names=None, heartbeat=HEARTBEAT):
        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or socket.gethostname() + '#' + str(web2py_uuid())

        from gluon import current
        current._scheduler = self

        self.define_tables(db, migrate=migrate)

    def define_tables(self, db, migrate):
        from gluon import current
        logging.debug('defining tables (migrate=%s)' % migrate)
        now = datetime.datetime.now()
        db.define_table(
            'scheduler_task',
            Field('application_name', requires=IS_NOT_EMPTY(),
                  default=None, writable=False),
            Field('task_name', requires=IS_NOT_EMPTY()),
            Field('group_name', default='main', writable=False),
            Field('status', requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED, writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))),
            Field('args', 'text', default='[]', requires=TYPE(list)),
            Field('vars', 'text', default='{}', requires=TYPE(dict)),
            Field('enabled', 'boolean', default=True),
            Field('start_time', 'datetime', default=now),
            Field('next_run_time', 'datetime', default=now),
            Field('stop_time', 'datetime', default=now + datetime.timedelta(days=1)),
            Field('repeats', 'integer', default=1, comment="0=unlimited"),
            Field('period', 'integer', default=60, comment='seconds'),
            Field('timeout', 'integer', default=60, comment='seconds'),
            Field('times_run', 'integer', default=0, writable=False),
            Field('last_run_time', 'datetime', writable=False, readable=False),
            Field('assigned_worker_name', default='', writable=False),
            migrate=migrate, format='%(task_name)s')
        if hasattr(current, 'request'):
            db.scheduler_task.application_name.default = current.request.application

        db.define_table(
            'scheduler_run',
            Field('scheduler_task', 'reference scheduler_task'),
            Field('status', requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time', 'datetime'),
            Field('stop_time', 'datetime'),
            Field('output', 'text'),
            Field('result', 'text'),
            Field('traceback', 'text'),
            Field('worker_name', default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name'),
            Field('first_heartbeat', 'datetime'),
            Field('last_heartbeat', 'datetime'),
            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
            migrate=migrate)
        db.commit()
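
    # Task rows move QUEUED -> ASSIGNED -> RUNNING and end up COMPLETED, FAILED
    # or TIMEOUT; repeating tasks and STOPPED tasks are put back to QUEUED, and
    # every execution gets its own scheduler_run record.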

    def loop(self, worker_name=None):
        MetaScheduler.loop(self)

    def pop_task(self):
        now = datetime.datetime.now()
        db, ts = self.db, self.db.scheduler_task
        try:
            logging.debug(' grabbing all queued tasks')
            # claim, in a single update, every task this worker is allowed to run
            all_available = db(ts.status.belongs((QUEUED, RUNNING)))\
                ((ts.times_run < ts.repeats) | (ts.repeats == 0))\
                (ts.start_time <= now)\
                (ts.stop_time > now)\
                (ts.next_run_time <= now)\
                (ts.enabled == True)\
                (ts.assigned_worker_name.belongs((None, '', self.worker_name)))
            number_grabbed = all_available.update(
                assigned_worker_name=self.worker_name, status=ASSIGNED)
            db.commit()
        except:
            number_grabbed = None
            db.rollback()
        if number_grabbed:
            logging.debug(' grabbed %s tasks' % number_grabbed)
            grabbed = db(ts.assigned_worker_name == self.worker_name)\
                (ts.status == ASSIGNED)
            task = grabbed.select(limitby=(0, 1), orderby=ts.next_run_time).first()

            logging.debug(' releasing all but one (running)')
            if task:
                task.update_record(status=RUNNING, last_run_time=now)
                # the remaining grabbed tasks are still ASSIGNED; requeue them
                grabbed.update(assigned_worker_name='', status=QUEUED)
                db.commit()
        else:
            return None
        next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats == 0:
            run_again = True
        else:
            run_again = False
        logging.debug(' new scheduler_run record')
        while True:
            try:
                run_id = db.scheduler_run.insert(
                    scheduler_task=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                db.rollback()
        logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run)

    def report_task(self, task, task_report):
        logging.debug(' recording task report in db (%s)' % task_report.status)
        db = self.db
        db(db.scheduler_run.id == task.run_id).update(
            status=task_report.status,
            stop_time=datetime.datetime.now(),
            result=task_report.result,
            output=task_report.output,
            traceback=task_report.tb)
        if task_report.status == COMPLETED:
            d = dict(status=task.run_again and QUEUED or COMPLETED,
                     next_run_time=task.next_run_time,
                     times_run=task.times_run,
                     assigned_worker_name='')
        else:
            d = dict(
                assigned_worker_name='',
                status={'FAILED': 'FAILED',
                        'TIMEOUT': 'TIMEOUT',
                        'STOPPED': 'QUEUED'}[task_report.status])
        db(db.scheduler_task.id == task.task_id)\
            (db.scheduler_task.status == RUNNING).update(**d)
        db.commit()
        logging.info('task completed (%s)' % task_report.status)

    def send_heartbeat(self, counter):
        if not self.db_thread:
            logging.debug('thread building own DAL object')
            self.db_thread = DAL(self.db._uri, folder=self.db._adapter.folder)
            self.define_tables(self.db_thread, migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = datetime.datetime.now()
            expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)

            logging.debug('........recording heartbeat')
            if not db(sw.worker_name == self.worker_name)\
                    .update(last_heartbeat=now, status=ACTIVE):
                sw.insert(status=ACTIVE, worker_name=self.worker_name,
                          first_heartbeat=now, last_heartbeat=now)
            if counter % 10 == 0:
                # every tenth beat, requeue tasks held by workers that stopped
                # sending heartbeats and drop their worker records
                logging.debug(' freeing workers that have not sent heartbeat')
                inactive_workers = db(sw.last_heartbeat < expiration)
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))\
                    (st.status.belongs((RUNNING, ASSIGNED, QUEUED)))\
                    .update(assigned_worker_name='', status=QUEUED)
                inactive_workers.delete()
            db.commit()
        except:
            db.rollback()
        time.sleep(self.heartbeat)


def main():
    """
    allows running a worker without web2py.py, by simply running:
    python gluon/scheduler.py
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", type="int", default=10,
        help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default='INFO',
        help="level of logging (DEBUG, INFO, WARNING, ERROR)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing the tasks, must define "
             "tasks = {'task_name': (lambda: 'output')} or a similar dict of tasks")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path, filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: ' + ', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print 'groups for this worker: ' + ', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: ' + options.db_uri
    db = DAL(options.db_uri, folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler = Scheduler(db=db,
                          worker_name=options.worker_name,
                          tasks=tasks,
                          migrate=True,
                          group_names=group_names,
                          heartbeat=options.heartbeat)
    print 'starting main worker loop...'
    scheduler.loop()


if __name__ == '__main__':
    main()