diff --git a/src/app/components/chat/chat.component.spec.ts b/src/app/components/chat/chat.component.spec.ts index e346bf89..97ed3138 100644 --- a/src/app/components/chat/chat.component.spec.ts +++ b/src/app/components/chat/chat.component.spec.ts @@ -17,7 +17,7 @@ import {Location} from '@angular/common'; import {HttpErrorResponse} from '@angular/common/http'; -import {ChangeDetectionStrategy, Component, ErrorHandler} from '@angular/core'; +import {ChangeDetectionStrategy, Component, ElementRef, ErrorHandler} from '@angular/core'; import {ComponentFixture, TestBed} from '@angular/core/testing'; import {MatDialog, MatDialogModule} from '@angular/material/dialog'; import { SnackbarService } from '../../core/services/snackbar.service'; @@ -924,6 +924,130 @@ describe('ChatComponent', () => { expect(component.toggleAudioRecording).toHaveBeenCalled(); }); }); + + describe('Bidi (live) streaming restart', () => { + const SESSION_ID = 'test-session-id'; + + beforeEach(() => { + component.sessionId = SESSION_ID; + component.appName = TEST_APP_1_NAME; + component.userId = USER_ID; + // Session already exists, so no lazy creation is required. + spyOn(component, 'ensureSessionActive').and.resolveTo(true); + mockSnackBar.open.calls.reset(); + }); + + describe('audio', () => { + it('allows restarting after stopping without warning', async () => { + await component.startAudioRecording(); + expect(component.activeBidiSessions.has(SESSION_ID)).toBe(true); + + component.stopAudioRecording(); + expect(component.activeBidiSessions.has(SESSION_ID)).toBe(false); + expect(component.isAudioRecording).toBe(false); + + await component.startAudioRecording(); + expect(component.isAudioRecording).toBe(true); + expect(mockStreamChatService.startAudioChat) + .toHaveBeenCalledTimes(2); + expect(mockSnackBar.open).not.toHaveBeenCalled(); + }); + + it('prevents starting a second concurrent audio stream', async () => { + await component.startAudioRecording(); + await component.startAudioRecording(); + + expect(mockStreamChatService.startAudioChat) + .toHaveBeenCalledTimes(1); + expect(mockSnackBar.open).toHaveBeenCalledWith( + 'Another streaming request is already in progress. Please stop it before starting a new one.', + 'OK'); + }); + + it('stops video when the call ends', async () => { + spyOn(component, 'chatPanel').and.returnValue({ + videoContainer: {nativeElement: document.createElement('div')}, + } as any); + await component.startAudioRecording(); + component.startVideoRecording(); + expect(component.isVideoRecording).toBe(true); + + component.stopAudioRecording(); + + expect(component.isVideoRecording).toBe(false); + expect(mockStreamChatService.stopVideoStreaming).toHaveBeenCalled(); + }); + }); + + describe('video', () => { + let videoContainer: ElementRef; + let chatPanelSpy: jasmine.Spy; + + beforeEach(() => { + videoContainer = {nativeElement: document.createElement('div')} as + ElementRef; + chatPanelSpy = spyOn(component, 'chatPanel').and.returnValue({ + videoContainer, + } as any); + }); + + it('streams frames over the existing connection without opening a new one', + () => { + component.startVideoRecording(); + expect(mockStreamChatService.startVideoStreaming) + .toHaveBeenCalledWith(videoContainer); + expect(mockStreamChatService.startVideoChat).not.toHaveBeenCalled(); + + component.stopVideoRecording(); + expect(mockStreamChatService.stopVideoStreaming) + .toHaveBeenCalledWith(videoContainer); + }); + + it('starts during an active call without tripping the in-progress warning', + () => { + // Audio call already active for this session. + component.activeBidiSessions.add(SESSION_ID); + + component.startVideoRecording(); + + expect(component.isVideoRecording).toBe(true); + expect(mockStreamChatService.startVideoStreaming) + .toHaveBeenCalledWith(videoContainer); + expect(mockSnackBar.open).not.toHaveBeenCalled(); + }); + + it('does not touch the session lock', () => { + component.activeBidiSessions.add(SESSION_ID); + + component.startVideoRecording(); + component.stopVideoRecording(); + + // The call (audio) owns the lock; video must leave it intact. + expect(component.activeBidiSessions.has(SESSION_ID)).toBe(true); + }); + + it('can be toggled off and on within the same call', () => { + component.startVideoRecording(); + component.stopVideoRecording(); + component.startVideoRecording(); + + expect(component.isVideoRecording).toBe(true); + expect(mockStreamChatService.startVideoStreaming) + .toHaveBeenCalledTimes(2); + expect(mockSnackBar.open).not.toHaveBeenCalled(); + }); + + it('is a no-op when videoContainer is missing', () => { + chatPanelSpy.and.returnValue({videoContainer: undefined} as any); + + component.startVideoRecording(); + + expect(component.isVideoRecording).toBe(false); + expect(mockStreamChatService.startVideoStreaming) + .not.toHaveBeenCalled(); + }); + }); + }); }); describe('File Handling', () => { diff --git a/src/app/components/chat/chat.component.ts b/src/app/components/chat/chat.component.ts index ee77aca6..3a1eb0b3 100644 --- a/src/app/components/chat/chat.component.ts +++ b/src/app/components/chat/chat.component.ts @@ -134,8 +134,8 @@ class CustomPaginatorIntl extends MatPaginatorIntl { }; } -const BIDI_STREAMING_RESTART_WARNING = - 'Restarting bidirectional streaming is not currently supported. Please refresh the page or start a new session.'; +const BIDI_STREAMING_IN_PROGRESS_WARNING = + 'Another streaming request is already in progress. Please stop it before starting a new one.'; @Component({ changeDetection: ChangeDetectionStrategy.Default, @@ -531,8 +531,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { window.localStorage.setItem('adk-hide-intermediate-events', String(newVal)); } - // TODO: Remove this once backend supports restarting bidi streaming. - sessionHasUsedBidi = new Set(); + // Sessions with an in-progress bidi stream, used to block concurrent starts. + activeBidiSessions = new Set(); eventData = new Map(); traceData: Span[] = []; @@ -1937,11 +1937,9 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { onAppSelection(event: any) { if (this.isAudioRecording) { this.stopAudioRecording(); - this.isAudioRecording = false; } if (this.isVideoRecording) { this.stopVideoRecording(); - this.isVideoRecording = false; } this.evalTab()?.resetEvalResults(); this.traceData = []; @@ -1953,8 +1951,8 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { } async startAudioRecording(flags?: LiveFlags) { - if (this.sessionId && this.sessionHasUsedBidi.has(this.sessionId)) { - this.openSnackBar(BIDI_STREAMING_RESTART_WARNING, 'OK'); + if (this.sessionId && this.activeBidiSessions.has(this.sessionId)) { + this.openSnackBar(BIDI_STREAMING_IN_PROGRESS_WARNING, 'OK'); return; } @@ -1965,22 +1963,25 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { } this.isAudioRecording = true; + this.activeBidiSessions.add(this.sessionId); void this.streamChatService.startAudioChat({ appName: this.appName, userId: this.userId, sessionId: this.sessionId, flags: flags, }); - this.sessionHasUsedBidi.add(this.sessionId); + this.changeDetectorRef.detectChanges(); } stopAudioRecording() { this.audioPlayingService.stopAudio(); this.streamChatService.stopAudioChat(); this.isAudioRecording = false; + this.activeBidiSessions.delete(this.sessionId); if (this.isVideoRecording) { this.stopVideoRecording(); } + this.changeDetectorRef.detectChanges(); } toggleVideoRecording() { @@ -1988,22 +1989,26 @@ export class ChatComponent implements OnInit, AfterViewInit, OnDestroy { this.startVideoRecording(); } + // Video is an add-on to an active call: it streams frames over the existing + // connection rather than opening its own, so it doesn't touch the session + // lock or the websocket. startVideoRecording() { const videoContainer = this.chatPanel()?.videoContainer; if (!videoContainer) { return; } this.isVideoRecording = true; - this.streamChatService.startVideoStreaming(videoContainer); + void this.streamChatService.startVideoStreaming(videoContainer); + this.changeDetectorRef.detectChanges(); } stopVideoRecording() { const videoContainer = this.chatPanel()?.videoContainer; - if (!videoContainer) { - return; + if (videoContainer) { + this.streamChatService.stopVideoStreaming(videoContainer); } - this.streamChatService.stopVideoStreaming(videoContainer); this.isVideoRecording = false; + this.changeDetectorRef.detectChanges(); } private getAsyncFunctionsFromParts( diff --git a/src/app/core/services/testing/mock-audio-playing.service.ts b/src/app/core/services/testing/mock-audio-playing.service.ts index 8fc7b550..abb97c63 100644 --- a/src/app/core/services/testing/mock-audio-playing.service.ts +++ b/src/app/core/services/testing/mock-audio-playing.service.ts @@ -22,4 +22,5 @@ import {AudioPlayingService} from '../audio-playing.service'; @Injectable() export class MockAudioPlayingService implements Partial { playAudio = jasmine.createSpy('playAudio'); + stopAudio = jasmine.createSpy('stopAudio'); } diff --git a/src/app/core/services/websocket.service.spec.ts b/src/app/core/services/websocket.service.spec.ts index c3f3125d..012d7489 100644 --- a/src/app/core/services/websocket.service.spec.ts +++ b/src/app/core/services/websocket.service.spec.ts @@ -55,4 +55,34 @@ describe('WebSocketService', () => { expect(service.urlSafeBase64ToBase64('abcd')).toEqual('abcd'); }); }); + + describe('connection restart', () => { + it('should reset the audio buffer when reconnecting', () => { + service.connect('ws://test1'); + (service as any).audioBuffer = [new Uint8Array([1, 2, 3])]; + + service.connect('ws://test2'); + + expect((service as any).audioBuffer).toEqual([]); + }); + + it('should close the previous connection when reconnecting', () => { + service.connect('ws://test1'); + const firstSocket = (service as any).socket$; + spyOn(firstSocket, 'complete').and.callThrough(); + + service.connect('ws://test2'); + + expect(firstSocket.complete).toHaveBeenCalled(); + }); + + it('should clear the audio interval when closing the connection', () => { + service.connect('ws://test'); + expect((service as any).audioIntervalId).not.toBeNull(); + + service.closeConnection(); + + expect((service as any).audioIntervalId).toBeNull(); + }); + }); }); diff --git a/src/app/core/services/websocket.service.ts b/src/app/core/services/websocket.service.ts index 4aa984e3..507339b4 100644 --- a/src/app/core/services/websocket.service.ts +++ b/src/app/core/services/websocket.service.ts @@ -38,6 +38,10 @@ export class WebSocketService implements WebSocketServiceInterface { private closeReasonSubject = new Subject(); connect(serverUrl: string) { + // Reset any previous connection/buffer so restarts start clean. + this.closeConnection(); + this.audioBuffer = []; + this.socket$ = new WebSocketSubject({ url: serverUrl, serializer: (msg) => JSON.stringify(msg), @@ -75,8 +79,10 @@ export class WebSocketService implements WebSocketServiceInterface { } closeConnection() { - clearInterval(this.audioIntervalId); - this.audioIntervalId = null; + if (this.audioIntervalId !== null) { + clearInterval(this.audioIntervalId); + this.audioIntervalId = null; + } if (this.socket$) { this.socket$.complete(); }