Skip to content

Commit 61f76b7

Browse files
committed
修复Migrate在同步上下文中block的bug
1 parent 44a6765 commit 61f76b7

File tree

7 files changed

+258
-7
lines changed

7 files changed

+258
-7
lines changed

nuget-publish.bat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
:start
22
::定义版本
3-
set SHARDINGCORE=6.7.0.5
3+
set SHARDINGCORE=6.7.0.6
44

55
::删除所有bin与obj下的文件
66
@echo off

samples/Sample.AutoCreateIfPresent/Controllers/WeatherForecastController.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public async Task<IActionResult> Insert()
5252
}
5353
public async Task<IActionResult> Query1()
5454
{
55+
var s = Guid.NewGuid().ToString();
56+
var anyAsync = await _defaultDbContext.Set<AreaDevice>().AnyAsync(o=>o.Area==s);
5557
var list = await _defaultDbContext.Set<AreaDevice>().ToListAsync();
5658
return Ok(list);
5759
}
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
using System.Linq.Expressions;
2+
using Microsoft.EntityFrameworkCore;
3+
using ShardingCore.Core.VirtualRoutes.DataSourceRoutes.RouteRuleEngine;
4+
using ShardingCore.Core.VirtualRoutes.TableRoutes;
5+
using ShardingCore.Core.VirtualRoutes.TableRoutes.RouteTails.Abstractions;
6+
using ShardingCore.Exceptions;
7+
using ShardingCore.Extensions;
8+
using ShardingCore.Sharding.Abstractions;
9+
10+
namespace Sample.AutoCreateIfPresent;
11+
12+
public static class MyShardingExtension
13+
{
14+
15+
/// <summary>
16+
/// 根据对象集合解析
17+
/// </summary>
18+
/// <typeparam name="TShardingDbContext"></typeparam>
19+
/// <typeparam name="TEntity"></typeparam>
20+
/// <param name="shardingDbContext"></param>
21+
/// <param name="entities"></param>
22+
/// <returns></returns>
23+
public static Dictionary<string, Dictionary<DbContext, IEnumerable<TEntity>>> BulkShardingEnumerable<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext,
24+
IEnumerable<TEntity> entities) where TShardingDbContext : DbContext, IShardingDbContext where TEntity : class
25+
{
26+
if (entities.IsEmpty())
27+
return new Dictionary<string, Dictionary<DbContext, IEnumerable<TEntity>>>();
28+
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
29+
var entityType = typeof(TEntity);
30+
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
31+
var virtualDataSource = shardingDbContext.GetVirtualDataSource();
32+
var dataSourceRouteManager = shardingRuntimeContext.GetDataSourceRouteManager();
33+
var tableRouteManager =shardingRuntimeContext.GetTableRouteManager();
34+
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
35+
var dataSourceNames = new Dictionary<string, Dictionary<string, BulkDicEntry<TEntity>>>();
36+
var entitiesArray = entities as TEntity[] ?? entities.ToArray();
37+
var isShardingDataSource = entityMetadataManager.IsShardingDataSource(entityType);
38+
var isShardingTable = entityMetadataManager.IsShardingTable(entityType);
39+
if (!isShardingDataSource && !isShardingTable)
40+
return new Dictionary<string, Dictionary<DbContext, IEnumerable<TEntity>>>()
41+
{
42+
{
43+
virtualDataSource.DefaultDataSourceName,
44+
new Dictionary<DbContext, IEnumerable<TEntity>>()
45+
{
46+
{
47+
shardingDbContext.CreateGenericDbContext(entitiesArray[0]),
48+
entitiesArray
49+
}
50+
}
51+
}
52+
};
53+
if (!isShardingDataSource)
54+
{
55+
var bulkDicEntries = new Dictionary<string, BulkDicEntry<TEntity>>();
56+
dataSourceNames.Add(virtualDataSource.DefaultDataSourceName, bulkDicEntries);
57+
58+
var tableRoute = tableRouteManager.GetRoute(entityType);
59+
var allTails = tableRoute.GetTails().ToHashSet();
60+
foreach (var entity in entitiesArray)
61+
{
62+
BulkShardingTableEnumerable(shardingDbContext, virtualDataSource.DefaultDataSourceName, bulkDicEntries,
63+
routeTailFactory, tableRoute, allTails, entity);
64+
}
65+
}
66+
else
67+
{
68+
var virtualDataSourceRoute = dataSourceRouteManager.GetRoute(entityType);
69+
var allDataSourceNames = virtualDataSourceRoute.GetAllDataSourceNames().ToHashSet();
70+
71+
var entityMetadata = entityMetadataManager.TryGet(entityType);
72+
IVirtualTableRoute tableRoute = null;
73+
ISet<string> allTails = null;
74+
if (isShardingTable)
75+
{
76+
tableRoute = tableRouteManager.GetRoute(entityType);
77+
allTails = tableRoute.GetTails().ToHashSet();
78+
}
79+
foreach (var entity in entitiesArray)
80+
{
81+
var shardingDataSourceValue = entity.GetPropertyValue(entityMetadata.ShardingDataSourceProperty.Name);
82+
if (shardingDataSourceValue == null)
83+
throw new ShardingCoreInvalidOperationException($" etities has null value of sharding data source value");
84+
var shardingDataSourceName = virtualDataSourceRoute.ShardingKeyToDataSourceName(shardingDataSourceValue);
85+
if (!allDataSourceNames.Contains(shardingDataSourceName))
86+
throw new ShardingCoreException(
87+
$" data source name :[{shardingDataSourceName}] all data source names:[{string.Join(",", allDataSourceNames)}]");
88+
if (!dataSourceNames.TryGetValue(shardingDataSourceName, out var bulkDicEntries))
89+
{
90+
bulkDicEntries = new Dictionary<string, BulkDicEntry<TEntity>>();
91+
dataSourceNames.Add(shardingDataSourceName, bulkDicEntries);
92+
}
93+
94+
if (isShardingTable)
95+
{
96+
BulkShardingTableEnumerable(shardingDbContext, shardingDataSourceName, bulkDicEntries,
97+
routeTailFactory, tableRoute, allTails, entity);
98+
}
99+
else
100+
BulkNoShardingTableEnumerable(shardingDbContext, shardingDataSourceName, bulkDicEntries,
101+
routeTailFactory, entity);
102+
}
103+
}
104+
105+
return dataSourceNames.ToDictionary(o => o.Key,
106+
o => o.Value.Select(o => o.Value).ToDictionary(v => v.InnerDbContext, v => v.InnerEntities.Select(t => t)));
107+
}
108+
109+
private static void BulkShardingTableEnumerable<TShardingDbContext, TEntity>(TShardingDbContext shardingDbContext, string dataSourceName, Dictionary<string, BulkDicEntry<TEntity>> dataSourceBulkDicEntries,
110+
IRouteTailFactory routeTailFactory,IVirtualTableRoute tableRoute, ISet<string> allTails, TEntity entity)
111+
where TShardingDbContext : DbContext, IShardingDbContext
112+
where TEntity : class
113+
{
114+
var entityType = typeof(TEntity);
115+
116+
var shardingKey = entity.GetPropertyValue(tableRoute.EntityMetadata.ShardingTableProperty.Name);
117+
var tail = tableRoute.ShardingKeyToTail(shardingKey);
118+
if (!allTails.Contains(tail))
119+
{
120+
//不在alltails说明需要新增那么调用routewithvalue就会处理对应tail
121+
var tableRouteUnit = tableRoute.RouteWithValue(new DataSourceRouteResult(new HashSet<string>(new[] { dataSourceName })),
122+
shardingKey);
123+
if (tableRouteUnit.Tail != tail)
124+
{
125+
throw new ShardingCoreException(
126+
$"sharding key route not match entity:{entityType.FullName},sharding key:{shardingKey},sharding tail:{tail}");
127+
}
128+
allTails.Add(tail);
129+
}
130+
131+
var routeTail = routeTailFactory.Create(tail);
132+
var routeTailIdentity = routeTail.GetRouteTailIdentity();
133+
if (!dataSourceBulkDicEntries.TryGetValue(routeTailIdentity, out var bulkDicEntry))
134+
{
135+
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTail);
136+
bulkDicEntry = new BulkDicEntry<TEntity>(dbContext, new LinkedList<TEntity>());
137+
dataSourceBulkDicEntries.Add(routeTailIdentity, bulkDicEntry);
138+
}
139+
140+
bulkDicEntry.InnerEntities.AddLast(entity);
141+
}
142+
private static void BulkNoShardingTableEnumerable<TShardingDbContext, TEntity>(TShardingDbContext shardingDbContext, string dataSourceName, Dictionary<string, BulkDicEntry<TEntity>> dataSourceBulkDicEntries, IRouteTailFactory routeTailFactory, TEntity entity)
143+
where TShardingDbContext : DbContext, IShardingDbContext
144+
where TEntity : class
145+
{
146+
var routeTail = routeTailFactory.Create(string.Empty);
147+
var routeTailIdentity = routeTail.GetRouteTailIdentity();
148+
if (!dataSourceBulkDicEntries.TryGetValue(routeTailIdentity, out var bulkDicEntry))
149+
{
150+
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTail);
151+
bulkDicEntry = new BulkDicEntry<TEntity>(dbContext, new LinkedList<TEntity>());
152+
dataSourceBulkDicEntries.Add(routeTailIdentity, bulkDicEntry);
153+
}
154+
155+
bulkDicEntry.InnerEntities.AddLast(entity);
156+
}
157+
internal class BulkDicEntry<TEntity>
158+
{
159+
public BulkDicEntry(DbContext innerDbContext, LinkedList<TEntity> innerEntities)
160+
{
161+
InnerDbContext = innerDbContext;
162+
InnerEntities = innerEntities;
163+
}
164+
165+
public DbContext InnerDbContext { get; }
166+
public LinkedList<TEntity> InnerEntities { get; }
167+
}
168+
169+
public static Dictionary<DbContext, IEnumerable<TEntity>> BulkShardingTableEnumerable<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext,
170+
IEnumerable<TEntity> entities) where TShardingDbContext : DbContext, IShardingDbContext
171+
where TEntity : class
172+
{
173+
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
174+
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();
175+
if (entityMetadataManager.IsShardingDataSource(typeof(TEntity)))
176+
throw new ShardingCoreInvalidOperationException(typeof(TEntity).FullName);
177+
//if (!entityMetadataManager.IsShardingTable(typeof(TEntity)))
178+
// throw new ShardingCoreInvalidOperationException(typeof(TEntity).FullName);
179+
if (entities.IsEmpty())
180+
return new Dictionary<DbContext, IEnumerable<TEntity>>();
181+
return shardingDbContext.BulkShardingEnumerable(entities).First().Value;
182+
}
183+
/// <summary>
184+
/// 根据条件表达式解析
185+
/// </summary>
186+
/// <typeparam name="TShardingDbContext"></typeparam>
187+
/// <typeparam name="TEntity"></typeparam>
188+
/// <param name="shardingDbContext"></param>
189+
/// <param name="where"></param>
190+
/// <returns></returns>
191+
public static IDictionary<string, IEnumerable<DbContext>> BulkShardingExpression<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext, Expression<Func<TEntity, bool>> where) where TEntity : class
192+
where TShardingDbContext : DbContext, IShardingDbContext
193+
{
194+
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
195+
var routeTailFactory = shardingRuntimeContext.GetRouteTailFactory();
196+
var dataSourceRouteManager = shardingRuntimeContext.GetDataSourceRouteManager();
197+
var tableRouteManager = shardingRuntimeContext.GetTableRouteManager();// (IVirtualTableManager)ShardingContainer.GetService(typeof(IVirtualTableManager<>).GetGenericType0(shardingDbContext.GetType()));
198+
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();// (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
199+
200+
var dataSourceNames = dataSourceRouteManager.GetDataSourceNames(where);
201+
var result = new Dictionary<string, LinkedList<DbContext>>();
202+
var entityType = typeof(TEntity);
203+
204+
foreach (var dataSourceName in dataSourceNames)
205+
{
206+
if (!result.TryGetValue(dataSourceName, out var dbContexts))
207+
{
208+
dbContexts = new LinkedList<DbContext>();
209+
result.Add(dataSourceName, dbContexts);
210+
}
211+
if (entityMetadataManager.IsShardingTable(entityType))
212+
{
213+
var physicTables = tableRouteManager.RouteTo(entityType,new DataSourceRouteResult(dataSourceName),new ShardingTableRouteConfig(predicate: @where));
214+
if (physicTables.IsEmpty())
215+
throw new ShardingCoreException($"{where.ShardingPrint()} cant found any physic table");
216+
217+
var dbs = physicTables.Select(o => shardingDbContext.GetShareDbContext(dataSourceName, routeTailFactory.Create(o.Tail))).ToList();
218+
foreach (var dbContext in dbs)
219+
{
220+
dbContexts.AddLast(dbContext);
221+
}
222+
}
223+
else
224+
{
225+
var dbContext = shardingDbContext.GetShareDbContext(dataSourceName, routeTailFactory.Create(string.Empty));
226+
dbContexts.AddLast(dbContext);
227+
}
228+
229+
}
230+
231+
return result.ToDictionary(o => o.Key, o => (IEnumerable<DbContext>)o.Value);
232+
}
233+
234+
public static IEnumerable<DbContext> BulkShardingTableExpression<TShardingDbContext, TEntity>(this TShardingDbContext shardingDbContext, Expression<Func<TEntity, bool>> where) where TEntity : class
235+
where TShardingDbContext : DbContext, IShardingDbContext
236+
{
237+
var shardingRuntimeContext = shardingDbContext.GetShardingRuntimeContext();
238+
var entityMetadataManager = shardingRuntimeContext.GetEntityMetadataManager();// (IEntityMetadataManager)ShardingContainer.GetService(typeof(IEntityMetadataManager<>).GetGenericType0(shardingDbContext.GetType()));
239+
if (entityMetadataManager.IsShardingDataSource(typeof(TEntity)))
240+
throw new ShardingCoreInvalidOperationException(typeof(TEntity).FullName);
241+
return shardingDbContext.BulkShardingExpression<TShardingDbContext, TEntity>(where).First().Value;
242+
}
243+
}

samples/Sample.MySql/Controllers/WeatherForecastController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public async Task<IActionResult> Get()
105105

106106
// var firstOrDefault = _defaultTableDbContext.Set<SysUserMod>().FromSqlRaw($"select * from {nameof(SysUserMod)}").FirstOrDefault();
107107

108-
var sysUserMods1 = _defaultTableDbContext.Set<SysTest>()
108+
var sysUserMods1 = _defaultTableDbContext.Set<SysTest>().UseConnectionMode(1)
109109
.Select(o => new ssss(){ Id = o.Id, C = _abc.Select.Count(x => x.Id == o.Id) }).ToList();
110110
var sysUserMods2 = _defaultTableDbContext.Set<SysTest>()
111111
.Select(o => new ssss(){ Id = o.Id, C = GetAll().Count(x => x.Id == o.Id) }).ToList();

src/ShardingCore/EFCores/ShardingMigrator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,15 @@ public ShardingMigrator(IShardingRuntimeContext shardingRuntimeContext, IMigrati
7070
#endif
7171
public override void Migrate(string targetMigration = null)
7272
{
73-
this.MigrateAsync(targetMigration).WaitAndUnwrapException();
73+
this.MigrateAsync(targetMigration).WaitAndUnwrapException(false);
7474
// base.Migrate(targetMigration);
7575
}
7676

7777
public override async Task MigrateAsync(string targetMigration = null, CancellationToken cancellationToken = new CancellationToken())
7878
{
7979
var virtualDataSource = _shardingRuntimeContext.GetVirtualDataSource();
8080
var allDataSourceNames = virtualDataSource.GetAllDataSourceNames();
81-
await DynamicShardingHelper.DynamicMigrateWithDataSourcesAsync(_shardingRuntimeContext, allDataSourceNames, null,targetMigration,cancellationToken);
81+
await DynamicShardingHelper.DynamicMigrateWithDataSourcesAsync(_shardingRuntimeContext, allDataSourceNames, null,targetMigration,cancellationToken).ConfigureAwait(false);
8282

8383
}
8484
#if NET6_0 || NET5_0 || NETSTANDARD2_1

src/ShardingCore/Extensions/TaskExtension.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ public static void WaitAndUnwrapException(this Task task)
4040
throw new ArgumentNullException(nameof(task));
4141
task.GetAwaiter().GetResult();
4242
}
43+
public static void WaitAndUnwrapException(this Task task,bool continueOnCapturedContext)
44+
{
45+
if (task == null)
46+
throw new ArgumentNullException(nameof(task));
47+
task.ConfigureAwait(continueOnCapturedContext).GetAwaiter().GetResult();
48+
}
4349

4450
/// <summary>
4551
/// Waits for the task to complete, unwrapping any exceptions.

src/ShardingCore/Helpers/DynamicShardingHelper.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,13 @@ public static async Task DynamicMigrateWithDataSourcesAsync(IShardingRuntimeCont
7676
foreach (var migrationUnits in partitionMigrationUnits)
7777
{
7878
var migrateUnits = migrationUnits.Select(o =>new MigrateUnit(shellDbContext,o)).ToList();
79-
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,migrateUnits,targetMigration,cancellationToken);
79+
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,migrateUnits,targetMigration,cancellationToken).ConfigureAwait((false));
8080
}
8181

8282
//包含默认默认的单独最后一次处理
8383
if (allDataSourceNames.Contains(defaultDataSourceName))
8484
{
85-
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,new List<MigrateUnit>(){new MigrateUnit(shellDbContext,defaultDataSourceName)},targetMigration,cancellationToken);
85+
await ExecuteMigrateUnitsAsync(shardingRuntimeContext,new List<MigrateUnit>(){new MigrateUnit(shellDbContext,defaultDataSourceName)},targetMigration,cancellationToken).ConfigureAwait(false);
8686
}
8787
}
8888
}
@@ -120,7 +120,7 @@ public static async Task DynamicMigrateWithDataSourcesAsync(IShardingRuntimeCont
120120

121121
},cancellationToken);
122122
}).ToArray();
123-
await TaskHelper.WhenAllFastFail(migrateTasks);
123+
await TaskHelper.WhenAllFastFail(migrateTasks).ConfigureAwait(false);
124124
}
125125

126126
public static DbContextOptions CreateShellDbContextOptions(IShardingRuntimeContext shardingRuntimeContext,Type dbContextType,string dataSourceName)

0 commit comments

Comments
 (0)