Package web2py :: Package gluon :: Module rocket

Source Code for Module web2py.gluon.rocket

   1  # -*- coding: utf-8 -*- 
   2   
   3  # This file is part of the Rocket Web Server 
   4  # Copyright (c) 2011 Timothy Farrell 
   5   
   6  # Import System Modules 
   7  import sys 
   8  import errno 
   9  import socket 
  10  import logging 
  11  import platform 
  12  import traceback 
  13   
  14  # Define Constants 
  15  VERSION = '1.2.4' 
  16  SERVER_NAME = socket.gethostname() 
  17  SERVER_SOFTWARE = 'Rocket %s' % VERSION 
  18  HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0]) 
  19  BUF_SIZE = 16384 
  20  SOCKET_TIMEOUT = 1 # in secs 
  21  THREAD_STOP_CHECK_INTERVAL = 1 # in secs, How often should threads check for a server stop message? 
  22  IS_JYTHON = platform.system() == 'Java' # Handle special cases for Jython 
  23  IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET]) 
  24  DEFAULT_LISTEN_QUEUE_SIZE = 5 
  25  DEFAULT_MIN_THREADS = 10 
  26  DEFAULT_MAX_THREADS = 0 
  27  DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE, 
  28                  MIN_THREADS = DEFAULT_MIN_THREADS, 
  29                  MAX_THREADS = DEFAULT_MAX_THREADS) 
  30   
  31  PY3K = sys.version_info[0] > 2 
  32   
  33  class NullHandler(logging.Handler):
  34      "A Logging handler to prevent library errors."
  35      def emit(self, record):
  36          pass
  37  
  38  if PY3K:
  39      def b(val):
  40          """ Convert string/unicode/bytes literals into bytes. This allows for
  41          the same code to run on Python 2.x and 3.x. """
  42          if isinstance(val, str):
  43              return val.encode()
  44          else:
  45              return val
  46  
  47      def u(val, encoding="us-ascii"):
  48          """ Convert bytes into string/unicode. This allows for the
  49          same code to run on Python 2.x and 3.x. """
  50          if isinstance(val, bytes):
  51              return val.decode(encoding)
  52          else:
  53              return val
  54  
  55  else:
  56      def b(val):
  57          """ Convert string/unicode/bytes literals into bytes. This allows for
  58          the same code to run on Python 2.x and 3.x. """
  59          if isinstance(val, unicode):
  60              return val.encode()
  61          else:
  62              return val
  63  
  64      def u(val, encoding="us-ascii"):
  65          """ Convert bytes into string/unicode. This allows for the
  66          same code to run on Python 2.x and 3.x. """
  67          if isinstance(val, str):
  68              return val.decode(encoding)
  69          else:
  70              return val
  71  
  72  # Import Package Modules
  73  # package imports removed in monolithic build
  74  
  75  __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
  76             'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
  77             'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
  78  
  79  # Monolithic build...end of module: rocket\__init__.py
  80  # Monolithic build...start of module: rocket\connection.py
  81  
  82  # Import System Modules
  83  import sys
  84  import time
  85  import socket
  86  try:
  87      import ssl
  88      has_ssl = True
  89  except ImportError:
  90      has_ssl = False
  91  # Import Package Modules
  92  # package imports removed in monolithic build
  93  # TODO - This part is still very experimental.
  94  #from .filelike import FileLikeSocket
  95  
  96  class Connection(object):
  97      __slots__ = [
  98          'setblocking',
  99          'sendall',
 100          'shutdown',
 101          'makefile',
 102          'fileno',
 103          'client_addr',
 104          'client_port',
 105          'server_port',
 106          'socket',
 107          'start_time',
 108          'ssl',
 109          'secure',
 110          'recv',
 111          'send',
 112          'read',
 113          'write'
 114      ]
 115  
 116      def __init__(self, sock_tuple, port, secure=False):
 117          self.client_addr, self.client_port = sock_tuple[1]
 118          self.server_port = port
 119          self.socket = sock_tuple[0]
 120          self.start_time = time.time()
 121          self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
 122          self.secure = secure
 123  
 124          if IS_JYTHON:
 125              # In Jython we must set TCP_NODELAY here since it does not
 126              # inherit from the listening socket.
 127              # See: http://bugs.jython.org/issue1309
 128              self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 129  
 130          self.socket.settimeout(SOCKET_TIMEOUT)
 131  
 132          self.sendall = self.socket.sendall
 133          self.shutdown = self.socket.shutdown
 134          self.fileno = self.socket.fileno
 135          self.setblocking = self.socket.setblocking
 136          self.recv = self.socket.recv
 137          self.send = self.socket.send
 138          self.makefile = self.socket.makefile
 139  
 140      # FIXME - this is not ready for prime-time yet.
 141      # def makefile(self, buf_size=BUF_SIZE):
 142      #     return FileLikeSocket(self, buf_size)
 143  
 144      def close(self):
 145          if hasattr(self.socket, '_sock'):
 146              try:
 147                  self.socket._sock.close()
 148              except socket.error:
 149                  info = sys.exc_info()
 150                  if info[1].args[0] != socket.EBADF:
 151                      raise info[1]
 152                  else:
 153                      pass
 154          self.socket.close()
 155  
 156  
 157  # Monolithic build...end of module: rocket\connection.py
 158  # Monolithic build...start of module: rocket\filelike.py
 159  
 160  # Import System Modules
 161  import socket
 162  try:
 163      from io import StringIO
 164  except ImportError:
 165      try:
 166          from cStringIO import StringIO
 167      except ImportError:
 168          from StringIO import StringIO
 169  # Import Package Modules
 170  # package imports removed in monolithic build
 171  
172 -class FileLikeSocket(object):
173 - def __init__(self, conn, buf_size=BUF_SIZE):
174 self.conn = conn 175 self.buf_size = buf_size 176 self.buffer = StringIO() 177 self.content_length = None 178 179 if self.conn.socket.gettimeout() == 0.0: 180 self.read = self.non_blocking_read 181 else: 182 self.read = self.blocking_read
183
184 - def __iter__(self):
185 return self
186
187 - def recv(self, size):
188 while True: 189 try: 190 return self.conn.recv(size) 191 except socket.error: 192 exc = sys.exc_info() 193 e = exc[1] 194 # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr 195 if (e.args[0] not in set()): 196 raise
197
198 - def next(self):
199 data = self.readline() 200 if data == '': 201 raise StopIteration 202 return data
203
204 - def non_blocking_read(self, size=None):
205 # Shamelessly adapted from Cherrypy! 206 bufr = self.buffer 207 bufr.seek(0, 2) 208 if size is None: 209 while True: 210 data = self.recv(self.buf_size) 211 if not data: 212 break 213 bufr.write(data) 214 215 self.buffer = StringIO() 216 217 return bufr.getvalue() 218 else: 219 buf_len = self.buffer.tell() 220 if buf_len >= size: 221 bufr.seek(0) 222 data = bufr.read(size) 223 self.buffer = StringIO(bufr.read()) 224 return data 225 226 self.buffer = StringIO() 227 while True: 228 remaining = size - buf_len 229 data = self.recv(remaining) 230 231 if not data: 232 break 233 234 n = len(data) 235 if n == size and not buf_len: 236 return data 237 238 if n == remaining: 239 bufr.write(data) 240 del data 241 break 242 243 bufr.write(data) 244 buf_len += n 245 del data 246 247 return bufr.getvalue()
248
249 - def blocking_read(self, length=None):
250 if length is None: 251 if self.content_length is not None: 252 length = self.content_length 253 else: 254 length = 1 255 256 try: 257 data = self.conn.recv(length) 258 except: 259 data = b('') 260 261 return data
262
263 - def readline(self):
264 data = b("") 265 char = self.read(1) 266 while char != b('\n') and char is not b(''): 267 line = repr(char) 268 data += char 269 char = self.read(1) 270 data += char 271 return data
272
273 - def readlines(self, hint="ignored"):
274 return list(self)
275
276 - def close(self):
277 self.conn = None 278 self.content_length = None
279 280 # Monolithic build...end of module: rocket\filelike.py 281 # Monolithic build...start of module: rocket\futures.py 282 283 # Import System Modules 284 import time 285 try: 286 from concurrent.futures import Future, ThreadPoolExecutor 287 from concurrent.futures.thread import _WorkItem 288 has_futures = True 289 except ImportError: 290 has_futures = False 291
292 - class Future:
293 pass
294
295 - class ThreadPoolExecutor:
296 pass
297
298 - class _WorkItem:
299 pass
300 301
302 -class WSGIFuture(Future):
303 - def __init__(self, f_dict, *args, **kwargs):
304 Future.__init__(self, *args, **kwargs) 305 306 self.timeout = None 307 308 self._mem_dict = f_dict 309 self._lifespan = 30 310 self._name = None 311 self._start_time = time.time()
 312  
 313      def set_running_or_notify_cancel(self):
 314          if time.time() - self._start_time >= self._lifespan:
 315              self.cancel()
 316          else:
 317              return super(WSGIFuture, self).set_running_or_notify_cancel()
 318  
 319  
320 - def remember(self, name, lifespan=None):
321 self._lifespan = lifespan or self._lifespan 322 323 if name in self._mem_dict: 324 raise NameError('Cannot remember future by name "%s". ' % name + \ 325 'A future already exists with that name.' ) 326 self._name = name 327 self._mem_dict[name] = self 328 329 return self
330
331 - def forget(self):
332 if self._name in self._mem_dict and self._mem_dict[self._name] is self: 333 del self._mem_dict[self._name] 334 self._name = None
335
336 -class _WorkItem(object):
337 - def __init__(self, future, fn, args, kwargs):
338 self.future = future 339 self.fn = fn 340 self.args = args 341 self.kwargs = kwargs
342
343 - def run(self):
344 if not self.future.set_running_or_notify_cancel(): 345 return 346 347 try: 348 result = self.fn(*self.args, **self.kwargs) 349 except BaseException: 350 e = sys.exc_info()[1] 351 self.future.set_exception(e) 352 else: 353 self.future.set_result(result)
354
355 -class WSGIExecutor(ThreadPoolExecutor):
356 multithread = True 357 multiprocess = False 358
359 - def __init__(self, *args, **kwargs):
360 ThreadPoolExecutor.__init__(self, *args, **kwargs) 361 362 self.futures = dict()
363
364 - def submit(self, fn, *args, **kwargs):
365 if self._shutdown_lock.acquire(): 366 if self._shutdown: 367 self._shutdown_lock.release() 368 raise RuntimeError('Cannot schedule new futures after shutdown') 369 370 f = WSGIFuture(self.futures) 371 w = _WorkItem(f, fn, args, kwargs) 372 373 self._work_queue.put(w) 374 self._adjust_thread_count() 375 self._shutdown_lock.release() 376 return f 377 else: 378 return False
379
380 -class FuturesMiddleware(object):
381 "Futures middleware that adds a Futures Executor to the environment"
382 - def __init__(self, app, threads=5):
383 self.app = app 384 self.executor = WSGIExecutor(threads)
385
386 - def __call__(self, environ, start_response):
387 environ["wsgiorg.executor"] = self.executor 388 environ["wsgiorg.futures"] = self.executor.futures 389 return self.app(environ, start_response)
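
Wrapping an application in FuturesMiddleware is what exposes the executor and the shared futures dictionary through the WSGI environ. A minimal sketch of how a wrapped application could use them, assuming the concurrent.futures package is importable (the job function and names below are illustrative, not part of Rocket):

    def slow_job():
        return 42

    def app(environ, start_response):
        # Submit background work through the executor injected by the middleware.
        future = environ['wsgiorg.executor'].submit(slow_job)
        # Optionally keep the future addressable later via environ['wsgiorg.futures'].
        future.remember('slow-job')
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return [b('submitted')]

    app = FuturesMiddleware(app)
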
390 391 # Monolithic build...end of module: rocket\futures.py 392 # Monolithic build...start of module: rocket\listener.py 393 394 # Import System Modules 395 import os 396 import socket 397 import logging 398 import traceback 399 from threading import Thread 400 401 try: 402 import ssl 403 from ssl import SSLError 404 has_ssl = True 405 except ImportError: 406 has_ssl = False
407 - class SSLError(socket.error):
408 pass
409 # Import Package Modules 410 # package imports removed in monolithic build 411
412 -class Listener(Thread):
413 """The Listener class is a class responsible for accepting connections 414 and queuing them to be processed by a worker thread.""" 415
416 - def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
417 Thread.__init__(self, *args, **kwargs) 418 419 # Instance variables 420 self.active_queue = active_queue 421 self.interface = interface 422 self.addr = interface[0] 423 self.port = interface[1] 424 self.secure = len(interface) >= 4 425 self.clientcert_req = (len(interface) == 5 and interface[4]) 426 427 self.thread = None 428 self.ready = False 429 430 # Error Log 431 self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port) 432 self.err_log.addHandler(NullHandler()) 433 434 # Build the socket 435 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 436 437 if not listener: 438 self.err_log.error("Failed to get socket.") 439 return 440 441 if self.secure: 442 if not has_ssl: 443 self.err_log.error("ssl module required to serve HTTPS.") 444 return 445 elif not os.path.exists(interface[2]): 446 data = (interface[2], interface[0], interface[1]) 447 self.err_log.error("Cannot find key file " 448 "'%s'. Cannot bind to %s:%s" % data) 449 return 450 elif not os.path.exists(interface[3]): 451 data = (interface[3], interface[0], interface[1]) 452 self.err_log.error("Cannot find certificate file " 453 "'%s'. Cannot bind to %s:%s" % data) 454 return 455 456 if self.clientcert_req and not os.path.exists(interface[4]): 457 data = (interface[4], interface[0], interface[1]) 458 self.err_log.error("Cannot find root ca certificate file " 459 "'%s'. Cannot bind to %s:%s" % data) 460 return 461 462 # Set socket options 463 try: 464 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 465 except: 466 msg = "Cannot share socket. Using %s:%i exclusively." 467 self.err_log.warning(msg % (self.addr, self.port)) 468 469 try: 470 if not IS_JYTHON: 471 listener.setsockopt(socket.IPPROTO_TCP, 472 socket.TCP_NODELAY, 473 1) 474 except: 475 msg = "Cannot set TCP_NODELAY, things might run a little slower" 476 self.err_log.warning(msg) 477 478 try: 479 listener.bind((self.addr, self.port)) 480 except: 481 msg = "Socket %s:%i in use by other process and it won't share." 482 self.err_log.error(msg % (self.addr, self.port)) 483 else: 484 # We want socket operations to timeout periodically so we can 485 # check if the server is shutting down 486 listener.settimeout(THREAD_STOP_CHECK_INTERVAL) 487 # Listen for new connections allowing queue_size number of 488 # connections to wait before rejecting a connection. 489 listener.listen(queue_size) 490 491 self.listener = listener 492 493 self.ready = True
494
495 - def wrap_socket(self, sock):
496 try: 497 if self.clientcert_req: 498 ca_certs = self.interface[4] 499 cert_reqs = ssl.CERT_OPTIONAL 500 sock = ssl.wrap_socket(sock, 501 keyfile = self.interface[2], 502 certfile = self.interface[3], 503 server_side = True, 504 cert_reqs = cert_reqs, 505 ca_certs = ca_certs, 506 ssl_version = ssl.PROTOCOL_SSLv23) 507 else: 508 sock = ssl.wrap_socket(sock, 509 keyfile = self.interface[2], 510 certfile = self.interface[3], 511 server_side = True, 512 ssl_version = ssl.PROTOCOL_SSLv23) 513 514 except SSLError: 515 # Generally this happens when an HTTP request is received on a 516 # secure socket. We don't do anything because it will be detected 517 # by Worker and dealt with appropriately. 518 # self.err_log.error('SSL Error: %s' % traceback.format_exc()) 519 pass 520 521 return sock
522
523 - def start(self):
524 if not self.ready: 525 self.err_log.warning('Listener started when not ready.') 526 return 527 528 if self.thread is not None and self.thread.isAlive(): 529 self.err_log.warning('Listener already running.') 530 return 531 532 self.thread = Thread(target=self.listen, name="Port" + str(self.port)) 533 534 self.thread.start()
535
536 - def isAlive(self):
537 if self.thread is None: 538 return False 539 540 return self.thread.isAlive()
541
542 - def join(self):
543 if self.thread is None: 544 return 545 546 self.ready = False 547 548 self.thread.join() 549 550 del self.thread 551 self.thread = None 552 self.ready = True
553
554 - def listen(self):
555 if __debug__: 556 self.err_log.debug('Entering main loop.') 557 while True: 558 try: 559 sock, addr = self.listener.accept() 560 561 if self.secure: 562 sock = self.wrap_socket(sock) 563 564 self.active_queue.put(((sock, addr), 565 self.interface[1], 566 self.secure)) 567 568 except socket.timeout: 569 # socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL 570 # seconds. When that happens, we check if it's time to die. 571 572 if not self.ready: 573 if __debug__: 574 self.err_log.debug('Listener exiting.') 575 return 576 else: 577 continue 578 except: 579 self.err_log.error(traceback.format_exc())
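
For reference, the interface tuple handed to Listener (by Rocket, below) controls whether the port speaks plain HTTP or HTTPS: two elements for HTTP; four for HTTPS, adding the key and certificate paths checked above; an optional fifth element names a CA certificate used to request client certificates. A sketch with illustrative paths:

    http_interface  = ('127.0.0.1', 8000)
    https_interface = ('0.0.0.0', 8443,
                       '/path/to/server.key',   # interface[2]: SSL key file
                       '/path/to/server.crt')   # interface[3]: SSL certificate
    # Adding '/path/to/ca.crt' as a fifth element (interface[4]) makes the
    # listener request client certificates (verified as CERT_OPTIONAL).
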
580 581 # Monolithic build...end of module: rocket\listener.py 582 # Monolithic build...start of module: rocket\main.py 583 584 # Import System Modules 585 import sys 586 import time 587 import socket 588 import logging 589 import traceback 590 from threading import Lock 591 try: 592 from queue import Queue 593 except ImportError: 594 from Queue import Queue 595 596 # Import Package Modules 597 # package imports removed in monolithic build 598 599 600 601 602 603 # Setup Logging 604 log = logging.getLogger('Rocket') 605 log.addHandler(NullHandler()) 606
 607  class Rocket(object):
 608      """The Rocket class is responsible for handling threads and accepting and
 609      dispatching connections."""
 610  
 611      def __init__(self,
 612                   interfaces = ('127.0.0.1', 8000),
 613                   method = 'wsgi',
 614                   app_info = None,
 615                   min_threads = None,
 616                   max_threads = None,
 617                   queue_size = None,
 618                   timeout = 600,
 619                   handle_signals = True):
 620  
 621          self.handle_signals = handle_signals
 622          self.startstop_lock = Lock()
 623          self.timeout = timeout
 624  
 625          if not isinstance(interfaces, list):
 626              self.interfaces = [interfaces]
 627          else:
 628              self.interfaces = interfaces
 629  
 630          if min_threads is None:
 631              min_threads = DEFAULTS['MIN_THREADS']
 632  
 633          if max_threads is None:
 634              max_threads = DEFAULTS['MAX_THREADS']
 635  
 636          if not queue_size:
 637              if hasattr(socket, 'SOMAXCONN'):
 638                  queue_size = socket.SOMAXCONN
 639              else:
 640                  queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']
 641  
 642          if max_threads and queue_size > max_threads:
 643              queue_size = max_threads
 644  
 645          if isinstance(app_info, dict):
 646              app_info['server_software'] = SERVER_SOFTWARE
 647  
 648          self.monitor_queue = Queue()
 649          self.active_queue = Queue()
 650  
 651          self._threadpool = ThreadPool(get_method(method),
 652                                        app_info = app_info,
 653                                        active_queue = self.active_queue,
 654                                        monitor_queue = self.monitor_queue,
 655                                        min_threads = min_threads,
 656                                        max_threads = max_threads)
 657  
 658          # Build our socket listeners
 659          self.listeners = [Listener(i, queue_size, self.active_queue) for i in self.interfaces]
 660          for ndx in range(len(self.listeners)-1, 0, -1):
 661              if not self.listeners[ndx].ready:
 662                  del self.listeners[ndx]
 663  
 664          if not self.listeners:
 665              log.critical("No interfaces to listen on...closing.")
 666              sys.exit(1)
667
668 - def _sigterm(self, signum, frame):
669 log.info('Received SIGTERM') 670 self.stop()
671
672 - def _sighup(self, signum, frame):
673 log.info('Received SIGHUP') 674 self.restart()
675
 676      def start(self, background=False):
 677          log.info('Starting %s' % SERVER_SOFTWARE)
 678  
 679          self.startstop_lock.acquire()
 680  
 681          try:
 682              # Set up our shutdown signals
 683              if self.handle_signals:
 684                  try:
 685                      import signal
 686                      signal.signal(signal.SIGTERM, self._sigterm)
 687                      signal.signal(signal.SIGUSR1, self._sighup)
 688                  except:
 689                      log.debug('This platform does not support signals.')
 690  
 691              # Start our worker threads
 692              self._threadpool.start()
 693  
 694              # Start our monitor thread
 695              self._monitor = Monitor(self.monitor_queue,
 696                                      self.active_queue,
 697                                      self.timeout,
 698                                      self._threadpool)
 699              self._monitor.setDaemon(True)
 700              self._monitor.start()
 701  
 702              # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
 703              # compatibility.
 704              str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
 705  
 706              msg = 'Listening on sockets: '
 707              msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
 708              log.info(msg)
 709  
 710              for l in self.listeners:
 711                  l.start()
 712  
 713          finally:
 714              self.startstop_lock.release()
 715  
 716          if background:
 717              return
 718  
 719          while self._monitor.isAlive():
 720              try:
 721                  time.sleep(THREAD_STOP_CHECK_INTERVAL)
 722              except KeyboardInterrupt:
 723                  # Capture a keyboard interrupt when running from a console
 724                  break
 725              except:
 726                  if self._monitor.isAlive():
 727                      log.error(traceback.format_exc())
 728                      continue
 729  
 730          return self.stop()
731
 732      def stop(self, stoplogging = False):
 733          log.info('Stopping %s' % SERVER_SOFTWARE)
 734  
 735          self.startstop_lock.acquire()
 736  
 737          try:
 738              # Stop listeners
 739              for l in self.listeners:
 740                  l.ready = False
 741  
 742              # Encourage a context switch
 743              time.sleep(0.01)
 744  
 745              for l in self.listeners:
 746                  if l.isAlive():
 747                      l.join()
 748  
 749              # Stop Monitor
 750              self._monitor.stop()
 751              if self._monitor.isAlive():
 752                  self._monitor.join()
 753  
 754              # Stop Worker threads
 755              self._threadpool.stop()
 756  
 757              if stoplogging:
 758                  logging.shutdown()
 759                  msg = "Calling logging.shutdown() is now the responsibility of \
 760                        the application developer. Please update your \
 761                        applications to no longer call rocket.stop(True)"
 762                  try:
 763                      import warnings
 764                      raise warnings.DeprecationWarning(msg)
 765                  except ImportError:
 766                      raise RuntimeError(msg)
 767  
 768          finally:
 769              self.startstop_lock.release()
770
771 - def restart(self):
772 self.stop() 773 self.start()
774
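
Constructing and starting a Rocket instance directly looks like the following sketch, which mirrors what demo() at the bottom of this file does (the application shown is illustrative):

    def hello_app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/html')])
        return [b('<h1>Hello from Rocket</h1>')]

    r = Rocket(('127.0.0.1', 8000), 'wsgi', {'wsgi_app': hello_app})
    r.start()   # blocks until interrupted; pass background=True to return immediately
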
 775  def CherryPyWSGIServer(bind_addr,
 776                         wsgi_app,
 777                         numthreads = 10,
 778                         server_name = None,
 779                         max = -1,
 780                         request_queue_size = 5,
 781                         timeout = 10,
 782                         shutdown_timeout = 5):
 783      """ A Cherrypy wsgiserver-compatible wrapper. """
 784      max_threads = max
 785      if max_threads < 0:
 786          max_threads = 0
 787      return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
 788                    min_threads = numthreads,
 789                    max_threads = max_threads,
 790                    queue_size = request_queue_size,
 791                    timeout = timeout)
 792  
 793  # Monolithic build...end of module: rocket\main.py
 794  # Monolithic build...start of module: rocket\monitor.py
 795  
 796  # Import System Modules
 797  import time
 798  import logging
 799  import select
 800  from threading import Thread
 801  
 802  # Import Package Modules
 803  # package imports removed in monolithic build
 804  
805 -class Monitor(Thread):
806 # Monitor worker class. 807
808 - def __init__(self, 809 monitor_queue, 810 active_queue, 811 timeout, 812 threadpool, 813 *args, 814 **kwargs):
815 816 Thread.__init__(self, *args, **kwargs) 817 818 self._threadpool = threadpool 819 820 # Instance Variables 821 self.monitor_queue = monitor_queue 822 self.active_queue = active_queue 823 self.timeout = timeout 824 825 self.log = logging.getLogger('Rocket.Monitor') 826 self.log.addHandler(NullHandler()) 827 828 self.connections = set() 829 self.active = False
830
831 - def run(self):
832 self.active = True 833 conn_list = list() 834 list_changed = False 835 836 # We need to make sure the queue is empty before we start 837 while not self.monitor_queue.empty(): 838 self.monitor_queue.get() 839 840 if __debug__: 841 self.log.debug('Entering monitor loop.') 842 843 # Enter thread main loop 844 while self.active: 845 846 # Move the queued connections to the selection pool 847 while not self.monitor_queue.empty(): 848 if __debug__: 849 self.log.debug('In "receive timed-out connections" loop.') 850 851 c = self.monitor_queue.get() 852 853 if c is None: 854 # A non-client is a signal to die 855 if __debug__: 856 self.log.debug('Received a death threat.') 857 self.stop() 858 break 859 860 self.log.debug('Received a timed out connection.') 861 862 if __debug__: 863 assert(c not in self.connections) 864 865 if IS_JYTHON: 866 # Jython requires a socket to be in Non-blocking mode in 867 # order to select on it. 868 c.setblocking(False) 869 870 if __debug__: 871 self.log.debug('Adding connection to monitor list.') 872 873 self.connections.add(c) 874 list_changed = True 875 876 # Wait on those connections 877 if list_changed: 878 conn_list = list(self.connections) 879 list_changed = False 880 881 try: 882 if len(conn_list): 883 readable = select.select(conn_list, 884 [], 885 [], 886 THREAD_STOP_CHECK_INTERVAL)[0] 887 else: 888 time.sleep(THREAD_STOP_CHECK_INTERVAL) 889 readable = [] 890 891 if not self.active: 892 break 893 894 # If we have any readable connections, put them back 895 for r in readable: 896 if __debug__: 897 self.log.debug('Restoring readable connection') 898 899 if IS_JYTHON: 900 # Jython requires a socket to be in Non-blocking mode in 901 # order to select on it, but the rest of the code requires 902 # that it be in blocking mode. 903 r.setblocking(True) 904 905 r.start_time = time.time() 906 self.active_queue.put(r) 907 908 self.connections.remove(r) 909 list_changed = True 910 911 except: 912 if self.active: 913 raise 914 else: 915 break 916 917 # If we have any stale connections, kill them off. 918 if self.timeout: 919 now = time.time() 920 stale = set() 921 for c in self.connections: 922 if (now - c.start_time) >= self.timeout: 923 stale.add(c) 924 925 for c in stale: 926 if __debug__: 927 # "EXPR and A or B" kept for Py2.4 compatibility 928 data = (c.client_addr, c.server_port, c.ssl and '*' or '') 929 self.log.debug('Flushing stale connection: %s:%i%s' % data) 930 931 self.connections.remove(c) 932 list_changed = True 933 934 try: 935 c.close() 936 finally: 937 del c 938 939 # Dynamically resize the threadpool to adapt to our changing needs. 940 self._threadpool.dynamic_resize()
941 942
943 - def stop(self):
944 self.active = False 945 946 if __debug__: 947 self.log.debug('Flushing waiting connections') 948 949 while self.connections: 950 c = self.connections.pop() 951 try: 952 c.close() 953 finally: 954 del c 955 956 if __debug__: 957 self.log.debug('Flushing queued connections') 958 959 while not self.monitor_queue.empty(): 960 c = self.monitor_queue.get() 961 962 if c is None: 963 continue 964 965 try: 966 c.close() 967 finally: 968 del c 969 970 # Place a None sentry value to cause the monitor to die. 971 self.monitor_queue.put(None)
972 973 # Monolithic build...end of module: rocket\monitor.py 974 # Monolithic build...start of module: rocket\threadpool.py 975 976 # Import System Modules 977 import logging 978 # Import Package Modules 979 # package imports removed in monolithic build 980 981 982 # Setup Logging 983 log = logging.getLogger('Rocket.Errors.ThreadPool') 984 log.addHandler(NullHandler()) 985
986 -class ThreadPool:
987 """The ThreadPool class is a container class for all the worker threads. It 988 manages the number of actively running threads.""" 989
990 - def __init__(self, 991 method, 992 app_info, 993 active_queue, 994 monitor_queue, 995 min_threads=DEFAULTS['MIN_THREADS'], 996 max_threads=DEFAULTS['MAX_THREADS'], 997 ):
998 999 if __debug__: 1000 log.debug("Initializing ThreadPool.") 1001 1002 self.check_for_dead_threads = 0 1003 self.active_queue = active_queue 1004 1005 self.worker_class = method 1006 self.min_threads = min_threads 1007 self.max_threads = max_threads 1008 self.monitor_queue = monitor_queue 1009 self.stop_server = False 1010 self.alive = False 1011 1012 # TODO - Optimize this based on some real-world usage data 1013 self.grow_threshold = int(max_threads/10) + 2 1014 1015 if not isinstance(app_info, dict): 1016 app_info = dict() 1017 1018 if has_futures and app_info.get('futures'): 1019 app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'], 1020 2])) 1021 1022 app_info.update(max_threads=max_threads, 1023 min_threads=min_threads) 1024 1025 self.min_threads = min_threads 1026 self.app_info = app_info 1027 1028 self.threads = set()
1029
1030 - def start(self):
1031 self.stop_server = False 1032 if __debug__: 1033 log.debug("Starting threads.") 1034 1035 self.grow(self.min_threads) 1036 1037 self.alive = True
1038
1039 - def stop(self):
1040 self.alive = False 1041 1042 if __debug__: 1043 log.debug("Stopping threads.") 1044 1045 self.stop_server = True 1046 1047 # Prompt the threads to die 1048 self.shrink(len(self.threads)) 1049 1050 # Stop futures initially 1051 if has_futures and self.app_info.get('futures'): 1052 if __debug__: 1053 log.debug("Future executor is present. Python will not " 1054 "exit until all jobs have finished.") 1055 self.app_info['executor'].shutdown(wait=False) 1056 1057 # Give them the gun 1058 #active_threads = [t for t in self.threads if t.isAlive()] 1059 #while active_threads: 1060 # t = active_threads.pop() 1061 # t.kill() 1062 1063 # Wait until they pull the trigger 1064 for t in self.threads: 1065 if t.isAlive(): 1066 t.join() 1067 1068 # Clean up the mess 1069 self.bring_out_your_dead()
1070
1071 - def bring_out_your_dead(self):
1072 # Remove dead threads from the pool 1073 1074 dead_threads = [t for t in self.threads if not t.isAlive()] 1075 for t in dead_threads: 1076 if __debug__: 1077 log.debug("Removing dead thread: %s." % t.getName()) 1078 try: 1079 # Py2.4 complains here so we put it in a try block 1080 self.threads.remove(t) 1081 except: 1082 pass 1083 self.check_for_dead_threads -= len(dead_threads)
1084
1085 - def grow(self, amount=None):
1086 if self.stop_server: 1087 return 1088 1089 if not amount: 1090 amount = self.max_threads 1091 1092 if self.alive: 1093 amount = min([amount, self.max_threads - len(self.threads)]) 1094 1095 if __debug__: 1096 log.debug("Growing by %i." % amount) 1097 1098 for x in range(amount): 1099 worker = self.worker_class(self.app_info, 1100 self.active_queue, 1101 self.monitor_queue) 1102 1103 worker.setDaemon(True) 1104 self.threads.add(worker) 1105 worker.start()
1106
1107 - def shrink(self, amount=1):
1108 if __debug__: 1109 log.debug("Shrinking by %i." % amount) 1110 1111 self.check_for_dead_threads += amount 1112 1113 for x in range(amount): 1114 self.active_queue.put(None)
1115
1116 - def dynamic_resize(self):
1117 if (self.max_threads > self.min_threads or self.max_threads == 0): 1118 if self.check_for_dead_threads > 0: 1119 self.bring_out_your_dead() 1120 1121 queueSize = self.active_queue.qsize() 1122 threadCount = len(self.threads) 1123 1124 if __debug__: 1125 log.debug("Examining ThreadPool. %i threads and %i Q'd conxions" 1126 % (threadCount, queueSize)) 1127 1128 if queueSize == 0 and threadCount > self.min_threads: 1129 self.shrink() 1130 1131 elif queueSize > self.grow_threshold: 1132 1133 self.grow(queueSize)
1134 1135 # Monolithic build...end of module: rocket\threadpool.py 1136 # Monolithic build...start of module: rocket\worker.py 1137 1138 # Import System Modules 1139 import re 1140 import sys 1141 import socket 1142 import logging 1143 import traceback 1144 from wsgiref.headers import Headers 1145 from threading import Thread 1146 from datetime import datetime 1147 1148 try: 1149 from urllib import unquote 1150 except ImportError: 1151 from urllib.parse import unquote 1152 1153 try: 1154 from io import StringIO 1155 except ImportError: 1156 try: 1157 from cStringIO import StringIO 1158 except ImportError: 1159 from StringIO import StringIO 1160 1161 try: 1162 from ssl import SSLError 1163 except ImportError:
1164 - class SSLError(socket.error):
1165 pass
1166  # Import Package Modules
1167  # package imports removed in monolithic build
1168  
1169  
1170  # Define Constants
1171  re_SLASH = re.compile('%2F', re.IGNORECASE)
1172  re_REQUEST_LINE = re.compile(r"""^
1173  (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT)  # Request Method
1174  \                                                           # (single space)
1175  (
1176      (?P<scheme>[^:/]+)                                      # Scheme
1177      (://)                                                   #
1178      (?P<host>[^/]+)                                         # Host
1179  )?                                                          #
1180  (?P<path>(\*|/[^ \?]*))                                     # Path
1181  (\? (?P<query_string>[^ ]+))?                               # Query String
1182  \                                                           # (single space)
1183  (?P<protocol>HTTPS?/1\.[01])                                # Protocol
1184  $
1185  """, re.X)
1186  LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
1187  RESPONSE = '''\
1188  HTTP/1.1 %s
1189  Content-Length: %i
1190  Content-Type: %s
1191  
1192  %s
1193  '''
1194  if IS_JYTHON:
1195      HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])
1196  
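
As an editor's illustration (not part of the module), a typical request line breaks down under re_REQUEST_LINE like this:

    m = re_REQUEST_LINE.match('GET /admin/index?x=1 HTTP/1.1')
    m.group('method')        # 'GET'
    m.group('path')          # '/admin/index'
    m.group('query_string')  # 'x=1'
    m.group('protocol')      # 'HTTP/1.1'
    # 'scheme' and 'host' are only populated for absolute-form requests
    # such as 'GET http://example.com/ HTTP/1.1'.
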
1197 -class Worker(Thread):
1198 """The Worker class is a base class responsible for receiving connections 1199 and (a subclass) will run an application to process the the connection """ 1200
1201 - def __init__(self, 1202 app_info, 1203 active_queue, 1204 monitor_queue, 1205 *args, 1206 **kwargs):
1207 1208 Thread.__init__(self, *args, **kwargs) 1209 1210 # Instance Variables 1211 self.app_info = app_info 1212 self.active_queue = active_queue 1213 self.monitor_queue = monitor_queue 1214 1215 self.size = 0 1216 self.status = "200 OK" 1217 self.closeConnection = True 1218 self.request_line = "" 1219 1220 # Request Log 1221 self.req_log = logging.getLogger('Rocket.Requests') 1222 self.req_log.addHandler(NullHandler()) 1223 1224 # Error Log 1225 self.err_log = logging.getLogger('Rocket.Errors.'+self.getName()) 1226 self.err_log.addHandler(NullHandler())
1227
1228 - def _handleError(self, typ, val, tb):
1229 if typ == SSLError: 1230 if 'timed out' in val.args[0]: 1231 typ = SocketTimeout 1232 if typ == SocketTimeout: 1233 if __debug__: 1234 self.err_log.debug('Socket timed out') 1235 self.monitor_queue.put(self.conn) 1236 return True 1237 if typ == SocketClosed: 1238 self.closeConnection = True 1239 if __debug__: 1240 self.err_log.debug('Client closed socket') 1241 return False 1242 if typ == BadRequest: 1243 self.closeConnection = True 1244 if __debug__: 1245 self.err_log.debug('Client sent a bad request') 1246 return True 1247 if typ == socket.error: 1248 self.closeConnection = True 1249 if val.args[0] in IGNORE_ERRORS_ON_CLOSE: 1250 if __debug__: 1251 self.err_log.debug('Ignorable socket Error received...' 1252 'closing connection.') 1253 return False 1254 else: 1255 self.status = "999 Utter Server Failure" 1256 tb_fmt = traceback.format_exception(typ, val, tb) 1257 self.err_log.error('Unhandled Error when serving ' 1258 'connection:\n' + '\n'.join(tb_fmt)) 1259 return False 1260 1261 self.closeConnection = True 1262 tb_fmt = traceback.format_exception(typ, val, tb) 1263 self.err_log.error('\n'.join(tb_fmt)) 1264 self.send_response('500 Server Error') 1265 return False
1266
1267 - def run(self):
1268 if __debug__: 1269 self.err_log.debug('Entering main loop.') 1270 1271 # Enter thread main loop 1272 while True: 1273 conn = self.active_queue.get() 1274 1275 if not conn: 1276 # A non-client is a signal to die 1277 if __debug__: 1278 self.err_log.debug('Received a death threat.') 1279 return conn 1280 1281 if isinstance(conn, tuple): 1282 conn = Connection(*conn) 1283 1284 self.conn = conn 1285 1286 if conn.ssl != conn.secure: 1287 self.err_log.info('Received HTTP connection on HTTPS port.') 1288 self.send_response('400 Bad Request') 1289 self.closeConnection = True 1290 conn.close() 1291 continue 1292 else: 1293 if __debug__: 1294 self.err_log.debug('Received a connection.') 1295 self.closeConnection = False 1296 1297 # Enter connection serve loop 1298 while True: 1299 if __debug__: 1300 self.err_log.debug('Serving a request') 1301 try: 1302 self.run_app(conn) 1303 log_info = dict(client_ip = conn.client_addr, 1304 time = datetime.now().strftime('%c'), 1305 status = self.status.split(' ')[0], 1306 size = self.size, 1307 request_line = self.request_line) 1308 self.req_log.info(LOG_LINE % log_info) 1309 except: 1310 exc = sys.exc_info() 1311 handled = self._handleError(*exc) 1312 if handled: 1313 break 1314 else: 1315 if self.request_line: 1316 log_info = dict(client_ip = conn.client_addr, 1317 time = datetime.now().strftime('%c'), 1318 status = self.status.split(' ')[0], 1319 size = self.size, 1320 request_line = self.request_line + ' - not stopping') 1321 self.req_log.info(LOG_LINE % log_info) 1322 1323 if self.closeConnection: 1324 try: 1325 conn.close() 1326 except: 1327 self.err_log.error(str(traceback.format_exc())) 1328 1329 break
1330
1331 - def run_app(self, conn):
1332 # Must be overridden with a method reads the request from the socket 1333 # and sends a response. 1334 self.closeConnection = True 1335 raise NotImplementedError('Overload this method!')
1336
1337 - def send_response(self, status):
1338 stat_msg = status.split(' ', 1)[1] 1339 msg = RESPONSE % (status, 1340 len(stat_msg), 1341 'text/plain', 1342 stat_msg) 1343 try: 1344 self.conn.sendall(b(msg)) 1345 except socket.error: 1346 self.closeConnection = True 1347 self.err_log.error('Tried to send "%s" to client but received socket' 1348 ' error' % status)
1349 1350 #def kill(self): 1351 # if self.isAlive() and hasattr(self, 'conn'): 1352 # try: 1353 # self.conn.shutdown(socket.SHUT_RDWR) 1354 # except socket.error: 1355 # info = sys.exc_info() 1356 # if info[1].args[0] != socket.EBADF: 1357 # self.err_log.debug('Error on shutdown: '+str(info)) 1358
1359 - def read_request_line(self, sock_file):
1360 self.request_line = '' 1361 try: 1362 # Grab the request line 1363 d = sock_file.readline() 1364 if PY3K: 1365 d = d.decode('ISO-8859-1') 1366 1367 if d == '\r\n': 1368 # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec 1369 if __debug__: 1370 self.err_log.debug('Client sent newline') 1371 1372 d = sock_file.readline() 1373 if PY3K: 1374 d = d.decode('ISO-8859-1') 1375 except socket.timeout: 1376 raise SocketTimeout("Socket timed out before request.") 1377 1378 d = d.strip() 1379 1380 if not d: 1381 if __debug__: 1382 self.err_log.debug('Client did not send a recognizable request.') 1383 raise SocketClosed('Client closed socket.') 1384 1385 self.request_line = d 1386 1387 # NOTE: I've replaced the traditional method of procedurally breaking 1388 # apart the request line with a (rather unsightly) regular expression. 1389 # However, Java's regexp support sucks so bad that it actually takes 1390 # longer in Jython to process the regexp than procedurally. So I've 1391 # left the old code here for Jython's sake...for now. 1392 if IS_JYTHON: 1393 return self._read_request_line_jython(d) 1394 1395 match = re_REQUEST_LINE.match(d) 1396 1397 if not match: 1398 self.send_response('400 Bad Request') 1399 raise BadRequest 1400 1401 req = match.groupdict() 1402 for k,v in req.items(): 1403 if not v: 1404 req[k] = "" 1405 if k == 'path': 1406 req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)]) 1407 1408 return req
1409
1410 - def _read_request_line_jython(self, d):
1411 d = d.strip() 1412 try: 1413 method, uri, proto = d.split(' ') 1414 if not proto.startswith('HTTP') or \ 1415 proto[-3:] not in ('1.0', '1.1') or \ 1416 method not in HTTP_METHODS: 1417 self.send_response('400 Bad Request') 1418 raise BadRequest 1419 except ValueError: 1420 self.send_response('400 Bad Request') 1421 raise BadRequest 1422 1423 req = dict(method=method, protocol = proto) 1424 scheme = '' 1425 host = '' 1426 if uri == '*' or uri.startswith('/'): 1427 path = uri 1428 elif '://' in uri: 1429 scheme, rest = uri.split('://') 1430 host, path = rest.split('/', 1) 1431 path = '/' + path 1432 else: 1433 self.send_response('400 Bad Request') 1434 raise BadRequest 1435 1436 query_string = '' 1437 if '?' in path: 1438 path, query_string = path.split('?', 1) 1439 1440 path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)]) 1441 1442 req.update(path=path, 1443 query_string=query_string, 1444 scheme=scheme.lower(), 1445 host=host) 1446 return req
1447 1448
1449 - def read_headers(self, sock_file):
1450 try: 1451 headers = dict() 1452 l = sock_file.readline() 1453 1454 lname = None 1455 lval = None 1456 while True: 1457 if PY3K: 1458 try: 1459 l = str(l, 'ISO-8859-1') 1460 except UnicodeDecodeError: 1461 self.err_log.warning('Client sent invalid header: ' + repr(l)) 1462 1463 if l == '\r\n': 1464 break 1465 1466 if l[0] in ' \t' and lname: 1467 # Some headers take more than one line 1468 lval += ',' + l.strip() 1469 else: 1470 # HTTP header values are latin-1 encoded 1471 l = l.split(':', 1) 1472 # HTTP header names are us-ascii encoded 1473 1474 lname = l[0].strip().upper().replace('-', '_') 1475 lval = l[-1].strip() 1476 headers[str(lname)] = str(lval) 1477 1478 l = sock_file.readline() 1479 except socket.timeout: 1480 raise SocketTimeout("Socket timed out before request.") 1481 1482 return headers
1483
1484 -class SocketTimeout(Exception):
1485 "Exception for when a socket times out between requests." 1486 pass
1487
1488 -class BadRequest(Exception):
1489 "Exception for when a client sends an incomprehensible request." 1490 pass
1491
1492 -class SocketClosed(Exception):
1493 "Exception for when a socket is closed by the client." 1494 pass
1495
1496 -class ChunkedReader(object):
1497 - def __init__(self, sock_file):
1498 self.stream = sock_file 1499 self.chunk_size = 0
1500
1501 - def _read_header(self):
1502 chunk_len = "" 1503 try: 1504 while "" == chunk_len: 1505 chunk_len = self.stream.readline().strip() 1506 return int(chunk_len, 16) 1507 except ValueError: 1508 return 0
1509
1510 - def read(self, size):
1511 data = b('') 1512 chunk_size = self.chunk_size 1513 while size: 1514 if not chunk_size: 1515 chunk_size = self._read_header() 1516 1517 if size < chunk_size: 1518 data += self.stream.read(size) 1519 chunk_size -= size 1520 break 1521 else: 1522 if not chunk_size: 1523 break 1524 data += self.stream.read(chunk_size) 1525 size -= chunk_size 1526 chunk_size = 0 1527 1528 self.chunk_size = chunk_size 1529 return data
1530
1531 - def readline(self):
1532 data = b('') 1533 c = self.read(1) 1534 while c and c != b('\n'): 1535 data += c 1536 c = self.read(1) 1537 data += c 1538 return data
1539
1540 - def readlines(self):
1541 yield self.readline()
1542
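
A small illustration of how ChunkedReader consumes a chunked body; the in-memory stream stands in for the socket file object that normally backs wsgi.input:

    from io import BytesIO
    body = ChunkedReader(BytesIO(b('5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n')))
    body.read(11)   # -> b('hello world')
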
1543 -def get_method(method):
1544 1545 1546 methods = dict(wsgi=WSGIWorker, 1547 fs=FileSystemWorker) 1548 return methods[method.lower()]
1549 1550 # Monolithic build...end of module: rocket\worker.py 1551 # Monolithic build...start of module: rocket\methods\__init__.py 1552 1553 # Monolithic build...end of module: rocket\methods\__init__.py 1554 # Monolithic build...start of module: rocket\methods\fs.py 1555 1556 # Import System Modules 1557 import os 1558 import time 1559 import mimetypes 1560 from email.utils import formatdate 1561 from wsgiref.headers import Headers 1562 from wsgiref.util import FileWrapper 1563 # Import Package Modules 1564 # package imports removed in monolithic build 1565 1566 1567 # Define Constants 1568 CHUNK_SIZE = 2**16 # 64 Kilobyte chunks 1569 HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' 1570 INDEX_HEADER = '''\ 1571 <html> 1572 <head><title>Directory Index: %(path)s</title> 1573 <style> .parent { margin-bottom: 1em; }</style> 1574 </head> 1575 <body><h1>Directory Index: %(path)s</h1> 1576 <table> 1577 <tr><th>Directories</th></tr> 1578 ''' 1579 INDEX_ROW = '''<tr><td><div class="%(cls)s"><a href="/%(link)s">%(name)s</a></div></td></tr>''' 1580 INDEX_FOOTER = '''</table></body></html>\r\n''' 1581
1582 -class LimitingFileWrapper(FileWrapper):
1583 - def __init__(self, limit=None, *args, **kwargs):
1584 self.limit = limit 1585 FileWrapper.__init__(self, *args, **kwargs)
1586
1587 - def read(self, amt):
1588 if amt > self.limit: 1589 amt = self.limit 1590 self.limit -= amt 1591 return FileWrapper.read(self, amt)
1592
1593 -class FileSystemWorker(Worker):
1594 - def __init__(self, *args, **kwargs):
1595 """Builds some instance variables that will last the life of the 1596 thread.""" 1597 1598 Worker.__init__(self, *args, **kwargs) 1599 1600 self.root = os.path.abspath(self.app_info['document_root']) 1601 self.display_index = self.app_info['display_index']
1602
1603 - def serve_file(self, filepath, headers):
1604 filestat = os.stat(filepath) 1605 self.size = filestat.st_size 1606 modtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT", 1607 time.gmtime(filestat.st_mtime)) 1608 self.headers.add_header('Last-Modified', modtime) 1609 if headers.get('if_modified_since') == modtime: 1610 # The browser cache is up-to-date, send a 304. 1611 self.status = "304 Not Modified" 1612 self.data = [] 1613 return 1614 1615 ct = mimetypes.guess_type(filepath)[0] 1616 self.content_type = ct if ct else 'text/plain' 1617 try: 1618 f = open(filepath, 'rb') 1619 self.headers['Pragma'] = 'cache' 1620 self.headers['Cache-Control'] = 'private' 1621 self.headers['Content-Length'] = str(self.size) 1622 if self.etag: 1623 self.headers.add_header('Etag', self.etag) 1624 if self.expires: 1625 self.headers.add_header('Expires', self.expires) 1626 1627 try: 1628 # Implement 206 partial file support. 1629 start, end = headers['range'].split('-') 1630 start = 0 if not start.isdigit() else int(start) 1631 end = self.size if not end.isdigit() else int(end) 1632 if self.size < end or start < 0: 1633 self.status = "214 Unsatisfiable Range Requested" 1634 self.data = FileWrapper(f, CHUNK_SIZE) 1635 else: 1636 f.seek(start) 1637 self.data = LimitingFileWrapper(f, CHUNK_SIZE, limit=end) 1638 self.status = "206 Partial Content" 1639 except: 1640 self.data = FileWrapper(f, CHUNK_SIZE) 1641 except IOError: 1642 self.status = "403 Forbidden"
1643
1644 - def serve_dir(self, pth, rpth):
1645 def rel_path(path): 1646 return os.path.normpath(path[len(self.root):] if path.startswith(self.root) else path)
1647 1648 if not self.display_index: 1649 self.status = '404 File Not Found' 1650 return b('') 1651 else: 1652 self.content_type = 'text/html' 1653 1654 dir_contents = [os.path.join(pth, x) for x in os.listdir(os.path.normpath(pth))] 1655 dir_contents.sort() 1656 1657 dirs = [rel_path(x)+'/' for x in dir_contents if os.path.isdir(x)] 1658 files = [rel_path(x) for x in dir_contents if os.path.isfile(x)] 1659 1660 self.data = [INDEX_HEADER % dict(path='/'+rpth)] 1661 if rpth: 1662 self.data += [INDEX_ROW % dict(name='(parent directory)', cls='dir parent', link='/'.join(rpth[:-1].split('/')[:-1]))] 1663 self.data += [INDEX_ROW % dict(name=os.path.basename(x[:-1]), link=os.path.join(rpth, os.path.basename(x[:-1])).replace('\\', '/'), cls='dir') for x in dirs] 1664 self.data += ['<tr><th>Files</th></tr>'] 1665 self.data += [INDEX_ROW % dict(name=os.path.basename(x), link=os.path.join(rpth, os.path.basename(x)).replace('\\', '/'), cls='file') for x in files] 1666 self.data += [INDEX_FOOTER] 1667 self.headers['Content-Length'] = self.size = str(sum([len(x) for x in self.data])) 1668 self.status = '200 OK'
1669
1670 - def run_app(self, conn):
1671 self.status = "200 OK" 1672 self.size = 0 1673 self.expires = None 1674 self.etag = None 1675 self.content_type = 'text/plain' 1676 self.content_length = None 1677 1678 if __debug__: 1679 self.err_log.debug('Getting sock_file') 1680 1681 # Build our file-like object 1682 sock_file = conn.makefile('rb',BUF_SIZE) 1683 request = self.read_request_line(sock_file) 1684 if request['method'].upper() not in ('GET', ): 1685 self.status = "501 Not Implemented" 1686 1687 try: 1688 # Get our file path 1689 headers = dict([(str(k.lower()), v) for k, v in self.read_headers(sock_file).items()]) 1690 rpath = request.get('path', '').lstrip('/') 1691 filepath = os.path.join(self.root, rpath) 1692 filepath = os.path.abspath(filepath) 1693 if __debug__: 1694 self.err_log.debug('Request for path: %s' % filepath) 1695 1696 self.closeConnection = headers.get('connection', 'close').lower() == 'close' 1697 self.headers = Headers([('Date', formatdate(usegmt=True)), 1698 ('Server', HTTP_SERVER_SOFTWARE), 1699 ('Connection', headers.get('connection', 'close')), 1700 ]) 1701 1702 if not filepath.lower().startswith(self.root.lower()): 1703 # File must be within our root directory 1704 self.status = "400 Bad Request" 1705 self.closeConnection = True 1706 elif not os.path.exists(filepath): 1707 self.status = "404 File Not Found" 1708 self.closeConnection = True 1709 elif os.path.isdir(filepath): 1710 self.serve_dir(filepath, rpath) 1711 elif os.path.isfile(filepath): 1712 self.serve_file(filepath, headers) 1713 else: 1714 # It exists but it's not a file or a directory???? 1715 # What is it then? 1716 self.status = "501 Not Implemented" 1717 self.closeConnection = True 1718 1719 h = self.headers 1720 statcode, statstr = self.status.split(' ', 1) 1721 statcode = int(statcode) 1722 if statcode >= 400: 1723 h.add_header('Content-Type', self.content_type) 1724 self.data = [statstr] 1725 1726 # Build our output headers 1727 header_data = HEADER_RESPONSE % (self.status, str(h)) 1728 1729 # Send the headers 1730 if __debug__: 1731 self.err_log.debug('Sending Headers: %s' % repr(header_data)) 1732 self.conn.sendall(b(header_data)) 1733 1734 for data in self.data: 1735 self.conn.sendall(b(data)) 1736 1737 if hasattr(self.data, 'close'): 1738 self.data.close() 1739 1740 finally: 1741 if __debug__: 1742 self.err_log.debug('Finally closing sock_file') 1743 sock_file.close()
1744 1745 # Monolithic build...end of module: rocket\methods\fs.py 1746 # Monolithic build...start of module: rocket\methods\wsgi.py 1747 1748 # Import System Modules 1749 import sys 1750 import socket 1751 from wsgiref.headers import Headers 1752 from wsgiref.util import FileWrapper 1753 1754 # Import Package Modules 1755 # package imports removed in monolithic build 1756 1757 1758 1759 if PY3K: 1760 from email.utils import formatdate 1761 else: 1762 # Caps Utils for Py2.4 compatibility 1763 from email.Utils import formatdate 1764 1765 # Define Constants 1766 NEWLINE = b('\r\n') 1767 HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' 1768 BASE_ENV = {'SERVER_NAME': SERVER_NAME, 1769 'SCRIPT_NAME': '', # Direct call WSGI does not need a name 1770 'wsgi.errors': sys.stderr, 1771 'wsgi.version': (1, 0), 1772 'wsgi.multiprocess': False, 1773 'wsgi.run_once': False, 1774 'wsgi.file_wrapper': FileWrapper 1775 } 1776
1777 -class WSGIWorker(Worker):
1778 - def __init__(self, *args, **kwargs):
1779 """Builds some instance variables that will last the life of the 1780 thread.""" 1781 Worker.__init__(self, *args, **kwargs) 1782 1783 if isinstance(self.app_info, dict): 1784 multithreaded = self.app_info.get('max_threads') != 1 1785 else: 1786 multithreaded = False 1787 self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'], 1788 'wsgi.multithread': multithreaded, 1789 }) 1790 self.base_environ.update(BASE_ENV) 1791 1792 # Grab our application 1793 self.app = self.app_info.get('wsgi_app') 1794 1795 if not hasattr(self.app, "__call__"): 1796 raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app)) 1797 1798 # Enable futures 1799 if has_futures and self.app_info.get('futures'): 1800 executor = self.app_info['executor'] 1801 self.base_environ.update({"wsgiorg.executor": executor, 1802 "wsgiorg.futures": executor.futures})
1803
1804 - def build_environ(self, sock_file, conn):
1805 """ Build the execution environment. """ 1806 # Grab the request line 1807 request = self.read_request_line(sock_file) 1808 1809 # Copy the Base Environment 1810 environ = self.base_environ.copy() 1811 1812 # Grab the headers 1813 for k, v in self.read_headers(sock_file).items(): 1814 environ[str('HTTP_'+k)] = v 1815 1816 # Add CGI Variables 1817 environ['REQUEST_METHOD'] = request['method'] 1818 environ['PATH_INFO'] = request['path'] 1819 environ['SERVER_PROTOCOL'] = request['protocol'] 1820 environ['SERVER_PORT'] = str(conn.server_port) 1821 environ['REMOTE_PORT'] = str(conn.client_port) 1822 environ['REMOTE_ADDR'] = str(conn.client_addr) 1823 environ['QUERY_STRING'] = request['query_string'] 1824 if 'HTTP_CONTENT_LENGTH' in environ: 1825 environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH'] 1826 if 'HTTP_CONTENT_TYPE' in environ: 1827 environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE'] 1828 1829 # Save the request method for later 1830 self.request_method = environ['REQUEST_METHOD'] 1831 1832 # Add Dynamic WSGI Variables 1833 if conn.ssl: 1834 environ['wsgi.url_scheme'] = 'https' 1835 environ['HTTPS'] = 'on' 1836 else: 1837 environ['wsgi.url_scheme'] = 'http' 1838 1839 if conn.ssl: 1840 try: 1841 peercert = conn.socket.getpeercert(binary_form=True) 1842 environ['SSL_CLIENT_RAW_CERT'] = \ 1843 peercert and ssl.DER_cert_to_PEM_cert(peercert) 1844 except Exception,e: 1845 print e 1846 1847 if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked': 1848 environ['wsgi.input'] = ChunkedReader(sock_file) 1849 else: 1850 environ['wsgi.input'] = sock_file 1851 1852 return environ
1853
1854 - def send_headers(self, data, sections):
1855 h_set = self.header_set 1856 1857 # Does the app want us to send output chunked? 1858 self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked' 1859 1860 # Add a Date header if it's not there already 1861 if not 'date' in h_set: 1862 h_set['Date'] = formatdate(usegmt=True) 1863 1864 # Add a Server header if it's not there already 1865 if not 'server' in h_set: 1866 h_set['Server'] = HTTP_SERVER_SOFTWARE 1867 1868 if 'content-length' in h_set: 1869 self.size = int(h_set['content-length']) 1870 else: 1871 s = int(self.status.split(' ')[0]) 1872 if s < 200 or s not in (204, 205, 304): 1873 if not self.chunked: 1874 if sections == 1: 1875 # Add a Content-Length header if it's not there already 1876 h_set['Content-Length'] = str(len(data)) 1877 self.size = len(data) 1878 else: 1879 # If they sent us more than one section, we blow chunks 1880 h_set['Transfer-Encoding'] = 'Chunked' 1881 self.chunked = True 1882 if __debug__: 1883 self.err_log.debug('Adding header...' 1884 'Transfer-Encoding: Chunked') 1885 1886 if 'connection' not in h_set: 1887 # If the application did not provide a connection header, fill it in 1888 client_conn = self.environ.get('HTTP_CONNECTION', '').lower() 1889 if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1': 1890 # HTTP = 1.1 defaults to keep-alive connections 1891 if client_conn: 1892 h_set['Connection'] = client_conn 1893 else: 1894 h_set['Connection'] = 'keep-alive' 1895 else: 1896 # HTTP < 1.1 supports keep-alive but it's quirky so we don't support it 1897 h_set['Connection'] = 'close' 1898 1899 # Close our connection if we need to. 1900 self.closeConnection = h_set.get('connection', '').lower() == 'close' 1901 1902 # Build our output headers 1903 header_data = HEADER_RESPONSE % (self.status, str(h_set)) 1904 1905 # Send the headers 1906 if __debug__: 1907 self.err_log.debug('Sending Headers: %s' % repr(header_data)) 1908 self.conn.sendall(b(header_data)) 1909 self.headers_sent = True
1910
1911 - def write_warning(self, data, sections=None):
1912 self.err_log.warning('WSGI app called write method directly. This is ' 1913 'deprecated behavior. Please update your app.') 1914 return self.write(data, sections)
1915
1916 - def write(self, data, sections=None):
1917 """ Write the data to the output socket. """ 1918 1919 if self.error[0]: 1920 self.status = self.error[0] 1921 data = b(self.error[1]) 1922 1923 if not self.headers_sent: 1924 self.send_headers(data, sections) 1925 1926 if self.request_method != 'HEAD': 1927 try: 1928 if self.chunked: 1929 self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data))) 1930 else: 1931 self.conn.sendall(data) 1932 except socket.error: 1933 # But some clients will close the connection before that 1934 # resulting in a socket error. 1935 self.closeConnection = True
1936
1937 - def start_response(self, status, response_headers, exc_info=None):
1938 """ Store the HTTP status and headers to be sent when self.write is 1939 called. """ 1940 if exc_info: 1941 try: 1942 if self.headers_sent: 1943 # Re-raise original exception if headers sent 1944 # because this violates WSGI specification. 1945 raise 1946 finally: 1947 exc_info = None 1948 elif self.header_set: 1949 raise AssertionError("Headers already set!") 1950 1951 if PY3K and not isinstance(status, str): 1952 self.status = str(status, 'ISO-8859-1') 1953 else: 1954 self.status = status 1955 # Make sure headers are bytes objects 1956 try: 1957 self.header_set = Headers(response_headers) 1958 except UnicodeDecodeError: 1959 self.error = ('500 Internal Server Error', 1960 'HTTP Headers should be bytes') 1961 self.err_log.error('Received HTTP Headers from client that contain' 1962 ' invalid characters for Latin-1 encoding.') 1963 1964 return self.write_warning
1965
1966 - def run_app(self, conn):
1967 self.size = 0 1968 self.header_set = Headers([]) 1969 self.headers_sent = False 1970 self.error = (None, None) 1971 self.chunked = False 1972 sections = None 1973 output = None 1974 1975 if __debug__: 1976 self.err_log.debug('Getting sock_file') 1977 1978 # Build our file-like object 1979 if PY3K: 1980 sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE) 1981 else: 1982 sock_file = conn.makefile(BUF_SIZE) 1983 1984 try: 1985 # Read the headers and build our WSGI environment 1986 self.environ = environ = self.build_environ(sock_file, conn) 1987 1988 # Handle 100 Continue 1989 if environ.get('HTTP_EXPECT', '') == '100-continue': 1990 res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n' 1991 conn.sendall(b(res)) 1992 1993 # Send it to our WSGI application 1994 output = self.app(environ, self.start_response) 1995 1996 if not hasattr(output, '__len__') and not hasattr(output, '__iter__'): 1997 self.error = ('500 Internal Server Error', 1998 'WSGI applications must return a list or ' 1999 'generator type.') 2000 2001 if hasattr(output, '__len__'): 2002 sections = len(output) 2003 2004 for data in output: 2005 # Don't send headers until body appears 2006 if data: 2007 self.write(data, sections) 2008 2009 if self.chunked: 2010 # If chunked, send our final chunk length 2011 self.conn.sendall(b('0\r\n\r\n')) 2012 elif not self.headers_sent: 2013 # Send headers if the body was empty 2014 self.send_headers('', sections) 2015 2016 # Don't capture exceptions here. The Worker class handles 2017 # them appropriately. 2018 finally: 2019 if __debug__: 2020 self.err_log.debug('Finally closing output and sock_file') 2021 2022 if hasattr(output,'close'): 2023 output.close() 2024 2025 sock_file.close()
2026 2027 # Monolithic build...end of module: rocket\methods\wsgi.py 2028 2029 # 2030 # the following code is not part of Rocket but was added in web2py for testing purposes 2031 # 2032
2033  def demo_app(environ, start_response):
2034      global static_folder
2035      import os
2036      types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif',
2037               'jpg': 'image/jpeg','png': 'image/png','pdf': 'application/pdf'}
2038      if static_folder:
2039          if not static_folder.startswith('/'):
2040              static_folder = os.path.join(os.getcwd(),static_folder)
2041          path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html')
2042          type = types.get(path.split('.')[-1],'text')
2043          if os.path.exists(path):
2044              try:
2045                  data = open(path,'rb').read()
2046                  start_response('200 OK', [('Content-Type', type)])
2047              except IOError:
2048                  start_response('404 NOT FOUND', [])
2049                  data = '404 NOT FOUND'
2050          else:
2051              start_response('500 INTERNAL SERVER ERROR', [])
2052              data = '500 INTERNAL SERVER ERROR'
2053      else:
2054          start_response('200 OK', [('Content-Type', 'text/html')])
2055          data = '<html><body><h1>Hello from Rocket Web Server</h1></body></html>'
2056      return [data]
2057  
2058  def demo():
2059      from optparse import OptionParser
2060      parser = OptionParser()
2061      parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
2062                        help="ip address of the network interface")
2063      parser.add_option("-p", "--port", dest="port",default="8000",
2064                        help="port where to run web server")
2065      parser.add_option("-s", "--static", dest="static",default=None,
2066                        help="folder containing static files")
2067      (options, args) = parser.parse_args()
2068      global static_folder
2069      static_folder = options.static
2070      print 'Rocket running on %s:%s' % (options.ip, options.port)
2071      r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
2072      r.start()
2073  
2074  if __name__=='__main__':
2075      demo()
2076  
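
The demo can be exercised from a shell; assuming the usual web2py layout, where this module is gluon/rocket.py, something like:

    python gluon/rocket.py --ip 127.0.0.1 --port 8000 --static ./static

With --static pointing at a folder, demo_app serves files out of it; without it, every request receives the hello page.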