Skip to content

Pupil Labs Neon Recording

ci documentation uv ruff pre-commit pypi version

Functionality for loading Neon recordings in native recording format

Installation

pip install pupil-labs-neon-recording

or

pip install -e git+https://github.com/pupil-labs/pl-neon-recording.git

Documentation

Documentation is available at https://pupil-labs.github.io/pl-neon-recording/

Usage

Basic Usage

import sys

import pupil_labs.neon_recording as nr

if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")

# Open a recording
recording = nr.open(sys.argv[1])

# get basic info
print("Recording Info:")
print(f"\tStart time (ns): {recording.start_ts}")
print(f"\tWearer         : {recording.wearer['name']}")
print(f"\tDevice serial  : {recording.device_serial}")
print(f"\tGaze samples   : {len(recording.gaze)}")
print("")

# read 10 gaze samples
print("First 10 gaze samples:")
timestamps = recording.gaze.ts[:10]
subsample = recording.gaze.sample(timestamps)
for gaze_datum in subsample:
    print(f"\t{gaze_datum.ts} : ({gaze_datum.x:0.2f}, {gaze_datum.y:0.2f})")
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402


class EventTracker:
    def __init__(self, stream):
        self.idx = 0
        self.itr = iter(stream)
        self.next_event = next(self.itr)

    def in_event(self, ts):
        if self.next_event is None:
            return False

        return (
            self.next_event.start_timestamp_ns < ts < self.next_event.end_timestamp_ns
        )

    def step_to(self, ts):
        try:
            while self.next_event is not None and self.next_event.end_timestamp_ns < ts:
                self.next_event = next(self.itr)
                self.idx += 1

        except StopIteration:
            self.next_event = None

        return self.in_event(ts)


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )
    eye_frames = recording.eye.sample(output_timestamps)

    fixations_only = recording.fixations[recording.fixations["event_type"] == 1]
    event_trackers = {
        "Fixation": EventTracker(fixations_only),
        "Blink": EventTracker(recording.blinks),
    }

    for frame in tqdm(eye_frames):
        frame_pixels = frame.bgr

        text_y = 0
        for stream, tracker in event_trackers.items():
            text_y += 40
            if tracker.step_to(frame.ts):
                frame_pixels = write_text(
                    frame_pixels, f"{stream} {tracker.idx + 1}", 0, text_y
                )

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "blinks-and-fixations.mp4")

Data Access

import sys
from collections.abc import Mapping
from datetime import datetime

import numpy as np

import pupil_labs.neon_recording as nr
from pupil_labs.neon_recording.stream.stream import Stream

if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")

# Open a recording
recording = nr.open(sys.argv[1])


def pretty_format(mapping: Mapping):
    output = []
    pad = "    "
    keys = mapping.keys()
    n = max(len(key) for key in keys)
    for k, v in mapping.items():
        v_repr_lines = str(v).splitlines()
        output.append(f"{pad}{k:>{n}}: {v_repr_lines[0]}")
        if len(v_repr_lines) > 1:
            output.extend(f"{pad + '  '}{n * ' '}{line}" for line in v_repr_lines[1:])
    return "\n".join(output)


print("Basic Recording Info:")
print_data = {
    "Recording ID": recording.id,
    "Start time (ns since unix epoch)": f"{recording.start_ts}",
    "Start time (datetime)": f"{datetime.fromtimestamp(recording.start_ts / 1e9)}",
    "Duration (nanoseconds)": f"{recording.duration}",
    "Duration (seconds)": f"{recording.duration / 1e9}",
    "Wearer": f"{recording.wearer['name']} ({recording.wearer['uuid']})",
    "Device serial": recording.device_serial,
    "App version": recording.info["app_version"],
    "Data format": recording.info["data_format_version"],
    "Gaze Offset": recording.info["gaze_offset"],
}
print(pretty_format(print_data))

streams: list[Stream] = [
    recording.gaze,
    recording.imu,
    recording.eye_state,
    recording.blinks,
    recording.fixations,
    recording.worn,
    recording.eye,
    recording.scene,
    recording.audio,
]
print()
print("Recording streams:")
print(
    pretty_format({
        f"{stream.name} ({len(stream)} samples)": "\n" + pretty_format(stream[0])
        for stream in streams
    })
)

# Data can be converted to numpy or dataframes, however multi column properties
# like .xy will not be included

print()
print("Gaze data as numpy array:")
gaze_np = recording.gaze.data
print()
print(gaze_np)


print()
print("Gaze data as pandas dataframe:")
gaze_df = recording.gaze.pd
print()
print(gaze_df)


print()
print("Getting data from a stream:")

print()
print("Gaze data", recording.gaze)
# GazeArray([(1741948698620648018, 966.3677 , 439.58817),
#            (1741948698630654018, 965.9669 , 441.60403),
#            (1741948698635648018, 964.2665 , 442.4974 ), ...,
#            (1741948717448190018, 757.85815, 852.34644),
#            (1741948717453190018, 766.53174, 857.3709 ),
#            (1741948717458190018, 730.93604, 851.53723)],
#           dtype=[('ts', '<i8'), ('x', '<f4'), ('y', '<f4')])

print()
print("Gaze ts via prop", recording.gaze.ts)
print("Gaze ts via key", recording.gaze["ts"])
# array([1741948698620648018, 1741948698630654018, 1741948698635648018, ...,
#        1741948717448190018, 1741948717453190018, 1741948717458190018])

print()
print("Gaze x coords via prop", recording.gaze.x)
print("Gaze x coords via key", recording.gaze["x"])
# array([966.3677 , 965.9669 , 964.2665 , ..., 757.85815, 766.53174,
#        730.93604], dtype=float32)

print()
print("Gaze y coords via prop", recording.gaze.y)
print("Gaze y coords via key", recording.gaze["y"])
# array([439.58817, 441.60403, 442.4974 , ..., 852.34644, 857.3709 ,
#        851.53723], dtype=float32)

print()
print("Gaze xy coords", recording.gaze.xy)
print("Gaze xy coords", recording.gaze[["x", "y"]])
# array([[966.3677 , 439.58817],
#        ...,
#        [730.93604, 851.53723]], dtype=float32)

print()
print("Gaze ts and x and y", recording.gaze[["ts", "x", "y"]])
# array([[1.74194870e+18, 9.66367676e+02, 4.39588165e+02],
#        ...,
#        [1.74194872e+18, 7.30936035e+02, 8.51537231e+02]])

print()
print("Sampling data:")

print()
print("Get closest gaze for scene frames")
closest_gaze_to_scene = recording.gaze.sample(recording.scene.ts)
print(closest_gaze_to_scene)
print(
    "closest_gaze_to_scene_times",
    (closest_gaze_to_scene.ts - recording.start_ts) / 1e9,
)


print()
print("Get closest before gaze for scene frames")
closest_gaze_before_scene = recording.gaze.sample(recording.scene.ts, method="before")
print(closest_gaze_before_scene)
print(
    "closest_gaze_before_scene_times",
    (closest_gaze_before_scene.ts - recording.start_ts) / 1e9,
)


print()
print("Sampled data can be resampled")

print()
print("Closest gaze sampled at 1 fps")
closest_gaze_to_scene_at_one_fps = closest_gaze_before_scene.sample(
    np.arange(closest_gaze_to_scene.ts[0], closest_gaze_to_scene.ts[-1], 1e9 / 1)
)
print(closest_gaze_to_scene_at_one_fps)
print(
    "closest_gaze_to_scene_at_one_fps_times",
    (closest_gaze_to_scene_at_one_fps.ts - recording.start_ts) / 1e9,
)

Eye Overlay

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop


def make_overlaid_video(recording_dir, output_video_path, fps=None):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    if fps is None:
        output_timestamps = recording.scene.ts
        fps = 30
    else:
        output_timestamps = np.arange(
            recording.scene.ts[0], recording.scene.ts[-1], int(1e9 / fps)
        )

    combined_data = zip(
        output_timestamps,
        recording.scene.sample(output_timestamps),
        recording.eye.sample(output_timestamps),
    )

    frame_idx = 0
    for ts, scene_frame, eye_frame in tqdm(combined_data, total=len(output_timestamps)):
        frame_idx += 1  # noqa: SIM113

        if abs(scene_frame.ts - ts) < 2e9 / fps:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            frame_pixels = scene_frame.bgr
        else:
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(eye_frame.ts - ts) < 2e9 / fps:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        overlay_image(frame_pixels, eye_pixels, 50, 50)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "eye-overlay-output-video.avi", None)

Eye State

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop


def plot(img, data, value_range, x_width, color, line_width=2):
    for idx in range(1, len(data)):
        x_values = [int(idx2 * x_width) for idx2 in [idx - 1, idx]]

        y_norms = [
            (data[idx2] - value_range[0]) / (value_range[1] - value_range[0])
            for idx2 in [idx - 1, idx]
        ]
        y_values = [int(y_norm * img.shape[0]) for y_norm in y_norms]

        points = [[*v] for v in zip(x_values, y_values)]

        cv2.line(img, points[0], points[1], color, line_width)


def make_eye_state_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)

    fps = 200

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )

    eye_video_sampled = recording.eye.sample(output_timestamps)
    eye_state_sampled = recording.eye_state.sample(output_timestamps)
    combined_data = zip(
        output_timestamps,
        eye_video_sampled,
        eye_state_sampled,
    )

    plot_metas = {
        "optical_axis_left_x": {"color": [0, 0, 255]},
        "optical_axis_left_y": {
            "color": [0, 255, 0],
        },
        "optical_axis_left_z": {
            "color": [255, 0, 0],
        },
    }

    for plot_name, plot_meta in plot_metas.items():
        plot_meta["range"] = (
            np.min(recording.eye_state.data[plot_name]),
            np.max(recording.eye_state.data[plot_name]),
        )

    plot_duration_secs = 0.5
    plot_point_count = plot_duration_secs * fps
    plot_x_width = recording.eye.width / plot_point_count

    for ts, eye_frame, _eye_state in tqdm(combined_data, total=len(output_timestamps)):
        if abs(eye_frame.ts - ts) < 2e9 / fps:
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        for plot_name, plot_meta in plot_metas.items():
            min_ts = ts - plot_duration_secs * 1e9
            time_frame = (min_ts < eye_state_sampled.ts) & (eye_state_sampled.ts <= ts)
            plot_data = eye_state_sampled[time_frame][plot_name]
            plot(
                eye_pixels,
                plot_data,
                plot_meta["range"],
                plot_x_width,
                plot_meta["color"],
            )

        video_writer.write(eye_pixels)
        cv2.imshow("Frame", eye_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_eye_state_video(sys.argv[1], "eye-state-output-video.avi")

Find Clap

import sys

import numpy as np

import pupil_labs.neon_recording as nr


def find_clap(recording_dir):
    recording = nr.open(recording_dir)

    max_rms = -1
    max_time = -1

    for frame in recording.audio:
        audio_data = frame.to_ndarray()
        rms = np.sqrt(np.mean(audio_data**2))

        if rms > max_rms:
            max_rms = rms
            max_time = frame.ts

    rel_time = (max_time - recording.start_ts) / 1e9
    print(f"The loudest audio occurs at {max_time} (rel={rel_time}), rms = {max_rms}.")


if __name__ == "__main__":
    find_clap(sys.argv[1])

Gaze Overlay

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def make_overlaid_video(recording_dir, output_video_path, fps=None):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    if fps is None:
        output_timestamps = recording.scene.ts
        fps = 30
    else:
        output_timestamps = np.arange(
            recording.scene.ts[0], recording.scene.ts[-1], 1e9 / fps
        )

    scene_datas = recording.scene.sample(output_timestamps)
    combined_data = zip(
        output_timestamps,
        scene_datas,
        recording.gaze.sample(output_timestamps),
    )

    for ts, scene_frame, gaze_datum in tqdm(
        combined_data, total=len(output_timestamps)
    ):
        if abs(scene_frame.ts - ts) < 2e9 / fps:
            frame_pixels = scene_frame.bgr
        else:
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(gaze_datum.ts - ts) < 2e9 / fps:
            frame_pixels = cv2.circle(
                frame_pixels,
                (int(gaze_datum.x), int(gaze_datum.y)),
                50,
                (0, 0, 255),
                10,
            )

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "gaze-overlay-output-video.avi", 24)

Imu

import sys

import cv2
import numpy as np
from scipy.spatial.transform import Rotation

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402

if len(sys.argv) < 2:
    print("Usage:")
    print("python imu.py path/to/recording/folder")

# Open a recording
recording = nr.open(sys.argv[1])

# Sample the IMU data at 60Hz
fps = 60
z = recording.gaze[:10]
timestamps = np.arange(recording.imu.ts[0], recording.imu.ts[-1], 1e9 / fps)
imu_data = recording.imu.sample(timestamps)

# Use scipy to convert the quaternions to euler angles
quaternions = np.array([s.quaternion_wxyz for s in imu_data])
rotations = (
    Rotation.from_quat(quaternions, scalar_first=True).as_euler(seq="yxz", degrees=True)
    % 360
)

# Combine the timestamps and eulers
rotations_with_time = np.column_stack((timestamps, rotations))
timestamped_eulers = np.array(
    [tuple(row) for row in rotations_with_time],
    dtype=[
        ("ts", np.int64),
        ("roll", np.float64),
        ("pitch", np.float64),
        ("yaw", np.float64),
    ],
)

# Display the angles
frame_size = 512
colors = {"pitch": (0, 0, 255), "yaw": (0, 255, 0), "roll": (255, 0, 0)}

for row in timestamped_eulers:
    # Create a blank image
    frame = np.zeros((frame_size, frame_size, 3), dtype=np.uint8)

    # Define the center and radius of the circles
    center = [frame_size // 2] * 2
    radius = frame.shape[0] // 3

    # Calculate the end points for the angles
    for field, color in colors.items():
        pitch_end = (
            int(center[0] + radius * np.cos(np.deg2rad(row[field]))),
            int(center[1] - radius * np.sin(np.deg2rad(row[field]))),
        )
        cv2.line(frame, center, pitch_end, color, 2)

        # Write the angle values on the image
        cv2.putText(
            frame,
            f"{field}: {row[field]:.2f}",
            (10, 30 + list(colors.keys()).index(field) * 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            2,
        )

    # Display the image
    cv2.imshow("IMU Angles", frame)
    if cv2.waitKey(1000 // fps) == 27:
        break

cv2.destroyAllWindows()

Worn

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs import neon_recording as nr  # noqa: E402


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )
    eyes_and_worn = zip(
        recording.eye.sample(output_timestamps),
        recording.worn.sample(output_timestamps),
    )

    for frame, worn_record in tqdm(eyes_and_worn, total=len(output_timestamps)):
        frame_pixels = frame.bgr

        text_y = 40
        if worn_record.worn:
            frame_pixels = write_text(frame_pixels, "Worn", 0, text_y)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "worn.mp4")