AudioBufferQueue
Bounded FIFO queue that buffers audio chunks between pipeline stages.
Defined in: src/core/pipeline/AudioBufferQueue.ts:216
Remarks
The queue is the primary mechanism for fixing the race condition where the STT WebSocket handshake completes after audio capture has already started. Audio chunks produced by the input provider are enqueued while the STT connects, then flushed in order once the STT is ready.
The queue supports three overflow strategies to handle situations where the consumer is slow or disconnected for an extended period:
- 'drop-oldest': Removes the oldest chunk when full (default). Best for real-time audio where recent frames are more valuable.
- 'drop-newest': Discards incoming chunks when full. Preserves the beginning of the stream.
- 'block': The enqueue call blocks (via promise) until space is available. Use with caution as it introduces backpressure.
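The three strategies can be modelled with a small stand-in queue. This is a minimal sketch under stated assumptions, not the library's implementation: `BoundedQueueSketch`, `SketchChunk`, `dequeue()`, and `contents()` are hypothetical names introduced for illustration, and the real 'block' strategy may release its promise differently.

```typescript
// Hypothetical stand-ins; the real AudioChunk and AudioBufferQueue come from the library.
type OverflowStrategy = 'drop-oldest' | 'drop-newest' | 'block';

interface SketchChunk {
  seq: number;
}

class BoundedQueueSketch {
  private readonly maxSize: number;
  private readonly strategy: OverflowStrategy;
  private buffer: SketchChunk[] = [];
  // Pending 'block' enqueues, released one at a time as space frees up.
  private waiters: Array<() => void> = [];
  totalDropped = 0;

  constructor(maxSize: number, strategy: OverflowStrategy) {
    this.maxSize = maxSize;
    this.strategy = strategy;
  }

  enqueue(chunk: SketchChunk): void | Promise<void> {
    if (this.buffer.length < this.maxSize) {
      this.buffer.push(chunk);
      return;
    }
    if (this.strategy === 'drop-oldest') {
      this.buffer.shift(); // evict the oldest chunk to make room
      this.buffer.push(chunk);
      this.totalDropped++;
      return;
    }
    if (this.strategy === 'drop-newest') {
      this.totalDropped++; // the incoming chunk is discarded
      return;
    }
    // 'block': defer the insert until dequeue() frees a slot.
    return new Promise<void>((resolve) => {
      this.waiters.push(() => {
        this.buffer.push(chunk);
        resolve();
      });
    });
  }

  // Removes the oldest chunk and admits one blocked producer, if any.
  dequeue(): SketchChunk | undefined {
    const head = this.buffer.shift();
    if (head !== undefined) {
      const waiter = this.waiters.shift();
      if (waiter) waiter();
    }
    return head;
  }

  contents(): number[] {
    return this.buffer.map((c) => c.seq);
  }
}
```

With a capacity of 2 and chunks 1, 2, 3 enqueued in that order, this model keeps 2 and 3 under 'drop-oldest', keeps 1 and 2 under 'drop-newest', and under 'block' holds chunk 3 back until a slot is freed.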
Example
const queue = new AudioBufferQueue({
name: 'input',
maxSize: 1000,
overflowStrategy: 'drop-oldest',
});
// Enqueue while waiting for STT
queue.enqueue(chunk1);
queue.enqueue(chunk2);
console.log(queue.size); // 2
// STT connected — drain and switch to pass-through
queue.startDraining((chunk) => stt.sendAudio(chunk.data));
// chunk1 and chunk2 are flushed immediately
// subsequent enqueues pass through directly
// Pause for turn-taking
queue.stopDraining();
// chunks buffer again until next startDraining()
See
- AudioBufferQueueConfig for configuration
- QueueStats for monitoring
- AudioHeaderCache for companion header caching
Constructors
Constructor
new AudioBufferQueue(config): AudioBufferQueue;
Defined in: src/core/pipeline/AudioBufferQueue.ts:260
Creates a new AudioBufferQueue.
Parameters
| Parameter | Type | Description |
|---|---|---|
| config | AudioBufferQueueConfig | Queue configuration specifying name, max size, and overflow strategy. |
Returns
AudioBufferQueue
Example
const queue = new AudioBufferQueue({
name: 'input',
maxSize: 2000,
overflowStrategy: 'drop-oldest',
});
Accessors
size
Get Signature
get size(): number;
Defined in: src/core/pipeline/AudioBufferQueue.ts:409
Current number of chunks in the buffer.
Remarks
Always 0 when the queue is in draining mode (pass-through).
Returns
number
Methods
clear()
clear(): void;
Defined in: src/core/pipeline/AudioBufferQueue.ts:421
Removes all chunks from the buffer and resets the block resolver.
Returns
void
Remarks
Does not affect the draining state or counters. Chunks removed by clear() are not counted as dropped — they are simply discarded. Use this when stopping the pipeline to release memory.
enqueue()
enqueue(chunk): void | Promise<void>;
Defined in: src/core/pipeline/AudioBufferQueue.ts:282
Enqueues an audio chunk into the buffer.
Parameters
| Parameter | Type | Description |
|---|---|---|
| chunk | AudioChunk | The audio chunk to enqueue. |
Returns
void | Promise<void>
void for 'drop-oldest' and 'drop-newest'; a Promise<void> for 'block' when the queue is full (resolves when space is available).
Remarks
Behavior depends on the current mode:
- Draining mode: The chunk is passed directly to the drain callback (zero-copy fast path) without touching the internal buffer.
- Buffering mode: The chunk is added to the FIFO buffer. If the buffer is full, the overflow strategy determines what happens:
  - 'drop-oldest': The oldest chunk is removed to make room.
  - 'drop-newest': The incoming chunk is discarded.
  - 'block': Returns a promise that resolves when space becomes available.
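Because the return type is `void | Promise<void>`, a producer has to check what came back before deciding whether to wait. The following is a sketch of one way to do that; `ChunkLike`, `EnqueueTarget`, `isBlocked`, and `enqueueAll` are hypothetical names, and only the documented return type of enqueue is relied upon.

```typescript
// Hypothetical minimal shapes; only enqueue()'s documented return type is assumed.
interface ChunkLike {
  data: Uint8Array;
}

interface EnqueueTarget {
  enqueue(chunk: ChunkLike): void | Promise<void>;
}

// True when enqueue() returned a promise, i.e. the 'block' strategy hit a full buffer.
function isBlocked(result: void | Promise<void>): result is Promise<void> {
  return result instanceof Promise;
}

// Feeds chunks in order, pausing only when the queue signals backpressure.
// Resolves with the number of enqueues that had to wait.
async function enqueueAll(queue: EnqueueTarget, chunks: ChunkLike[]): Promise<number> {
  let waits = 0;
  for (const chunk of chunks) {
    const result = queue.enqueue(chunk);
    if (isBlocked(result)) {
      waits++;
      await result;
    }
  }
  return waits;
}
```

Awaiting only when a promise is returned keeps the 'drop-oldest' and 'drop-newest' paths fully synchronous, which matters for real-time capture callbacks.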
getStats()
getStats(): QueueStats;
Defined in: src/core/pipeline/AudioBufferQueue.ts:448
Returns a snapshot of queue statistics for monitoring.
Returns
A QueueStats snapshot.
Remarks
The returned QueueStats object is a snapshot — it does not update as the queue changes. Call this method again for fresh stats.
Example
const stats = queue.getStats();
if (stats.totalDropped > 0) {
console.warn(`Queue "${stats.name}" dropped ${stats.totalDropped} chunks`);
}
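Since each snapshot is static, detecting new drops means comparing successive snapshots. A minimal sketch of that comparison follows; `StatsLike` and `DropMonitor` are hypothetical, and `StatsLike` assumes only the two fields used in the example above.

```typescript
// Hypothetical subset of QueueStats: only the fields used in the example above.
interface StatsLike {
  name: string;
  totalDropped: number;
}

// Tracks totalDropped across successive snapshots and reports the increase.
class DropMonitor {
  private readonly readStats: () => StatsLike;
  private lastDropped: number;

  constructor(readStats: () => StatsLike) {
    this.readStats = readStats;
    this.lastDropped = readStats().totalDropped;
  }

  // Takes a fresh snapshot and returns the chunks dropped since the previous sample.
  sample(): number {
    const current = this.readStats();
    const delta = current.totalDropped - this.lastDropped;
    this.lastDropped = current.totalDropped;
    return delta;
  }
}
```

In use, the monitor would be constructed with `() => queue.getStats()` and `sample()` called on whatever interval suits the application.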
isDraining()
isDraining(): boolean;
Defined in: src/core/pipeline/AudioBufferQueue.ts:468
Whether the queue is currently in draining (pass-through) mode.
Returns
boolean
true if startDraining has been called and stopDraining has not been called since.
onOverflow()
onOverflow(callback): void;
Defined in: src/core/pipeline/AudioBufferQueue.ts:495
Registers a callback that is invoked whenever the queue drops chunks due to overflow.
Parameters
| Parameter | Type | Description |
|---|---|---|
| callback | OverflowCallback \| null | The function to call on overflow, or null to clear. |
Returns
void
Remarks
The callback receives the number of chunks dropped by this overflow event and the buffer size after the drop. Only one overflow callback is supported; calling this method again replaces any previously registered callback.
The orchestrator uses this to bridge overflow events to the typed EventEmitter system.
Example
queue.onOverflow((droppedChunks, currentSize) => {
console.warn(`Dropped ${droppedChunks} chunks, buffer at ${currentSize}`);
});
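The bridging mentioned in the remarks could look roughly like the sketch below. Everything here is an assumption made for illustration: the `'queue:overflow'` event name, the `PipelineEvents` map, the `Emit` type, and `bridgeOverflow` are hypothetical, and the callback signature is inferred from the example above rather than from the library's OverflowCallback type.

```typescript
// Signature inferred from the example above; the library's OverflowCallback may differ.
type OverflowHandler = (droppedChunks: number, currentSize: number) => void;

interface OverflowSource {
  onOverflow(callback: OverflowHandler | null): void;
}

// Hypothetical event map standing in for the orchestrator's typed event system.
interface PipelineEvents {
  'queue:overflow': { queue: string; droppedChunks: number; currentSize: number };
}

type Emit = <K extends keyof PipelineEvents>(event: K, payload: PipelineEvents[K]) => void;

// Forwards overflow notifications to a typed emit function.
// Returns a function that clears the callback again.
function bridgeOverflow(name: string, queue: OverflowSource, emit: Emit): () => void {
  queue.onOverflow((droppedChunks, currentSize) => {
    emit('queue:overflow', { queue: name, droppedChunks, currentSize });
  });
  return () => queue.onOverflow(null);
}
```

Because only one callback is supported, a bridge like this should be the sole caller of onOverflow for a given queue; other consumers would subscribe to the emitted event instead.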
peek()
peek(): AudioChunk | undefined;
Defined in: src/core/pipeline/AudioBufferQueue.ts:399
Returns the first chunk in the buffer without removing it.
Returns
AudioChunk | undefined
The oldest chunk in the buffer, or undefined.
Remarks
Returns undefined if the buffer is empty, which is always the case in draining mode.
startDraining()
startDraining(callback): void;
Defined in: src/core/pipeline/AudioBufferQueue.ts:353
Starts draining the queue: flushes all buffered chunks, then switches to pass-through mode.
Parameters
| Parameter | Type | Description |
|---|---|---|
| callback | DrainCallback | Function to call for each chunk (buffered and future). |
Returns
void
Remarks
This is the key method for the race condition fix. Call it after the STT WebSocket handshake completes to receive all chunks that were buffered during the connection attempt, followed by real-time pass-through of subsequent chunks.
The flush is synchronous — all buffered chunks are delivered to the callback in FIFO order before this method returns. After flushing, the queue enters draining mode where enqueue passes chunks directly to the callback.
If the queue is already draining, calling this method replaces the existing callback.
Example
// Buffer 5 chunks while STT connects
for (const chunk of chunks) queue.enqueue(chunk);
await stt.connect();
// STT ready: flush all 5 and switch to pass-through
queue.startDraining((chunk) => stt.sendAudio(chunk.data));
See
stopDraining to return to buffering mode
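The flush-then-pass-through behavior described in the remarks can be modelled in a few lines. This is a simplified sketch, not the library's code: `DrainModeSketch`, `FrameSketch`, and `DrainFn` are hypothetical names, and the buffer is unbounded because overflow handling is left out.

```typescript
// Hypothetical stand-ins for AudioChunk and DrainCallback.
interface FrameSketch {
  seq: number;
}

type DrainFn = (chunk: FrameSketch) => void;

class DrainModeSketch {
  private buffer: FrameSketch[] = [];
  private drain: DrainFn | null = null;

  get size(): number {
    return this.buffer.length;
  }

  isDraining(): boolean {
    return this.drain !== null;
  }

  enqueue(chunk: FrameSketch): void {
    if (this.drain) {
      this.drain(chunk); // pass-through: the buffer is never touched
    } else {
      this.buffer.push(chunk); // unbounded here; overflow handling is omitted
    }
  }

  startDraining(callback: DrainFn): void {
    // Synchronous FIFO flush: every buffered chunk is delivered before returning.
    let chunk: FrameSketch | undefined;
    while ((chunk = this.buffer.shift()) !== undefined) {
      callback(chunk);
    }
    // From here on, enqueue() forwards directly; an existing callback is replaced.
    this.drain = callback;
  }

  stopDraining(): void {
    this.drain = null; // subsequent enqueues buffer again
  }
}
```

Flushing before switching modes is what preserves ordering: chunks captured during the handshake always reach the consumer ahead of any chunk enqueued afterwards.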
stopDraining()
stopDraining(): void;
Defined in: src/core/pipeline/AudioBufferQueue.ts:385
Stops draining and returns the queue to buffering mode.
Returns
void
Remarks
After calling this method, subsequent enqueue calls will buffer chunks internally instead of passing them to the drain callback. The drain callback is cleared.
This is used during turn-taking: when the agent starts speaking, the orchestrator stops draining the input queue (pauses STT) and resumes draining when the agent finishes.
See
startDraining to resume draining
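The turn-taking pattern described in the stopDraining remarks might be wired up as in the sketch below. `TurnTakingGate`, its method names, and the `DrainableQueue` and `AudioLike` shapes are hypothetical; the orchestrator's actual implementation is not shown in this reference. The gate keeps its own reference to the forwarding callback because stopDraining clears the one held by the queue.

```typescript
// Hypothetical shapes mirroring the documented startDraining/stopDraining signatures.
interface AudioLike {
  data: Uint8Array;
}

type DrainHandler = (chunk: AudioLike) => void;

interface DrainableQueue {
  startDraining(callback: DrainHandler): void;
  stopDraining(): void;
}

// Hypothetical helper: pauses STT input while the agent is speaking.
class TurnTakingGate {
  private readonly queue: DrainableQueue;
  private readonly forward: DrainHandler;
  private agentSpeaking = false;

  constructor(queue: DrainableQueue, forward: DrainHandler) {
    this.queue = queue;
    this.forward = forward;
  }

  // Agent starts speaking: input chunks are buffered instead of forwarded.
  onAgentSpeechStart(): void {
    if (this.agentSpeaking) return; // ignore duplicate notifications
    this.agentSpeaking = true;
    this.queue.stopDraining();
  }

  // Agent finishes: flush whatever was buffered and resume pass-through.
  onAgentSpeechEnd(): void {
    if (!this.agentSpeaking) return;
    this.agentSpeaking = false;
    // stopDraining() cleared the queue's callback, so it is supplied again.
    this.queue.startDraining(this.forward);
  }
}
```

A gate like this would be constructed with the input queue and a callback such as `(chunk) => stt.sendAudio(chunk.data)`, then driven by whatever events signal the start and end of agent speech.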