Skip to content

AudioBufferQueue

Bounded FIFO queue that buffers audio chunks between pipeline stages.

Defined in: src/core/pipeline/AudioBufferQueue.ts:216

Bounded FIFO queue that buffers audio chunks between pipeline stages.

Remarks

The queue is the primary mechanism for fixing the race condition where the STT WebSocket handshake completes after audio capture has already started. Audio chunks produced by the input provider are enqueued while the STT connects, then flushed in order once the STT is ready.

The queue supports three overflow strategies to handle situations where the consumer is slow or disconnected for an extended period:

  • 'drop-oldest' — Removes the oldest chunk when full (default). Best for real-time audio where recent frames are more valuable.
  • 'drop-newest' — Discards incoming chunks when full. Preserves the beginning of the stream.
  • 'block' — The enqueue call blocks (via promise) until space is available. Use with caution as it introduces backpressure.

Example

const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 1000,
  overflowStrategy: 'drop-oldest',
});

// Enqueue while waiting for STT
queue.enqueue(chunk1);
queue.enqueue(chunk2);
console.log(queue.size); // 2

// STT connected — drain and switch to pass-through
queue.startDraining((chunk) => stt.sendAudio(chunk.data));
// chunk1 and chunk2 are flushed immediately
// subsequent enqueues pass through directly

// Pause for turn-taking
queue.stopDraining();
// chunks buffer again until next startDraining()

See

Constructors

Constructor

new AudioBufferQueue(config): AudioBufferQueue;

Defined in: src/core/pipeline/AudioBufferQueue.ts:260

Creates a new AudioBufferQueue.

Parameters

ParameterTypeDescription
configAudioBufferQueueConfigQueue configuration specifying name, max size, and overflow strategy.

Returns

AudioBufferQueue

Example

const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 2000,
  overflowStrategy: 'drop-oldest',
});

See

AudioBufferQueueConfig

Accessors

size

Get Signature

get size(): number;

Defined in: src/core/pipeline/AudioBufferQueue.ts:409

Current number of chunks in the buffer.

Remarks

Always 0 when the queue is in draining mode (pass-through).

Returns

number

Methods

clear()

clear(): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:421

Removes all chunks from the buffer and resets the block resolver.

Returns

void

Remarks

Does not affect the draining state or counters. Chunks removed by clear() are not counted as dropped — they are simply discarded. Use this when stopping the pipeline to release memory.


enqueue()

enqueue(chunk): void | Promise<void>;

Defined in: src/core/pipeline/AudioBufferQueue.ts:282

Enqueues an audio chunk into the buffer.

Parameters

ParameterTypeDescription
chunkAudioChunkThe audio chunk to enqueue.

Returns

void | Promise<void>

void for 'drop-oldest' and 'drop-newest'; a Promise<void> for 'block' when the queue is full (resolves when space is available).

Remarks

Behavior depends on the current mode:

  • Draining mode: The chunk is passed directly to the drain callback (zero-copy fast path) without touching the internal buffer.
  • Buffering mode: The chunk is added to the FIFO buffer. If the buffer is full, the overflow strategy determines what happens:
    • 'drop-oldest': The oldest chunk is removed to make room.
    • 'drop-newest': The incoming chunk is discarded.
    • 'block': Returns a promise that resolves when space becomes available.

getStats()

getStats(): QueueStats;

Defined in: src/core/pipeline/AudioBufferQueue.ts:448

Returns a snapshot of queue statistics for monitoring.

Returns

QueueStats

A QueueStats snapshot.

Remarks

The returned QueueStats object is a snapshot — it does not update as the queue changes. Call this method again for fresh stats.

Example

const stats = queue.getStats();
if (stats.totalDropped > 0) {
  console.warn(`Queue "${stats.name}" dropped ${stats.totalDropped} chunks`);
}

isDraining()

isDraining(): boolean;

Defined in: src/core/pipeline/AudioBufferQueue.ts:468

Whether the queue is currently in draining (pass-through) mode.

Returns

boolean

true if startDraining has been called and stopDraining has not been called since.


onOverflow()

onOverflow(callback): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:495

Registers a callback that is invoked whenever the queue drops chunks due to overflow.

Parameters

ParameterTypeDescription
callbackOverflowCallback | nullThe function to call on overflow, or null to clear.

Returns

void

Remarks

The callback receives the number of chunks dropped in this instance and the current buffer size after the drop. Only one overflow callback is supported; calling this method replaces any previously registered callback.

The orchestrator uses this to bridge overflow events to the typed EventEmitter system.

Example

queue.onOverflow((droppedChunks, currentSize) => {
  console.warn(`Dropped ${droppedChunks} chunks, buffer at ${currentSize}`);
});

See

OverflowCallback


peek()

peek(): AudioChunk | undefined;

Defined in: src/core/pipeline/AudioBufferQueue.ts:399

Returns the first chunk in the buffer without removing it.

Returns

AudioChunk | undefined

The oldest chunk in the buffer, or undefined.

Remarks

Returns undefined if the buffer is empty or the queue is in draining mode (since the buffer is empty in draining mode).


startDraining()

startDraining(callback): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:353

Starts draining the queue: flushes all buffered chunks, then switches to pass-through mode.

Parameters

ParameterTypeDescription
callbackDrainCallbackFunction to call for each chunk (buffered and future).

Returns

void

Remarks

This is the key method for the race condition fix. Call it after the STT WebSocket handshake completes to receive all chunks that were buffered during the connection attempt, followed by real-time pass-through of subsequent chunks.

The flush is synchronous — all buffered chunks are delivered to the callback in FIFO order before this method returns. After flushing, the queue enters draining mode where enqueue passes chunks directly to the callback.

If the queue is already draining, calling this method replaces the existing callback.

Example

// Buffer 5 chunks while STT connects
for (const chunk of chunks) queue.enqueue(chunk);

// STT ready — flush all 5 and switch to pass-through
await stt.connect();
queue.startDraining((chunk) => stt.sendAudio(chunk.data));

See

stopDraining to return to buffering mode


stopDraining()

stopDraining(): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:385

Stops draining and returns the queue to buffering mode.

Returns

void

Remarks

After calling this method, subsequent enqueue calls will buffer chunks internally instead of passing them to the drain callback. The drain callback is cleared.

This is used during turn-taking: when the agent starts speaking, the orchestrator stops draining the input queue (pauses STT) and resumes draining when the agent finishes.

See

startDraining to resume draining

© 2026 CompositeVoice. All rights reserved.

Font size
Contrast
Motion
Transparency