HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/tests/web/script_config_socket_test.py
import json
import os
from urllib.parse import quote

import tornado.concurrent
import tornado.escape
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import testing, httpserver

from auth.authorization import Authorizer, ANY_USER, EmptyGroupProvider
from auth.identification import IpBasedIdentification
from auth.tornado_auth import TornadoAuth
from config.config_service import ConfigService
from model.trusted_ips import TrustedIpValidator
from tests import test_utils
from web.script_config_socket import ScriptConfigSocket


class ScriptConfigSocketTest(testing.AsyncTestCase):
    @testing.gen_test
    def test_initial_config(self):
        self.socket = yield self._connect('Test script 1')
        response = yield self.socket.read_message()
        self.assertIsNone(self.socket.close_reason)

        self.assert_model(response, 'initialConfig')

    @testing.gen_test
    def test_initial_config_when_init_with_values(self):
        self.socket = yield self._connect('Test script 1', init_with_values=True)
        self.assert_no_messages()

        self.socket.write_message(json.dumps({
            'event': 'initialValues',
            'data': {'parameterValues': {'list 1': 'B', 'file 1': 'y', 'list 2': 'y3.txt'}}}))

        response = yield self.socket.read_message()
        self.assertIsNone(self.socket.close_reason)

        self.assert_model(response, 'initialConfig', list2_values=['y1.txt', 'y2.txt', 'y3.txt'])

    @testing.gen_test
    def test_reload_model(self):
        self.socket = yield self._connect('Test script 1')
        _ = yield self.socket.read_message()

        self.socket.write_message(json.dumps({
            'event': 'reloadModelValues',
            'data': {'clientModelId': 'abcd',
                     'parameterValues': {'list 1': 'A', 'file 1': 'z', 'list 2': 'z1.txt'}}}))

        response = yield self.socket.read_message()
        self.assertIsNone(self.socket.close_reason)

        self.assert_model(response, 'reloadedConfig',
                          list2_values=['z1.txt', 'z2.txt', 'z3.txt'],
                          external_model_id='abcd')

    @testing.gen_test
    def test_client_version(self):
        self.socket = yield self._connect('Test script 1')
        _ = yield self.socket.read_message()

        self.socket.write_message(json.dumps({
            'event': 'parameterValue',
            'data': {'parameter': 'file 1',
                     'value': 'x',
                     'clientStateVersion': 2}}))

        self.socket.write_message(json.dumps({
            'event': 'parameterValue',
            'data': {'parameter': 'text 1',
                     'value': 'included',
                     'clientStateVersion': 3}}))

        self.socket.write_message(json.dumps({
            'event': 'parameterValue',
            'data': {'parameter': 'file 1',
                     'value': 'z',
                     'clientStateVersion': 4}}))

        self.socket.write_message(json.dumps({
            'event': 'parameterValue',
            'data': {'parameter': 'file 1',
                     'value': 'z',
                     'clientStateVersion': 5}}))

        self.socket.write_message(json.dumps({
            'event': 'parameterValue',
            'data': {'parameter': 'text 1',
                     'value': 'inc',
                     'clientStateVersion': 6}}))

        self.socket.write_message(json.dumps({
            'event': 'reloadModelValues',
            'data': {'clientModelId': 'abcd',
                     'parameterValues': {'list 1': 'A', 'file 1': 'y', 'list 2': 'y1.txt'},
                     'clientStateVersion': 7}}))

        self._assert_parameter_change((yield self.socket.read_message()),
                                      _list2(['x1.txt', 'x2.txt', 'x3.txt']), 2)
        self._assert_version_beat((yield self.socket.read_message()), 2)

        self._assert_parameter_added((yield self.socket.read_message()), _included_text2(), 3)
        self._assert_version_beat((yield self.socket.read_message()), 3)

        self._assert_parameter_change((yield self.socket.read_message()),
                                      _list2(['z1.txt', 'z2.txt', 'z3.txt']), 4)
        self._assert_version_beat((yield self.socket.read_message()), 4)

        self._assert_version_beat((yield self.socket.read_message()), 5)

        self._assert_parameter_removed((yield self.socket.read_message()), _included_text2()['name'], 6)
        self._assert_version_beat((yield self.socket.read_message()), 6)

        response3 = yield self.socket.read_message()
        self.assert_model(response3, 'reloadedConfig',
                          list2_values=['y1.txt', 'y2.txt', 'y3.txt'],
                          external_model_id='abcd',
                          client_version=7)

    def assert_model(self, response, event_type, external_model_id=None, list2_values=None,
                     client_version=None):
        event = json.loads(response)
        if 'id' in event['data']:
            del event['data']['id']

        if list2_values is None:
            list2_values = []

        self.assertEqual(
            {'event': event_type,
             'data': {'clientModelId': external_model_id, 'name': 'Test script 1', 'outputFormat': 'terminal',
                      'description': None, 'schedulable': False, 'parameters': [
                     _text1(),
                     _list1(),
                     _file1(),
                     _list2(list2_values)],
                      'clientStateVersion': client_version}},
            event)

    def _assert_parameter_change(self, response, parameter, client_version=None):
        event = json.loads(response)

        data = dict(parameter)
        data['clientStateVersion'] = client_version

        self.assertEqual(
            {'event': 'parameterChanged',
             'data': data},
            event)

    def _assert_parameter_added(self, response, parameter, client_version=None):
        event = json.loads(response)

        data = dict(parameter)
        data['clientStateVersion'] = client_version

        self.assertEqual(
            {'event': 'parameterAdded',
             'data': data},
            event)

    def _assert_parameter_removed(self, response, parameter_name, client_version=None):
        event = json.loads(response)

        self.assertEqual(
            {'event': 'parameterRemoved',
             'data': {
                 'clientStateVersion': client_version,
                 'parameterName': parameter_name
             }},
            event)

    def _assert_version_beat(self, response, client_version):
        event = json.loads(response)

        self.assertEqual(
            {'data': {'clientStateVersion': client_version}, 'event': 'clientStateVersionAccepted'},
            event)

    def _connect(self, script_name, init_with_values=False):
        url = 'ws://localhost:{}/scripts/{}'.format(self.port, quote(script_name))
        if init_with_values:
            url += '?initWithValues=True'

        return tornado.websocket.websocket_connect(url)

    def setUp(self):
        super().setUp()

        self.socket = None

        application = tornado.web.Application([(r'/scripts/([^/]*)', ScriptConfigSocket)],
                                              login_url='/login.html',
                                              cookie_secret='12345')
        application.auth = TornadoAuth(None)
        application.authorizer = Authorizer(ANY_USER, [], [], [], EmptyGroupProvider())
        application.identification = IpBasedIdentification(TrustedIpValidator(['127.0.0.1']), None)
        application.config_service = ConfigService(application.authorizer, test_utils.temp_folder)

        server = httpserver.HTTPServer(application)
        socket, self.port = testing.bind_unused_port()
        server.add_socket(socket)

        test_utils.setup()

        for dir in ['x', 'y', 'z']:
            for file in range(1, 4):
                filename = dir + str(file) + '.txt'
                test_utils.create_file(os.path.join('test1_files', dir, filename))

        test1_files_path = os.path.join(test_utils.temp_folder, 'test1_files')
        test_utils.write_script_config(
            {'name': 'Test script 1',
             'script_path': 'ls',
             'include': '${text 1}.json',
             'parameters': [
                 test_utils.create_script_param_config('text 1', required=True),
                 test_utils.create_script_param_config('list 1', type='list',
                                                       allowed_values=['A', 'B', 'C']),
                 test_utils.create_script_param_config('file 1', type='server_file',
                                                       file_dir=test1_files_path),
                 test_utils.create_script_param_config('list 2', type='list',
                                                       values_script='ls ' + test1_files_path + '/${file 1}')
             ]},
            'test_script_1')

        test_utils.write_script_config(
            {
                'parameters': [
                    test_utils.create_script_param_config('included constant', constant=True, default='abc'),
                    test_utils.create_script_param_config('included text 2'),
                ]},
            'included')

    def tearDown(self) -> None:
        if self.socket:
            self.socket.close()

        super().tearDown()
        test_utils.cleanup()

    def assert_no_messages(self):
        try:
            response = yield self.socket.read_queue.get(timeout=50)
            self.fail('Non-expected message received: ' + response)
        except TimeoutError:
            self.assertIsNone(self.socket.close_reason)


def _text1():
    return {'name': 'text 1', 'description': None, 'withoutValue': False, 'required': True, 'default': None,
            'type': 'text', 'min': None, 'max': None, 'max_length': None, 'values': None, 'secure': False,
            'fileRecursive': False, 'fileType': None,
            'requiredParameters': []
            }


def _list1():
    return {'name': 'list 1', 'description': None, 'withoutValue': False, 'required': False, 'default': None,
            'type': 'list', 'min': None, 'max': None, 'max_length': None, 'values': ['A', 'B', 'C'],
            'secure': False, 'fileRecursive': False, 'fileType': None,
            'requiredParameters': []}


def _file1():
    return {'name': 'file 1', 'description': None, 'withoutValue': False, 'required': False, 'default': None,
            'type': 'server_file', 'min': None, 'max': None, 'max_length': None, 'values': ['x', 'y', 'z'],
            'secure': False, 'fileRecursive': False, 'fileType': None,
            'requiredParameters': []}


def _list2(list2_values):
    return {'name': 'list 2', 'description': None, 'withoutValue': False, 'required': False, 'default': None,
            'type': 'list', 'min': None, 'max': None, 'max_length': None, 'values': list2_values,
            'secure': False,
            'fileRecursive': False, 'fileType': None,
            'requiredParameters': ['file 1']}


def _included_text2():
    return {'name': 'included text 2', 'description': None, 'withoutValue': False, 'required': False, 'default': None,
            'type': 'text', 'min': None, 'max': None, 'max_length': None, 'values': None, 'secure': False,
            'fileRecursive': False, 'fileType': None,
            'requiredParameters': []
            }