Scene Camera Video

You can receive the scene camera video stream, together with frame timestamps, using the receive_video_frames method.

async for frame in receive_video_frames(
    sensor_world.url, run_loop=restart_on_disconnect
):
    bgr_buffer = frame.bgr_buffer()
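
Here sensor_world is the scene camera's sensor information obtained from the device status, and run_loop=True keeps the stream running across disconnects. A minimal setup sketch (the complete, runnable example follows below):

status = await device.get_status()
sensor_world = status.direct_world_sensor()  # scene camera sensor info, incl. its URL
restart_on_disconnect = True                 # passed as run_loop=... above
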
Scene camera
VideoFrame

Bases: NamedTuple

A video frame with timestamp information.

This class represents a video frame from the scene camera together with its timestamp. It wraps a VideoFrame from the PyAV library, available via the av_frame attribute.

Methods:

  • bgr_buffer

    Convert the video frame to a BGR buffer.

  • to_ndarray

    Convert the video frame to a NumPy array.

Attributes:

av_frame instance-attribute

av_frame: VideoFrame

The video frame.

datetime property

datetime: datetime

Get timestamp as a datetime object.

timestamp_unix_ns property

timestamp_unix_ns: int

Get timestamp in nanoseconds since Unix epoch.

timestamp_unix_seconds instance-attribute

timestamp_unix_seconds: float

Timestamp in seconds since Unix epoch.
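
The three timestamp views describe the same instant. A short usage sketch, assuming frame is a VideoFrame received from the stream above:

ts_s = frame.timestamp_unix_seconds  # float seconds since the Unix epoch
ts_ns = frame.timestamp_unix_ns      # the same instant as integer nanoseconds
dt = frame.datetime                  # the same instant as a datetime object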

bgr_buffer

bgr_buffer() -> BGRBuffer

Convert the video frame to a BGR buffer.

This method converts the video frame to a BGR buffer, which is a NumPy array with the shape (height, width, 3) and dtype uint8. The BGR format is commonly used in computer vision applications.

Returns:

  • BGRBuffer –

    The BGR buffer as a NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/video.py
def bgr_buffer(self) -> BGRBuffer:
    """Convert the video frame to a BGR buffer.

    This method converts the video frame to a BGR buffer, which is a
    NumPy array with the shape (height, width, 3) and dtype uint8.
    The BGR format is commonly used in computer vision applications.

    Returns:
        BGRBuffer: The BGR buffer as a NumPy array.

    """
    return self.to_ndarray(format="bgr24")
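
Because the buffer is in BGR order it can be passed to OpenCV directly; RGB-based libraries such as matplotlib need a conversion first. A short sketch, assuming frame comes from the stream above:

import cv2

bgr = frame.bgr_buffer()                    # (height, width, 3) uint8, BGR order
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)  # RGB order for plotting libraries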

to_ndarray

to_ndarray(*args: Any, **kwargs: Any) -> NDArray

Convert the video frame to a NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/video.py
def to_ndarray(self, *args: Any, **kwargs: Any) -> npt.NDArray:
    """Convert the video frame to a NumPy array."""
    return self.av_frame.to_ndarray(*args, **kwargs)
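
Since to_ndarray forwards its arguments to the underlying PyAV frame, other PyAV pixel formats work as well. For example (assuming frame from the stream above):

gray = frame.to_ndarray(format="gray")   # (height, width) uint8, grayscale
rgb = frame.to_ndarray(format="rgb24")   # (height, width, 3) uint8, RGB order
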
Check the whole example code here
stream_scene_camera_video.py
import asyncio
import contextlib

import cv2
import numpy as np

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import Device, Network, receive_video_frames  # noqa: E402


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()
        sensor_world = status.direct_world_sensor()
        if not sensor_world.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True
        async for frame in receive_video_frames(
            sensor_world.url, run_loop=restart_on_disconnect
        ):
            bgr_buffer = frame.bgr_buffer()
            draw_time(bgr_buffer, frame.datetime)
            cv2.imshow("Scene Camera - Press ESC to quit", bgr_buffer)
            if cv2.waitKey(1) & 0xFF == 27:
                return


def draw_time(frame, time):
    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
    frame_txt_font_scale = 1.0
    frame_txt_thickness = 1

    # render the frame's wall-clock time as text
    frame_txt = str(time)

    cv2.putText(
        frame,
        frame_txt,
        (20, 50),
        frame_txt_font_name,
        frame_txt_font_scale,
        (255, 255, 255),
        thickness=frame_txt_thickness,
        lineType=cv2.LINE_8,
    )


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

Scene Camera Video with Overlaid Gaze

The following example shows how to match data from multiple sensor streams by buffering each stream in an asyncio.Queue() and pairing items by timestamp.
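
Note that an asyncio.Queue() created without a maxsize is unbounded, so put_nowait never raises QueueFull; for the example's dropping branch to actually trigger under back-pressure, give the queues a capacity. An illustrative sketch (the capacities are arbitrary):

import asyncio

queue_video = asyncio.Queue(maxsize=30)  # put_nowait raises asyncio.QueueFull once full
queue_gaze = asyncio.Queue(maxsize=200)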

Scene camera with gaze overlay
Check the whole example code here
stream_video_with_overlayed_gaze.py
import asyncio
import contextlib
import typing as T

import cv2
import numpy as np

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import (  # noqa: E402
    Device,
    Network,
    receive_gaze_data,
    receive_video_frames,
)


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        print(f"Getting status information from {device}")
        status = await device.get_status()

        sensor_gaze = status.direct_gaze_sensor()
        if not sensor_gaze.connected:
            print(f"Gaze sensor is not connected to {device}")
            return

        sensor_world = status.direct_world_sensor()
        if not sensor_world.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True

        queue_video = asyncio.Queue()
        queue_gaze = asyncio.Queue()

        process_video = asyncio.create_task(
            enqueue_sensor_data(
                receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
                queue_video,
            )
        )
        process_gaze = asyncio.create_task(
            enqueue_sensor_data(
                receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
                queue_gaze,
            )
        )
        try:
            await match_and_draw(queue_video, queue_gaze)
        finally:
            process_video.cancel()
            process_gaze.cancel()


async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    async for datum in sensor:
        try:
            queue.put_nowait((datum.datetime, datum))
        except asyncio.QueueFull:
            print(f"Queue is full, dropping {datum}")


async def match_and_draw(queue_video, queue_gaze):
    while True:
        video_datetime, video_frame = await get_most_recent_item(queue_video)
        _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)

        bgr_buffer = video_frame.to_ndarray(format="bgr24")

        cv2.circle(
            bgr_buffer,
            (int(gaze_datum.x), int(gaze_datum.y)),
            radius=80,
            color=(0, 0, 255),
            thickness=15,
        )

        cv2.imshow("Scene camera with gaze overlay", bgr_buffer)
        cv2.waitKey(1)


async def get_most_recent_item(queue):
    item = await queue.get()
    while True:
        try:
            next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item
        else:
            item = next_item


async def get_closest_item(queue, timestamp):
    item_ts, item = await queue.get()
    # assumes monotonically increasing timestamps
    if item_ts > timestamp:
        return item_ts, item
    while True:
        try:
            next_item_ts, next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item_ts, item
        else:
            if next_item_ts > timestamp:
                return next_item_ts, next_item
            item_ts, item = next_item_ts, next_item


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

Scene Camera Video with Overlaid Fixations or Other Streams

Neon only; requires Companion app version 2.9.0+ and client library version 1.5.0+.

This works with any of the streams, including the eye camera video or eye events (blinks and fixations).
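
A condensed sketch of consuming the eye events stream on its own, reusing the names and imports from the full example below (assuming a connected eye events sensor):

async for event in receive_eye_events_data(sensor_eye_events.url, run_loop=True):
    if isinstance(event, BlinkEventData):
        print("blink at", event.datetime)
    elif isinstance(event, FixationEventData):
        print("fixation event at", event.datetime)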

Scene camera with fixations overlay
Check the whole example code here
stream_video_with_overlayed_fixations.py
import asyncio
import contextlib
import typing as T
from collections import deque

import cv2
import numpy as np

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import (  # noqa: E402
    Device,
    Network,
    receive_eye_events_data,
    receive_video_frames,
)
from pupil_labs.realtime_api.streaming import (  # noqa: E402
    BlinkEventData,
    FixationEventData,
)


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        print(f"Getting status information from {device}")
        status = await device.get_status()

        sensor_eye_events = status.direct_eye_events_sensor()
        if not sensor_eye_events.connected:
            print(f"Eye events sensor is not connected to {device}")
            return

        sensor_world = status.direct_world_sensor()
        if not sensor_world.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True

        queue_video = asyncio.Queue()
        queue_eye_events = asyncio.Queue()

        process_video = asyncio.create_task(
            enqueue_sensor_data(
                receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
                queue_video,
            )
        )
        process_gaze = asyncio.create_task(
            enqueue_sensor_data(
                receive_eye_events_data(
                    sensor_eye_events.url, run_loop=restart_on_disconnect
                ),
                queue_eye_events,
            )
        )
        try:
            await match_and_draw(queue_video, queue_eye_events)
        finally:
            process_video.cancel()
            process_gaze.cancel()


async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    async for datum in sensor:
        try:
            queue.put_nowait((datum.datetime, datum))
        except asyncio.QueueFull:
            print(f"Queue is full, dropping {datum}")


async def match_and_draw(queue_video, queue_eye_events):
    fixation_history = deque(maxlen=10)
    fixation_counter = 0

    blink = None
    blink_counter = 0

    while True:
        _video_datetime, video_frame = await get_most_recent_item(queue_video)
        bgr_buffer = video_frame.to_ndarray(format="bgr24")

        while not queue_eye_events.empty():
            _, eye_event = await queue_eye_events.get()
            if isinstance(eye_event, FixationEventData):
                if eye_event.event_type == 0:
                    continue

                fixation_history.append({
                    "id": fixation_counter,
                    "fixation": eye_event,
                })
                fixation_counter += 1

            elif isinstance(eye_event, BlinkEventData):
                blink = eye_event
                blink_counter += 1

        for fixation_meta in fixation_history:
            fixation_id = fixation_meta["id"]
            fixation = fixation_meta["fixation"]

            age = video_frame.timestamp_unix_seconds - fixation.end_time_ns * 1e-9
            duration = (fixation.end_time_ns - fixation.start_time_ns) * 1e-9

            overlay = bgr_buffer.copy()
            cv2.circle(
                overlay,
                (int(fixation.mean_gaze_x), int(fixation.mean_gaze_y)),
                radius=40 + int(duration * 10),
                color=(255, 32, 32),
                thickness=5,
            )
            cv2.putText(
                overlay,
                str(fixation_id),
                (int(fixation.mean_gaze_x) - 10, int(fixation.mean_gaze_y) + 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2,
                cv2.LINE_AA,
            )
            alpha = min(max(0, 1.0 - age / 5.0), 1.0)
            cv2.addWeighted(overlay, alpha, bgr_buffer, 1 - alpha, 0, bgr_buffer)

        if blink is not None:
            overlay = bgr_buffer.copy()
            cv2.putText(
                overlay,
                f"Blink {blink_counter}",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 255),
                2,
                cv2.LINE_AA,
            )
            age = video_frame.timestamp_unix_seconds - blink.end_time_ns * 1e-9
            alpha = min(max(0, 1.0 - age / 5.0), 1.0)
            cv2.addWeighted(overlay, alpha, bgr_buffer, 1 - alpha, 0, bgr_buffer)

        cv2.imshow("Scene camera with eye events", bgr_buffer)
        cv2.waitKey(1)


async def get_most_recent_item(queue):
    item = await queue.get()
    while True:
        try:
            next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item
        else:
            item = next_item


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())