-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProgram.cs
More file actions
121 lines (93 loc) · 3.49 KB
/
Program.cs
File metadata and controls
121 lines (93 loc) · 3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using var sys = ActorSystem.Create("system");
using var mat = sys.Materializer();
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
app.UseSwagger();
var apiToken = Environment.GetEnvironmentVariable("github_api_token");
app.MapPost("/collect", RunJob);
app.UseSwaggerUI();
app.Run();
IResult RunJob(List<string> usernames)
{
var source = Source.From(usernames)
.Select(user => $"https://api.github.com/users/{user}/events?per_page=10&page=1");
var worker = Flow.Create<string>()
.Via(Flows.Fetcher<Activity>(apiToken))
.SelectAsync(1, async x =>
{
await Task.Delay(1000);
return x;
});
var sink = Sink.ForEach<Page<Activity>>(x =>
{
Console.WriteLine(x.Url);
Console.WriteLine(string.Join(" | ", x.GroupBy(x => x.Type).Select(x => $"{x.Key}: {x.Count()}")));
Console.WriteLine();
});
var maxLevelOfParallelism = 2;
var levelOfParallelism = usernames.Count() > maxLevelOfParallelism ? maxLevelOfParallelism : usernames.Count();
source.Via(Flows.Balancer(worker, levelOfParallelism))
.RunWith(sink, mat);
return Results.Ok();
}
public static class Flows
{
public static Flow<string, Page<T>, NotUsed> Fetcher<T>(string? apiToken)
{
return Flow.Create<string>()
.ConcatMany(firstUrl =>
{
var httpClient = new HttpClient();
if (apiToken != null)
{
httpClient.DefaultRequestHeaders.Add("authorization", $"token {apiToken}");
}
httpClient.DefaultRequestHeaders.UserAgent.Add(new System.Net.Http.Headers.ProductInfoHeaderValue(new System.Net.Http.Headers.ProductHeaderValue("github-useractivity-client")));
return Source.UnfoldAsync(firstUrl, url =>
{
var pageTask = httpClient.GetAsync(url);
var next = pageTask.ContinueWith(task =>
{
var responseMessage = task.Result;
var content = responseMessage.Content.ReadFromJsonAsync<IReadOnlyList<T>>().Result ?? new List<T>();
var nextLink = responseMessage.GetNextLink();
if (nextLink == null) return Option<(string, Page<T>)>.None;
else return new Option<(string, Page<T>)>((nextLink, new(content, url)));
});
return next;
});
});
}
public static Flow<TIn, TOut, NotUsed> Balancer<TIn, TOut>(Flow<TIn, TOut, NotUsed> worker, int workerCount)
{
return Flow.FromGraph(GraphDsl.Create(b =>
{
var balancer = b.Add(new Balance<TIn>(workerCount, waitForAllDownstreams: true));
var merge = b.Add(new Merge<TOut>(workerCount));
for (var i = 0; i < workerCount; i++)
b.From(balancer).Via(worker.Async()).To(merge);
return new FlowShape<TIn, TOut>(balancer.In, merge.Out);
}));
}
}
public class Activity
{
public string? Id { get; set; }
public string? Type { get; set; }
}
public class Page<T> : List<T>
{
public Page(IReadOnlyList<T> list, string url)
: base(list)
{
Url = url;
}
public string Url { get; }
}