
Source Code for Module web2py.gluon.scheduler

#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET

USAGE = """
## Example

For any existing app

Create file: app/models/scheduler.py with:

from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))

## run worker nodes with:

   cd web2py
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py

(run python gluon/scheduler.py -h for all options)

## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0

## Comments
"""

import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse

try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon.utils import web2py_uuid

QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
SECONDS = 1
HEARTBEAT = 3*SECONDS

class Task(object):
    def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
        logging.debug(' new task allocated: %s.%s' % (app,function))
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args # json
        self.vars = vars # json
        self.__dict__.update(kwargs)
    def __str__(self):
        return '<Task: %s>' % self.function

class TaskReport(object):
    def __init__(self,status,result=None,output=None,tb=None):
        logging.debug(' new task report: %s' % status)
        if tb:
            logging.debug('   traceback: %s' % tb)
        else:
            logging.debug('   result: %s' % result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb
    def __str__(self):
        return '<TaskReport: %s>' % self.status

def demo_function(*argv,**kwargs):
    """ test function """
    for i in range(argv[0]):
        print 'click',i
        time.sleep(1)
    return 'done'

# The two helpers below work around simplejson decoding strings as unicode,
# in particular for dicts: unicode keys will not work when the decoded dict
# is later passed as **keyword arguments to a task function.
# Borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist

def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict
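
# For example (illustrative): used as an object_hook, the helper above turns
# a JSON dict into one with byte-string keys and values, safe to splat into a
# function call as keyword arguments:
#
#     vars = loads('{"name": "world", "n": [1, "x"]}', object_hook=_decode_dict)
#     # vars == {'name': 'world', 'n': [1, 'x']}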

def executor(queue,task):
    """ the background process """
    logging.debug(' task started')
    stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env
            from gluon.dal import BaseAdapter
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            _env = env(task.app,import_models=True)
            logging.getLogger().setLevel(level)
            scheduler = current._scheduler
            scheduler_tasks = current._scheduler.tasks
            _function = scheduler_tasks[task.function]
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args,**vars))
        else:
            ### for testing purpose only
            result = eval(task.function)(*loads(task.args),
                                         **loads(task.vars, object_hook=_decode_dict))
        stdout, sys.stdout = sys.stdout, stdout
        queue.put(TaskReport(COMPLETED, result,stdout.getvalue()))
    except BaseException,e:
        sys.stdout = stdout
        tb = traceback.format_exc()
        queue.put(TaskReport(FAILED,tb=tb))

class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None         # the background process
        self.have_heartbeat = True  # set to False to kill
    def async(self,task):
        """
        runs the task in a background process and returns a TaskReport
        with status COMPLETED, FAILED, TIMEOUT or STOPPED
        """
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor,args=(queue,task))
        self.process = p
        logging.debug(' task starting')
        p.start()
        try:
            p.join(task.timeout)
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logging.debug(' task stopped')
            return TaskReport(STOPPED)
        if p.is_alive():
            p.terminate()
            p.join()
            logging.debug(' task timeout')
            return TaskReport(TIMEOUT)
        elif queue.empty():
            self.have_heartbeat = False
            logging.debug(' task stopped')
            return TaskReport(STOPPED)
        else:
            logging.debug(' task completed or failed')
            return queue.get()
    def die(self):
        logging.info('die!')
        self.have_heartbeat = False
        self.terminate_process()

    def terminate_process(self):
        try:
            self.process.terminate()
        except:
            pass # no process to terminate

    def run(self):
        """ the thread that sends heartbeat """
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()

    def send_heartbeat(self,counter):
        print 'thum'
        time.sleep(1)

    def pop_task(self):
        return Task(
            app = None,
            function = 'demo_function',
            timeout = 7,
            args = '[2]',
            vars = '{}')

    def report_task(self,task,task_report):
        print 'reporting task'

    def sleep(self):
        pass

    def loop(self):
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                logging.debug('looping...')
                task = self.pop_task()
                if task:
                    self.report_task(task,self.async(task))
                else:
                    logging.debug('sleeping...')
                    self.sleep()
        except KeyboardInterrupt:
            self.die()
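
# Note (illustrative): MetaScheduler on its own is a self-test harness;
# MetaScheduler().loop() repeatedly runs the built-in demo_function task and
# prints heartbeats until interrupted with Ctrl-C. No database is involved;
# the Scheduler subclass below overrides pop_task/report_task/send_heartbeat
# with database-backed versions.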

TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED)

class TYPE(object):
    """
    validator that checks whether a field is valid JSON and validates its type
    """

    def __init__(self,myclass=list,parse=False):
        self.myclass = myclass
        self.parse=parse

    def __call__(self,value):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value,current.T('invalid json'))
        else:
            if isinstance(obj,self.myclass):
                if self.parse:
                    return (obj,None)
                else:
                    return (value,None)
            else:
                return (value,current.T('Not of type: %s') % self.myclass)
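
# For example (illustrative), inside a web2py request:
#
#     TYPE(list)('[1, 2]')               # -> ('[1, 2]', None)          valid
#     TYPE(dict, parse=True)('{"a":1}')  # -> ({u'a': 1}, None)         parsed
#     TYPE(list)('not json')             # -> ('not json', T('invalid json'))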

class Scheduler(MetaScheduler):
    def __init__(self,db,tasks={},migrate=True,
                 worker_name=None,group_names=None,heartbeat=HEARTBEAT):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())

        from gluon import current
        current._scheduler = self

        self.define_tables(db,migrate=migrate)

    def define_tables(self,db,migrate):
        from gluon import current
        logging.debug('defining tables (migrate=%s)' % migrate)
        now = datetime.datetime.now()
        db.define_table(
            'scheduler_task',
            Field('application_name',requires=IS_NOT_EMPTY(),
                  default=None,writable=False),
            Field('task_name',requires=IS_NOT_EMPTY()),
            Field('group_name',default='main',writable=False),
            Field('status',requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED,writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))),
            Field('args','text',default='[]',requires=TYPE(list)),
            Field('vars','text',default='{}',requires=TYPE(dict)),
            Field('enabled','boolean',default=True),
            Field('start_time','datetime',default=now),
            Field('next_run_time','datetime',default=now),
            Field('stop_time','datetime',default=now+datetime.timedelta(days=1)),
            Field('repeats','integer',default=1,comment="0=unlimited"),
            Field('period','integer',default=60,comment='seconds'),
            Field('timeout','integer',default=60,comment='seconds'),
            Field('times_run','integer',default=0,writable=False),
            Field('last_run_time','datetime',writable=False,readable=False),
            Field('assigned_worker_name',default='',writable=False),
            migrate=migrate,format='%(task_name)s')
        if hasattr(current,'request'):
            db.scheduler_task.application_name.default=current.request.application

        db.define_table(
            'scheduler_run',
            Field('scheduler_task','reference scheduler_task'),
            Field('status',requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time','datetime'),
            Field('stop_time','datetime'),
            Field('output','text'),
            Field('result','text'),
            Field('traceback','text'),
            Field('worker_name',default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name'),
            Field('first_heartbeat','datetime'),
            Field('last_heartbeat','datetime'),
            Field('status',requires=IS_IN_SET(WORKER_STATUS)),
            migrate=migrate)
        db.commit()
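
    # Example (illustrative): besides the appadmin URLs in USAGE, a task can be
    # queued from application code once these tables are defined; args and vars
    # must be JSON-encoded strings, e.g.
    #
    #     db.scheduler_task.insert(task_name='demo',
    #                              function_name='demo1',
    #                              args='["hello"]',
    #                              vars='{"name": "world"}')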

    def loop(self,worker_name=None):
        MetaScheduler.loop(self)

    def pop_task(self):
        now = datetime.datetime.now()
        db, ts = self.db, self.db.scheduler_task
        try:
            logging.debug(' grabbing all queued tasks')
            all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
                ((ts.times_run<ts.repeats)|(ts.repeats==0))\
                (ts.start_time<=now)\
                (ts.stop_time>now)\
                (ts.next_run_time<=now)\
                (ts.enabled==True)\
                (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None?
            number_grabbed = all_available.update(
                assigned_worker_name=self.worker_name,status=ASSIGNED)
            db.commit()
        except:
            number_grabbed = None
            db.rollback()
        if number_grabbed:
            logging.debug(' grabbed %s tasks' % number_grabbed)
            grabbed = db(ts.assigned_worker_name==self.worker_name)\
                (ts.status==ASSIGNED)
            task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()

            logging.debug(' releasing all but one (running)')
            if task:
                task.update_record(status=RUNNING,last_run_time=now)
                grabbed.update(assigned_worker_name='',status=QUEUED)
            db.commit()
        else:
            return None
        next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats==0:
            run_again = True
        else:
            run_again = False
        logging.debug(' new scheduler_run record')
        while True:
            try:
                run_id = db.scheduler_run.insert(
                    scheduler_task = task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                db.rollback()
        logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app = task.application_name,
            function = task.function_name,
            timeout = task.timeout,
            args = task.args, #in json
            vars = task.vars, #in json
            task_id = task.id,
            run_id = run_id,
            run_again = run_again,
            next_run_time=next_run_time,
            times_run = times_run)

    def report_task(self,task,task_report):
        logging.debug(' recording task report in db (%s)' % task_report.status)
        db = self.db
        db(db.scheduler_run.id==task.run_id).update(
            status = task_report.status,
            stop_time = datetime.datetime.now(),
            result = task_report.result,
            output = task_report.output,
            traceback = task_report.tb)
        if task_report.status == COMPLETED:
            d = dict(status = task.run_again and QUEUED or COMPLETED,
                     next_run_time = task.next_run_time,
                     times_run = task.times_run,
                     assigned_worker_name = '')
        else:
            d = dict(
                assigned_worker_name = '',
                status = {'FAILED':'FAILED',
                          'TIMEOUT':'TIMEOUT',
                          'STOPPED':'QUEUED'}[task_report.status])
        db(db.scheduler_task.id==task.task_id)\
            (db.scheduler_task.status==RUNNING).update(**d)
        db.commit()
        logging.info('task completed (%s)' % task_report.status)

    def send_heartbeat(self,counter):
        if not self.db_thread:
            logging.debug('thread building own DAL object')
            self.db_thread = DAL(self.db._uri,folder = self.db._adapter.folder)
            self.define_tables(self.db_thread,migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = datetime.datetime.now()
            expiration = now-datetime.timedelta(seconds=self.heartbeat*3)
            # record heartbeat
            logging.debug('........recording heartbeat')
            if not db(sw.worker_name==self.worker_name)\
                    .update(last_heartbeat = now, status = ACTIVE):
                sw.insert(status = ACTIVE,worker_name = self.worker_name,
                          first_heartbeat = now,last_heartbeat = now)
            if counter % 10 == 0:
                # deallocate jobs assigned to inactive workers and requeue them
                logging.debug(' freeing workers that have not sent heartbeat')
                inactive_workers = db(sw.last_heartbeat<expiration)
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))\
                    (st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
                    .update(assigned_worker_name='',status=QUEUED)
                inactive_workers.delete()
            db.commit()
        except:
            db.rollback()
        time.sleep(self.heartbeat)

    def sleep(self):
        time.sleep(self.heartbeat) # should only sleep until next available task

def main():
    """
    allows running a worker without web2py.py, e.g. simply:
    python gluon/scheduler.py
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", type="int", default=10,
        help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default='INFO',
        help="level of logging (DEBUG, INFO, WARNING, ERROR)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing the tasks, must define " + \
             "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path,filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: '+', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print 'groups for this worker: '+', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: '+options.db_uri
    db = DAL(options.db_uri,folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler=Scheduler(db = db,
                        worker_name = options.worker_name,
                        tasks = tasks,
                        migrate = True,
                        group_names = group_names,
                        heartbeat = options.heartbeat)
    print 'starting main worker loop...'
    scheduler.loop()

if __name__=='__main__':
    main()