Skip to content

Commit fc491d7

Browse files
committed
加入线程池
1 parent eb92d0a commit fc491d7

File tree

9 files changed

+206
-71
lines changed

9 files changed

+206
-71
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ services:
1515
- taos-network
1616

1717
taos:
18-
image: tdengine/tdengine:2.4.0.12
18+
image: tdengine/tdengine:2.6.0.12
1919
restart: always
2020
container_name: taos
2121
hostname: taos

src/Example/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ RUN echo "deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bullseye main contrib
77
echo "deb https://mirrors.tuna.tsinghua.edu.cn/debian/ bullseye-backports main contrib non-free" >> /etc/apt/sources.list && \
88
apt-get -y -q update && apt-get install -y -q apt-utils libgdiplus libc6-dev lsof net-tools wget curl iputils-ping inetutils-tools && \
99
apt-get autoremove -y && apt-get clean && apt-get autoclean && rm /var/cache/apt/* -rf && ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
10-
RUN curl -o TDengine-client.tar.gz "https://www.taosdata.com/assets-download/TDengine-client-2.4.0.12-Linux-x64.tar.gz" && \
10+
RUN curl -o TDengine-client.tar.gz "https://www.taosdata.com/assets-download/TDengine-client-2.6.0.12-Linux-x64.tar.gz" && \
1111
tar -xvf TDengine-client.tar.gz && rm TDengine-client.tar.gz -f && cd $(ls TDengine-client* -d) && \
1212
./install_client.sh && \
1313
rm $(pwd) -rf

src/Example/Program.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Data.Common;
99
using System.Linq;
1010
using System.Threading;
11+
using System.Threading.Tasks;
1112

1213
namespace TaosADODemo
1314
{
@@ -21,7 +22,7 @@ static void Main(string[] args)
2122
string database = "db_" + DateTime.Now.ToString("yyyyMMddHHmmss");
2223
var builder = new TaosConnectionStringBuilder()
2324
{
24-
DataSource = "airleaderserver",
25+
DataSource = "taos",
2526
DataBase = database,
2627
Username = "root",
2728
Password = "taosdata",
@@ -61,7 +62,7 @@ static void Main(string[] args)
6162
Console.WriteLine("");
6263
ConsoleTableBuilder.From(reader.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.MarkDown).ExportAndWriteLine();
6364

64-
65+
connection.ChangeDatabase(database);
6566
Console.WriteLine("");
6667
connection.CreateCommand($"CREATE TABLE datas ('reportTime' timestamp, type int, 'bufferedEnd' bool, address nchar(64), parameter nchar(64), value nchar(64)) TAGS ('boxCode' nchar(64), 'machineId' int);").ExecuteNonQuery();
6768
connection.CreateCommand($"INSERT INTO data_history_67 USING datas TAGS (mongo, 67) values ( 1608173534840 2 false 'Channel1.窑.烟囱温度' '烟囱温度' '122.00' );").ExecuteNonQuery();
@@ -84,8 +85,15 @@ static void Main(string[] args)
8485
connection.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));").ExecuteNonQuery();
8586
var devid1 = $"{Guid.NewGuid():N}";
8687
var devid2 = $"{Guid.NewGuid():N}";
87-
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 2000);
88-
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 2000);
88+
DateTime dt = DateTime.Now;
89+
UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 4000);
90+
UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 4000);
91+
92+
DateTime dt2 = DateTime.Now;
93+
UploadTelemetryDataPool(connection, devid1, "1#air-compressor-two-level-discharge-temperature1", 4000);
94+
UploadTelemetryDataPool(connection, devid2, "1#air-compressor-load-rate1", 4000);
95+
Console.WriteLine($"UploadTelemetryData 耗时:{DateTime.Now.Subtract(dt).TotalSeconds}");
96+
Console.WriteLine($"UploadTelemetryDataPool 耗时:{DateTime.Now.Subtract(dt2).TotalSeconds}");
8997
var reader2 = connection.CreateCommand("select last_row(*) from telemetrydata group by deviceid,keyname ;").ExecuteReader();
9098
ConsoleTableBuilder.From(reader2.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.Default).ExportAndWriteLine();
9199
var reader3 = connection.CreateCommand("select * from telemetrydata").ExecuteReader();
@@ -120,7 +128,7 @@ static void Main(string[] args)
120128
string[] jsonStr = {
121129
"{"
122130
+"\"metric\": \"stb0_0\","
123-
+"\"timestamp\": 1626006833,"
131+
+$"\"timestamp\": {DateTimeOffset.Now.ToUnixTimeSeconds()},"
124132
+"\"value\": 10,"
125133
+"\"tags\": {"
126134
+" \"t1\": true,"
@@ -169,7 +177,7 @@ static JObject AddTag(JObject tags, string name, object value, string type)
169177
payload.Add("metric", "stb3_0");
170178

171179
var timestamp = new JObject();
172-
timestamp.Add("value", 1626006833);
180+
timestamp.Add("value", DateTimeOffset.Now.ToUnixTimeSeconds() );
173181
timestamp.Add("type", "s");
174182
payload.Add("timestamp", timestamp);
175183

@@ -245,5 +253,13 @@ static void UploadTelemetryData( TaosConnection connection, string devid, strin
245253
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
246254
}
247255
}
256+
257+
static void UploadTelemetryDataPool(TaosConnection connection, string devid, string keyname, int count)
258+
{
259+
Parallel.For(0, count,new ParallelOptions() { MaxDegreeOfParallelism=connection.PoolSize }, i =>
260+
{
261+
connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery();
262+
});
263+
}
248264
}
249265
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace IoTSharp.Data.Taos
10+
{
11+
public class ConcurrentTaosQueue
12+
{
13+
public ConcurrentQueue<IntPtr> TaosQueue { get; }
14+
15+
public ConcurrentTaosQueue(List<IntPtr> clients)
16+
{
17+
TaosQueue = new ConcurrentQueue<IntPtr>(clients);
18+
}
19+
20+
public ConcurrentTaosQueue()
21+
{
22+
TaosQueue = new ConcurrentQueue<IntPtr>();
23+
}
24+
25+
public void Return(IntPtr client)
26+
{
27+
Monitor.Enter(TaosQueue);
28+
TaosQueue.Enqueue(client);
29+
System.Diagnostics.Debug.WriteLine($"TaosQueue Return:{client}");
30+
Monitor.Pulse(TaosQueue);
31+
Monitor.Exit(TaosQueue);
32+
33+
}
34+
int _ref = 0;
35+
public void AddRef()
36+
{
37+
lock (this)
38+
{
39+
_ref++;
40+
}
41+
}
42+
public int GetRef()
43+
{
44+
return _ref;
45+
}
46+
public void RemoveRef()
47+
{
48+
lock (this)
49+
{
50+
_ref--;
51+
}
52+
}
53+
public IntPtr Take()
54+
{
55+
Monitor.Enter(TaosQueue);
56+
if (TaosQueue.IsEmpty)
57+
{
58+
Monitor.Wait(TaosQueue);
59+
}
60+
TaosQueue.TryDequeue(out var client);
61+
System.Diagnostics.Debug.WriteLine($"TaosQueue Take:{client}");
62+
Monitor.Exit(TaosQueue);
63+
return client;
64+
}
65+
}
66+
}

src/IoTSharp.Data.Taos/Driver/TDengineDriver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ static public string Error(IntPtr res)
196196
static extern public int ErrorNo(IntPtr res);
197197

198198
[DllImport("taos", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
199-
// static extern public IntPtr Query(IntPtr conn, string sqlstr);
200199
static extern private IntPtr Query(IntPtr conn, IntPtr byteArr);
200+
201201
[DllImport("taos", EntryPoint = "taos_stop_query", CallingConvention = CallingConvention.Cdecl)]
202202
public static extern void StopQuery(IntPtr taos);
203203

src/IoTSharp.Data.Taos/TaosCommand.cs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TaosCommand : DbCommand
2727
private readonly DateTime _dt1970;
2828
private TaosConnection _connection;
2929
private string _commandText;
30-
private IntPtr _taos => _connection._taos;
30+
3131
/// <summary>
3232
/// Initializes a new instance of the <see cref="TaosCommand" /> class.
3333
/// </summary>
@@ -116,10 +116,9 @@ public override string CommandText
116116
{
117117
throw new InvalidOperationException($"SetRequiresNoOpenReader{nameof(Connection)}");
118118
}
119-
119+
120120
if (value != _connection)
121121
{
122-
123122
_connection?.RemoveCommand(this);
124123
_connection = value;
125124
value?.AddCommand(this);
@@ -137,6 +136,7 @@ protected override DbConnection DbConnection
137136
set => Connection = (TaosConnection)value;
138137
}
139138

139+
140140
/// <summary>
141141
/// Gets or sets the transaction within which the command executes.
142142
/// </summary>
@@ -317,12 +317,12 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
317317
{
318318
throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteReader)}");
319319
}
320-
321320
var unprepared = false;
322321
TaosDataReader dataReader = null;
323322
var closeConnection = (behavior & CommandBehavior.CloseConnection) != 0;
324323
try
325324
{
325+
var _taos = _connection.TakeClient();
326326
#if DEBUG
327327
Debug.WriteLine($"_commandText:{_commandText}");
328328
#endif
@@ -336,7 +336,7 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
336336
if (stmt != IntPtr.Zero)
337337
{
338338
var pms = _parameters.Value;
339-
binds = BindParamters(pms);
339+
binds = BindParamters(pms,_taos);
340340
int res = TDengine.StmtPrepare(stmt, _commandText);
341341
if (res == 0)
342342
{
@@ -415,31 +415,33 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
415415
}
416416
}
417417
#endif
418-
dataReader = new TaosDataReader(this, metas, closeConnection, code.Result, _affectRows, metas?.Length ?? 0, binds);
419-
}
420-
else if (isok && TDengine.ErrorNo(code.Result) != 0)
421-
{
422-
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = TDengine.ErrorNo(code.Result), Error = TDengine.Error(code.Result) });
418+
dataReader = new TaosDataReader(this, metas, closeConnection, _taos, code.Result, _affectRows, metas?.Length ?? 0, binds);
423419
}
424-
else if (isok && code.Result == IntPtr.Zero)
420+
else if (isok && TDengine.ErrorNo(code.Result) != 0)
425421
{
426-
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = TDengine.ErrorNo(_taos), Error = TDengine.Error(_taos) });
427-
}
428-
else if (code.Status == TaskStatus.Running || !isok)
429-
{
430-
TaosException.ThrowExceptionForRC(-10006, "Execute sql command timeout", null);
431-
}
432-
else if (code.IsCanceled)
433-
{
434-
TaosException.ThrowExceptionForRC(-10003, "Command is Canceled", null);
435-
}
436-
else if (code.IsFaulted)
437-
{
438-
TaosException.ThrowExceptionForRC(-10004, code.Exception.Message, code.Exception?.InnerException);
422+
var ter = new TaosErrorResult() { Code = TDengine.ErrorNo(code.Result), Error = TDengine.Error(code.Result) };
423+
_connection.ReturnClient(_taos);
424+
TaosException.ThrowExceptionForRC(_commandText, ter);
439425
}
440426
else
441427
{
442-
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = -10007, Error = $"Unknow Exception" });
428+
_connection.ReturnClient(_taos);
429+
if (code.Status == TaskStatus.Running || !isok)
430+
{
431+
TaosException.ThrowExceptionForRC(-10006, "Execute sql command timeout", null);
432+
}
433+
else if (code.IsCanceled)
434+
{
435+
TaosException.ThrowExceptionForRC(-10003, "Command is Canceled", null);
436+
}
437+
else if (code.IsFaulted)
438+
{
439+
TaosException.ThrowExceptionForRC(-10004, code.Exception.Message, code.Exception?.InnerException);
440+
}
441+
else
442+
{
443+
TaosException.ThrowExceptionForRC(_commandText, new TaosErrorResult() { Code = -10007, Error = $"Unknow Exception" });
444+
}
443445
}
444446
}
445447
catch when (unprepared)
@@ -449,7 +451,7 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos)
449451
return dataReader;
450452
}
451453

452-
private TAOS_BIND[] BindParamters(TaosParameterCollection pms)
454+
private TAOS_BIND[] BindParamters(TaosParameterCollection pms,IntPtr _taos)
453455
{
454456
TAOS_BIND[] binds = new TAOS_BIND[pms.Count];
455457
for (int i = 0; i < pms.Count; i++)

0 commit comments

Comments
 (0)