# http://pyrocko.org - GPLv3
#
# The Pyrocko Developers, 21st Century
# ---|P------/S----------~Lg----------
'''
Simple async HTTP server

Based on this recipe:

    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440665

which is based on this one:

    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/259148
'''
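
# Usage sketch (assuming this module is importable as pyrocko.gf.server;
# the store directory and port below are placeholders):
#
#     from pyrocko import gf
#     from pyrocko.gf import server
#
#     engine = gf.LocalEngine(store_superdirs=['/path/to/gf_stores'])
#     server.run('localhost', 8085, engine)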

import asynchat
import asyncore
import socket
try:
    from http.server import SimpleHTTPRequestHandler as SHRH
    from html import escape
except ImportError:
    from SimpleHTTPServer import SimpleHTTPRequestHandler as SHRH
    from cgi import escape

import sys

import json
import cgi
from io import BytesIO
import io
import os
import traceback
import posixpath
import re
from collections import deque
import logging

import matplotlib
matplotlib.use('Agg')  # noqa

import matplotlib.pyplot as plt  # noqa
from pyrocko.plot import cake_plot  # noqa
from pyrocko import gf, util  # noqa
from pyrocko.util import quote, unquote  # noqa

logger = logging.getLogger('pyrocko.gf.server')

__version__ = '1.0'

store_id_pattern = gf.StringID.pattern[1:-1]
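# The slicing above strips the leading '^' and trailing '$' anchors from
# gf.StringID's pattern, so that store ids can be embedded in the larger
# URL regexes used by SeismosizerHandler below.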


def enc(s):
    try:
        return s.encode('utf-8')
    except Exception:
        return s


def popall(self):
    # Preallocate the list to save memory resizing.
    r = len(self)*[None]
    for i in range(len(r)):
        r[i] = self.popleft()
    return r


class writewrapper(object):
    def __init__(self, d, blocksize=4096):
        self.blocksize = blocksize
        self.d = d

    def write(self, data):
        if self.blocksize in (None, -1):
            self.d.append(data)
        else:
            BS = self.blocksize
            xtra = 0
            if len(data) % BS:
                xtra = len(data) % BS + BS
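            # For example, with blocksize 4096 and len(data) == 10000:
            # xtra = 10000 % 4096 + 4096 == 5904, so the loop below appends
            # one 4096-byte chunk and the final append emits the remaining
            # 5904 bytes. The last chunk is thus between BS and 2*BS bytes.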
            buf = self.d
            for i in range(0, len(data)-xtra, BS):
                buf.append(data[i:i+BS])
            if xtra:
                buf.append(data[-xtra:])


class RequestHandler(asynchat.async_chat, SHRH):

    server_version = 'Seismosizer/'+__version__
    protocol_version = 'HTTP/1.1'
    blocksize = 4096

    # When use_buffer is True, outgoing data blocks are wrapped in buffer
    # objects and remain in memory until they have actually been sent.
    use_buffer = False

    def __init__(self, conn, addr, server):
        asynchat.async_chat.__init__(self, conn)
        self.client_address = addr
        self.connection = conn
        self.server = server
        self.opened = []
        # Set the terminator: when it is received, the HTTP request is
        # complete and control is passed to self.found_terminator.
        self.set_terminator(b'\r\n\r\n')
        self.incoming = deque()
        self.outgoing = deque()
        self.rfile = None
        self.wfile = writewrapper(
            self.outgoing,
            -self.use_buffer or self.blocksize)
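        # Note: -self.use_buffer evaluates to -1 when use_buffer is True,
        # which tells writewrapper not to split writes into blocks, and to
        # 0 (falsy) otherwise, so the size falls back to self.blocksize.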
        self.found_terminator = self.handle_request_line
        self.request_version = 'HTTP/1.1'
        self.code = None
        # buffer the response and headers to avoid several calls to select()

    def update_b(self, fsize):
        if fsize > 1048576:
            self.use_buffer = True
            self.blocksize = 131072

    def collect_incoming_data(self, data):
        '''Collect the data arriving on the connection'''
        if not data:
            self.ac_in_buffer = b''
            return
        self.incoming.append(data)

    def prepare_POST(self):
        '''Prepare to read the request body'''
        try:
            bytesToRead = int(self.headers.getheader('Content-length'))
        except AttributeError:
            bytesToRead = int(self.headers['Content-length'])
        # set terminator to length (will read bytesToRead bytes)
        self.set_terminator(bytesToRead)
        self.incoming.clear()
        # control will be passed to a new found_terminator
        self.found_terminator = self.handle_post_data

    def handle_post_data(self):
        '''Called when a POST request body has been read'''
        self.rfile = BytesIO(b''.join(popall(self.incoming)))
        self.rfile.seek(0)
        self.do_POST()

    def parse_request_url(self):
        # Check for query string in URL
        qspos = self.path.find('?')
        if qspos >= 0:
            self.body = cgi.parse_qs(self.path[qspos+1:], keep_blank_values=1)
            self.path = self.path[:qspos]
        else:
            self.body = {}

    def do_HEAD(self):
        '''Begins serving a HEAD request'''
        self.parse_request_url()
        f = self.send_head()
        if f:
            f.close()
        self.log_request(self.code)

    def do_GET(self):
        '''Begins serving a GET request'''
        self.parse_request_url()
        self.handle_data()

    def do_POST(self):
        '''Begins serving a POST request. The request data must be readable
        from a file-like object called self.rfile'''
        try:
            data = cgi.parse_header(self.headers.getheader('content-type'))
            length = int(self.headers.getheader('content-length', 0))
        except AttributeError:
            data = cgi.parse_header(self.headers.get('content-type'))
            length = int(self.headers.get('content-length', 0))

        ctype, pdict = data if data else (None, None)

        if ctype == 'multipart/form-data':
            self.body = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            qs = self.rfile.read(length)
            self.body = cgi.parse_qs(qs, keep_blank_values=1)
        else:
            self.body = {}
        # self.handle_post_body()
        self.handle_data()

    def handle_close(self):
        for f in self.opened:
            if not f.closed:
                f.close()
        asynchat.async_chat.handle_close(self)

    def handle_data(self):
        '''Method to override'''

        f = self.send_head()
        if f:
            # Stream file objects so that we do not have to read them into
            # memory all at once. This may leave a file handle open longer
            # than is really desired, but it makes it possible to handle
            # files of unlimited size.
            try:
                size = sys.getsizeof(f)
            except (AttributeError, io.UnsupportedOperation):
                size = len(f.getvalue())

            self.update_b(size)
            self.log_request(self.code, size)
            self.outgoing.append(f)
        else:
            self.log_request(self.code)
        # signal the end of this request
        self.outgoing.append(None)

    def handle_request_line(self):
        '''Called when the http request line and headers have been received'''
        # prepare attributes needed in parse_request()
        self.rfile = BytesIO(b''.join(popall(self.incoming)))
        self.rfile.seek(0)
        self.raw_requestline = self.rfile.readline()
        self.parse_request()

        if self.command in ['GET', 'HEAD']:
            # if method is GET or HEAD, call do_GET or do_HEAD and finish
            method = 'do_'+self.command
            if hasattr(self, method):
                getattr(self, method)()
        elif self.command == 'POST':
            # if method is POST, call prepare_POST, don't finish yet
            self.prepare_POST()
        else:
            self.send_error(501, 'Unsupported method (%s)' % self.command)

    def handle_error(self):
        try:
            traceback.print_exc(file=sys.stderr)
        except Exception:
            logger.error(
                'An error occurred and another one while printing the '
                'traceback. Please debug me...')

        self.close()

    def writable(self):
        return len(self.outgoing) and self.connected

    def handle_write(self):
        out = self.outgoing
        while len(out):
            a = out.popleft()

            a = enc(a)
            # handle end of request disconnection
            if a is None:
                # Some clients have issues with keep-alive connections, or
                # perhaps I implemented them wrong.

                # If the user is running a Python version < 2.4.1, there is a
                # bug with SimpleHTTPServer:
                # http://python.org/sf/1097597
                # So we should be closing anyway, even though the client will
                # claim a partial download, so as to prevent hung connections.
                # if self.close_connection:
                self.close()
                return

            # handle file objects
            elif hasattr(a, 'read'):
                _a, a = a, a.read(self.blocksize)
                if not len(a):
                    _a.close()
                    del _a
                    continue
                else:
                    out.appendleft(_a)  # noqa
                    break

            # handle string/buffer objects
            elif len(a):
                break
        else:
            # if we get here, the outgoing deque is empty
            return

        # if we get here, 'a' is a string or buffer object of length > 0
        try:
            num_sent = self.send(a)
            if num_sent < len(a):
                if not num_sent:
                    # this is probably overkill, but it can save the
                    # allocations of buffers when they are enabled
                    out.appendleft(a)
                elif self.use_buffer:
                    out.appendleft(memoryview(a)[num_sent:])
                else:
                    out.appendleft(a[num_sent:])

        except socket.error as why:
            if isinstance(why, str):
                self.log_error(why)
            elif isinstance(why, tuple) and isinstance(why[-1], str):
                self.log_error(why[-1])
            else:
                self.log_error(str(why))
            self.handle_error()

    def log(self, message):
        self.log_info(message)

    def log_info(self, message, type='info'):
        {
            'debug': logger.debug,
            'info': logger.info,
            'warning': logger.warning,
            'error': logger.error
        }.get(type, logger.info)(str(message))

    def log_message(self, format, *args):
        self.log_info('%s - - [%s] %s "%s" "%s"\n' % (
            self.address_string(),
            self.log_date_time_string(),
            format % args,
            self.headers.get('referer', ''),
            self.headers.get('user-agent', '')))

    def listdir(self, path):
        return os.listdir(path)

    def list_directory(self, path):
        '''Helper to produce a directory listing (absent index.html).

        Return value is either a file object, or None (indicating an
        error). In either case, the headers are sent, making the
        interface the same as for send_head().

        '''
        try:
            list = self.listdir(path)
        except os.error:
            self.send_error(404, 'No permission to list directory')
            return None

        list.sort(key=lambda a: a.lower())
        f = BytesIO()
        displaypath = escape(unquote(self.path))
        f.write(enc('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">'))
        f.write(enc('<html>\n<title>Directory listing for %s</title>\n'
                    % displaypath))
        f.write(
            enc('<body>\n<h2>Directory listing for %s</h2>\n' % displaypath))
        f.write(enc('<hr>\n<ul>\n'))
        for name in list:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + '/'
                linkname = name + '/'
            if os.path.islink(fullname):
                displayname = name + '@'
                # Note: a link to a directory displays with @ and links with /
            f.write(enc('<li><a href="%s">%s</a>\n' %
                        (quote(linkname),
                         escape(displayname))))
        f.write(enc('</ul>\n<hr>\n</body>\n</html>\n'))
        length = f.tell()
        f.seek(0)
        encoding = sys.getfilesystemencoding()

        self.send_response(200, 'OK')
        self.send_header('Content-Length', str(length))
        self.send_header('Content-Type', 'text/html; charset=%s' % encoding)
        self.end_headers()

        return f

    def redirect(self, path):
        self.send_response(301)
        self.send_header('Location', path)
        self.end_headers()

    def send_head(self):
        '''Common code for GET and HEAD commands.

        This sends the response code and MIME headers.

        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.

        '''
        path = self.translate_path(self.path)
        if path is None:
            self.send_error(404, 'File not found')
            return None

        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                return self.redirect(self.path + '/')
            else:
                return self.list_directory(path)

        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
            self.opened.append(f)
        except IOError:
            self.send_error(404, 'File not found')
            return None
        fs = os.fstat(f.fileno())
        self.send_response(200, 'OK')
        self.send_header('Last-Modified', self.date_time_string(fs.st_mtime))
        self.send_header('Content-Length', str(fs[6]))
        self.send_header('Content-Type', ctype)
        self.send_header('Content-Disposition', 'attachment')
        self.end_headers()
        return f


class SeismosizerHandler(RequestHandler):

    stores_path = '/gfws/static/stores/'
    api_path = '/gfws/api/'
    process_path = '/gfws/seismosizer/1/query'
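
    # Overview of the URL paths routed by send_head() below:
    #
    #   /gfws/static/stores/                -> HTML listing of stores
    #   /gfws/static/stores/<store_id>/...  -> static files of a store
    #   /gfws/api/                          -> JSON listing of stores
    #   /gfws/api/<store_id>                -> store config (JSON)
    #   /gfws/api/<store_id>/profile        -> velocity profile plot (PNG)
    #   /gfws/seismosizer/1/query           -> process a gf.Request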

    def send_head(self):
        S = self.stores_path
        P = self.process_path
        A = self.api_path
        for x in (S,):
            if re.match(r'^' + x[:-1] + '$', self.path):
                return self.redirect(x)

        if re.match(r'^' + S + store_id_pattern, self.path):
            return RequestHandler.send_head(self)

        elif re.match(r'^' + S + '$', self.path):
            return self.list_stores()

        elif re.match(r'^' + A + '$', self.path):
            return self.list_stores_json()

        elif re.match(r'^' + A + store_id_pattern + '$', self.path):
            return self.get_store_config()

        elif re.match(r'^' + A + store_id_pattern + '/profile$', self.path):
            return self.get_store_velocity_profile()

        elif re.match(r'^' + P + '$', self.path):
            return self.process()

        else:
            self.send_error(404, 'File not found')
            self.end_headers()
            return None

    def translate_path(self, path):
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(unquote(path))
        words = path.split('/')
        words = [_f for _f in words if _f]

        path = '/'
        if words[:3] == self.stores_path.split('/')[1:-1] and len(words) > 3:
            engine = self.server.engine
            if words[3] not in engine.get_store_ids():
                return None
            else:
                path = engine.get_store_dir(words[3])
                words = words[4:]
        else:
            return None

        for word in words:
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)

        return path
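
    # For example, '/gfws/static/stores/<store_id>/config' is mapped to
    # '<store_dir>/config', where <store_dir> is looked up from the engine.
    # Paths outside the stores tree and unknown store ids map to None.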

    def listdir(self, path):
        if path == self.stores_path:
            return list(self.server.engine.get_store_ids())
        else:
            return RequestHandler.listdir(self, path)

    def list_stores(self):
        '''Create listing of stores.'''
        from jinja2 import Template

        engine = self.server.engine

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        stores = [engine.get_store(store_id) for store_id in store_ids]

        templates = {
            'html': Template('''
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<title>{{ title }}</title>
<body>
<h2>{{ title }}</h2>
<hr>
<table>
<tr>
<th style="text-align:left">Store ID</th>
<th style="text-align:center">Type</th>
<th style="text-align:center">Extent</th>
<th style="text-align:center">Sample-rate</th>
<th style="text-align:center">Size (index + traces)</th>
</tr>
{% for store in stores %}
<tr>
<td><a href="{{ store.config.id }}/">{{ store.config.id|e }}/</a></td>
<td style="text-align:center">{{ store.config.short_type }}</td>
<td style="text-align:right">{{ store.config.short_extent }} km</td>
<td style="text-align:right">{{ store.config.sample_rate }} Hz</td>
<td style="text-align:right">{{ store.size_index_and_data_human }}</td>
</tr>
{% endfor %}
</table>
</hr>
</body>
</html>
'''.lstrip()),
            'text': Template('''
{% for store in stores %}{#
#}{{ store.config.id.ljust(25) }} {#
#}{{ store.config.short_type.center(5) }} {#
#}{{ store.config.short_extent.rjust(30) }} km {#
#}{{ "%10.2g"|format(store.config.sample_rate) }} Hz {#
#}{{ store.size_index_and_data_human.rjust(8) }}
{% endfor %}'''.lstrip())}

        format = self.body.get('format', ['html'])[0]
        if format not in ('html', 'text'):
            format = 'html'

        title = "Green's function stores listing"
        s = templates[format].render(stores=stores, title=title).encode('utf8')
        length = len(s)
        f = BytesIO(s)
        self.send_response(200, 'OK')
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(length))
        self.end_headers()
        return f

    def list_stores_json(self):
        engine = self.server.engine

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        def get_store_dict(store):
            store.ensure_reference()

            return {
                'id': store.config.id,
                'short_type': store.config.short_type,
                'modelling_code_id': store.config.modelling_code_id,
                'source_depth_min': store.config.source_depth_min,
                'source_depth_max': store.config.source_depth_max,
                'source_depth_delta': store.config.source_depth_delta,
                'distance_min': store.config.distance_min,
                'distance_max': store.config.distance_max,
                'distance_delta': store.config.distance_delta,
                'sample_rate': store.config.sample_rate,
                'size': store.size_index_and_data,
                'uuid': store.config.uuid,
                'reference': store.config.reference
            }

        stores = {
            'stores': [get_store_dict(engine.get_store(store_id))
                       for store_id in store_ids]
        }

        s = json.dumps(stores)
        length = len(s)
        f = BytesIO(s.encode('ascii'))
        self.send_response(200, 'OK')
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(length))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

        return f

    def get_store_config(self):
        engine = self.server.engine

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        for match in re.finditer(r'/gfws/api/(' + store_id_pattern + ')',
                                 self.path):
            store_id = match.groups()[0]

            try:
                store = engine.get_store(store_id)
            except Exception:
                self.send_error(404)
                self.end_headers()
                return

            data = {}
            data['id'] = store_id
            data['config'] = str(store.config)

            s = json.dumps(data)
            length = len(s)
            f = BytesIO(s.encode('ascii'))
            self.send_response(200, 'OK')
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.send_header('Content-Length', str(length))
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()

            return f

    def get_store_velocity_profile(self):
        engine = self.server.engine

        fig = plt.figure()
        axes = fig.gca()

        store_ids = list(engine.get_store_ids())
        store_ids.sort(key=lambda x: x.lower())

        for match in re.finditer(
                r'/gfws/api/(' + store_id_pattern + ')/profile', self.path):
            store_id = match.groups()[0]

            try:
                store = engine.get_store(store_id)
            except Exception:
                self.send_error(404)
                self.end_headers()
                return

            if store.config.earthmodel_1d is None:
                self.send_error(404)
                self.end_headers()
                return

            cake_plot.my_model_plot(store.config.earthmodel_1d, axes=axes)

            f = BytesIO()
            fig.savefig(f, format='png')

            length = f.tell()
            self.send_response(200, 'OK')
            self.send_header('Content-Type', 'image/png')
            self.send_header('Content-Length', str(length))
            self.send_header('Access-Control-Allow-Origin', '*')
            self.end_headers()

            f.seek(0)
            return f

    def process(self):

        request = gf.load(string=self.body['request'][0])
        try:
            resp = self.server.engine.process(request=request)
        except (gf.BadRequest, gf.StoreError) as e:
            self.send_error(400, str(e))
            return

        f = BytesIO()
        resp.dump(stream=f)
        length = f.tell()

        f.seek(0)

        self.send_response(200, 'OK')
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(length))
        self.end_headers()
        return f
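
    # Client-side sketch (assumptions: the server listens on localhost:8085
    # and 'some_store_id' names an existing store; the form field 'request'
    # matches what process() reads from self.body):
    #
    #     import urllib.parse
    #     import urllib.request
    #     from pyrocko import gf
    #
    #     source = gf.DCSource(lat=11., lon=23., depth=10e3, magnitude=6.)
    #     target = gf.Target(lat=12., lon=24., store_id='some_store_id')
    #     req = gf.Request(sources=[source], targets=[target])
    #     data = urllib.parse.urlencode(
    #         {'request': req.dump()}).encode('utf-8')
    #     response = urllib.request.urlopen(
    #         'http://localhost:8085/gfws/seismosizer/1/query', data)
    #     result = gf.load(string=response.read().decode('utf-8'))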

    def guess_type(self, path):
        bn = os.path.basename
        dn = os.path.dirname
        if bn(path) == 'config':
            return 'text/plain'

        if bn(dn(path)) == 'extra':
            return 'text/plain'

        else:
            return RequestHandler.guess_type(self, path) \
                or 'application/octet-stream'


class Server(asyncore.dispatcher):
    def __init__(self, ip, port, handler, engine):
        self.ensure_uuids(engine)
        logger.info('starting Server at http://%s:%d', ip, port)

        asyncore.dispatcher.__init__(self)
        self.ip = ip
        self.port = port
        self.handler = handler
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

        self.set_reuse_addr()
        self.bind((ip, port))
        self.engine = engine

        # Quoting the socket module documentation...
        # listen(backlog)
        #     Listen for connections made to the socket. The backlog argument
        #     specifies the maximum number of queued connections and should
        #     be at least 1; the maximum value is system-dependent (usually
        #     5).
        self.listen(5)

    @staticmethod
    def ensure_uuids(engine):
        logger.info('ensuring UUIDs of available stores')
        store_ids = list(engine.get_store_ids())
        for store_id in store_ids:
            store = engine.get_store(store_id)
            store.ensure_reference()

    def handle_accept(self):
        try:
            conn, addr = self.accept()
        except socket.error:
            self.log_info('warning: server accept() threw an exception',
                          'warning')
            return
        except TypeError:
            self.log_info('warning: server accept() threw EWOULDBLOCK',
                          'warning')
            return

        self.handler(conn, addr, self)

    def log(self, message):
        self.log_info(message)

    def handle_close(self):
        self.close()

    def log_info(self, message, type='info'):
        {
            'debug': logger.debug,
            'info': logger.info,
            'warning': logger.warning,
            'error': logger.error
        }.get(type, logger.info)(str(message))


def run(ip, port, engine):
    s = Server(ip, port, SeismosizerHandler, engine)
    asyncore.loop()
    del s


if __name__ == '__main__':
    util.setup_logging('pyrocko.gf.server', 'info')
    port = 8085
    engine = gf.LocalEngine(store_superdirs=sys.argv[1:])
    run('127.0.0.1', port, engine)