-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile.go
More file actions
113 lines (105 loc) · 2.41 KB
/
file.go
File metadata and controls
113 lines (105 loc) · 2.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"bufio"
"bytes"
"fmt"
"hash/fnv"
"log"
"os"
"strconv"
"strings"
)
// splitHugeFileToN distributes the words of the file at filePath over N sub
// files named subfile_0.txt ... subfile_<N-1>.txt, created in the current
// working directory, and returns their handles.
//
// Words are produced by readFileToWord. Each word is routed to the sub file
// selected by its FNV-64 hash modulo N, so every occurrence of the same word
// lands in the same sub file. Each output line has the form "<word>|<index>".
//
// The returned writers have been flushed; the underlying files are not closed
// here. If N is less than 1 nothing is created and nil is returned. A sub file
// that cannot be created terminates the process, matching how the rest of this
// file treats unrecoverable file errors.
func splitHugeFileToN(filePath string, N int) []fileWrite {
	if N < 1 {
		log.Printf("split %s: invalid number of parts %d", filePath, N)
		return nil
	}

	// create sub files
	subFiles := make([]fileWrite, 0, N)
	for part := 0; part < N; part++ {
		fileName := fmt.Sprintf("subfile_%d.txt", part)
		f, err := os.Create(fileName)
		if err != nil {
			// Retrying the same name in a tight loop would spin forever on a
			// persistent failure, so give up instead.
			log.Fatalf("create file %s failed: %v", fileName, err)
		}
		// buffer = 1 GiB per sub file, so memory demand grows with N
		subFiles = append(subFiles, fileWrite{f, bufio.NewWriterSize(f, 1024*1024*1024)})
	}

	// Start the producer only once every destination exists.
	wordChan := make(chan word)
	go readFileToWord(filePath, wordChan)

	h := fnv.New64()
	parts := uint64(N)
	for w := range wordChan {
		// Hash each word independently: reset the running state, feed the
		// word, then read the digest. (Write's return value is only the byte
		// count, and hash.Hash.Write never returns an error.)
		h.Reset()
		h.Write(w.value)
		part := h.Sum64() % parts

		// write "word|index" to the selected sub file
		if _, err := fmt.Fprintf(subFiles[part].wr, "%s|%d\n", w.value, w.index); err != nil {
			log.Printf("str: %s write failed: %v", string(w.value), err)
		}
	}

	for i := range subFiles {
		if err := subFiles[i].wr.Flush(); err != nil {
			log.Printf("file: %s last flush failed: %v", subFiles[i].File.Name(), err)
		}
	}
	return subFiles
}
// readFileToWord reads the file at filePath line by line, splits every line
// on "," and sends each piece on wordChan together with a running index that
// starts at 0 and counts pieces across the whole file.
//
// wordChan is closed after the last piece has been sent. A failure to open or
// scan the file terminates the process via log.Fatal. Lines are read with a
// bufio.Scanner using its default buffer, so a line longer than
// bufio.MaxScanTokenSize is reported as a scan error.
func readFileToWord(filePath string, wordChan chan word) {
	f, err := os.Open(filePath)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		f.Close()
		close(wordChan)
	}()

	var index int64
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		// scanner.Bytes() points into the scanner's internal buffer, which a
		// later Scan may overwrite. The receiver keeps using the words after
		// this goroutine has moved on, so hand out slices of a private copy.
		line := append([]byte(nil), scanner.Bytes()...)
		for _, value := range bytes.Split(line, []byte(",")) {
			wordChan <- word{
				value: value,
				index: index,
			}
			index++
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
// readFileToWordMap reads a file of "<word>|<index>" lines and returns a map
// from each word to its wordMeta: the index text of the word's last
// occurrence in the file and the number of times the word appeared.
//
// The line is split at its last "|", so a word that itself contains "|" is
// kept intact. Lines without a "|" or with a non-numeric index are logged and
// skipped. A failure to open or scan the file terminates the process via
// log.Fatal.
func readFileToWordMap(filePath string) map[string]wordMeta {
	f, err := os.Open(filePath)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	filter := make(map[string]wordMeta)
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()

		// The index is always the final field, so split on the last separator.
		sep := strings.LastIndex(line, "|")
		if sep < 0 {
			log.Printf("skip malformed line %q: missing '|' separator", line)
			continue
		}
		key, idx := line[:sep], line[sep+1:]

		ind, err := strconv.ParseInt(idx, 10, 64)
		if err != nil {
			log.Printf("skip malformed line %q: %v", line, err)
			continue
		}
		log.Printf("value[0]: %s , value[1]: %s , int value[1]:%d", key, idx, ind)

		// A missing key yields the zero wordMeta, so its count starts at 0 and
		// the first occurrence is recorded as 1.
		filter[key] = wordMeta{index: idx, count: filter[key].count + 1}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	return filter
}