# HEX
# Server: LiteSpeed
# System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
# User: vishn3436 (5293)
# PHP: 8.0.15
# Disabled: NONE
# Upload Files
# File: //scripts/script-server/src/model/parameter_config.py
import logging
import os
from collections import OrderedDict
from ipaddress import ip_address, IPv4Address, IPv6Address

from config.constants import PARAM_TYPE_SERVER_FILE, FILE_TYPE_FILE, PARAM_TYPE_MULTISELECT, FILE_TYPE_DIR, \
    PARAM_TYPE_EDITABLE_LIST
from config.script.list_values import ConstValuesProvider, ScriptValuesProvider, EmptyValuesProvider, \
    DependantScriptValuesProvider, NoneValuesProvider, FilesProvider
from model import model_helper
from model.model_helper import resolve_env_vars, replace_auth_vars, is_empty, SECURE_MASK, \
    normalize_extension, read_bool_from_config, InvalidValueException, read_str_from_config
from react.properties import ObservableDict, observable_fields
from utils import file_utils, string_utils, process_utils
from utils.file_utils import FileMatcher
from utils.string_utils import strip

LOGGER = logging.getLogger('script_server.parameter_config')


# NOTE: every field name must be followed by a comma. Adjacent string literals
# are silently concatenated by Python, which would register a single bogus
# field name instead of the two intended ones.
@observable_fields(
    'param',
    'same_arg_param',
    'env_var',
    'no_value',
    'description',
    'required',
    'default',
    'type',
    'min',
    'max',
    'max_length',
    'constant',
    '_values_provider',
    'values',
    'secure',
    'separator',
    'multiselect_argument_type',
    'file_dir',  # path relative to working dir (for execution)
    '_list_files_dir',  # file_dir, relative to the server path (for listing files)
    'file_type',
    'file_extensions',
    'file_recursive')
class ParameterModel(object):
    """Model of a single script parameter, built from its raw config dict.

    The instance keeps the original config and (re)derives all of its fields
    from it in ``_reload``. When the list of allowed values depends on other
    parameters, the model subscribes to ``other_param_values`` and refreshes
    ``values`` whenever one of the required parameters changes.
    """

    def __init__(self, parameter_config, username, audit_name, other_params_supplier,
                 other_param_values: ObservableDict = None,
                 working_dir=None):
        """Create the model and load all fields from ``parameter_config``.

        :param parameter_config: dict-like parameter configuration
        :param username: user name substituted into description/default/scripts
        :param audit_name: audit name substituted into description/default/scripts
        :param other_params_supplier: passed to dependant values providers
        :param other_param_values: observable values of the other parameters
            (may be None, in which case no subscription is made)
        :param working_dir: working directory used for default scripts and
            for resolving the directory in which files are listed
        """
        self._username = username
        self._audit_name = audit_name
        self._parameters_supplier = other_params_supplier
        self._working_dir = working_dir

        self.name = parameter_config.get('name')

        self._original_config = parameter_config
        self._parameter_values = other_param_values

        self._reload()

        # Subscribe only when the provider really depends on other parameters
        if (other_param_values is not None) \
                and (self._values_provider is not None) \
                and self._values_provider.get_required_parameters():
            other_param_values.subscribe(self._param_values_observer)

    def _reload(self):
        """Read every field from the original config, validate it and rebuild the values provider."""
        config = self._original_config

        self.param = config.get('param')
        self.same_arg_param = read_bool_from_config('same_arg_param', config, default=False)
        self.env_var = config.get('env_var')
        self.no_value = read_bool_from_config('no_value', config, default=False)
        self.description = replace_auth_vars(config.get('description'), self._username, self._audit_name)
        self.required = read_bool_from_config('required', config, default=False)
        self.min = config.get('min')
        self.max = config.get('max')
        self.max_length = config.get('max_length')
        self.secure = read_bool_from_config('secure', config, default=False)
        self.separator = config.get('separator', ',')
        self.multiselect_argument_type = read_str_from_config(
            config,
            'multiselect_argument_type',
            default='single_argument',
            allowed_values=['single_argument', 'argument_per_value', 'repeat_param_value'])
        self.default = _resolve_default(config.get('default'), self._username, self._audit_name, self._working_dir)
        self.file_dir = _resolve_file_dir(config, 'file_dir')
        self._list_files_dir = _resolve_list_files_dir(self.file_dir, self._working_dir)
        self.file_extensions = _resolve_file_extensions(config, 'file_extensions')
        self.file_type = _resolve_parameter_file_type(config, 'file_type', self.file_extensions)
        self.file_recursive = read_bool_from_config('file_recursive', config, default=False)
        self.excluded_files_matcher = _resolve_excluded_files(config, 'excluded_files', self._list_files_dir)

        self.type = self._read_type(config)

        self.constant = read_bool_from_config('constant', config, default=False)

        # Validation relies on type/constant/default/file_dir being set above
        self._validate_config()

        values_provider = self._create_values_provider(
            config.get('values'),
            self.type,
            self.constant)
        self._values_provider = values_provider
        self._reload_values()

    def _validate_config(self):
        """Raise an Exception when the loaded configuration is inconsistent.

        A constant parameter must have a default value, and a server-file
        parameter must have ``file_dir`` configured.
        """
        param_log_name = self.str_name()

        if self.constant and not self.default:
            message = 'Constant should have default value specified'
            raise Exception('Failed to set parameter "' + param_log_name + '" to constant: ' + message)

        if self.type == PARAM_TYPE_SERVER_FILE:
            if not self.file_dir:
                raise Exception('Parameter ' + param_log_name + ' has missing config file_dir')

    def str_name(self):
        """Return the first non-empty of name, param, description, or 'unknown'."""
        names = (name for name in (self.name, self.param, self.description) if name)
        return next(names, 'unknown')

    def validate_parameter_dependencies(self, all_parameters):
        """Check that every parameter required by the values provider exists and is usable.

        :param all_parameters: iterable of parameter models of the script
        :raises Exception: when a required parameter is missing, or is
            constant, secure or no_value
        """
        required_parameters = self.get_required_parameters()
        if not required_parameters:
            return

        parameters_dict = {p.name: p for p in all_parameters}

        for parameter_name in required_parameters:
            if parameter_name not in parameters_dict:
                raise Exception('Missing parameter "' + parameter_name + '" for the script')
            parameter = parameters_dict[parameter_name]
            unsupported_type = None

            if parameter.constant:
                unsupported_type = 'constant'
            elif parameter.secure:
                unsupported_type = 'secure'
            elif parameter.no_value:
                unsupported_type = 'no_value'

            if unsupported_type:
                raise Exception(
                    'Unsupported parameter "' + parameter_name
                    + '" of type "' + unsupported_type
                    + '" in values.script! ')

    def _read_type(self, config):
        """Return the configured type ('text' by default), normalizing ip aliases.

        'ipv4'/'ipv6' (any letter case) are converted to 'ip4'/'ip6'; other
        types are returned exactly as configured.
        """
        param_type = config.get('type', 'text')

        if param_type.lower() in ('ip', 'ip4', 'ip6', 'ipv4', 'ipv6'):
            param_type = param_type.lower().replace('v', '')

        return param_type

    def _param_values_observer(self, key, old_value, new_value):
        """Reload allowed values when a parameter required by the provider changes."""
        values_provider = self._values_provider
        if values_provider is None:
            return

        if key not in values_provider.get_required_parameters():
            return

        self._reload_values()

    def _reload_values(self):
        """Refresh ``values`` from the values provider (None when there is no provider)."""
        values_provider = self._values_provider
        if not values_provider:
            self.values = None
            return

        values = values_provider.get_values(self._parameter_values)
        self.values = values

    def _create_values_provider(self, values_config, type, constant):
        """Select the values provider matching the parameter type and ``values`` config.

        :param values_config: raw ``values`` section (list, dict with 'script', or empty)
        :param type: parameter type
        :param constant: whether the parameter is constant
        :raises Exception: when ``values`` has an unsupported format
        """
        if constant:
            return NoneValuesProvider()

        if self._is_plain_server_file():
            return FilesProvider(self._list_files_dir, self.file_type, self.file_extensions,
                                 self.excluded_files_matcher)

        if (type != 'list') and (type != PARAM_TYPE_MULTISELECT) and (type != PARAM_TYPE_EDITABLE_LIST):
            return NoneValuesProvider()

        if is_empty(values_config):
            return EmptyValuesProvider()

        if isinstance(values_config, list):
            return ConstValuesProvider(values_config)

        elif 'script' in values_config:
            original_script = values_config['script']
            has_variables = ('${' in original_script)

            script = replace_auth_vars(original_script, self._username, self._audit_name)
            # Shell is enabled by default only for scripts without ${...} variables
            shell = read_bool_from_config('shell', values_config, default=not has_variables)

            # No variables left after auth substitution: no dependency on other parameters
            if '${' not in script:
                return ScriptValuesProvider(script, shell)

            return DependantScriptValuesProvider(script, self._parameters_supplier, shell)

        else:
            # str_name() never returns None, so the message can always be built
            message = 'Unsupported "values" format for ' + self.str_name()
            raise Exception(message)

    def get_required_parameters(self):
        """Return names of parameters the values provider depends on ([] without provider)."""
        if not self._values_provider:
            return []

        return self._values_provider.get_required_parameters()

    def normalize_user_value(self, value):
        """Wrap a value into a list for multiselect and recursive server-file parameters.

        Lists are returned as is, empty values become ``[]``; for all other
        parameter types the value is returned unchanged.
        """
        if self.type == PARAM_TYPE_MULTISELECT or self._is_recursive_server_file():
            if isinstance(value, list):
                return value
            if not is_empty(value):
                return [value]
            else:
                return []

        return value

    def value_to_str(self, value):
        """Return ``str(value)``, or the secure mask for secure parameters."""
        if self.secure:
            return SECURE_MASK

        return str(value)

    def value_to_repr(self, value):
        """Return ``repr(value)``, or the secure mask for secure parameters."""
        if self.secure:
            return SECURE_MASK

        return repr(value)

    def get_secured_value(self, value):
        """Return the value with secure content masked (element-wise for lists).

        The value is returned untouched when the parameter is not secure,
        is a no_value flag, or the value is None.
        """
        if (not self.secure) or (value is None) or self.no_value:
            return value

        if isinstance(value, list):
            return [self.value_to_str(e) for e in value]

        return self.value_to_str(value)

    def map_to_script(self, user_value):
        """Convert a user-provided value into the value passed to the script.

        Server-file values are joined with ``file_dir``; other values are
        mapped through the values provider when there is one.
        """
        def map_single_value(user_value):
            if self._values_provider:
                return self._values_provider.map_value(user_value)
            return user_value

        if self.type == PARAM_TYPE_MULTISELECT:
            return [map_single_value(v) for v in user_value]

        elif self._is_recursive_server_file():
            if user_value:
                # user_value is a list of path elements here
                return os.path.join(self.file_dir, *user_value)
            else:
                return None
        elif self._is_plain_server_file():
            if not is_empty(user_value):
                return os.path.join(self.file_dir, user_value)
            else:
                return None

        return map_single_value(user_value)

    def to_script_args(self, script_value):
        """Return the script argument(s): multiselect 'single_argument' values are joined by the separator."""
        if self.type == PARAM_TYPE_MULTISELECT:
            if self.multiselect_argument_type == 'single_argument':
                return self.separator.join(script_value)
            else:
                return script_value

        return script_value

    def validate_value(self, value, *, ignore_required=False):
        """Validate a value against the parameter configuration.

        :param value: value to check
        :param ignore_required: when True, an empty value is accepted even
            for a required parameter
        :return: an error description string, or None when the value is valid
        """
        if self.constant:
            return None

        if is_empty(value):
            if self.required and not ignore_required:
                return 'is not specified'
            return None

        # Uses the secure mask for secure parameters, so errors never leak the value
        value_string = self.value_to_repr(value)

        if self.no_value:
            if value not in ['true', True, 'false', False]:
                return 'should be boolean, but has value ' + value_string
            return None

        if self.type == 'text':
            if (not is_empty(self.max_length)) and (len(value) > int(self.max_length)):
                return 'is longer than allowed char length (' \
                        + str(len(value)) + ' > ' + str(self.max_length) + ')'
            return None

        if self.type == 'file_upload':
            if not os.path.exists(value):
                return 'Cannot find file ' + value
            return None

        if self.type == 'int':
            if not (isinstance(value, int) or (isinstance(value, str) and string_utils.is_integer(value))):
                return 'should be integer, but has value ' + value_string

            int_value = int(value)

            if (not is_empty(self.max)) and (int_value > int(self.max)):
                return 'is greater than allowed value (' \
                       + value_string + ' > ' + str(self.max) + ')'

            if (not is_empty(self.min)) and (int_value < int(self.min)):
                return 'is lower than allowed value (' \
                       + value_string + ' < ' + str(self.min) + ')'
            return None

        if self.type in ('ip', 'ip4', 'ip6'):
            try:
                address = ip_address(value.strip())
                if self.type == 'ip4':
                    if not isinstance(address, IPv4Address):
                        return value_string + ' is not an IPv4 address'
                elif self.type == 'ip6':
                    if not isinstance(address, IPv6Address):
                        return value_string + ' is not an IPv6 address'
            except ValueError:
                return 'wrong IP address ' + value_string
            # None of the checks below apply to ip types
            return None

        allowed_values = self.values

        if (self.type == 'list') or (self._is_plain_server_file()):
            if value not in allowed_values:
                return 'has value ' + value_string \
                       + ', but should be in ' + repr(allowed_values)
            return None

        if self.type == PARAM_TYPE_MULTISELECT:
            if not isinstance(value, list):
                return 'should be a list, but was: ' + value_string + '(' + str(type(value)) + ')'
            for value_element in value:
                if value_element not in allowed_values:
                    element_str = self.value_to_repr(value_element)
                    return 'has value ' + element_str \
                           + ', but should be in ' + repr(allowed_values)
            return None

        if self._is_recursive_server_file():
            return self._validate_recursive_path(value, intermediate=False)

        return None

    def list_files(self, path):
        """List files and directories under ``path`` for a recursive server-file parameter.

        :param path: list of path elements, relative to the listing directory
        :return: list of dicts with 'name', 'type' and 'readable' keys;
            files (if allowed by file_type) come first, then directories
        :raises WrongParameterUsageException: when the parameter is not a
            recursive server file
        :raises InvalidValueException: when the path fails validation
        """
        if not self._is_recursive_server_file():
            raise WrongParameterUsageException(self.name, 'Can list files only for recursive file parameters')

        validation_error = self._validate_recursive_path(path, intermediate=True)
        if validation_error:
            raise InvalidValueException(self.name, validation_error)

        full_path = self._build_list_file_path(path)

        result = []

        if is_empty(self.file_type) or self.file_type == FILE_TYPE_FILE:
            files = model_helper.list_files(full_path,
                                            file_type=FILE_TYPE_FILE,
                                            file_extensions=self.file_extensions,
                                            excluded_files_matcher=self.excluded_files_matcher)
            for file in files:
                result.append({'name': file, 'type': FILE_TYPE_FILE, 'readable': True})

        # Directories are always listed, regardless of file_type
        dirs = model_helper.list_files(full_path,
                                       file_type=FILE_TYPE_DIR,
                                       excluded_files_matcher=self.excluded_files_matcher)
        for dir in dirs:
            dir_path = os.path.join(full_path, dir)

            readable = os.access(dir_path, os.R_OK)
            result.append({'name': dir, 'type': FILE_TYPE_DIR, 'readable': readable})

        return result

    def _is_plain_server_file(self):
        """Return True for a server-file parameter without recursive browsing."""
        return self.type == PARAM_TYPE_SERVER_FILE and not self.file_recursive

    def _is_recursive_server_file(self):
        """Return True for a server-file parameter with recursive browsing."""
        return self.type == PARAM_TYPE_SERVER_FILE and self.file_recursive

    def _validate_recursive_path(self, path, intermediate):
        """Validate a path (list of elements) of a recursive server-file parameter.

        :param path: list of path elements, relative to the listing directory
        :param intermediate: True when the path is a directory being browsed,
            False when it is the final selected value
        :return: an error description string, or None when the path is valid
        """
        value_string = self.value_to_str(path)

        if not isinstance(path, list):
            return 'should be a list, but was: ' + value_string + '(' + str(type(path)) + ')'

        if ('.' in path) or ('..' in path):
            return 'Relative path references are not allowed'

        full_path = self._build_list_file_path(path)

        if self.excluded_files_matcher.has_match(full_path):
            return 'Path ' + value_string + ' is excluded'

        if not os.path.exists(full_path):
            return 'Path ' + value_string + ' does not exist'

        if intermediate:
            if not os.access(full_path, os.R_OK):
                return 'Path ' + value_string + ' not accessible'

            if not os.path.isdir(full_path):
                return 'Path ' + value_string + ' is not a directory'

        else:
            dir = path[:-1]
            file = path[-1]

            # The last element must be among the files listed for its parent directory
            dir_path = self._build_list_file_path(dir)
            allowed_files = model_helper.list_files(dir_path,
                                                    file_type=self.file_type,
                                                    file_extensions=self.file_extensions,
                                                    excluded_files_matcher=self.excluded_files_matcher)
            if file not in allowed_files:
                return 'Path ' + value_string + ' is not allowed'

        return None

    def _build_list_file_path(self, child_path):
        """Join the listing directory with the path elements and normalize the result."""
        return os.path.normpath(os.path.join(self._list_files_dir, *child_path))


def _resolve_default(default, username, audit_name, working_dir):
    if not default:
        return default

    script = False
    if isinstance(default, dict) and 'script' in default:
        string_value = default['script']
        script = True
    elif isinstance(default, str):
        string_value = default
    else:
        return default

    resolved_string_value = resolve_env_vars(string_value, full_match=True)
    if resolved_string_value == string_value:
        resolved_string_value = replace_auth_vars(string_value, username, audit_name)

    if script:
        has_variables = string_value != resolved_string_value
        shell = read_bool_from_config('shell', default, default=not has_variables)
        output = process_utils.invoke(resolved_string_value, working_dir, shell=shell)
        return output.strip()

    return resolved_string_value


def _resolve_file_dir(config, key):
    raw_value = config.get(key)
    if not raw_value:
        return raw_value

    return resolve_env_vars(raw_value)


def _resolve_list_files_dir(file_dir, working_dir):
    if not file_dir or not working_dir:
        return file_dir

    return file_utils.normalize_path(file_dir, working_dir)


def _resolve_file_extensions(config, key):
    """Read the list under ``key`` and return its stripped, normalized extensions.

    Returns an empty list when the config has no list for ``key``.
    """
    raw_extensions = model_helper.read_list(config, key)
    if raw_extensions is None:
        return []

    return list(map(normalize_extension, strip(raw_extensions)))


def _resolve_excluded_files(config, key, file_dir):
    """Build a FileMatcher from the exclusion patterns configured under ``key``.

    Patterns are stripped and have environment variables resolved; when no
    list is configured, the matcher is created with no patterns.
    """
    configured_patterns = model_helper.read_list(config, key)

    patterns = []
    if configured_patterns is not None:
        for pattern in strip(configured_patterns):
            patterns.append(resolve_env_vars(pattern))

    return FileMatcher(patterns, file_dir)


def _resolve_parameter_file_type(config, key, file_extensions):
    """Return the file type of a parameter.

    When file extensions are specified the type is always FILE_TYPE_FILE.
    Otherwise the configured value is returned stripped and lower-cased, or
    unchanged when it is empty.
    """
    if file_extensions:
        return FILE_TYPE_FILE

    configured_type = config.get(key)
    return configured_type if is_empty(configured_type) else configured_type.strip().lower()


class WrongParameterUsageException(Exception):
    """Error carrying the name of the parameter it relates to in ``param_name``."""

    def __init__(self, param_name, error_message) -> None:
        # The exception message is error_message only; the name is kept separately
        self.param_name = param_name
        super().__init__(error_message)


def get_sorted_config(param_config):
    """Return an OrderedDict copy of ``param_config`` with keys in canonical order.

    Known keys come first, in the fixed order below. Unknown keys follow and
    keep their original relative order, since the sort is stable.
    """
    known_keys = (
        'name', 'required',
        'param',
        'same_arg_param',
        'type', 'no_value', 'default', 'constant', 'description',
        'secure',
        'values',
        'min',
        'max',
        'multiselect_argument_type',
        'separator',
        'file_dir',
        'file_recursive',
        'file_type',
        'file_extensions',
        'excluded_files',
    )
    rank_by_key = {known_key: rank for rank, known_key in enumerate(known_keys)}
    unknown_rank = 100  # larger than any known rank, so unknown keys go last

    ordered_items = sorted(
        param_config.items(),
        key=lambda entry: rank_by_key.get(entry[0], unknown_rank))
    return OrderedDict(ordered_items)