|
1 | 1 | using System; |
| 2 | +using System.Collections.Concurrent; |
| 3 | +using System.Collections.Generic; |
| 4 | +using System.Linq; |
2 | 5 | using System.Threading; |
3 | 6 |
|
4 | 7 | namespace NHibernate.Test |
5 | 8 | { |
6 | 9 | public class MultiThreadRunner<T> |
7 | 10 | { |
8 | 11 | public delegate void ExecuteAction(T subject); |
9 | | - private readonly int numThreads; |
10 | | - private readonly ExecuteAction[] actions; |
11 | | - private readonly Random rnd = new Random(); |
12 | | - private bool running; |
13 | | - private int timeout = 1000; |
14 | | - private int timeoutBetweenThreadStart = 30; |
15 | 12 |
|
16 | | - public MultiThreadRunner(int numThreads, ExecuteAction[] actions) |
| 13 | + private readonly int _numThreads; |
| 14 | + private readonly ExecuteAction[] _actions; |
| 15 | + private readonly Random _rnd = new Random(); |
| 16 | + private volatile bool _running; |
| 17 | + private ConcurrentQueue<Exception> _errors = new ConcurrentQueue<Exception>(); |
| 18 | + |
| 19 | + public MultiThreadRunner(int numThreads, params ExecuteAction[] actions) |
17 | 20 | { |
18 | | - if(numThreads < 1) |
| 21 | + if (numThreads < 1) |
19 | 22 | { |
20 | | - throw new ArgumentOutOfRangeException("numThreads",numThreads,"Must be GT 1"); |
| 23 | + throw new ArgumentOutOfRangeException(nameof(numThreads), numThreads, "Must be GTE 1"); |
21 | 24 | } |
22 | 25 | if (actions == null || actions.Length == 0) |
23 | 26 | { |
24 | | - throw new ArgumentNullException("actions"); |
| 27 | + throw new ArgumentNullException(nameof(actions)); |
25 | 28 | } |
26 | | - foreach (ExecuteAction action in actions) |
| 29 | + if (actions.Any(action => action == null)) |
27 | 30 | { |
28 | | - if(action==null) |
29 | | - throw new ArgumentNullException("actions", "null delegate"); |
| 31 | + throw new ArgumentNullException(nameof(actions), "null delegate"); |
30 | 32 | } |
31 | | - this.numThreads = numThreads; |
32 | | - this.actions = actions; |
| 33 | + _numThreads = numThreads; |
| 34 | + _actions = actions; |
33 | 35 | } |
34 | 36 |
|
35 | | - public int EndTimeout |
36 | | - { |
37 | | - get { return timeout; } |
38 | | - set { timeout = value; } |
39 | | - } |
| 37 | + public int EndTimeout { get; set; } = 1000; |
40 | 38 |
|
41 | | - public int TimeoutBetweenThreadStart |
42 | | - { |
43 | | - get { return timeoutBetweenThreadStart; } |
44 | | - set { timeoutBetweenThreadStart = value; } |
45 | | - } |
| 39 | + public int TimeoutBetweenThreadStart { get; set; } = 30; |
46 | 40 |
|
47 | | - public void Run(T subjectInstance) |
48 | | - { |
49 | | - running = true; |
50 | | - Thread[] t = new Thread[numThreads]; |
51 | | - for (int i = 0; i < numThreads; i++) |
52 | | - { |
53 | | - t[i] = new Thread(ThreadProc); |
54 | | - t[i].Name = i.ToString(); |
55 | | - t[i].Start(subjectInstance); |
56 | | - if (i > 2) |
57 | | - Thread.Sleep(timeoutBetweenThreadStart); |
58 | | - } |
| 41 | + public Exception[] GetErrors() => _errors.ToArray(); |
| 42 | + public void ClearErrors() => _errors = new ConcurrentQueue<Exception>(); |
59 | 43 |
|
60 | | - Thread.Sleep(timeout); |
| 44 | + public int Run(T subjectInstance) |
| 45 | + { |
| 46 | + var allThreads = new List<ThreadHolder<T>>(); |
61 | 47 |
|
62 | | - // Tell the threads to shut down, then wait until they all |
63 | | - // finish. |
64 | | - running = false; |
65 | | - for (int i = 0; i < numThreads; i++) |
| 48 | + var launcher = new Thread( |
| 49 | + () => |
| 50 | + { |
| 51 | + try |
| 52 | + { |
| 53 | + for (var i = 0; i < _numThreads; i++) |
| 54 | + { |
| 55 | + var threadHolder = new ThreadHolder<T> |
| 56 | + { |
| 57 | + Thread = new Thread(ThreadProc) { Name = i.ToString() }, |
| 58 | + Subject = subjectInstance |
| 59 | + }; |
| 60 | + threadHolder.Thread.Start(threadHolder); |
| 61 | + allThreads.Add(threadHolder); |
| 62 | + if (i > 2 && TimeoutBetweenThreadStart > 0) |
| 63 | + Thread.Sleep(TimeoutBetweenThreadStart); |
| 64 | + } |
| 65 | + } |
| 66 | + catch (Exception e) |
| 67 | + { |
| 68 | + _errors.Enqueue(e); |
| 69 | + throw; |
| 70 | + } |
| 71 | + }); |
| 72 | + var totalLoops = 0; |
| 73 | + _running = true; |
| 74 | + // Use a separated thread for launching in case too many threads are asked: the inner Start will freeze |
| 75 | + // but would be able to resume once _running would have been set to false, causing first threads to stop. |
| 76 | + launcher.Start(); |
| 77 | + // Sleep for the required timeout, taking into account the start delay (if all threads are launchable without |
| 78 | + // having to wait due to thread starvation). |
| 79 | + Thread.Sleep(TimeoutBetweenThreadStart * _numThreads + EndTimeout); |
| 80 | + // Tell the threads to shut down, then wait until they all finish. |
| 81 | + _running = false; |
| 82 | + launcher.Join(); |
| 83 | + foreach (var threadHolder in allThreads.Where(t => t != null)) |
66 | 84 | { |
67 | | - t[i].Join(); |
| 85 | + threadHolder.Thread.Join(); |
| 86 | + totalLoops += threadHolder.LoopsDone; |
68 | 87 | } |
| 88 | + return totalLoops; |
69 | 89 | } |
70 | 90 |
|
71 | 91 | private void ThreadProc(object arg) |
72 | 92 | { |
73 | | - T subjectInstance = (T) arg; |
74 | | - while (running) |
| 93 | + try |
| 94 | + { |
| 95 | + var holder = (ThreadHolder<T>) arg; |
| 96 | + while (_running) |
| 97 | + { |
| 98 | + var actionIdx = _rnd.Next(0, _actions.Length); |
| 99 | + _actions[actionIdx](holder.Subject); |
| 100 | + holder.LoopsDone++; |
| 101 | + } |
| 102 | + } |
| 103 | + catch (Exception e) |
75 | 104 | { |
76 | | - int actionIdx = rnd.Next(0, actions.Length); |
77 | | - actions[actionIdx](subjectInstance); |
| 105 | + _errors.Enqueue(e); |
| 106 | + throw; |
78 | 107 | } |
79 | 108 | } |
| 109 | + |
| 110 | + private class ThreadHolder<TH> |
| 111 | + { |
| 112 | + public Thread Thread { get; set; } |
| 113 | + public int LoopsDone { get; set; } |
| 114 | + public TH Subject { get; set; } |
| 115 | + } |
80 | 116 | } |
81 | 117 | } |
0 commit comments