Source code for burpui.app

# -*- coding: utf8 -*-
"""
Burp-UI is a web-ui for burp backup written in python with Flask and
jQuery/Bootstrap

.. module:: burpui.app
    :platform: Unix
    :synopsis: Burp-UI app module.

.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>
"""
import json
import logging
import os
import sys
import time

from .desc import __release__, __version__
from .extensions import (
    create_celery,
    create_db,
    create_websocket,
    get_redis_server,
    parse_db_setting,
)


[docs]def create_app(conf=None, verbose=0, logfile=None, **kwargs): """Initialize the whole application. :param conf: Configuration file to use :type conf: str :param verbose: Set the verbosity level :type verbose: int :param logfile: Store the logs in the given file :type logfile: str :param kwargs: Extra options: - gunicorn (bool): Enable gunicorn engine instead of flask's default. Default is True. - unittest (bool): Are we running tests (used for test only). Default is False. - debug (bool): Enable debug mode. Default is False. - cli (bool): Are we running the CLI. Default is False. - reverse_proxy (bool): Are we behind a reverse-proxy. Default is True if gunicorn is True - websocket_server (bool): Are we running the websocket server. Default is False :type kwargs: dict :returns: A :class:`burpui.engines.server.BUIServer` object """ from flask import g, request, session from flask_babel import gettext from flask_login import LoginManager from .engines.server import BUIServer as BurpUI from .ext.cache import cache from .ext.i18n import babel, get_locale from .filter import mask from .misc.auth.handler import BUIanon from .security import basic_login_from_request from .sessions import session_manager from .thirdparty.flask_bower import Bower from .tools.logging import logger from .utils import ReverseProxied, is_uuid, lookup_file gunicorn = kwargs.get("gunicorn", True) unittest = kwargs.get("unittest", False) debug = kwargs.get("debug", False) cli = kwargs.get("cli", False) reverse_proxy = kwargs.get("reverse_proxy", gunicorn) celery_worker = kwargs.get("celery_worker", False) websocket_server = kwargs.get("websocket_server", False) # We initialize the core app = BurpUI() app.config["CFG"] = None app.config["LOG_FILE"] = logfile app.config["LOG_LEVEL"] = verbose logger.init_app(app) if verbose: app.enable_logger() app.gunicorn = gunicorn logger.debug( "conf: {}\n".format(conf) + "verbose: {}\n".format(logging.getLevelName(verbose)) + "logfile: {}\n".format(logfile) + "gunicorn: {}\n".format(gunicorn) + "debug: {}\n".format(debug) + "unittest: {}\n".format(unittest) + "cli: {}\n".format(cli) + "reverse_proxy: {}\n".format(reverse_proxy) + "celery_worker: {}\n".format(celery_worker) + "websocket_server: {}".format(websocket_server) ) # Some config app.config["BUI_CLI"] = cli # FIXME: strange behavior when bundling errors # app.config['BUNDLE_ERRORS'] = True app.config["REMEMBER_COOKIE_HTTPONLY"] = True if debug and not gunicorn: # pragma: no cover app.config["DEBUG"] = True and not unittest app.config["TESTING"] = True and not unittest # Still need to test conf file here because the init function can be called # by gunicorn directly if conf: app.config["CFG"] = lookup_file(conf, guess=False) else: app.config["CFG"] = lookup_file() logger.info("Using configuration: {}".format(app.config["CFG"])) app.setup(app.config["CFG"], unittest, cli) if cli and not websocket_server and "shell" not in sys.argv: return app if debug: app.config.setdefault("TEMPLATES_AUTO_RELOAD", True) app.config["TEMPLATES_AUTO_RELOAD"] = True app.config["DEBUG"] = True if app.demo: try: import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration sentry_sdk.init( dsn=app.config["BUI_DSN"], integrations=[FlaskIntegration()] ) except ImportError: pass # manage application secret key if app.secret_key and ( app.secret_key.lower() == "none" or (app.secret_key.lower() == "random" and gunicorn) ): # pragma: no cover logger.critical( "Your setup is not secure! Please consider setting a" " secret key in your configuration file" ) app.secret_key = "Burp-UI" if not app.secret_key or app.secret_key.lower() == "random": from base64 import b64encode app.secret_key = b64encode(os.urandom(256)) app.wsgi_app = ReverseProxied(app.wsgi_app, app) # Manage reverse_proxy special tricks & improvements if reverse_proxy: # pragma: no cover from werkzeug.middleware.proxy_fix import ProxyFix kwargs = {} if app.config["NUM_PROXIES"] > 0: kwargs = app.config["PROXY_FIX_ARGS"].format( num_proxies=app.config["NUM_PROXIES"] ) kwargs = json.loads(kwargs) logger.debug(f"Using {kwargs} as ProxyFix parameters") app.wsgi_app = ProxyFix(app.wsgi_app, **kwargs) if app.storage and app.storage.lower() == "redis": try: # Session setup if not app.session_db or str(app.session_db).lower() not in [ "none", "false", ]: from redis import Redis from .ext.session import sess host, port, pwd = get_redis_server(app) db = 0 if app.session_db and str(app.session_db).lower() not in [ "redis", "default", "true", ]: try: # pragma: no cover (_, _, pwd, host, port, db) = parse_db_setting(app.session_db) except ValueError as exp: logger.warning(str(exp)) try: db = int(db) except ValueError: db = 0 logger.debug( "SESSION: Using redis://guest:****@{}:{}/{}".format(host, port, db) ) red = Redis(host=host, port=port, db=db, password=pwd) app.config["WITH_SRV_SESSION"] = True app.config["SESSION_TYPE"] = "redis" app.config["SESSION_REDIS"] = red app.config["SESSION_USE_SIGNER"] = app.secret_key is not None app.config["SESSION_PERMANENT"] = False sess.init_app(app) session_manager.backend = red except Exception as exp: # pragma: no cover logger.warning("Unable to initialize session: {}".format(str(exp))) app.config["WITH_SRV_SESSION"] = False try: # Cache setup if not app.cache_db or str(app.cache_db).lower() not in ["none", "false"]: host, port, pwd = get_redis_server(app) db = 1 if app.cache_db and str(app.cache_db).lower() not in [ "redis", "default", "true", ]: try: # pragma: no cover (_, _, pwd, host, port, db) = parse_db_setting(app.cache_db) except ValueError as exp: logger.warning(str(exp)) try: db = int(db) except ValueError: db = 1 logger.debug( "CACHE: Using redis://guest:****@{}:{}/{}".format(host, port, db) ) cache.init_app( app, config={ "CACHE_TYPE": "flask_caching.backends.redis", "CACHE_REDIS_HOST": host, "CACHE_REDIS_PORT": port, "CACHE_REDIS_PASSWORD": pwd, "CACHE_REDIS_DB": db, }, ) # clear cache at startup in case we removed or added servers with app.app_context(): cache.clear() else: # pragma: no cover cache.init_app(app) except Exception as exp: # pragma: no cover logger.warning("Unable to initialize cache: {}".format(str(exp))) cache.init_app(app) try: # Limiter setup if app.limiter and str(app.limiter).lower() not in [ "none", "false", ]: # pragma: no cover from .ext.limit import limiter app.config["RATELIMIT_HEADERS_ENABLED"] = True if app.limiter and str(app.limiter).lower() not in [ "default", "redis", "true", ]: app.config["RATELIMIT_STORAGE_URL"] = app.limiter else: db = 3 host, port, pwd = get_redis_server(app) if pwd: conn = "redis://guest:{}@{}:{}/{}".format(pwd, host, port, db) else: conn = "redis://{}:{}/{}".format(host, port, db) app.config["RATELIMIT_STORAGE_URL"] = conn (_, _, pwd, host, port, db) = parse_db_setting( app.config["RATELIMIT_STORAGE_URL"] ) logger.debug( "LIMITER: Using redis://guest:****@{}:{}/{}".format(host, port, db) ) limiter.init_app(app) app.config["WITH_LIMIT"] = True except ImportError: # pragma: no cover logger.warning( "Unable to load limiter. Did you run 'pip install " "flask-limiter'?" ) except Exception as exp: # pragma: no cover logger.warning("Unable to initialize limiter: {}".format(str(exp))) else: cache.init_app(app) # Initialize i18n babel.init_app(app) # Create SQLAlchemy if enabled create_db(app, cli, unittest, celery_worker=celery_worker) if not celery_worker: from .api import api, apibp from .routes import mypad, view app.jinja_env.globals.update( isinstance=isinstance, list=list, mypad=mypad, version_id="{}-{}".format(__version__, __release__), ) # We initialize the API api.load_all() app.register_blueprint(apibp) # Then we load our routes app.register_blueprint(view) # Initialize Bower ext app.config.setdefault("BOWER_COMPONENTS_ROOT", os.path.join("static", "vendor")) app.config.setdefault("BOWER_REPLACE_URL_FOR", True) bower = Bower() bower.init_app(app) # Order of the initialization matters! # The websocket must be configured prior to the celery worker for instance # Initialize Session Manager session_manager.init_app(app) # Initialize filter mask.init_app(app) # And the login_manager app.login_manager = LoginManager() app.login_manager.anonymous_user = BUIanon app.login_manager.login_view = "view.login" app.login_manager.login_message_category = "info" app.login_manager.session_protection = "strong" # This is just to have the strings in the .po files app.login_manager.login_message = gettext("Please log in to access this page.") app.login_manager.needs_refresh_message = gettext( "Please reauthenticate to access this page." ) # This will be called at runtime and will then translate the strings app.login_manager.localize_callback = gettext app.login_manager.init_app(app) # Create WebSocket server if create_websocket(app, websocket_server, celery_worker, cli): return app # Create celery app if enabled create_celery(app, warn=False) def _check_session(user, request, api=False): """Check if the session is in the db""" if user and not session_manager.session_in_db(): # pragma: no cover login = getattr(user, "name", None) if login and not is_uuid(login): remember = session.get("persistent", False) if not remember: from flask_login import decode_cookie remember_cookie = request.cookies.get( app.config.get("REMEMBER_COOKIE_NAME"), False ) # check if the remember_cookie is legit if remember_cookie and decode_cookie(remember_cookie): remember = True session_manager.store_session( login, request.remote_addr, request.headers.get("User-Agent"), remember, api, ) elif login: app.uhandler.remove(login) @app.before_request def setup_request(): g.version = "{}-{}".format(__version__, __release__) g.locale = get_locale() g.now = round(time.time()) g.date_format = session.get("dateFormat", "llll") g.timezone = session.get("timezone") # make sure to store secure cookie if required if app.config["BUI_SCOOKIE"]: criteria = ( request.is_secure, request.headers.get("X-Forwarded-Proto", "http") == "https", ) app.config["SESSION_COOKIE_SECURE"] = app.config[ "REMEMBER_COOKIE_SECURE" ] = any(criteria) if "_extra" in request.args: session["_extra"] = request.args.get("_extra") g._extra = session.get("_extra", "") @app.login_manager.user_loader def load_user(userid): """User loader callback""" if app.auth != "none": user = app.uhandler.user(userid) if not user: return None if "X-Language" in request.headers: language = request.headers.get("X-Language") user.language = language session["language"] = language if "_id" not in session: from flask_login import login_user # if _id is not in session, it means we loaded the user from # cache/db using the remember cookie so we need to login it login_user(user, remember=user.is_authenticated, fresh=False) _check_session(user, request) return user return None @app.login_manager.request_loader def load_user_from_request(request): """User loader from request callback""" if app.auth != "none": user = basic_login_from_request(request, app) _check_session(user, request, True) return user @app.after_request def after_request(response): if getattr(g, "basic_session", False): if session_manager.invalidate_current_session(): session_manager.delete_session() return response return app