HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/execution/execution_service.py
import logging
from collections import namedtuple
from typing import Optional, Dict, Callable, Any

from auth.authorization import Authorizer, is_same_user
from auth.user import User
from execution.executor import ScriptExecutor
from model import script_config
from model.model_helper import is_empty, AccessProhibitedException
from utils.exceptions.missing_arg_exception import MissingArgumentException
from utils.exceptions.not_found_exception import NotFoundException

LOGGER = logging.getLogger('script_server.execution_service')

_ExecutionInfo = namedtuple('_ExecutionInfo',
                            ['execution_id', 'owner_user', 'audit_name', 'config', 'audit_command'])


class ExecutionService:
    def __init__(self, authorizer, id_generator):

        self._id_generator = id_generator
        self._authorizer = authorizer  # type: Authorizer

        self._executors = {}  # type: Dict[str, ScriptExecutor]
        self._execution_infos = {}  # type: Dict[str, _ExecutionInfo]

        # active from user perspective:
        #   - either they are running
        #   - OR user haven't yet seen execution results
        self._active_executor_ids = set()

        self._finish_listeners = []
        self._start_listeners = []

    def get_active_executor(self, execution_id, user):
        self.validate_execution_id(execution_id, user, only_active=False)
        if execution_id not in self._active_executor_ids:
            return None

        return self._executors.get(execution_id)

    def start_script(self, config, values, user: User):
        audit_name = user.get_audit_name()

        executor = ScriptExecutor(config, values)
        execution_id = self._id_generator.next_id()

        audit_command = executor.get_secure_command()
        LOGGER.info('Calling script #%s: %s', execution_id, audit_command)

        executor.start()
        self._executors[execution_id] = executor
        self._execution_infos[execution_id] = _ExecutionInfo(
            execution_id=execution_id,
            owner_user=user,
            audit_name=audit_name,
            audit_command=audit_command,
            config=config)
        self._active_executor_ids.add(execution_id)

        self._add_post_finish_handling(execution_id, executor, user)

        self._fire_execution_started(execution_id, user)

        return execution_id

    def stop_script(self, execution_id, user):
        self.validate_execution_id(execution_id, user)

        if execution_id in self._executors:
            self._executors[execution_id].stop()

    def kill_script(self, execution_id, user):
        self.validate_execution_id(execution_id, user)

        if execution_id in self._executors:
            self._executors[execution_id].kill()

    def kill_script_by_system(self, execution_id):
        if execution_id in self._executors:
            self._executors[execution_id].kill()

    def get_exit_code(self, execution_id):
        return self._get_for_executor(execution_id, lambda e: e.get_return_code())

    def is_running(self, execution_id, user):
        executor = self._executors.get(execution_id)  # type: ScriptExecutor
        if executor is None:
            return False

        self.validate_execution_id(execution_id, user, only_active=False, allow_when_history_access=True)

        return not executor.is_finished()

    def get_active_executions(self, user_id):
        result = []
        for id in self._active_executor_ids:
            execution_info = self._execution_infos[id]

            if self._can_access_execution(execution_info, user_id):
                result.append(id)

        return result

    def get_running_executions(self):
        result = []
        for id, executor in self._executors.items():
            if executor.is_finished():
                continue
            result.append(id)

        return result

    def get_config(self, execution_id, user) -> Optional[script_config.ConfigModel]:
        self.validate_execution_id(execution_id, user)

        return self._get_for_execution_info(execution_id,
                                            lambda i: i.config)

    def is_active(self, execution_id):
        return execution_id in self._active_executor_ids

    def can_access(self, execution_id, user_id):
        execution_info = self._execution_infos.get(execution_id)
        return self._can_access_execution(execution_info, user_id)

    def validate_execution_id(self, execution_id, user, only_active=True, allow_when_history_access=False):
        if is_empty(execution_id):
            raise MissingArgumentException('Execution id is missing', 'execution_id')

        if only_active and (not self.is_active(execution_id)):
            raise NotFoundException('No (active) executor found for id ' + execution_id)

        if not self.can_access(execution_id, user.user_id) \
                and not (allow_when_history_access and self._has_full_history_rights(user.user_id)):
            LOGGER.warning('Prohibited access to not owned execution #%s (user=%s)',
                           execution_id, str(user))
            raise AccessProhibitedException('Prohibited access to not owned execution')

    @staticmethod
    def _can_access_execution(execution_info: _ExecutionInfo, user_id):
        return (execution_info is not None) and (is_same_user(execution_info.owner_user.user_id, user_id))

    def get_user_parameter_values(self, execution_id):
        return self._get_for_executor(execution_id,
                                      lambda e: e.get_user_parameter_values())

    def get_script_parameter_values(self, execution_id):
        return self._get_for_executor(execution_id,
                                      lambda e: e.get_script_parameter_values())

    def get_owner(self, execution_id):
        return self._get_for_execution_info(execution_id,
                                            lambda i: i.owner_user.user_id)

    def get_audit_name(self, execution_id):
        return self._get_for_execution_info(execution_id,
                                            lambda i: i.owner_user.get_audit_name())

    def get_audit_command(self, execution_id):
        return self._get_for_execution_info(execution_id,
                                            lambda i: i.audit_command)

    def get_all_audit_names(self, execution_id):
        return self._get_for_execution_info(execution_id,
                                            lambda i: i.owner_user.audit_names)

    def get_anonymized_output_stream(self, execution_id):
        return self._get_for_executor(execution_id,
                                      lambda e: e.get_anonymized_output_stream())

    def get_raw_output_stream(self, execution_id, user_id):
        owner = self.get_owner(execution_id)

        def getter(executor):
            if user_id != owner:
                LOGGER.warning(user_id + ' tried to access execution #' + execution_id + ' with owner ' + owner)
            return executor.get_raw_output_stream()

        return self._get_for_executor(execution_id, getter)

    def get_process_id(self, execution_id):
        return self._get_for_executor(execution_id,
                                      lambda e: e.get_process_id())

    def _get_for_executor(self, execution_id, getter: Callable[[ScriptExecutor], Any]):
        executor = self._executors.get(execution_id)
        if executor is None:
            return None

        return getter(executor)

    def _get_for_execution_info(self, execution_id, getter: Callable[[_ExecutionInfo], Any]):
        info = self._execution_infos.get(execution_id)
        if info is None:
            return None

        return getter(info)

    def cleanup_execution(self, execution_id, user):
        try:
            self.validate_execution_id(execution_id, user)
        except NotFoundException:
            return

        executor = self._executors.get(execution_id)

        if not executor.is_finished():
            raise Exception('Executor ' + execution_id + ' is not yet finished')

        executor.cleanup()
        self._active_executor_ids.remove(execution_id)

    def add_finish_listener(self, callback, execution_id=None):
        if execution_id is None:
            self._finish_listeners.append(callback)

        else:
            executor = self._executors.get(execution_id)
            if not executor:
                LOGGER.error('Failed to find executor for id ' + execution_id)
                return

            class FinishListener:
                def finished(self):
                    callback()

            executor.add_finish_listener(FinishListener())

    def _add_post_finish_handling(self, execution_id, executor, user):
        self_service = self

        class FinishListener:
            def finished(self):
                self_service._fire_execution_finished(execution_id, user)

        executor.add_finish_listener(FinishListener())

    def _fire_execution_finished(self, execution_id, user):
        for callback in self._finish_listeners:
            try:
                callback(execution_id, user)
            except:
                LOGGER.exception('Could not notify finish listener (%s), execution: %s', str(callback), execution_id)

    def add_start_listener(self, callback):
        self._start_listeners.append(callback)

    def _fire_execution_started(self, execution_id, user):
        for callback in self._start_listeners:
            try:
                callback(execution_id, user)
            except:
                LOGGER.exception('Could not notify start listener (%s), execution: %s', str(callback), execution_id)

    def _has_full_history_rights(self, user_id):
        return self._authorizer.has_full_history_access(user_id)