# Pupil Labs Neon Recording

Functionality for loading Pupil Labs Neon recordings in their native recording format.
## Installation

```
pip install pupil-labs-neon-recording
```

or, for an editable development install:

```
pip install -e git+https://github.com/pupil-labs/pl-neon-recording.git
```
## Documentation

Documentation is available at https://pupil-labs.github.io/pl-neon-recording/.
## Usage

### Basic Usage

```python
import sys

import pupil_labs.neon_recording as nr

if len(sys.argv) < 2:
    print("Usage:")
    print("python basic_usage.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])

# Get basic info
print("Recording Info:")
print(f"\tStart time (ns): {recording.start_ts}")
print(f"\tWearer : {recording.wearer['name']}")
print(f"\tDevice serial : {recording.device_serial}")
print(f"\tGaze samples : {len(recording.gaze)}")
print("")

# Read 10 gaze samples
print("First 10 gaze samples:")
timestamps = recording.gaze.ts[:10]
subsample = recording.gaze.sample(timestamps)
for gaze_datum in subsample:
    print(f"\t{gaze_datum.ts} : ({gaze_datum.x:0.2f}, {gaze_datum.y:0.2f})")
```
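Timestamps throughout the library are nanoseconds since the Unix epoch (see `start_ts` above), which makes quick sanity checks easy. As a minimal sketch using only the attributes shown above, the effective gaze sampling rate can be estimated like this:

```python
import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

# Timestamps are nanoseconds since the Unix epoch, so this span is in ns
gaze_span_s = (recording.gaze.ts[-1] - recording.gaze.ts[0]) / 1e9

# Rough effective sampling rate; Neon gaze is nominally 200 Hz
print(f"~{len(recording.gaze) / gaze_span_s:.1f} Hz over {gaze_span_s:.1f} s")
```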
### Blinks And Fixations

```python
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402


class EventTracker:
    def __init__(self, stream):
        self.idx = 0
        self.itr = iter(stream)
        self.next_event = next(self.itr)

    def in_event(self, ts):
        if self.next_event is None:
            return False

        return (
            self.next_event.start_timestamp_ns < ts < self.next_event.end_timestamp_ns
        )

    def step_to(self, ts):
        try:
            while self.next_event is not None and self.next_event.end_timestamp_ns < ts:
                self.next_event = next(self.itr)
                self.idx += 1
        except StopIteration:
            self.next_event = None

        return self.in_event(ts)


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )
    eye_frames = recording.eye.sample(output_timestamps)

    fixations_only = recording.fixations[recording.fixations["event_type"] == 1]
    event_trackers = {
        "Fixation": EventTracker(fixations_only),
        "Blink": EventTracker(recording.blinks),
    }

    for frame in tqdm(eye_frames):
        frame_pixels = frame.bgr

        text_y = 0
        for stream, tracker in event_trackers.items():
            text_y += 40
            if tracker.step_to(frame.ts):
                frame_pixels = write_text(
                    frame_pixels, f"{stream} {tracker.idx + 1}", 0, text_y
                )

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "blinks-and-fixations.mp4")
```
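The event streams can also be consumed directly, without rendering video. A small sketch (relying only on the `start_timestamp_ns` and `end_timestamp_ns` fields used by `EventTracker` above) that prints each blink's onset and duration:

```python
import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

for idx, blink in enumerate(recording.blinks):
    duration_ms = (blink.end_timestamp_ns - blink.start_timestamp_ns) / 1e6
    start_s = (blink.start_timestamp_ns - recording.start_ts) / 1e9
    print(f"Blink {idx + 1}: {start_s:.2f} s into the recording, {duration_ms:.0f} ms long")
```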
### Data Access

```python
import sys
from collections.abc import Mapping
from datetime import datetime

import numpy as np

import pupil_labs.neon_recording as nr
from pupil_labs.neon_recording.stream.stream import Stream

if len(sys.argv) < 2:
    print("Usage:")
    print("python data_access.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])


def pretty_format(mapping: Mapping):
    output = []
    pad = " "
    keys = mapping.keys()
    n = max(len(key) for key in keys)
    for k, v in mapping.items():
        v_repr_lines = str(v).splitlines()
        output.append(f"{pad}{k:>{n}}: {v_repr_lines[0]}")
        if len(v_repr_lines) > 1:
            output.extend(f"{pad + ' '}{n * ' '}{line}" for line in v_repr_lines[1:])
    return "\n".join(output)


print("Basic Recording Info:")
print_data = {
    "Recording ID": recording.id,
    "Start time (ns since unix epoch)": f"{recording.start_ts}",
    "Start time (datetime)": f"{datetime.fromtimestamp(recording.start_ts / 1e9)}",
    "Duration (nanoseconds)": f"{recording.duration}",
    "Duration (seconds)": f"{recording.duration / 1e9}",
    "Wearer": f"{recording.wearer['name']} ({recording.wearer['uuid']})",
    "Device serial": recording.device_serial,
    "App version": recording.info["app_version"],
    "Data format": recording.info["data_format_version"],
    "Gaze Offset": recording.info["gaze_offset"],
}
print(pretty_format(print_data))

streams: list[Stream] = [
    recording.gaze,
    recording.imu,
    recording.eye_state,
    recording.blinks,
    recording.fixations,
    recording.worn,
    recording.eye,
    recording.scene,
    recording.audio,
]

print()
print("Recording streams:")
print(
    pretty_format({
        f"{stream.name} ({len(stream)} samples)": "\n" + pretty_format(stream[0])
        for stream in streams
    })
)

# Data can be converted to a numpy array or a pandas DataFrame; note that
# multi-column properties like .xy are not included in the conversion
print()
print("Gaze data as numpy array:")
gaze_np = recording.gaze.data
print()
print(gaze_np)

print()
print("Gaze data as pandas dataframe:")
gaze_df = recording.gaze.pd
print()
print(gaze_df)

print()
print("Getting data from a stream:")
print()
print("Gaze data", recording.gaze)
# GazeArray([(1741948698620648018, 966.3677 , 439.58817),
#            (1741948698630654018, 965.9669 , 441.60403),
#            (1741948698635648018, 964.2665 , 442.4974 ), ...,
#            (1741948717448190018, 757.85815, 852.34644),
#            (1741948717453190018, 766.53174, 857.3709 ),
#            (1741948717458190018, 730.93604, 851.53723)],
#           dtype=[('ts', '<i8'), ('x', '<f4'), ('y', '<f4')])

print()
print("Gaze ts via prop", recording.gaze.ts)
print("Gaze ts via key", recording.gaze["ts"])
# array([1741948698620648018, 1741948698630654018, 1741948698635648018, ...,
#        1741948717448190018, 1741948717453190018, 1741948717458190018])

print()
print("Gaze x coords via prop", recording.gaze.x)
print("Gaze x coords via key", recording.gaze["x"])
# array([966.3677 , 965.9669 , 964.2665 , ..., 757.85815, 766.53174,
#        730.93604], dtype=float32)

print()
print("Gaze y coords via prop", recording.gaze.y)
print("Gaze y coords via key", recording.gaze["y"])
# array([439.58817, 441.60403, 442.4974 , ..., 852.34644, 857.3709 ,
#        851.53723], dtype=float32)

print()
print("Gaze xy coords", recording.gaze.xy)
print("Gaze xy coords", recording.gaze[["x", "y"]])
# array([[966.3677 , 439.58817],
#        ...,
#        [730.93604, 851.53723]], dtype=float32)

print()
print("Gaze ts and x and y", recording.gaze[["ts", "x", "y"]])
# array([[1.74194870e+18, 9.66367676e+02, 4.39588165e+02],
#        ...,
#        [1.74194872e+18, 7.30936035e+02, 8.51537231e+02]])

print()
print("Sampling data:")
print()
print("Get closest gaze for scene frames")
closest_gaze_to_scene = recording.gaze.sample(recording.scene.ts)
print(closest_gaze_to_scene)
print(
    "closest_gaze_to_scene_times",
    (closest_gaze_to_scene.ts - recording.start_ts) / 1e9,
)

print()
print("Get closest before gaze for scene frames")
closest_gaze_before_scene = recording.gaze.sample(recording.scene.ts, method="before")
print(closest_gaze_before_scene)
print(
    "closest_gaze_before_scene_times",
    (closest_gaze_before_scene.ts - recording.start_ts) / 1e9,
)

print()
print("Sampled data can be resampled")
print()
print("Closest gaze sampled at 1 fps")
closest_gaze_to_scene_at_one_fps = closest_gaze_before_scene.sample(
    np.arange(closest_gaze_to_scene.ts[0], closest_gaze_to_scene.ts[-1], 1e9 / 1)
)
print(closest_gaze_to_scene_at_one_fps)
print(
    "closest_gaze_to_scene_at_one_fps_times",
    (closest_gaze_to_scene_at_one_fps.ts - recording.start_ts) / 1e9,
)
```
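Since `.pd` returns a pandas DataFrame, exporting a stream to disk is a one-liner. A minimal sketch (the file name is arbitrary):

```python
import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

# .pd is the same DataFrame shown above; standard pandas I/O applies
recording.gaze.pd.to_csv("gaze.csv", index=False)
```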
### Eye Overlay

```python
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def overlay_image(img, img_overlay, x, y):
    """Overlay `img_overlay` onto `img` at (x, y)."""
    # Image ranges
    y1, y2 = max(0, y), min(img.shape[0], y + img_overlay.shape[0])
    x1, x2 = max(0, x), min(img.shape[1], x + img_overlay.shape[1])

    # Overlay ranges
    y1o, y2o = max(0, -y), min(img_overlay.shape[0], img.shape[0] - y)
    x1o, x2o = max(0, -x), min(img_overlay.shape[1], img.shape[1] - x)

    if y1 >= y2 or x1 >= x2 or y1o >= y2o or x1o >= x2o:
        return

    img_crop = img[y1:y2, x1:x2]
    img_overlay_crop = img_overlay[y1o:y2o, x1o:x2o]
    img_crop[:] = img_overlay_crop


def make_overlaid_video(recording_dir, output_video_path, fps=None):
    recording = nr.open(recording_dir)

    # Resolve the output timestamps and fps before creating the writer,
    # so the writer never receives fps=None
    if fps is None:
        output_timestamps = recording.scene.ts
        fps = 30
    else:
        output_timestamps = np.arange(
            recording.scene.ts[0], recording.scene.ts[-1], int(1e9 / fps)
        )

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    combined_data = zip(
        output_timestamps,
        recording.scene.sample(output_timestamps),
        recording.eye.sample(output_timestamps),
    )

    for ts, scene_frame, eye_frame in tqdm(combined_data, total=len(output_timestamps)):
        if abs(scene_frame.ts - ts) < 2e9 / fps:
            frame_pixels = scene_frame.bgr
        else:
            # If the sampled frame is too far away temporally,
            # replace it with a gray frame
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(eye_frame.ts - ts) < 2e9 / fps:
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        overlay_image(frame_pixels, eye_pixels, 50, 50)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "eye-overlay-output-video.avi", None)
```
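The inset position and size are plain pixel values, so adjusting the overlay is only a matter of transforming `eye_pixels` before compositing. A sketch of a drop-in replacement for the `overlay_image` call above (it reuses `overlay_image` and `cv2` from the script; the half-size scale factor is an arbitrary choice, not a library default):

```python
def overlay_image_scaled(img, img_overlay, x, y, scale=0.5):
    # Shrink the overlay before compositing it into the scene frame
    small = cv2.resize(img_overlay, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
    overlay_image(img, small, x, y)
```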
### Eye State

```python
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def plot(img, data, value_range, x_width, color, line_width=2):
    for idx in range(1, len(data)):
        x_values = [int(idx2 * x_width) for idx2 in [idx - 1, idx]]
        y_norms = [
            (data[idx2] - value_range[0]) / (value_range[1] - value_range[0])
            for idx2 in [idx - 1, idx]
        ]
        y_values = [int(y_norm * img.shape[0]) for y_norm in y_norms]

        points = [[*v] for v in zip(x_values, y_values)]
        cv2.line(img, points[0], points[1], color, line_width)


def make_eye_state_video(recording_dir, output_video_path):
    recording = nr.open(recording_dir)

    fps = 200
    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )

    eye_video_sampled = recording.eye.sample(output_timestamps)
    eye_state_sampled = recording.eye_state.sample(output_timestamps)
    combined_data = zip(
        output_timestamps,
        eye_video_sampled,
        eye_state_sampled,
    )

    plot_metas = {
        "optical_axis_left_x": {"color": [0, 0, 255]},
        "optical_axis_left_y": {"color": [0, 255, 0]},
        "optical_axis_left_z": {"color": [255, 0, 0]},
    }
    for plot_name, plot_meta in plot_metas.items():
        plot_meta["range"] = (
            np.min(recording.eye_state.data[plot_name]),
            np.max(recording.eye_state.data[plot_name]),
        )

    plot_duration_secs = 0.5
    plot_point_count = plot_duration_secs * fps
    plot_x_width = recording.eye.width / plot_point_count

    for ts, eye_frame, _eye_state in tqdm(combined_data, total=len(output_timestamps)):
        if abs(eye_frame.ts - ts) < 2e9 / fps:
            eye_pixels = cv2.cvtColor(eye_frame.gray, cv2.COLOR_GRAY2BGR)
        else:
            # If the sampled frame is too far away temporally,
            # replace it with a gray frame
            eye_pixels = GrayFrame(eye_frame.width, eye_frame.height).bgr

        for plot_name, plot_meta in plot_metas.items():
            min_ts = ts - plot_duration_secs * 1e9
            time_frame = (min_ts < eye_state_sampled.ts) & (eye_state_sampled.ts <= ts)
            plot_data = eye_state_sampled[time_frame][plot_name]

            plot(
                eye_pixels,
                plot_data,
                plot_meta["range"],
                plot_x_width,
                plot_meta["color"],
            )

        video_writer.write(eye_pixels)
        cv2.imshow("Frame", eye_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_eye_state_video(sys.argv[1], "eye-state-output-video.avi")
```
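Eye-state fields can also be summarized without video. A sketch over the three fields plotted above (assuming key access works for `eye_state` the way it does for gaze in the Data Access example):

```python
import numpy as np

import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

# Field access by key, as in the gaze examples; these are the
# fields plotted by make_eye_state_video() above
for axis in ("x", "y", "z"):
    values = recording.eye_state[f"optical_axis_left_{axis}"]
    print(f"optical_axis_left_{axis}: min={np.min(values):.3f}, max={np.max(values):.3f}")
```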
### Find Clap

```python
import sys

import numpy as np

import pupil_labs.neon_recording as nr


def find_clap(recording_dir):
    recording = nr.open(recording_dir)

    max_rms = -1
    max_time = -1
    for frame in recording.audio:
        audio_data = frame.to_ndarray()
        rms = np.sqrt(np.mean(audio_data**2))
        if rms > max_rms:
            max_rms = rms
            max_time = frame.ts

    rel_time = (max_time - recording.start_ts) / 1e9
    print(f"The loudest audio occurs at {max_time} (rel={rel_time}), rms = {max_rms}.")


if __name__ == "__main__":
    find_clap(sys.argv[1])
```
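Once the loudest timestamp is known, the usual `sample` mechanism can pull the matching scene frame for visual confirmation. A sketch (assuming `sample` accepts a plain list of timestamps, as it does the NumPy arrays used elsewhere):

```python
import cv2

import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")
clap_ts = recording.start_ts  # stand-in; use the max_time found by find_clap()

# Nearest scene frame to the clap, saved for inspection
scene_frame = next(iter(recording.scene.sample([clap_ts])))
cv2.imwrite("clap-frame.png", scene_frame.bgr)
```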
### Gaze Overlay

```python
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402, I001
from pupil_labs.neon_recording.stream.av_stream.video_stream import GrayFrame  # noqa: E402


def make_overlaid_video(recording_dir, output_video_path, fps=None):
    recording = nr.open(recording_dir)

    # Resolve the output timestamps and fps before creating the writer,
    # so the writer never receives fps=None
    if fps is None:
        output_timestamps = recording.scene.ts
        fps = 30
    else:
        output_timestamps = np.arange(
            recording.scene.ts[0], recording.scene.ts[-1], 1e9 / fps
        )

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.scene.width, recording.scene.height),
    )

    scene_datas = recording.scene.sample(output_timestamps)
    combined_data = zip(
        output_timestamps,
        scene_datas,
        recording.gaze.sample(output_timestamps),
    )

    for ts, scene_frame, gaze_datum in tqdm(
        combined_data, total=len(output_timestamps)
    ):
        if abs(scene_frame.ts - ts) < 2e9 / fps:
            frame_pixels = scene_frame.bgr
        else:
            # If the sampled frame is too far away temporally,
            # replace it with a gray frame
            frame_pixels = GrayFrame(scene_frame.width, scene_frame.height).bgr

        if abs(gaze_datum.ts - ts) < 2e9 / fps:
            frame_pixels = cv2.circle(
                frame_pixels,
                (int(gaze_datum.x), int(gaze_datum.y)),
                50,
                (0, 0, 255),
                10,
            )

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "gaze-overlay-output-video.avi", 24)
```
### IMU

```python
import sys

import cv2
import numpy as np
from scipy.spatial.transform import Rotation

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

import pupil_labs.neon_recording as nr  # noqa: E402

if len(sys.argv) < 2:
    print("Usage:")
    print("python imu.py path/to/recording/folder")
    sys.exit(1)

# Open a recording
recording = nr.open(sys.argv[1])

# Sample the IMU data at 60 Hz
fps = 60
timestamps = np.arange(recording.imu.ts[0], recording.imu.ts[-1], 1e9 / fps)
imu_data = recording.imu.sample(timestamps)

# Use scipy to convert the quaternions to Euler angles
quaternions = np.array([s.quaternion_wxyz for s in imu_data])
rotations = (
    Rotation.from_quat(quaternions, scalar_first=True).as_euler(seq="yxz", degrees=True)
    % 360
)

# Combine the timestamps and Euler angles
rotations_with_time = np.column_stack((timestamps, rotations))
timestamped_eulers = np.array(
    [tuple(row) for row in rotations_with_time],
    dtype=[
        ("ts", np.int64),
        ("roll", np.float64),
        ("pitch", np.float64),
        ("yaw", np.float64),
    ],
)

# Display the angles
frame_size = 512
colors = {"pitch": (0, 0, 255), "yaw": (0, 255, 0), "roll": (255, 0, 0)}
for row in timestamped_eulers:
    # Create a blank image
    frame = np.zeros((frame_size, frame_size, 3), dtype=np.uint8)

    # Define the center and radius of the circles
    center = [frame_size // 2] * 2
    radius = frame.shape[0] // 3

    # Draw a line and a label for each angle
    for field, color in colors.items():
        line_end = (
            int(center[0] + radius * np.cos(np.deg2rad(row[field]))),
            int(center[1] - radius * np.sin(np.deg2rad(row[field]))),
        )
        cv2.line(frame, center, line_end, color, 2)

        # Write the angle values on the image
        cv2.putText(
            frame,
            f"{field}: {row[field]:.2f}",
            (10, 30 + list(colors.keys()).index(field) * 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            2,
        )

    # Display the image
    cv2.imshow("IMU Angles", frame)
    if cv2.waitKey(1000 // fps) == 27:
        break

cv2.destroyAllWindows()
```
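The same cross-stream sampling used for gaze works for the IMU, for example to attach a head pose to every scene frame. A minimal sketch using only attributes that appear above:

```python
import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

# One IMU sample per scene frame (nearest-neighbor, as in the gaze examples)
imu_per_frame = recording.imu.sample(recording.scene.ts)

for scene_ts, imu_datum in zip(recording.scene.ts[:5], imu_per_frame):
    print(scene_ts, imu_datum.quaternion_wxyz)
```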
### Worn

```python
import sys

import cv2
import numpy as np
from tqdm import tqdm

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs import neon_recording as nr  # noqa: E402


def write_text(image, text, x, y):
    return cv2.putText(
        image,
        text,
        (x, y),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 0, 255),
        2,
        cv2.LINE_AA,
    )


def make_overlaid_video(recording_dir, output_video_path, fps=30):
    recording = nr.open(recording_dir)

    video_writer = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*"MJPG"),
        fps,
        (recording.eye.width, recording.eye.height),
    )

    output_timestamps = np.arange(
        recording.eye.ts[0], recording.eye.ts[-1], int(1e9 / fps)
    )
    eyes_and_worn = zip(
        recording.eye.sample(output_timestamps),
        recording.worn.sample(output_timestamps),
    )

    for frame, worn_record in tqdm(eyes_and_worn, total=len(output_timestamps)):
        frame_pixels = frame.bgr

        text_y = 40
        if worn_record.worn:
            frame_pixels = write_text(frame_pixels, "Worn", 0, text_y)

        video_writer.write(frame_pixels)
        cv2.imshow("Frame", frame_pixels)
        cv2.pollKey()

    video_writer.release()


if __name__ == "__main__":
    make_overlaid_video(sys.argv[1], "worn.mp4")
```
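Because `worn` is a stream like any other, the array-style access from the Data Access example applies here too. A sketch estimating the fraction of samples during which the device was worn (assuming the field is named `worn`, as in the loop above, and that key access works as it does for gaze):

```python
import numpy as np

import pupil_labs.neon_recording as nr

recording = nr.open("path/to/recording/folder")

# Field access by key, as shown in the Data Access example
worn_flags = recording.worn["worn"]
print(f"Device worn for ~{100 * np.mean(worn_flags > 0):.1f}% of samples")
```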