Skip to content

Connect to a Device

The first step when using the Real-time API is to connect your client to a Companion device, enabling communication between them.

The examples below demonstrate how to connect to one or more devices using our device discovery functions, as well as how to connect directly via a device’s IP address.

Your device and the computer running the client must be connected to the same network, and the Companion app must be running. If no device can be found, please refer to the troubleshooting section.

Using the discover_one_device(), you will get access to the first device it detects.

discover_devices.py
from pupil_labs.realtime_api.simple import discover_devices, discover_one_device

print("Looking for the next best device...\n\t", end="")
print(discover_one_device(max_search_duration_seconds=10.0))

Using the discover_devices(), you can connect to more than one device.

discover_devices.py
from pupil_labs.realtime_api.simple import discover_devices, discover_one_device

print("Starting 10 second search...\n\t", end="")
print(discover_devices(search_duration_seconds=10.0))

There might be instances where devices can't be found due to the network, but you can always specify the IP address and port to simplify the process.

from pupil_labs.realtime_api.simple import Device

# This address is just an example. Find out the actual IP address of your device!
ip = "192.168.1.169"
device = Device(address=ip, port="8080")

Make sure the Companion App is running! If no device can be found, please have a look at the troubleshooting section.

Below you can find a link to the full code example and the API referece for the returned Device object.

Device Information & Automatic Status Updates

Once connected, the Device object provides access to various properties of he Companion device and connected glasses, such as battery level, free storage, and serial numbers.

This class automatically monitors the Companion device in the background and mirrors its state accordingly.

get_status.py
from pupil_labs.realtime_api.simple import discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# Device status is fetched on initialization and kept up-to-date in the background

print(f"Phone IP address: {device.phone_ip}")
print(f"Phone name: {device.phone_name}")
print(f"Phone unique ID: {device.phone_id}")

print(f"Battery level: {device.battery_level_percent}%")
print(f"Battery state: {device.battery_state}")

print(f"Free storage: {device.memory_num_free_bytes / 1024**3}GB")
print(f"Storage level: {device.memory_state}")

print(f"Connected glasses: SN {device.serial_number_glasses}")
print(f"Connected scene camera: SN {device.serial_number_scene_cam}")

device.close()  # explicitly stop auto-update
Output
Phone IP address: 192.168.1.168
Phone name: OnePlus8
Battery level: 43%
Free storage: 92.4 GB
Serial number of connected glasses: h4gcf

from pupil_labs.realtime_api.simple import discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

scene_camera = device.world_sensor()
connected = False if scene_camera is None else scene_camera.connected
print("Scene camera connected:", connected)

input("(Dis)connect the scene camera and hit enter...")

scene_camera = device.world_sensor()
connected = False if scene_camera is None else scene_camera.connected
print("Scene camera connected:", connected)

device.close()
Output
Looking for the next best device...
Scene camera connected: True
(Dis)connect the scene camera and hit enter...
Scene camera connected: False
Device Reference

Device

Device(address: str, port: int, full_name: str | None = None, dns_name: str | None = None, start_streaming_by_default: bool = False, suppress_decoding_warnings: bool = True)

Bases: DeviceBase

Simple synchronous API for interacting with Pupil Labs devices.

This class provides a simplified, synchronous interface to Pupil Labs devices, wrapping the asynchronous API with a more user-friendly interface.

Important

Use discover_devices instead of initializing the class manually. See the simple_discovery_example example.

Parameters:

  • address (str) –

    IP address of the device.

  • port (int) –

    Port number of the device.

  • full_name (str | None, default: None ) –

    Full service discovery name.

  • dns_name (str | None, default: None ) –

    DNS name of the device.

  • start_streaming_by_default (bool, default: False ) –

    Whether to start streaming automatically.

  • suppress_decoding_warnings (bool, default: True ) –

    Whether to suppress decoding warnings.

Methods:

Attributes:

Source code in src/pupil_labs/realtime_api/simple/device.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self,
    address: str,
    port: int,
    full_name: str | None = None,
    dns_name: str | None = None,
    start_streaming_by_default: bool = False,
    suppress_decoding_warnings: bool = True,
) -> None:
    """Initialize a Device instance.

    Args:
        address: IP address of the device.
        port: Port number of the device.
        full_name: Full service discovery name.
        dns_name: DNS name of the device.
        start_streaming_by_default: Whether to start streaming automatically.
        suppress_decoding_warnings: Whether to suppress decoding warnings.

    """
    super().__init__(
        address,
        port,
        full_name=full_name,
        dns_name=dns_name,
        suppress_decoding_warnings=suppress_decoding_warnings,
    )
    self._status = self._get_status()
    self._start_background_worker(start_streaming_by_default)

    self._errors: list[str] = []

    self.stream_name_start_event_map = {
        SensorName.GAZE.value: self._EVENT.SHOULD_START_GAZE,
        SensorName.WORLD.value: self._EVENT.SHOULD_START_WORLD,
        SensorName.EYES.value: self._EVENT.SHOULD_START_EYES,
        SensorName.IMU.value: self._EVENT.SHOULD_START_IMU,
        SensorName.EYE_EVENTS.value: self._EVENT.SHOULD_START_EYE_EVENTS,
        SensorName.AUDIO.value: self._EVENT.SHOULD_START_AUDIO,
    }

    self.stream_name_stop_event_map = {
        SensorName.GAZE.value: self._EVENT.SHOULD_STOP_GAZE,
        SensorName.WORLD.value: self._EVENT.SHOULD_STOP_WORLD,
        SensorName.EYES.value: self._EVENT.SHOULD_STOP_EYES,
        SensorName.IMU.value: self._EVENT.SHOULD_STOP_IMU,
        SensorName.EYE_EVENTS.value: self._EVENT.SHOULD_STOP_EYE_EVENTS,
        SensorName.AUDIO.value: self._EVENT.SHOULD_STOP_AUDIO,
    }

address instance-attribute

address: str = address

REST API server address.

battery_level_percent property

battery_level_percent: int

Get the battery level of the connected phone in percentage.

battery_state property

battery_state: Literal['OK', 'LOW', 'CRITICAL']

Get the battery state of the connected phone.

Possible values are "OK", "LOW", or "CRITICAL".

dns_name instance-attribute

dns_name: str | None = dns_name

REST API server DNS name, e.g.neon.local / pi.local..

full_name instance-attribute

full_name: str | None = full_name

Full service discovery name.

is_currently_streaming property

is_currently_streaming: bool

Check if data streaming is currently active.

Returns:

  • bool ( bool ) –

    True if streaming is active, False otherwise.

memory_num_free_bytes property

memory_num_free_bytes: int

Get the available memory of the connected phone in bytes.

memory_state property

memory_state: Literal['OK', 'LOW', 'CRITICAL']

Get the memory state of the connected phone.

Possible values are "OK", "LOW", or "CRITICAL".

module_serial property

module_serial: str | Literal['default'] | None

Returns None or "default" if no glasses are connected

Info

Only available on Neon.

phone_id property

phone_id: str

Get the ID of the connected phone.

phone_ip property

phone_ip: str

Get the IP address of the connected phone.

phone_name property

phone_name: str

Get the name of the connected phone.

port instance-attribute

port: int = port

REST API server port.

serial_number_glasses property

serial_number_glasses: str | Literal['default'] | None

Returns None or "default" if no glasses are connected

Info

Only available on Pupil Invisible.

serial_number_scene_cam property

serial_number_scene_cam: str | None

Returns None if no scene camera is connected

Info

Only available on Pupil Invisible.

version_glasses property

version_glasses: str | None

Get the version of the connected glasses.

Returns:

  • str ( str | None ) –

    1 -> Pupil Invisible, 2 -> Neon, or None -> No glasses connected.

api_url

api_url(path: APIPath, protocol: str = 'http', prefix: str = '/api') -> str

Construct a full API URL for the given path.

Parameters:

  • path (APIPath) –

    API path to access.

  • protocol (str, default: 'http' ) –

    Protocol to use (http).

  • prefix (str, default: '/api' ) –

    API URL prefix.

Returns:

  • str

    Complete URL for the API endpoint.

Source code in src/pupil_labs/realtime_api/base.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def api_url(
    self, path: APIPath, protocol: str = "http", prefix: str = "/api"
) -> str:
    """Construct a full API URL for the given path.

    Args:
        path: API path to access.
        protocol: Protocol to use (http).
        prefix: API URL prefix.

    Returns:
        Complete URL for the API endpoint.

    """
    return path.full_address(
        self.address, self.port, protocol=protocol, prefix=prefix
    )

close

close() -> None

Close the device connection and stop all background threads.

This method should be called when the device is no longer needed to free up resources.

Source code in src/pupil_labs/realtime_api/simple/device.py
731
732
733
734
735
736
737
738
739
740
741
def close(self) -> None:
    """Close the device connection and stop all background threads.

    This method should be called when the device is no longer needed
    to free up resources.
    """
    if self._event_manager:
        if self.is_currently_streaming:
            self.streaming_stop()
        self._event_manager.trigger_threadsafe(self._EVENT.SHOULD_WORKER_CLOSE)
        self._auto_update_thread.join()

convert_from classmethod

convert_from(other: T) -> DeviceType

Convert another device instance to this type.

Parameters:

  • other (T) –

    Device instance to convert.

Returns:

Source code in src/pupil_labs/realtime_api/base.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@classmethod
def convert_from(cls: type[DeviceType], other: T) -> DeviceType:
    """Convert another device instance to this type.

    Args:
        other: Device instance to convert.

    Returns:
        Converted device instance.

    """
    return cls(
        other.address,
        other.port,
        full_name=other.full_name,
        dns_name=other.dns_name,
    )

estimate_time_offset

estimate_time_offset(number_of_measurements: int = 100, sleep_between_measurements_seconds: float | None = None) -> TimeEchoEstimates | None

Estimate the time offset between the host device and the client.

This uses the Time Echo protocol to estimate the clock offset between the device and the client.

Parameters:

  • number_of_measurements (int, default: 100 ) –

    Number of measurements to take.

  • sleep_between_measurements_seconds (float | None, default: None ) –

    Optional sleep time between

Returns:

  • TimeEchoEstimates ( TimeEchoEstimates | None ) –

    Statistics for roundtrip durations and time offsets, or None if estimation failed or device doesn't support the protocol.

See Also

:mod:pupil_labs.realtime_api.time_echo for more details.

Source code in src/pupil_labs/realtime_api/simple/device.py
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
def estimate_time_offset(
    self,
    number_of_measurements: int = 100,
    sleep_between_measurements_seconds: float | None = None,
) -> TimeEchoEstimates | None:
    """Estimate the time offset between the host device and the client.

    This uses the Time Echo protocol to estimate the clock offset between
    the device and the client.

    Args:
        number_of_measurements: Number of measurements to take.
        sleep_between_measurements_seconds: Optional sleep time between
        measurements.

    Returns:
        TimeEchoEstimates: Statistics for roundtrip durations and time offsets,
            or None if estimation failed or device doesn't support the protocol.

    See Also:
        :mod:`pupil_labs.realtime_api.time_echo` for more details.

    """
    if self._status.phone.time_echo_port is None:
        logger.warning(
            "You Pupil Invisible Companion app is out-of-date and does not yet "
            "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
        )
        return None
    estimator = TimeOffsetEstimator(
        self.phone_ip, self._status.phone.time_echo_port
    )
    return asyncio.run(
        estimator.estimate(
            number_of_measurements, sleep_between_measurements_seconds
        )
    )

eye_events_sensor

eye_events_sensor() -> Sensor | None

Get the eye events sensor.

Source code in src/pupil_labs/realtime_api/simple/device.py
222
223
224
def eye_events_sensor(self) -> Sensor | None:
    """Get the eye events sensor."""
    return self._status.direct_eye_events_sensor()

from_discovered_device classmethod

from_discovered_device(device: DiscoveredDeviceInfo) -> DeviceType

Create a device instance from discovery information.

Parameters:

Returns:

Source code in src/pupil_labs/realtime_api/base.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@classmethod
def from_discovered_device(
    cls: type[DeviceType], device: DiscoveredDeviceInfo
) -> DeviceType:
    """Create a device instance from discovery information.

    Args:
        device: Discovered device information.

    Returns:
        Device instance

    """
    return cls(
        device.addresses[0],
        device.port,
        full_name=device.name,
        dns_name=device.server,
    )

gaze_sensor

gaze_sensor() -> Sensor | None

Get the gaze sensor.

Source code in src/pupil_labs/realtime_api/simple/device.py
218
219
220
def gaze_sensor(self) -> Sensor | None:
    """Get the gaze sensor."""
    return self._status.direct_gaze_sensor()

get_calibration

get_calibration() -> Calibration

Get the current cameras calibration data.

Note that Pupil Invisible and Neon are calibration free systems, this refers to the intrinsincs and extrinsics of the cameras.

Returns:

  • Calibration

    pupil_labs.neon_recording.calib.Calibration: The calibration data.

Raises:

Source code in src/pupil_labs/realtime_api/simple/device.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def get_calibration(self) -> Calibration:
    """Get the current cameras calibration data.

    Note that Pupil Invisible and Neon are calibration free systems, this refers to
    the intrinsincs and extrinsics of the cameras.

    Returns:
        pupil_labs.neon_recording.calib.Calibration: The calibration data.

    Raises:
        DeviceError: If the request fails.

    """

    async def _get_calibration() -> Calibration:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.get_calibration()

    return asyncio.run(_get_calibration())

get_errors

get_errors() -> list[str]

Get a list of errors from the device.

Returns:

  • list[str]

    list[str]: List of error messages.

Source code in src/pupil_labs/realtime_api/simple/device.py
202
203
204
205
206
207
208
209
210
211
212
def get_errors(self) -> list[str]:
    """Get a list of errors from the device.

    Returns:
        list[str]: List of error messages.

    """
    errors = self._errors.copy()
    self._errors.clear()

    return errors

get_template

get_template() -> Template

Get the template currently selected on the device.

Wraps pupil_labs.realtime_api.device.Device.get_template

Returns:

  • Template ( Template ) –

    The currently selected template.

Raises:

Source code in src/pupil_labs/realtime_api/simple/device.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def get_template(self) -> Template:
    """Get the template currently selected on the device.

    Wraps [pupil_labs.realtime_api.device.Device.get_template][]

    Returns:
        Template: The currently selected template.

    Raises:
        DeviceError: If the template can't be fetched.

    """

    async def _get_template() -> Template:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.get_template()

    return asyncio.run(_get_template())

get_template_data

get_template_data(template_format: TemplateDataFormat = 'simple') -> Any

Get the template data entered on the device.

Wraps pupil_labs.realtime_api.device.Device.get_template_data

Parameters:

  • template_format (TemplateDataFormat, default: 'simple' ) –

    Format of the returned data. "api" returns the data as is from the api e.g., {"item_uuid": ["42"]} "simple" returns the data parsed e.g., {"item_uuid": 42}

Returns:

  • Any

    The result from the GET request.

Raises:

  • DeviceError

    If the template's data could not be fetched.

Source code in src/pupil_labs/realtime_api/simple/device.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def get_template_data(self, template_format: TemplateDataFormat = "simple") -> Any:
    """Get the template data entered on the device.

    Wraps [pupil_labs.realtime_api.device.Device.get_template_data][]

    Args:
        template_format: Format of the returned data.
            "api" returns the data as is from the api e.g., {"item_uuid": ["42"]}
            "simple" returns the data parsed e.g., {"item_uuid": 42}

    Returns:
        The result from the GET request.

    Raises:
        DeviceError: If the template's data could not be fetched.

    """

    async def _get_template_data() -> Any:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.get_template_data(template_format=template_format)

    return asyncio.run(_get_template_data())

post_template_data

post_template_data(template_data: dict[str, list[str]], template_format: TemplateDataFormat = 'simple') -> Any

Send the data to the currently selected template.

Wraps pupil_labs.realtime_api.device.Device.post_template_data

Parameters:

  • template_data (dict[str, list[str]]) –

    The template data to send.

  • template_format (TemplateDataFormat, default: 'simple' ) –

    Format of the input data. "api" accepts the data as in realtime api format e.g., {"item_uuid": ["42"]} "simple" accepts the data in parsed format e.g., {"item_uuid": 42}

Returns:

  • Any

    The result from the POST request.

Raises:

Source code in src/pupil_labs/realtime_api/simple/device.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def post_template_data(
    self,
    template_data: dict[str, list[str]],
    template_format: TemplateDataFormat = "simple",
) -> Any:
    """Send the data to the currently selected template.

    Wraps [pupil_labs.realtime_api.device.Device.post_template_data][]

    Args:
        template_data: The template data to send.
        template_format: Format of the input data.
            "api" accepts the data as in realtime api format e.g.,
            {"item_uuid": ["42"]}
            "simple" accepts the data in parsed format e.g., {"item_uuid": 42}

    Returns:
        The result from the POST request.

    Raises:
        DeviceError: If the data can not be sent.
        ValueError: If invalid data type.

    """

    async def _post_template_data() -> Any:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.post_template_data(
                template_data, template_format=template_format
            )

    return asyncio.run(_post_template_data())

receive_audio_frame

receive_audio_frame(timeout_seconds: float | None = None) -> AudioFrame | None

Receive an Audio frame.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new frame. If None, wait indefinitely.

Returns:

  • AudioFrame | None

    AudioFrame or None: The received audio frame, or None if timeout was

  • AudioFrame | None

    reached.

Source code in src/pupil_labs/realtime_api/simple/device.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def receive_audio_frame(
    self, timeout_seconds: float | None = None
) -> AudioFrame | None:
    """Receive an Audio frame.

    Args:
        timeout_seconds: Maximum time to wait for a new frame.
            If None, wait indefinitely.

    Returns:
        AudioFrame or None: The received audio frame, or None if timeout was
        reached.

    """
    return cast(
        AudioFrame,
        self._receive_item(SensorName.AUDIO.value, timeout_seconds),
    )

receive_eye_events

receive_eye_events(timeout_seconds: float | None = None) -> FixationEventData | BlinkEventData | FixationOnsetEventData | None

Receive an eye event.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new eye event. If None, wait indefinitely.

Returns:

Source code in src/pupil_labs/realtime_api/simple/device.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def receive_eye_events(
    self, timeout_seconds: float | None = None
) -> FixationEventData | BlinkEventData | FixationOnsetEventData | None:
    """Receive an eye event.

    Args:
        timeout_seconds: Maximum time to wait for a new eye event.
            If None, wait indefinitely.

    Returns:
        FixationEventData | BlinkEventData | FixationOnsetEventData or None:
        The received eye event, or None if timeout was reached.

    """
    return cast(
        FixationEventData | BlinkEventData | FixationOnsetEventData,
        self._receive_item(SensorName.EYE_EVENTS.value, timeout_seconds),
    )

receive_eyes_video_frame

receive_eyes_video_frame(timeout_seconds: float | None = None) -> SimpleVideoFrame | None

Receive an eye camera video frame.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new frame. If None, wait indefinitely.

Returns:

Source code in src/pupil_labs/realtime_api/simple/device.py
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def receive_eyes_video_frame(
    self, timeout_seconds: float | None = None
) -> SimpleVideoFrame | None:
    """Receive an eye camera video frame.

    Args:
        timeout_seconds: Maximum time to wait for a new frame.
            If None, wait indefinitely.

    Returns:
        SimpleVideoFrame or None: The received video frame, or None if timeout
        was reached.

    """
    return cast(
        SimpleVideoFrame,
        self._receive_item(SensorName.EYES.value, timeout_seconds),
    )

receive_gaze_datum

receive_gaze_datum(timeout_seconds: float | None = None) -> GazeDataType | None

Receive a gaze data point.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new gaze datum. If None, wait indefinitely.

Returns:

  • GazeDataType | None

    GazeDataType or None: The received gaze data, or None if timeout was

  • GazeDataType | None

    reached.

Source code in src/pupil_labs/realtime_api/simple/device.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def receive_gaze_datum(
    self, timeout_seconds: float | None = None
) -> GazeDataType | None:
    """Receive a gaze data point.

    Args:
        timeout_seconds: Maximum time to wait for a new gaze datum.
            If None, wait indefinitely.

    Returns:
        GazeDataType or None: The received gaze data, or None if timeout was
        reached.

    """
    return cast(
        GazeDataType, self._receive_item(SensorName.GAZE.value, timeout_seconds)
    )

receive_imu_datum

receive_imu_datum(timeout_seconds: float | None = None) -> IMUData | None

Receive an IMU data point.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new IMU datum. If None, wait indefinitely.

Returns:

  • IMUData | None

    IMUData or None: The received IMU data, or None if timeout was reached.

Source code in src/pupil_labs/realtime_api/simple/device.py
466
467
468
469
470
471
472
473
474
475
476
477
def receive_imu_datum(self, timeout_seconds: float | None = None) -> IMUData | None:
    """Receive an IMU data point.

    Args:
        timeout_seconds: Maximum time to wait for a new IMU datum.
            If None, wait indefinitely.

    Returns:
        IMUData or None: The received IMU data, or None if timeout was reached.

    """
    return cast(IMUData, self._receive_item(SensorName.IMU.value, timeout_seconds))

receive_matched_scene_and_eyes_video_frames_and_gaze

receive_matched_scene_and_eyes_video_frames_and_gaze(timeout_seconds: float | None = None) -> MatchedGazeEyesSceneItem | None

Receive a matched triplet of scene video frame, eye video frame, and gaze.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a matched triplet. If None, wait indefinitely.

Returns:

Source code in src/pupil_labs/realtime_api/simple/device.py
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
def receive_matched_scene_and_eyes_video_frames_and_gaze(
    self, timeout_seconds: float | None = None
) -> MatchedGazeEyesSceneItem | None:
    """Receive a matched triplet of scene video frame, eye video frame, and gaze.

    Args:
        timeout_seconds: Maximum time to wait for a matched triplet.
            If None, wait indefinitely.

    Returns:
        MatchedGazeEyesSceneItem or None: The matched triplet, or None if timeout
        was reached.

    """
    return cast(
        MatchedGazeEyesSceneItem,
        self._receive_item(MATCHED_GAZE_EYES_LABEL, timeout_seconds),
    )

receive_matched_scene_video_frame_and_audio

receive_matched_scene_video_frame_and_audio(timeout_seconds: float | None = None) -> MatchedSceneAudioItem | None

Receive a matched pair of scene video frame and audio data.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a matched pair. If None, wait indefinitely.

Returns:

  • MatchedSceneAudioItem | None

    MatchedSceneAudioItem or None: The matched pair, or None if timeout was

  • MatchedSceneAudioItem | None

    reached.

Source code in src/pupil_labs/realtime_api/simple/device.py
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def receive_matched_scene_video_frame_and_audio(
    self, timeout_seconds: float | None = None
) -> MatchedSceneAudioItem | None:
    """Receive a matched pair of scene video frame and audio data.

    Args:
        timeout_seconds: Maximum time to wait for a matched pair.
            If None, wait indefinitely.

    Returns:
        MatchedSceneAudioItem or None: The matched pair, or None if timeout was
        reached.

    """
    return cast(
        MatchedSceneAudioItem,
        self._receive_item(MATCHED_SCENE_AUDIO_LABEL, timeout_seconds),
    )

receive_matched_scene_video_frame_and_gaze

receive_matched_scene_video_frame_and_gaze(timeout_seconds: float | None = None) -> MatchedItem | None

Receive a matched pair of scene video frame and gaze data.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a matched pair. If None, wait indefinitely.

Returns:

  • MatchedItem | None

    MatchedItem or None: The matched pair, or None if timeout was reached.

Source code in src/pupil_labs/realtime_api/simple/device.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
def receive_matched_scene_video_frame_and_gaze(
    self, timeout_seconds: float | None = None
) -> MatchedItem | None:
    """Receive a matched pair of scene video frame and gaze data.

    Args:
        timeout_seconds: Maximum time to wait for a matched pair.
            If None, wait indefinitely.

    Returns:
        MatchedItem or None: The matched pair, or None if timeout was reached.

    """
    return cast(
        MatchedItem, self._receive_item(MATCHED_ITEM_LABEL, timeout_seconds)
    )

receive_scene_video_frame

receive_scene_video_frame(timeout_seconds: float | None = None) -> SimpleVideoFrame | None

Receive a scene (world) video frame.

Parameters:

  • timeout_seconds (float | None, default: None ) –

    Maximum time to wait for a new frame. If None, wait indefinitely.

Returns:

Source code in src/pupil_labs/realtime_api/simple/device.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def receive_scene_video_frame(
    self, timeout_seconds: float | None = None
) -> SimpleVideoFrame | None:
    """Receive a scene (world) video frame.

    Args:
        timeout_seconds: Maximum time to wait for a new frame.
            If None, wait indefinitely.

    Returns:
        SimpleVideoFrame or None: The received video frame, or None if timeout was
        reached.

    """
    return cast(
        SimpleVideoFrame,
        self._receive_item(SensorName.WORLD.value, timeout_seconds),
    )

recording_cancel

recording_cancel() -> None

Cancel the current recording without saving it.

Wraps pupil_labs.realtime_api.device.Device.recording_cancel

Raises:

  • DeviceError

    If the recording could not be cancelled. Possible reasons

  • include
    • Recording not running
Source code in src/pupil_labs/realtime_api/simple/device.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def recording_cancel(self) -> None:
    """Cancel the current recording without saving it.

    Wraps [pupil_labs.realtime_api.device.Device.recording_cancel][]

    Raises:
        DeviceError: If the recording could not be cancelled. Possible reasons
        include:
            - Recording not running

    """

    async def _cancel_recording() -> None:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.recording_cancel()

    return asyncio.run(_cancel_recording())

recording_start

recording_start() -> str

Start a recording on the device.

Wraps pupil_labs.realtime_api.device.Device.recording_start

Returns:

  • str ( str ) –

    ID of the started recording.

Raises:

  • DeviceError

    If the recording could not be started. Possible reasons

  • include
    • Recording already running
    • Template has required fields
    • Low battery
    • Low storage
    • No wearer selected
    • No workspace selected
    • Setup bottom sheets not completed
Source code in src/pupil_labs/realtime_api/simple/device.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def recording_start(self) -> str:
    """Start a recording on the device.

    Wraps [pupil_labs.realtime_api.device.Device.recording_start][]

    Returns:
        str: ID of the started recording.

    Raises:
        DeviceError: If the recording could not be started. Possible reasons
        include:
            - Recording already running
            - Template has required fields
            - Low battery
            - Low storage
            - No wearer selected
            - No workspace selected
            - Setup bottom sheets not completed

    """

    async def _start_recording() -> str:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.recording_start()

    return asyncio.run(_start_recording())

recording_stop_and_save

recording_stop_and_save() -> None

Stop and save the current recording.

Wraps pupil_labs.realtime_api.device.Device.recording_stop_and_save

Raises:

  • DeviceError

    If the recording could not be stopped. Possible reasons

  • include
    • Recording not running
    • Template has required fields
Source code in src/pupil_labs/realtime_api/simple/device.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def recording_stop_and_save(self) -> None:
    """Stop and save the current recording.

    Wraps [pupil_labs.realtime_api.device.Device.recording_stop_and_save][]

    Raises:
        DeviceError: If the recording could not be stopped. Possible reasons
        include:
            - Recording not running
            - Template has required fields

    """

    async def _stop_and_save_recording() -> None:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.recording_stop_and_save()

    return asyncio.run(_stop_and_save_recording())

send_event

send_event(event_name: str, event_timestamp_unix_ns: int | None = None) -> Event

Send an event to the device.

Parameters:

  • event_name (str) –

    Name of the event.

  • event_timestamp_unix_ns (int | None, default: None ) –

    Optional timestamp in unix nanoseconds. If None, the current time will be used.

Returns:

  • Event ( Event ) –

    The created event.

Raises:

Source code in src/pupil_labs/realtime_api/simple/device.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def send_event(
    self, event_name: str, event_timestamp_unix_ns: int | None = None
) -> Event:
    """Send an event to the device.

    Args:
        event_name: Name of the event.
        event_timestamp_unix_ns: Optional timestamp in unix nanoseconds.
            If None, the current time will be used.

    Returns:
        Event: The created event.

    Raises:
        DeviceError: If sending the event fails.

    """

    async def _send_event() -> Event:
        async with _DeviceAsync.convert_from(self) as control:
            return await control.send_event(event_name, event_timestamp_unix_ns)

    return asyncio.run(_send_event())

start_stream_if_needed

start_stream_if_needed(sensor: str) -> None

Start streaming if not already streaming.

Parameters:

  • sensor (str) –

    Sensor name to check.

Source code in src/pupil_labs/realtime_api/simple/device.py
612
613
614
615
616
617
618
619
620
621
def start_stream_if_needed(self, sensor: str) -> None:
    """Start streaming if not already streaming.

    Args:
        sensor: Sensor name to check.

    """
    if not self._is_streaming_flags[sensor].is_set():
        logger.debug("receive_* called without being streaming")
        self.streaming_start(sensor)

streaming_start

streaming_start(stream_name: str) -> None

Start streaming data from the specified sensor.

Parameters:

  • stream_name (str) –

    Name of the sensor to start streaming from. It can be one of

  • py:attr:SensorName values or None, which will start all streams.

Raises:

  • ValueError

    If the stream name is not recognized.

Source code in src/pupil_labs/realtime_api/simple/device.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
def streaming_start(self, stream_name: str) -> None:
    """Start streaming data from the specified sensor.

    Args:
        stream_name: Name of the sensor to start streaming from. It can be one of
        :py:attr:`SensorName` values or None, which will start all streams.

    Raises:
        ValueError: If the stream name is not recognized.

    """
    if stream_name is None:
        for event in (
            self._EVENT.SHOULD_START_GAZE,
            self._EVENT.SHOULD_START_WORLD,
            self._EVENT.SHOULD_START_EYES,
            self._EVENT.SHOULD_START_IMU,
            self._EVENT.SHOULD_START_EYE_EVENTS,
            self._EVENT.SHOULD_START_AUDIO,
        ):
            self._streaming_trigger_action(event)
        return

    event = self.stream_name_start_event_map[stream_name]
    self._streaming_trigger_action(event)

streaming_stop

streaming_stop(stream_name: str | None = None) -> None

Stop streaming data from the specified sensor.

Parameters:

  • stream_name (str | None, default: None ) –

    Name of the sensor to start streaming from. It can be one of

  • py:attr:SensorName values or None, which will stop all streams.

Raises:

  • ValueError

    If the stream name is not recognized.

Source code in src/pupil_labs/realtime_api/simple/device.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
def streaming_stop(self, stream_name: str | None = None) -> None:
    """Stop streaming data from the specified sensor.

    Args:
        stream_name: Name of the sensor to start streaming from. It can be one of
        :py:attr:`SensorName` values or None, which will stop all streams.

    Raises:
        ValueError: If the stream name is not recognized.

    """
    if stream_name is None:
        for event in (
            self._EVENT.SHOULD_STOP_GAZE,
            self._EVENT.SHOULD_STOP_WORLD,
            self._EVENT.SHOULD_STOP_EYES,
            self._EVENT.SHOULD_STOP_IMU,
            self._EVENT.SHOULD_STOP_EYE_EVENTS,
            self._EVENT.SHOULD_STOP_AUDIO,
        ):
            self._streaming_trigger_action(event)
        return

    event = self.stream_name_stop_event_map[stream_name]
    self._streaming_trigger_action(event)

world_sensor

world_sensor() -> Sensor | None

Get the world sensor.

Source code in src/pupil_labs/realtime_api/simple/device.py
214
215
216
def world_sensor(self) -> Sensor | None:
    """Get the world sensor."""
    return self._status.direct_world_sensor()

Full Code Examples

Check the whole example code here
discover_devices.py
1
2
3
4
5
6
7
8
9
from pupil_labs.realtime_api.simple import discover_devices, discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...\n\t", end="")
print(discover_one_device(max_search_duration_seconds=10.0))

# List all devices that could be found within 10 seconds
print("Starting 10 second search...\n\t", end="")
print(discover_devices(search_duration_seconds=10.0))
get_status.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pupil_labs.realtime_api.simple import discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# Device status is fetched on initialization and kept up-to-date in the background

print(f"Phone IP address: {device.phone_ip}")
print(f"Phone name: {device.phone_name}")
print(f"Phone unique ID: {device.phone_id}")

print(f"Battery level: {device.battery_level_percent}%")
print(f"Battery state: {device.battery_state}")

print(f"Free storage: {device.memory_num_free_bytes / 1024**3}GB")
print(f"Storage level: {device.memory_state}")

print(f"Connected glasses: SN {device.serial_number_glasses}")
print(f"Connected scene camera: SN {device.serial_number_scene_cam}")

device.close()  # explicitly stop auto-update
status_auto_update.py
from pupil_labs.realtime_api.simple import discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

scene_camera = device.world_sensor()
connected = False if scene_camera is None else scene_camera.connected
print("Scene camera connected:", connected)

input("(Dis)connect the scene camera and hit enter...")

scene_camera = device.world_sensor()
connected = False if scene_camera is None else scene_camera.connected
print("Scene camera connected:", connected)

device.close()