1# http://pyrocko.org - GPLv3
2#
3# The Pyrocko Developers, 21st Century
4# ---|P------/S----------~Lg----------
5"""
6Simple async HTTP server
8Based on this recipe:
10 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440665
12which is based on this one:
14 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/259148
15"""
from __future__ import absolute_import

import asynchat
import asyncore
import socket
try:
    from http.server import SimpleHTTPRequestHandler as SHRH
    from html import escape
    from urllib.parse import parse_qs
except ImportError:
    from SimpleHTTPServer import SimpleHTTPRequestHandler as SHRH
    from cgi import escape
    from urlparse import parse_qs

import sys
import json
import cgi
from io import BytesIO
import io
import os
import traceback
import posixpath
import re
from collections import deque
import logging

import matplotlib
matplotlib.use('Agg')  # noqa

import matplotlib.pyplot as plt  # noqa
from pyrocko.plot import cake_plot  # noqa
from pyrocko import gf, util  # noqa
from pyrocko.util import quote, unquote  # noqa
# Text type used in isinstance() checks below: ``unicode`` where that builtin
# exists (Python 2), otherwise ``str``.
try:
 newstr = unicode
except NameError:
 newstr = str

# Logger shared by the request handlers and the server in this module.
logger = logging.getLogger('pyrocko.gf.server')

# Used to build RequestHandler.server_version.
__version__ = '1.0'

# gf.StringID.pattern without its first and last character, so that it can be
# embedded into the larger URL regular expressions below.
# NOTE(review): presumably the stripped characters are the '^' and '$'
# anchors - confirm against gf.StringID.
store_id_pattern = gf.StringID.pattern[1:-1]
def enc(s):
    """
    Best-effort UTF-8 encoding.

    Returns ``s.encode('utf-8')`` when that call succeeds. If it raises any
    ``Exception`` (for example because ``s`` has no ``encode`` method), ``s``
    is handed back unchanged.
    """
    try:
        encoded = s.encode('utf-8')
    except Exception:
        encoded = s

    return encoded
def popall(self):
    """
    Drain a deque-like object from the left.

    Calls ``self.popleft()`` once for every element present at call time and
    returns the popped items as a list, in the order they were removed.
    """
    return [self.popleft() for _ in range(len(self))]
class writewrapper(object):
    """
    Minimal file-like object whose ``write()`` appends to a container.

    ``d`` is any object with an ``append`` method. With a ``blocksize`` of
    ``None`` or ``-1`` every write is appended as one piece; otherwise the
    data is cut into slices of ``blocksize`` items.
    """

    def __init__(self, d, blocksize=4096):
        self.d = d
        self.blocksize = blocksize

    def write(self, data):
        """
        Append ``data`` to the wrapped container, chunked by ``blocksize``.

        When ``len(data)`` is not a multiple of the block size, the final
        piece consists of the remainder together with the preceding full
        block, i.e. it is between ``blocksize + 1`` and ``2 * blocksize - 1``
        items long (or all of ``data`` if that is shorter than one block).
        Empty ``data`` appends nothing in chunked mode.
        """
        size = self.blocksize
        if size in (None, -1):
            self.d.append(data)
            return

        total = len(data)
        remainder = total % size
        # Length of the trailing piece; zero when data splits evenly.
        tail = remainder + size if remainder else 0

        target = self.d
        for start in range(0, total - tail, size):
            target.append(data[start:start + size])

        if tail:
            target.append(data[-tail:])
class RequestHandler(asynchat.async_chat, SHRH):
    """
    Asynchronous HTTP request handler.

    Combines ``asynchat.async_chat`` (non-blocking socket handling) with
    ``SimpleHTTPRequestHandler`` (request parsing, response headers, static
    files). Received data is collected in ``self.incoming``. Everything
    written through ``self.wfile`` is queued in ``self.outgoing`` and
    transmitted by :py:meth:`handle_write`. A ``None`` entry in
    ``self.outgoing`` marks the end of a response and makes
    :py:meth:`handle_write` close the connection.
    """

    server_version = 'Seismosizer/'+__version__
    protocol_version = 'HTTP/1.1'
    blocksize = 4096

    # In enabling the use of buffer objects by setting use_buffer to True,
    # any data block sent will remain in memory until it has actually been
    # sent.
    use_buffer = False

    def __init__(self, conn, addr, server):
        """
        Set up the handler for the accepted socket ``conn``.

        ``addr`` is stored as ``client_address`` and ``server`` as
        ``self.server``.
        """
        asynchat.async_chat.__init__(self, conn)
        self.client_address = addr
        self.connection = conn
        self.server = server
        # Files opened by send_head(); closed again in handle_close().
        self.opened = []
        # set the terminator : when it is received, this means that the
        # http request is complete ; control will be passed to
        # self.found_terminator
        self.set_terminator(b'\r\n\r\n')
        self.incoming = deque()
        self.outgoing = deque()
        self.rfile = None
        # ``-self.use_buffer or self.blocksize`` evaluates to -1 (no
        # chunking) when use_buffer is True and to blocksize otherwise.
        # Buffering the response and headers in self.outgoing avoids several
        # calls to select().
        self.wfile = writewrapper(
            self.outgoing,
            -self.use_buffer or self.blocksize)
        self.found_terminator = self.handle_request_line
        self.request_version = "HTTP/1.1"
        self.code = None

    def _get_header(self, name, default=None):
        """Look up a request header on both Python 2 and 3 header objects."""
        try:
            return self.headers.getheader(name, default)
        except AttributeError:
            return self.headers.get(name, default)

    @staticmethod
    def _content_size(f):
        """
        Return the number of bytes of the response payload ``f``.

        Tries, in this order: the size of the underlying file descriptor,
        the length of an in-memory buffer (``getvalue()``), and finally
        ``len(f)`` for plain bytes-like payloads.
        """
        try:
            return os.fstat(f.fileno()).st_size
        except (AttributeError, io.UnsupportedOperation, OSError):
            pass

        try:
            return len(f.getvalue())
        except AttributeError:
            return len(f)

    def update_b(self, fsize):
        """Switch to buffer mode and larger blocks for payloads > 1 MiB."""
        if fsize > 1048576:
            self.use_buffer = True
            self.blocksize = 131072

    def collect_incoming_data(self, data):
        """Collect the data arriving on the connection"""
        if not data:
            # Must be bytes: asynchat concatenates received bytes onto it.
            self.ac_in_buffer = b''
            return
        self.incoming.append(data)

    def prepare_POST(self):
        """
        Prepare to read the request body.

        Responds with 400 and ends the request when the Content-Length
        header is missing, not an integer or negative.
        """
        try:
            bytes_to_read = int(self._get_header('Content-length'))
            if bytes_to_read < 0:
                raise ValueError('negative Content-Length')
        except (TypeError, ValueError):
            self.send_error(400, "Missing or invalid Content-Length header")
            self.outgoing.append(None)
            return

        self.incoming.clear()
        if bytes_to_read == 0:
            # asynchat treats a zero terminator like "no terminator" and
            # would never call found_terminator(); handle the empty body
            # right away instead.
            self.handle_post_data()
            return

        # set terminator to length (will read bytes_to_read bytes)
        self.set_terminator(bytes_to_read)
        # control will be passed to a new found_terminator
        self.found_terminator = self.handle_post_data

    def handle_post_data(self):
        """Called when a POST request body has been read"""
        self.rfile = BytesIO(b''.join(popall(self.incoming)))
        self.rfile.seek(0)
        self.do_POST()

    def parse_request_url(self):
        """
        Split a query string off ``self.path``.

        The parsed parameters (dict of lists) are stored in ``self.body``;
        ``self.body`` is an empty dict when the URL has no query string.
        """
        # Check for query string in URL
        qspos = self.path.find('?')
        if qspos >= 0:
            # cgi.parse_qs is gone in Python >= 3.8; parse_qs comes from
            # urllib.parse (Python 3) or urlparse (Python 2).
            self.body = parse_qs(self.path[qspos+1:], keep_blank_values=1)
            self.path = self.path[:qspos]
        else:
            self.body = {}

    def do_HEAD(self):
        """Begins serving a HEAD request"""
        self.parse_request_url()
        f = self.send_head()
        if f:
            f.close()
        self.log_request(self.code)

    def do_GET(self):
        """Begins serving a GET request"""
        self.parse_request_url()
        self.handle_data()

    def do_POST(self):
        """
        Begins serving a POST request. The request data must be readable
        on a file-like object called self.rfile.

        The decoded form parameters are stored in ``self.body``; for content
        types other than multipart/form-data and
        application/x-www-form-urlencoded ``self.body`` is an empty dict.
        """
        # A missing Content-Type header must not crash cgi.parse_header().
        data = cgi.parse_header(self._get_header('content-type') or '')
        length = int(self._get_header('content-length', 0))

        ctype, pdict = data if data else (None, None)

        if ctype == 'multipart/form-data':
            boundary = pdict.get('boundary')
            if isinstance(boundary, newstr):
                # On Python 3, cgi.parse_multipart() needs a bytes boundary.
                pdict['boundary'] = boundary.encode('ascii')
            self.body = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            qs = self.rfile.read(length)
            if not isinstance(qs, str):
                # Python 3: decode so that parse_qs() yields text keys and
                # values, like it does for GET query strings.
                qs = qs.decode('utf-8', 'replace')
            self.body = parse_qs(qs, keep_blank_values=1)
        else:
            self.body = {}
        # self.handle_post_body()
        self.handle_data()

    def handle_close(self):
        """Close all files opened for this connection, then the socket."""
        for f in self.opened:
            if not f.closed:
                f.close()
        asynchat.async_chat.handle_close(self)

    def handle_data(self):
        """
        Send headers via :py:meth:`send_head` and queue the payload.

        May be overridden by subclasses.
        """
        f = self.send_head()
        if f:
            # do some special things with file objects so that we don't have
            # to read them all into memory at the same time...may leave a
            # file handle open for longer than is really desired, but it does
            # make it able to handle files of unlimited size.
            size = self._content_size(f)
            self.update_b(size)
            self.log_request(self.code, size)
            self.outgoing.append(f)
        else:
            self.log_request(self.code)
        # signal the end of this request
        self.outgoing.append(None)

    def handle_request_line(self):
        """Called when the http request line and headers have been received"""
        # prepare attributes needed in parse_request()
        self.rfile = BytesIO(b''.join(popall(self.incoming)))
        self.rfile.seek(0)
        self.raw_requestline = self.rfile.readline()
        if not self.parse_request():
            # parse_request() has already queued any error response; just
            # mark the end of the request so the connection gets closed.
            self.outgoing.append(None)
            return

        if self.command in ['GET', 'HEAD']:
            # if method is GET or HEAD, call do_GET or do_HEAD and finish
            method = "do_"+self.command
            if hasattr(self, method):
                getattr(self, method)()
        elif self.command == "POST":
            # if method is POST, call prepare_POST, don't finish yet
            self.prepare_POST()
        else:
            self.send_error(501, "Unsupported method (%s)" % self.command)
            # Nothing more will be sent for this request.
            self.outgoing.append(None)

    def handle_error(self):
        """Print the current traceback to stderr and close the connection."""
        try:
            # ``file`` must be passed by keyword: the first positional
            # argument of print_exc() is ``limit``.
            traceback.print_exc(file=sys.stderr)
        except Exception:
            logger.error(
                'An error occurred and another one while printing the '
                'traceback. Please debug me...')

        self.close()

    def writable(self):
        """Truthy when there is queued output and the socket is connected."""
        return len(self.outgoing) and self.connected

    def handle_write(self):
        """
        Send the next piece of queued output.

        Entries of ``self.outgoing`` may be ``None`` (end of request: close
        the connection), objects with a ``read`` method (sent in pieces of
        ``self.blocksize``) or string/bytes-like data. Unsent remainders are
        pushed back to the front of the queue.
        """
        out = self.outgoing
        while len(out):
            a = out.popleft()

            a = enc(a)
            # handle end of request disconnection
            if a is None:
                # Some clients have issues with keep-alive connections, or
                # perhaps I implemented them wrong.

                # If the user is running a Python version < 2.4.1, there is a
                # bug with SimpleHTTPServer:
                # http://python.org/sf/1097597
                # So we should be closing anyways, even though the client will
                # claim a partial download, so as to prevent hung-connections.
                # if self.close_connection:
                self.close()
                return

            # handle file objects
            elif hasattr(a, 'read'):
                _a, a = a, a.read(self.blocksize)
                if not len(a):
                    _a.close()
                    del _a
                    continue
                else:
                    out.appendleft(_a)  # noqa
                    break

            # handle string/buffer objects
            elif len(a):
                break
        else:
            # if we get here, the outgoing deque is empty
            return

        # if we get here, 'a' is a string or buffer object of length > 0
        try:
            num_sent = self.send(a)
            if num_sent < len(a):
                if not num_sent:
                    # this is probably overkill, but it can save the
                    # allocations of buffers when they are enabled
                    out.appendleft(a)
                elif self.use_buffer:
                    try:
                        rest = buffer(a, num_sent)  # noqa
                    except NameError:
                        # Python 3 has no buffer(); a memoryview slice is
                        # the copy-free equivalent.
                        rest = memoryview(a)[num_sent:]
                    out.appendleft(rest)
                else:
                    out.appendleft(a[num_sent:])

        except socket.error as why:
            # Pass the error as an argument, not as the format string, so
            # that a '%' in the message cannot break the logging call.
            self.log_error('%s', why)
            self.handle_error()

    def log(self, message):
        """Forward ``message`` to :py:meth:`log_info`."""
        self.log_info(message)

    def log_info(self, message, type='info'):
        """
        Log ``message`` through the module logger.

        ``type`` selects the level ('debug', 'info', 'warning' or 'error');
        unknown values are logged at info level.
        """
        {
            'debug': logger.debug,
            'info': logger.info,
            'warning': logger.warning,
            'error': logger.error
        }.get(type, logger.info)(str(message))

    def log_message(self, format, *args):
        """Log a message together with client address, time and headers."""
        # self.headers is not set yet when an error is reported before the
        # request headers were parsed.
        headers = getattr(self, 'headers', None)
        if headers is not None:
            referer = headers.get('referer', '')
            user_agent = headers.get('user-agent', '')
        else:
            referer = user_agent = ''

        self.log_info("%s - - [%s] %s \"%s\" \"%s\"\n" % (
            self.address_string(),
            self.log_date_time_string(),
            format % args,
            referer,
            user_agent))

    def listdir(self, path):
        """Return the entries of directory ``path`` (hook for subclasses)."""
        return os.listdir(path)

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).

        Return value is either a file object, or None (indicating an
        error). In either case, the headers are sent, making the
        interface the same as for send_head().

        """
        try:
            names = self.listdir(path)
        except os.error:
            self.send_error(404, "No permission to list directory")
            return None

        names.sort(key=lambda a: a.lower())
        f = BytesIO()
        displaypath = escape(unquote(self.path))
        f.write(enc('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">'))
        f.write(enc("<html>\n<title>Directory listing for %s</title>\n"
                    % displaypath))
        f.write(
            enc("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath))
        f.write(enc("<hr>\n<ul>\n"))
        for name in names:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            f.write(enc('<li><a href="%s">%s</a>\n' %
                        (quote(linkname),
                         escape(displayname))))
        f.write(enc("</ul>\n<hr>\n</body>\n</html>\n"))
        length = f.tell()
        f.seek(0)
        encoding = sys.getfilesystemencoding()

        self.send_response(200, 'OK')
        self.send_header("Content-Length", str(length))
        self.send_header("Content-Type", "text/html; charset=%s" % encoding)
        self.end_headers()

        return f

    def redirect(self, path):
        """Send a 301 response pointing to ``path``; returns ``None``."""
        self.send_response(301)
        self.send_header("Location", path)
        self.end_headers()

    def send_head(self):
        """Common code for GET and HEAD commands.

        This sends the response code and MIME headers.

        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.

        """
        path = self.translate_path(self.path)
        if path is None:
            self.send_error(404, "File not found")
            return None

        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                return self.redirect(self.path + '/')
            else:
                return self.list_directory(path)

        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
            self.opened.append(f)
        except IOError:
            self.send_error(404, "File not found")
            return None
        fs = os.fstat(f.fileno())
        self.send_response(200, "OK")
        self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
        self.send_header("Content-Length", str(fs.st_size))
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Disposition", "attachment")
        self.end_headers()
        return f
class SeismosizerHandler(RequestHandler):
    """
    Request handler exposing the stores of ``self.server.engine``.

    URL layout (see :py:meth:`send_head`): static store files and store
    listing below ``stores_path``, JSON/PNG information below ``api_path``
    and request processing at ``process_path``.
    """

    stores_path = '/gfws/static/stores/'
    api_path = '/gfws/api/'
    process_path = '/gfws/seismosizer/1/query'

    def _send_payload(self, payload, content_type, allow_any_origin=False):
        """
        Send a ``200 OK`` header block for ``payload`` (bytes).

        Content-Length is the length of the encoded payload. With
        ``allow_any_origin`` an ``Access-Control-Allow-Origin: *`` header is
        added. Returns a ``BytesIO`` positioned at the start of the payload.
        """
        self.send_response(200, 'OK')
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        if allow_any_origin:
            self.send_header("Access-Control-Allow-Origin", '*')
        self.end_headers()
        return BytesIO(payload)

    def _store_from_path(self, suffix=''):
        """
        Look up the store addressed by ``<api_path><store_id><suffix>``.

        Returns ``(store_id, store)``. If the request path does not have
        that form or the engine cannot provide the store, a 404 response is
        sent and ``(None, None)`` is returned.
        """
        match = re.match(
            r'^' + self.api_path + '(' + store_id_pattern + ')' + suffix
            + '$', self.path)

        if match is not None:
            store_id = match.group(1)
            try:
                return store_id, self.server.engine.get_store(store_id)
            except Exception:
                pass

        # send_error() writes complete headers and body by itself.
        self.send_error(404)
        return None, None

    def send_head(self):
        """
        Dispatch the request path to the matching responder.

        Returns a file-like object with the response payload, or ``None``
        when nothing further has to be sent (redirects and errors).
        """
        S = self.stores_path
        P = self.process_path
        A = self.api_path
        for x in (S,):
            # Path without trailing slash: redirect to the canonical form.
            if re.match(r'^' + x[:-1] + '$', self.path):
                return self.redirect(x)

        if re.match(r'^' + S + store_id_pattern, self.path):
            return RequestHandler.send_head(self)

        elif re.match(r'^' + S + '$', self.path):
            return self.list_stores()

        elif re.match(r'^' + A + '$', self.path):
            return self.list_stores_json()

        elif re.match(r'^' + A + store_id_pattern + '$', self.path):
            return self.get_store_config()

        elif re.match(r'^' + A + store_id_pattern + '/profile$', self.path):
            return self.get_store_velocity_profile()

        elif re.match(r'^' + P + '$', self.path):
            return self.process()

        else:
            # No end_headers() here: send_error() already finished the
            # response, another call would append a stray blank line.
            self.send_error(404, "File not found")
            return None

    def translate_path(self, path):
        """
        Map a URL path below ``stores_path`` to a filesystem path.

        Returns ``None`` when the URL is not of the form
        ``<stores_path><store_id>[/...]`` or the store id is unknown to the
        engine. '.' and '..' components and drive/directory prefixes of the
        remaining path components are dropped.
        """
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(unquote(path))
        words = path.split('/')
        words = [_f for _f in words if _f]

        path = '/'
        if words[:3] == self.stores_path.split('/')[1:-1] and len(words) > 3:
            engine = self.server.engine
            if words[3] not in engine.get_store_ids():
                return None
            else:
                path = engine.get_store_dir(words[3])
                words = words[4:]
        else:
            return None

        for word in words:
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)

        return path

    def listdir(self, path):
        """List store ids for ``stores_path``, directory entries otherwise."""
        if path == self.stores_path:
            return list(self.server.engine.get_store_ids())
        else:
            return RequestHandler.listdir(self, path)

    def list_stores(self):
        '''Create listing of stores.'''
        from jinja2 import Template

        engine = self.server.engine

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        stores = [engine.get_store(store_id) for store_id in store_ids]

        templates = {
            'html': Template('''
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<title>{{ title }}</title>
<body>
<h2>{{ title }}</h2>
<hr>
<table>
  <tr>
    <th style="text-align:left">Store ID</th>
    <th style="text-align:center">Type</th>
    <th style="text-align:center">Extent</th>
    <th style="text-align:center">Sample-rate</th>
    <th style="text-align:center">Size (index + traces)</th>
  </tr>
{% for store in stores %}
  <tr>
    <td><a href="{{ store.config.id }}/">{{ store.config.id|e }}/</a></td>
    <td style="text-align:center">{{ store.config.short_type }}</td>
    <td style="text-align:right">{{ store.config.short_extent }} km</td>
    <td style="text-align:right">{{ store.config.sample_rate }} Hz</td>
    <td style="text-align:right">{{ store.size_index_and_data_human }}</td>
  </tr>
{% endfor %}
</table>
</hr>
</body>
</html>
'''.lstrip()),
            'text': Template('''
{% for store in stores %}{#
#}{{ store.config.id.ljust(25) }} {#
#}{{ store.config.short_type.center(5) }} {#
#}{{ store.config.short_extent.rjust(30) }} km {#
#}{{ "%10.2g"|format(store.config.sample_rate) }} Hz {#
#}{{ store.size_index_and_data_human.rjust(8) }}
{% endfor %}'''.lstrip())}

        # Output format is chosen with the 'format' request parameter;
        # anything unknown falls back to HTML.
        fmt = self.body.get('format', ['html'])[0]
        if fmt not in ('html', 'text'):
            fmt = 'html'

        content_types = {
            'html': "text/html; charset=utf-8",
            'text': "text/plain; charset=utf-8"}

        title = "Green's function stores listing"
        s = templates[fmt].render(stores=stores, title=title).encode('utf8')
        return self._send_payload(s, content_types[fmt])

    def list_stores_json(self):
        """Send a JSON document describing all stores of the engine."""
        engine = self.server.engine

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        def get_store_dict(store):
            store.ensure_reference()

            return {
                'id': store.config.id,
                'short_type': store.config.short_type,
                'modelling_code_id': store.config.modelling_code_id,
                'source_depth_min': store.config.source_depth_min,
                'source_depth_max': store.config.source_depth_max,
                'source_depth_delta': store.config.source_depth_delta,
                'distance_min': store.config.distance_min,
                'distance_max': store.config.distance_max,
                'distance_delta': store.config.distance_delta,
                'sample_rate': store.config.sample_rate,
                'size': store.size_index_and_data,
                'uuid': store.config.uuid,
                'reference': store.config.reference
            }

        stores = {
            'stores': [get_store_dict(engine.get_store(store_id))
                       for store_id in store_ids]
        }

        # NOTE(review): the payload is JSON but is announced as text/html;
        # kept as is because existing clients may rely on it - confirm
        # before switching to application/json.
        return self._send_payload(
            json.dumps(stores).encode('ascii'),
            "text/html; charset=utf-8",
            allow_any_origin=True)

    def get_store_config(self):
        """
        Send id and configuration (as string) of one store as JSON.

        Returns ``None`` after sending a 404 response when the store cannot
        be obtained.
        """
        store_id, store = self._store_from_path()
        if store is None:
            return None

        data = {}
        data['id'] = store_id
        data['config'] = str(store.config)

        # NOTE(review): JSON announced as text/html, see list_stores_json().
        return self._send_payload(
            json.dumps(data).encode('ascii'),
            "text/html; charset=utf-8",
            allow_any_origin=True)

    def get_store_velocity_profile(self):
        """
        Send a PNG plot of a store's 1D earth model.

        Returns ``None`` after sending a 404 response when the store cannot
        be obtained or has no ``earthmodel_1d``.
        """
        store_id, store = self._store_from_path('/profile')
        if store is None:
            return None

        if store.config.earthmodel_1d is None:
            self.send_error(404)
            return None

        # Create the figure only once it is known to be needed and always
        # close it again; pyplot keeps every open figure alive otherwise.
        fig = plt.figure()
        try:
            axes = fig.gca()
            cake_plot.my_model_plot(store.config.earthmodel_1d, axes=axes)

            f = BytesIO()
            fig.savefig(f, format='png')
        finally:
            plt.close(fig)

        # Return a file-like object, like all other responders do.
        return self._send_payload(
            f.getvalue(), "image/png", allow_any_origin=True)

    def process(self):
        """
        Run the engine on the request given in the 'request' parameter.

        Sends a 400 response and returns ``None`` when the parameter is
        missing or the engine raises ``gf.BadRequest`` / ``gf.StoreError``.
        """
        try:
            request_string = self.body['request'][0]
        except (KeyError, IndexError):
            self.send_error(400, "Missing request parameter")
            return None

        request = gf.load(string=request_string)
        try:
            resp = self.server.engine.process(request=request)
        except (gf.BadRequest, gf.StoreError) as e:
            self.send_error(400, str(e))
            return None

        f = BytesIO()
        resp.dump(stream=f)

        return self._send_payload(f.getvalue(), "text/html; charset=utf-8")

    def guess_type(self, path):
        """
        Return the MIME type to announce for the file at ``path``.

        Files named 'config' and files directly inside a directory named
        'extra' are served as text/plain; everything else uses the inherited
        guess with 'application/x-octet' as fallback.
        """
        bn = os.path.basename
        dn = os.path.dirname
        if bn(path) == 'config':
            return 'text/plain'

        if bn(dn(path)) == 'extra':
            return 'text/plain'

        else:
            return RequestHandler.guess_type(self, path) \
                or 'application/x-octet'
class Server(asyncore.dispatcher):
    """
    Listening socket which creates one ``handler`` object per connection.

    ``handler`` is called as ``handler(conn, addr, server)`` for every
    accepted connection. ``engine`` is stored as ``self.engine`` for use by
    the handlers.
    """

    def __init__(self, ip, port, handler, engine):
        """Bind to ``(ip, port)`` and start listening."""
        self.ensure_uuids(engine)
        logger.info('starting Server at http://%s:%d', ip, port)

        asyncore.dispatcher.__init__(self)
        self.ip = ip
        self.port = port
        self.handler = handler
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

        self.set_reuse_addr()
        self.bind((ip, port))
        self.engine = engine

        # Quoting the socket module documentation...
        # listen(backlog)
        #     Listen for connections made to the socket. The backlog argument
        #     specifies the maximum number of queued connections and should
        #     be at least 1; the maximum value is system-dependent (usually
        #     5).
        self.listen(5)

    @staticmethod
    def ensure_uuids(engine):
        """Call ``ensure_reference()`` on every store of ``engine``."""
        logger.info('ensuring UUIDs of available stores')
        store_ids = list(engine.get_store_ids())
        for store_id in store_ids:
            store = engine.get_store(store_id)
            store.ensure_reference()

    def handle_accept(self):
        """Accept a pending connection and hand it to a new handler."""
        try:
            conn, addr = self.accept()
        except socket.error:
            self.log_info('warning: server accept() threw an exception',
                          'warning')
            return
        except TypeError:
            # accept() did not return a (conn, addr) pair to unpack.
            self.log_info('warning: server accept() threw EWOULDBLOCK',
                          'warning')
            return

        self.handler(conn, addr, self)

    def log(self, message):
        """Forward ``message`` to :py:meth:`log_info`."""
        self.log_info(message)

    def handle_close(self):
        """Close the listening socket."""
        self.close()

    def log_info(self, message, type='info'):
        """
        Log ``message`` through the module logger.

        ``type`` selects the level ('debug', 'info', 'warning' or 'error');
        unknown values are logged at info level.
        """
        {
            'debug': logger.debug,
            'info': logger.info,
            'warning': logger.warning,
            'error': logger.error
        }.get(type, logger.info)(str(message))
def run(ip, port, engine):
    """
    Serve ``engine`` over HTTP on ``ip``:``port``.

    Creates a :py:class:`Server` using :py:class:`SeismosizerHandler` and
    runs ``asyncore.loop()`` until it returns.
    """
    server = Server(ip, port, SeismosizerHandler, engine)
    asyncore.loop()
    del server
# Script entry point: serve the stores found in the directories given as
# command line arguments on 127.0.0.1:8085.
if __name__ == '__main__':
 util.setup_logging('pyrocko.gf.server', 'info')
 port = 8085
 # Every command line argument is passed on as a store super-directory.
 engine = gf.LocalEngine(store_superdirs=sys.argv[1:])
 run('127.0.0.1', port, engine)