Source code for burpui.misc.acl.meta

# -*- coding: utf8 -*-
"""
.. module:: burpui.misc.acl.meta
    :platform: Unix
    :synopsis: Burp-UI ACL meta definitions.

.. moduleauthor:: Ziirish <hi+burpui@ziirish.me>

"""
from .interface import BUIacl
from ...utils import make_list
from ...config import config
from ...ext.cache import cache

from six import iteritems, itervalues

import re
import json
import fnmatch


class BUImetaGrant(object):

    def _merge_data(self, d1, d2):
        """Merge data as list or dict recursively avoiding duplicates"""
        if not d1 and not d2:
            if isinstance(d1, dict) or isinstance(d2, dict):
                return {}
            return []
        if not d2:
            return d1
        if not d1:
            return d2
        if isinstance(d1, list) and isinstance(d2, list):
            return list(set(d1 + d2))
        if isinstance(d1, list) and not isinstance(d2, dict):
            if d2 in d1:
                return d1
            return d1 + make_list(d2)
        if isinstance(d2, list) and not isinstance(d1, dict):
            if d1 in d2:
                return d2
            return d2 + make_list(d1)
        if not isinstance(d1, dict) and not isinstance(d2, dict):
            if d1 == d2:
                return make_list(d1)
            else:
                return [d1, d2]

        res = d1
        for key2, val2 in iteritems(d2):
            if key2 in res:
                res[key2] = self._merge_data(val2, res[key2])
            else:
                res[key2] = val2
        return res

    def _parse_clients(self, data, mode=None, parent=None):
        agents = clients = []
        advanced = {}
        if isinstance(data, list):
            if mode:
                if parent and parent not in ['agents', 'clients', 'ro', 'rw']:
                    advanced[mode] = {parent: data}
                else:
                    advanced[mode] = {'clients': data}
            return data, agents, advanced
        if not isinstance(data, dict):
            if mode:
                if parent and parent not in ['agents', 'clients', 'ro', 'rw']:
                    advanced[mode] = {parent: make_list(data)}
                else:
                    advanced[mode] = {'clients': make_list(data)}
            return make_list(data), agents, advanced
        for key, val in iteritems(data):
            if key in ['agents', 'clients', 'ro', 'rw']:
                continue
            cl1, ag1, ad1 = self._parse_clients(val, parent=key)
            agents = self._merge_data(agents, ag1)
            clients = self._merge_data(clients, cl1)
            agents = self._merge_data(agents, key)
            advanced = self._merge_data(advanced, ad1)
            advanced = self._merge_data(advanced, {key: cl1})
            if mode:
                if parent and parent not in ['agents', 'clients', 'ro', 'rw']:
                    advanced = self._merge_data(advanced, {mode: {parent: cl1}})
                else:
                    advanced = self._merge_data(advanced, {mode: {key: cl1}})

        for key in ['clients', 'ro', 'rw']:
            md = None
            if key in data:
                if key in ['ro', 'rw']:
                    md = key
                cl2, ag2, ad2 = self._parse_clients(data[key], md, parent=key)
                agents = self._merge_data(agents, ag2)
                clients = self._merge_data(clients, cl2)
                if parent and parent not in ['agents', 'clients', 'ro', 'rw']:
                    ro = ad2.get('ro')
                    rw = ad2.get('rw')
                    if ro and 'clients' in ro:
                        ro[parent] = ro['clients']
                        del ro['clients']
                        ad2['ro'] = ro
                    if rw and 'clients' in rw:
                        rw[parent] = rw['clients']
                        del rw['clients']
                        ad2['rw'] = rw
                advanced = self._merge_data(advanced, ad2)

        if 'agents' in data:
            ag3, cl3, ad3 = self._parse_agents(data['agents'])
            agents = self._merge_data(agents, ag3)
            clients = self._merge_data(clients, cl3)
            advanced = self._merge_data(advanced, ad3)

        return make_list(clients), make_list(agents), advanced

    def _parse_agents(self, data, mode=None):
        agents = clients = []
        advanced = {}
        if isinstance(data, list):
            if mode:
                advanced[mode] = {'agents': data}
            return data, clients, advanced
        if not isinstance(data, dict):
            if mode:
                advanced[mode] = {'agents': make_list(data)}
            return make_list(data), clients, advanced
        for key, val in iteritems(data):
            if key in ['agents', 'clients', 'ro', 'rw']:
                continue
            cl1, ag1, ad1 = self._parse_clients(data)
            agents = self._merge_data(agents, ag1)
            clients = self._merge_data(clients, cl1)
            agents = self._merge_data(agents, key)
            advanced = self._merge_data(advanced, ad1)
            if mode:
                advanced = self._merge_data(advanced, {mode: ad1})
            # FIXME: why did I do that?
            # advanced = self._merge_data(advanced, {key: cl1})
            # if mode:
            #     advanced = self._merge_data(advanced, {mode: {key: cl1}})

        for key in ['agents', 'ro', 'rw']:
            md = None
            if key in data:
                if key in ['ro', 'rw']:
                    md = key
                ag2, cl2, ad2 = self._parse_agents(data[key], md)
                agents = self._merge_data(agents, ag2)
                clients = self._merge_data(clients, cl2)
                advanced = self._merge_data(advanced, ad2)

        if 'clients' in data:
            cl3, ag3, ad3 = self._parse_clients(data['clients'])
            agents = self._merge_data(agents, ag3)
            clients = self._merge_data(clients, cl3)
            advanced = self._merge_data(advanced, ad3)

        return make_list(agents), make_list(clients), advanced


[docs]class BUIgrantHandler(BUImetaGrant, BUIacl): """This class is here to handle grants in a generic way. It will automatically merge grants from various backends that register to it """ _id = 1 _gp_admin_name = '@BUIADMINRESERVED' _gp_moderator_name = '@moderator' _gp_hidden = set([str(_gp_admin_name[1:]), str(_gp_moderator_name[1:])]) _grants = {} _groups = {} _options = {} _backends = {} _name = 'meta_grant' @property def id(self): """current handler id, used to detect configuration changes""" return self._id @property def grants(self): """grants managed by our handler""" return self._grants @property def groups(self): """groups managed by our handler""" return self._groups @property def options(self): """options of our ACL engine""" return self._options @options.setter def options(self, value): """set the options of our engine""" self._options = value if self._options.get('legacy'): self._options['extended'] = False
[docs] def changed(self, sid): """detect a configuration change""" return sid != self._id
[docs] def reset(self, reset_from): """a configuration change occurred, we reload our grants and groups""" self._grants.clear() self._groups.clear() self._reset_cached() self._id += 1 for name, backend in iteritems(self._backends): if name == reset_from: continue backend.reload()
[docs] def opt(self, key, default=False): """access a given option""" if key not in self.options: return default return self.options.get(key)
[docs] def register_backend(self, name, backend): """register a new ACL backend :param name: Backend name :type name: str :param backend: ACL Backend :type backend: :class:`burpui.misc.acl.interface.BUIaclLoader` """ self._backends[name] = backend
[docs] def set_grant(self, name, grant): """parse and set the given grants""" if name in self._grants: return self._grants[name].add_grants(grant) self._grants[name] = BUIaclGrant(name, grant) return self._grants[name].grants
[docs] def set_group(self, name, members): """parse and set the given group""" if name in self._groups: return self._groups[name].add_members(members) self._groups[name] = BUIaclGroup(name, members) return self._groups[name].members
[docs] def set_admin(self, admins): """parse and set the admins""" self.set_group(self._gp_admin_name, admins)
[docs] def set_moderator(self, moderators): """parse and set the moderators""" self.set_group(self._gp_moderator_name, moderators)
[docs] def set_moderator_grants(self, grants): """parse and set the moderators grants""" self.set_grant(self._gp_moderator_name, grants)
def get_member_groups(self, member): groups = [] for group in itervalues(self._groups): (ret, inh) = group.is_member(member) if ret and group.name not in self._gp_hidden: groups.append((group.name, inh)) return groups def _gen_key(self, username): return '{}-{}'.format(self._name, username) def _set_cached(self, username, value): key = self._gen_key(username) return cache.cache.set(key, value) def _get_cached(self, username): key = self._gen_key(username) return cache.cache.get(key) def _reset_cached(self): cache.clear() def _is_cached(self, username): key = self._gen_key(username) return cache.cache.has(key) def _extract_grants(self, username, parent=None): if not self._is_cached(username): data = {} if username in self.grants: grants = self.grants[username].grants else: grants = [] clients, agents, advanced = self._parse_clients(grants) data['clients'] = clients data['agents'] = agents data['advanced'] = [advanced] if advanced else [] def __merge_grants_with(grp, prt): data2 = self._extract_grants(grp, prt) data['clients'] = self._merge_data( data['clients'], data2['clients'] ) data['agents'] = self._merge_data( data['agents'], data2['agents'] ) tmp = data2['advanced'] if tmp: data['advanced'] += tmp # moderator is also a group for gname, group in iteritems(self.groups): # no grants need to be parsed for admins if gname == self._gp_admin_name: continue (ret, _) = group.is_member(username) if not parent: parent = set([username]) elif isinstance(parent, set): parent.add(username) if ret and gname != username and parent and gname not in parent: __merge_grants_with(gname, parent) self._set_cached(username, data) return data return self._get_cached(username) def _extract_clients(self, username): ret = self._extract_grants(username) return ret.get('clients', []) def _extract_agents(self, username): ret = self._extract_grants(username) return ret.get('agents', []) def _extract_advanced(self, username, idx=None): ret = self._extract_grants(username).get('advanced', []) if idx is not None: return ret[idx] if self.opt('inverse_inheritance'): return reversed(ret) return ret def _extract_advanced_mode(self, username, mode, kind, idx): return self._extract_advanced(username, idx).get(mode, {}).get(kind, []) def _client_match(self, username, client): clients = self._extract_clients(username) if not clients: return None if self.opt('extended'): matches = [] for exp in clients: regex = fnmatch.translate(exp) if re.match(regex, client): matches.append(exp) return matches if matches else False else: return [client] if client in clients else False def _server_match(self, username, server): servers = self._extract_agents(username) if not servers: return None if self.opt('extended'): matches = [] for exp in servers: regex = fnmatch.translate(exp) if re.match(regex, server): matches.append(exp) return matches if matches else False else: return [server] if server in servers else False # implement BUIacl methods
[docs] def is_admin(self, username): """See :func:`burpui.misc.acl.interface.BUIacl.is_admin`""" if self._gp_admin_name in self._groups: return self._groups[self._gp_admin_name].is_member(username) return False, None
[docs] def is_moderator(self, username): """See :func:`burpui.misc.acl.interface.BUIacl.is_moderator`""" if self._gp_moderator_name in self._groups: return self._groups[self._gp_moderator_name].is_member(username) return False, None
[docs] def is_client_rw(self, username=None, client=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_client_rw`""" if not username or not client: # pragma: no cover return False (is_admin, _) = self.is_admin(username) ret = is_admin or self.opt('assume_rw', True) or self.opt('legacy') if self.is_client_allowed(username, client, server): # legacy mode: assume rw for everyone if self.opt('legacy'): return True client_match = self._client_match(username, client) advanced = self._extract_advanced(username) if client_match is None and username == client: client_match = [username] if server: server_match = self._server_match(username, server) if not server_match and not client_match: return is_admin or self.opt('assume_rw', True) for adv in advanced: for adv2 in advanced: # the whole agent is rw and we did not find explicit entry for # client_match if client_match is False: if server_match and \ any([x in adv.get('rw', {}) or x in adv.get('rw', {}).get('agents', []) for x in server_match]): return True if server in adv.get('rw', {}) or \ server in adv.get('rw', {}).get('agents', []): return True if server_match and \ any([x in adv.get('rw', {}) or x in adv.get('rw', {}).get('agents', []) for x in server_match]): # both agent and client are defined as rw if client_match and \ any([x in adv2.get('rw', []) or x in adv2.get('rw', {}).get('clients', []) or any([x in adv2.get('rw', {}).get(y, []) for y in server_match]) for x in client_match ]): return True # the agent is rw but the client is explicitly defined as ro if client_match and \ any([x in adv2.get('ro', []) or x in adv2.get('ro', {}).get('clients', []) or any([x in adv2.get('ro', {}).get(y, []) for y in server_match]) for x in client_match ]): return False if server_match and \ any([x in adv.get('ro', {}) or x in adv.get('ro', {}).get('agents', []) for x in server_match]): # the agent is ro, but the client is explicitly defined as rw if client_match and \ any([x in adv2.get('rw', {}).get('clients', []) or x in adv2.get('rw', []) or any([x in adv2.get('rw', {}).get(y, []) for y in server_match]) for x in client_match ]): return True # both server and client are explicitly defined as ro if client_match and \ any([x in adv2.get('ro', {}).get('clients', []) or x in adv2.get('ro', []) or any([x in adv2.get('ro', {}).get(y, []) for y in server_match]) for x in client_match ]): return False for adv in advanced: # client is explicitly defined as rw rw_clients = adv.get('rw', {}).get('clients', []) if client_match and \ any([x in rw_clients for x in client_match]): return True if client and \ client in rw_clients: return True # client is explicitly defined as ro ro_clients = adv.get('ro', {}).get('clients', []) if client_match and \ any([x in ro_clients for x in client_match]): return False if client and \ client in ro_clients: return False return ret
[docs] def is_client_allowed(self, username=None, client=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_client_allowed`""" if not username or not client: # pragma: no cover return False (is_admin, _) = self.is_admin(username) client_match = self._client_match(username, client) if client_match is None and username == client: client_match = [username] elif not client_match: client_match = False if server: server_match = self._server_match(username, server) if server_match is not None or self.opt('legacy'): if not server_match: return is_admin advanced = self._extract_advanced(username) if self.opt('implicit_link', True) and not advanced: advanced = False if advanced is not False: for idx, adv in enumerate(advanced): if all([x not in adv for x in server_match]) and \ any([x in y for x in server_match for y in self._extract_advanced_mode(username, 'ro', 'agents', idx) ]) or \ any([x in y for x in server_match for y in self._extract_advanced_mode(username, 'rw', 'agents', idx) ]): return True tmp = set(adv.get(server, [])) for srv in server_match: tmp |= set(adv.get(srv, [])) adv2 = list(tmp) if client_match is not False and \ (any([x in adv2 for x in client_match]) or client in adv2): return True return False return client_match is not False or is_admin
[docs] def is_server_rw(self, username=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_server_rw`""" if not username: # pragma: no cover return False # special case single-agent mode if not server and config.get('STANDALONE'): server = 'local' (is_admin, _) = self.is_admin(username) ret = is_admin or self.opt('assume_rw', True) or self.opt('legacy') if self.is_server_allowed(username, server): server_match = self._server_match(username, server) if not server_match: return is_admin or self.opt('assume_rw', True) advanced = self._extract_advanced(username) for adv in advanced: # server is explicitly defined as ro if any([x in adv.get('ro', {}).get('agents', []) for x in server_match]): return False # server is explicitly defined as rw if any([x in adv.get('rw', {}).get('agents', []) for x in server_match]): return True return ret
[docs] def is_server_allowed(self, username=None, server=None): """See :func:`burpui.misc.acl.interface.BUIacl.is_server_allowed`""" if not username or not server: return False server_match = self._server_match(username, server) (is_admin, _) = self.is_admin(username) if server_match is None and self.opt('legacy'): server_match = False return server_match is not False or is_admin
class BUIaclGroup(object): """The :class:`burpui.misc.acl.interface.BUIaclGroup` class is used to represent a Group""" def __init__(self, name, members=None): self._name = name self._set_members(members) self.has_subgroups = -1 def _parse_members(self, members): # we support only lists if members and ',' in members and not isinstance(members, list): parsed = [x.strip() for x in members.split(',')] else: parsed = make_list(members) return parsed def _set_members(self, members): self._members = set(self._parse_members(members)) def add_members(self, new_members): new_members = self._parse_members(new_members) self._members = self._members | set(new_members) # reset the flag self.has_subgroups = -1 return new_members def del_members(self, members_remove): members_remove = self._parse_members(members_remove) self._members = self._members - set(members_remove) # reset the flag self.has_subgroups = -1 def is_member(self, member, parent=None): inherit = set() ret = member in self._members if not ret and (self.has_subgroups > 0 or self.has_subgroups == -1): self.has_subgroups = 0 for mem in self.members: # avoid infinite loop with mutual inheritance if parent and mem in parent: continue if mem.startswith('@'): self.has_subgroups += 1 if mem in meta_grants._groups: if parent: parent.append(mem) else: parent = [mem] (ret, inh2) = meta_grants._groups[mem].is_member(member, parent=parent) if ret: for subinh in inh2: inherit.add(subinh) inherit.add(mem) # no break, we may have other inheritance at the level return ret, list(inherit) @property def name(self): if self._name and any(self._name.startswith(x) for x in ['@', '+']): return str(self._name[1:]) return self._name @property def members(self): return list(self._members) class BUIaclGrant(BUImetaGrant): """The :class:`burpui.misc.acl.interface.BUIaclGrant` class is used to represent a Grant""" def __init__(self, name, grants): self._name = name self._grants = self._parse_grants(grants) def _parse_grants(self, grants): try: ret = json.loads(grants) except (ValueError, TypeError): # handle empty/missing grants if not grants: return [] # ignore mal-formatted json if any([x in grants for x in ['{', '}', '[', ']']]): ret = None elif grants and ',' in grants: ret = [x.rstrip() for x in grants.split(',')] else: ret = make_list(grants) return ret @property def name(self): if self._name and any(self._name.startswith(x) for x in ['@', '+']): return str(self._name[1:]) return self._name @property def grants(self): return self._grants @property def grants_raw(self): return json.dumps(self._grants) def add_grants(self, grants): parsed = self._parse_grants(grants) self._grants = self._merge_data(self._grants, parsed) return parsed meta_grants = BUIgrantHandler()