File tree Expand file tree Collapse file tree
task_processing/plugins/kubernetes Expand file tree Collapse file tree Original file line number Diff line number Diff line change 11import logging
2+ import multiprocessing
23import queue
34import time
5+ from multiprocessing import JoinableQueue
46from queue import Queue
57from typing import Collection
68from typing import Optional
1719from kubernetes .client import V1SecurityContext
1820from kubernetes .client .exceptions import ApiException
1921from pyrsistent import pmap
20- import multiprocessing
21- from multiprocessing import JoinableQueue
2222from pyrsistent import v
2323from pyrsistent .typing import PMap
2424
@@ -113,7 +113,7 @@ def __init__(
113113 # possible to cleanly shutdown both of these.
114114 self .pending_events : "JoinableQueue[PodEvent]" = JoinableQueue ()
115115 self .event_queue : "JoinableQueue[Event]" = JoinableQueue ()
116-
116+
117117 # TODO(TASKPROC-243): keep track of resourceVersion so that we can continue event processing
118118 # from where we left off on restarts
119119 self .pod_event_watch_processes = []
Original file line number Diff line number Diff line change 1- import threading
21import multiprocessing
2+ import threading
33
44import mock
55import pytest
@@ -16,7 +16,8 @@ def mock_Thread():
1616 with mock .patch .object (threading , "Thread" ) as mock_Thread :
1717 yield mock_Thread
1818
19+
1920@pytest .fixture
2021def mock_Process ():
21- with mock .patch .object (multiprocessing , ' Process' ) as mock_Process :
22- yield mock_Process
22+ with mock .patch .object (multiprocessing , " Process" ) as mock_Process :
23+ yield mock_Process
Original file line number Diff line number Diff line change @@ -802,8 +802,8 @@ def test_process_event_enqueues_task_processing_events_no_state_transition(
802802 assert (
803803 len (k8s_executor .task_metadata [mock_pod .metadata .name ].task_state_history ) == 0
804804 )
805-
806-
805+
806+
807807def test_pending_event_processing_loop_processes_remaining_events_after_stop (
808808 k8s_executor ,
809809):
@@ -812,6 +812,7 @@ def test_pending_event_processing_loop_processes_remaining_events_after_stop(
812812 test_pod = V1Pod (
813813 metadata = V1ObjectMeta (
814814 name = "test-pod" ,
815+ namespace = "task_processing_tests" ,
815816 )
816817 # Add other necessary attributes here
817818 )
You can’t perform that action at this time.
0 commit comments