-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
143 lines (115 loc) · 3.1 KB
/
main.go
File metadata and controls
143 lines (115 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
	"fmt"
	"log"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/imrandil/the_real_world/db"
	"github.com/imrandil/the_real_world/generate"
	"github.com/imrandil/the_real_world/models"

	_ "github.com/lib/pq"
)
// fanIn merges every input channel into one output stream. The merged
// channel is closed only after all input channels have been drained.
// Closing (or sending on) done makes the forwarding goroutines bail out
// early instead of blocking on a send nobody will receive.
func fanIn(done <-chan int, channels ...<-chan models.QueryResult) <-chan models.QueryResult {
	merged := make(chan models.QueryResult)

	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, ch := range channels {
		// Pass the channel as an argument so each goroutine forwards
		// its own source (safe on pre-1.22 loop-variable semantics).
		go func(src <-chan models.QueryResult) {
			defer wg.Done()
			for item := range src {
				select {
				case merged <- item:
				case <-done:
					return
				}
			}
		}(ch)
	}

	// Close the merged stream once every forwarder has finished, so the
	// consumer's range loop terminates.
	go func() {
		wg.Wait()
		close(merged)
	}()

	return merged
}
// main is a thin wrapper around run so that deferred cleanup actually
// executes: log.Fatal calls os.Exit, which skips defers, so all defers
// live inside run instead.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to Postgres, fans out row-range queries across one
// goroutine per CPU, fans the results back in, and renders the collected
// rows to a PDF. It returns an error instead of exiting so that
// conn.Close and close(done) run on every path.
func run() error {
	// Default connection string; override with the DATABASE_URL
	// environment variable instead of editing the source.
	serviceURI := "postgres://*********" //this will go into env file
	if uri := os.Getenv("DATABASE_URL"); uri != "" {
		serviceURI = uri
	}

	start := time.Now()

	// Named conn (not db) to avoid shadowing the imported db package.
	conn, err := db.ConnectDB(serviceURI)
	if err != nil {
		return fmt.Errorf("connecting to database: %w", err)
	}
	defer conn.Close()

	// done lets the fan-in forwarders shut down early if we ever stop
	// consuming before their channels are drained.
	done := make(chan int)
	defer close(done)

	// One-time setup steps (CSV processing, table drop/create, bulk
	// insert from output.json) are intentionally disabled:
	//data.ProcessCSV()
	// models.DropTable(db) / models.CreateTable(db)
	// utils.LoadDataFromJSON("output.json") / models.InsertMarketDataBulk(db, data)

	// Partition the table's rows evenly across one goroutine per CPU.
	cpuCount := runtime.NumCPU()
	count, err := models.GetCountOfTable(conn)
	if err != nil {
		return fmt.Errorf("counting table rows: %w", err)
	}
	rowsPerRoutine := count / cpuCount
	remainder := count % cpuCount // leftover rows go to the first goroutines

	dataRetrievalChannels := make([]<-chan models.QueryResult, cpuCount)
	startRow := 0
	for i := 0; i < cpuCount; i++ {
		endRow := startRow + rowsPerRoutine
		if i < remainder {
			endRow++ // this goroutine absorbs one leftover row
		}
		// Each channel streams QueryResults for [startRow, endRow).
		dataRetrievalChannels[i] = models.GetTableCountAndItems(conn, startRow, endRow, done)
		startRow = endRow
	}

	// Fan in the per-goroutine streams and collect every item; per-result
	// errors are reported but do not abort the whole run (best-effort).
	var allItems []*models.MarketData
	for result := range fanIn(done, dataRetrievalChannels...) {
		if result.Err != nil {
			fmt.Println("Error:", result.Err)
			continue
		}
		allItems = append(allItems, result.Items...)
	}

	// Generate the PDF from everything retrieved.
	if err := generate.GeneratePDF(allItems); err != nil {
		return fmt.Errorf("generating PDF: %w", err)
	}

	fmt.Println(time.Since(start))
	fmt.Println("count row", count)
	fmt.Println("PDF generated successfully")
	return nil
}