from webob import Request, Response from webob.exc import HTTPException, HTTPNotFound #from webob import __all__ as __webob_all__ #from webob.exc import * #from webob.exc import __all__ as __webob_exc_all__ __all__ = (['webob_wrap', 'webob_middleware', 'webob_postprocessor', 'default_charset', 'WrapperBase'] + ['Request', 'Response'] #+ __webob_all__ + __webob_exc_all__ ) class DeferredWrap(object): __slots__ = ('wrapper', 'args', 'kw') def __init__(self, wrapper, args, kw): self.wrapper = wrapper self.args = args self.kw = kw def __call__(self, *args, **kw): args = self.args + args if kw: newkw = self.kw.copy() newkw.update(kw) else: newkw = self.kw return self.wrapper(*args, **newkw) def __repr__(self): return '%s(%s, %r, %r)' % (self.__class__.__name__, self.wrapper.__name__, self.args, self.kw) def __get__(self, obj, type): if obj is not None: next = self.args[0] if hasattr(next, '__get__'): bound = next.__get__(obj, type) return self.__class__(self.wrapper, (bound,) + self.args[1:], self.kw) return self class WrapperBase(object): """ This makes the following identical: wrapped_app = webob_middleware(mw)(app) and wrapped_app = webob_middleware(mw, app) This is implemented by using DeferredWrap: >>> webob_middleware(lambda req, app: app) # doctest: +ELLIPSIS DeferredWrap(webob_middleware, ( at ...>,), {}) Middleware can take extra parameters: >>> def multiply_mw(req, app, num=1): ... resp = req.get_response(app) ... resp.body *= num ... return resp ... >>> webob_middleware(multiply_mw, num=2) # doctest: +ELLIPSIS DeferredWrap(webob_middleware, (,), {'num': 2}) >>> wrapped_app = webob_middleware(multiply_mw, Response('abc'), num=2) >>> wrapped_app # doctest: +ELLIPSIS webob_middleware(, , num=2) >>> get = Request.blank('/').get_response >>> get(wrapped_app).body 'abcabc' wrapped_app can also be created like this: >>> double_mw = webob_middleware(multiply_mw, num=2) >>> wrapped_app = double_mw(Response('abc')) >>> get(wrapped_app).body 'abcabc' Or even like this: >>> @webob_middleware(num=2) ... def multiply_mw(req, app, num=1): ... resp = req.get_response(app) ... resp.body *= num ... return resp ... >>> wrapped_app = multiply_mw(Response('abc')) >>> get(wrapped_app).body 'abcabc' Middleware parameters can be overridden later: >>> triple_app = multiply_mw(Response('abc'), num=3) >>> get(triple_app).body 'abcabcabc' """ __slots__ = ('_original_args', '_original_kw') def __new__(cls, *args, **kw): if not hasattr(cls, '_min_init_args'): import inspect init_args, _, _, init_defaults = inspect.getargspec(cls.__init__) cls._min_init_args = len(init_args) - len(init_defaults or ()) - 1 if len(args) < cls._min_init_args: return DeferredWrap(cls, args, kw) else: inst = super(WrapperBase, cls).__new__(cls)# , *args, **kw) # py2.6 #inst._key = '_%s_%d' % (cls.__name__, id(inst)) inst._original_args = list(args) inst._original_kw = kw.copy() return inst def __repr__(self): args = map(repr, self._original_args) for name, val in self._original_kw.iteritems(): args.append('%s=%r' % (name, val)) return '%s(%s)' % (self.__class__.__name__, ', '.join(args)) def __get__(self, obj, type): if obj is not None: next = self._original_args[0] if hasattr(next, '__get__'): bound = next.__get__(obj, type) return self.__class__(bound, *self._original_args[1:], **self._original_kw) return self class webob_wrap(WrapperBase): """ Turns a callable into an WSGI app. Subject should * accept one argument: webob.Request instance * return * any WSGI app (usually webob.Response) * or return None (will be interpreted as 404 Not Found) * or throw webob.exc.HTTPException which will be caught and used to generate the response @webob_wrap def app(req): return Response(req.path) Resulting `app` is a WSGI application. Wrapping of methods works as well, for example: class MyApp(object): def __init__(self, message): self.message = message @webob_wrap def __call__(self, req): return Response(self.message) `MyApp` instances are WSGI apps. """ __slots__ = ('func') __name__ = property(lambda self: self.func.__name__) request_cls = Request def __init__(self, func):#, **kw): self.func = func #self.kw = kw def __call__(self, environ, start_response): req = self.request_cls(environ) try: #app = self.func(req, **self.kw) app = self.func(req) if app is None: app = HTTPNotFound() except HTTPException, exc: app = exc return app(environ, start_response) class webob_middleware(WrapperBase): """ This makes creating WSGI middleware trivial, for example: @webob_middleware def require_user_mw(req, app, user='root'): if req.remote_user != user: raise HTTPUnauthorized() return app require_user_mw is a middleware now. protected_app = require_user_mw(app) protected_app2 = require_user_mw(app, user='Administrator') Assuming `app` is a WSGI application, `protected_app` and `protected_app2` are WSGI apps too. Note how we used `user` argument to `require_user_mw` to customize the middleware-wrapped instances. """ __slots__ = ('mwfunc', 'next_app', 'kw') def __init__(self, mwfunc, next_app, **kw): self.mwfunc = mwfunc self.next_app = next_app self.kw = kw @webob_wrap def __call__(self, req): return self.mwfunc(req, self.next_app, **self.kw) class webob_postprocessor(WrapperBase): """ This class streamlines implementation of postprocessing WSGI middleware, for example: @webob_postprocessor def censor_mw(resp, words=[]): if resp.content_type == 'text/plain': for word in words: resp.body = resp.body.replace(word, '*' * len(word)) The resulting middleware is used in the same way as ones created with `webob_middleware`, like this: censored_app = censor_mw(app, words=['nasty', 'words']) """ __slots__ = ('postprocessor', 'next_app', 'no_range', 'decode_content', 'takes_request', 'recalc_etag', 'conditional', 'kw') def __init__(self, postprocessor, next_app, no_range=True, decode_content=True, takes_request=False, recalc_etag=True, conditional=False, **kw): self.postprocessor = postprocessor self.next_app = next_app self.no_range = no_range self.decode_content = decode_content self.takes_request = takes_request self.recalc_etag = recalc_etag self.conditional = conditional self.kw = kw @webob_wrap def __call__(self, req): if self.no_range: req.range = req.if_range = None resp = req.get_response(self.next_app) # skip body-less responses (HEAD, Not Modified etc) if resp._app_iter or resp._body: if self.decode_content: resp.decode_content() if self.takes_request: r = self.postprocessor(req, resp, **self.kw) else: r = self.postprocessor(resp, **self.kw) if r is not None: resp = r if self.recalc_etag: resp.md5_etag(set_content_md5=True) resp.conditional_response = self.conditional return resp class default_charset(WrapperBase): """ Use this decorator inside @webob_* @webob_wrap @default_charset('latin-1') def app(req): ... If chardet is installed, you can also write: @webob_wrap @detect_charset def app(req): ... """ __slots__ = ('next_app', 'charset') def __new__(cls, *args, **kw): if args and isinstance(args[0], str): if 'charset' in kw: raise ValueError kw['charset'] = args[0] args = args[1:] return super(default_charset, cls).__new__(cls, *args, **kw) def __init__(self, next_app, charset='UTF-8'): self.next_app = next_app self.charset = charset def __call__(self, req, **kw): if req.charset is None: if callable(self.charset): req.charset = self.charset(req) else: req.charset = self.charset return self.next_app(req, **kw) try: from chardet.universaldetector import UniversalDetector except ImportError: pass else: def req_charset_detect(req): detector = UniversalDetector() detector.feed('\n'.join([req.script_name, req.path_info, req.query_string])) if not detector.done: detector.feed(req.body) detector.close() return detector.result['encoding'] detect_charset = default_charset(charset=req_charset_detect) __all__.append('detect_charset') if __name__ == '__main__': def test(app, url='/'): return Request.blank(url).get_response(app) #@webob_wrap(x=1) #def app(req, x): # return Response(str(x)) @webob_wrap def app(req): return Response('1') #print app assert test(app).body == '1' class App(object): def __init__(self, val): self.val = str(val) @webob_wrap @default_charset('ASCII') def __call__(self, req): return Response(self.val) assert test(App('123')).body == '123' @webob_middleware def mw(req, app): r = req.get_response(app) r.md5_etag() return r mwa = mw(app) mwr = test(mwa) #print mw #print mwa assert mwr.body == '1' assert mwr.etag is not None @webob_postprocessor def double(r): r.body += r.body #print double #print double.__call__ dapp = double(app) #assert dapp.__call__.request_charset == 'UTF-8' assert test(dapp).body == '11' def charset_app(req): return Response(str(req.charset)) assert test(webob_wrap(default_charset(charset_app, None))).body == 'None' q_charset = webob_wrap(default_charset(charset_app, lambda req: req.str_GET.get('charset'))) assert test(q_charset).body == 'None' assert test(q_charset, '/?charset=cp1251').body == 'cp1251' ru_cp866 = '\xe0\xe3\xe1\xe1\xaa\xa8\xa9' #assert test(detect_charset, ru_cp866).body == 'IBM866' detect_charset_app = webob_wrap(detect_charset(charset_app)) assert test(detect_charset_app, ru_cp866).body == 'IBM866' assert Request.blank('/', body=ru_cp866).get_response(detect_charset_app).body == 'IBM866'