Pupil Labs Neon Recording
Functionality for loading Pupil Labs Neon recordings in their native recording format.
Installation
pip install pupil-labs-neon-recording
or
pip install -e "git+https://github.com/pupil-labs/pl-neon-recording.git#egg=pupil-labs-neon-recording"
Documentation
Documentation is available at https://pupil-labs.github.io/pl-neon-recording/
Usage
Basic Usage
import sys
import pupil_labs.neon_recording as nr
if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")
    sys.exit(1)
# Open a recording
recording = nr.open(sys.argv[1])
# get basic info
print("Recording Info:")
print(f"\tStart time (ns): {recording.start_time}")
print(f"\tWearer : {recording.wearer['name']}")
print(f"\tDevice serial : {recording.device_serial}")
print(f"\tGaze samples : {len(recording.gaze)}")
print("")
# read 10 gaze samples
print("First 10 gaze samples:")
timestamps = recording.gaze.time[:10]
subsample = recording.gaze.sample(timestamps)
for gaze_datum in subsample:
    print(
        f"\t{gaze_datum.time} :",
        f"({gaze_datum.point[0]:0.2f}, {gaze_datum.point[1]:0.2f})",
    )
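Each stream can also be viewed as a pandas DataFrame, which is convenient for quick interactive inspection. A minimal sketch continuing from the script above (`.pd` is covered in the Data Access example below; `head()` is plain pandas, not part of this library):
# Quick look at the first gaze rows as a pandas DataFrame
print(recording.gaze.pd.head(10))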
Blinks And Fixations
import sys
import cv2
import numpy as np
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
import pupil_labs.neon_recording as nr # noqa: E402
from pupil_labs.neon_recording import match_ts # noqa: E402
from pupil_labs.video import Writer # noqa: E402
def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )
def match_events(target_time, events):
    # The event start needs to be <= target_time
    matches = match_ts(target_time, events.start_time, method="backward")

    # The event end needs to be >= target_time
    matches_end = match_ts(target_time, events.stop_time, method="forward")

    matches[np.isnan(matches_end)] = np.nan
    matches[matches != matches_end] = np.nan
    return matches
def make_overlaid_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)
    video_writer = Writer(output_video_path)
    video_start_time = recording.eye.time[0]

    blink_matches = match_events(recording.eye.time, recording.blinks)
    fixation_matches = match_events(recording.eye.time, recording.fixations)

    blink_count = 0
    fixation_count = 0
    for frame, blink_index, fixation_index in tqdm(
        zip(recording.eye, blink_matches, fixation_matches, strict=False),
        total=len(recording.eye),
    ):
        frame_pixels = frame.bgr

        if not np.isnan(blink_index):
            blink_count = int(blink_index + 1)
            frame_pixels = write_text(frame_pixels, f"Blinks: {blink_count}", 0, 40)

        if not np.isnan(fixation_index):
            fixation_count = int(fixation_index + 1)
            frame_pixels = write_text(frame_pixels, f"Fixations: {fixation_count}", 0, 80)

        video_time = (frame.time - video_start_time) / 1e9
        video_writer.write_image(frame_pixels, time=video_time)

        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.close()
if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "blinks-and-fixations.mp4")
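The overlay above keeps running counters as events start; if you only need the totals, the blink and fixation streams can be queried directly. A minimal sketch, assuming a recording folder is passed on the command line as in the other examples:
import sys

import pupil_labs.neon_recording as nr

recording = nr.open(sys.argv[1])
print(f"Total blinks   : {len(recording.blinks)}")
print(f"Total fixations: {len(recording.fixations)}")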
Data Access
import sys
from collections.abc import Mapping
from datetime import datetime
import numpy as np
import pupil_labs.neon_recording as nr
from pupil_labs.neon_recording.timeseries.timeseries import Timeseries
if len(sys.argv) < 2:
    print("Usage:")
    print("python data_access.py path/to/recording/folder")
    sys.exit(1)
# Open a recording
recording = nr.open(sys.argv[1])
def pretty_format(mapping: Mapping):
    output = []
    pad = " "
    keys = mapping.keys()
    n = max(len(key) for key in keys)
    for k, v in mapping.items():
        v_repr_lines = str(v).splitlines()
        output.append(f"{pad}{k:>{n}}: {v_repr_lines[0]}")
        if len(v_repr_lines) > 1:
            output.extend(f"{pad + ' '}{n * ' '}{line}" for line in v_repr_lines[1:])
    return "\n".join(output)
print("Basic Recording Info:")
print_data = {
    "Recording ID": recording.id,
    "Start time (ns since unix epoch)": f"{recording.start_time}",
    "Start time (datetime)": f"{datetime.fromtimestamp(recording.start_time / 1e9)}",
    "Duration (nanoseconds)": f"{recording.duration}",
    "Duration (seconds)": f"{recording.duration / 1e9}",
    "Wearer": f"{recording.wearer['name']} ({recording.wearer['uuid']})",
    "Device serial": recording.device_serial,
    "App version": recording.info["app_version"],
    "Data format": recording.info["data_format_version"],
    "Gaze Offset": recording.info["gaze_offset"],
}
print(pretty_format(print_data))
streams: list[Timeseries] = [
    recording.gaze,
    recording.imu,
    recording.eyeball,
    recording.pupil,
    recording.eyelid,
    recording.blinks,
    recording.fixations,
    recording.saccades,
    recording.worn,
    recording.eye,
    recording.scene,
    recording.audio,
]
print()
print("Recording streams:")
print(
    pretty_format({
        f"{stream.name} ({len(stream)} samples)": "\n" + pretty_format(stream[0])
        for stream in streams
    })
)
print()
print("Getting data from a stream:")
print()
print("Gaze points", recording.gaze.point)
# GazeArray([(1741948698620648018, 966.3677 , 439.58817),
# (1741948698630654018, 965.9669 , 441.60403),
# (1741948698635648018, 964.2665 , 442.4974 ), ...,
# (1741948717448190018, 757.85815, 852.34644),
# (1741948717453190018, 766.53174, 857.3709 ),
# (1741948717458190018, 730.93604, 851.53723)],
# dtype=[('ts', '<i8'), ('x', '<f4'), ('y', '<f4')])
print()
print("Gaze timestamps", recording.gaze.time)
# array([1741948698620648018, 1741948698630654018, 1741948698635648018, ...,
# 1741948717448190018, 1741948717453190018, 1741948717458190018])
# All stream data can also be accessed as structured numpy arrays and pandas dataframes.
print()
print("Gaze data as a structured numpy array:")
gaze_np = recording.gaze.data
print()
print(gaze_np)
print()
print("Gaze data as pandas dataframe:")
gaze_df = recording.gaze.pd
print()
print(gaze_df)
print()
print("Sampling data:")
print()
print("Get closest gaze for scene frames")
closest_gaze_to_scene = recording.gaze.sample(recording.scene.time)
print(closest_gaze_to_scene)
print(
    "closest_gaze_to_scene_times",
    (closest_gaze_to_scene.time - recording.start_time) / 1e9,
)
print()
print("Sampled data can be resampled")
print()
print("Closest gaze sampled at 1 fps")
closest_gaze_to_scene_at_one_fps = closest_gaze_to_scene.sample(
    np.arange(
        closest_gaze_to_scene.time[0],
        closest_gaze_to_scene.time[-1],
        1e9 / 1,
        dtype=np.int64,
    ),
)
print(closest_gaze_to_scene_at_one_fps)
print(
    "closest_gaze_to_scene_at_one_fps_times",
    (closest_gaze_to_scene_at_one_fps.time - recording.start_time) / 1e9,
)
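Because the `.pd` accessor returns an ordinary pandas DataFrame, exporting a stream for offline analysis is a one-liner. A minimal sketch (the output filename is illustrative, and `to_csv` is standard pandas rather than part of this library):
# Write all gaze samples to a CSV file
recording.gaze.pd.to_csv("gaze.csv", index=False)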
Eye Overlay
import sys
import cv2
import numpy as np
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
import pupil_labs.neon_recording as nr # noqa: E402
from pupil_labs.neon_recording.timeseries.av.video import (  # noqa: E402
    GrayFrame,
)
def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop
def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)
    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    output_timestamps = np.arange(
        recording.scene.time[0], recording.scene.time[-1], int(1e9 / fps)
    )
    combined_data = zip(
        output_timestamps,
        recording.scene.sample(output_timestamps),
        recording.eye.sample(output_timestamps),
        strict=False,
    )

    frame_idx = 0
    for ts, scene_frame, eye_frame in tqdm(combined_data, total=len(output_timestamps)):
        frame_idx += 1  # noqa: SIM113
        if abs(scene_frame.time - ts) < 2e9 / fps:
            frame_pixels = scene_frame.bgr
        else:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(eye_frame.time - ts) < 2e9 / fps:
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            # if the video frame timestamp is too far ahead or behind temporally,
            # replace it with a gray frame
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        overlay_image(frame_pixels, eye_pixels, 50, 50)
        video_writer.write(frame_pixels)

        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()
if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "eye-overlay-output-video.avi")
Eye State
import sys
import cv2
import numpy as np
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
import pupil_labs.neon_recording as nr # noqa: E402
from pupil_labs.video import Writer # noqa: E402
def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop
def plot(img, data, value_range, x_width, color, line_width=2):
    for idx in range(1, len(data)):
        x_values = [int(idx2 * x_width) for idx2 in [idx - 1, idx]]
        y_norms = [
            (data[idx2] - value_range[0]) / (value_range[1] - value_range[0])
            for idx2 in [idx - 1, idx]
        ]
        y_values = [int(y_norm * img.shape[0]) for y_norm in y_norms]

        points = [[*v] for v in zip(x_values, y_values, strict=False)]
        cv2.line(img, points[0], points[1], color, line_width)
def make_eye_state_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)
    fps = 200

    video_writer = Writer(output_video_path)
    video_start_time = recording.eye.time[0]

    plot_config = [
        {"color": [0, 0, 255]},
        {"color": [0, 255, 0]},
        {"color": [255, 0, 0]},
    ]
    for dim, config in enumerate(plot_config):
        config["range"] = (
            np.min(recording.eyeball.optical_axis_left[:, dim]),
            np.max(recording.eyeball.optical_axis_left[:, dim]),
        )

    plot_duration_secs = 0.5
    plot_point_count = plot_duration_secs * fps
    plot_x_width = recording.eye.width / plot_point_count

    for eye_sample in tqdm(recording.eye):
        eye_pixels = eye_sample.bgr

        for dim, config in enumerate(plot_config):
            min_ts = eye_sample.time - plot_duration_secs * 1e9
            mask = (min_ts < recording.eyeball.time) & (
                recording.eyeball.time <= eye_sample.time
            )
            plot_data = recording.eyeball.optical_axis_left[mask, dim]
            plot(
                eye_pixels,
                plot_data,
                config["range"],
                plot_x_width,
                config["color"],
            )

        video_time = (eye_sample.time - video_start_time) / 1e9
        video_writer.write_image(eye_pixels, time=video_time)

        cv2.imshow("Frame", eye_pixels)
        cv2.pollKey()

    video_writer.close()
if __name__ == "__main__":
    make_eye_state_video(sys.argv[1], "eye-state-output-video.avi")
Find Clap
import sys
import numpy as np
from tqdm import tqdm
import pupil_labs.neon_recording as nr
def find_clap(recording_dir, window_size_seconds=0.1, first_n_seconds=10):
    recording = nr.open(recording_dir)

    audio_data = np.array([], dtype=np.float32)
    ts_lookup = []
    for frame in recording.audio:
        if first_n_seconds is not None:
            rel_time = (frame.time - recording.start_time) / 1e9
            if rel_time > first_n_seconds:
                break

        # Create a timestamp lookup table
        ts_lookup.append([len(audio_data), frame.time])

        # Gather all the audio samples
        audio_data = np.concatenate((audio_data, frame.to_ndarray().flatten()))

    ts_lookup = np.array(ts_lookup)

    # Calculate RMS over a sliding window
    # Remember the sample index of the loudest window
    max_rms = 0
    loudest_sample_idx = 0
    samples_per_window = int(window_size_seconds * recording.audio.rate)
    for i in tqdm(range(len(audio_data) - samples_per_window)):
        segment = audio_data[i : i + samples_per_window]
        rms = np.sqrt(np.mean(np.square(segment)))
        if rms > max_rms:
            max_rms = rms
            loudest_sample_idx = int(i + samples_per_window / 2)

    # Find the reference timestamp from the lookup table
    lookup_idx = np.searchsorted(ts_lookup[:, 0], loudest_sample_idx) - 1
    reference_ts = ts_lookup[lookup_idx, 1]

    # Calculate the sample timestamp using the reference timestamp
    samples_after_reference = loudest_sample_idx - ts_lookup[lookup_idx, 0]
    loudest_time = reference_ts + (samples_after_reference / recording.audio.rate) * 1e9
    print(f"The loudest audio occurs at t = {loudest_time:.0f} ns (rms = {max_rms:.3f}).")
    print(
        f" Relative to recording start: "
        f"{(loudest_time - recording.start_time) / 1e9:0.3f}s"
    )
    print(
        f" Relative to video start    : "
        f"{(loudest_time - recording.scene.time[0]) / 1e9:0.3f}s"
    )
    print(
        f" Relative to audio start    : "
        f"{(loudest_time - recording.audio.time[0]) / 1e9:0.3f}s"
    )
if __name__ == "__main__":
    find_clap(sys.argv[1])
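Once the loudest moment has been found, its timestamp can be matched against any other stream with the same `sample()` API used throughout these examples. A minimal sketch, assuming `find_clap` is adapted to `return loudest_time` instead of only printing it:
recording = nr.open(sys.argv[1])
loudest_time = find_clap(sys.argv[1])  # hypothetical: requires `return loudest_time`

# Grab the scene frame closest to the clap
clap_frame = recording.scene.sample(np.array([int(loudest_time)]))[0]
print(f"Scene frame closest to the clap: {clap_frame.time} ns")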
Gaze Overlay
import sys
import cv2
import numpy as np
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
import pupil_labs.neon_recording as nr # noqa: E402
from pupil_labs.video import Writer # noqa: E402
def make_overlaid_video(recording_dir, output_video_path):
    rec = nr.open(recording_dir)

    combined_data = zip(
        rec.scene,
        rec.gaze.sample(rec.scene.time),
        strict=True,
    )

    video_writer = Writer(output_video_path)
    video_start_time = rec.scene.time[0]

    for scene_frame, gaze_datum in tqdm(combined_data, total=len(rec.scene.time)):
        frame_pixels = scene_frame.bgr
        frame_pixels = cv2.circle(
            frame_pixels,
            (int(gaze_datum.point[0]), int(gaze_datum.point[1])),
            50,
            (0, 0, 255),
            10,
        )

        video_time = (scene_frame.time - video_start_time) / 1e9
        video_writer.write_image(frame_pixels, time=video_time)

        cv2.imshow("Frame", frame_pixels)
        cv2.waitKey(30)

    video_writer.close()
if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "gaze-overlay-output-video.mp4")
IMU
import sys
import cv2
import numpy as np
from scipy.spatial.transform import Rotation
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
import pupil_labs.neon_recording as nr # noqa: E402
if len(sys.argv) < 2:
    print("Usage:")
    print("python imu.py path/to/recording/folder")
    sys.exit(1)
# Open a recording
recording = nr.open(sys.argv[1])
# Sample the IMU data at 60Hz
fps = 60
timestamps = np.arange(
    recording.imu.time[0], recording.imu.time[-1], 1e9 / fps, dtype=np.int64
)
imu_data = recording.imu.sample(timestamps)
# Use scipy to convert the quaternions to euler angles
quaternions = np.array([s.rotation for s in imu_data])
rotations = Rotation.from_quat(quaternions).as_euler(seq="yxz", degrees=True) % 360
# Combine the timestamps and eulers
rotations_with_time = np.column_stack((timestamps, rotations))
timestamped_eulers = np.array(
    [tuple(row) for row in rotations_with_time],
    dtype=[
        ("time", np.int64),
        ("roll", np.float64),
        ("pitch", np.float64),
        ("yaw", np.float64),
    ],
)
# Display the angles
frame_size = 512
colors = {"pitch": (0, 0, 255), "yaw": (0, 255, 0), "roll": (255, 0, 0)}
for row in tqdm(timestamped_eulers):
    # Create a blank image
    frame = np.zeros((frame_size, frame_size, 3), dtype=np.uint8)

    # Define the center and radius of the circles
    center = [frame_size // 2] * 2
    radius = frame.shape[0] // 3

    # Calculate the end points for the angles
    for field, color in colors.items():
        end_point = (
            int(center[0] + radius * np.cos(np.deg2rad(row[field]))),
            int(center[1] - radius * np.sin(np.deg2rad(row[field]))),
        )
        cv2.line(frame, center, end_point, color, 2)

        # Write the angle values on the image
        cv2.putText(
            frame,
            f"{field}: {row[field]:.2f}",
            (10, 30 + list(colors.keys()).index(field) * 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            2,
        )

    # Display the image
    cv2.imshow("IMU Angles", frame)
    if cv2.waitKey(1000 // fps) == 27:
        break
cv2.destroyAllWindows()
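If a static plot is preferable to the live OpenCV visualization, the same structured array can be passed straight to matplotlib. A minimal sketch (matplotlib is not a dependency of this library and is used here only for illustration):
import matplotlib.pyplot as plt

# Plot the three Euler angles over time, in seconds relative to recording start
rel_time = (timestamped_eulers["time"] - recording.start_time) / 1e9
for field in ("roll", "pitch", "yaw"):
    plt.plot(rel_time, timestamped_eulers[field], label=field)
plt.xlabel("Time (s)")
plt.ylabel("Angle (deg)")
plt.legend()
plt.show()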
Worn
import sys
import cv2
import numpy as np
from tqdm import tqdm
# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()
from pupil_labs import neon_recording as nr # noqa: E402
def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )
def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)
    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.time[0], recording.eye.time[-1], int(1e9 / fps)
    )
    eyes_and_worn = zip(
        recording.eye.sample(output_timestamps),
        recording.worn.sample(output_timestamps),
        strict=False,
    )

    for frame, worn_record in tqdm(eyes_and_worn, total=len(output_timestamps)):
        frame_pixels = frame.bgr
        text_y = 40
        if worn_record.worn:
            frame_pixels = write_text(frame_pixels, "Worn", 0, text_y)

        video_writer.write(frame_pixels)

        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()
if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "worn.avi")