Skip to content

Commit 7961a24

Browse files
committed
调整线程池处理
1 parent fc491d7 commit 7961a24

File tree

6 files changed

+104
-35
lines changed

6 files changed

+104
-35
lines changed

src/Example/Program.cs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,27 @@ static void Main(string[] args)
8585
connection.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));").ExecuteNonQuery();
8686
var devid1 = $"{Guid.NewGuid():N}";
8787
var devid2 = $"{Guid.NewGuid():N}";
88+
var devid3 = $"{Guid.NewGuid():N}";
89+
var devid4 = $"{Guid.NewGuid():N}";
8890
DateTime dt = DateTime.Now;
89-
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 4000);
90-
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 4000);
91-
91+
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 5000);
92+
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 5000);
93+
Console.WriteLine("");
94+
Console.WriteLine("");
95+
Console.WriteLine("");
9296
DateTime dt2 = DateTime.Now;
93-
UploadTelemetryDataPool(connection, devid1, "1#air-compressor-two-level-discharge-temperature1", 4000);
94-
UploadTelemetryDataPool(connection, devid2, "1#air-compressor-load-rate1", 4000);
95-
Console.WriteLine($"UploadTelemetryData 耗时:{DateTime.Now.Subtract(dt).TotalSeconds}");
96-
Console.WriteLine($"UploadTelemetryDataPool 耗时:{DateTime.Now.Subtract(dt2).TotalSeconds}");
97+
UploadTelemetryDataPool(connection, devid3, "1#air-compressor-two-level-discharge-temperature1", 5000);
98+
UploadTelemetryDataPool(connection, devid4, "1#air-compressor-load-rate1", 5000);
99+
var t1 = DateTime.Now.Subtract(dt).TotalSeconds;
100+
var t2 = DateTime.Now.Subtract(dt2).TotalSeconds;
101+
Console.WriteLine("Done");
102+
Thread.Sleep(TimeSpan.FromSeconds(1));
103+
Console.WriteLine($"UploadTelemetryData 耗时:{t1}");
104+
Console.WriteLine($"UploadTelemetryDataPool 耗时:{t2}");
105+
Thread.Sleep(TimeSpan.FromSeconds(2));
97106
var reader2 = connection.CreateCommand("select last_row(*) from telemetrydata group by deviceid,keyname ;").ExecuteReader();
98107
ConsoleTableBuilder.From(reader2.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.Default).ExportAndWriteLine();
99108
var reader3 = connection.CreateCommand("select * from telemetrydata").ExecuteReader();
100-
101109
List<string> list = new List<string>();
102110
while (reader3.Read())
103111
{
@@ -258,7 +266,15 @@ static void UploadTelemetryDataPool(TaosConnection connection, string devid, str
258266
{
259267
Parallel.For(0, count,new ParallelOptions() { MaxDegreeOfParallelism=connection.PoolSize }, i =>
260268
{
261-
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
269+
try
270+
{
271+
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
272+
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId}{i}条数据, OK");
273+
}
274+
catch (Exception ex)
275+
{
276+
Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId}{i}条数据, {ex.Message}");
277+
}
262278
});
263279
}
264280
}

src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Text;
67
using System.Threading;
@@ -26,10 +27,10 @@ public void Return(IntPtr client)
2627
{
2728
Monitor.Enter(TaosQueue);
2829
TaosQueue.Enqueue(client);
29-
System.Diagnostics.Debug.WriteLine($"TaosQueue Return:{client}");
30+
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 归还 {client}");
3031
Monitor.Pulse(TaosQueue);
3132
Monitor.Exit(TaosQueue);
32-
33+
Thread.Sleep(0);
3334
}
3435
int _ref = 0;
3536
public void AddRef()
@@ -50,16 +51,29 @@ public void RemoveRef()
5051
_ref--;
5152
}
5253
}
54+
public int Timeout { get; set; }
5355
public IntPtr Take()
5456
{
57+
IntPtr client = IntPtr.Zero;
5558
Monitor.Enter(TaosQueue);
5659
if (TaosQueue.IsEmpty)
5760
{
58-
Monitor.Wait(TaosQueue);
61+
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 连接池已空,请等待 超时时长:{Timeout}");
62+
Monitor.Wait(TaosQueue, TimeSpan.FromSeconds(Timeout));
63+
}
64+
if (!TaosQueue.TryDequeue(out client))
65+
{
66+
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 从连接池获取连接失败,等待并重试");
67+
}
68+
else
69+
{
70+
Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 拿走 {client}");
5971
}
60-
TaosQueue.TryDequeue(out var client);
61-
System.Diagnostics.Debug.WriteLine($"TaosQueue Take:{client}");
6272
Monitor.Exit(TaosQueue);
73+
if (client == IntPtr.Zero)
74+
{
75+
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
76+
}
6377
return client;
6478
}
6579
}

src/IoTSharp.Data.Taos/TaosCommand.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,14 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
289289
/// <exception cref="TaosException">A Taos error occurs during execution.</exception>
290290
public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior)
291291
{
292+
var _taos = _connection.TakeClient();
293+
var dr= ExecuteReader(behavior, _taos);
294+
dr.OnDispose += (object sender, EventArgs e)=>_connection.ReturnClient(_taos);
295+
return dr;
296+
}
297+
298+
private TaosDataReader ExecuteReader(CommandBehavior behavior,IntPtr _taos)
299+
{
292300
if ((behavior & ~(CommandBehavior.Default | CommandBehavior.SequentialAccess | CommandBehavior.SingleResult
293301
| CommandBehavior.SingleRow | CommandBehavior.CloseConnection)) != 0)
294302
{
@@ -322,7 +330,7 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
322330
var closeConnection = (behavior & CommandBehavior.CloseConnection) != 0;
323331
try
324332
{
325-
var _taos = _connection.TakeClient();
333+
326334
#if DEBUG
327335
Debug.WriteLine($"_commandText:{_commandText}");
328336
#endif
@@ -605,10 +613,14 @@ public override int ExecuteNonQuery()
605613
{
606614
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteNonQuery)}");
607615
}
608-
using (var reader = ExecuteReader())
616+
var _taos= _connection.TakeClient();
617+
int result = -1;
618+
using (var reader = ExecuteReader( CommandBehavior.Default,_taos))
609619
{
610-
return reader.RecordsAffected;
620+
result= reader.RecordsAffected;
611621
}
622+
_connection.ReturnClient(_taos);
623+
return result;
612624
}
613625

614626
/// <summary>
@@ -626,13 +638,16 @@ public override object ExecuteScalar()
626638
{
627639
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteScalar)}");
628640
}
629-
641+
var _taos = _connection.TakeClient();
642+
object result =null;
630643
using (var reader = ExecuteReader())
631644
{
632-
return reader.Read()
645+
result= reader.Read()
633646
? reader.GetValue(0)
634647
: null;
635648
}
649+
_connection.ReturnClient(_taos);
650+
return result;
636651
}
637652

638653
/// <summary>

src/IoTSharp.Data.Taos/TaosConnection.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ public override string ConnectionString
101101

102102
internal TaosConnectionStringBuilder ConnectionStringBuilder { get; set; }
103103

104+
public override int ConnectionTimeout => ConnectionStringBuilder.ConnectionTimeout;
105+
104106

105107
/// <summary>
106108
/// Gets the path to the database file. Will be absolute for open connections.
@@ -207,18 +209,18 @@ public override void Open()
207209
}
208210
if (!g_pool.ContainsKey(_connectionString))
209211
{
210-
g_pool.Add(_connectionString, new ConcurrentTaosQueue());
212+
g_pool.Add(_connectionString, new ConcurrentTaosQueue() { Timeout= ConnectionTimeout});
211213
}
212214
_queue = g_pool[_connectionString];
213215
_queue.AddRef();
214216
if (ConnectionString == null)
215217
{
216218
throw new InvalidOperationException("Open Requires Set ConnectionString");
217219
}
218-
for (int i = 0; i < ConnectionStringBuilder.PoolSize; i++)
220+
for (int i = 0; i < ConnectionStringBuilder.PoolSize+1; i++)
219221
{
220222
var c = TDengine.Connect(this.DataSource, ConnectionStringBuilder.Username, ConnectionStringBuilder.Password, "", (short)ConnectionStringBuilder.Port);
221-
if (c != null && c!=IntPtr.Zero)
223+
if (c!=IntPtr.Zero)
222224
{
223225
_queue.Return(c);
224226
}

src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class TaosConnectionStringBuilder : DbConnectionStringBuilder
2626
private const string DataBaseKeyword = "DataBase";
2727
private const string PortKeyword = "Port";
2828
private const string PoolSizeKeyword = "PoolSize";
29-
29+
private const string TimeOutKeyword = "TimeOut";
3030
private enum Keywords
3131
{
3232
DataSource,
@@ -35,7 +35,9 @@ private enum Keywords
3535
Password,
3636
Port,
3737
Charset,
38-
PoolSize
38+
PoolSize,
39+
TimeOut
40+
3941

4042
}
4143

@@ -48,21 +50,23 @@ private enum Keywords
4850
private string _charset = System.Text.Encoding.UTF8.EncodingName;
4951
private string _password = string.Empty;
5052
private int _port =6030;
51-
private int _PoolSize=8;
53+
private int _PoolSize=Environment.ProcessorCount;
54+
private int _timeout = 30;
5255

5356
static TaosConnectionStringBuilder()
5457
{
55-
var validKeywords = new string[7];
58+
var validKeywords = new string[8];
5659
validKeywords[(int)Keywords.DataSource] = DataSourceKeyword;
5760
validKeywords[(int)Keywords.DataBase] = DataBaseKeyword;
5861
validKeywords[(int)Keywords.Username] = UserNameKeyword;
5962
validKeywords[(int)Keywords.Password] = PasswordKeyword;
6063
validKeywords[(int)Keywords.Charset] =CharsetKeyword;
6164
validKeywords[(int)Keywords.Port] = PortKeyword;
6265
validKeywords[(int)Keywords.PoolSize] = PoolSizeKeyword;
66+
validKeywords[(int)Keywords.TimeOut] = TimeOutKeyword;
6367
_validKeywords = validKeywords;
6468

65-
_keywords = new Dictionary<string, Keywords>(7, StringComparer.OrdinalIgnoreCase)
69+
_keywords = new Dictionary<string, Keywords>(8, StringComparer.OrdinalIgnoreCase)
6670
{
6771
[DataSourceKeyword] = Keywords.DataSource,
6872
[UserNameKeyword] = Keywords.Username,
@@ -71,7 +75,8 @@ static TaosConnectionStringBuilder()
7175
[CharsetKeyword] = Keywords.Charset,
7276
[DataSourceNoSpaceKeyword] = Keywords.DataSource,
7377
[PortKeyword] = Keywords.Port,
74-
[PoolSizeKeyword] = Keywords.PoolSize
78+
[PoolSizeKeyword] = Keywords.PoolSize,
79+
[TimeOutKeyword] = Keywords.TimeOut
7580
};
7681
}
7782

@@ -100,6 +105,8 @@ public virtual string DataSource
100105
get => _dataSource;
101106
set => base[DataSourceKeyword] = _dataSource = value;
102107
}
108+
109+
103110
public virtual string Username
104111
{
105112
get => _userName;
@@ -164,7 +171,13 @@ public virtual string DataBase
164171
}
165172
internal bool ForceDatabaseName { get; set; } = false;
166173

174+
167175

176+
public int ConnectionTimeout
177+
{
178+
get => _timeout;
179+
set => base[TimeOutKeyword] = _timeout = value;
180+
}
168181

169182
/// <summary>
170183
/// Gets or sets the value associated with the specified key.
@@ -206,6 +219,9 @@ public override object this[string keyword]
206219
case Keywords.PoolSize:
207220
PoolSize = Convert.ToInt32(value, CultureInfo.InvariantCulture);
208221
return;
222+
case Keywords.TimeOut:
223+
ConnectionTimeout = Convert.ToInt32(value, CultureInfo.InvariantCulture);
224+
return;
209225
default:
210226
Debug.Assert(false, "Unexpected keyword: " + keyword);
211227
return;
@@ -330,6 +346,8 @@ private object GetAt(Keywords index)
330346
return Charset;
331347
case Keywords.PoolSize:
332348
return PoolSize;
349+
case Keywords.TimeOut:
350+
return ConnectionTimeout;
333351
default:
334352
Debug.Assert(false, "Unexpected keyword: " + index);
335353
return null;
@@ -358,13 +376,16 @@ private void Reset(Keywords index)
358376
_dataBase = string.Empty;
359377
return;
360378
case Keywords.Port:
361-
_port=6060;
379+
_port=6030;
362380
return;
363381
case Keywords.Charset:
364382
_charset = System.Text.Encoding.UTF8.EncodingName;
365383
return;
366384
case Keywords.PoolSize:
367-
_PoolSize = 8;
385+
_PoolSize = Environment.ProcessorCount;
386+
return;
387+
case Keywords.TimeOut :
388+
_timeout = 30;
368389
return;
369390
default:
370391
Debug.Assert(false, "Unexpected keyword: " + index);

src/IoTSharp.Data.Taos/TaosDataReader.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class TaosDataReader : DbDataReader
2424
private readonly int _recordsAffected;
2525
private bool _closed;
2626
private bool _closeConnection;
27-
private readonly IntPtr _taos;
2827
private IntPtr _taosResult;
2928
private int _fieldCount;
3029
IntPtr rowdata=IntPtr.Zero;
@@ -33,11 +32,12 @@ public class TaosDataReader : DbDataReader
3332
private DateTime _dt1970;
3433
private TAOS_BIND[] _binds;
3534

35+
36+
3637
internal TaosDataReader(TaosCommand taosCommand, taosField[] metas, bool closeConnection, IntPtr taos, IntPtr res, int recordsAffected, int fieldcount, TAOS_BIND[] binds)
3738
{
3839
_command = taosCommand;
3940
_closeConnection = closeConnection;
40-
_taos = taos;
4141
_fieldCount = fieldcount;
4242
_hasRows = recordsAffected > 0;
4343
_recordsAffected = recordsAffected;
@@ -145,6 +145,8 @@ public override bool NextResult()
145145
public override void Close()
146146
=> Dispose(true);
147147

148+
149+
internal event EventHandler OnDispose;
148150
/// <summary>
149151
/// Releases any resources used by the data reader and closes it.
150152
/// </summary>
@@ -160,8 +162,7 @@ protected override void Dispose(bool disposing)
160162
_command.DataReader = null;
161163

162164
_closed = true;
163-
164-
165+
165166
TDengine.FreeResult(_taosResult);
166167
if (_binds != null)
167168
{
@@ -172,7 +173,7 @@ protected override void Dispose(bool disposing)
172173
{
173174
TDengine.FreeResult(rowdata);
174175
}
175-
_command.Connection.ReturnClient(_taos);
176+
OnDispose?.Invoke(this, EventArgs.Empty);
176177
}
177178

178179
/// <summary>

0 commit comments

Comments
 (0)