Skip to content

Collection Speed Both Cameras

import queue
import time
from threading import Event, Thread

from tqdm import tqdm

from pupil_labs.neon_usb import (
    EyeCameraUVC,
    Frame,
    SceneCamera,
    get_all_items,
    image_receiver,
)

eye_start_event = Event()
eye_stop_event = Event()
scene_start_event = Event()
scene_stop_event = Event()

eye_q = queue.Queue[Frame](maxsize=400)
eye_thread = Thread(
    target=image_receiver,
    args=(EyeCameraUVC, eye_q, eye_start_event, eye_stop_event, scene_start_event),
)
eye_thread.start()


scene_q = queue.Queue[Frame](maxsize=400)
scene_thread = Thread(
    target=image_receiver,
    args=(SceneCamera, scene_q, scene_start_event, scene_stop_event, eye_start_event),
)
scene_thread.start()

eye_start_event.wait()
scene_start_event.wait()


total_eye_frames = 5000
eye_frame_counter = 0
scene_frame_counter = 0
with tqdm(total=total_eye_frames) as pbar:
    start = time.time()
    while True:
        if eye_frame_counter >= total_eye_frames:
            break
        eye_frames = get_all_items(eye_q)
        num_eye_frames = len(eye_frames)
        eye_frame_counter += num_eye_frames

        scene_frames = get_all_items(scene_q)
        num_scene_frames = len(scene_frames)
        scene_frame_counter += num_scene_frames

        pbar.update(num_eye_frames)

end = time.time()
eye_stop_event.set()
scene_stop_event.set()
print(
    "\t".join([
        f"Eye FPS: {eye_frame_counter / (end - start):.1f}",
        f"Scene FPS: {scene_frame_counter / (end - start):.1f}",
        f"Duration: {end - start:.1f}",
    ])
)