Source code for pynaviz.video.video_worker

import queue
from multiprocessing import Event, Lock, Queue, shared_memory

import numpy as np

from pynaviz.video.video_handling import VideoHandler

from ..utils import RenderTriggerSource


[docs] def video_worker_process( video_path: str, shape: tuple, shm_frame_name: str, shm_index_name: str, request_queue: Queue, frame_ready: Event, response_queue: Queue, stop_event: Event, buffer_lock: Lock ): handler = VideoHandler(video_path) shm_frame = shared_memory.SharedMemory(name=shm_frame_name) shm_index = shared_memory.SharedMemory(name=shm_index_name) frame_buffer = np.ndarray(shape, dtype=np.float32, buffer=shm_frame.buf) index_buffer = np.ndarray((1,), dtype=np.float32, buffer=shm_index.buf) while not stop_event.is_set(): try: # wait for a new request item = request_queue.get(timeout=1.0) except queue.Empty: continue # if we received a shutdown signal terminate if item[0] is None: break # empty the queue keeping the most recent item while True: try: latest = request_queue.get_nowait() if latest[0] is None: # shutdown signal received, break immediately item = latest break item = latest except queue.Empty: break # unpack latest request idx, move_key_frame, request_type = item # TODO: unsure if this can happen now that i have the event if idx is None: break if request_type == RenderTriggerSource.LOCAL_KEY: frame, idx = handler.get_key_frame(move_key_frame) else: frame = handler[idx] # shape: (H, W, 3) in RGB, float32 with buffer_lock: np.copyto(frame_buffer, frame) np.copyto(index_buffer, idx) # drain response_queue to remove stale triggers while True: try: _ = response_queue.get_nowait() except queue.Empty: break # only now enqueue the trigger response_queue.put(request_type) frame_ready.set() try: handler.close() except Exception as e: print(f"[video_worker_process] Failed to close handler: {e}")