Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion src/app/components/chat/chat.component.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import {Location} from '@angular/common';
import {HttpErrorResponse} from '@angular/common/http';
import {ChangeDetectionStrategy, Component, ErrorHandler} from '@angular/core';
import {ChangeDetectionStrategy, Component, ElementRef, ErrorHandler} from '@angular/core';
import {ComponentFixture, TestBed} from '@angular/core/testing';
import {MatDialog, MatDialogModule} from '@angular/material/dialog';
import { SnackbarService } from '../../core/services/snackbar.service';
Expand Down Expand Up @@ -924,6 +924,130 @@ describe('ChatComponent', () => {
expect(component.toggleAudioRecording).toHaveBeenCalled();
});
});

describe('Bidi (live) streaming restart', () => {
const SESSION_ID = 'test-session-id';

beforeEach(() => {
component.sessionId = SESSION_ID;
component.appName = TEST_APP_1_NAME;
component.userId = USER_ID;
// Session already exists, so no lazy creation is required.
spyOn(component, 'ensureSessionActive').and.resolveTo(true);
mockSnackBar.open.calls.reset();
});

describe('audio', () => {
it('allows restarting after stopping without warning', async () => {
await component.startAudioRecording();
expect(component.activeBidiSessions.has(SESSION_ID)).toBe(true);

component.stopAudioRecording();
expect(component.activeBidiSessions.has(SESSION_ID)).toBe(false);
expect(component.isAudioRecording).toBe(false);

await component.startAudioRecording();
expect(component.isAudioRecording).toBe(true);
expect(mockStreamChatService.startAudioChat)
.toHaveBeenCalledTimes(2);
expect(mockSnackBar.open).not.toHaveBeenCalled();
});

it('prevents starting a second concurrent audio stream', async () => {
await component.startAudioRecording();
await component.startAudioRecording();

expect(mockStreamChatService.startAudioChat)
.toHaveBeenCalledTimes(1);
expect(mockSnackBar.open).toHaveBeenCalledWith(
'Another streaming request is already in progress. Please stop it before starting a new one.',
'OK');
});

it('stops video when the call ends', async () => {
spyOn(component, 'chatPanel').and.returnValue({
videoContainer: {nativeElement: document.createElement('div')},
} as any);
await component.startAudioRecording();
component.startVideoRecording();
expect(component.isVideoRecording).toBe(true);

component.stopAudioRecording();

expect(component.isVideoRecording).toBe(false);
expect(mockStreamChatService.stopVideoStreaming).toHaveBeenCalled();
});
});

describe('video', () => {
let videoContainer: ElementRef;
let chatPanelSpy: jasmine.Spy;

beforeEach(() => {
videoContainer = {nativeElement: document.createElement('div')} as
ElementRef;
chatPanelSpy = spyOn(component, 'chatPanel').and.returnValue({
videoContainer,
} as any);
});

it('streams frames over the existing connection without opening a new one',
() => {
component.startVideoRecording();
expect(mockStreamChatService.startVideoStreaming)
.toHaveBeenCalledWith(videoContainer);
expect(mockStreamChatService.startVideoChat).not.toHaveBeenCalled();

component.stopVideoRecording();
expect(mockStreamChatService.stopVideoStreaming)
.toHaveBeenCalledWith(videoContainer);
});

it('starts during an active call without tripping the in-progress warning',
() => {
// Audio call already active for this session.
component.activeBidiSessions.add(SESSION_ID);

component.startVideoRecording();

expect(component.isVideoRecording).toBe(true);
expect(mockStreamChatService.startVideoStreaming)
.toHaveBeenCalledWith(videoContainer);
expect(mockSnackBar.open).not.toHaveBeenCalled();
});

it('does not touch the session lock', () => {
component.activeBidiSessions.add(SESSION_ID);

component.startVideoRecording();
component.stopVideoRecording();

// The call (audio) owns the lock; video must leave it intact.
expect(component.activeBidiSessions.has(SESSION_ID)).toBe(true);
});

it('can be toggled off and on within the same call', () => {
component.startVideoRecording();
component.stopVideoRecording();
component.startVideoRecording();

expect(component.isVideoRecording).toBe(true);
expect(mockStreamChatService.startVideoStreaming)
.toHaveBeenCalledTimes(2);
expect(mockSnackBar.open).not.toHaveBeenCalled();
});

it('is a no-op when videoContainer is missing', () => {
chatPanelSpy.and.returnValue({videoContainer: undefined} as any);

component.startVideoRecording();

expect(component.isVideoRecording).toBe(false);
expect(mockStreamChatService.startVideoStreaming)
.not.toHaveBeenCalled();
});
});
});
});

describe('File Handling', () => {
Expand Down
31 changes: 18 additions & 13 deletions src/app/components/chat/chat.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ class CustomPaginatorIntl extends MatPaginatorIntl {
};
}

const BIDI_STREAMING_RESTART_WARNING =
'Restarting bidirectional streaming is not currently supported. Please refresh the page or start a new session.';
const BIDI_STREAMING_IN_PROGRESS_WARNING =
'Another streaming request is already in progress. Please stop it before starting a new one.';

@Component({
changeDetection: ChangeDetectionStrategy.Default,
Expand Down Expand Up @@ -531,8 +531,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
window.localStorage.setItem('adk-hide-intermediate-events', String(newVal));
}

// TODO: Remove this once backend supports restarting bidi streaming.
sessionHasUsedBidi = new Set<string>();
// Sessions with an in-progress bidi stream, used to block concurrent starts.
activeBidiSessions = new Set<string>();

eventData = new Map<string, any>();
traceData: Span[] = [];
Expand Down Expand Up @@ -1937,11 +1937,9 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
onAppSelection(event: any) {
if (this.isAudioRecording) {
this.stopAudioRecording();
this.isAudioRecording = false;
}
if (this.isVideoRecording) {
this.stopVideoRecording();
this.isVideoRecording = false;
}
this.evalTab()?.resetEvalResults();
this.traceData = [];
Expand All @@ -1953,8 +1951,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
}

async startAudioRecording(flags?: LiveFlags) {
if (this.sessionId && this.sessionHasUsedBidi.has(this.sessionId)) {
this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK');
if (this.sessionId && this.activeBidiSessions.has(this.sessionId)) {
this.openSnackBar(BIDI_STREAMING_IN_PROGRESS_WARNING, 'OK');
return;
}

Expand All @@ -1965,45 +1963,52 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy {
}

this.isAudioRecording = true;
this.activeBidiSessions.add(this.sessionId);
void this.streamChatService.startAudioChat({
appName: this.appName,
userId: this.userId,
sessionId: this.sessionId,
flags: flags,
});
this.sessionHasUsedBidi.add(this.sessionId);
this.changeDetectorRef.detectChanges();
}

stopAudioRecording() {
this.audioPlayingService.stopAudio();
this.streamChatService.stopAudioChat();
this.isAudioRecording = false;
this.activeBidiSessions.delete(this.sessionId);
if (this.isVideoRecording) {
this.stopVideoRecording();
}
this.changeDetectorRef.detectChanges();
}

toggleVideoRecording() {
this.isVideoRecording ? this.stopVideoRecording() :
this.startVideoRecording();
}

// Video is an add-on to an active call: it streams frames over the existing
// connection rather than opening its own, so it doesn't touch the session
// lock or the websocket.
startVideoRecording() {
const videoContainer = this.chatPanel()?.videoContainer;
if (!videoContainer) {
return;
}
this.isVideoRecording = true;
this.streamChatService.startVideoStreaming(videoContainer);
void this.streamChatService.startVideoStreaming(videoContainer);
this.changeDetectorRef.detectChanges();
}

stopVideoRecording() {
const videoContainer = this.chatPanel()?.videoContainer;
if (!videoContainer) {
return;
if (videoContainer) {
this.streamChatService.stopVideoStreaming(videoContainer);
}
this.streamChatService.stopVideoStreaming(videoContainer);
this.isVideoRecording = false;
this.changeDetectorRef.detectChanges();
}

private getAsyncFunctionsFromParts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ import {AudioPlayingService} from '../audio-playing.service';
@Injectable()
export class MockAudioPlayingService implements Partial<AudioPlayingService> {
playAudio = jasmine.createSpy('playAudio');
stopAudio = jasmine.createSpy('stopAudio');
}
30 changes: 30 additions & 0 deletions src/app/core/services/websocket.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,34 @@ describe('WebSocketService', () => {
expect(service.urlSafeBase64ToBase64('abcd')).toEqual('abcd');
});
});

describe('connection restart', () => {
it('should reset the audio buffer when reconnecting', () => {
service.connect('ws://test1');
(service as any).audioBuffer = [new Uint8Array([1, 2, 3])];

service.connect('ws://test2');

expect((service as any).audioBuffer).toEqual([]);
});

it('should close the previous connection when reconnecting', () => {
service.connect('ws://test1');
const firstSocket = (service as any).socket$;
spyOn(firstSocket, 'complete').and.callThrough();

service.connect('ws://test2');

expect(firstSocket.complete).toHaveBeenCalled();
});

it('should clear the audio interval when closing the connection', () => {
service.connect('ws://test');
expect((service as any).audioIntervalId).not.toBeNull();

service.closeConnection();

expect((service as any).audioIntervalId).toBeNull();
});
});
});
10 changes: 8 additions & 2 deletions src/app/core/services/websocket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export class WebSocketService implements WebSocketServiceInterface {
private closeReasonSubject = new Subject<string>();

connect(serverUrl: string) {
// Reset any previous connection/buffer so restarts start clean.
this.closeConnection();
this.audioBuffer = [];

this.socket$ = new WebSocketSubject({
url: serverUrl,
serializer: (msg) => JSON.stringify(msg),
Expand Down Expand Up @@ -75,8 +79,10 @@ export class WebSocketService implements WebSocketServiceInterface {
}

closeConnection() {
clearInterval(this.audioIntervalId);
this.audioIntervalId = null;
if (this.audioIntervalId !== null) {
clearInterval(this.audioIntervalId);
this.audioIntervalId = null;
}
if (this.socket$) {
this.socket$.complete();
}
Expand Down
Loading