"""WSGI application for the HTConsole web console.

``HTConsole`` is the WSGI entry point.  It dispatches the first path
segment of each request to an ``action_<segment>`` method on a
per-request ``Handler``.  ``make_app`` wraps the application with the
Beaker session middleware and, in debug mode, WebError's
``EvalException``.
"""

import os
import sys
import urllib
from threading import local

import simplejson
import tempita
import webob
from paste.urlparser import StaticURLParser
from webob import Request, Response
from webob import exc

from htconsole import extrabuiltins
from htconsole.evalcontext import EvalContext


class WebResponse(Response):
    """Response whose content type defaults to ``text/html``."""

    default_content_type = 'text/html'


class WebRequest(Request):
    """Request exposing the ``beaker.session`` environ entry as ``session``."""

    session = webob.environ_getter('beaker.session')


# Thread-local holder for the request currently being served.  It is filled
# in by HTConsole.__call__ and read by make_callback.
req_context = local()


class HTConsole(object):
    """WSGI application that routes requests to ``Handler`` actions."""

    def __init__(self, save_dir, doctest_dir, context=None):
        """Store configuration and set up the evaluation context.

        :param save_dir: directory that ``Handler.action_display`` reads
            files from.
        :param doctest_dir: stored on the instance as ``doctest_dir``.
        :param context: evaluation context to use; when ``None`` an
            ``EvalContext`` is built from ``make_callback`` and an
            ``ExtraBuiltins`` bound to this application.
        """
        self.save_dir = save_dir
        self.doctest_dir = doctest_dir
        if context is None:
            context = EvalContext(
                make_callback,
                extrabuiltins.ExtraBuiltins(self))
        self.context = context
        # Static files live in templates/static next to this module.
        self.static = StaticURLParser(
            os.path.join(os.path.dirname(__file__), 'templates', 'static'))

    def __call__(self, environ, start_response):
        """Serve one WSGI request.

        The first path segment selects the action: ``favicon.ico`` yields
        a 404, ``static`` is delegated to the static file app, any other
        non-empty segment is mapped to ``Handler.action_<segment>`` (404 if
        no such method exists), and an empty path runs ``action_index``.

        An action may return ``None`` (the request's default response is
        used), a string (used as the body of the default response) or any
        WSGI-callable response object.  ``HTTPException`` raised by the
        action is rendered as the response.
        """
        res = WebResponse()
        req = WebRequest(environ)
        req.response = res
        action = req.path_info_peek()

        # Publish the per-request state on the thread-local context.
        req_context.request = req
        req_context.app = self
        req_context.application_url = req.application_url
        req_context.static_url = req.application_url + '/static'
        handler = Handler(self, req, req_context.static_url)
        req_context.handler = handler

        if action == 'favicon.ico':
            return exc.HTTPNotFound('No favicon.ico')(environ, start_response)
        elif action == 'static':
            req.path_info_pop()
            return self.static(environ, start_response)
        elif action:
            meth_name = 'action_%s' % action
            if hasattr(handler, meth_name):
                meth = getattr(handler, meth_name)
            else:
                print('Nothing can respond to %s (action=%r, method %r.%s)'
                      % (req.url, action, handler, meth_name))
                return exc.HTTPNotFound(
                    'No method %s' % meth_name)(environ, start_response)
            # The action segment is deliberately left on path_info here;
            # actions that need the following segment (see
            # Handler.action_callback) pop it themselves.
        else:
            meth = handler.action_index

        try:
            result = meth()
        except exc.HTTPException as e:
            return e(environ, start_response)

        if result is None:
            result = req.response
        elif isinstance(result, basestring):
            req.response.body = result
            result = req.response
        return result(environ, start_response)


def make_callback(callback_id):
    """Return the callback URL for ``callback_id``.

    The URL is built from the application URL of the request stored on the
    thread-local ``req_context``, so this must be called while a request is
    being served on the current thread.
    """
    req = req_context.request
    return req.application_url + '/callback/%s?_=t' % urllib.quote(
        str(callback_id))


class Handler(object):
    """Per-request object whose ``action_*`` methods implement the URLs."""

    def __init__(self, app, req, static_url):
        """Bind the handler to the application, request and static URL."""
        self.app = app
        self.context = self.app.context
        self.req = req
        self.static_url = static_url

    def action_index(self):
        """Render the index page.

        Collects the HTML representation of every name in the evaluation
        namespace that does not start with an underscore, plus the console
        text saved in the session (empty string if none).
        """
        self.namespace = [
            (name, self.context.html_repr(value))
            for name, value in sorted(self.context.namespace.items())
            if not name.startswith('_')]
        self.saved_console = self.req.session.get('saved_console', '')
        return self.render('index')

    def action_callback(self):
        """Invoke a registered callback and return its result as JSON.

        The callback id is the path segment after ``callback``.  Keyword
        arguments come from the JSON request body when the content type is
        ``text/json`` and from the POST fields otherwise.
        """
        self.req.path_info_pop()
        callback_id = self.req.path_info_peek()
        if self.req.content_type == 'text/json':
            args = simplejson.loads(self.req.body)
        else:
            args = self.req.POST
        result = self.context.do_callback(callback_id, **dict(args))
        # The original also carried a text/html branch guarded by
        # ``... and 0``; it could never run, so results are always sent
        # as JSON.
        return WebResponse(
            content_type='text/json',
            body=simplejson.dumps(result))

    def action_display(self):
        """Return the contents of a file from ``save_dir`` as plain text.

        The ``filename`` request parameter is reduced to its last path
        component before being joined onto the save directory.
        """
        filename = self.req.params['filename']
        filename = self.clean_filename(filename)
        filename = os.path.join(self.app.save_dir, filename)
        f = open(filename)
        try:
            # try/finally so the handle is released even if read() fails.
            body = f.read()
        finally:
            f.close()
        return WebResponse(
            content_type='text/plain',
            body=body)

    def action_save_console(self):
        """Store the ``console`` request parameter in the session."""
        console = self.req.params['console']
        self.req.session['saved_console'] = console
        return exc.HTTPCreated()

    def action_run(self):
        """Execute the ``command`` request parameter.

        The command is echoed to stdout in interactive-prompt form
        (``>>>`` for the first line, ``...`` for continuation lines), run
        through the evaluation context, and the result is returned as a
        JSON-encoded body with a ``text/plain`` content type.
        """
        expr = self.req.params['command']
        lines = expr.splitlines(True)
        # ''.splitlines() is [], so an empty command has nothing to echo;
        # indexing lines[0] unconditionally would raise IndexError.
        if lines:
            echo = ['>>> %s' % lines[0]] + [
                '... %s' % l for l in lines[1:]]
            sys.stdout.write(''.join(echo))
        if not expr.endswith('\n'):
            expr += '\n'
        response = self.context.exec_expr_response(expr)
        res_text = simplejson.dumps(response)
        res = WebResponse(content_type='text/plain')
        res.body = res_text
        return res

    def clean_filename(self, fn):
        """Return the last component of ``fn``.

        Splits on ``/`` and then on ``os.path.sep`` so that directory
        parts in either form are discarded.
        """
        fn = fn.split('/')[-1]
        fn = fn.split(os.path.sep)[-1]
        return fn

    def render(self, name):
        """Render ``templates/<name>.tmpl`` into a new response.

        The template namespace is a copy of this handler's instance
        attributes.  The response is also stored as ``self.response``.
        """
        self.response = WebResponse()
        tmpl_fn = os.path.join(
            os.path.dirname(__file__), 'templates', name + '.tmpl')
        tmpl = tempita.HTMLTemplate.from_filename(tmpl_fn)
        ns = self.__dict__.copy()
        body = tmpl.substitute(ns)
        self.response.body = body
        return self.response

    def __repr__(self):
        return '<%s for %r>' % (self.__class__.__name__, self.req)


def make_app(
        global_conf,
        debug=None,
        save_dir='./saved-files',
        doctest_dir=None):
    """Build the WSGI application stack.

    :param global_conf: configuration mapping; also passed to the session
        and debugging middleware.
    :param debug: whether to wrap the app in ``EvalException``.  When
        ``None`` the ``debug`` key of ``global_conf`` is used, defaulting
        to ``True``.  The value is converted with ``asbool``.
    :param save_dir: directory passed to ``HTConsole`` as ``save_dir``.
    :param doctest_dir: directory passed to ``HTConsole`` as
        ``doctest_dir``.
    :returns: the wrapped WSGI application.
    """
    from paste.deploy.converters import asbool
    if debug is None:
        debug = global_conf.get('debug', True)
    debug = asbool(debug)
    app = HTConsole(save_dir=save_dir, doctest_dir=doctest_dir)
    from beaker.middleware import session_filter_app_factory
    app = session_filter_app_factory(app, global_conf)
    if debug:
        from weberror.evalexception import EvalException
        app = EvalException(app, global_conf)
    return app