Skip to content

Commit 4ac8601

Browse files
committed
优化分片查询吞吐量
1 parent ace2d0c commit 4ac8601

17 files changed

+275
-53
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
8+
{
9+
internal class OneAtMostElementStreamMergeAsyncEnumerator<T> : IStreamMergeAsyncEnumerator<T>
10+
{
11+
private List<T>.Enumerator _enumerator;
12+
private bool skip;
13+
14+
public OneAtMostElementStreamMergeAsyncEnumerator(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
15+
{
16+
var list = new List<T>();
17+
if (streamMergeAsyncEnumerator.HasElement())
18+
{
19+
list.Add(streamMergeAsyncEnumerator.ReallyCurrent);
20+
}
21+
22+
_enumerator = list.GetEnumerator();
23+
_enumerator.MoveNext();
24+
skip = true;
25+
}
26+
27+
#if !EFCORE2&&!EFCORE3&&!EFCORE5
28+
public ValueTask DisposeAsync()
29+
{
30+
_enumerator.Dispose();
31+
return ValueTask.CompletedTask;
32+
}
33+
34+
public ValueTask<bool> MoveNextAsync()
35+
{
36+
var moveNext = _enumerator.MoveNext();
37+
return ValueTask.FromResult<bool>(moveNext);
38+
}
39+
40+
public void Dispose()
41+
{
42+
_enumerator.Dispose();
43+
}
44+
45+
#endif
46+
#if EFCORE3 || EFCORE5
47+
public ValueTask DisposeAsync()
48+
{
49+
_enumerator.Dispose();
50+
return new ValueTask();
51+
}
52+
53+
public ValueTask<bool> MoveNextAsync()
54+
{
55+
var moveNext = _enumerator.MoveNext();
56+
return new ValueTask<bool>(moveNext);
57+
}
58+
59+
public void Dispose()
60+
{
61+
_enumerator.Dispose();
62+
}
63+
64+
#endif
65+
public bool MoveNext()
66+
{
67+
if (skip)
68+
{
69+
skip = false;
70+
return null != _enumerator.Current;
71+
}
72+
73+
var moveNext = _enumerator.MoveNext();
74+
return moveNext;
75+
}
76+
77+
public bool SkipFirst()
78+
{
79+
if (skip)
80+
{
81+
skip = false;
82+
return true;
83+
}
84+
85+
return false;
86+
}
87+
88+
public bool HasElement()
89+
{
90+
return null != _enumerator.Current;
91+
}
92+
93+
94+
public void Reset()
95+
{
96+
throw new NotImplementedException();
97+
}
98+
99+
object IEnumerator.Current => Current;
100+
public T Current => GetCurrent();
101+
public T ReallyCurrent => GetReallyCurrent();
102+
103+
public T GetCurrent()
104+
{
105+
if (skip)
106+
return default;
107+
return _enumerator.Current;
108+
}
109+
110+
public T GetReallyCurrent()
111+
{
112+
return _enumerator.Current;
113+
}
114+
#if EFCORE2
115+
public void Dispose()
116+
{
117+
_enumerator.Dispose();
118+
}
119+
120+
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
121+
{
122+
if (skip)
123+
{
124+
skip = false;
125+
return Task.FromResult(null != _enumerator.Current);
126+
}
127+
var moveNext = _enumerator.MoveNext();
128+
return Task.FromResult(moveNext);
129+
}
130+
131+
#endif
132+
}
133+
}

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/SingleOrDefaultStreamEnumerable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ protected override IStreamMergeCombine GetStreamMergeCombine()
2020

2121
protected override IExecutor<IStreamMergeAsyncEnumerator<TEntity>> CreateExecutor0(bool async)
2222
{
23-
GetStreamMergeContext().ReSetTake(1);
23+
GetStreamMergeContext().ReSetTake(2);
2424
return new DefaultEnumeratorExecutor<TEntity>(GetStreamMergeContext(), GetStreamMergeCombine(), async);
2525
}
2626
}

src/ShardingCore/Sharding/MergeEngines/EnumeratorStreamMergeEngines/Enumerables/SingleQueryStreamEnumerable.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected override IStreamMergeCombine GetStreamMergeCombine()
3636
var streamMergeContext = GetStreamMergeContext();
3737
var shardingRouteResult = streamMergeContext.ShardingRouteResult;
3838
var sqlRouteUnit = shardingRouteResult.RouteUnits.First();
39-
var shardingDbContext = streamMergeContext.CreateDbContext(sqlRouteUnit, ConnectionModeEnum.MEMORY_STRICTLY);
39+
var shardingDbContext = streamMergeContext.CreateDbContext(sqlRouteUnit);
4040
var newQueryable = (IQueryable<TEntity>)streamMergeContext.GetOriginalQueryable().ReplaceDbContextQueryable(shardingDbContext);
4141
var enumeratorParallelExecutor = new SingleQueryEnumeratorExecutor<TEntity>(streamMergeContext);
4242
if (async)

src/ShardingCore/Sharding/MergeEngines/Executors/Abstractions/AbstractExecutor.cs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace ShardingCore.Sharding.MergeEngines.Executors.Abstractions
1313
internal abstract class AbstractExecutor<TResult> : IExecutor<TResult>
1414
{
1515
private readonly StreamMergeContext _streamMergeContext;
16+
1617
/// <summary>
1718
/// not cancelled const mark
1819
/// </summary>
@@ -22,10 +23,11 @@ internal abstract class AbstractExecutor<TResult> : IExecutor<TResult>
2223
/// cancelled const mark
2324
/// </summary>
2425
private const int cancelled = 0;
26+
2527
/// <summary>
2628
/// cancel status
2729
/// </summary>
28-
private int cancelStatus= notCancelled;
30+
private int cancelStatus = notCancelled;
2931

3032
protected AbstractExecutor(StreamMergeContext streamMergeContext)
3133
{
@@ -36,6 +38,7 @@ protected StreamMergeContext GetStreamMergeContext()
3638
{
3739
return _streamMergeContext;
3840
}
41+
3942
/// <summary>
4043
/// 创建熔断器来中断符合查询的结果比如firstordefault只需要在顺序查询下查询到第一个
4144
/// 如果不是顺序查询则需要所有表的第一个进行内存再次查询
@@ -53,7 +56,9 @@ private bool IsCancelled()
5356
return cancelStatus == cancelled;
5457
}
5558

56-
public async Task<LinkedList<TResult>> ExecuteAsync(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
59+
public async Task<LinkedList<TResult>> ExecuteAsync(bool async,
60+
DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit,
61+
CancellationToken cancellationToken = new CancellationToken())
5762
{
5863
try
5964
{
@@ -65,8 +70,12 @@ private bool IsCancelled()
6570
throw;
6671
}
6772
}
68-
private async Task<LinkedList<TResult>> ExecuteAsync0(bool async, DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
73+
74+
private async Task<LinkedList<TResult>> ExecuteAsync0(bool async,
75+
DataSourceSqlExecutorUnit dataSourceSqlExecutorUnit,
76+
CancellationToken cancellationToken = new CancellationToken())
6977
{
78+
var streamMergeContext = GetStreamMergeContext();
7079
var circuitBreaker = CreateCircuitBreaker();
7180
var executorGroups = dataSourceSqlExecutorUnit.SqlExecutorGroups;
7281
LinkedList<TResult> result = new LinkedList<TResult>();
@@ -84,13 +93,7 @@ private bool IsCancelled()
8493
var dbContexts = routeQueryResults.Select(o => o.DbContext);
8594
foreach (var dbContext in dbContexts)
8695
{
87-
#if !EFCORE2
88-
await dbContext.DisposeAsync();
89-
90-
#endif
91-
#if EFCORE2
92-
dbContext.Dispose();
93-
#endif
96+
await streamMergeContext.DbContextDisposeAsync(dbContext);
9497
}
9598
}
9699
else
@@ -105,20 +108,22 @@ private bool IsCancelled()
105108
var hasNextLoop = executorGroupsCount > 0;
106109
if (hasNextLoop)
107110
{
108-
if (IsCancelled()|| circuitBreaker.Terminated(result))
111+
if (IsCancelled() || circuitBreaker.Terminated(result))
109112
break;
110113
}
111114
}
112115

113116
return result;
114117
}
118+
115119
/// <summary>
116120
/// 同库同组下面的并行异步执行,需要归并成一个结果
117121
/// </summary>
118122
/// <param name="sqlExecutorUnits"></param>
119123
/// <param name="cancellationToken"></param>
120124
/// <returns></returns>
121-
protected async Task<LinkedList<ShardingMergeResult<TResult>>> GroupExecuteAsync(List<SqlExecutorUnit> sqlExecutorUnits, CancellationToken cancellationToken = new CancellationToken())
125+
protected async Task<LinkedList<ShardingMergeResult<TResult>>> GroupExecuteAsync(
126+
List<SqlExecutorUnit> sqlExecutorUnits, CancellationToken cancellationToken = new CancellationToken())
122127
{
123128
if (sqlExecutorUnits.Count <= 0)
124129
{
@@ -128,8 +133,9 @@ private bool IsCancelled()
128133
{
129134
var result = new LinkedList<ShardingMergeResult<TResult>>();
130135

131-
var tasks = sqlExecutorUnits.Select(sqlExecutorUnit => ExecuteUnitAsync(sqlExecutorUnit, cancellationToken)).ToArray();
132-
136+
var tasks = sqlExecutorUnits
137+
.Select(sqlExecutorUnit => ExecuteUnitAsync(sqlExecutorUnit, cancellationToken)).ToArray();
138+
133139
var results = await TaskHelper.WhenAllFastFail(tasks);
134140
foreach (var r in results)
135141
{
@@ -139,14 +145,17 @@ private bool IsCancelled()
139145
return result;
140146
}
141147
}
142-
protected virtual void MergeParallelExecuteResult(LinkedList<TResult> previewResults, IEnumerable<TResult> parallelResults, bool async)
148+
149+
protected virtual void MergeParallelExecuteResult(LinkedList<TResult> previewResults,
150+
IEnumerable<TResult> parallelResults, bool async)
143151
{
144152
foreach (var parallelResult in parallelResults)
145153
{
146154
previewResults.AddLast(parallelResult);
147155
}
148156
}
157+
149158
protected abstract Task<ShardingMergeResult<TResult>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit,
150159
CancellationToken cancellationToken = new CancellationToken());
151160
}
152-
}
161+
}

src/ShardingCore/Sharding/MergeEngines/Executors/Enumerators/Abstractions/AbstractEnumeratorExecutor.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Microsoft.EntityFrameworkCore;
8+
using ShardingCore.Core;
89
using ShardingCore.Exceptions;
910
using ShardingCore.Sharding.Enumerators;
1011
using ShardingCore.Sharding.Enumerators.StreamMergeAsync;
1112
using ShardingCore.Sharding.Enumerators.StreamMergeAsync.EFCore2x;
1213
using ShardingCore.Sharding.MergeEngines.Abstractions;
14+
using ShardingCore.Sharding.MergeEngines.Common;
1315
using ShardingCore.Sharding.MergeEngines.Executors.Abstractions;
1416
using ShardingCore.Sharding.MergeEngines.Executors.CircuitBreakers;
17+
using ShardingCore.Sharding.ShardingExecutors;
1518

1619
#if EFCORE2
1720
using Microsoft.EntityFrameworkCore.Extensions.Internal;
@@ -140,5 +143,62 @@ public IEnumerator<TResult> GetEnumerator0(IQueryable<TResult> newQueryable)
140143
enumator.MoveNext();
141144
return enumator;
142145
}
146+
147+
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
148+
{
149+
var shardingMergeResult = await ExecuteUnitAsync0(sqlExecutorUnit, cancellationToken);
150+
var dbContext = shardingMergeResult.DbContext;
151+
var streamMergeAsyncEnumerator = shardingMergeResult.MergeResult;
152+
//连接数严格的会在内存中聚合然后聚合后回收,非连接数严格需要判断是否需要当前执行单元直接回收
153+
//first last 等操作没有skip就可以回收,如果没有元素就可以回收
154+
//single如果没有元素就可以回收
155+
//enumerable如果没有元素就可以回收
156+
if (sqlExecutorUnit.ConnectionMode != ConnectionModeEnum.CONNECTION_STRICTLY)
157+
{
158+
var streamMergeContext = GetStreamMergeContext();
159+
if (DisposeInExecuteUnit(streamMergeContext,streamMergeAsyncEnumerator))
160+
{
161+
var disConnectionStreamMergeAsyncEnumerator = new OneAtMostElementStreamMergeAsyncEnumerator<TResult>(streamMergeAsyncEnumerator);
162+
await streamMergeContext.DbContextDisposeAsync(dbContext);
163+
return new ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>(null,
164+
disConnectionStreamMergeAsyncEnumerator);
165+
}
166+
}
167+
168+
return shardingMergeResult;
169+
}
170+
/// <summary>
171+
/// 是否需要在执行单元中直接回收掉链接有助于提高吞吐量
172+
/// </summary>
173+
/// <param name="streamMergeContext"></param>
174+
/// <param name="streamMergeAsyncEnumerator"></param>
175+
/// <returns></returns>
176+
private bool DisposeInExecuteUnit(StreamMergeContext streamMergeContext,IStreamMergeAsyncEnumerator<TResult> streamMergeAsyncEnumerator)
177+
{
178+
var queryMethodName = streamMergeContext.MergeQueryCompilerContext.GetQueryMethodName();
179+
var hasElement = streamMergeAsyncEnumerator.HasElement();
180+
181+
switch (queryMethodName)
182+
{
183+
case nameof(Queryable.First):
184+
case nameof(Queryable.FirstOrDefault):
185+
case nameof(Queryable.Last):
186+
case nameof(Queryable.LastOrDefault):
187+
{
188+
var skip = streamMergeContext.GetSkip();
189+
return skip is null or < 0||!hasElement;
190+
}
191+
case nameof(Queryable.Single):
192+
case nameof(Queryable.SingleOrDefault):
193+
case QueryCompilerContext.ENUMERABLE:
194+
{
195+
return !hasElement;
196+
}
197+
}
198+
199+
return false;
200+
}
201+
protected abstract Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync0(
202+
SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken());
143203
}
144204
}

src/ShardingCore/Sharding/MergeEngines/Executors/Enumerators/AppendOrderSequenceEnumeratorExecutor.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,12 @@ public AppendOrderSequenceEnumeratorExecutor(StreamMergeContext streamMergeConte
3232
_async = async;
3333
}
3434

35-
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
35+
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync0(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
3636
{
3737
var streamMergeContext = GetStreamMergeContext();
38-
var connectionMode = streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
3938
var sqlSequenceRouteUnit = sqlExecutorUnit.RouteUnit.As<SqlSequenceRouteUnit>();
4039
var sequenceResult = sqlSequenceRouteUnit.SequenceResult;
41-
var shardingDbContext = streamMergeContext.CreateDbContext(sqlSequenceRouteUnit, connectionMode);
40+
var shardingDbContext = streamMergeContext.CreateDbContext(sqlSequenceRouteUnit);
4241
var newQueryable = _noPaginationQueryable
4342
.Skip(sequenceResult.Skip)
4443
.Take(sequenceResult.Take)

src/ShardingCore/Sharding/MergeEngines/Executors/Enumerators/DefaultEnumeratorExecutor.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,10 @@ public override IStreamMergeAsyncEnumerator<TResult> CombineInMemoryStreamMergeA
4444
return base.CombineInMemoryStreamMergeAsyncEnumerator(streamsAsyncEnumerators);
4545
}
4646

47-
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
47+
protected override async Task<ShardingMergeResult<IStreamMergeAsyncEnumerator<TResult>>> ExecuteUnitAsync0(SqlExecutorUnit sqlExecutorUnit, CancellationToken cancellationToken = new CancellationToken())
4848
{
4949
var streamMergeContext = GetStreamMergeContext();
50-
var connectionMode = streamMergeContext.RealConnectionMode(sqlExecutorUnit.ConnectionMode);
51-
52-
var shardingDbContext = streamMergeContext.CreateDbContext(sqlExecutorUnit.RouteUnit, connectionMode);
50+
var shardingDbContext = streamMergeContext.CreateDbContext(sqlExecutorUnit.RouteUnit);
5351
var newQueryable = (IQueryable<TResult>)streamMergeContext.GetReWriteQueryable()
5452
.ReplaceDbContextQueryable(shardingDbContext);
5553

0 commit comments

Comments
 (0)