HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/features/executions_callback_feature.py
import logging
from collections import OrderedDict

from communications.communicaton_service import CommunicationsService
from communications.destination_script import ScriptDestination
from execution.execution_service import ExecutionService
from model.model_helper import read_bool_from_config, read_list

LOGGER = logging.getLogger('script_server.execution_callbacks')

_EXIT_CODE_FIELD = 'exit_code'
_DEFAULT_NOTIFICATION_FIELDS = ['execution_id', 'pid', 'script_name', 'user', _EXIT_CODE_FIELD]


def _init_destinations(destinations_config):
    destinations = []

    for destination_config in destinations_config:
        destination_type = destination_config.get('type')

        if destination_type == 'email':
            import communications.destination_email as email
            destination = email.EmailDestination(destination_config)
        elif destination_type == 'http':
            import communications.destination_http as http
            destination = http.HttpDestination(destination_config)
        elif destination_type == 'script':
            destination = ScriptDestination(destination_config)
        else:
            raise Exception('Unknown destination type: ' + destination_type)

        destinations.append(destination)

    return destinations


class ExecutionsCallbackFeature:
    def __init__(self,
                 execution_service: ExecutionService,
                 config):
        self._execution_service = execution_service

        if config is None:
            self.notify_on_start = False
            self.notify_on_finish = False
            return

        self.notify_on_start = read_bool_from_config('notify_on_start', config, default=True)
        self.notify_on_finish = read_bool_from_config('notify_on_finish', config, default=True)

        destinations_config = read_list(config, 'destinations', [])
        if not destinations_config:
            LOGGER.warning('Execution callback destinations are missing! Please specify any')
            self.notify_on_start = False
            self.notify_on_finish = False
            return

        destinations = _init_destinations(destinations_config)
        self._communication_service = CommunicationsService(destinations)

        self.notification_fields = read_list(config, 'notification_fields', default=_DEFAULT_NOTIFICATION_FIELDS)

    def _subscribe_execution_listener(self):
        execution_service = self._execution_service

        if self.notify_on_start:
            def started(execution_id, user):
                notification_object = self.prepare_notification_object(execution_id, 'execution_started', user)
                if _EXIT_CODE_FIELD in notification_object:
                    del notification_object[_EXIT_CODE_FIELD]
                title = 'Execution ' + str(execution_id) + ' started'

                self._communication_service.send(title, notification_object)

            execution_service.add_start_listener(started)

        if self.notify_on_finish:
            def finished(execution_id, user):
                notification_object = self.prepare_notification_object(execution_id, 'execution_finished', user)

                title = 'Execution ' + str(execution_id) + ' finished'
                self._communication_service.send(title, notification_object)

            execution_service.add_finish_listener(finished)

    def start(self):
        self._subscribe_execution_listener()

    def prepare_notification_object(self, execution_id, event_type, user):
        execution_service = self._execution_service
        pid = execution_service.get_process_id(execution_id)
        script_name = execution_service.get_config(execution_id, user).name

        notification_object = OrderedDict()

        notification_object['execution_id'] = execution_id
        notification_object['pid'] = pid
        notification_object['script_name'] = script_name
        notification_object['user'] = user.user_id
        notification_object[_EXIT_CODE_FIELD] = execution_service.get_exit_code(execution_id)

        all_fields = list(notification_object.keys())
        for field in all_fields:
            if field not in self.notification_fields:
                del notification_object[field]

        notification_object['event_type'] = event_type
        notification_object.move_to_end('event_type', False)

        return notification_object

    # tests only
    def _wait(self):
        self._communication_service._wait()