Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Discord.Net.WebSocket/API/Voice/SpeakingParams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ namespace Discord.API.Voice
internal class SpeakingParams
{
[JsonProperty("speaking")]
public bool IsSpeaking { get; set; }
public int Speaking { get; set; }
[JsonProperty("delay")]
public int Delay { get; set; }
[JsonProperty("ssrc")]
public uint Ssrc { get; set; }
}
}
24 changes: 15 additions & 9 deletions src/Discord.Net.WebSocket/Audio/AudioClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Discord.API.Voice;
using Discord.Audio.Streams;
using Discord.Logging;
using Discord.Net;
using Discord.Net.Converters;
using Discord.WebSocket;
using Newtonsoft.Json;
Expand Down Expand Up @@ -45,7 +44,7 @@ public StreamPair(AudioInStream reader, AudioOutStream writer)
private readonly SemaphoreSlim _stateLock;
private readonly ConcurrentQueue<long> _heartbeatTimes;
private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes;
private readonly ConcurrentDictionary<uint, ulong> _ssrcMap;
private readonly SsrcMap _ssrcMap;
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;

private Task _heartbeatTask, _keepaliveTask;
Expand All @@ -54,7 +53,7 @@ public StreamPair(AudioInStream reader, AudioOutStream writer)
private string _url, _sessionId, _token;
private ulong _userId;
private uint _ssrc;
private bool _isSpeaking;
private bool? _isSpeaking;
private StopReason _stopReason;
private bool _resuming;

Expand Down Expand Up @@ -90,7 +89,7 @@ internal AudioClient(SocketGuild guild, int clientId, ulong channelId)
_connection.Disconnected += (exception, _) => _disconnectedEvent.InvokeAsync(exception);
_heartbeatTimes = new ConcurrentQueue<long>();
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
_ssrcMap = new ConcurrentDictionary<uint, ulong>();
_ssrcMap = new SsrcMap();
_streams = new ConcurrentDictionary<ulong, StreamPair>();

_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
Expand Down Expand Up @@ -146,12 +145,15 @@ private async Task OnConnectingAsync()

//Wait for READY
await _connection.WaitAsync().ConfigureAwait(false);
_ssrcMap.UserSpeakingChanged += OnUserSpeakingChanged;
}
private async Task OnDisconnectingAsync(Exception ex)
{
await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
await ApiClient.DisconnectAsync().ConfigureAwait(false);

_ssrcMap.UserSpeakingChanged -= OnUserSpeakingChanged;

if (_stopReason == StopReason.Unknown && ex.InnerException is WebSocketException exception)
{
await _audioLogger.WarningAsync(
Expand Down Expand Up @@ -207,6 +209,11 @@ private async Task FinishDisconnect(Exception ex, bool wontTryReconnect)
}
}

private async void OnUserSpeakingChanged(ulong userId, bool isSpeaking)
{
await _speakingUpdatedEvent.InvokeAsync(userId, isSpeaking);
}

private async Task ClearHeartBeaters()
{
//Wait for tasks to complete
Expand Down Expand Up @@ -332,8 +339,7 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}");

SecretKey = data.SecretKey;
_isSpeaking = false;
await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false);
await SetSpeakingAsync(false);
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);

_ = _connection.CompleteAsync();
Expand Down Expand Up @@ -366,7 +372,7 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);

var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer);
_ssrcMap[data.Ssrc] = data.UserId;
_ssrcMap.AddClient(data.Ssrc, data.UserId, data.Speaking);

await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking);
}
Expand Down Expand Up @@ -468,7 +474,7 @@ private async Task ProcessPacketAsync(byte[] packet)
{
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
}
else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId))
else if (!_ssrcMap.TryUpdateUser(ssrc, out ulong userId))
{
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
}
Expand Down Expand Up @@ -582,7 +588,7 @@ public async Task SetSpeakingAsync(bool value)
if (_isSpeaking != value)
{
_isSpeaking = value;
await ApiClient.SendSetSpeaking(value).ConfigureAwait(false);
await ApiClient.SendSetSpeaking(value, _ssrc).ConfigureAwait(false);
}
}

Expand Down
100 changes: 100 additions & 0 deletions src/Discord.Net.WebSocket/Audio/SsrcMap.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.Collections.Concurrent;
using System.Timers;

namespace Discord.Audio
{
internal class SsrcMap
{
// The delay after a packet is received from a user until he is marked as not speaking anymore.
public static readonly TimeSpan Delay = TimeSpan.FromMilliseconds(100);

private readonly ConcurrentDictionary<uint, ClientData> _clients;

public event Action<ulong, bool> UserSpeakingChanged;

public SsrcMap()
{
_clients = new ConcurrentDictionary<uint, ClientData>();
}

public void AddClient(uint ssrc, ulong userId, bool isSpeaking)
{
if (_clients.TryGetValue(ssrc, out ClientData client))
{
client.SpeakingChanged -= OnUserSpeakingChanged;
}

client = new ClientData(userId, isSpeaking);
client.SpeakingChanged += OnUserSpeakingChanged;
_clients[ssrc] = client;
}

public bool TryUpdateUser(uint ssrc, out ulong userId)
{
bool exists = false;
userId = 0;

if (_clients.TryGetValue(ssrc, out ClientData client))
{
exists = true;
userId = client.UserId;
client.ActivateSpeaking();
}

return exists;
}

private void OnUserSpeakingChanged(ClientData client)
{
UserSpeakingChanged?.Invoke(client.UserId, client.IsSpeaking);
}

public void Clear()
{
_clients.Clear();
}

private class ClientData
{
public ulong UserId { get; }
public Timer Timer { get; }
public bool IsSpeaking { get; private set; }

public event Action<ClientData> SpeakingChanged;

public ClientData(ulong userId, bool isSpeaking)
{
UserId = userId;
Timer = new Timer(Delay);
Timer.AutoReset = false;
Timer.Elapsed += OnTimerElapsed;
IsSpeaking = isSpeaking;

if (IsSpeaking) Timer.Start();
}

private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
if (IsSpeaking)
{
IsSpeaking = false;
SpeakingChanged?.Invoke(this);
}
}

public void ActivateSpeaking()
{
if (!IsSpeaking)
{
IsSpeaking = true;
SpeakingChanged?.Invoke(this);
}

// Restart timer
Timer.Stop();
Timer.Start();
}
}
}
}
3 changes: 0 additions & 3 deletions src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Discord.Audio.Streams
public class RTPReadStream : AudioOutStream
{
private readonly AudioStream _next;
private readonly byte[] _buffer, _nonce;

public override bool CanRead => true;
public override bool CanSeek => false;
Expand All @@ -17,8 +16,6 @@ public class RTPReadStream : AudioOutStream
public RTPReadStream(AudioStream next, int bufferSize = 4000)
{
_next = next;
_buffer = new byte[bufferSize];
_nonce = new byte[24];
}

/// <exception cref="OperationCanceledException">The token has had cancellation requested.</exception>
Expand Down
7 changes: 4 additions & 3 deletions src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ public Task SendSelectProtocol(string externalIp)
});
}

public Task SendSetSpeaking(bool value)
public Task SendSetSpeaking(bool value, uint ssrc)
{
return SendAsync(VoiceOpCode.Speaking, new SpeakingParams
{
IsSpeaking = value,
Delay = 0
Speaking = value ? 1 : 0,
Delay = 0,
Ssrc = ssrc
});
}

Expand Down