HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/web/script_config_socket.py
#!/usr/bin/env python3
import functools
import json
import logging.config
import uuid
from concurrent.futures.thread import ThreadPoolExecutor

import tornado.concurrent
import tornado.escape
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen

from auth.user import User
from config.config_service import ConfigNotAllowedException
from model import external_model
from model.external_model import parameter_to_external
from model.model_helper import read_bool
from model.script_config import ConfigModel
from web.web_auth_utils import check_authorization
from web.web_utils import wrap_to_server_event, inject_user

LOGGER = logging.getLogger('web.script_config_socket')

active_config_models = {}


class ScriptConfigSocket(tornado.websocket.WebSocketHandler):
    user: User
    config_mode: ConfigModel

    # noinspection PyTypeChecker
    def __init__(self, application, request, **kwargs):
        super().__init__(application, request, **kwargs)

        self.config_model = None
        self.config_name = None
        self.user = None
        self.config_id = str(uuid.uuid4())

        self.init_with_values = read_bool(self.get_query_argument('initWithValues', default='false'))

        self.parameters_listener = self._create_parameters_listener()
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix='-model-' + self.config_id)
        self._parameter_events_queue = []
        self._latest_client_state_version = None

    @check_authorization
    @inject_user
    @gen.coroutine
    def open(self, user, config_name):
        self.config_name = config_name
        self.user = user
        self.ioloop = tornado.ioloop.IOLoop.current()

        if self.init_with_values:
            return

        yield self._prepare_and_send_model(event_type='initialConfig')

    @gen.coroutine
    def on_message(self, text):
        try:
            message = json.loads(text)

            type = message.get('event')
            data = message.get('data')

            if type == 'parameterValue':
                set_parameter_func = functools.partial(
                    self._set_parameter_value,
                    data.get('parameter'),
                    data.get('value'),
                    data.get('clientStateVersion'))

                self._start_task(set_parameter_func)
                return
            elif type == 'reloadModelValues':
                parameter_values = data.get('parameterValues')
                external_id = data.get('clientModelId')

                yield self._prepare_and_send_model(parameter_values=parameter_values,
                                                   external_id=external_id,
                                                   event_type='reloadedConfig',
                                                   client_state_version=data.get('clientStateVersion'))
                return
            elif type == 'initialValues':
                if not self.init_with_values:
                    logging.warning('Received initial values, but not expected. Ignoring')
                    return
                if self.config_model:
                    logging.warning('Received initial values, but model is already initialized. Ignoring')
                    return
                parameter_values = data.get('parameterValues')

                yield self._prepare_and_send_model(parameter_values=parameter_values,
                                                   event_type='initialConfig')
                return

            LOGGER.warning('Unknown message received in ScriptConfigSocket: ' + text)
        except:
            LOGGER.exception('Failed to process message ' + text)

    def _start_task(self, func):
        future = tornado.ioloop.IOLoop.current().run_in_executor(
            executor=self._executor,
            func=func)

        return future

    def _set_parameter_value(self, parameter, value, client_state_version):
        self._latest_client_state_version = client_state_version
        self.config_model.set_param_value(parameter, value)
        self._send_parameter_event('clientStateVersionAccepted', {})

    def on_close(self):
        if self.config_id in active_config_models:
            del active_config_models[self.config_id]

    def safe_write(self, message):
        if self.ws_connection is not None:
            self.ioloop.add_callback(self.write_message, message)

    def _send_parameter_changed(self, parameter):
        external_param = parameter_to_external(parameter)
        if external_param is None:
            return

        self._send_parameter_event('parameterChanged', external_param)

    def _send_parameter_event(self, event_type, data):
        event = self._create_event(event_type, data)

        self.safe_write(event)

    def _subscribe_on_parameter(self, parameter):
        parameter.subscribe(lambda prop, old, new: self._send_parameter_changed(parameter))

    def _set_model(self, config_model: ConfigModel):
        if self.config_model is not None:
            self.config_model.parameters.unsubscribe(self.parameters_listener)

        self.config_model = config_model
        self.config_model.parameters.subscribe(self.parameters_listener)

        active_config_models[self.config_id] = {'model': self.config_model, 'user_id': self.user.user_id}

        for parameter in self.config_model.parameters:
            self._subscribe_on_parameter(parameter)

    def _create_parameters_listener(self):
        socket = self

        class ParameterListener:
            def on_add(self, parameter, index):
                external_parameter = parameter_to_external(parameter)
                if external_parameter is not None:
                    socket._send_parameter_event('parameterAdded', external_parameter)
                    socket._subscribe_on_parameter(parameter)

            def on_remove(self, parameter):
                external_parameter = parameter_to_external(parameter)
                if external_parameter is not None:
                    socket._send_parameter_event('parameterRemoved', {'parameterName': external_parameter['name']})

        return ParameterListener()

    @gen.coroutine
    def _prepare_model(self, config_name, user, parameter_values=None, skip_invalid_parameters=False) -> ConfigModel:
        try:
            socket = self

            def load_model():
                model = socket.application.config_service.load_config_model(config_name, user,
                                                                            parameter_values=parameter_values,
                                                                            skip_invalid_parameters=skip_invalid_parameters)

                socket._parameter_events_queue.clear()
                return model

            config_model = yield (self._start_task(load_model))

        except ConfigNotAllowedException:
            self.close(code=403, reason='Access to the script is denied')
            return None
        except Exception:
            message = 'Failed to load script config ' + config_name
            LOGGER.exception(message)
            self.close(code=500, reason=message)
            return None

        if not config_model:
            self.close(code=404, reason='Could not find a script by name')
            return None

        raise gen.Return(config_model)

    @gen.coroutine
    def _prepare_and_send_model(self, *, parameter_values=None, external_id=None, event_type,
                                client_state_version=None):
        config_model = yield self._prepare_model(self.config_name,
                                                 self.user,
                                                 parameter_values,
                                                 skip_invalid_parameters=True)
        if not config_model:
            return
        if client_state_version:
            self._latest_client_state_version = client_state_version

        self._set_model(config_model)

        new_config = external_model.config_to_external(config_model, self.config_id, external_id)
        self.safe_write(self._create_event(event_type, new_config))

    def _create_event(self, event_type, data):
        data['clientStateVersion'] = self._latest_client_state_version
        return wrap_to_server_event(event_type=event_type, data=data)