Using the receive_audio_frames method, you can receive audio frames and use them to play audio live, record it, or perform real-time analysis such as speech-to-text.
Convert the audio frame to a resampled s16 NumPy array
Source code in src/pupil_labs/realtime_api/streaming/audio.py
```python
def to_resampled_ndarray(
    self, *args: Any, **kwargs: Any
) -> Iterator[npt.NDArray]:
    """Convert the audio frame to a resampled s16 NumPy array"""
    for frame in self.resampler.resample(self.av_frame):
        yield frame.to_ndarray(*args, **kwargs)
```
By default, the audio signal is streamed in mono using the AAC codec. The stream is downsampled from the original 48 kHz source to a sampling rate of 8 kHz to save bandwidth, and uses a 32-bit floating-point planar (fltp) format.
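To make the format concrete, here is a minimal sketch of what those properties imply for the decoded samples. The buffer below is synthetic, standing in for the output of `to_ndarray()`:

```python
import numpy as np

# Synthetic stand-in for a decoded chunk as returned by to_ndarray():
# "fltp" means one float32 row per channel (planar), samples in [-1.0, 1.0].
sr = 8000  # sampling rate after the downsampling described above
t = np.linspace(0, 1.0, sr, endpoint=False)
fltp_chunk = np.sin(2 * np.pi * 440 * t).astype(np.float32)[np.newaxis, :]

# Scaling to signed 16-bit integers, the format most playback libraries expect
s16_chunk = (np.clip(fltp_chunk, -1.0, 1.0) * 32767).astype(np.int16)
print(fltp_chunk.shape, fltp_chunk.dtype, "->", s16_chunk.dtype)
```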
Audio does not have its own RTSP stream; it is multiplexed with the video, so in this client we create a virtual sensor component using the Scene Camera stream.
You can easily receive audio frames, convert them to NumPy arrays using the to_ndarray method, and feed them to any audio library of your choice, such as librosa, for analysis.
```python
import asyncio
import contextlib
import logging

from pupil_labs.realtime_api import (
    Device,
    Network,
    receive_audio_frames,
)

logging.basicConfig(level=logging.INFO)


async def main():
    try:
        async with Network() as network:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                print("No device could be found! Aborting.")
                return

            async with Device.from_discovered_device(dev_info) as device:
                print(f"Connecting to {device}...")
                status = await device.get_status()
                sensor_audio = status.direct_audio_sensor()
                if not sensor_audio.connected:
                    print(f"Audio sensor is not connected to {device}. Aborting.")
                    return

                audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)
                first_frame = await anext(audio_generator)
                sample_rate = first_frame.av_frame.sample_rate
                channels = first_frame.av_frame.layout.nb_channels
                print(
                    f"Audio stream parameters: "
                    f"Sample Rate: {sample_rate}, "
                    f"Channels: {channels}, "
                    f"Layout: {first_frame.av_frame.layout.name}"
                )

                async for audio_frame in audio_generator:
                    print(audio_frame)
    except asyncio.CancelledError:
        logging.info("Main task cancelled.")
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received, initiating shutdown.")
    finally:
        logging.info("Cleaning up resources...")


if __name__ == "__main__":
    # Use contextlib.suppress to avoid a traceback on KeyboardInterrupt
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())
```
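Building on the example above, here is a hedged sketch of the analysis side: it runs two librosa features over a synthetic 8 kHz mono buffer that stands in for `np.squeeze(audio_frame.to_ndarray())`:

```python
import librosa
import numpy as np

# Synthetic stand-in for np.squeeze(audio_frame.to_ndarray()): 8 kHz mono float32
sr = 8000
t = np.linspace(0, 1.0, sr, endpoint=False)
mono = np.sin(2 * np.pi * 440 * t).astype(np.float32)

# Example analyses: root-mean-square energy and spectral centroid
rms = librosa.feature.rms(y=mono)
centroid = librosa.feature.spectral_centroid(y=mono, sr=sr)
print(f"RMS: {rms.mean():.3f}, centroid: {centroid.mean():.1f} Hz")
```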
For completeness, we have also included an example that shows how to plot the audio in the terminal using librosa and rich.
```python
import asyncio
import contextlib
import logging
import os

import librosa
import numpy as np
from rich.align import Align
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

from pupil_labs.realtime_api import (
    Device,
    Network,
    receive_audio_frames,
)


class TerminalAudioBar:
    def __init__(self, target_freq, color, max_height=24, min_level=0, max_level=1.0):
        self.target_freq = target_freq
        self.color = color
        self.min_height = 1
        self.max_height = max_height
        self.height = self.min_height
        self.min_level = min_level
        self.max_level = max_level

        level_range = self.max_level - self.min_level
        height_range = self.max_height - self.min_height
        self.__level_height_ratio = height_range / level_range if level_range else 1.0

    def update(self, dt, level):
        desired_height = self.min_height + (level * self.__level_height_ratio)
        speed = (desired_height - self.height) / 0.1
        self.height += speed * dt
        self.height = np.clip(self.height, self.min_height, self.max_height)


def generate_linear_spectrum(audio_chunk, sample_rate, bars, dt):
    """Render a linear, vertically symmetric bar spectrum."""
    audio_chunk = np.squeeze(audio_chunk).astype(np.float32)
    if audio_chunk.size == 0:
        return ""

    stft_data = librosa.stft(audio_chunk)
    stft_magnitude = np.abs(stft_data)
    n_fft = (stft_magnitude.shape[0] - 1) * 2
    freqs = librosa.fft_frequencies(sr=sample_rate, n_fft=n_fft)
    freqs = freqs[: stft_magnitude.shape[0]]

    for bar in bars:
        freq_index = np.argmin(np.abs(freqs - bar.target_freq))
        level = np.mean(stft_magnitude[freq_index, :])
        bar.update(dt, level)

    term_size_obj = os.get_terminal_size()
    height = min(term_size_obj.lines, 40)
    center_y = height // 2

    output_text = Text()
    for row_idx in range(height):
        row_text = Text()
        for bar in bars:
            half_height = bar.height / 2
            is_filled_down = center_y <= row_idx < center_y + half_height
            is_filled_up = center_y > row_idx >= center_y - half_height
            if is_filled_up or is_filled_down:
                row_text.append("█", style=bar.color)
            else:
                row_text.append(" ")
        output_text.append(row_text)
        output_text.append("\n")
    return output_text


async def main():
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
        if dev_info is None:
            print("No device could be found! Abort")
            return

        async with Device.from_discovered_device(dev_info) as device:
            print(f"Getting status information from {device}")
            status = await device.get_status()
            sensor_audio = status.direct_audio_sensor()
            if not sensor_audio.connected:
                print(f"Audio sensor is not connected to {device}")
                return

            audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)
            # Prime the generator to get the first frame for parameters
            first_frame = await anext(audio_generator)
            print(
                f"Audio stream parameters: "
                f"Sample Rate: {first_frame.av_frame.sample_rate}, "
                f"Channels: {first_frame.av_frame.layout.nb_channels}, "
                f"Layout: {first_frame.av_frame.layout.name}"
            )

            frequencies = np.logspace(
                np.log10(100),
                np.log10(first_frame.av_frame.sample_rate / 2),
                num=100,
            )
            bars = [
                TerminalAudioBar(target_freq=freq, color="cyan")
                for freq in frequencies
            ]

            last_ts = first_frame.timestamp_unix_seconds
            with Live(
                auto_refresh=False, screen=True, vertical_overflow="visible"
            ) as live:
                async for audio_frame in receive_audio_frames(
                    sensor_audio.url, run_loop=True
                ):
                    dt = audio_frame.timestamp_unix_seconds - last_ts
                    last_ts = audio_frame.timestamp_unix_seconds
                    aframe_ndarray = audio_frame.to_ndarray()
                    spectrum = generate_linear_spectrum(
                        aframe_ndarray,
                        sample_rate=audio_frame.av_frame.sample_rate,
                        bars=bars,
                        dt=dt,
                    )
                    display_panel = Panel(
                        Align.center(spectrum, vertical="middle"),
                        title="[bold cyan]Live Audio Waveform[/bold cyan]",
                        border_style="magenta",
                        padding=(1, 1),
                    )
                    live.update(display_panel, refresh=True)

    try:
        # Keep the main asyncio loop running until interrupted
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        logging.info("Main task cancelled.")
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received.")


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())
```
Audio playback in real time can be tricky; here we use SoundDevice. This library digests NumPy arrays and plays them back with low latency, with the one caveat that it does not accept the 32-bit floating-point planar format, so we have to resample the audio first.
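As a minimal illustration of how sounddevice consumes s16 NumPy arrays, the sketch below plays a synthetic tone in the format the resampled stream arrives in:

```python
import numpy as np
import sounddevice as sd

# One second of a synthetic 440 Hz tone: s16 mono at 8 kHz, matching the
# format of the resampled stream
sr = 8000
t = np.linspace(0, 1.0, sr, endpoint=False)
tone = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16)

sd.play(tone, samplerate=sr)
sd.wait()  # Block until playback has finished
```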
For convenience, we added a PyAV AudioResampler object to the AudioFrame class. It is lazily loaded, and calling to_resampled_ndarray converts the av.AudioFrame to a NumPy array in signed 16-bit integer (s16) format.
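Conceptually, the resampling step looks like the sketch below. Note that the exact AudioResampler configuration shown here is an assumption, and the silent input frame is built by hand so the snippet runs on its own:

```python
import av

# A sketch of the conversion to_resampled_ndarray performs internally;
# the exact AudioResampler parameters are an assumption
resampler = av.AudioResampler(format="s16", layout="mono", rate=8000)

# Silent fltp stand-in for the decoded av.AudioFrame carried by an AudioFrame
frame = av.AudioFrame(format="fltp", layout="mono", samples=1024)
for plane in frame.planes:
    plane.update(bytes(plane.buffer_size))
frame.sample_rate = 8000

for resampled in resampler.resample(frame):
    print(resampled.format.name, resampled.to_ndarray().dtype)  # s16 int16
```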
Note
You can also use a different audio library such as PyAudio or pygame to play back the audio data. Note that PyAudio requires the PortAudio library to be installed, and pygame is more suited to game development.
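For reference, a hedged sketch of the equivalent playback with PyAudio (a synthetic tone again stands in for the stream):

```python
import numpy as np
import pyaudio

# Synthetic s16 mono tone at 8 kHz, standing in for resampled stream data
sr = 8000
t = np.linspace(0, 1.0, sr, endpoint=False)
tone = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16)

p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=sr, output=True)
stream.write(tone.tobytes())  # Blocking write
stream.stop_stream()
stream.close()
p.terminate()
```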
We also bundle an additional AudioPlayer class. It handles audio buffering and playback in a background thread, using a circular buffer to guarantee smooth playback without glitches or silence.
```python
import asyncio
import contextlib
import logging
import typing as T

from pupil_labs.realtime_api import (
    AudioFrame,
    Device,
    Network,
    receive_audio_frames,
)
from pupil_labs.realtime_api.audio_player import AudioPlayer

logging.basicConfig(level=logging.INFO)


async def enqueue_audio_data(
    audio_generator: T.AsyncIterator[AudioFrame],
    player: AudioPlayer,
) -> None:
    """Get audio frames from a generator, resample them, and enqueue them."""
    logging.info("Audio enqueuer task started.")
    try:
        async for audio_frame in audio_generator:
            # We place the resampled ndarray (s16) in the queue
            # for the audio callback to consume.
            for resampled_chunk in audio_frame.to_resampled_ndarray():
                player.add_data(resampled_chunk.T)
    except asyncio.CancelledError:
        logging.info("Audio enqueuer task cancelled.")
    except Exception:
        logging.exception("An error occurred in the audio enqueuer task.")
    finally:
        logging.info("Audio enqueuer task finished. Signaling end of stream.")
        player.close()  # Signal the audio playback thread to stop


async def main():
    player = None  # Created once the stream parameters are known
    try:
        async with Network() as network:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                print("No device could be found! Aborting.")
                return

            async with Device.from_discovered_device(dev_info) as device:
                print(f"Connecting to {device}...")
                status = await device.get_status()
                sensor_audio = status.direct_audio_sensor()
                if not sensor_audio.connected:
                    print(f"Audio sensor is not connected to {device}. Aborting.")
                    return

                audio_generator = receive_audio_frames(sensor_audio.url, run_loop=True)
                first_frame = await anext(audio_generator)
                sample_rate = first_frame.av_frame.sample_rate
                channels = first_frame.av_frame.layout.nb_channels
                print(
                    f"Audio stream parameters: "
                    f"Sample Rate: {sample_rate}, "
                    f"Channels: {channels}, "
                    f"Layout: {first_frame.av_frame.layout.name}"
                )

                player = AudioPlayer(
                    samplerate=sample_rate,
                    channels=channels,
                    dtype="int16",
                )
                player.start()

                # Start the asyncio task to enqueue audio data from the generator
                enqueue_task = asyncio.create_task(
                    enqueue_audio_data(audio_generator, player)
                )

                # Prime the queue with the first frame we already extracted
                for resampled_chunk in first_frame.to_resampled_ndarray():
                    player.add_data(resampled_chunk.T)

                # Wait for the enqueuer task to complete or be cancelled
                await enqueue_task
    except asyncio.CancelledError:
        logging.info("Main task cancelled.")
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received, initiating shutdown.")
    finally:
        logging.info("Cleaning up resources...")
        if player is not None:  # Guard: the player may never have been created
            player.close()
        logging.info("Cleanup complete.")


if __name__ == "__main__":
    # Use contextlib.suppress to avoid a traceback on KeyboardInterrupt
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())
```
Here you can find an example that shows how to play video with a gaze overlay together with audio, using OpenCV and SoundDevice. Note that this example demonstrates the usage of sounddevice without the AudioPlayer class.
```python
import asyncio
import contextlib
import logging
import threading
import typing as T
from queue import Empty, Queue

import cv2
import numpy as np
import numpy.typing as npt
import sounddevice as sd

# Workaround for https://github.com/opencv/opencv/issues/21952
cv2.imshow("cv/av bug", np.zeros(1))
cv2.destroyAllWindows()

from pupil_labs.realtime_api import (  # noqa: E402
    Device,
    Network,
    receive_audio_frames,
    receive_gaze_data,
    receive_video_frames,
)

logging.basicConfig(level=logging.INFO)

# Use a threading event to signal the audio playback thread to stop
stop_audio_event = threading.Event()


def audio_playback_thread_target(
    sample_rate: int,
    stop_event: threading.Event,
    audio_queue: Queue,
):
    """Dedicated thread for sounddevice playback.

    This runs in a separate thread to avoid blocking the main asyncio event
    loop. It receives raw AudioFrames, resamples them, and plays them back.
    """
    logging.info("Audio playback thread started.")
    audio_buffer = np.array([], dtype=np.int16)

    def audio_callback(outdata: npt.NDArray[np.int16], frames: int, *args):
        nonlocal audio_buffer
        while len(audio_buffer) < frames:
            try:
                frame = audio_queue.get_nowait()
                if frame is None:
                    raise sd.CallbackStop("End of stream.")
                for resampled_chunk in frame.to_resampled_ndarray():
                    audio_buffer = np.concatenate((
                        audio_buffer,
                        resampled_chunk.flatten(),
                    ))
            except Empty:
                logging.debug("Audio buffer underrun: filling with silence.")
                break

        frames_to_play = min(len(audio_buffer), frames)
        outdata[:frames_to_play, 0] = audio_buffer[:frames_to_play]
        outdata[frames_to_play:, 0] = 0
        audio_buffer = audio_buffer[frames_to_play:]

    try:
        stream = sd.OutputStream(
            samplerate=sample_rate,
            channels=1,
            dtype="int16",
            callback=audio_callback,
            blocksize=0,
            latency="low",
        )
        with stream:
            logging.info("Audio stream started.")
            stop_event.wait()
            logging.info("Stop signal received, closing audio stream.")
    except Exception:
        logging.exception("An error occurred in the audio playback thread.")
    finally:
        logging.info("Audio playback thread finished.")


async def manage_audio_playback(
    queue_audio: asyncio.Queue, audio_playback_queue: Queue
):
    """Audio management task.

    Waits for the first audio frame, starts the playback thread, and then
    continuously moves frames from the asyncio queue to the thread's queue.
    """
    audio_playback_thread = None
    try:
        # Wait for the first frame to arrive to start the playback thread
        _ts, first_frame = await queue_audio.get()
        logging.info("First audio frame received, starting playback thread.")
        sample_rate = first_frame.av_frame.sample_rate

        audio_playback_thread = threading.Thread(
            target=audio_playback_thread_target,
            args=(sample_rate, stop_audio_event, audio_playback_queue),
            name="AudioPlaybackThread",
        )
        audio_playback_thread.start()

        # Put the first frame into the playback queue
        audio_playback_queue.put(first_frame)

        # Continuously move frames from the async queue to the playback queue
        while not stop_audio_event.is_set():
            _ts, frame = await queue_audio.get()
            audio_playback_queue.put(frame)
    except asyncio.CancelledError:
        logging.info("Audio manager task cancelled.")
    finally:
        if audio_playback_thread and audio_playback_thread.is_alive():
            # Signal end of stream to the audio thread
            audio_playback_queue.put(None)
        logging.info("Audio manager task finished.")


async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    """Move sensor data into an asyncio queue."""
    async for datum in sensor:
        try:
            queue.put_nowait((datum.datetime, datum))
        except asyncio.QueueFull:
            logging.warning(f"Queue is full, dropping {datum.__class__.__name__}")


async def get_most_recent_item(queue: asyncio.Queue):
    """Empty the queue and return the last item."""
    item = await queue.get()
    while True:
        try:
            next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item
        else:
            item = next_item


async def get_closest_item(queue: asyncio.Queue, timestamp):
    """Get the item from the queue that is closest in time to the timestamp."""
    item_ts, item = await queue.get()
    if item_ts > timestamp:
        return item_ts, item
    while True:
        try:
            next_item_ts, next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item_ts, item
        else:
            if next_item_ts > timestamp:
                return next_item_ts, next_item
            item_ts, item = next_item_ts, next_item


async def match_and_draw(queue_video: asyncio.Queue, queue_gaze: asyncio.Queue):
    """Match video and gaze data and draw the gaze overlay."""
    while not stop_audio_event.is_set():
        try:
            video_datetime, video_frame = await get_most_recent_item(queue_video)
            _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)

            bgr_buffer = video_frame.to_ndarray(format="bgr24")
            cv2.circle(
                bgr_buffer,
                (int(gaze_datum.x), int(gaze_datum.y)),
                radius=20,
                color=(0, 0, 255),
                thickness=5,
            )
            cv2.imshow("Scene Camera with Gaze and Audio", bgr_buffer)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                print("'q' pressed, exiting.")
                break
        except asyncio.QueueEmpty:
            # Queues might be empty at the start, just continue
            await asyncio.sleep(0.01)
            continue
        except Exception:
            logging.exception("Error in drawing loop")
            break


async def main():
    async with Network() as network:
        try:
            dev_info = await network.wait_for_new_device(timeout_seconds=5)
            if dev_info is None:
                logging.error("No device found. Aborting.")
                return
        except asyncio.TimeoutError:
            logging.exception("Timeout while searching for a device. Aborting.")
            return

        async with Device.from_discovered_device(dev_info) as device:
            logging.info(f"Connecting to {device}...")
            status = await device.get_status()
            sensor_world = status.direct_world_sensor()
            sensor_gaze = status.direct_gaze_sensor()
            sensor_audio = status.direct_audio_sensor()

            if not all(s.connected for s in [sensor_world, sensor_gaze, sensor_audio]):
                logging.error("Not all required sensors are connected. Aborting.")
                return
            logging.info("All sensors connected.")

            restart_on_disconnect = True

            # Initialize Queues
            queue_video = asyncio.Queue()
            queue_gaze = asyncio.Queue()
            queue_audio = asyncio.Queue()
            audio_playback_queue = Queue()  # For communication with the audio thread

            # Create tasks for receiving and processing data
            tasks = []
            audio_playback_thread = None
            try:
                # Sensor data enqueuing tasks
                tasks.extend((
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_video_frames(
                                sensor_world.url, run_loop=restart_on_disconnect
                            ),
                            queue_video,
                        )
                    ),
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_gaze_data(
                                sensor_gaze.url, run_loop=restart_on_disconnect
                            ),
                            queue_gaze,
                        )
                    ),
                    asyncio.create_task(
                        enqueue_sensor_data(
                            receive_audio_frames(
                                sensor_audio.url, run_loop=restart_on_disconnect
                            ),
                            queue_audio,
                        )
                    ),
                ))

                # Audio management task
                audio_manager_task = asyncio.create_task(
                    manage_audio_playback(queue_audio, audio_playback_queue)
                )
                tasks.append(audio_manager_task)

                # Run the main drawing loop
                await match_and_draw(queue_video, queue_gaze)
            finally:
                logging.info("Shutting down...")
                stop_audio_event.set()

                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

                # Find the audio thread to join it
                for thread in threading.enumerate():
                    if thread.name == "AudioPlaybackThread":
                        audio_playback_thread = thread
                        break

                if audio_playback_thread and audio_playback_thread.is_alive():
                    # Put a final None to ensure the audio thread's queue.get() unblocks
                    audio_playback_queue.put(None)
                    audio_playback_thread.join(timeout=2)
                    if audio_playback_thread.is_alive():
                        logging.warning("Audio thread did not terminate cleanly.")

                cv2.destroyAllWindows()
                logging.info("Cleanup complete.")


if __name__ == "__main__":
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(main())
```
Source code in src/pupil_labs/realtime_api/audio_player.py
```python
def close(self) -> None:
    """Signal the thread to stop and clean up resources."""
    logging.debug("Closing audio player...")
    self._stop_event.set()
    self.join()  # Wait for the thread to finish
    logging.info("Audio player closed.")
```
```python
def run(self) -> None:
    """Run the main entrypoint for the thread."""
    try:
        self.stream = sd.OutputStream(
            samplerate=self.samplerate,
            channels=self.channels,
            dtype=self.dtype,
            callback=self._callback,
            blocksize=0,  # Let the device choose the optimal size for low latency
            latency="low",
        )
        with self.stream:
            logging.debug("Audio stream started.")
            self._stop_event.wait()  # Wait until the close() method is called
    except Exception:
        logging.exception("Error in audio thread.")
    finally:
        logging.debug("Audio stream closed.")
```
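To illustrate the circular-buffer idea that underpins the playback callback, here is a minimal, self-contained sketch; it is not the actual AudioPlayer implementation:

```python
import numpy as np

class RingBuffer:
    """Fixed-size ring: a producer pushes samples, the audio callback pops."""

    def __init__(self, capacity: int):
        self.buf = np.zeros(capacity, dtype=np.int16)
        self.capacity = capacity
        self.read = 0
        self.write = 0

    def push(self, data: np.ndarray) -> None:
        # Write indices wrap around modulo the capacity
        idx = (self.write + np.arange(len(data))) % self.capacity
        self.buf[idx] = data
        self.write = (self.write + len(data)) % self.capacity

    def pop(self, n: int) -> np.ndarray:
        # Reading past the writer would replay stale samples; a real player
        # tracks the fill level and emits silence on underrun instead
        idx = (self.read + np.arange(n)) % self.capacity
        out = self.buf[idx].copy()
        self.read = (self.read + n) % self.capacity
        return out

ring = RingBuffer(capacity=8000)
ring.push(np.arange(100, dtype=np.int16))
print(ring.pop(10))  # -> [0 1 2 3 4 5 6 7 8 9]
```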
Bonus
In the Simple API examples, you can also find how to use the audio stream for speech-to-text with the whisper library.
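For orientation, the transcription step with the whisper package boils down to the sketch below; "recording.wav" is a hypothetical file you would have saved from the stream beforehand:

```python
import whisper

# Load a small model and transcribe a saved recording;
# "recording.wav" is a placeholder for audio captured from the stream
model = whisper.load_model("base")
result = model.transcribe("recording.wav")
print(result["text"])
```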