
Pupil Labs Neon Recording


A Python library for loading Neon recordings in their native recording format.

Installation

pip install pupil-labs-neon-recording

or

pip install -e git+https://github.com/pupil-labs/pl-neon-recording.git
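
To confirm the package is importable, the `nr.open` entry point used throughout the examples below should resolve (a minimal sanity check):

python -c "import pupil_labs.neon_recording as nr; print(nr.open)"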

Documentation

Documentation is available at https://pupil-labs.github.io/pl-neon-recording/

Usage

Basic Usage

import sys

import pupil_labs.neon_recording as nr

if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])

# get basic info
print("Recording Info:")
print(f"\tStart time (ns): {recording.start_time}")
print(f"\tWearer         : {recording.wearer['name']}")
print(f"\tDevice serial  : {recording.device_serial}")
print(f"\tGaze samples   : {len(recording.gaze)}")
print("")

# read 10 gaze samples
print("First 10 gaze samples:")
timestamps = recording.gaze.time[:10]
subsample = recording.gaze.sample(timestamps)
for gaze_datum in subsample:
    print(
        f"\t{gaze_datum.time} :",
        f"({gaze_datum.point[0]:0.2f}, {gaze_datum.point[1]:0.2f})",
    )

Blinks and Fixations

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402
from pupil_labs.neon_recording import match_ts  # noqa: E402
from pupil_labs.video import Writer  # noqa: E402


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def match_events(target_time, events):
    # Blink start needs to be <= target_time
    matches = match_ts(target_time, events.start_time, method="backward")

    # Blink end needs to be >= target_time
    matches_end = match_ts(target_time, events.stop_time, method="forward")

    # Keep only target times that fall inside an event: the start match and the
    # stop match must refer to the same event, otherwise the time lies between events
    matches[np.isnan(matches_end)] = np.nan
    matches[matches != matches_end] = np.nan

    return matches


def make_overlaid_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)

    video_writer = Writer(
        output_video_path,
    )
    video_start_time = recording.eye.time[0]

    # For each eye-video frame time, find the index of the blink/fixation event
    # that contains it (NaN if the frame falls between events)
    blink_matches = match_events(recording.eye.time, recording.blinks)
    fixation_matches = match_events(recording.eye.time, recording.fixations)

    blink_count = 0
    fixation_count = 0
    for frame, blink_index, fixation_index in tqdm(
        zip(recording.eye, blink_matches, fixation_matches, strict=False),
        total=len(recording.eye),
    ):
        frame_pixels = frame.bgr

        if not np.isnan(blink_index):
            blink_count = int(blink_index + 1)
        frame_pixels = write_text(frame_pixels, f"Blinks: {blink_count}", 0, 40)

        if not np.isnan(fixation_index):
            fixation_count = int(fixation_index + 1)
        frame_pixels = write_text(frame_pixels, f"Fixations: {fixation_count}", 0, 80)

        video_time = (frame.time - video_start_time) / 1e9
        video_writer.write_image(frame_pixels, time=video_time)

        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.close()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "blinks-and-fixations.mp4")

Data Access

import sys
from collections.abc import Mapping
from datetime import datetime

import numpy as np

import pupil_labs.neon_recording as nr
from pupil_labs.neon_recording.timeseries.timeseries import Timeseries

if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])


def pretty_format(mapping: Mapping):
    output = []
    pad = "    "
    keys = mapping.keys()
    n = max(len(key) for key in keys)
    for k, v in mapping.items():
        v_repr_lines = str(v).splitlines()
        output.append(f"{pad}{k:>{n}}: {v_repr_lines[0]}")
        if len(v_repr_lines) > 1:
            output.extend(f"{pad + '  '}{n * ' '}{line}" for line in v_repr_lines[1:])
    return "\n".join(output)


print("Basic Recording Info:")
print_data = {
    "Recording ID": recording.id,
    "Start time (ns since unix epoch)": f"{recording.start_time}",
    "Start time (datetime)": f"{datetime.fromtimestamp(recording.start_time / 1e9)}",
    "Duration (nanoseconds)": f"{recording.duration}",
    "Duration (seconds)": f"{recording.duration / 1e9}",
    "Wearer": f"{recording.wearer['name']} ({recording.wearer['uuid']})",
    "Device serial": recording.device_serial,
    "App version": recording.info["app_version"],
    "Data format": recording.info["data_format_version"],
    "Gaze Offset": recording.info["gaze_offset"],
}
print(pretty_format(print_data))

streams: list[Timeseries] = [
    recording.gaze,
    recording.imu,
    recording.eyeball,
    recording.pupil,
    recording.eyelid,
    recording.blinks,
    recording.fixations,
    recording.saccades,
    recording.worn,
    recording.eye,
    recording.scene,
    recording.audio,
]
print()
print("Recording streams:")
print(
    pretty_format({
        f"{stream.name} ({len(stream)} samples)": "\n" + pretty_format(stream[0])
        for stream in streams
    })
)

print()
print("Getting data from a stream:")

print()
print("Gaze points", recording.gaze.point)
# GazeArray([(1741948698620648018, 966.3677 , 439.58817),
#            (1741948698630654018, 965.9669 , 441.60403),
#            (1741948698635648018, 964.2665 , 442.4974 ), ...,
#            (1741948717448190018, 757.85815, 852.34644),
#            (1741948717453190018, 766.53174, 857.3709 ),
#            (1741948717458190018, 730.93604, 851.53723)],
#           dtype=[('ts', '<i8'), ('x', '<f4'), ('y', '<f4')])

print()
print("Gaze timestamps", recording.gaze.time)
# array([1741948698620648018, 1741948698630654018, 1741948698635648018, ...,
#        1741948717448190018, 1741948717453190018, 1741948717458190018])


# All stream data can also be accessed as structured numpy arrays and pandas dataframes.

print()
print("Gaze data as a structured numpy array:")
gaze_np = recording.gaze.data
print()
print(gaze_np)
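
# Fields of the structured array are accessible by name with standard numpy
# indexing, e.g. gaze_np["x"] for whichever fields appear in the printed dtype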


print()
print("Gaze data as pandas dataframe:")
gaze_df = recording.gaze.pd
print()
print(gaze_df)
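
# The dataframe supports the usual pandas workflow, e.g. it could be exported
# for offline analysis with gaze_df.to_csv("gaze.csv", index=False)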

print()
print("Sampling data:")

print()
print("Get closest gaze for scene frames")
closest_gaze_to_scene = recording.gaze.sample(recording.scene.time)
print(closest_gaze_to_scene)
print(
    "closest_gaze_to_scene_times",
    (closest_gaze_to_scene.time - recording.start_time) / 1e9,
)

print()
print("Sampled data can be resampled")

print()
print("Closest gaze sampled at 1 fps")
closest_gaze_to_scene_at_one_fps = closest_gaze_to_scene.sample(
    np.arange(
        closest_gaze_to_scene.time[0],
        closest_gaze_to_scene.time[-1],
        1e9 / 1,
        dtype=np.int64,
    ),
)
print(closest_gaze_to_scene_at_one_fps)
print(
    "closest_gaze_to_scene_at_one_fps_times",
    (closest_gaze_to_scene_at_one_fps.time - recording.start_time) / 1e9,
)
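
The `match_ts` helper used in the blink-counting example above can also align arbitrary streams directly. A minimal sketch, assuming the same `recording` object as above (with `method="backward"`, each scene frame is matched to the last fixation that started at or before it):

from pupil_labs.neon_recording import match_ts

# Index of the most recent fixation start at or before each scene frame (NaN if none)
fixation_idx = match_ts(recording.scene.time, recording.fixations.start_time, method="backward")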

Eye Overlay

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402
from pupil_labs.neon_recording.timeseries.av.video import (  # noqa: E402
    GrayFrame,
)


def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    output_timestamps = np.arange(
        recording.scene.time[0], recording.scene.time[-1], int(1e9 / fps)
    )

    combined_data = zip(
        output_timestamps,
        recording.scene.sample(output_timestamps),
        recording.eye.sample(output_timestamps),
        strict=False,
    )

    frame_idx = 0
    for ts, scene_frame, eye_frame in tqdm(combined_data, total=len(output_timestamps)):
        frame_idx += 1  # noqa: SIM113

        if abs(scene_frame.time - ts) < 2e9 / fps:
            frame_pixels = scene_frame.bgr
        else:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(eye_frame.time - ts) < 2e9 / fps:
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        overlay_image(frame_pixels, eye_pixels, 50, 50)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "eye-overlay-output-video.avi")

Eye State

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402
from pupil_labs.video import Writer  # noqa: E402


def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop


def plot(img, data, value_range, x_width, color, line_width=2):
    for idx in range(1, len(data)):
        x_values = [int(idx2 * x_width) for idx2 in [idx - 1, idx]]

        y_norms = [
            (data[idx2] - value_range[0]) / (value_range[1] - value_range[0])
            for idx2 in [idx - 1, idx]
        ]
        y_values = [int(y_norm * img.shape[0]) for y_norm in y_norms]

        points = [[*v] for v in zip(x_values, y_values, strict=False)]

        cv2.line(img, points[0], points[1], color, line_width)


def make_eye_state_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)

    fps = 200
    video_writer = Writer(output_video_path)
    video_start_time = recording.eye.time[0]

    plot_config = [
        {"color": [0, 0, 255]},
        {"color": [0, 255, 0]},
        {"color": [255, 0, 0]},
    ]

    for dim, config in enumerate(plot_config):
        config["range"] = (
            np.min(recording.eyeball.optical_axis_left[:, dim]),
            np.max(recording.eyeball.optical_axis_left[:, dim]),
        )

    # Plot the most recent 0.5 s of eye state, scaled so it spans the full width
    # of the eye-video frame
    plot_duration_secs = 0.5
    plot_point_count = plot_duration_secs * fps
    plot_x_width = recording.eye.width / plot_point_count

    for eye_sample in tqdm(recording.eye):
        eye_pixels = eye_sample.bgr

        for dim, config in enumerate(plot_config):
            min_ts = eye_sample.time - plot_duration_secs * 1e9
            mask = (min_ts < recording.eyeball.time) & (
                recording.eyeball.time <= eye_sample.time
            )
            plot_data = recording.eyeball.optical_axis_left[mask, dim]
            plot(
                eye_pixels,
                plot_data,
                config["range"],
                plot_x_width,
                config["color"],
            )

        video_time = (eye_sample.time - video_start_time) / 1e9
        video_writer.write_image(eye_pixels, time=video_time)
        cv2.imshow("Frame", eye_pixels)
        cv2.pollKey()

    video_writer.close()


if __name__ == "__main__":
    make_eye_state_video(sys.argv[1], "eye-state-output-video.avi")

Find Clap

import sys

import numpy as np
from tqdm import tqdm

import pupil_labs.neon_recording as nr


def find_clap(recording_dir, window_size_seconds=0.1, first_n_seconds=10):
    recording = nr.open(recording_dir)

    audio_data = np.array([], dtype=np.float32)
    ts_lookup = []
    for frame in recording.audio:
        if first_n_seconds is not None:
            rel_time = (frame.time - recording.start_time) / 1e9
            if rel_time > first_n_seconds:
                break

        # Create a timestamp lookup table
        ts_lookup.append([len(audio_data), frame.time])

        # Gather all the audio samples
        audio_data = np.concatenate((audio_data, frame.to_ndarray().flatten()))

    ts_lookup = np.array(ts_lookup)

    # Calculate RMS over a sliding window
    # Remember the sample index of the loudest window
    max_rms = 0
    loudest_sample_idx = 0

    samples_per_window = int(window_size_seconds * recording.audio.rate)
    for i in tqdm(range(len(audio_data) - samples_per_window)):
        segment = audio_data[i : i + samples_per_window]
        rms = np.sqrt(np.mean(np.square(segment)))

        if rms > max_rms:
            max_rms = rms
            loudest_sample_idx = int(i + samples_per_window / 2)

    # Find the reference timestamp from the lookup table
    lookup_idx = np.searchsorted(ts_lookup[:, 0], loudest_sample_idx) - 1
    reference_ts = ts_lookup[lookup_idx, 1]

    # Calculate the sample timestamp using the reference timestamp
    samples_after_reference = loudest_sample_idx - ts_lookup[lookup_idx, 0]
    loudest_time = reference_ts + (samples_after_reference / recording.audio.rate) * 1e9

    print(f"The loudest audio occurs at {loudest_time:.0f} rms = {max_rms:.3f}.")
    print(
        f"    Relative to recording start: "
        f"{(loudest_time - recording.start_time) / 1e9:0.3f}s"
    )
    print(
        f"    Relative to video start    : "
        f"{(loudest_time - recording.scene.time[0]) / 1e9:0.3f}s"
    )
    print(
        f"    Relative to audio start    : "
        f"{(loudest_time - recording.audio.time[0]) / 1e9:0.3f}s"
    )


if __name__ == "__main__":
    find_clap(sys.argv[1])
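
The sliding-window RMS above recomputes every window from scratch, which gets slow for long recordings. The same result can be computed in a single vectorized pass using a cumulative sum of squared samples; a minimal sketch (the helper name `sliding_rms` is illustrative, not part of the library):

import numpy as np

def sliding_rms(audio_data, samples_per_window):
    # Window sums of squared samples are differences of a zero-padded cumulative sum
    squared = np.square(audio_data, dtype=np.float64)
    cumsum = np.concatenate(([0.0], np.cumsum(squared)))
    window_sums = cumsum[samples_per_window:] - cumsum[:-samples_per_window]
    return np.sqrt(window_sums / samples_per_window)

# rms = sliding_rms(audio_data, samples_per_window)
# loudest_sample_idx = int(np.argmax(rms) + samples_per_window / 2)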

Gaze Overlay

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402
from pupil_labs.video import Writer  # noqa: E402


def make_overlaid_video(recording_dir, output_video_path):
    rec = nr.open(recording_dir)

    combined_data = zip(
        rec.scene,
        rec.gaze.sample(rec.scene.time),
        strict=True,
    )

    video_writer = Writer(output_video_path)
    video_start_time = rec.scene.time[0]

    for scene_frame, gaze_datum in tqdm(combined_data, total=len(rec.scene.time)):
        frame_pixels = scene_frame.bgr

        frame_pixels = cv2.circle(
            frame_pixels,
            (int(gaze_datum.point[0]), int(gaze_datum.point[1])),
            50,
            (0, 0, 255),
            10,
        )

        video_time = (scene_frame.time - video_start_time) / 1e9
        video_writer.write_image(frame_pixels, time=video_time)

        cv2.imshow("Frame", frame_pixels)
        cv2.waitKey(30)

    video_writer.close()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "gaze-overlay-output-video.mp4")

IMU

import sys

import cv2
import numpy as np
from scipy.spatial.transform import Rotation
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402

if len(sys.argv) < 2:
    print("Usage:")
    print("python imu.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])

# Sample the IMU data at 60Hz
fps = 60
timestamps = np.arange(
    recording.imu.time[0], recording.imu.time[-1], 1e9 / fps, dtype=np.int64
)
imu_data = recording.imu.sample(timestamps)

# Use scipy to convert the quaternions to euler angles
quaternions = np.array([s.rotation for s in imu_data])
rotations = Rotation.from_quat(quaternions).as_euler(seq="yxz", degrees=True) % 360

# Combine the timestamps and eulers
rotations_with_time = np.column_stack((timestamps, rotations))
timestamped_eulers = np.array(
    [tuple(row) for row in rotations_with_time],
    dtype=[
        ("time", np.int64),
        ("roll", np.float64),
        ("pitch", np.float64),
        ("yaw", np.float64),
    ],
)
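
# The structured array can be inspected per field, e.g. timestamped_eulers["yaw"],
# or converted for plotting/export with pd.DataFrame(timestamped_eulers)
# (requires `import pandas as pd`; mentioned only as a pointer, not used below)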

# Display the angles
frame_size = 512
colors = {"pitch": (0, 0, 255), "yaw": (0, 255, 0), "roll": (255, 0, 0)}

for row in tqdm(timestamped_eulers):
    # Create a blank image
    frame = np.zeros((frame_size, frame_size, 3), dtype=np.uint8)

    # Define the center and radius of the circles
    center = [frame_size // 2] * 2
    radius = frame.shape[0] // 3

    # Calculate the end points for the angles
    for field, color in colors.items():
        line_end = (
            int(center[0] + radius * np.cos(np.deg2rad(row[field]))),
            int(center[1] - radius * np.sin(np.deg2rad(row[field]))),
        )
        cv2.line(frame, center, line_end, color, 2)

        # Write the angle values on the image
        cv2.putText(
            frame,
            f"{field}: {row[field]:.2f}",
            (10, 30 + list(colors.keys()).index(field) * 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            2,
        )

    # Display the image
    cv2.imshow("IMU Angles", frame)
    if cv2.waitKey(1000 // fps) == 27:
        break

cv2.destroyAllWindows()

Worn

import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs import neon_recording as nr  # noqa: E402


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.time[0], recording.eye.time[-1], int(1e9 / fps)
    )
    eyes_and_worn = zip(
        recording.eye.sample(output_timestamps),
        recording.worn.sample(output_timestamps),
        strict=False,
    )

    for frame, worn_record in tqdm(eyes_and_worn, total=len(output_timestamps)):
        frame_pixels = frame.bgr

        text_y = 40
        if worn_record.worn:
            frame_pixels = write_text(frame_pixels, "Worn", 0, text_y)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "worn.mp4")