HEX
Server: LiteSpeed
System: Linux CentOS-79-64-minimal 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: vishn3436 (5293)
PHP: 8.0.15
Disabled: NONE
Upload Files
File: //scripts/script-server/src/tests/observable_test.py
import threading
import time
import unittest

from react.observable import Observable, _StoringObserver, ReplayObservable, PipedObservable, \
    read_until_closed


class TestObservable(unittest.TestCase):
    def _test_no_updates(self, observable):
        observer = _StoringObserver()
        observable.subscribe(observer)

        self.assertEqual([], observer.data)

    def test_no_updates(self):
        observable = self.create_observable()

        self._test_no_updates(observable)

    def _test_single_update(self, source, observable):
        observer = _StoringObserver()
        observable.subscribe(observer)

        message = 'test notification'
        source.push(message)

        return observer, message

    def test_single_update(self):
        observable = self.create_observable()

        observer, message = self._test_single_update(observable, observable)
        self.assertEqual([message], observer.data)

    def test_map_single_update(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        observer, message = self._test_single_update(observable, mapped_observable)
        self.assertEqual([message + '_x'], observer.data)

    def test_time_buffer_single_update(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        observer, message = self._test_single_update(observable, buffered_observable)
        self.wait_buffer_flush(buffered_observable)

        self.assertEqual([message], observer.data)

    def test_slow_time_buffer_single_update(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(300)

        observer, message = self._test_single_update(observable, buffered_observable)

        self.assertEqual([], observer.data)

        self.wait_buffer_flush(buffered_observable)

        self.assertEqual([message], observer.data)

    def test_time_buffer_aggregate_single_update(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(100, lambda chunks: ['|||'.join(chunks)])

        observer, message = self._test_single_update(observable, buffered_observable)
        self.wait_buffer_flush(buffered_observable)

        self.assertEqual([message], observer.data)

    def _test_multiple_updates(self, source, observable):
        observer = _StoringObserver()
        observable.subscribe(observer)

        messages = ['m1', '', 'message 3']
        for message in messages:
            source.push(message)

        return observer, messages

    def test_multiple_updates(self):
        observable = self.create_observable()

        observer, messages = self._test_multiple_updates(observable, observable)

        self.assertEqual(messages, observer.data)

    def test_map_multiple_updates(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        observer, messages = self._test_multiple_updates(observable, mapped_observable)
        messages = [m + '_x' for m in messages]

        self.assertEqual(messages, observer.data)

    def test_time_buffer_multiple_updates(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        observer, messages = self._test_multiple_updates(observable, buffered_observable)
        self.wait_buffer_flush(buffered_observable)

        self.assertEqual(messages, observer.data)

    def test_time_buffer_aggregate_multiple_updates(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(100, lambda chunks: ['|||'.join(chunks)])

        observer, messages = self._test_multiple_updates(observable, buffered_observable)
        self.wait_buffer_flush(buffered_observable)
        aggregated_message = '|||'.join(messages)

        self.assertEqual([aggregated_message], observer.data)

    def _test_multiple_observers_single_update(self, source, observable):
        observer1 = _StoringObserver()
        observable.subscribe(observer1)

        observer2 = _StoringObserver()
        observable.subscribe(observer2)

        observer3 = _StoringObserver()
        observable.subscribe(observer3)

        message = 'message'
        source.push(message)

        return [observer1, observer2, observer3], message

    def test_multiple_observers_single_update(self):
        observable = self.create_observable()

        observers, message = self._test_multiple_observers_single_update(observable, observable)

        for observer in observers:
            self.assertEqual([message], observer.data)

    def test_map_multiple_observers_single_update(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda x: '_' + x + '_')

        observers, message = self._test_multiple_observers_single_update(observable, mapped_observable)

        for observer in observers:
            self.assertEqual(['_' + message + '_'], observer.data)

    def _test_multiple_observers_multiple_updates(self, source, observable):
        observer1 = _StoringObserver()
        observable.subscribe(observer1)

        observer2 = _StoringObserver()
        observable.subscribe(observer2)

        observer3 = _StoringObserver()
        observable.subscribe(observer3)

        messages = ['message', 'and another one', 'and the third']
        for message in messages:
            source.push(message)

        return [observer1, observer2, observer3], messages

    def test_multiple_observers_multiple_updates(self):
        observable = self.create_observable()

        observers, messages = self._test_multiple_observers_multiple_updates(observable, observable)
        for observer in observers:
            self.assertEqual(messages, observer.data)

    def test_time_buffer_aggregate_multiple_observers_multiple_updates(self):
        observable = self.create_observable()
        time_buffered = observable.time_buffered(100, lambda chunks: ['replacement'])

        observers, messages = self._test_multiple_observers_multiple_updates(observable, time_buffered)
        self.wait_buffer_flush(time_buffered)

        for observer in observers:
            self.assertEqual(['replacement'], observer.data)

    def _test_push_before_late_subscription(self, source, observable, push_postprocessor=None):
        message = 'test1'
        source.push(message)

        if push_postprocessor is not None:
            push_postprocessor()

        observer = _StoringObserver()
        observable.subscribe(observer)

        return observer, message

    def test_push_before_late_subscription(self):
        observable = self.create_observable()

        observer, _ = self._test_push_before_late_subscription(observable, observable)

        self.assertEqual([], observer.data)

    def test_time_buffer_push_before_late_subscription(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        observer, _ = self._test_push_before_late_subscription(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))

        self.assertEqual([], observer.data)

    def test_replay_push_before_late_subscription(self):
        observable = self.create_replay_observable()

        observer, message = self._test_push_before_late_subscription(observable, observable)
        self.assertEqual([message], observer.data)

    def test_pipe_replay_push_before_late_subscription(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        observer, message = self._test_push_before_late_subscription(observable, replay)
        self.assertEqual([message], observer.data)

    def _test_push_after_late_subscription(self, source, observable, push_postprocessor=None):
        early_message = 'test1'
        source.push(early_message)

        if push_postprocessor is not None:
            push_postprocessor()

        observer = _StoringObserver()
        observable.subscribe(observer)

        late_message = 'test3'
        source.push(late_message)

        if push_postprocessor is not None:
            push_postprocessor()

        return observer, early_message, late_message

    def test_push_after_late_subscription(self):
        observable = self.create_observable()

        observer, _, late_message = self._test_push_after_late_subscription(observable, observable)
        self.assertEqual([late_message], observer.data)

    def test_time_buffer_aggregated_push_after_late_subscription(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(100, lambda chunks: ['|'.join(chunks)])

        observer, early_message, late_message = self._test_push_after_late_subscription(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))

        self.assertEqual([late_message], observer.data)

    def test_replay_push_after_late_subscription(self):
        observable = self.create_replay_observable()

        observer, early_message, late_message = self._test_push_after_late_subscription(observable, observable)
        self.assertEqual([early_message, late_message], observer.data)

    def test_pipe_replay_push_after_late_subscription(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        observer, early_message, late_message = self._test_push_after_late_subscription(observable, replay)
        self.assertEqual([early_message, late_message], observer.data)

    def test_replay_time_buffer_aggregated_push_after_late_subscription(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(100, lambda chunks: ['___'.join(chunks)])
        replay = self.replay(buffered_observable)

        observer, early_message, late_message = self._test_push_after_late_subscription(
            observable,
            replay)

        self.wait_buffer_flush(buffered_observable)

        self.assertEqual([early_message + '___' + late_message], observer.data)

    def _test_multiple_updates_before_late_subscription(self, source, observable):
        messages = ['a', 'b', 'c']

        for message in messages:
            source.push(message)

        observer = _StoringObserver()
        observable.subscribe(observer)

        return observer, messages

    def test_multiple_updates_late_subscription(self):
        observable = self.create_observable()

        observer, _ = self._test_multiple_updates_before_late_subscription(observable, observable)
        self.assertEqual([], observer.data)

    def test_map_multiple_updates_late_subscription(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda x: x + '_suffix')

        observer, _ = self._test_multiple_updates_before_late_subscription(observable, mapped_observable)
        self.assertEqual([], observer.data)

    def test_replay_multiple_updates_late_subscription(self):
        observable = self.create_replay_observable()

        observer, messages = self._test_multiple_updates_before_late_subscription(observable, observable)
        self.assertEqual(messages, observer.data)

    def test_pipe_replay_multiple_updates_late_subscription(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        observer, messages = self._test_multiple_updates_before_late_subscription(observable, replay)
        self.assertEqual(messages, observer.data)

    def test_pipe_replay_map_multiple_updates_late_subscription(self):
        observable = self.create_observable()
        replay = self.replay(observable.map(lambda x: x + '_suffix'))

        observer, messages = self._test_multiple_updates_before_late_subscription(observable, replay)
        messages = [x + '_suffix' for x in messages]
        self.assertEqual(messages, observer.data)

    def _test_close(self, source, observable, postclose_callback=None):
        observer = _StoringObserver()
        observable.subscribe(observer)

        source.close()

        if postclose_callback:
            postclose_callback()

        self.assertTrue(observer.closed)
        self.assertTrue(observable.closed)

    def test_close(self):
        observable = self.create_observable()

        self._test_close(observable, observable)

    def test_map_close(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        self._test_close(observable, mapped_observable)

    def test_time_buffer_close(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        self._test_close(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))

    def test_replay_close(self):
        observable = self.create_replay_observable()

        self._test_close(observable, observable)

    def test_pipe_replay_close(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        self._test_close(observable, replay)

    def _test_close_only_subscription_with_callback_arguments(self, source, observable, postclose_callback=None):
        actual_callback_arguments = []

        def callback(value1, value2):
            actual_callback_arguments.append(value1)
            actual_callback_arguments.append(value2)

        observable.subscribe_on_close(callback, 'x', 115)

        source.close()

        if postclose_callback:
            postclose_callback()

        self.assertEqual(['x', 115], actual_callback_arguments)

    def test_close_only_subscription(self):
        observable = self.create_observable()
        self._test_close_only_subscription_with_callback_arguments(observable, observable)

    def test_map_close_only_subscription(self):
        observable = self.create_replay_observable()
        mapped_pipe = observable.map(lambda x: x + '_x')
        self._test_close_only_subscription_with_callback_arguments(observable, mapped_pipe)

    def test_replay_close_only_subscription(self):
        observable = self.create_replay_observable()
        self._test_close_only_subscription_with_callback_arguments(observable, observable)

    def test_close_only_subscription_ignores_on_next(self):
        observable = self.create_observable()

        def callback():
            pass

        observable.subscribe_on_close(callback)

        observable.push('some message')
        observable.close()

    def test_close_multiple_subscribers(self):
        observable = self.create_observable()

        observable.push('test')

        observer1 = _StoringObserver()
        observable.subscribe(observer1)

        observer2 = _StoringObserver()
        observable.subscribe(observer2)

        observable.close()

        self.assertTrue(observer1.closed)
        self.assertTrue(observer2.closed)

    def _test_close_late_subscription(self, source, observable, postclose_callback=None):
        source.push('test')
        source.close()

        if postclose_callback:
            postclose_callback()

        observer = _StoringObserver()
        observable.subscribe(observer)

        self.assertTrue(observer.closed)
        self.assertTrue(observable.closed)

    def test_close_late_subscription(self):
        observable = self.create_observable()

        self._test_close_late_subscription(observable, observable)

    def test_map_close_late_subscription(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        self._test_close_late_subscription(observable, mapped_observable)

    def test_time_buffer_close_late_subscription(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        self._test_close_late_subscription(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))

    def test_replay_close_late_subscription(self):
        observable = self.create_replay_observable()

        self._test_close_late_subscription(observable, observable)

    def test_close_multiple_subscribers_late_subscription(self):
        observable = self.create_observable()

        observable.push('test')
        observable.close()

        observer1 = _StoringObserver()
        observable.subscribe(observer1)

        observer2 = _StoringObserver()
        observable.subscribe(observer2)
        self.assertTrue(observer1.closed)
        self.assertTrue(observer2.closed)

    def _test_wait_close(self, source, observable, postclose_callback=None):
        thread = threading.Thread(target=self.close_delayed, args=[source], daemon=True)
        thread.start()

        observable.wait_close(timeout=0.2)

        if postclose_callback is not None:
            postclose_callback()

        self.assertTrue(observable.closed)

    def test_wait_close(self):
        observable = self.create_observable()

        self._test_wait_close(observable, observable)

    def test_map_wait_close(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        self._test_wait_close(observable, mapped_observable)

    def test_time_buffer_wait_close(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        self._test_wait_close(observable, buffered_observable, buffered_observable.wait_close)

    def test_replay_wait_close(self):
        observable = self.create_replay_observable()

        self._test_wait_close(observable, observable)

    def test_replay_pipe_wait_close(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        self._test_wait_close(observable, replay)

    def test_on_next_exception_once(self):
        observable = self.create_observable()

        class FailingObserver(_StoringObserver):
            def __init__(self):
                super().__init__()
                self.count = 0

            def on_next(self, chunk):
                self.count += 1
                if self.count == 1:
                    raise Exception

                super().on_next(chunk)

        observer = FailingObserver()
        observable.subscribe(observer)

        observable.push('m1')
        observable.push('m2')

        self.assertEqual(['m2'], observer.data)

    def test_on_next_exception_close_afterwards(self):
        observable = self.create_observable()

        class FailingObserver(_StoringObserver):
            def on_next(self, chunk):
                raise Exception

        observer = FailingObserver()
        observable.subscribe(observer)

        observable.push('m1')
        observable.push('m2')
        observable.close()

        self.assertEqual([], observer.data)
        self.assertTrue(observer.closed)

    def test_on_next_exception_different_observers(self):
        observable = self.create_observable()

        class FailingObserver(_StoringObserver):
            def on_next(self, chunk):
                raise Exception

        failing_observer = FailingObserver()
        observable.subscribe(failing_observer)

        normal_observer = _StoringObserver()
        observable.subscribe(normal_observer)

        observable.push('m1')
        observable.push('m2')

        self.assertEqual([], failing_observer.data)
        self.assertEqual(['m1', 'm2'], normal_observer.data)

    def test_on_close_exception_single_observer(self):
        observable = self.create_observable()

        class FailingObserver(_StoringObserver):
            def on_close(self):
                raise Exception

        observer = FailingObserver()
        observable.subscribe(observer)

        observable.close()

        self.assertTrue(observable.closed)

    def test_on_close_exception_different_observers(self):
        observable = self.create_observable()

        class FailingObserver(_StoringObserver):
            def on_close(self):
                raise Exception

        failing_observer = FailingObserver()
        observable.subscribe(failing_observer)

        normal_observer = _StoringObserver()
        observable.subscribe(normal_observer)

        observable.close()

        self.assertTrue(observable.closed)
        self.assertTrue(normal_observer.closed)

    def test_prohibit_piped_modifications(self):
        pipe_creators = [
            lambda o: o.map(lambda x: '_' + x),
            lambda o: o.time_buffered(10),
            lambda o: self.replay(o)
        ]

        for pipe_creator in pipe_creators:
            observable = self.create_observable()
            pipe = pipe_creator(observable)

            self.assertRaises(RuntimeError, pipe.push, ['any_message'])
            self.assertRaises(RuntimeError, pipe.close)

    def test_time_buffer_multiple_buffers(self):
        observable = self.create_observable()

        buffered_observable = observable.time_buffered(10)

        observer = _StoringObserver()
        buffered_observable.subscribe(observer)

        observable.push('message1')

        self.wait_buffer_flush(buffered_observable)

        observable.push('message2')

        self.wait_buffer_flush(buffered_observable)

        observable.push('message3')

        self.wait_buffer_flush(buffered_observable)

        self.assertEqual(['message1', 'message2', 'message3'], observer.data)

    def test_time_buffer_aggregate_multiple_updates_multiple_buffers(self):
        observable = self.create_observable()

        buffered_observable = observable.time_buffered(100, lambda chunks: ['|||'.join(chunks)])

        observer = _StoringObserver()
        buffered_observable.subscribe(observer)

        observable.push('a')
        observable.push('b')
        observable.push('c')

        self.wait_buffer_flush(buffered_observable)

        observable.push('d')
        observable.push('e')
        observable.push('f')

        self.wait_buffer_flush(buffered_observable)

        self.assertEqual(['a|||b|||c', 'd|||e|||f'], observer.data)

    def test_time_buffer_flush_before_close(self):
        observable = self.create_observable()

        buffered_observable = observable.time_buffered(100)

        observer = _StoringObserver()
        buffered_observable.subscribe(observer)

        observable.push('m1')
        observable.push('m2')
        observable.close()

        self.assertEqual([], observer.data)
        self.assertFalse(observer.closed)

        self.wait_buffer_flush(buffered_observable)

        self.assertEqual(['m1', 'm2'], observer.data)
        self.assertTrue(observer.closed)

    def _test_read_until_closed(self, source, observable, prewait_callback=None):
        early_messages = ['early 1', 'early 1']
        late_messages = ['lateX', 'lateY']

        for message in early_messages:
            source.push(message)

        def publish_async():
            time.sleep(0.1)

            for message in late_messages:
                source.push(message)

            source.close()

        thread = threading.Thread(target=publish_async, daemon=True)
        thread.start()

        if prewait_callback:
            prewait_callback()

        data = read_until_closed(observable, timeout=0.3)

        self.assertTrue(source.closed)
        self.assertTrue(observable.closed)

        return data, early_messages, late_messages

    def test_read_until_closed(self):
        observable = self.create_observable()

        data, _, late_messages = self._test_read_until_closed(observable, observable)

        self.assertEqual(late_messages, data)

    def test_map_read_until_closed(self):
        observable = self.create_observable()
        mapped_observable = observable.map(lambda chunk: chunk + '_x')

        data, _, late_messages = self._test_read_until_closed(observable, mapped_observable)
        late_messages = [m + '_x' for m in late_messages]

        self.assertEqual(late_messages, data)

    def test_time_buffer_read_until_closed(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(10)

        data, _, late_messages = self._test_read_until_closed(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))

        self.assertEqual(late_messages, data)

    def test_time_buffer_aggregated_read_until_closed(self):
        observable = self.create_observable()
        buffered_observable = observable.time_buffered(100, lambda chunks: ['|'.join(chunks)])

        data, _, late_messages = self._test_read_until_closed(
            observable,
            buffered_observable,
            lambda: self.wait_buffer_flush(buffered_observable))
        message = '|'.join(late_messages)

        self.assertEqual([message], data)

    def test_time_buffer_not_flushed_read_until_closed(self):
        observable = self.create_observable()

        buffered_observable = observable.time_buffered(300)

        observable.push('m1')
        observable.push('m2')
        observable.close()

        self.assertFalse(buffered_observable.closed)
        self.assertTrue(observable.closed)

        self.assertRaises(TimeoutError, read_until_closed, buffered_observable, 0.001)

    def test_replay_read_until_closed(self):
        observable = self.create_replay_observable()

        data, early_messages, late_messages = self._test_read_until_closed(observable, observable)

        self.assertEqual(early_messages + late_messages, data)

    def test_pipe_replay_read_until_closed(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        data, early_messages, late_messages = self._test_read_until_closed(observable, replay)

        self.assertEqual(early_messages + late_messages, data)

    def test_replay_dispose(self):
        observable = self.create_replay_observable()
        observable.push('1')
        observable.close()
        observable.dispose()

        data = read_until_closed(observable, 0.1)
        self.assertEqual([], data)

    def test_replay_dispose_when_active(self):
        observable = self.create_replay_observable()
        observable.push('1')
        observable.dispose()

        self.assertTrue(observable.closed)
        self.assertEqual([], read_until_closed(observable, timeout=0.001))

    def test_pipe_replay_dispose_deferred(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        observer = _StoringObserver()
        replay.subscribe(observer)

        replay.dispose()

        self.assertFalse(replay.closed)
        self.assertFalse(observer.closed)

        observable.close()
        self.assertTrue(replay.closed)
        self.assertTrue(observer.closed)

    def test_pipe_replay_dispose_deferred_for_updates(self):
        observable = self.create_observable()
        replay = self.replay(observable)

        observer = _StoringObserver()
        replay.subscribe(observer)

        observable.push('1')
        replay.dispose()
        observable.push('2')

        self.assertEqual(['1', '2'], observer.data)

        observable.close()
        self.assertEqual([], read_until_closed(replay, timeout=0.001))

    def test_unlimited_wait_close(self):
        observable = self.create_observable()

        thread = threading.Thread(target=observable.wait_close, daemon=True)
        thread.start()

        time.sleep(0.1)

        self.assertTrue(thread.is_alive())
        observable.close()

        thread.join(timeout=0.1)
        self.assertFalse(thread.is_alive())

    def create_observable(self):
        observable = Observable()
        self._track(observable)
        return observable

    def replay(self, observable):
        replay = observable.replay()
        self._track(observable)
        return replay

    def create_replay_observable(self):
        observable = ReplayObservable()
        self._track(observable)
        return observable

    def _track(self, observable):
        self._observables.append(observable)

    def setUp(self):
        super().setUp()
        self._observables = []

    def tearDown(self):
        super().tearDown()

        for observable in self._observables:
            if not observable.closed and not isinstance(observable, PipedObservable):
                observable.close()

            if isinstance(observable, ReplayObservable):
                observable.dispose()

    @staticmethod
    def close_delayed(observable):
        time.sleep(0.1)
        observable.close()

    @staticmethod
    def wait_buffer_flush(buffered_observable):
        time.sleep(buffered_observable.period_millis * 1.3 / 1000.0 + 0.01)