HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/tests/list_values_test.py
import os
import unittest

from parameterized import parameterized

from config.script.list_values import DependantScriptValuesProvider, FilesProvider, ScriptValuesProvider
from tests import test_utils
from tests.test_utils import create_parameter_model
from utils import file_utils
from utils.process_utils import ExecutionException


class ScriptValuesProviderTest(unittest.TestCase):
    @parameterized.expand([(True,), (False,)])
    def test_ls_3_files(self, shell):
        test_utils.create_files(['f1', 'f2', 'f3'])
        provider = ScriptValuesProvider('ls "' + test_utils.temp_folder + '"',
                                        shell=shell)
        self.assertEqual(['f1', 'f2', 'f3'], provider.get_values({}))

    @parameterized.expand([(True,), (False,)])
    def test_ls_no_files(self, shell):
        provider = ScriptValuesProvider('ls "' + test_utils.temp_folder + '"',
                                        shell=shell)
        self.assertEqual([], provider.get_values({}))

    def test_ls_3_files_when_bash_operator(self):
        test_utils.create_files(['f1', 'f2', 'f3'])
        self.assertRaises(ExecutionException,
                          ScriptValuesProvider,
                          'ls "' + test_utils.temp_folder + '" | grep 2',
                          shell=False)

    def test_ls_3_files_when_bash_operator_and_shell(self):
        test_utils.create_files(['f1', 'f2', 'f3'])
        provider = ScriptValuesProvider('ls "' + test_utils.temp_folder + '" | grep 2',
                                        shell=True)
        self.assertEqual(['f2'], provider.get_values({}))

    def setUp(self) -> None:
        super().setUp()

        test_utils.setup()

    def tearDown(self) -> None:
        super().tearDown()

        test_utils.cleanup()


class DependantScriptValuesProviderTest(unittest.TestCase):

    @parameterized.expand([(True,), (False,)])
    def test_get_required_parameters_when_single_dependency(self, shell):
        values_provider = DependantScriptValuesProvider(
            'ls ${param1}',
            self.create_parameters_supplier('param1'),
            shell=shell)

        self.assertCountEqual(['param1'], values_provider.get_required_parameters())

    @parameterized.expand([(True,), (False,)])
    def test_get_required_parameters_when_single_dependency_and_many_params(self, shell):
        values_provider = DependantScriptValuesProvider(
            'ls ${param1}',
            self.create_parameters_supplier('param1', 'param2', 'param3'),
            shell=shell)
        self.assertCountEqual(['param1'], values_provider.get_required_parameters())

    @parameterized.expand([(True,), (False,)])
    def test_get_required_parameters_when_multiple_dependencies(self, shell):
        values_provider = DependantScriptValuesProvider(
            'ls ${param1}/${param2}',
            self.create_parameters_supplier('param1', 'param2', 'param3'),
            shell=shell)
        self.assertCountEqual(['param1', 'param2'], values_provider.get_required_parameters())

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_no_values(self, shell):
        values_provider = DependantScriptValuesProvider(
            'ls ${param1}',
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual([], values_provider.get_values({}))

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_single_parameter(self, shell):
        values_provider = DependantScriptValuesProvider(
            "echo '_${param1}_'",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual(['_hello world_'], values_provider.get_values({'param1': 'hello world'}))

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_multiple_parameters(self, shell):
        files_path = os.path.join(test_utils.temp_folder, 'path1', 'path2')
        for i in range(0, 5):
            file_utils.write_file(os.path.join(files_path, 'f' + str(i) + '.txt'), 'test')

        values_provider = DependantScriptValuesProvider(
            'ls ' + test_utils.temp_folder + '/${param1}/${param2}',
            self.create_parameters_supplier('param1', 'param2'),
            shell=shell)
        self.assertEqual(['f0.txt', 'f1.txt', 'f2.txt', 'f3.txt', 'f4.txt'],
                         values_provider.get_values({'param1': 'path1', 'param2': 'path2'}))

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_parameter_repeats(self, shell):
        values_provider = DependantScriptValuesProvider(
            "echo '_${param1}_\n' 'test\n' '+${param1}+'",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual(['_123_', ' test', ' +123+'], values_provider.get_values({'param1': '123'}))

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_numeric_parameter(self, shell):
        values_provider = DependantScriptValuesProvider(
            "echo '_${param1}_'",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual(['_123_'], values_provider.get_values({'param1': 123}))

    @parameterized.expand([(True,), (False,)])
    def test_get_values_when_newline_response(self, shell):
        values_provider = DependantScriptValuesProvider(
            "ls '${param1}'",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual([], values_provider.get_values({'param1': test_utils.temp_folder}))

    @parameterized.expand([(True, ['1', '2']), (False, ['1 && echo 2'])])
    def test_no_code_injection_for_and_operator(self, shell, expected_values):
        values_provider = DependantScriptValuesProvider(
            "echo ${param1}",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual(expected_values, values_provider.get_values({'param1': '1 && echo 2'}))

    @parameterized.expand([(True, ['y2', 'y3']), (False, [])])
    def test_no_code_injection_for_pipe_operator(self, shell, expected_values):
        test_utils.create_files(['x1', 'y2', 'y3'])

        values_provider = DependantScriptValuesProvider(
            "ls ${param1}",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual(expected_values, values_provider.get_values({'param1': test_utils.temp_folder + ' | grep y'}))

    @parameterized.expand([(True,), (False,)])
    def test_script_fails(self, shell):
        values_provider = DependantScriptValuesProvider(
            "echo2 ${param1}",
            self.create_parameters_supplier('param1'),
            shell=shell)
        self.assertEqual([], values_provider.get_values({'param1': 'abc'}))

    def setUp(self):
        test_utils.setup()

    def tearDown(self):
        test_utils.cleanup()

    def create_parameters_supplier(self, *param_names):
        result = []

        for param_name in param_names:
            parameter = create_parameter_model(param_name, all_parameters=result)
            parameter.name = param_name
            result.append(parameter)

        return lambda: result


class FilesProviderTest(unittest.TestCase):
    def test_no_files(self):
        provider = FilesProvider(test_utils.temp_folder)
        self.assertEqual([], provider.get_values({}))

    def test_multiple_files(self):
        test_utils.create_files(['My.txt', 'file.dat', 'test.sh', 'file2.txt'])
        test_utils.create_dir('documents')
        test_utils.create_files(['another.txt'], 'documents')

        provider = FilesProvider(test_utils.temp_folder, file_extensions=['txt'])
        self.assertEqual(['file2.txt', 'My.txt'], provider.get_values({}))

    def test_invalid_file_dir(self):
        provider = FilesProvider('some_missing_folder')
        self.assertEqual([], provider.get_values({}))

    def setUp(self):
        test_utils.setup()

    def tearDown(self):
        test_utils.cleanup()