Skip to content

Commit 52ed6e0

Browse files
committed
重构聚合模块
1 parent abe56c2 commit 52ed6e0

File tree

123 files changed

+2474
-2125
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

123 files changed

+2474
-2125
lines changed

samples/Sample.MySql/Controllers/WeatherForecastController.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Linq;
77
using System.Threading.Tasks;
88
using Microsoft.EntityFrameworkCore;
9+
using Microsoft.EntityFrameworkCore.Infrastructure;
910
using Microsoft.EntityFrameworkCore.Storage;
1011
using Sample.MySql.DbContexts;
1112
using Sample.MySql.Domain.Entities;

samples/Sample.SqlServer/Controllers/ValuesController.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ public async Task<IActionResult> Get1([FromQuery] int p, [FromQuery] int s)
177177
// var compiler = provider.GetFieldValue("_queryCompiler") as ShardingQueryCompiler;
178178
// var shardingDbContext = compiler.GetFieldValue("_shardingDbContext") as IShardingDbContext;
179179

180-
181180
Stopwatch sp = new Stopwatch();
182181
sp.Start();
183182
var shardingPageResultAsync = await _defaultTableDbContext.Set<SysUserMod>()

src/ShardingCore/Sharding/Enumerators/AggregateExtensions/AggregateExtension.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,30 @@ private static MethodCallExpression CreateSumByProperty(this IQueryable source,
123123
new[] { source.Expression, Expression.Quote(selector) });
124124
return callExpression;
125125
}
126+
127+
private static MethodCallExpression CreateSumByConstant<TSelect>(this IQueryable source)
128+
{
129+
if (source == null) throw new ArgumentNullException(nameof(source));
130+
var valueType = typeof(TSelect);
131+
if (!valueType.IsNumericType())
132+
throw new ShardingCoreInvalidOperationException(
133+
$"method sum cant calc type :[{valueType}]");
134+
ParameterExpression parameter = Expression.Parameter(source.ElementType, "s");
135+
// MemberExpression getter = Expression.MakeMemberAccess(parameter, property);
136+
Expression selector = Expression.Lambda(parameter);
137+
MethodInfo sumMethod = typeof(Queryable).GetMethods().First(
138+
m => m.Name == nameof(Queryable.Sum)
139+
&& m.ReturnType == valueType
140+
&& m.IsGenericMethod);
141+
142+
var genericSumMethod = sumMethod.MakeGenericMethod(new[] { source.ElementType });
143+
144+
var callExpression = Expression.Call(
145+
null,
146+
genericSumMethod,
147+
new[] { source.Expression, Expression.Quote(selector) });
148+
return callExpression;
149+
}
126150
/// <summary>
127151
/// 根据属性求和
128152
/// </summary>
@@ -140,6 +164,11 @@ public static TSelect SumByProperty<TSelect>(this IQueryable source, PropertyInf
140164
var callExpression = CreateSumByProperty(source, property);
141165
return source.Provider.Execute<TSelect>(callExpression);
142166
}
167+
public static TSelect SumByConstant<TSelect>(this IQueryable source)
168+
{
169+
var callExpression = CreateSumByConstant<TSelect>(source);
170+
return source.Provider.Execute<TSelect>(callExpression);
171+
}
143172
/// <summary>
144173
/// 根据属性求和
145174
/// </summary>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using ShardingCore.Extensions;
8+
9+
namespace ShardingCore.Sharding.Enumerators.StreamMergeAsync
10+
{
11+
internal class InMemoryGroupByOrderStreamMergeAsyncEnumerator<T> : IInMemoryStreamMergeAsyncEnumerator<T>
12+
{
13+
private readonly StreamMergeContext _streamMergeContext;
14+
private readonly bool _async;
15+
private readonly IEnumerator<T> _inMemoryEnumerator;
16+
private bool skip;
17+
private int _inMemoryReallyCount;
18+
19+
public InMemoryGroupByOrderStreamMergeAsyncEnumerator(StreamMergeContext streamMergeContext,IStreamMergeAsyncEnumerator<T> asyncSource, bool async)
20+
{
21+
if (_inMemoryEnumerator != null)
22+
throw new ArgumentNullException(nameof(_inMemoryEnumerator));
23+
_streamMergeContext = streamMergeContext;
24+
_async = async;
25+
26+
if (_async)
27+
_inMemoryEnumerator = GetAllRowsAsync(asyncSource).WaitAndUnwrapException();
28+
else
29+
_inMemoryEnumerator = GetAllRows(asyncSource);
30+
_inMemoryEnumerator.MoveNext();
31+
skip = true;
32+
}
33+
34+
private async Task<IEnumerator<T>> GetAllRowsAsync(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
35+
{
36+
var list = new List<T>();
37+
#if !EFCORE2
38+
while (await streamMergeAsyncEnumerator.MoveNextAsync())
39+
#endif
40+
#if EFCORE2
41+
while (await streamMergeAsyncEnumerator.MoveNext(new CancellationToken()))
42+
#endif
43+
{
44+
list.Add(streamMergeAsyncEnumerator.GetCurrent());
45+
_inMemoryReallyCount++;
46+
}
47+
return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator();
48+
}
49+
private IEnumerator<T> GetAllRows(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)
50+
{
51+
var list = new List<T>();
52+
#if !EFCORE2
53+
while ( streamMergeAsyncEnumerator.MoveNext())
54+
#endif
55+
#if EFCORE2
56+
while (streamMergeAsyncEnumerator.MoveNext())
57+
#endif
58+
{
59+
list.Add(streamMergeAsyncEnumerator.GetCurrent());
60+
_inMemoryReallyCount++;
61+
}
62+
63+
return list.AsQueryable().OrderWithExpression(_streamMergeContext.Orders).GetEnumerator();
64+
}
65+
66+
public bool SkipFirst()
67+
{
68+
if (skip)
69+
{
70+
skip = false;
71+
return true;
72+
}
73+
return false;
74+
}
75+
public int GetReallyCount()
76+
{
77+
return _inMemoryReallyCount;
78+
}
79+
#if !EFCORE2
80+
81+
public ValueTask DisposeAsync()
82+
{
83+
_inMemoryEnumerator.Dispose();
84+
return new ValueTask();
85+
}
86+
87+
public ValueTask<bool> MoveNextAsync()
88+
{
89+
if (skip)
90+
{
91+
skip = false;
92+
return new ValueTask<bool>(null != _inMemoryEnumerator.Current);
93+
}
94+
return new ValueTask<bool>(_inMemoryEnumerator.MoveNext());
95+
}
96+
97+
public void Dispose()
98+
{
99+
_inMemoryEnumerator?.Dispose();
100+
}
101+
102+
#endif
103+
public bool MoveNext()
104+
{
105+
if (skip)
106+
{
107+
skip = false;
108+
return null != _inMemoryEnumerator.Current;
109+
}
110+
return _inMemoryEnumerator.MoveNext();
111+
}
112+
113+
public bool HasElement()
114+
{
115+
return null != _inMemoryEnumerator.Current;
116+
}
117+
118+
119+
public void Reset()
120+
{
121+
_inMemoryEnumerator.Reset();
122+
}
123+
124+
object IEnumerator.Current => Current;
125+
public T Current => GetCurrent();
126+
public T ReallyCurrent => GetReallyCurrent();
127+
public T GetCurrent()
128+
{
129+
if (skip)
130+
return default;
131+
return _inMemoryEnumerator.Current;
132+
}
133+
public T GetReallyCurrent()
134+
{
135+
return _inMemoryEnumerator.Current;
136+
}
137+
#if EFCORE2
138+
public void Dispose()
139+
{
140+
_inMemoryEnumerator?.Dispose();
141+
}
142+
143+
public Task<bool> MoveNext(CancellationToken cancellationToken = new CancellationToken())
144+
{
145+
cancellationToken.ThrowIfCancellationRequested();
146+
if (skip)
147+
{
148+
skip = false;
149+
return Task.FromResult(null != _inMemoryEnumerator.Current);
150+
}
151+
return Task.FromResult(_inMemoryEnumerator.MoveNext());
152+
}
153+
154+
155+
#endif
156+
}
157+
}

src/ShardingCore/Sharding/Enumerators/StreamMergeAsync/InMemoryStreamMergeAsyncEnumerator.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ private async Task<IEnumerator<T>> GetAllRowsAsync(IStreamMergeAsyncEnumerator<T
4343
list.Add(streamMergeAsyncEnumerator.GetCurrent());
4444
_inMemoryReallyCount++;
4545
}
46-
4746
return list.GetEnumerator();
4847
}
4948
private IEnumerator<T> GetAllRows(IStreamMergeAsyncEnumerator<T> streamMergeAsyncEnumerator)

src/ShardingCore/Sharding/MergeContexts/GroupByContext.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ public class GroupByContext
1313
/// <summary>
1414
/// group by 表达式
1515
/// </summary>
16-
public LambdaExpression GroupExpression { get; set; }
16+
public LambdaExpression GroupExpression { get; set; }
17+
/// <summary>
18+
/// 是否内存聚合
19+
/// </summary>
20+
public bool GroupMemoryMerge { get; set; }
1721

1822
}
1923
}

0 commit comments

Comments
 (0)