HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/auth/authorization.py
from collections import defaultdict

ANY_USER = '__ANY_USER'
ADMIN_GROUP = 'admin_users'
GROUP_PREFIX = '@'


def _normalize_user(user):
    if user:
        return user.lower().strip()
    return user


def _normalize_users(allowed_users):
    if isinstance(allowed_users, list):
        if ANY_USER in allowed_users:
            return ANY_USER

        return [_normalize_user(user) for user in allowed_users]

    return allowed_users


class Authorizer:
    def __init__(self, app_allowed_users, admin_users, full_history_users, code_editor_users, groups_provider):
        self._app_allowed_users = _normalize_users(app_allowed_users)
        self._admin_users = _normalize_users(admin_users)
        self._full_history_users = _normalize_users(full_history_users)
        self._code_editor_users = _normalize_users(code_editor_users)

        self._groups_provider = groups_provider

    def is_allowed_in_app(self, user_id):
        return self._is_allowed_internal(user_id, self._app_allowed_users)

    def is_admin(self, user_id):
        return self._is_allowed_internal(user_id, self._admin_users)

    def has_full_history_access(self, user_id):
        return self.is_admin(user_id) or self._is_allowed_internal(user_id, self._full_history_users)

    def can_edit_code(self, user_id):
        return self.is_admin(user_id) and self._is_allowed_internal(user_id, self._code_editor_users)

    def is_allowed(self, user_id, allowed_users):
        normalized_users = _normalize_users(allowed_users)

        return self._is_allowed_internal(user_id, normalized_users)

    def _is_allowed_internal(self, user_id, normalized_allowed_users):
        if not normalized_allowed_users:
            return False

        if normalized_allowed_users == ANY_USER:
            return True

        if _normalize_user(user_id) in normalized_allowed_users:
            return True

        user_groups = self._groups_provider.get_groups(user_id)
        if not user_groups:
            return False

        for group in user_groups:
            if _normalize_user(GROUP_PREFIX + group) in normalized_allowed_users:
                return True

        return False


class EmptyGroupProvider:

    def get_groups(self, user, known_groups=None):
        return []


def _flatten_groups(groups):
    result = {}

    for group in groups.keys():
        group_members = set()
        visited_groups = set()

        queue = [group]

        while queue:
            current_group = queue.pop()
            if current_group in visited_groups:
                continue
            visited_groups.add(current_group)

            if current_group in result:
                group_members.update(result[current_group])
                continue

            current_group_members = groups[current_group]
            for member in current_group_members:
                group_members.add(member)

                if member.startswith(GROUP_PREFIX) and (member[1:] in groups):
                    queue.append(member[1:])

        result[group] = group_members

    return result


class PreconfiguredGroupProvider:

    def __init__(self, groups) -> None:
        self._user_groups = defaultdict(list)
        self._lazy_group_parents = defaultdict(list)

        flat_groups = _flatten_groups(groups)
        for group, members in flat_groups.items():
            for member in members:
                if member.startswith(GROUP_PREFIX):
                    self._lazy_group_parents[member[1:]].append(group)
                else:
                    self._user_groups[_normalize_user(member)].append(group)

    def get_groups(self, user, known_groups=None):
        user_groups = set(self._user_groups[_normalize_user(user)])

        if known_groups:
            for known_group in known_groups:
                if known_group in self._lazy_group_parents:
                    parent_groups = self._lazy_group_parents[known_group]
                    user_groups.update(parent_groups)

        return user_groups


class CombinedGroupProvider:

    def __init__(self, *other_providers) -> None:
        self._other_providers = list(other_providers)

    def get_groups(self, user, known_groups=None):
        groups = set()

        if not known_groups:
            known_groups = []

        for provider in self._other_providers:
            provider_groups = provider.get_groups(user, known_groups)
            if provider_groups:
                groups.update(provider_groups)
                known_groups.extend(provider_groups)

        return groups


def create_group_provider(user_groups, authenticator, admin_users):
    if admin_users:
        admin_users = _exclude_unknown_groups_from_admin_users(admin_users, user_groups)
        if user_groups is None:
            user_groups = {ADMIN_GROUP: admin_users}
        elif ADMIN_GROUP not in user_groups:
            user_groups[ADMIN_GROUP] = admin_users

    if not user_groups:
        if authenticator:
            return authenticator
        return EmptyGroupProvider()

    preconfigured_groups_provider = PreconfiguredGroupProvider(user_groups)
    if not authenticator:
        return preconfigured_groups_provider

    return CombinedGroupProvider(authenticator, preconfigured_groups_provider)


# in case groups will be loaded from ldap
def _exclude_unknown_groups_from_admin_users(admin_users, known_groups):
    if not admin_users or not known_groups:
        return admin_users

    result = []
    for user in admin_users:
        if user.startswith(GROUP_PREFIX):
            group = user[1:]
            if group not in known_groups.keys():
                continue

        result.append(user)

    return result


def is_same_user(user_id1, user_id2):
    return _normalize_user(user_id1) == _normalize_user(user_id2)