HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/model/server_conf.py
import json
import logging
import os

import utils.file_utils as file_utils
from auth.authorization import ANY_USER
from model import model_helper
from model.model_helper import read_list, read_int_from_config, read_bool_from_config
from model.trusted_ips import TrustedIpValidator
from utils.string_utils import strip
import utils.custom_json as custom_json

LOGGER = logging.getLogger('server_conf')

XSRF_PROTECTION_TOKEN = 'token'
XSRF_PROTECTION_HEADER = 'header'
XSRF_PROTECTION_DISABLED = 'disabled'


class ServerConfig(object):
    def __init__(self) -> None:
        self.address = None
        self.port = None
        self.ssl = False
        self.ssl_key_path = None
        self.ssl_cert_path = None
        self.authenticator = None
        self.allowed_users = None
        self.alerts_config = None
        self.logging_config = None
        self.admin_config = None
        self.title = None
        self.enable_script_titles = None
        self.ip_validator = TrustedIpValidator([])
        self.user_groups = None
        self.admin_users = []
        self.full_history_users = []
        self.code_editor_users = []
        self.max_request_size_mb = None
        self.callbacks_config = None
        self.user_header_name = None
        self.secret_storage_file = None
        self.xsrf_protection = None

    def get_port(self):
        return self.port

    def is_ssl(self):
        return self.ssl

    def get_ssl_key_path(self):
        return self.ssl_key_path

    def get_ssl_cert_path(self):
        return self.ssl_cert_path


class LoggingConfig:
    def __init__(self) -> None:
        self.filename_pattern = None
        self.date_format = None


def from_json(conf_path, temp_folder):
    if os.path.exists(conf_path):
        file_content = file_utils.read_file(conf_path)
    else:
        file_content = "{}"

    config = ServerConfig()

    json_object = custom_json.loads(file_content)

    address = "0.0.0.0"
    port = 5000

    ssl = json_object.get("ssl")
    if ssl is not None:
        key_path = model_helper.read_obligatory(ssl, 'key_path', ' for ssl')
        cert_path = model_helper.read_obligatory(ssl, 'cert_path', ' for ssl')

        config.ssl = True
        config.ssl_key_path = key_path
        config.ssl_cert_path = cert_path
        port = 5443

    if json_object.get("address"):
        address = json_object.get("address")
    config.address = address

    if json_object.get("port"):
        port = json_object.get("port")
    config.port = port

    if json_object.get('title'):
        config.title = json_object.get('title')
    config.enable_script_titles = read_bool_from_config('enable_script_titles', json_object, default=True)

    access_config = json_object.get('access')
    if access_config:
        allowed_users = access_config.get('allowed_users')
        user_groups = model_helper.read_dict(access_config, 'groups')
        user_header_name = access_config.get('user_header_name')
    else:
        allowed_users = None
        user_groups = {}
        user_header_name = None

    auth_config = json_object.get('auth')
    if auth_config:
        config.authenticator = create_authenticator(auth_config, temp_folder)

        auth_type = config.authenticator.auth_type
        if auth_type == 'google_oauth' and allowed_users is None:
            raise Exception('access.allowed_users field is mandatory for ' + auth_type)

        def_trusted_ips = []
        def_admins = []
    else:
        def_trusted_ips = ['127.0.0.1', '::1', '172.20.0.1']
        def_admins = def_trusted_ips

    if access_config:
        trusted_ips = strip(read_list(access_config, 'trusted_ips', default=def_trusted_ips))
        admin_users = _parse_admin_users(access_config, default_admins=def_admins)
        full_history_users = _parse_history_users(access_config)
        code_editor_users = _parse_code_editor_users(access_config, admin_users)
    else:
        trusted_ips = def_trusted_ips
        admin_users = def_admins
        full_history_users = []
        code_editor_users = def_admins

    security = model_helper.read_dict(json_object, 'security')

    config.allowed_users = _prepare_allowed_users(allowed_users, admin_users, user_groups)
    config.alerts_config = json_object.get('alerts')
    config.callbacks_config = json_object.get('callbacks')
    config.logging_config = parse_logging_config(json_object)
    config.user_groups = user_groups
    config.admin_users = admin_users
    config.full_history_users = full_history_users
    config.code_editor_users = code_editor_users
    config.user_header_name = user_header_name
    config.ip_validator = TrustedIpValidator(trusted_ips)

    config.max_request_size_mb = read_int_from_config('max_request_size', json_object, default=10)

    config.secret_storage_file = json_object.get('secret_storage_file', os.path.join(temp_folder, 'secret.dat'))
    config.xsrf_protection = _parse_xsrf_protection(security)

    return config


def create_authenticator(auth_object, temp_folder):
    auth_type = auth_object.get('type')

    if not auth_type:
        raise Exception('Auth type should be specified')

    auth_type = auth_type.strip().lower()
    if auth_type == 'ldap':
        from auth.auth_ldap import LdapAuthenticator
        authenticator = LdapAuthenticator(auth_object, temp_folder)
    elif auth_type == 'google_oauth':
        from auth.auth_google_oauth import GoogleOauthAuthenticator
        authenticator = GoogleOauthAuthenticator(auth_object)
    elif auth_type == 'gitlab':
        from auth.auth_gitlab import GitlabOAuthAuthenticator
        authenticator = GitlabOAuthAuthenticator(auth_object)
    elif auth_type == 'htpasswd':
        from auth.auth_htpasswd import HtpasswdAuthenticator
        authenticator = HtpasswdAuthenticator(auth_object)
    else:
        raise Exception(auth_type + ' auth is not supported')

    authenticator.auth_expiration_days = float(auth_object.get('expiration_days', 30))

    authenticator.auth_type = auth_type

    return authenticator


def _prepare_allowed_users(allowed_users, admin_users, user_groups):
    if (allowed_users is None) or (allowed_users == '*'):
        return [ANY_USER]

    elif not isinstance(allowed_users, list):
        raise Exception('allowed_users should be list')

    coerced_users = strip(allowed_users)
    coerced_users = {user for user in coerced_users if len(user) > 0}

    if '*' in coerced_users:
        return [ANY_USER]

    if admin_users and (ANY_USER != admin_users) and (ANY_USER not in admin_users):
        coerced_users.update(admin_users)

    if user_groups:
        for group, users in user_groups.items():
            if (ANY_USER != users) and (ANY_USER not in users):
                coerced_users.update(users)

    return list(coerced_users)


def parse_logging_config(json_object):
    config = LoggingConfig()

    if json_object.get('logging'):
        json_logging_config = json_object.get('logging')
        config.filename_pattern = json_logging_config.get('execution_file')
        config.date_format = json_logging_config.get('execution_date_format')

    return config


def _parse_admin_users(json_object, default_admins=None):
    admins = strip(read_list(json_object, 'admin_users', default=default_admins))
    if '*' in admins:
        LOGGER.warning('Any user is allowed to access admin page, be careful!')
        return [ANY_USER]

    return admins


def _parse_history_users(json_object):
    full_history_users = strip(read_list(json_object, 'full_history', default=[]))
    if (isinstance(full_history_users, list) and '*' in full_history_users) \
            or full_history_users == '*':
        return [ANY_USER]

    return full_history_users


def _parse_code_editor_users(json_object, admin_users):
    full_code_editor_users = strip(read_list(json_object, 'code_editors', default=admin_users))
    if (isinstance(full_code_editor_users, list) and '*' in full_code_editor_users) \
            or full_code_editor_users == '*':
        return [ANY_USER]

    return full_code_editor_users


def _parse_xsrf_protection(security):
    return model_helper.read_str_from_config(security,
                                             'xsrf_protection',
                                             default=XSRF_PROTECTION_TOKEN,
                                             allowed_values=[XSRF_PROTECTION_TOKEN,
                                                             XSRF_PROTECTION_HEADER,
                                                             XSRF_PROTECTION_DISABLED])


class InvalidServerConfigException(Exception):
    def __init__(self, message) -> None:
        super().__init__(message)