Stream Eye Cameras

Neon allows you to receive the eye camera video stream with timestamps. Use the same receive_video_frames method as for the scene camera, but pass the eye-camera sensor that you retrieve via direct_eyes_sensor.

status = await device.get_status()
sensor_eyes = status.direct_eyes_sensor()
restart_on_disconnect = True
async for frame in receive_video_frames(
    sensor_eyes.url, run_loop=restart_on_disconnect
):
    bgr_buffer = frame.bgr_buffer()
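Neon delivers both eye cameras in a single combined frame. A minimal sketch of splitting the BGR buffer into per-eye images with NumPy, assuming the two eye images are placed side by side (split_eyes is a hypothetical helper, not part of the library; verify the layout against your own stream):

```python
import numpy as np


def split_eyes(bgr_buffer: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Split a side-by-side eye-camera frame into left and right halves.

    Assumes the two eye images are stacked horizontally; adjust the
    sliced axis if your stream delivers them differently.
    """
    half = bgr_buffer.shape[1] // 2
    return bgr_buffer[:, :half], bgr_buffer[:, half:]


# Demonstration with a dummy frame (192 px tall, 384 px wide, 3 channels):
frame = np.zeros((192, 384, 3), dtype=np.uint8)
left, right = split_eyes(frame)
```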
Eye Cameras
VideoFrame

Bases: NamedTuple

A video frame with timestamp information.

This class represents a video frame from the camera with associated timestamp information. It wraps a VideoFrame from the PyAV (av) library via its av_frame attribute.

Methods:

  • bgr_buffer

    Convert the video frame to a BGR buffer.

  • to_ndarray

    Convert the video frame to a NumPy array.

Attributes:

av_frame instance-attribute

av_frame: VideoFrame

The video frame.

datetime property

datetime: datetime

Get timestamp as a datetime object.

timestamp_unix_ns property

timestamp_unix_ns: int

Get timestamp in nanoseconds since Unix epoch.

timestamp_unix_seconds instance-attribute

timestamp_unix_seconds: float

Timestamp in seconds since Unix epoch.
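The three timestamp attributes are three views of the same instant. A sketch of how they relate, using a hypothetical timestamp value (note that the library's datetime property may handle time zones differently from this explicit UTC conversion):

```python
from datetime import datetime, timezone

# Hypothetical value as read from frame.timestamp_unix_seconds:
timestamp_unix_seconds = 1_700_000_000.5

# timestamp_unix_ns is the same instant in integer nanoseconds.
timestamp_unix_ns = int(timestamp_unix_seconds * 1e9)

# The datetime property exposes the same instant as a datetime object.
dt = datetime.fromtimestamp(timestamp_unix_seconds, tz=timezone.utc)
```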

bgr_buffer

bgr_buffer() -> BGRBuffer

Convert the video frame to a BGR buffer.

This method converts the video frame to a BGR buffer, which is a NumPy array with the shape (height, width, 3) and dtype uint8. The BGR format is commonly used in computer vision applications.

Returns:

  • BGRBuffer (BGRBuffer) –

    The BGR buffer as a NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/video.py
def bgr_buffer(self) -> BGRBuffer:
    """Convert the video frame to a BGR buffer.

    This method converts the video frame to a BGR buffer, which is a
    NumPy array with the shape (height, width, 3) and dtype uint8.
    The BGR format is commonly used in computer vision applications.

    Returns:
        BGRBuffer: The BGR buffer as a NumPy array.

    """
    return self.to_ndarray(format="bgr24")
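Because the buffer's channel order is blue, green, red, libraries that expect RGB (such as matplotlib) need the last axis reversed. A small NumPy sketch of that conversion:

```python
import numpy as np

# A BGR buffer is a (height, width, 3) uint8 array with channels in
# blue, green, red order; reversing the last axis yields RGB.
bgr = np.zeros((4, 4, 3), dtype=np.uint8)
bgr[..., 0] = 255  # fill the blue channel
rgb = bgr[..., ::-1]  # blue now sits in the last (red) position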

to_ndarray

to_ndarray(*args: Any, **kwargs: Any) -> NDArray

Convert the video frame to a NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/video.py
def to_ndarray(self, *args: Any, **kwargs: Any) -> npt.NDArray:
    """Convert the video frame to a NumPy array."""
    return self.av_frame.to_ndarray(*args, **kwargs)
The complete example code is shown below:
stream_eyes_camera_video.py
import asyncio
import contextlib
import time

import cv2
import numpy as np

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import Device, Network, receive_video_frames  # noqa: E402


async def main(preview_frame_rate=30):
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()
        sensor_eyes = status.direct_eyes_sensor()
        if not sensor_eyes.connected:
            print(f"Eyes camera is not connected to {device}")
            return

        restart_on_disconnect = True
        last_update = time.perf_counter()
        async for frame in receive_video_frames(
            sensor_eyes.url, run_loop=restart_on_disconnect
        ):
            bgr_buffer = frame.bgr_buffer()
            draw_time(bgr_buffer, frame.datetime)
            cv2.imshow("Eye Cameras - Press ESC to quit", bgr_buffer)

            time_since_last_update = time.perf_counter() - last_update
            if time_since_last_update > 1 / preview_frame_rate:
                if cv2.waitKey(1) & 0xFF == 27:
                    return
                last_update = time.perf_counter()


def draw_time(frame, time):
    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
    frame_txt_font_scale = 0.5
    frame_txt_thickness = 1

    # draw the frame timestamp as text
    frame_txt = str(time)

    cv2.putText(
        frame,
        frame_txt,
        (20, 50),
        frame_txt_font_name,
        frame_txt_font_scale,
        (255, 255, 255),
        thickness=frame_txt_thickness,
        lineType=cv2.LINE_8,
    )


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())
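The example throttles the cv2.waitKey calls to preview_frame_rate updates per second so the preview loop does not spend all its time in the GUI. That throttling pattern can be factored into a small helper (a sketch; make_throttle is a hypothetical name, not part of the library):

```python
import time


def make_throttle(rate_hz: float):
    """Return a callable that reports True at most ~rate_hz times per second."""
    last = time.perf_counter()

    def due() -> bool:
        nonlocal last
        now = time.perf_counter()
        if now - last > 1 / rate_hz:
            last = now
            return True
        return False

    return due
```

In the example's loop, the body guarded by time_since_last_update would become a single `if due():` check.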