
Stream Audio

Neon +2.8.31 +1.7

Using the receive_audio_frames method, you can receive audio frames that you can play live, record, or analyze in real time, for example with speech-to-text or other sound analysis.

The data returned is an instance of AudioFrame.

AudioFrame(
    av_frame=<av.AudioFrame pts=None, 1024 samples at 8000Hz, mono, fltp at 0x1189e0ac0>,
    timestamp_unix_seconds=1758211800.8221593,
    resampler=<av.audio.resampler.AudioResampler object at 0x1189e04c0>
)
AudioFrame

Bases: NamedTuple

An audio frame with timestamp information.

This class represents an audio frame from the audio stream with associated timestamp information. It wraps an av.AudioFrame from the PyAV library.

Note

Audio from Neon is streamed as fltp mono at 8 kHz; this class wraps the decoded packets as av.AudioFrame objects.

Methods:

  • to_ndarray

    Convert the audio frame to a NumPy array.

  • to_resampled_ndarray

    Convert the audio frame to a resampled s16 NumPy array.

Attributes:

  • av_frame – The audio frame.

  • datetime – Timestamp as a datetime object.

  • resampler – A reference to a shared AudioResampler instance.

  • timestamp_unix_ns – Timestamp in nanoseconds since Unix epoch.

  • timestamp_unix_seconds – Timestamp in seconds since Unix epoch.

av_frame instance-attribute

av_frame: AudioFrame

The audio frame.

datetime property

datetime: datetime

Get timestamp as a datetime object.

resampler instance-attribute

resampler: AudioResampler

A reference to a shared AudioResampler instance.

timestamp_unix_ns property

timestamp_unix_ns: int

Get timestamp in nanoseconds since Unix epoch.

timestamp_unix_seconds instance-attribute

timestamp_unix_seconds: float

Timestamp in seconds since Unix epoch.

to_ndarray

to_ndarray(*args: Any, **kwargs: Any) -> NDArray

Convert the audio frame to a NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/audio.py
def to_ndarray(self, *args: Any, **kwargs: Any) -> npt.NDArray:
    """Convert the audio frame to a NumPy array."""
    return self.av_frame.to_ndarray(*args, **kwargs)

to_resampled_ndarray

to_resampled_ndarray(*args: Any, **kwargs: Any) -> Iterator[NDArray]

Convert the audio frame to a resampled s16 NumPy array.

Source code in src/pupil_labs/realtime_api/streaming/audio.py
def to_resampled_ndarray(self, *args: Any, **kwargs: Any) -> Iterator[npt.NDArray]:
    """Convert the audio frame to a resampled s16 NumPy array"""
    for frame in self.resampler.resample(self.av_frame):
        yield frame.to_ndarray(*args, **kwargs)
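
To illustrate the accessors above, the snippet below builds a synthetic AudioFrame in the stream's format (fltp, mono, 8 kHz) and reads its properties. Constructing the frame and resampler by hand is purely for illustration; in practice, frames arrive fully populated from receive_audio_frames.

import time

import av
import numpy as np
from av.audio.resampler import AudioResampler

from pupil_labs.realtime_api import AudioFrame

# Build a synthetic av.AudioFrame mirroring the stream format (fltp, mono, 8 kHz).
samples = np.zeros((1, 1024), dtype=np.float32)
av_frame = av.AudioFrame.from_ndarray(samples, format="fltp", layout="mono")
av_frame.sample_rate = 8000

frame = AudioFrame(
    av_frame=av_frame,
    timestamp_unix_seconds=time.time(),
    resampler=AudioResampler(format="s16", layout="mono", rate=8000),
)

print(frame.datetime)            # timestamp as a datetime object
print(frame.timestamp_unix_ns)   # the same instant in nanoseconds
print(frame.to_ndarray().shape)  # (1, 1024): one channel, 1024 float32 samples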

By default, the audio signal is streamed in mono using the AAC codec. The stream is downsampled from the original 48 kHz source to a sampling rate of 8 kHz to save bandwidth, and uses a 32-bit floating-point planar (fltp) format.

The audio stream does not have its own RTSP stream; it is multiplexed with the video. In this client, we therefore create a virtual sensor component based on the Scene Camera stream.

Working with Audio Data

You can receive audio frames, convert them to NumPy arrays with the to_ndarray method, and feed them to an audio library of your choice, such as librosa, for analysis.
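
For example, the following sketch computes the RMS loudness of each incoming frame with librosa. It assumes a Neon device is discoverable on the local network; error handling is omitted for brevity.

import asyncio

import librosa
import numpy as np

from pupil_labs.realtime_api import Device, Network, receive_audio_frames


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
        if dev_info is None:
            print("No device could be found! Aborting.")
            return

        async with Device.from_discovered_device(dev_info) as device:
            status = await device.get_status()
            sensor_audio = status.direct_audio_sensor()

            async for frame in receive_audio_frames(sensor_audio.url, run_loop=True):
                # fltp mono frames convert to a (1, n_samples) float32 array
                samples = np.squeeze(frame.to_ndarray())
                rms = librosa.feature.rms(y=samples)
                print(f"RMS loudness: {float(rms.mean()):.4f}")


if __name__ == "__main__":
    asyncio.run(main())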

Check the whole example code here
stream_audio.py
import asyncio
import contextlib
import logging

from pupil_labs.realtime_api import (
    Device,
    Network,
    receive_audio_frames,
)

logging.basicConfig(level=logging.INFO)


async def main():
    try:
        async with Network() as network:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                print("No device could be found! Aborting.")
                return

            async with Device.from_discovered_device(dev_info) as device:
                print(f"Connecting to {device}...")
                status = await device.get_status()

                sensor_audio = status.direct_audio_sensor()
                if not sensor_audio.connected:
                    print(f"Audio sensor is not connected to {device}. Aborting.")
                    return

                audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)

                first_frame = await anext(audio_generator)
                sample_rate = first_frame.av_frame.sample_rate
                channels = first_frame.av_frame.layout.nb_channels
                print(
                    f"Audio stream parameters: "
                    f"Sample Rate: {sample_rate}, "
                    f"Channels: {channels}, "
                    f"Layout: {first_frame.av_frame.layout.name}"
                )
                async for audio_frame in audio_generator:
                    print(audio_frame)

    except asyncio.CancelledError:
        logging.info("Main task cancelled.")
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received, initiating shutdown.")
    finally:
        logging.info("Cleaning up resources...")


if __name__ == "__main__":
    # Use contextlib.suppress to avoid a traceback on KeyboardInterrupt
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

For completeness, we have also included an example that shows how to plot the audio spectrum in the terminal using librosa and rich.

Audio Wave
Check the whole example code here
stream_audio_spectrum.py
import asyncio
import contextlib
import os

import librosa
import numpy as np
from rich.align import Align
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from pupil_labs.realtime_api import (
    Device,
    Network,
    receive_audio_frames,
)


class TerminalAudioBar:
    def __init__(self, target_freq, color, max_height=24, min_level=0, max_level=1.0):
        self.target_freq = target_freq
        self.color = color
        self.min_height = 1
        self.max_height = max_height
        self.height = self.min_height
        self.min_level = min_level
        self.max_level = max_level
        level_range = self.max_level - self.min_level
        height_range = self.max_height - self.min_height
        self.__level_height_ratio = height_range / level_range if level_range else 1.0

    def update(self, dt, level):
        desired_height = self.min_height + (level * self.__level_height_ratio)
        speed = (desired_height - self.height) / 0.1
        self.height += speed * dt
        self.height = np.clip(self.height, self.min_height, self.max_height)


def generate_linear_spectrum(audio_chunk, sample_rate, bars, dt):
    """Render a linear, vertically symmetric bar spectrum."""
    audio_chunk = np.squeeze(audio_chunk).astype(np.float32)
    if audio_chunk.size == 0:
        return ""

    stft_data = librosa.stft(audio_chunk)
    stft_magnitude = np.abs(stft_data)
    n_fft = (stft_magnitude.shape[0] - 1) * 2
    freqs = librosa.fft_frequencies(sr=sample_rate, n_fft=n_fft)
    freqs = freqs[: stft_magnitude.shape[0]]

    for bar in bars:
        freq_index = np.argmin(np.abs(freqs - bar.target_freq))
        level = np.mean(stft_magnitude[freq_index, :])
        bar.update(dt, level)

    term_size_obj = os.get_terminal_size()
    height = min(term_size_obj.lines, 40)

    center_y = height // 2
    output_text = Text()

    for row_idx in range(height):
        row_text = Text()
        for bar in bars:
            half_height = bar.height / 2
            is_filled_down = center_y <= row_idx < center_y + half_height
            is_filled_up = center_y > row_idx >= center_y - half_height

            if is_filled_up or is_filled_down:
                row_text.append("█", style=bar.color)
            else:
                row_text.append(" ")

        output_text.append(row_text)
        output_text.append("\n")

    return output_text


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        print(f"Getting status information from {device}")

        status = await device.get_status()

        sensor_audio = status.direct_audio_sensor()
        if not sensor_audio.connected:
            print(f"Audio sensor is not connected to {device}")
            return

        audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)
        # Prime the generator to get the first frame for parameters
        first_frame = await anext(audio_generator)
        print(
            f"Audio stream parameters: "
            f"Sample Rate: {first_frame.av_frame.sample_rate}, "
            f"Channels: {first_frame.av_frame.layout.nb_channels}, "
            f"Layout: {first_frame.av_frame.layout.name}"
        )

        frequencies = np.logspace(
            np.log10(100), np.log10(first_frame.av_frame.sample_rate / 2), num=100
        )

        bars = [
            TerminalAudioBar(
                target_freq=freq,
                color="cyan",
            )
            for freq in frequencies
        ]
        last_ts = first_frame.timestamp_unix_seconds
        with Live(auto_refresh=False, screen=True, vertical_overflow="visible") as live:
            # Reuse the audio generator that was primed above
            async for audio_frame in audio_generator:
                dt = audio_frame.timestamp_unix_seconds - last_ts
                last_ts = audio_frame.timestamp_unix_seconds
                aframe_ndarray = audio_frame.to_ndarray()

                spectrum = generate_linear_spectrum(
                    aframe_ndarray,
                    sample_rate=audio_frame.av_frame.sample_rate,
                    bars=bars,
                    dt=dt,
                )
                display_panel = Panel(
                    Align.center(spectrum, vertical="middle"),
                    title="[bold cyan]Live Audio Waveform[/bold cyan]",
                    border_style="magenta",
                    padding=(1, 1),
                )
                live.update(display_panel, refresh=True)


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

Playing Audio

Real-time audio playback can be tricky; here we use sounddevice. This library digests NumPy arrays and plays them back with low latency. The only caveat is that it does not accept the 32-bit floating-point planar (fltp) format, so we have to resample the audio first.

For convenience, we attached a PyAV AudioResampler object to the AudioFrame class. It is lazily initialized, and calling to_resampled_ndarray converts the av.AudioFrame to a NumPy array in signed 16-bit integer (s16) format.
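
As an illustration, the conversion step looks like this. This is a fragment, not a complete script: audio_frame stands for an AudioFrame received from the stream.

import numpy as np

for chunk in audio_frame.to_resampled_ndarray():
    # Each chunk is a signed 16-bit ndarray with samples along the last axis.
    assert chunk.dtype == np.int16
    # Transpose so samples run along the first axis, as sounddevice expects.
    playable_chunk = chunk.T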

Note

You can also use a different audio library, such as PyAudio or pygame, to play back the audio data. Note that PyAudio may require installing PortAudio, and pygame is better suited to game development.

We also bundle an AudioPlayer class. It handles audio buffering and playback in a background thread, using a circular buffer to keep playback smooth and free of glitches or gaps of silence.
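
Stripped of device discovery, the core playback pattern looks roughly like this. This is a minimal sketch: the 8 kHz sample rate is hard-coded here to match the stream defaults, while the full example below reads it from the first received frame instead.

from pupil_labs.realtime_api import receive_audio_frames
from pupil_labs.realtime_api.audio_player import AudioPlayer


async def play_audio(sensor_url: str) -> None:
    # sensor_url is assumed to come from status.direct_audio_sensor().url
    player = AudioPlayer(samplerate=8000, channels=1, dtype="int16")
    player.start()
    try:
        async for frame in receive_audio_frames(sensor_url, run_loop=True):
            for chunk in frame.to_resampled_ndarray():
                player.add_data(chunk.T)  # samples along the first axis
    finally:
        player.close()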

Check the whole example code here
stream_audio_and_play.py
import asyncio
import contextlib
import logging
import typing as T

from pupil_labs.realtime_api import (
    AudioFrame,
    Device,
    Network,
    receive_audio_frames,
)
from pupil_labs.realtime_api.audio_player import AudioPlayer

logging.basicConfig(level=logging.INFO)


async def enqueue_audio_data(
    audio_generator: T.AsyncIterator[AudioFrame],
    player: AudioPlayer,
) -> None:
    """Get audio frames from a generator, resample and put them into a queue."""
    logging.info("Audio enqueuer task started.")
    try:
        async for audio_frame in audio_generator:
            # We place the resampled ndarray (s16) in the queue
            # for the audio callback to consume.
            for resampled_chunk in audio_frame.to_resampled_ndarray():
                player.add_data(resampled_chunk.T)
    except asyncio.CancelledError:
        logging.info("Audio enqueuer task cancelled.")
    except Exception:
        logging.exception("An error occurred in the audio enqueuer task.")
    finally:
        logging.info("Audio enqueuer task finished. Signaling end of stream.")
        player.close()  # Signal the audio playback thread to stop


async def main():
    player = None  # initialized up front so the cleanup in `finally` is safe
    try:
        async with Network() as network:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                print("No device could be found! Aborting.")
                return

            async with Device.from_discovered_device(dev_info) as device:
                print(f"Connecting to {device}...")
                status = await device.get_status()

                sensor_audio = status.direct_audio_sensor()
                if not sensor_audio.connected:
                    print(f"Audio sensor is not connected to {device}. Aborting.")
                    return

                audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)

                first_frame = await anext(audio_generator)
                sample_rate = first_frame.av_frame.sample_rate
                channels = first_frame.av_frame.layout.nb_channels
                print(
                    f"Audio stream parameters: "
                    f"Sample Rate: {sample_rate}, "
                    f"Channels: {channels}, "
                    f"Layout: {first_frame.av_frame.layout.name}"
                )
                player = AudioPlayer(
                    samplerate=sample_rate,
                    channels=channels,
                    dtype="int16",
                )

                player.start()

                # Start the asyncio task to enqueue audio data from the generator
                enqueue_task = asyncio.create_task(
                    enqueue_audio_data(audio_generator, player)
                )

                # Prime the queue with the first frame we already extracted
                for resampled_chunk in first_frame.to_resampled_ndarray():
                    player.add_data(resampled_chunk.T)

                # Wait for the enqueuer task to complete or be cancelled
                await enqueue_task

    except asyncio.CancelledError:
        logging.info("Main task cancelled.")
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received, initiating shutdown.")
    finally:
        logging.info("Cleaning up resources...")
        if player is not None:
            player.close()

        logging.info("Cleanup complete.")


if __name__ == "__main__":
    # Use contextlib.suppress to avoid a traceback on KeyboardInterrupt
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

Playing Video and Audio

Here you can find an example that plays video with a gaze overlay together with audio, using OpenCV and sounddevice. Note that this example demonstrates the use of sounddevice directly, without the AudioPlayer class.

Check the whole example code here
stream_video_gaze_and_audio.py
import asyncio
import contextlib
import logging
import threading
import typing as T
from queue import Empty, Queue

import cv2
import numpy as np
import numpy.typing as npt
import sounddevice as sd

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import (  # noqa: E402
    Device,
    Network,
    receive_audio_frames,
    receive_gaze_data,
    receive_video_frames,
)

logging.basicConfig(level=logging.INFO)

# Use a threading event to signal the audio playback thread to stop
stop_audio_event = threading.Event()


def audio_playback_thread_target(
    sample_rate: int,
    stop_event: threading.Event,
    audio_queue: Queue,
):
    """Dedicated thread for sounddevice playback.

    This runs in a separate thread to avoid blocking the main asyncio event loop.
    It receives raw AudioFrames, resamples them, and plays them back.
    """
    logging.info("Audio playback thread started.")
    audio_buffer = np.array([], dtype=np.int16)

    def audio_callback(outdata: npt.NDArray[np.int16], frames: int, *args):
        nonlocal audio_buffer
        while len(audio_buffer) < frames:
            try:
                frame = audio_queue.get_nowait()
                if frame is None:
                    raise sd.CallbackStop("End of stream.")
                for resampled_chunk in frame.to_resampled_ndarray():
                    audio_buffer = np.concatenate((
                        audio_buffer,
                        resampled_chunk.flatten(),
                    ))
            except Empty:
                logging.debug("Audio buffer underrun: filling with silence.")
                break

        frames_to_play = min(len(audio_buffer), frames)
        outdata[:frames_to_play, 0] = audio_buffer[:frames_to_play]
        outdata[frames_to_play:, 0] = 0
        audio_buffer = audio_buffer[frames_to_play:]

    try:
        stream = sd.OutputStream(
            samplerate=sample_rate,
            channels=1,
            dtype="int16",
            callback=audio_callback,
            blocksize=0,
            latency="low",
        )
        with stream:
            logging.info("Audio stream started.")
            stop_event.wait()
            logging.info("Stop signal received, closing audio stream.")
    except Exception:
        logging.exception("An error occurred in the audio playback thread.")
    finally:
        logging.info("Audio playback thread finished.")


async def manage_audio_playback(
    queue_audio: asyncio.Queue, audio_playback_queue: Queue
):
    """Audio management task.

    Waits for the first audio frame, starts the playback thread,
    and then continuously moves frames from the asyncio queue to the thread's queue.
    """
    audio_playback_thread = None
    try:
        # Wait for the first frame to arrive to start the playback thread
        _ts, first_frame = await queue_audio.get()
        logging.info("First audio frame received, starting playback thread.")

        sample_rate = first_frame.av_frame.sample_rate
        audio_playback_thread = threading.Thread(
            target=audio_playback_thread_target,
            args=(sample_rate, stop_audio_event, audio_playback_queue),
            name="AudioPlaybackThread",
        )
        audio_playback_thread.start()

        # Put the first frame into the playback queue
        audio_playback_queue.put(first_frame)

        # Continuously move frames from the async queue to the playback queue
        while not stop_audio_event.is_set():
            _ts, frame = await queue_audio.get()
            audio_playback_queue.put(frame)

    except asyncio.CancelledError:
        logging.info("Audio manager task cancelled.")
    finally:
        if audio_playback_thread and audio_playback_thread.is_alive():
            # Signal end of stream to the audio thread
            audio_playback_queue.put(None)
        logging.info("Audio manager task finished.")


async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    """Move sensor data into an asyncio queue."""
    async for datum in sensor:
        try:
            queue.put_nowait((datum.datetime, datum))
        except asyncio.QueueFull:
            logging.warning(f"Queue is full, dropping {datum.__class__.__name__}")


async def get_most_recent_item(queue: asyncio.Queue):
    """Empty the queue and returns the last item."""
    item = await queue.get()
    while True:
        try:
            next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item
        else:
            item = next_item


async def get_closest_item(queue: asyncio.Queue, timestamp):
    """Get the item from the queue that is closest in time to the timestamp."""
    item_ts, item = await queue.get()
    if item_ts > timestamp:
        return item_ts, item
    while True:
        try:
            next_item_ts, next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item_ts, item
        else:
            if next_item_ts > timestamp:
                return next_item_ts, next_item
            item_ts, item = next_item_ts, next_item


async def match_and_draw(queue_video: asyncio.Queue, queue_gaze: asyncio.Queue):
    """Match video and gaze data and draws the gaze overlay."""
    while not stop_audio_event.is_set():
        try:
            video_datetime, video_frame = await get_most_recent_item(queue_video)
            _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)

            bgr_buffer = video_frame.to_ndarray(format="bgr24")
            cv2.circle(
                bgr_buffer,
                (int(gaze_datum.x), int(gaze_datum.y)),
                radius=20,
                color=(0, 0, 255),
                thickness=5,
            )
            cv2.imshow("Scene Camera with Gaze and Audio", bgr_buffer)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                print("'q' pressed, exiting.")
                break
        except asyncio.QueueEmpty:
            # Queues might be empty at the start, just continue
            await asyncio.sleep(0.01)
            continue
        except Exception:
            logging.exception("Error in drawing loop")
            break


async def main():
    async with Network() as network:
        try:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                logging.error("No device found. Aborting.")
                return
        except asyncio.TimeoutError:
            logging.exception("Timeout while searching for a device. Aborting.")
            return

        async with Device.from_discovered_device(dev_info) as device:
            logging.info(f"Connecting to {device}...")
            status = await device.get_status()

            sensor_world = status.direct_world_sensor()
            sensor_gaze = status.direct_gaze_sensor()
            sensor_audio = status.direct_audio_sensor()

            if not all(s.connected for s in [sensor_world, sensor_gaze, sensor_audio]):
                logging.error("Not all required sensors are connected. Aborting.")
                return

            logging.info("All sensors connected.")
            restart_on_disconnect = True

            # Initialize Queues
            queue_video = asyncio.Queue()
            queue_gaze = asyncio.Queue()
            queue_audio = asyncio.Queue()
            audio_playback_queue = Queue()  # For communication with the audio thread

            # Create tasks for receiving and processing data
            tasks = []
            audio_playback_thread = None
            try:
                # Sensor data enqueuing tasks
                tasks.extend((
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_video_frames(
                                sensor_world.url, run_loop=restart_on_disconnect
                            ),
                            queue_video,
                        )
                    ),
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_gaze_data(
                                sensor_gaze.url, run_loop=restart_on_disconnect
                            ),
                            queue_gaze,
                        )
                    ),
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_audio_frames(
                                sensor_audio.url, run_loop=restart_on_disconnect
                            ),
                            queue_audio,
                        )
                    ),
                ))

                # Audio management task
                audio_manager_task = asyncio.create_task(
                    manage_audio_playback(queue_audio, audio_playback_queue)
                )
                tasks.append(audio_manager_task)

                # Run the main drawing loop
                await match_and_draw(queue_video, queue_gaze)

            finally:
                logging.info("Shutting down...")
                stop_audio_event.set()
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

                # Find the audio thread to join it
                for thread in threading.enumerate():
                    if thread.name == "AudioPlaybackThread":
                        audio_playback_thread = thread
                        break

                if audio_playback_thread and audio_playback_thread.is_alive():
                    # Put a final None to ensure the audio thread's queue.get() unblocks
                    audio_playback_queue.put(None)
                    audio_playback_thread.join(timeout=2)
                    if audio_playback_thread.is_alive():
                        logging.warning("Audio thread did not terminate cleanly.")
                cv2.destroyAllWindows()
                logging.info("Cleanup complete.")


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())

AudioPlayer

AudioPlayer(samplerate: int, channels: int, dtype: str = 'int16')

Bases: Thread

A threaded, low-latency audio player using a shared RingBuffer.

Methods:

  • add_data

    Directly write data to the shared RingBuffer.

  • close

    Signal the thread to stop and clean up resources.

  • get_buffer_size

    Get the current number of samples in the buffer for debugging.

  • run

    Run the main entrypoint for the thread.

Source code in src/pupil_labs/realtime_api/audio_player.py
def __init__(self, samplerate: int, channels: int, dtype: str = "int16"):
    super().__init__(daemon=True)
    self.samplerate = samplerate
    self.channels = channels
    self.dtype = dtype

    self._stop_event = threading.Event()
    self._buffer = RingBuffer(
        capacity=1024,
        dtype=np.int16,
        channels=channels,
    )
    self.stream: sd.OutputStream | None = None

add_data

add_data(data: NDArray[int16]) -> None

Directly write data to the shared RingBuffer.

Source code in src/pupil_labs/realtime_api/audio_player.py
def add_data(self, data: npt.NDArray[np.int16]) -> None:
    """Directly write data to the shared RingBuffer."""
    self._buffer.write(data)

close

close() -> None

Signal the thread to stop and clean up resources.

Source code in src/pupil_labs/realtime_api/audio_player.py
def close(self) -> None:
    """Signal the thread to stop and clean up resources."""
    logging.debug("Closing audio player...")
    self._stop_event.set()
    self.join()  # Wait for the thread to finish
    logging.info("Audio player closed.")

get_buffer_size

get_buffer_size() -> int

Get the current number of samples in the buffer for debugging.

Source code in src/pupil_labs/realtime_api/audio_player.py
def get_buffer_size(self) -> int:
    """Get the current number of samples in the buffer for debugging."""
    return self._buffer.size

run

run() -> None

Run the main entrypoint for the thread.

Source code in src/pupil_labs/realtime_api/audio_player.py
def run(self) -> None:
    """Run the main entrypoint for the thread."""
    try:
        self.stream = sd.OutputStream(
            samplerate=self.samplerate,
            channels=self.channels,
            dtype=self.dtype,
            callback=self._callback,
            blocksize=0,  # Let the device choose the optimal size for low latency
            latency="low",
        )
        with self.stream:
            logging.debug("Audio stream started.")
            self._stop_event.wait()  # Wait until the close() method is called
    except Exception:
        logging.exception("Error in audio thread.")
    finally:
        logging.debug("Audio stream closed.")

Bonus

In the Simple API examples, you can also find how to use the audio stream for speech-to-text with the whisper library.
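
As a rough sketch of that approach (the bundled example may differ in its details): collect a few seconds of frames, resample from the stream's 8 kHz to the 16 kHz Whisper expects, and transcribe. Here, frames is assumed to be a list of AudioFrames gathered via receive_audio_frames.

import librosa
import numpy as np
import whisper

# `frames` is assumed to hold a few seconds' worth of AudioFrames.
audio = np.concatenate([np.squeeze(f.to_ndarray()) for f in frames])
audio = audio.astype(np.float32)

# Whisper expects 16 kHz input; the Neon audio stream is 8 kHz.
audio_16k = librosa.resample(audio, orig_sr=8000, target_sr=16000)

model = whisper.load_model("base")
result = model.transcribe(audio_16k)
print(result["text"])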