Source code for burpui.misc.acl.meta

# -*- coding: utf8 -*-
"""
.. module:: burpui.misc.acl.meta
    :platform: Unix
    :synopsis: Burp-UI ACL meta definitions.

.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>

"""
import fnmatch
import json
import re

from ...config import config
from ...ext.cache import cache
from ...utils import make_list
from .interface import BUIacl

PARSE_EXCLUDE_KEYS = ["agents", "clients", "ro", "rw", "order", "exclude"]
PARSE_RESERVED_KEYS = ["ro", "rw", "order", "exclude"]
DEFAULT_EVAL_ORDER = ["exclude", "rw", "ro"]
MODE_RETURN = {
    "ro": False,
    "rw": True,
}


def _extract_key(data, key, name, default=[], fallback="clients"):
    if not isinstance(data, dict):
        return default

    ret = None
    extract = data.get(key, {})

    if isinstance(name, list):
        for nm in name:
            if nm in extract:
                ret = make_list(extract[nm])
    elif name:
        if name in extract:
            ret = make_list(extract[name])

    if ret:
        if key == "order":
            for odr in DEFAULT_EVAL_ORDER:
                if odr not in ret:
                    ret.append(odr)
        return ret
    return extract.get(fallback, default)


def _glob_match(globs, string, extended=True):
    def __eval_glob(glob):
        if extended:
            reg = fnmatch.translate(glob)
            return re.match(reg, string)
        return glob == string

    if not isinstance(globs, list):
        if __eval_glob(globs):
            return [globs]
        return []
    ret = []
    for glob in globs:
        if __eval_glob(glob):
            if extended:
                ret.append(glob)
            else:
                ret.append(string)
                break
    return ret


class BUImetaGrant(object):
    def _merge_data(self, d1, d2):
        """Merge data as list or dict recursively avoiding duplicates"""
        if not d1 and not d2:
            if isinstance(d1, dict) or isinstance(d2, dict):
                return {}
            return []
        if not d2:
            return d1
        if not d1:
            return d2
        if isinstance(d1, list) and isinstance(d2, list):
            return list(set(d1 + d2))
        if isinstance(d1, list) and not isinstance(d2, dict):
            if d2 in d1:
                return d1
            return d1 + make_list(d2)
        if isinstance(d2, list) and not isinstance(d1, dict):
            if d1 in d2:
                return d2
            return d2 + make_list(d1)
        if not isinstance(d1, dict) and not isinstance(d2, dict):
            if d1 == d2:
                return make_list(d1)
            else:
                return [d1, d2]

        res = d1
        for key2, val2 in d2.items():
            if key2 in res:
                res[key2] = self._merge_data(val2, res[key2])
            else:
                res[key2] = val2
        return res

    def _parse_clients(self, data, mode=None, parent=None):
        agents = clients = []
        advanced = {}
        if isinstance(data, list):
            if mode:
                if parent and parent not in PARSE_EXCLUDE_KEYS:
                    advanced[mode] = {parent: data}
                else:
                    advanced[mode] = {"clients": data}
            return data, agents, advanced
        if not isinstance(data, dict):
            if mode:
                if parent and parent not in PARSE_EXCLUDE_KEYS:
                    advanced[mode] = {parent: make_list(data)}
                else:
                    advanced[mode] = {"clients": make_list(data)}
            return make_list(data), agents, advanced
        for key, val in data.items():
            if key in PARSE_EXCLUDE_KEYS:
                continue
            cl1, ag1, ad1 = self._parse_clients(val, parent=key)
            agents = self._merge_data(agents, ag1)
            clients = self._merge_data(clients, cl1)
            agents = self._merge_data(agents, key)
            advanced = self._merge_data(advanced, ad1)
            advanced = self._merge_data(advanced, {key: cl1})
            if mode:
                if parent and parent not in PARSE_EXCLUDE_KEYS:
                    advanced = self._merge_data(advanced, {mode: {parent: cl1}})
                else:
                    advanced = self._merge_data(advanced, {mode: {key: cl1}})

        for key in ["clients"] + PARSE_RESERVED_KEYS:
            md = None
            if key in data:
                if key in PARSE_RESERVED_KEYS:
                    md = key
                    par = parent
                else:
                    par = key
                cl2, ag2, ad2 = self._parse_clients(data[key], md, parent=par)
                agents = self._merge_data(agents, ag2)
                if not md or md not in ["order", "exclude"]:
                    clients = self._merge_data(clients, cl2)
                if parent and parent not in PARSE_EXCLUDE_KEYS:
                    ro = ad2.get("ro")
                    rw = ad2.get("rw")
                    if ro and "clients" in ro:
                        ro[parent] = ro["clients"]
                        del ro["clients"]
                        ad2["ro"] = ro
                    if rw and "clients" in rw:
                        rw[parent] = rw["clients"]
                        del rw["clients"]
                        ad2["rw"] = rw
                advanced = self._merge_data(advanced, ad2)

        if "agents" in data:
            ag3, cl3, ad3 = self._parse_agents(data["agents"])
            agents = self._merge_data(agents, ag3)
            clients = self._merge_data(clients, cl3)
            advanced = self._merge_data(advanced, ad3)

        return make_list(clients), make_list(agents), advanced

    def _parse_agents(self, data, mode=None):
        agents = clients = []
        advanced = {}
        if isinstance(data, list):
            if mode:
                advanced[mode] = {"agents": data}
            return data, clients, advanced
        if not isinstance(data, dict):
            if mode:
                advanced[mode] = {"agents": make_list(data)}
            return make_list(data), clients, advanced
        for key, val in data.items():
            if key in PARSE_EXCLUDE_KEYS:
                continue
            cl1, ag1, ad1 = self._parse_clients(data)
            agents = self._merge_data(agents, ag1)
            clients = self._merge_data(clients, cl1)
            agents = self._merge_data(agents, key)
            advanced = self._merge_data(advanced, ad1)
            if mode:
                advanced = self._merge_data(advanced, {mode: ad1})
            # FIXME: why did I do that?
            # advanced = self._merge_data(advanced, {key: cl1})
            # if mode:
            #     advanced = self._merge_data(advanced, {mode: {key: cl1}})

        for key in ["agents"] + PARSE_RESERVED_KEYS:
            md = None
            if key in data:
                if key in PARSE_RESERVED_KEYS:
                    md = key
                ag2, cl2, ad2 = self._parse_agents(data[key], md)
                agents = self._merge_data(agents, ag2)
                clients = self._merge_data(clients, cl2)
                advanced = self._merge_data(advanced, ad2)

        if "clients" in data:
            cl3, ag3, ad3 = self._parse_clients(data["clients"])
            agents = self._merge_data(agents, ag3)
            clients = self._merge_data(clients, cl3)
            advanced = self._merge_data(advanced, ad3)

        return make_list(agents), make_list(clients), advanced


[docs]class BUIgrantHandler(BUImetaGrant, BUIacl): """This class is here to handle grants in a generic way. It will automatically merge grants from various backends that register to it """ _id = 1 _gp_admin_name = "@BUIADMINRESERVED" _gp_moderator_name = "@moderator" _gp_hidden = set([str(_gp_admin_name[1:]), str(_gp_moderator_name[1:])]) _grants = {} _groups = {} _options = {} _backends = {} _name = "meta_grant" @property def id(self): """current handler id, used to detect configuration changes""" return self._id @property def grants(self): """grants managed by our handler""" return self._grants @property def groups(self): """groups managed by our handler""" return self._groups @property def options(self): """options of our ACL engine""" return self._options @options.setter def options(self, value): """set the options of our engine""" self._options = value if self._options.get("legacy"): self._options["extended"] = False
[docs] def changed(self, sid): """detect a configuration change""" return sid != self._id
[docs] def reset(self, reset_from): """a configuration change occurred, we reload our grants and groups""" self._grants.clear() self._groups.clear() self._reset_cached() self._id += 1 for name, backend in self._backends.items(): if name == reset_from: continue backend.reload()
[docs] def opt(self, key, default=False): """access a given option""" if key not in self.options: return default return self.options.get(key)
[docs] def register_backend(self, name, backend): """register a new ACL backend :param name: Backend name :type name: str :param backend: ACL Backend :type backend: :class:`burpui.misc.acl.interface.BUIaclLoader` """ self._backends[name] = backend
[docs] def set_grant(self, name, grant): """parse and set the given grants""" if name in self._grants: return self._grants[name].add_grants(grant) self._grants[name] = BUIaclGrant(name, grant) return self._grants[name].grants
[docs] def set_group(self, name, members): """parse and set the given group""" if name in self._groups: return self._groups[name].add_members(members) self._groups[name] = BUIaclGroup(name, members) return self._groups[name].members
[docs] def set_admin(self, admins): """parse and set the admins""" self.set_group(self._gp_admin_name, admins)
[docs] def set_moderator(self, moderators): """parse and set the moderators""" self.set_group(self._gp_moderator_name, moderators)
[docs] def set_moderator_grants(self, grants): """parse and set the moderators grants""" self.set_grant(self._gp_moderator_name, grants)
def get_member_groups(self, member): groups = [] for group in self._groups.values(): (ret, inh) = group.is_member(member) if ret and group.name not in self._gp_hidden: groups.append((group.name, inh)) return groups def _gen_key(self, username): return "{}-{}".format(self._name, username) def _set_cached(self, username, value): key = self._gen_key(username) return cache.cache.set(key, value) def _get_cached(self, username): key = self._gen_key(username) return cache.cache.get(key) def _reset_cached(self): cache.clear() def _is_cached(self, username): key = self._gen_key(username) return cache.cache.has(key) def _extract_grants(self, username, parent=None): if not self._is_cached(username): data = {} if username in self.grants: grants = self.grants[username].grants else: grants = [] clients, agents, advanced = self._parse_clients(grants) data["clients"] = clients data["agents"] = agents data["advanced"] = [advanced] if advanced else [] def __merge_grants_with(grp, prt): data2 = self._extract_grants(grp, prt) data["clients"] = self._merge_data(data["clients"], data2["clients"]) data["agents"] = self._merge_data(data["agents"], data2["agents"]) tmp = data2["advanced"] if tmp: data["advanced"] += tmp # moderator is also a group for gname, group in self.groups.items(): # no grants need to be parsed for admins if gname == self._gp_admin_name: continue (ret, _) = group.is_member(username) if not parent: parent = set([username]) elif isinstance(parent, set): parent.add(username) if ret and gname != username and parent and gname not in parent: __merge_grants_with(gname, parent) self._set_cached(username, data) return data return self._get_cached(username) def _extract_clients(self, username): ret = self._extract_grants(username) return ret.get("clients", []) def _extract_agents(self, username): ret = self._extract_grants(username) return ret.get("agents", []) def _extract_advanced(self, username, idx=None): ret = self._extract_grants(username).get("advanced", []) if idx is not None: return ret[idx] if self.opt("inverse_inheritance"): return reversed(ret) return ret def _extract_advanced_mode(self, username, mode, kind, idx): return self._extract_advanced(username, idx).get(mode, {}).get(kind, []) def _client_match(self, username, client): clients = self._extract_clients(username) if not clients: return None if self.opt("extended"): matches = [] for exp in clients: regex = fnmatch.translate(exp) if re.match(regex, client): matches.append(exp) return matches if matches else False else: return [client] if client in clients else False def _server_match(self, username, server): servers = self._extract_agents(username) if not servers: return None if self.opt("extended"): matches = [] for exp in servers: regex = fnmatch.translate(exp) if re.match(regex, server): matches.append(exp) return matches if matches else False else: return [server] if server in servers else False # implement BUIacl methods
[docs] def is_admin(self, username): """See :func:`burpui.misc.acl.interface.BUIacl.is_admin`""" if self._gp_admin_name in self._groups: return self._groups[self._gp_admin_name].is_member(username) return False, None
[docs] def is_moderator(self, username): """See :func:`burpui.misc.acl.interface.BUIacl.is_moderator`""" if self._gp_moderator_name in self._groups: return self._groups[self._gp_moderator_name].is_member(username) return False, None
[docs] def is_client_rw(self, username=None, client=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_client_rw`""" if not username or not client: # pragma: no cover return False (is_admin, _) = self.is_admin(username) ret = is_admin or self.opt("assume_rw", True) or self.opt("legacy") if self.is_client_allowed(username, client, server): # legacy mode: assume rw for everyone if self.opt("legacy"): return True client_match = self._client_match(username, client) advanced = self._extract_advanced(username) if client_match is None and username == client: client_match = [username] if server: server_match = self._server_match(username, server) if not server_match and not client_match: return is_admin or self.opt("assume_rw", True) for adv in advanced: order = _extract_key( adv, "order", [server] + server_match, DEFAULT_EVAL_ORDER ) for adv2 in advanced: # the whole agent is rw and we did not find explicit entry for # client_match if client_match is False: if server_match and any( x in adv.get("rw", {}) or x in adv.get("rw", {}).get("agents", []) for x in server_match ): return True if server in adv.get("rw", {}) or server in adv.get( "rw", {} ).get("agents", []): return True if server_match and any( x in adv.get("rw", {}) or x in adv.get("rw", {}).get("agents", []) for x in server_match ): for odr in order: if client_match and any( x in adv2.get(odr, []) or x in adv2.get(odr, {}).get("clients", []) or any( x in adv2.get(odr, {}).get(y, []) for y in server_match ) for x in client_match ): return MODE_RETURN.get(odr, False) if server_match and any( x in adv.get("ro", {}) or x in adv.get("ro", {}).get("agents", []) for x in server_match ): for odr in order: if client_match and any( x in adv2.get(odr, {}).get("clients", []) or x in adv2.get(odr, []) or any( x in adv2.get(odr, {}).get(y, []) for y in server_match ) for x in client_match ): return MODE_RETURN.get(odr, False) for adv in advanced: if server: key = [server] + self._server_match(username, server) else: key = None order = _extract_key(adv, "order", key, DEFAULT_EVAL_ORDER) for odr in order: eval_clients = adv.get(odr, {}).get("clients", []) if client_match and any(x in eval_clients for x in client_match): return MODE_RETURN.get(odr, False) if client and client in eval_clients: return MODE_RETURN.get(odr, False) return ret
[docs] def is_client_allowed(self, username=None, client=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_client_allowed`""" if not username or not client: # pragma: no cover return False (is_admin, _) = self.is_admin(username) client_match = self._client_match(username, client) if client_match is None and username == client: client_match = [username] elif not client_match: client_match = False if server: server_match = self._server_match(username, server) if server_match is not None or self.opt("legacy"): if not server_match: return is_admin advanced = self._extract_advanced(username) if self.opt("implicit_link", True) and not advanced: advanced = False if advanced is not False: for idx, adv in enumerate(advanced): order = _extract_key( adv, "order", [server] + server_match, DEFAULT_EVAL_ORDER ) excludes = _extract_key( adv, "exclude", [server] + server_match, fallback="agents" ) if all(x not in adv for x in server_match): for odr in order: if odr == "exclude" and ( any(x in excludes for x in client_match) or _glob_match( excludes, client, self.opt("extended") ) ): return False elif any( x in y for x in server_match for y in self._extract_advanced_mode( username, odr, "agents", idx ) ): return True tmp = set(adv.get(server, [])) for srv in server_match: tmp |= set(adv.get(srv, [])) adv2 = list(tmp) excludes = _extract_key(adv, "exclude", [server] + server_match) for odr in order: if odr == "exclude" and ( any(x in excludes for x in client_match) or _glob_match(excludes, client, self.opt("extended")) ): return False elif client_match is not False and ( any(x in adv2 for x in client_match) or client in adv2 ): return True return False advanced = self._extract_advanced(username) if advanced: for adv in advanced: order = _extract_key(adv, "order", None, DEFAULT_EVAL_ORDER) excludes = _extract_key(adv, "exclude", None) for odr in order: if ( odr == "exclude" and client_match and ( any(x in excludes for x in client_match) or _glob_match(excludes, client, self.opt("extended")) ) ): return False return client_match is not False or is_admin
[docs] def is_server_rw(self, username=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_server_rw`""" if not username: # pragma: no cover return False # special case single-agent mode if not server and config.get("STANDALONE"): server = "local" (is_admin, _) = self.is_admin(username) ret = is_admin or self.opt("assume_rw", True) or self.opt("legacy") if self.is_server_allowed(username, server): server_match = self._server_match(username, server) if not server_match: return is_admin or self.opt("assume_rw", True) advanced = self._extract_advanced(username) for adv in advanced: order = _extract_key( adv, "order", [server] + server_match, DEFAULT_EVAL_ORDER ) for odr in order: if any( x in adv.get(odr, {}).get("agents", []) for x in server_match ): return MODE_RETURN.get(odr, False) return ret
[docs] def is_server_allowed(self, username=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_server_allowed`""" if not username or not server: return False server_match = self._server_match(username, server) (is_admin, _) = self.is_admin(username) if server_match is None and self.opt("legacy"): server_match = False return server_match is not False or is_admin
class BUIaclGroup(object): """The :class:`burpui.misc.acl.interface.BUIaclGroup` class is used to represent a Group""" def __init__(self, name, members=None): self._name = name self._set_members(members) self.has_subgroups = -1 def _parse_members(self, members): # we support only lists if members and "," in members and not isinstance(members, list): parsed = [x.strip() for x in members.split(",")] else: parsed = make_list(members) return parsed def _set_members(self, members): self._members = set(self._parse_members(members)) def add_members(self, new_members): new_members = self._parse_members(new_members) self._members = self._members | set(new_members) # reset the flag self.has_subgroups = -1 return new_members def del_members(self, members_remove): members_remove = self._parse_members(members_remove) self._members = self._members - set(members_remove) # reset the flag self.has_subgroups = -1 def is_member(self, member, parent=None): inherit = set() ret = member in self._members if not ret and (self.has_subgroups > 0 or self.has_subgroups == -1): self.has_subgroups = 0 for mem in self.members: # avoid infinite loop with mutual inheritance if parent and mem in parent: continue if mem.startswith("@"): self.has_subgroups += 1 if mem in meta_grants._groups: if parent: parent.append(mem) else: parent = [mem] (ret, inh2) = meta_grants._groups[mem].is_member( member, parent=parent ) if ret: for subinh in inh2: inherit.add(subinh) inherit.add(mem) # no break, we may have other inheritance at the level return ret, list(inherit) @property def name(self): if self._name and any(self._name.startswith(x) for x in ["@", "+"]): return str(self._name[1:]) return self._name @property def members(self): return list(self._members) class BUIaclGrant(BUImetaGrant): """The :class:`burpui.misc.acl.interface.BUIaclGrant` class is used to represent a Grant""" def __init__(self, name, grants): self._name = name self._grants = self._parse_grants(grants) def _parse_grants(self, grants): try: ret = json.loads(grants) except (ValueError, TypeError): # handle empty/missing grants if not grants: return [] # ignore mal-formatted json if any(x in grants for x in ["{", "}", "[", "]"]): ret = None elif grants and "," in grants: ret = [x.rstrip() for x in grants.split(",")] else: ret = make_list(grants) return ret @property def name(self): if self._name and any(self._name.startswith(x) for x in ["@", "+"]): return str(self._name[1:]) return self._name @property def grants(self): return self._grants @property def grants_raw(self): return json.dumps(self._grants) def add_grants(self, grants): parsed = self._parse_grants(grants) self._grants = self._merge_data(self._grants, parsed) return parsed meta_grants = BUIgrantHandler()