Source code for burpui.app

# -*- coding: utf8 -*-
"""
Burp-UI is a web-ui for burp backup written in python with Flask and
jQuery/Bootstrap

.. module:: burpui.app
    :platform: Unix
    :synopsis: Burp-UI app module.

.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>
"""
import os
import re
import logging
import warnings

from logging import Formatter

from ._compat import PY3, to_unicode
from .desc import __url__, __doc__, __version__, __release__

if PY3:  # pragma: no cover
    basestring = str


def parse_db_setting(string):
    parts = re.search(
        '(?:(?P<backend>\w+)(?:\+(?P<driver>\w+))?://)?'
        '(?:(?P<user>\w+)(?::?(?P<pass>.+))?@)?'
        '(?P<host>[\w_.-]+):?(?P<port>\d+)?(?:/(?P<db>\w+))?',
        string
    )
    if not parts:  # pragma: no cover
        raise ValueError('Unable to parse the db: "{}"'.format(string))
    back = parts.group('backend') or ''
    user = parts.group('user') or None
    pwd = parts.group('pass') or None
    host = parts.group('host') or ''
    port = parts.group('port') or ''
    db = parts.group('db') or ''
    return (back, user, pwd, host, port, db)


def get_redis_server(myapp):
    host = 'localhost'
    port = 6379
    if myapp.redis and myapp.redis.lower() != 'none':
        try:
            back, user, pwd, host, port, db = parse_db_setting(myapp.redis)
            host = host or 'localhost'
            try:
                port = int(port)
            except (ValueError, IndexError):
                port = 6379
        except ValueError:  # pragma: no cover
            pass
    return host, port, pwd


def create_db(myapp, cli=False, unittest=False, create=True):
    """Create the SQLAlchemy instance if possible

    :param myapp: Application context
    :type myapp: :class:`burpui.server.BUIServer`
    """
    if myapp.config['WITH_SQL']:
        try:
            from .ext.sql import db
            from sqlalchemy.exc import OperationalError
            from sqlalchemy_utils.functions import database_exists
            myapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
            if not database_exists(myapp.config['SQLALCHEMY_DATABASE_URI']) and \
                    not cli and not unittest:
                if create:  # pragma: no cover
                    import subprocess
                    local = os.path.join(os.getcwd(), '..', 'tools', 'bui-manage')
                    buimanage = local if os.path.exists(local) else 'bui-manage'
                    cmd = [
                        buimanage,
                        '-c',
                        myapp.config['CFG'],
                        '-l',
                        os.devnull,
                        'db',
                        'upgrade'
                    ]
                    upgd = subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT
                    )
                    (out, _) = upgd.communicate()
                    if upgd.returncode != 0:
                        myapp.logger.error(
                            'Disabling SQL support because '
                            'something went wrong while setting up the '
                            'database:\n{}'.format(out)
                        )
                        myapp.config['WITH_SQL'] = False
                        return None
                    return create_db(myapp, cli, unittest, False)
                else:  # pragma: no cover
                    myapp.logger.error(
                        'Database not found, disabling SQL support'
                    )
                    myapp.config['WITH_SQL'] = False
                    return None

            back = parse_db_setting(myapp.config['SQLALCHEMY_DATABASE_URI'])[0]

            if 'mysql' in back:  # pragma: no cover
                # optimize SQL pools for MySQL driver
                myapp.config['SQLALCHEMY_POOL_SIZE'] = 20
                myapp.config['SQLALCHEMY_POOL_RECYCLE'] = 600

            db.init_app(myapp)
            if not cli and not unittest:  # pragma: no cover
                with myapp.app_context():
                    try:
                        import subprocess

                        # get the current revision from alembic_version
                        res = db.engine.execute(
                            'select version_num from alembic_version'
                        )
                        if not res:
                            raise Exception(
                                'Alembic does not seem to be setup'
                            )
                        current = None
                        for row in res:
                            current = to_unicode(row['version_num'])
                            break

                        # get current head using alembic/FLask-Migrate
                        local = os.path.join(
                            os.getcwd(),
                            'tools',
                            'bui-manage'
                        )
                        buimanage = local if os.path.exists(local) \
                            else 'bui-manage'
                        cmd = [
                            buimanage,
                            '-c',
                            myapp.config['CFG'],
                            '-l',
                            os.devnull,
                            'db',
                            'heads'
                        ]
                        rev = subprocess.Popen(
                            cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT
                        )
                        (out, _) = rev.communicate()
                        if rev.returncode != 0:
                            raise Exception(
                                'something went wrong while setting up the '
                                'database:\n{}'.format(out)
                            )

                        latest = to_unicode(out).split()[0]

                        # now we compare the revision numbers
                        if latest != current:
                            myapp.logger.critical(
                                'Your database seems out of sync ({} != {}), '
                                'you may want to run \'bui-manage db '
                                'upgrade\'.'.format(latest, current)
                            )
                            myapp.logger.critical(
                                'Disabling SQL support for now.'
                            )
                            myapp.config['WITH_SQL'] = False
                            return None

                    except (OperationalError, Exception) as exp:
                        err = str(exp)
                        if 'no such table' in err:
                            myapp.logger.critical(
                                'Your database seems out of sync, you may want '
                                'to run \'bui-manage db upgrade\'.'
                            )
                        else:
                            myapp.logger.critical(
                                'Something seems to be wrong with your setup: '
                                '{}'.format(err)
                            )

                        myapp.logger.critical('Disabling SQL support for now.')
                        myapp.config['WITH_SQL'] = False
                        return None

            # If we are here, it means everything is alright
            return db

        except ImportError:  # pragma: no cover
            myapp.logger.critical(
                'Unable to load requirements, you may want to run \'pip '
                'install "burp-ui[sql]"\'.\nDisabling SQL support for now.'
            )
            myapp.config['WITH_SQL'] = False
        except OperationalError as exp:  # pragma: no cover
            myapp.logger.critical(
                'unable to contact database: {}\nDisabling SQL '
                'support.'.format(exp)
            )
            myapp.config['WITH_SQL'] = False

    return None


def create_celery(myapp, warn=True):
    """Create the Celery app if possible

    :param myapp: Application context
    :type myapp: :class:`burpui.server.BUIServer`
    """
    if myapp.config['WITH_CELERY']:  # pragma: no cover
        from .ext.async import celery
        from .exceptions import BUIserverException
        host, oport, pwd = get_redis_server(myapp)
        odb = 2
        if isinstance(myapp.use_celery, basestring):
            try:
                (_, _, pwd, host, port, db) = parse_db_setting(myapp.use_celery)
                if not port:
                    port = oport
                if not db:
                    db = odb
                else:
                    try:
                        db = int(db)
                    except ValueError:
                        db = odb
            except ValueError:
                pass
        else:
            db = odb
            port = oport
        if pwd:
            redis_url = 'redis://:{}@{}:{}/{}'.format(pwd, host, port, db)
        else:
            redis_url = 'redis://{}:{}/{}'.format(host, port, db)
        myapp.config['CELERY_BROKER_URL'] = myapp.config['BROKER_URL'] = \
            redis_url
        myapp.config['CELERY_RESULT_BACKEND'] = redis_url
        celery.conf.update(myapp.config)

        if not hasattr(celery, 'flask_app'):
            celery.flask_app = myapp

        TaskBase = celery.Task

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                with myapp.app_context():
                    try:
                        return TaskBase.__call__(self, *args, **kwargs)
                    except BUIserverException:
                        # ignore unhandled exceptions in the celery worker
                        pass

        celery.Task = ContextTask

        return celery

    if warn:  # pragma: no cover
        message = 'Something went wrong while initializing celery worker.\n' \
                  'Maybe it is not enabled in your conf ' \
                  '({}).'.format(myapp.config['CFG'])
        warnings.warn(
            message,
            RuntimeWarning
        )

    return None


[docs]def create_app(conf=None, verbose=0, logfile=None, **kwargs): """Initialize the whole application. :param conf: Configuration file to use :type conf: str :param verbose: Set the verbosity level :type verbose: int :param logfile: Store the logs in the given file :type logfile: str :param kwargs: Extra options: - gunicorn (bool): Enable gunicorn engine instead of flask's default. Default is True. - unittest (bool): Are we running tests (used for test only). Default is False. - debug (bool): Enable debug mode. Default is False. - cli (bool): Are we running the CLI. Default is False. - reverse_proxy (bool): Are we behind a reverse-proxy. Default is True if gunicorn is True :type kwargs: dict :returns: A :class:`burpui.server.BUIServer` object """ from flask import g, request, session from flask_login import LoginManager from flask_bower import Bower from flask_babel import gettext from .utils import basic_login_from_request, ReverseProxied, lookup_file, \ is_uuid from .server import BUIServer as BurpUI from .sessions import session_manager from .routes import view, mypad from .api import api, apibp from .ext.cache import cache from .ext.i18n import babel, get_locale logger = logging.getLogger('burp-ui') gunicorn = kwargs.get('gunicorn', True) unittest = kwargs.get('unittest', False) debug = kwargs.get('debug', False) cli = kwargs.get('cli', False) reverse_proxy = kwargs.get('reverse_proxy', gunicorn) # The debug argument used to be a boolean so we keep supporting this format if isinstance(verbose, bool): if verbose: verbose = logging.DEBUG else: verbose = logging.CRITICAL else: levels = [ logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG ] if verbose >= len(levels): verbose = len(levels) - 1 if not verbose: verbose = 0 verbose = levels[verbose] if logfile: from logging.handlers import RotatingFileHandler handler = RotatingFileHandler( logfile, maxBytes=1024 * 1024 * 100, backupCount=5 ) else: from logging import StreamHandler handler = StreamHandler() if verbose > logging.DEBUG: LOG_FORMAT = ( '[%(asctime)s] %(levelname)s in ' '%(module)s.%(funcName)s: %(message)s' ) else: LOG_FORMAT = ( '-' * 27 + '[%(asctime)s]' + '-' * 28 + '\n' + '%(levelname)s in %(module)s.%(funcName)s ' + '[%(pathname)s:%(lineno)d]:\n' + '%(message)s\n' + '-' * 80 ) handler.setLevel(verbose) handler.setFormatter(Formatter(LOG_FORMAT)) logger.setLevel(verbose) logger.addHandler(handler) logger.debug( 'conf: {}\n'.format(conf) + 'verbose: {}\n'.format(logging.getLevelName(verbose)) + 'logfile: {}\n'.format(logfile) + 'gunicorn: {}\n'.format(gunicorn) + 'debug: {}\n'.format(debug) + 'unittest: {}\n'.format(unittest) + 'cli: {}\n'.format(cli) + 'reverse_proxy: {}'.format(reverse_proxy) ) if not unittest: # pragma: no cover from ._compat import patch_json patch_json() # We initialize the core app = BurpUI() if verbose: app.enable_logger() app.gunicorn = gunicorn app.config['CFG'] = None # Some config app.config['BUI_CLI'] = cli # FIXME: strange behavior when bundling errors # app.config['BUNDLE_ERRORS'] = True app.config['REMEMBER_COOKIE_HTTPONLY'] = True if debug and not gunicorn: # pragma: no cover app.config['DEBUG'] = True and not unittest app.config['TESTING'] = True and not unittest # Still need to test conf file here because the init function can be called # by gunicorn directly if conf: app.config['CFG'] = lookup_file(conf, guess=False) else: app.config['CFG'] = lookup_file() logger.info('Using configuration: {}'.format(app.config['CFG'])) app.setup(app.config['CFG'], unittest, cli) if debug: app.config.setdefault('TEMPLATES_AUTO_RELOAD', True) app.config['TEMPLATES_AUTO_RELOAD'] = True app.config['DEBUG'] = True app.jinja_env.globals.update( isinstance=isinstance, list=list, mypad=mypad, version_id='{}-{}'.format(__version__, __release__), ) # manage application secret key if app.secret_key and \ (app.secret_key.lower() == 'none' or (app.secret_key.lower() == 'random' and gunicorn)): # pragma: no cover logger.critical('Your setup is not secure! Please consider setting a' ' secret key in your configuration file') app.secret_key = 'Burp-UI' if not app.secret_key or app.secret_key.lower() == 'random': from base64 import b64encode app.secret_key = b64encode(os.urandom(256)) app.wsgi_app = ReverseProxied(app.wsgi_app, app) # Manage reverse_proxy special tricks & improvements if reverse_proxy: # pragma: no cover from werkzeug.contrib.fixers import ProxyFix app.wsgi_app = ProxyFix(app.wsgi_app) if app.storage and app.storage.lower() == 'redis': try: # Session setup if not app.session_db or \ str(app.session_db).lower() not in ['none', 'false']: from redis import Redis from .ext.session import sess host, port, pwd = get_redis_server(app) db = 0 if app.session_db and \ str(app.session_db).lower() not \ in ['redis', 'default', 'true']: try: # pragma: no cover (_, _, pwd, host, port, db) = \ parse_db_setting(app.session_db) except ValueError as exp: logger.warning(str(exp)) try: db = int(db) except ValueError: db = 0 logger.debug( 'SESSION: Using redis://guest:****@{}:{}/{}'.format( host, port, db) ) red = Redis(host=host, port=port, db=db, password=pwd) app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = red app.config['SESSION_USE_SIGNER'] = app.secret_key is not None app.config['SESSION_PERMANENT'] = False sess.init_app(app) session_manager.backend = red except Exception as exp: # pragma: no cover logger.warning('Unable to initialize session: {}'.format(str(exp))) try: # Cache setup if not app.cache_db or \ str(app.cache_db).lower() not in ['none', 'false']: host, port, pwd = get_redis_server(app) db = 1 if app.cache_db and \ str(app.cache_db).lower() not \ in ['redis', 'default', 'true']: try: # pragma: no cover (_, _, pwd, host, port, db) = \ parse_db_setting(app.cache_db) except ValueError as exp: logger.warning(str(exp)) try: db = int(db) except ValueError: db = 1 logger.debug('CACHE: Using redis://guest:****@{}:{}/{}'.format( host, port, db) ) cache.init_app( app, config={ 'CACHE_TYPE': 'redis', 'CACHE_REDIS_HOST': host, 'CACHE_REDIS_PORT': port, 'CACHE_REDIS_PASSWORD': pwd, 'CACHE_REDIS_DB': db } ) # clear cache at startup in case we removed or added servers with app.app_context(): cache.clear() else: # pragma: no cover cache.init_app(app) except Exception as exp: # pragma: no cover logger.warning('Unable to initialize cache: {}'.format(str(exp))) cache.init_app(app) try: # Limiter setup if app.limiter and str(app.limiter).lower() not \ in ['none', 'false']: # pragma: no cover from .ext.limit import limiter app.config['RATELIMIT_HEADERS_ENABLED'] = True if app.limiter and str(app.limiter).lower() not \ in ['default', 'redis', 'true']: app.config['RATELIMIT_STORAGE_URL'] = app.limiter else: db = 3 host, port, pwd = get_redis_server(app) if pwd: conn = 'redis://guest:{}@{}:{}/{}'.format( pwd, host, port, db ) else: conn = 'redis://{}:{}/{}'.format(host, port, db) app.config['RATELIMIT_STORAGE_URL'] = conn (_, _, pwd, host, port, db) = parse_db_setting( app.config['RATELIMIT_STORAGE_URL'] ) logger.debug( 'LIMITER: Using redis://guest:****@{}:{}/{}'.format( host, port, db ) ) limiter.init_app(app) app.config['WITH_LIMIT'] = True except ImportError: # pragma: no cover logger.warning('Unable to load limiter. Did you run \'pip install ' 'flask-limiter\'?') except Exception as exp: # pragma: no cover logger.warning('Unable to initialize limiter: {}'.format(str(exp))) else: cache.init_app(app) # Initialize i18n babel.init_app(app) # Create SQLAlchemy if enabled create_db(app, cli, unittest) # We initialize the API api.version = __version__ api.release = __release__ api.__url__ = __url__ api.__doc__ = __doc__ api.load_all() app.register_blueprint(apibp) # Then we load our routes view.__url__ = __url__ view.__doc__ = __doc__ app.register_blueprint(view) # And the login_manager app.login_manager = LoginManager() app.login_manager.login_view = 'view.login' app.login_manager.login_message_category = 'info' app.login_manager.session_protection = 'strong' # This is just to have the strings in the .po files app.login_manager.login_message = gettext( 'Please log in to access this page.' ) app.login_manager.needs_refresh_message = gettext( 'Please reauthenticate to access this page.' ) # This will be called at runtime and will then translate the strings app.login_manager.localize_callback = gettext app.login_manager.init_app(app) # Initialize Session Manager session_manager.init_app(app) # Initialize Bower ext app.config.setdefault( 'BOWER_COMPONENTS_ROOT', os.path.join('static', 'vendor') ) app.config.setdefault('BOWER_REPLACE_URL_FOR', True) bower = Bower() bower.init_app(app) # Create celery app if enabled create_celery(app, warn=False) if app.config['WITH_CELERY']: # may fail in case redis is not running (this can happen while running # the bui-manage script) try: from .api.async import force_scheduling_now force_scheduling_now() except: # pragma: no cover pass def _check_session(user, request, api=False): """Check if the session is in the db""" if user and not session_manager.session_in_db(): # pragma: no cover login = getattr(user, 'name', None) if login and not is_uuid(login): remember = session.get('persistent', False) if not remember: from flask_login import decode_cookie remember_cookie = request.cookies.get( app.config.get('REMEMBER_COOKIE_NAME'), False ) # check if the remember_cookie is legit if remember_cookie and decode_cookie(remember_cookie): remember = True session_manager.store_session( login, request.remote_addr, request.headers.get('User-Agent'), remember, api ) elif login: app.uhandler.remove(login) @app.before_request def setup_request(): g.locale = get_locale() g.date_format = session.get('dateFormat', 'llll') # make sure to store secure cookie if required if app.scookie: criteria = [ request.is_secure, request.headers.get('X-Forwarded-Proto', 'http') == 'https' ] app.config['SESSION_COOKIE_SECURE'] = \ app.config['REMEMBER_COOKIE_SECURE'] = any(criteria) @app.login_manager.user_loader def load_user(userid): """User loader callback""" if app.auth != 'none': user = app.uhandler.user(userid) if 'X-Language' in request.headers: language = request.headers.get('X-Language') user.language = language session['language'] = language _check_session(user, request) return user return None @app.login_manager.request_loader def load_user_from_request(request): """User loader from request callback""" if app.auth != 'none': user = basic_login_from_request(request, app) _check_session(user, request, True) return user @app.after_request def after_request(response): if getattr(g, 'basic_session', False): if session_manager.invalidate_current_session(): session_manager.delete_session() return response return app
init = create_app