1
2
3
4
5
6
7 import sys
8 import errno
9 import socket
10 import logging
11 import platform
12 import traceback
13
14
15 VERSION = '1.2.4'
16 SERVER_NAME = socket.gethostname()
17 SERVER_SOFTWARE = 'Rocket %s' % VERSION
18 HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0])
19 BUF_SIZE = 16384
20 SOCKET_TIMEOUT = 1
21 THREAD_STOP_CHECK_INTERVAL = 1
22 IS_JYTHON = platform.system() == 'Java'
23 IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
24 DEFAULT_LISTEN_QUEUE_SIZE = 5
25 DEFAULT_MIN_THREADS = 10
26 DEFAULT_MAX_THREADS = 0
27 DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE,
28 MIN_THREADS = DEFAULT_MIN_THREADS,
29 MAX_THREADS = DEFAULT_MAX_THREADS)
30
31 PY3K = sys.version_info[0] > 2
32
34 "A Logging handler to prevent library errors."
35 - def emit(self, record):
37
38 if PY3K:
40 """ Convert string/unicode/bytes literals into bytes. This allows for
41 the same code to run on Python 2.x and 3.x. """
42 if isinstance(val, str):
43 return val.encode()
44 else:
45 return val
46
47 - def u(val, encoding="us-ascii"):
48 """ Convert bytes into string/unicode. This allows for the
49 same code to run on Python 2.x and 3.x. """
50 if isinstance(val, bytes):
51 return val.decode(encoding)
52 else:
53 return val
54
55 else:
57 """ Convert string/unicode/bytes literals into bytes. This allows for
58 the same code to run on Python 2.x and 3.x. """
59 if isinstance(val, unicode):
60 return val.encode()
61 else:
62 return val
63
64 - def u(val, encoding="us-ascii"):
65 """ Convert bytes into string/unicode. This allows for the
66 same code to run on Python 2.x and 3.x. """
67 if isinstance(val, str):
68 return val.decode(encoding)
69 else:
70 return val
71
72
73
74
75 __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
76 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
77 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
78
79
80
81
82
83 import sys
84 import time
85 import socket
86 try:
87 import ssl
88 has_ssl = True
89 except ImportError:
90 has_ssl = False
91
92
93
94
95
97 __slots__ = [
98 'setblocking',
99 'sendall',
100 'shutdown',
101 'makefile',
102 'fileno',
103 'client_addr',
104 'client_port',
105 'server_port',
106 'socket',
107 'start_time',
108 'ssl',
109 'secure',
110 'recv',
111 'send',
112 'read',
113 'write'
114 ]
115
116 - def __init__(self, sock_tuple, port, secure=False):
139
140
141
142
143
145 if hasattr(self.socket, '_sock'):
146 try:
147 self.socket._sock.close()
148 except socket.error:
149 info = sys.exc_info()
150 if info[1].args[0] != socket.EBADF:
151 raise info[1]
152 else:
153 pass
154 self.socket.close()
155
156
157
158
159
160
161 import socket
162 try:
163 from io import StringIO
164 except ImportError:
165 try:
166 from cStringIO import StringIO
167 except ImportError:
168 from StringIO import StringIO
169
170
171
174 self.conn = conn
175 self.buf_size = buf_size
176 self.buffer = StringIO()
177 self.content_length = None
178
179 if self.conn.socket.gettimeout() == 0.0:
180 self.read = self.non_blocking_read
181 else:
182 self.read = self.blocking_read
183
186
187 - def recv(self, size):
188 while True:
189 try:
190 return self.conn.recv(size)
191 except socket.error:
192 exc = sys.exc_info()
193 e = exc[1]
194
195 if (e.args[0] not in set()):
196 raise
197
199 data = self.readline()
200 if data == '':
201 raise StopIteration
202 return data
203
205
206 bufr = self.buffer
207 bufr.seek(0, 2)
208 if size is None:
209 while True:
210 data = self.recv(self.buf_size)
211 if not data:
212 break
213 bufr.write(data)
214
215 self.buffer = StringIO()
216
217 return bufr.getvalue()
218 else:
219 buf_len = self.buffer.tell()
220 if buf_len >= size:
221 bufr.seek(0)
222 data = bufr.read(size)
223 self.buffer = StringIO(bufr.read())
224 return data
225
226 self.buffer = StringIO()
227 while True:
228 remaining = size - buf_len
229 data = self.recv(remaining)
230
231 if not data:
232 break
233
234 n = len(data)
235 if n == size and not buf_len:
236 return data
237
238 if n == remaining:
239 bufr.write(data)
240 del data
241 break
242
243 bufr.write(data)
244 buf_len += n
245 del data
246
247 return bufr.getvalue()
248
250 if length is None:
251 if self.content_length is not None:
252 length = self.content_length
253 else:
254 length = 1
255
256 try:
257 data = self.conn.recv(length)
258 except:
259 data = b('')
260
261 return data
262
264 data = b("")
265 char = self.read(1)
266 while char != b('\n') and char is not b(''):
267 line = repr(char)
268 data += char
269 char = self.read(1)
270 data += char
271 return data
272
275
277 self.conn = None
278 self.content_length = None
279
280
281
282
283
284 import time
285 try:
286 from concurrent.futures import Future, ThreadPoolExecutor
287 from concurrent.futures.thread import _WorkItem
288 has_futures = True
289 except ImportError:
290 has_futures = False
291
294
297
300
301
303 - def __init__(self, f_dict, *args, **kwargs):
304 Future.__init__(self, *args, **kwargs)
305
306 self.timeout = None
307
308 self._mem_dict = f_dict
309 self._lifespan = 30
310 self._name = None
311 self._start_time = time.time()
312
318
319
320 - def remember(self, name, lifespan=None):
321 self._lifespan = lifespan or self._lifespan
322
323 if name in self._mem_dict:
324 raise NameError('Cannot remember future by name "%s". ' % name + \
325 'A future already exists with that name.' )
326 self._name = name
327 self._mem_dict[name] = self
328
329 return self
330
332 if self._name in self._mem_dict and self._mem_dict[self._name] is self:
333 del self._mem_dict[self._name]
334 self._name = None
335
337 - def __init__(self, future, fn, args, kwargs):
338 self.future = future
339 self.fn = fn
340 self.args = args
341 self.kwargs = kwargs
342
344 if not self.future.set_running_or_notify_cancel():
345 return
346
347 try:
348 result = self.fn(*self.args, **self.kwargs)
349 except BaseException:
350 e = sys.exc_info()[1]
351 self.future.set_exception(e)
352 else:
353 self.future.set_result(result)
354
356 multithread = True
357 multiprocess = False
358
363
364 - def submit(self, fn, *args, **kwargs):
365 if self._shutdown_lock.acquire():
366 if self._shutdown:
367 self._shutdown_lock.release()
368 raise RuntimeError('Cannot schedule new futures after shutdown')
369
370 f = WSGIFuture(self.futures)
371 w = _WorkItem(f, fn, args, kwargs)
372
373 self._work_queue.put(w)
374 self._adjust_thread_count()
375 self._shutdown_lock.release()
376 return f
377 else:
378 return False
379
381 "Futures middleware that adds a Futures Executor to the environment"
385
386 - def __call__(self, environ, start_response):
390
391
392
393
394
395 import os
396 import socket
397 import logging
398 import traceback
399 from threading import Thread
400
401 try:
402 import ssl
403 from ssl import SSLError
404 has_ssl = True
405 except ImportError:
406 has_ssl = False
409
410
411
413 """The Listener class is a class responsible for accepting connections
414 and queuing them to be processed by a worker thread."""
415
416 - def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
417 Thread.__init__(self, *args, **kwargs)
418
419
420 self.active_queue = active_queue
421 self.interface = interface
422 self.addr = interface[0]
423 self.port = interface[1]
424 self.secure = len(interface) >= 4
425 self.clientcert_req = (len(interface) == 5 and interface[4])
426
427 self.thread = None
428 self.ready = False
429
430
431 self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
432 self.err_log.addHandler(NullHandler())
433
434
435 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
436
437 if not listener:
438 self.err_log.error("Failed to get socket.")
439 return
440
441 if self.secure:
442 if not has_ssl:
443 self.err_log.error("ssl module required to serve HTTPS.")
444 return
445 elif not os.path.exists(interface[2]):
446 data = (interface[2], interface[0], interface[1])
447 self.err_log.error("Cannot find key file "
448 "'%s'. Cannot bind to %s:%s" % data)
449 return
450 elif not os.path.exists(interface[3]):
451 data = (interface[3], interface[0], interface[1])
452 self.err_log.error("Cannot find certificate file "
453 "'%s'. Cannot bind to %s:%s" % data)
454 return
455
456 if self.clientcert_req and not os.path.exists(interface[4]):
457 data = (interface[4], interface[0], interface[1])
458 self.err_log.error("Cannot find root ca certificate file "
459 "'%s'. Cannot bind to %s:%s" % data)
460 return
461
462
463 try:
464 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
465 except:
466 msg = "Cannot share socket. Using %s:%i exclusively."
467 self.err_log.warning(msg % (self.addr, self.port))
468
469 try:
470 if not IS_JYTHON:
471 listener.setsockopt(socket.IPPROTO_TCP,
472 socket.TCP_NODELAY,
473 1)
474 except:
475 msg = "Cannot set TCP_NODELAY, things might run a little slower"
476 self.err_log.warning(msg)
477
478 try:
479 listener.bind((self.addr, self.port))
480 except:
481 msg = "Socket %s:%i in use by other process and it won't share."
482 self.err_log.error(msg % (self.addr, self.port))
483 else:
484
485
486 listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
487
488
489 listener.listen(queue_size)
490
491 self.listener = listener
492
493 self.ready = True
494
496 try:
497 if self.clientcert_req:
498 ca_certs = self.interface[4]
499 cert_reqs = ssl.CERT_OPTIONAL
500 sock = ssl.wrap_socket(sock,
501 keyfile = self.interface[2],
502 certfile = self.interface[3],
503 server_side = True,
504 cert_reqs = cert_reqs,
505 ca_certs = ca_certs,
506 ssl_version = ssl.PROTOCOL_SSLv23)
507 else:
508 sock = ssl.wrap_socket(sock,
509 keyfile = self.interface[2],
510 certfile = self.interface[3],
511 server_side = True,
512 ssl_version = ssl.PROTOCOL_SSLv23)
513
514 except SSLError:
515
516
517
518
519 pass
520
521 return sock
522
524 if not self.ready:
525 self.err_log.warning('Listener started when not ready.')
526 return
527
528 if self.thread is not None and self.thread.isAlive():
529 self.err_log.warning('Listener already running.')
530 return
531
532 self.thread = Thread(target=self.listen, name="Port" + str(self.port))
533
534 self.thread.start()
535
541
543 if self.thread is None:
544 return
545
546 self.ready = False
547
548 self.thread.join()
549
550 del self.thread
551 self.thread = None
552 self.ready = True
553
555 if __debug__:
556 self.err_log.debug('Entering main loop.')
557 while True:
558 try:
559 sock, addr = self.listener.accept()
560
561 if self.secure:
562 sock = self.wrap_socket(sock)
563
564 self.active_queue.put(((sock, addr),
565 self.interface[1],
566 self.secure))
567
568 except socket.timeout:
569
570
571
572 if not self.ready:
573 if __debug__:
574 self.err_log.debug('Listener exiting.')
575 return
576 else:
577 continue
578 except:
579 self.err_log.error(traceback.format_exc())
580
581
582
583
584
585 import sys
586 import time
587 import socket
588 import logging
589 import traceback
590 from threading import Lock
591 try:
592 from queue import Queue
593 except ImportError:
594 from Queue import Queue
595
596
597
598
599
600
601
602
603
604 log = logging.getLogger('Rocket')
605 log.addHandler(NullHandler())
606
608 """The Rocket class is responsible for handling threads and accepting and
609 dispatching connections."""
610
611 - def __init__(self,
612 interfaces = ('127.0.0.1', 8000),
613 method = 'wsgi',
614 app_info = None,
615 min_threads = None,
616 max_threads = None,
617 queue_size = None,
618 timeout = 600,
619 handle_signals = True):
620
621 self.handle_signals = handle_signals
622 self.startstop_lock = Lock()
623 self.timeout = timeout
624
625 if not isinstance(interfaces, list):
626 self.interfaces = [interfaces]
627 else:
628 self.interfaces = interfaces
629
630 if min_threads is None:
631 min_threads = DEFAULTS['MIN_THREADS']
632
633 if max_threads is None:
634 max_threads = DEFAULTS['MAX_THREADS']
635
636 if not queue_size:
637 if hasattr(socket, 'SOMAXCONN'):
638 queue_size = socket.SOMAXCONN
639 else:
640 queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']
641
642 if max_threads and queue_size > max_threads:
643 queue_size = max_threads
644
645 if isinstance(app_info, dict):
646 app_info['server_software'] = SERVER_SOFTWARE
647
648 self.monitor_queue = Queue()
649 self.active_queue = Queue()
650
651 self._threadpool = ThreadPool(get_method(method),
652 app_info = app_info,
653 active_queue = self.active_queue,
654 monitor_queue = self.monitor_queue,
655 min_threads = min_threads,
656 max_threads = max_threads)
657
658
659 self.listeners = [Listener(i, queue_size, self.active_queue) for i in self.interfaces]
660 for ndx in range(len(self.listeners)-1, 0, -1):
661 if not self.listeners[ndx].ready:
662 del self.listeners[ndx]
663
664 if not self.listeners:
665 log.critical("No interfaces to listen on...closing.")
666 sys.exit(1)
667
669 log.info('Received SIGTERM')
670 self.stop()
671
673 log.info('Received SIGHUP')
674 self.restart()
675
676 - def start(self, background=False):
677 log.info('Starting %s' % SERVER_SOFTWARE)
678
679 self.startstop_lock.acquire()
680
681 try:
682
683 if self.handle_signals:
684 try:
685 import signal
686 signal.signal(signal.SIGTERM, self._sigterm)
687 signal.signal(signal.SIGUSR1, self._sighup)
688 except:
689 log.debug('This platform does not support signals.')
690
691
692 self._threadpool.start()
693
694
695 self._monitor = Monitor(self.monitor_queue,
696 self.active_queue,
697 self.timeout,
698 self._threadpool)
699 self._monitor.setDaemon(True)
700 self._monitor.start()
701
702
703
704 str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
705
706 msg = 'Listening on sockets: '
707 msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
708 log.info(msg)
709
710 for l in self.listeners:
711 l.start()
712
713 finally:
714 self.startstop_lock.release()
715
716 if background:
717 return
718
719 while self._monitor.isAlive():
720 try:
721 time.sleep(THREAD_STOP_CHECK_INTERVAL)
722 except KeyboardInterrupt:
723
724 break
725 except:
726 if self._monitor.isAlive():
727 log.error(traceback.format_exc())
728 continue
729
730 return self.stop()
731
732 - def stop(self, stoplogging = False):
733 log.info('Stopping %s' % SERVER_SOFTWARE)
734
735 self.startstop_lock.acquire()
736
737 try:
738
739 for l in self.listeners:
740 l.ready = False
741
742
743 time.sleep(0.01)
744
745 for l in self.listeners:
746 if l.isAlive():
747 l.join()
748
749
750 self._monitor.stop()
751 if self._monitor.isAlive():
752 self._monitor.join()
753
754
755 self._threadpool.stop()
756
757 if stoplogging:
758 logging.shutdown()
759 msg = "Calling logging.shutdown() is now the responsibility of \
760 the application developer. Please update your \
761 applications to no longer call rocket.stop(True)"
762 try:
763 import warnings
764 raise warnings.DeprecationWarning(msg)
765 except ImportError:
766 raise RuntimeError(msg)
767
768 finally:
769 self.startstop_lock.release()
770
774
775 -def CherryPyWSGIServer(bind_addr,
776 wsgi_app,
777 numthreads = 10,
778 server_name = None,
779 max = -1,
780 request_queue_size = 5,
781 timeout = 10,
782 shutdown_timeout = 5):
783 """ A Cherrypy wsgiserver-compatible wrapper. """
784 max_threads = max
785 if max_threads < 0:
786 max_threads = 0
787 return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
788 min_threads = numthreads,
789 max_threads = max_threads,
790 queue_size = request_queue_size,
791 timeout = timeout)
792
793
794
795
796
797 import time
798 import logging
799 import select
800 from threading import Thread
801
802
803
804
806
807
808 - def __init__(self,
809 monitor_queue,
810 active_queue,
811 timeout,
812 threadpool,
813 *args,
814 **kwargs):
815
816 Thread.__init__(self, *args, **kwargs)
817
818 self._threadpool = threadpool
819
820
821 self.monitor_queue = monitor_queue
822 self.active_queue = active_queue
823 self.timeout = timeout
824
825 self.log = logging.getLogger('Rocket.Monitor')
826 self.log.addHandler(NullHandler())
827
828 self.connections = set()
829 self.active = False
830
832 self.active = True
833 conn_list = list()
834 list_changed = False
835
836
837 while not self.monitor_queue.empty():
838 self.monitor_queue.get()
839
840 if __debug__:
841 self.log.debug('Entering monitor loop.')
842
843
844 while self.active:
845
846
847 while not self.monitor_queue.empty():
848 if __debug__:
849 self.log.debug('In "receive timed-out connections" loop.')
850
851 c = self.monitor_queue.get()
852
853 if c is None:
854
855 if __debug__:
856 self.log.debug('Received a death threat.')
857 self.stop()
858 break
859
860 self.log.debug('Received a timed out connection.')
861
862 if __debug__:
863 assert(c not in self.connections)
864
865 if IS_JYTHON:
866
867
868 c.setblocking(False)
869
870 if __debug__:
871 self.log.debug('Adding connection to monitor list.')
872
873 self.connections.add(c)
874 list_changed = True
875
876
877 if list_changed:
878 conn_list = list(self.connections)
879 list_changed = False
880
881 try:
882 if len(conn_list):
883 readable = select.select(conn_list,
884 [],
885 [],
886 THREAD_STOP_CHECK_INTERVAL)[0]
887 else:
888 time.sleep(THREAD_STOP_CHECK_INTERVAL)
889 readable = []
890
891 if not self.active:
892 break
893
894
895 for r in readable:
896 if __debug__:
897 self.log.debug('Restoring readable connection')
898
899 if IS_JYTHON:
900
901
902
903 r.setblocking(True)
904
905 r.start_time = time.time()
906 self.active_queue.put(r)
907
908 self.connections.remove(r)
909 list_changed = True
910
911 except:
912 if self.active:
913 raise
914 else:
915 break
916
917
918 if self.timeout:
919 now = time.time()
920 stale = set()
921 for c in self.connections:
922 if (now - c.start_time) >= self.timeout:
923 stale.add(c)
924
925 for c in stale:
926 if __debug__:
927
928 data = (c.client_addr, c.server_port, c.ssl and '*' or '')
929 self.log.debug('Flushing stale connection: %s:%i%s' % data)
930
931 self.connections.remove(c)
932 list_changed = True
933
934 try:
935 c.close()
936 finally:
937 del c
938
939
940 self._threadpool.dynamic_resize()
941
942
944 self.active = False
945
946 if __debug__:
947 self.log.debug('Flushing waiting connections')
948
949 while self.connections:
950 c = self.connections.pop()
951 try:
952 c.close()
953 finally:
954 del c
955
956 if __debug__:
957 self.log.debug('Flushing queued connections')
958
959 while not self.monitor_queue.empty():
960 c = self.monitor_queue.get()
961
962 if c is None:
963 continue
964
965 try:
966 c.close()
967 finally:
968 del c
969
970
971 self.monitor_queue.put(None)
972
973
974
975
976
977 import logging
978
979
980
981
982
983 log = logging.getLogger('Rocket.Errors.ThreadPool')
984 log.addHandler(NullHandler())
985
987 """The ThreadPool class is a container class for all the worker threads. It
988 manages the number of actively running threads."""
989
990 - def __init__(self,
991 method,
992 app_info,
993 active_queue,
994 monitor_queue,
995 min_threads=DEFAULTS['MIN_THREADS'],
996 max_threads=DEFAULTS['MAX_THREADS'],
997 ):
998
999 if __debug__:
1000 log.debug("Initializing ThreadPool.")
1001
1002 self.check_for_dead_threads = 0
1003 self.active_queue = active_queue
1004
1005 self.worker_class = method
1006 self.min_threads = min_threads
1007 self.max_threads = max_threads
1008 self.monitor_queue = monitor_queue
1009 self.stop_server = False
1010 self.alive = False
1011
1012
1013 self.grow_threshold = int(max_threads/10) + 2
1014
1015 if not isinstance(app_info, dict):
1016 app_info = dict()
1017
1018 if has_futures and app_info.get('futures'):
1019 app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'],
1020 2]))
1021
1022 app_info.update(max_threads=max_threads,
1023 min_threads=min_threads)
1024
1025 self.min_threads = min_threads
1026 self.app_info = app_info
1027
1028 self.threads = set()
1029
1031 self.stop_server = False
1032 if __debug__:
1033 log.debug("Starting threads.")
1034
1035 self.grow(self.min_threads)
1036
1037 self.alive = True
1038
1040 self.alive = False
1041
1042 if __debug__:
1043 log.debug("Stopping threads.")
1044
1045 self.stop_server = True
1046
1047
1048 self.shrink(len(self.threads))
1049
1050
1051 if has_futures and self.app_info.get('futures'):
1052 if __debug__:
1053 log.debug("Future executor is present. Python will not "
1054 "exit until all jobs have finished.")
1055 self.app_info['executor'].shutdown(wait=False)
1056
1057
1058
1059
1060
1061
1062
1063
1064 for t in self.threads:
1065 if t.isAlive():
1066 t.join()
1067
1068
1069 self.bring_out_your_dead()
1070
1072
1073
1074 dead_threads = [t for t in self.threads if not t.isAlive()]
1075 for t in dead_threads:
1076 if __debug__:
1077 log.debug("Removing dead thread: %s." % t.getName())
1078 try:
1079
1080 self.threads.remove(t)
1081 except:
1082 pass
1083 self.check_for_dead_threads -= len(dead_threads)
1084
1085 - def grow(self, amount=None):
1086 if self.stop_server:
1087 return
1088
1089 if not amount:
1090 amount = self.max_threads
1091
1092 if self.alive:
1093 amount = min([amount, self.max_threads - len(self.threads)])
1094
1095 if __debug__:
1096 log.debug("Growing by %i." % amount)
1097
1098 for x in range(amount):
1099 worker = self.worker_class(self.app_info,
1100 self.active_queue,
1101 self.monitor_queue)
1102
1103 worker.setDaemon(True)
1104 self.threads.add(worker)
1105 worker.start()
1106
1108 if __debug__:
1109 log.debug("Shrinking by %i." % amount)
1110
1111 self.check_for_dead_threads += amount
1112
1113 for x in range(amount):
1114 self.active_queue.put(None)
1115
1117 if (self.max_threads > self.min_threads or self.max_threads == 0):
1118 if self.check_for_dead_threads > 0:
1119 self.bring_out_your_dead()
1120
1121 queueSize = self.active_queue.qsize()
1122 threadCount = len(self.threads)
1123
1124 if __debug__:
1125 log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
1126 % (threadCount, queueSize))
1127
1128 if queueSize == 0 and threadCount > self.min_threads:
1129 self.shrink()
1130
1131 elif queueSize > self.grow_threshold:
1132
1133 self.grow(queueSize)
1134
1135
1136
1137
1138
1139 import re
1140 import sys
1141 import socket
1142 import logging
1143 import traceback
1144 from wsgiref.headers import Headers
1145 from threading import Thread
1146 from datetime import datetime
1147
1148 try:
1149 from urllib import unquote
1150 except ImportError:
1151 from urllib.parse import unquote
1152
1153 try:
1154 from io import StringIO
1155 except ImportError:
1156 try:
1157 from cStringIO import StringIO
1158 except ImportError:
1159 from StringIO import StringIO
1160
1161 try:
1162 from ssl import SSLError
1163 except ImportError:
1166
1167
1168
1169
1170
1171 re_SLASH = re.compile('%2F', re.IGNORECASE)
1172 re_REQUEST_LINE = re.compile(r"""^
1173 (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method
1174 \ # (single space)
1175 (
1176 (?P<scheme>[^:/]+) # Scheme
1177 (://) #
1178 (?P<host>[^/]+) # Host
1179 )? #
1180 (?P<path>(\*|/[^ \?]*)) # Path
1181 (\? (?P<query_string>[^ ]+))? # Query String
1182 \ # (single space)
1183 (?P<protocol>HTTPS?/1\.[01]) # Protocol
1184 $
1185 """, re.X)
1186 LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
1187 RESPONSE = '''\
1188 HTTP/1.1 %s
1189 Content-Length: %i
1190 Content-Type: %s
1191
1192 %s
1193 '''
1194 if IS_JYTHON:
1195 HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])
1196
1198 """The Worker class is a base class responsible for receiving connections
1199 and (a subclass) will run an application to process the the connection """
1200
1201 - def __init__(self,
1202 app_info,
1203 active_queue,
1204 monitor_queue,
1205 *args,
1206 **kwargs):
1207
1208 Thread.__init__(self, *args, **kwargs)
1209
1210
1211 self.app_info = app_info
1212 self.active_queue = active_queue
1213 self.monitor_queue = monitor_queue
1214
1215 self.size = 0
1216 self.status = "200 OK"
1217 self.closeConnection = True
1218 self.request_line = ""
1219
1220
1221 self.req_log = logging.getLogger('Rocket.Requests')
1222 self.req_log.addHandler(NullHandler())
1223
1224
1225 self.err_log = logging.getLogger('Rocket.Errors.'+self.getName())
1226 self.err_log.addHandler(NullHandler())
1227
1229 if typ == SSLError:
1230 if 'timed out' in val.args[0]:
1231 typ = SocketTimeout
1232 if typ == SocketTimeout:
1233 if __debug__:
1234 self.err_log.debug('Socket timed out')
1235 self.monitor_queue.put(self.conn)
1236 return True
1237 if typ == SocketClosed:
1238 self.closeConnection = True
1239 if __debug__:
1240 self.err_log.debug('Client closed socket')
1241 return False
1242 if typ == BadRequest:
1243 self.closeConnection = True
1244 if __debug__:
1245 self.err_log.debug('Client sent a bad request')
1246 return True
1247 if typ == socket.error:
1248 self.closeConnection = True
1249 if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
1250 if __debug__:
1251 self.err_log.debug('Ignorable socket Error received...'
1252 'closing connection.')
1253 return False
1254 else:
1255 self.status = "999 Utter Server Failure"
1256 tb_fmt = traceback.format_exception(typ, val, tb)
1257 self.err_log.error('Unhandled Error when serving '
1258 'connection:\n' + '\n'.join(tb_fmt))
1259 return False
1260
1261 self.closeConnection = True
1262 tb_fmt = traceback.format_exception(typ, val, tb)
1263 self.err_log.error('\n'.join(tb_fmt))
1264 self.send_response('500 Server Error')
1265 return False
1266
1268 if __debug__:
1269 self.err_log.debug('Entering main loop.')
1270
1271
1272 while True:
1273 conn = self.active_queue.get()
1274
1275 if not conn:
1276
1277 if __debug__:
1278 self.err_log.debug('Received a death threat.')
1279 return conn
1280
1281 if isinstance(conn, tuple):
1282 conn = Connection(*conn)
1283
1284 self.conn = conn
1285
1286 if conn.ssl != conn.secure:
1287 self.err_log.info('Received HTTP connection on HTTPS port.')
1288 self.send_response('400 Bad Request')
1289 self.closeConnection = True
1290 conn.close()
1291 continue
1292 else:
1293 if __debug__:
1294 self.err_log.debug('Received a connection.')
1295 self.closeConnection = False
1296
1297
1298 while True:
1299 if __debug__:
1300 self.err_log.debug('Serving a request')
1301 try:
1302 self.run_app(conn)
1303 log_info = dict(client_ip = conn.client_addr,
1304 time = datetime.now().strftime('%c'),
1305 status = self.status.split(' ')[0],
1306 size = self.size,
1307 request_line = self.request_line)
1308 self.req_log.info(LOG_LINE % log_info)
1309 except:
1310 exc = sys.exc_info()
1311 handled = self._handleError(*exc)
1312 if handled:
1313 break
1314 else:
1315 if self.request_line:
1316 log_info = dict(client_ip = conn.client_addr,
1317 time = datetime.now().strftime('%c'),
1318 status = self.status.split(' ')[0],
1319 size = self.size,
1320 request_line = self.request_line + ' - not stopping')
1321 self.req_log.info(LOG_LINE % log_info)
1322
1323 if self.closeConnection:
1324 try:
1325 conn.close()
1326 except:
1327 self.err_log.error(str(traceback.format_exc()))
1328
1329 break
1330
1332
1333
1334 self.closeConnection = True
1335 raise NotImplementedError('Overload this method!')
1336
1338 stat_msg = status.split(' ', 1)[1]
1339 msg = RESPONSE % (status,
1340 len(stat_msg),
1341 'text/plain',
1342 stat_msg)
1343 try:
1344 self.conn.sendall(b(msg))
1345 except socket.error:
1346 self.closeConnection = True
1347 self.err_log.error('Tried to send "%s" to client but received socket'
1348 ' error' % status)
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1360 self.request_line = ''
1361 try:
1362
1363 d = sock_file.readline()
1364 if PY3K:
1365 d = d.decode('ISO-8859-1')
1366
1367 if d == '\r\n':
1368
1369 if __debug__:
1370 self.err_log.debug('Client sent newline')
1371
1372 d = sock_file.readline()
1373 if PY3K:
1374 d = d.decode('ISO-8859-1')
1375 except socket.timeout:
1376 raise SocketTimeout("Socket timed out before request.")
1377
1378 d = d.strip()
1379
1380 if not d:
1381 if __debug__:
1382 self.err_log.debug('Client did not send a recognizable request.')
1383 raise SocketClosed('Client closed socket.')
1384
1385 self.request_line = d
1386
1387
1388
1389
1390
1391
1392 if IS_JYTHON:
1393 return self._read_request_line_jython(d)
1394
1395 match = re_REQUEST_LINE.match(d)
1396
1397 if not match:
1398 self.send_response('400 Bad Request')
1399 raise BadRequest
1400
1401 req = match.groupdict()
1402 for k,v in req.items():
1403 if not v:
1404 req[k] = ""
1405 if k == 'path':
1406 req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)])
1407
1408 return req
1409
1411 d = d.strip()
1412 try:
1413 method, uri, proto = d.split(' ')
1414 if not proto.startswith('HTTP') or \
1415 proto[-3:] not in ('1.0', '1.1') or \
1416 method not in HTTP_METHODS:
1417 self.send_response('400 Bad Request')
1418 raise BadRequest
1419 except ValueError:
1420 self.send_response('400 Bad Request')
1421 raise BadRequest
1422
1423 req = dict(method=method, protocol = proto)
1424 scheme = ''
1425 host = ''
1426 if uri == '*' or uri.startswith('/'):
1427 path = uri
1428 elif '://' in uri:
1429 scheme, rest = uri.split('://')
1430 host, path = rest.split('/', 1)
1431 path = '/' + path
1432 else:
1433 self.send_response('400 Bad Request')
1434 raise BadRequest
1435
1436 query_string = ''
1437 if '?' in path:
1438 path, query_string = path.split('?', 1)
1439
1440 path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])
1441
1442 req.update(path=path,
1443 query_string=query_string,
1444 scheme=scheme.lower(),
1445 host=host)
1446 return req
1447
1448
1450 try:
1451 headers = dict()
1452 l = sock_file.readline()
1453
1454 lname = None
1455 lval = None
1456 while True:
1457 if PY3K:
1458 try:
1459 l = str(l, 'ISO-8859-1')
1460 except UnicodeDecodeError:
1461 self.err_log.warning('Client sent invalid header: ' + repr(l))
1462
1463 if l == '\r\n':
1464 break
1465
1466 if l[0] in ' \t' and lname:
1467
1468 lval += ',' + l.strip()
1469 else:
1470
1471 l = l.split(':', 1)
1472
1473
1474 lname = l[0].strip().upper().replace('-', '_')
1475 lval = l[-1].strip()
1476 headers[str(lname)] = str(lval)
1477
1478 l = sock_file.readline()
1479 except socket.timeout:
1480 raise SocketTimeout("Socket timed out before request.")
1481
1482 return headers
1483
1485 "Exception for when a socket times out between requests."
1486 pass
1487
1489 "Exception for when a client sends an incomprehensible request."
1490 pass
1491
1493 "Exception for when a socket is closed by the client."
1494 pass
1495
1498 self.stream = sock_file
1499 self.chunk_size = 0
1500
1502 chunk_len = ""
1503 try:
1504 while "" == chunk_len:
1505 chunk_len = self.stream.readline().strip()
1506 return int(chunk_len, 16)
1507 except ValueError:
1508 return 0
1509
1510 - def read(self, size):
1511 data = b('')
1512 chunk_size = self.chunk_size
1513 while size:
1514 if not chunk_size:
1515 chunk_size = self._read_header()
1516
1517 if size < chunk_size:
1518 data += self.stream.read(size)
1519 chunk_size -= size
1520 break
1521 else:
1522 if not chunk_size:
1523 break
1524 data += self.stream.read(chunk_size)
1525 size -= chunk_size
1526 chunk_size = 0
1527
1528 self.chunk_size = chunk_size
1529 return data
1530
1532 data = b('')
1533 c = self.read(1)
1534 while c and c != b('\n'):
1535 data += c
1536 c = self.read(1)
1537 data += c
1538 return data
1539
1542
1549
1550
1551
1552
1553
1554
1555
1556
1557 import os
1558 import time
1559 import mimetypes
1560 from email.utils import formatdate
1561 from wsgiref.headers import Headers
1562 from wsgiref.util import FileWrapper
1563
1564
1565
1566
1567
1568 CHUNK_SIZE = 2**16
1569 HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
1570 INDEX_HEADER = '''\
1571 <html>
1572 <head><title>Directory Index: %(path)s</title>
1573 <style> .parent { margin-bottom: 1em; }</style>
1574 </head>
1575 <body><h1>Directory Index: %(path)s</h1>
1576 <table>
1577 <tr><th>Directories</th></tr>
1578 '''
1579 INDEX_ROW = '''<tr><td><div class="%(cls)s"><a href="/%(link)s">%(name)s</a></div></td></tr>'''
1580 INDEX_FOOTER = '''</table></body></html>\r\n'''
1581
1583 - def __init__(self, limit=None, *args, **kwargs):
1584 self.limit = limit
1585 FileWrapper.__init__(self, *args, **kwargs)
1586
1587 - def read(self, amt):
1588 if amt > self.limit:
1589 amt = self.limit
1590 self.limit -= amt
1591 return FileWrapper.read(self, amt)
1592
1595 """Builds some instance variables that will last the life of the
1596 thread."""
1597
1598 Worker.__init__(self, *args, **kwargs)
1599
1600 self.root = os.path.abspath(self.app_info['document_root'])
1601 self.display_index = self.app_info['display_index']
1602
1604 filestat = os.stat(filepath)
1605 self.size = filestat.st_size
1606 modtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
1607 time.gmtime(filestat.st_mtime))
1608 self.headers.add_header('Last-Modified', modtime)
1609 if headers.get('if_modified_since') == modtime:
1610
1611 self.status = "304 Not Modified"
1612 self.data = []
1613 return
1614
1615 ct = mimetypes.guess_type(filepath)[0]
1616 self.content_type = ct if ct else 'text/plain'
1617 try:
1618 f = open(filepath, 'rb')
1619 self.headers['Pragma'] = 'cache'
1620 self.headers['Cache-Control'] = 'private'
1621 self.headers['Content-Length'] = str(self.size)
1622 if self.etag:
1623 self.headers.add_header('Etag', self.etag)
1624 if self.expires:
1625 self.headers.add_header('Expires', self.expires)
1626
1627 try:
1628
1629 start, end = headers['range'].split('-')
1630 start = 0 if not start.isdigit() else int(start)
1631 end = self.size if not end.isdigit() else int(end)
1632 if self.size < end or start < 0:
1633 self.status = "214 Unsatisfiable Range Requested"
1634 self.data = FileWrapper(f, CHUNK_SIZE)
1635 else:
1636 f.seek(start)
1637 self.data = LimitingFileWrapper(f, CHUNK_SIZE, limit=end)
1638 self.status = "206 Partial Content"
1639 except:
1640 self.data = FileWrapper(f, CHUNK_SIZE)
1641 except IOError:
1642 self.status = "403 Forbidden"
1643
1645 def rel_path(path):
1646 return os.path.normpath(path[len(self.root):] if path.startswith(self.root) else path)
1647
1648 if not self.display_index:
1649 self.status = '404 File Not Found'
1650 return b('')
1651 else:
1652 self.content_type = 'text/html'
1653
1654 dir_contents = [os.path.join(pth, x) for x in os.listdir(os.path.normpath(pth))]
1655 dir_contents.sort()
1656
1657 dirs = [rel_path(x)+'/' for x in dir_contents if os.path.isdir(x)]
1658 files = [rel_path(x) for x in dir_contents if os.path.isfile(x)]
1659
1660 self.data = [INDEX_HEADER % dict(path='/'+rpth)]
1661 if rpth:
1662 self.data += [INDEX_ROW % dict(name='(parent directory)', cls='dir parent', link='/'.join(rpth[:-1].split('/')[:-1]))]
1663 self.data += [INDEX_ROW % dict(name=os.path.basename(x[:-1]), link=os.path.join(rpth, os.path.basename(x[:-1])).replace('\\', '/'), cls='dir') for x in dirs]
1664 self.data += ['<tr><th>Files</th></tr>']
1665 self.data += [INDEX_ROW % dict(name=os.path.basename(x), link=os.path.join(rpth, os.path.basename(x)).replace('\\', '/'), cls='file') for x in files]
1666 self.data += [INDEX_FOOTER]
1667 self.headers['Content-Length'] = self.size = str(sum([len(x) for x in self.data]))
1668 self.status = '200 OK'
1669
1671 self.status = "200 OK"
1672 self.size = 0
1673 self.expires = None
1674 self.etag = None
1675 self.content_type = 'text/plain'
1676 self.content_length = None
1677
1678 if __debug__:
1679 self.err_log.debug('Getting sock_file')
1680
1681
1682 sock_file = conn.makefile('rb',BUF_SIZE)
1683 request = self.read_request_line(sock_file)
1684 if request['method'].upper() not in ('GET', ):
1685 self.status = "501 Not Implemented"
1686
1687 try:
1688
1689 headers = dict([(str(k.lower()), v) for k, v in self.read_headers(sock_file).items()])
1690 rpath = request.get('path', '').lstrip('/')
1691 filepath = os.path.join(self.root, rpath)
1692 filepath = os.path.abspath(filepath)
1693 if __debug__:
1694 self.err_log.debug('Request for path: %s' % filepath)
1695
1696 self.closeConnection = headers.get('connection', 'close').lower() == 'close'
1697 self.headers = Headers([('Date', formatdate(usegmt=True)),
1698 ('Server', HTTP_SERVER_SOFTWARE),
1699 ('Connection', headers.get('connection', 'close')),
1700 ])
1701
1702 if not filepath.lower().startswith(self.root.lower()):
1703
1704 self.status = "400 Bad Request"
1705 self.closeConnection = True
1706 elif not os.path.exists(filepath):
1707 self.status = "404 File Not Found"
1708 self.closeConnection = True
1709 elif os.path.isdir(filepath):
1710 self.serve_dir(filepath, rpath)
1711 elif os.path.isfile(filepath):
1712 self.serve_file(filepath, headers)
1713 else:
1714
1715
1716 self.status = "501 Not Implemented"
1717 self.closeConnection = True
1718
1719 h = self.headers
1720 statcode, statstr = self.status.split(' ', 1)
1721 statcode = int(statcode)
1722 if statcode >= 400:
1723 h.add_header('Content-Type', self.content_type)
1724 self.data = [statstr]
1725
1726
1727 header_data = HEADER_RESPONSE % (self.status, str(h))
1728
1729
1730 if __debug__:
1731 self.err_log.debug('Sending Headers: %s' % repr(header_data))
1732 self.conn.sendall(b(header_data))
1733
1734 for data in self.data:
1735 self.conn.sendall(b(data))
1736
1737 if hasattr(self.data, 'close'):
1738 self.data.close()
1739
1740 finally:
1741 if __debug__:
1742 self.err_log.debug('Finally closing sock_file')
1743 sock_file.close()
1744
1745
1746
1747
1748
1749 import sys
1750 import socket
1751 from wsgiref.headers import Headers
1752 from wsgiref.util import FileWrapper
1753
1754
1755
1756
1757
1758
1759 if PY3K:
1760 from email.utils import formatdate
1761 else:
1762
1763 from email.Utils import formatdate
1764
1765
1766 NEWLINE = b('\r\n')
1767 HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
1768 BASE_ENV = {'SERVER_NAME': SERVER_NAME,
1769 'SCRIPT_NAME': '',
1770 'wsgi.errors': sys.stderr,
1771 'wsgi.version': (1, 0),
1772 'wsgi.multiprocess': False,
1773 'wsgi.run_once': False,
1774 'wsgi.file_wrapper': FileWrapper
1775 }
1776
1779 """Builds some instance variables that will last the life of the
1780 thread."""
1781 Worker.__init__(self, *args, **kwargs)
1782
1783 if isinstance(self.app_info, dict):
1784 multithreaded = self.app_info.get('max_threads') != 1
1785 else:
1786 multithreaded = False
1787 self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'],
1788 'wsgi.multithread': multithreaded,
1789 })
1790 self.base_environ.update(BASE_ENV)
1791
1792
1793 self.app = self.app_info.get('wsgi_app')
1794
1795 if not hasattr(self.app, "__call__"):
1796 raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))
1797
1798
1799 if has_futures and self.app_info.get('futures'):
1800 executor = self.app_info['executor']
1801 self.base_environ.update({"wsgiorg.executor": executor,
1802 "wsgiorg.futures": executor.futures})
1803
1805 """ Build the execution environment. """
1806
1807 request = self.read_request_line(sock_file)
1808
1809
1810 environ = self.base_environ.copy()
1811
1812
1813 for k, v in self.read_headers(sock_file).items():
1814 environ[str('HTTP_'+k)] = v
1815
1816
1817 environ['REQUEST_METHOD'] = request['method']
1818 environ['PATH_INFO'] = request['path']
1819 environ['SERVER_PROTOCOL'] = request['protocol']
1820 environ['SERVER_PORT'] = str(conn.server_port)
1821 environ['REMOTE_PORT'] = str(conn.client_port)
1822 environ['REMOTE_ADDR'] = str(conn.client_addr)
1823 environ['QUERY_STRING'] = request['query_string']
1824 if 'HTTP_CONTENT_LENGTH' in environ:
1825 environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
1826 if 'HTTP_CONTENT_TYPE' in environ:
1827 environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']
1828
1829
1830 self.request_method = environ['REQUEST_METHOD']
1831
1832
1833 if conn.ssl:
1834 environ['wsgi.url_scheme'] = 'https'
1835 environ['HTTPS'] = 'on'
1836 else:
1837 environ['wsgi.url_scheme'] = 'http'
1838
1839 if conn.ssl:
1840 try:
1841 peercert = conn.socket.getpeercert(binary_form=True)
1842 environ['SSL_CLIENT_RAW_CERT'] = \
1843 peercert and ssl.DER_cert_to_PEM_cert(peercert)
1844 except Exception,e:
1845 print e
1846
1847 if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
1848 environ['wsgi.input'] = ChunkedReader(sock_file)
1849 else:
1850 environ['wsgi.input'] = sock_file
1851
1852 return environ
1853
1855 h_set = self.header_set
1856
1857
1858 self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked'
1859
1860
1861 if not 'date' in h_set:
1862 h_set['Date'] = formatdate(usegmt=True)
1863
1864
1865 if not 'server' in h_set:
1866 h_set['Server'] = HTTP_SERVER_SOFTWARE
1867
1868 if 'content-length' in h_set:
1869 self.size = int(h_set['content-length'])
1870 else:
1871 s = int(self.status.split(' ')[0])
1872 if s < 200 or s not in (204, 205, 304):
1873 if not self.chunked:
1874 if sections == 1:
1875
1876 h_set['Content-Length'] = str(len(data))
1877 self.size = len(data)
1878 else:
1879
1880 h_set['Transfer-Encoding'] = 'Chunked'
1881 self.chunked = True
1882 if __debug__:
1883 self.err_log.debug('Adding header...'
1884 'Transfer-Encoding: Chunked')
1885
1886 if 'connection' not in h_set:
1887
1888 client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
1889 if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
1890
1891 if client_conn:
1892 h_set['Connection'] = client_conn
1893 else:
1894 h_set['Connection'] = 'keep-alive'
1895 else:
1896
1897 h_set['Connection'] = 'close'
1898
1899
1900 self.closeConnection = h_set.get('connection', '').lower() == 'close'
1901
1902
1903 header_data = HEADER_RESPONSE % (self.status, str(h_set))
1904
1905
1906 if __debug__:
1907 self.err_log.debug('Sending Headers: %s' % repr(header_data))
1908 self.conn.sendall(b(header_data))
1909 self.headers_sent = True
1910
1912 self.err_log.warning('WSGI app called write method directly. This is '
1913 'deprecated behavior. Please update your app.')
1914 return self.write(data, sections)
1915
1916 - def write(self, data, sections=None):
1917 """ Write the data to the output socket. """
1918
1919 if self.error[0]:
1920 self.status = self.error[0]
1921 data = b(self.error[1])
1922
1923 if not self.headers_sent:
1924 self.send_headers(data, sections)
1925
1926 if self.request_method != 'HEAD':
1927 try:
1928 if self.chunked:
1929 self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data)))
1930 else:
1931 self.conn.sendall(data)
1932 except socket.error:
1933
1934
1935 self.closeConnection = True
1936
1938 """ Store the HTTP status and headers to be sent when self.write is
1939 called. """
1940 if exc_info:
1941 try:
1942 if self.headers_sent:
1943
1944
1945 raise
1946 finally:
1947 exc_info = None
1948 elif self.header_set:
1949 raise AssertionError("Headers already set!")
1950
1951 if PY3K and not isinstance(status, str):
1952 self.status = str(status, 'ISO-8859-1')
1953 else:
1954 self.status = status
1955
1956 try:
1957 self.header_set = Headers(response_headers)
1958 except UnicodeDecodeError:
1959 self.error = ('500 Internal Server Error',
1960 'HTTP Headers should be bytes')
1961 self.err_log.error('Received HTTP Headers from client that contain'
1962 ' invalid characters for Latin-1 encoding.')
1963
1964 return self.write_warning
1965
1967 self.size = 0
1968 self.header_set = Headers([])
1969 self.headers_sent = False
1970 self.error = (None, None)
1971 self.chunked = False
1972 sections = None
1973 output = None
1974
1975 if __debug__:
1976 self.err_log.debug('Getting sock_file')
1977
1978
1979 if PY3K:
1980 sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE)
1981 else:
1982 sock_file = conn.makefile(BUF_SIZE)
1983
1984 try:
1985
1986 self.environ = environ = self.build_environ(sock_file, conn)
1987
1988
1989 if environ.get('HTTP_EXPECT', '') == '100-continue':
1990 res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
1991 conn.sendall(b(res))
1992
1993
1994 output = self.app(environ, self.start_response)
1995
1996 if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
1997 self.error = ('500 Internal Server Error',
1998 'WSGI applications must return a list or '
1999 'generator type.')
2000
2001 if hasattr(output, '__len__'):
2002 sections = len(output)
2003
2004 for data in output:
2005
2006 if data:
2007 self.write(data, sections)
2008
2009 if self.chunked:
2010
2011 self.conn.sendall(b('0\r\n\r\n'))
2012 elif not self.headers_sent:
2013
2014 self.send_headers('', sections)
2015
2016
2017
2018 finally:
2019 if __debug__:
2020 self.err_log.debug('Finally closing output and sock_file')
2021
2022 if hasattr(output,'close'):
2023 output.close()
2024
2025 sock_file.close()
2026
2027
2028
2029
2030
2031
2032
2034 global static_folder
2035 import os
2036 types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif',
2037 'jpg': 'image/jpeg','png': 'image/png','pdf': 'applications/pdf'}
2038 if static_folder:
2039 if not static_folder.startswith('/'):
2040 static_folder = os.path.join(os.getcwd(),static_folder)
2041 path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html')
2042 type = types.get(path.split('.')[-1],'text')
2043 if os.path.exists(path):
2044 try:
2045 data = open(path,'rb').read()
2046 start_response('200 OK', [('Content-Type', type)])
2047 except IOError:
2048 start_response('404 NOT FOUND', [])
2049 data = '404 NOT FOUND'
2050 else:
2051 start_response('500 INTERNAL SERVER ERROR', [])
2052 data = '500 INTERNAL SERVER ERROR'
2053 else:
2054 start_response('200 OK', [('Content-Type', 'text/html')])
2055 data = '<html><body><h1>Hello from Rocket Web Server</h1></body></html>'
2056 return [data]
2057
2059 from optparse import OptionParser
2060 parser = OptionParser()
2061 parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
2062 help="ip address of the network interface")
2063 parser.add_option("-p", "--port", dest="port",default="8000",
2064 help="post where to run web server")
2065 parser.add_option("-s", "--static", dest="static",default=None,
2066 help="folder containing static files")
2067 (options, args) = parser.parse_args()
2068 global static_folder
2069 static_folder = options.static
2070 print 'Rocket running on %s:%s' % (options.ip, options.port)
2071 r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
2072 r.start()
2073
2074 if __name__=='__main__':
2075 demo()
2076