diff --git a/dotnet/src/Client.cs b/dotnet/src/Client.cs index 6f7acc747..7fe339f24 100644 --- a/dotnet/src/Client.cs +++ b/dotnet/src/Client.cs @@ -61,6 +61,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable /// Minimum protocol version this SDK can communicate with. /// private const int MinProtocolVersion = 2; + private static readonly TimeSpan StderrPumpShutdownTimeout = TimeSpan.FromSeconds(5); private readonly ConcurrentDictionary _sessions = new(); private readonly CopilotClientOptions _options; @@ -229,6 +230,7 @@ async Task StartCoreAsync(CancellationToken ct) var startTimestamp = Stopwatch.GetTimestamp(); Connection? connection = null; Process? cliProcess = null; + ProcessStderrPump? stderrPump = null; try { @@ -241,10 +243,11 @@ async Task StartCoreAsync(CancellationToken ct) else { // Child process (stdio or TCP) - var (startedProcess, portOrNull, stderrBuffer) = await StartCliServerAsync(_options, _effectiveConnectionToken, _logger, ct); + var (startedProcess, portOrNull, startedStderrPump) = await StartCliServerAsync(_options, _effectiveConnectionToken, _logger, ct); cliProcess = startedProcess; + stderrPump = startedStderrPump; _actualPort = portOrNull; - connection = await ConnectToServerAsync(cliProcess, portOrNull is null ? null : "localhost", portOrNull, stderrBuffer, ct); + connection = await ConnectToServerAsync(cliProcess, portOrNull is null ? null : "localhost", portOrNull, stderrPump, ct); } LogTiming(_logger, LogLevel.Debug, null, @@ -273,20 +276,33 @@ async Task StartCoreAsync(CancellationToken ct) } catch (Exception ex) { - if (ex is not OperationCanceledException) + var cleanupErrors = new List(); + try { - LogTiming(_logger, LogLevel.Warning, ex, - "CopilotClient.StartAsync failed. Elapsed={Elapsed}", - startTimestamp); - } + if (ex is not OperationCanceledException) + { + LogTiming(_logger, LogLevel.Warning, ex, + "CopilotClient.StartAsync failed. Elapsed={Elapsed}", + startTimestamp); + } - if (connection is not null) - { - await CleanupConnectionAsync(connection, errors: null); + if (connection is not null) + { + await CleanupConnectionAsync(connection, cleanupErrors); + } + else if (cliProcess is not null) + { + await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, _logger); + } + + foreach (var cleanupError in cleanupErrors) + { + _logger.LogDebug(cleanupError, "Failed to clean up Copilot client connection after startup failure"); + } } - else if (cliProcess is not null) + finally { - await CleanupCliProcessAsync(cliProcess, errors: null, _logger); + _connectionTask = null; } throw; @@ -395,7 +411,6 @@ private async Task CleanupConnectionAsync(List? errors) } var connectionTask = _connectionTask; - _connectionTask = null; Connection ctx; try @@ -407,6 +422,13 @@ private async Task CleanupConnectionAsync(List? errors) _logger.LogDebug(ex, "Ignoring failed Copilot client startup during cleanup"); return; } + finally + { + if (ReferenceEquals(_connectionTask, connectionTask)) + { + _connectionTask = null; + } + } await CleanupConnectionAsync(ctx, errors); } @@ -428,31 +450,60 @@ private async Task CleanupConnectionAsync(Connection ctx, List? error if (ctx.CliProcess is { } childProcess) { - await CleanupCliProcessAsync(childProcess, errors, _logger); + await CleanupCliProcessAsync(childProcess, ctx.StderrPump, errors, _logger); } } - private static async Task CleanupCliProcessAsync(Process childProcess, List? errors, ILogger? logger) + private static async Task CleanupCliProcessAsync(Process childProcess, ProcessStderrPump? stderrPump, List? errors, ILogger? logger) { + stderrPump?.Cancel(); + try { + if (!childProcess.HasExited) + { + childProcess.Kill(entireProcessTree: true); + // Kill is asynchronous; wait for the root CLI process to exit so cleanup callers + // do not observe StopAsync/DisposeAsync completion while it is still tearing down. + await childProcess.WaitForExitAsync(); + } + } + catch (Exception ex) + { + AddCleanupError(errors, ex, logger); + } + + if (stderrPump is not null) + { + var stderrPumpWaitTimestamp = Stopwatch.GetTimestamp(); try { - if (!childProcess.HasExited) + await stderrPump.WaitForCompletionAsync(StderrPumpShutdownTimeout); + } + catch (TimeoutException ex) + { + if (logger is not null) { - childProcess.Kill(entireProcessTree: true); - await childProcess.WaitForExitAsync(); + LogTiming(logger, LogLevel.Debug, ex, + "Timed out waiting for CLI stderr pump to stop. Elapsed={Elapsed}, Timeout={Timeout}", + stderrPumpWaitTimestamp, + StderrPumpShutdownTimeout); } + + AddCleanupError(errors, ex, logger); + } + catch (Exception ex) + { + AddCleanupError(errors, ex, logger); } finally { - childProcess.Dispose(); + stderrPump.Dispose(); } } - catch (Exception ex) - { - AddCleanupError(errors, ex, logger); - } + + try { childProcess.Dispose(); } + catch (Exception ex) { AddCleanupError(errors, ex, logger); } } private static void AddCleanupError(List? errors, Exception ex, ILogger? logger) @@ -1369,7 +1420,7 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) || string.Equals(ex.Message, "Unhandled method connect", StringComparison.Ordinal); } - private static async Task<(Process Process, int? DetectedLocalhostTcpPort, StringBuilder StderrBuffer)> StartCliServerAsync(CopilotClientOptions options, string? connectionToken, ILogger logger, CancellationToken cancellationToken) + private static async Task<(Process Process, int? DetectedLocalhostTcpPort, ProcessStderrPump StderrPump)> StartCliServerAsync(CopilotClientOptions options, string? connectionToken, ILogger logger, CancellationToken cancellationToken) { // Use explicit path, COPILOT_CLI_PATH env var (from options.Environment or process env), or bundled CLI - no PATH fallback var envCliPath = options.Environment is not null && options.Environment.TryGetValue("COPILOT_CLI_PATH", out var envValue) ? envValue @@ -1474,38 +1525,29 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) if (telemetry.CaptureContent is { } capture) startInfo.Environment["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = capture ? "true" : "false"; } - Process? cliProcess = null; + var cliProcess = new Process { StartInfo = startInfo }; try { - cliProcess = new Process { StartInfo = startInfo }; var spawnTimestamp = Stopwatch.GetTimestamp(); cliProcess.Start(); LogTiming(logger, LogLevel.Debug, null, "CopilotClient.StartCliServerAsync subprocess spawned. Elapsed={Elapsed}", spawnTimestamp); + } + catch + { + cliProcess.Dispose(); + throw; + } - // Capture stderr for error messages and forward to logger - var stderrBuffer = new StringBuilder(); - var stderrReader = Task.Run(async () => - { - while (true) - { - var line = await cliProcess.StandardError.ReadLineAsync(cancellationToken); - if (line is null) - { - break; - } - - lock (stderrBuffer) - { - stderrBuffer.AppendLine(line); - } - - logger.LogWarning("[CLI] {Line}", line); - } - }, cancellationToken); + // Capture stderr for error messages and forward to logger. + // The pump has its own lifetime token and is later cancelled/observed + // by the owning Connection before the process is disposed. + var stderrPump = ProcessStderrPump.Start(cliProcess, logger); - var detectedLocalhostTcpPort = (int?)null; + var detectedLocalhostTcpPort = (int?)null; + try + { if (options.UseStdio != true) { // Wait for port announcement @@ -1513,39 +1555,47 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(TimeSpan.FromSeconds(30)); - while (!cts.Token.IsCancellationRequested) + try { - var line = await cliProcess.StandardOutput.ReadLineAsync(cts.Token); - if (line is null) - { - await stderrReader; - throw CreateCliExitedException("CLI process exited unexpectedly", stderrBuffer); - } - - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug("[CLI] {Line}", line); - } - - if (ListeningOnPortRegex().Match(line) is { Success: true } match) + while (true) { - detectedLocalhostTcpPort = int.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture); - LogTiming(logger, LogLevel.Debug, null, - "CopilotClient.StartCliServerAsync TCP port wait complete. Elapsed={Elapsed}, Port={Port}", - portWaitTimestamp, - detectedLocalhostTcpPort.Value); - break; + var line = await cliProcess.StandardOutput.ReadLineAsync(cts.Token); + if (line is null) + { + throw CreateCliExitedException("CLI process exited unexpectedly", stderrPump.Buffer); + } + + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("[CLI] {Line}", line); + } + + if (ListeningOnPortRegex().Match(line) is { Success: true } match) + { + detectedLocalhostTcpPort = int.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture); + LogTiming(logger, LogLevel.Debug, null, + "CopilotClient.StartCliServerAsync TCP port wait complete. Elapsed={Elapsed}, Port={Port}", + portWaitTimestamp, + detectedLocalhostTcpPort.Value); + break; + } } } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested && cts.IsCancellationRequested) + { + throw CreateCliExitedException("Timed out waiting for Copilot CLI to report its TCP listening port.", stderrPump.Buffer); + } } - return (cliProcess, detectedLocalhostTcpPort, stderrBuffer); + return (cliProcess, detectedLocalhostTcpPort, stderrPump); } catch { - if (cliProcess is not null) + var cleanupErrors = new List(); + await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, logger); + foreach (var cleanupError in cleanupErrors) { - await CleanupCliProcessAsync(cliProcess, errors: null, logger); + logger.LogDebug(cleanupError, "Failed to clean up Copilot CLI process after startup failure"); } throw; @@ -1593,86 +1643,104 @@ private static (string FileName, IEnumerable Args) ResolveCliCommand(str return (cliPath, args); } - private async Task ConnectToServerAsync(Process? cliProcess, string? tcpHost, int? tcpPort, StringBuilder? stderrBuffer, CancellationToken cancellationToken) + private async Task ConnectToServerAsync(Process? cliProcess, string? tcpHost, int? tcpPort, ProcessStderrPump? stderrPump, CancellationToken cancellationToken) { var setupTimestamp = Stopwatch.GetTimestamp(); - Stream inputStream, outputStream; NetworkStream? networkStream = null; + JsonRpc? rpc = null; - if (_options.UseStdio == true) + try { - if (cliProcess == null) - { - throw new InvalidOperationException("CLI process not started"); - } + Stream inputStream, outputStream; - inputStream = cliProcess.StandardOutput.BaseStream; - outputStream = cliProcess.StandardInput.BaseStream; - } - else - { - if (tcpHost is null || tcpPort is null) + if (_options.UseStdio == true) { - throw new InvalidOperationException("Cannot connect because TCP host or port are not available"); - } + if (cliProcess == null) + { + throw new InvalidOperationException("CLI process not started"); + } - var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); - try - { - var tcpConnectTimestamp = Stopwatch.GetTimestamp(); - LogConnectingToCliServer(_logger, tcpHost, tcpPort.Value); - await socket.ConnectAsync(tcpHost, tcpPort.Value, cancellationToken); - LogTiming(_logger, LogLevel.Debug, null, - "CopilotClient.ConnectToServerAsync TCP connect complete. Elapsed={Elapsed}, Host={Host}, Port={Port}", - tcpConnectTimestamp, - tcpHost, - tcpPort.Value); + inputStream = cliProcess.StandardOutput.BaseStream; + outputStream = cliProcess.StandardInput.BaseStream; } - catch + else { - socket.Dispose(); - throw; + if (tcpHost is null || tcpPort is null) + { + throw new InvalidOperationException("Cannot connect because TCP host or port are not available"); + } + + var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + try + { + var tcpConnectTimestamp = Stopwatch.GetTimestamp(); + LogConnectingToCliServer(_logger, tcpHost, tcpPort.Value); + await socket.ConnectAsync(tcpHost, tcpPort.Value, cancellationToken); + LogTiming(_logger, LogLevel.Debug, null, + "CopilotClient.ConnectToServerAsync TCP connect complete. Elapsed={Elapsed}, Host={Host}, Port={Port}", + tcpConnectTimestamp, + tcpHost, + tcpPort.Value); + } + catch + { + socket.Dispose(); + throw; + } + + inputStream = outputStream = networkStream = new NetworkStream(socket, ownsSocket: true); } - inputStream = outputStream = networkStream = new NetworkStream(socket, ownsSocket: true); - } - - var rpc = new JsonRpc( - outputStream, - inputStream, - SerializerOptionsForMessageFormatter, - _logger); - - var handler = new RpcHandler(this); - rpc.SetLocalRpcMethod("session.event", handler.OnSessionEvent); - rpc.SetLocalRpcMethod("session.lifecycle", handler.OnSessionLifecycle); - // Protocol v3 servers send tool calls / permission requests as broadcast events. - // Protocol v2 servers use the older tool.call / permission.request RPC model. - // We always register v2 adapters because handlers are set up before version - // negotiation; a v3 server will simply never send these requests. - rpc.SetLocalRpcMethod("tool.call", handler.OnToolCallV2); - rpc.SetLocalRpcMethod("permission.request", handler.OnPermissionRequestV2); - rpc.SetLocalRpcMethod("userInput.request", handler.OnUserInputRequest); - rpc.SetLocalRpcMethod("exitPlanMode.request", handler.OnExitPlanModeRequest); - rpc.SetLocalRpcMethod("autoModeSwitch.request", handler.OnAutoModeSwitchRequest); - rpc.SetLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke); - rpc.SetLocalRpcMethod("systemMessage.transform", handler.OnSystemMessageTransform); - ClientSessionApiRegistration.RegisterClientSessionApiHandlers(rpc, sessionId => - { - var session = GetSession(sessionId) ?? throw new ArgumentException($"Unknown session {sessionId}"); - return session.ClientSessionApis; - }); - rpc.StartListening(); - LogTiming(_logger, LogLevel.Debug, null, - "CopilotClient.ConnectToServerAsync transport setup complete. Elapsed={Elapsed}", - setupTimestamp); + rpc = new JsonRpc( + outputStream, + inputStream, + SerializerOptionsForMessageFormatter, + _logger); + + var handler = new RpcHandler(this); + rpc.SetLocalRpcMethod("session.event", handler.OnSessionEvent); + rpc.SetLocalRpcMethod("session.lifecycle", handler.OnSessionLifecycle); + // Protocol v3 servers send tool calls / permission requests as broadcast events. + // Protocol v2 servers use the older tool.call / permission.request RPC model. + // We always register v2 adapters because handlers are set up before version + // negotiation; a v3 server will simply never send these requests. + rpc.SetLocalRpcMethod("tool.call", handler.OnToolCallV2); + rpc.SetLocalRpcMethod("permission.request", handler.OnPermissionRequestV2); + rpc.SetLocalRpcMethod("userInput.request", handler.OnUserInputRequest); + rpc.SetLocalRpcMethod("exitPlanMode.request", handler.OnExitPlanModeRequest); + rpc.SetLocalRpcMethod("autoModeSwitch.request", handler.OnAutoModeSwitchRequest); + rpc.SetLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke); + rpc.SetLocalRpcMethod("systemMessage.transform", handler.OnSystemMessageTransform); + ClientSessionApiRegistration.RegisterClientSessionApiHandlers(rpc, sessionId => + { + var session = GetSession(sessionId) ?? throw new ArgumentException($"Unknown session {sessionId}"); + return session.ClientSessionApis; + }); + rpc.StartListening(); + LogTiming(_logger, LogLevel.Debug, null, + "CopilotClient.ConnectToServerAsync transport setup complete. Elapsed={Elapsed}", + setupTimestamp); + + // Transition state to Disconnected if the JSON-RPC connection drops + _ = rpc.Completion.ContinueWith(_ => _disconnected = true, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - // Transition state to Disconnected if the JSON-RPC connection drops - _ = rpc.Completion.ContinueWith(_ => _disconnected = true, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + _serverRpc = new ServerRpc(rpc); + + return new Connection(rpc, cliProcess, networkStream, stderrPump); + } + catch + { + try { rpc?.Dispose(); } + catch (Exception ex) { _logger.LogDebug(ex, "Failed to dispose JSON-RPC connection after startup failure"); } - _serverRpc = new ServerRpc(rpc); + if (networkStream is not null) + { + try { await networkStream.DisposeAsync(); } + catch (Exception ex) { _logger.LogDebug(ex, "Failed to dispose TCP stream after startup failure"); } + } - return new Connection(rpc, cliProcess, networkStream, stderrBuffer); + throw; + } } private static JsonSerializerOptions SerializerOptionsForMessageFormatter { get; } = CreateSerializerOptions(); @@ -1927,12 +1995,110 @@ private class Connection( JsonRpc rpc, Process? cliProcess, // Set if we created the child process NetworkStream? networkStream, // Set if using TCP - StringBuilder? stderrBuffer = null) // Captures stderr for error messages + ProcessStderrPump? stderrPump = null) // Captures stderr for error messages { public Process? CliProcess => cliProcess; public JsonRpc Rpc => rpc; public NetworkStream? NetworkStream => networkStream; - public StringBuilder? StderrBuffer => stderrBuffer; + public ProcessStderrPump? StderrPump => stderrPump; + public StringBuilder? StderrBuffer => stderrPump?.Buffer; + } + + private sealed class ProcessStderrPump : IDisposable + { + private readonly CancellationTokenSource _cancellationTokenSource = new(); + private readonly Task _completion; + private int _disposeRequested; + + private ProcessStderrPump(Process process, ILogger logger) + { + _completion = Task.Run(() => PumpAsync(process, logger, _cancellationTokenSource.Token)); + } + + public StringBuilder Buffer { get; } = new(); + + public static ProcessStderrPump Start(Process process, ILogger logger) + { + return new ProcessStderrPump(process, logger); + } + + public void Cancel() + { + try + { + _cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + } + } + + public async Task WaitForCompletionAsync(TimeSpan timeout) + { + await _completion.WaitAsync(timeout); + } + + public void Dispose() + { + if (Interlocked.Exchange(ref _disposeRequested, 1) != 0) + { + return; + } + + Cancel(); + + if (_completion.IsCompleted) + { + _cancellationTokenSource.Dispose(); + } + else + { + _ = _completion.ContinueWith( + static (_, state) => ((CancellationTokenSource)state!).Dispose(), + _cancellationTokenSource, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + } + + private async Task PumpAsync(Process process, ILogger logger, CancellationToken cancellationToken) + { + try + { + while (true) + { + var line = await process.StandardError.ReadLineAsync(cancellationToken); + if (line is null) + { + break; + } + + lock (Buffer) + { + Buffer.AppendLine(line); + } + + logger.LogWarning("[CLI] {Line}", line); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + } + catch (InvalidOperationException) when (cancellationToken.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) + { + } + catch (IOException) when (cancellationToken.IsCancellationRequested) + { + } + catch (Exception ex) + { + logger.LogDebug(ex, "CLI stderr pump stopped unexpectedly"); + } + } } private static class ProcessArgumentEscaper