-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdownsampling.py
More file actions
147 lines (114 loc) · 5.18 KB
/
downsampling.py
File metadata and controls
147 lines (114 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import random
import argparse
from typing import List, Tuple, Dict
def read_ellipse_data(file_path: str) -> List[str]:
    """Load every line of an ellipse data file.

    Args:
        file_path: Path of the text file to read (decoded as UTF-8).

    Returns:
        The file's lines with their line endings retained, or an empty
        list when the file cannot be read. A failure is reported on
        stdout instead of being raised.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            # Iterating a text file yields the same lines as readlines().
            contents = list(handle)
    except Exception as err:
        # Best-effort read: report the problem and let the caller go on.
        print(f"Failed to read file {file_path}: {err}")
        return []
    return contents
def sample_ellipse_data(data_lines: List[str], target_lines: int) -> List[str]:
    """Pick evenly spaced lines from one file, keeping their original order.

    Args:
        data_lines: All lines in the file.
        target_lines: Number of lines to keep for the current file.

    Returns:
        A list of ``target_lines`` lines taken at uniform intervals from
        ``data_lines``. If the file has no more lines than requested, the
        input list itself is returned unchanged. An empty input or a
        non-positive ``target_lines`` yields an empty list.
    """
    # A non-positive target would otherwise divide by zero below.
    if not data_lines or target_lines <= 0:
        return []

    total_lines = len(data_lines)

    # Nothing to drop: keep everything. Only warn when the file is really
    # short of the target, not when it matches it exactly.
    if total_lines <= target_lines:
        if total_lines < target_lines:
            print(f"Warning: File line count ({total_lines}) is less than target count ({target_lines}), keeping all lines")
        return data_lines

    # floor(i * total / target) computed with integers, so the index never
    # lands one below the intended position because of float rounding.
    return [
        data_lines[(i * total_lines) // target_lines]
        for i in range(target_lines)
    ]
def write_sampled_data(file_path: str, data_lines: List[str]) -> None:
    """Write the sampled lines to a file, replacing its previous content.

    Args:
        file_path: Destination path (written as UTF-8, truncated first).
        data_lines: Lines to write; they are written as-is, so each one
            must already carry its own line ending.

    Any failure is reported on stdout instead of being raised.
    """
    try:
        out = open(file_path, 'w', encoding='utf-8')
        with out:
            out.writelines(data_lines)
        # Reached only when the file was written and closed without error.
        print(f"Sampled data written to {file_path}")
    except Exception as err:
        print(f"Failed to write file {file_path}: {err}")
def _allocate_line_targets(line_counts: Dict[str, int], total_target: int) -> Dict[str, int]:
    """Split ``total_target`` across files in proportion to their line counts.

    Expects ``0 < total_target < sum(line_counts.values())``. Under that
    condition the returned targets sum to exactly ``total_target`` and no
    file is asked for more lines than it has.
    """
    total_lines = sum(line_counts.values())
    targets: Dict[str, int] = {}
    ranking = []
    for name, count in line_counts.items():
        # Integer divmod: the quotient is the floored proportional share,
        # the remainder ranks which files receive the leftover lines.
        share, remainder = divmod(count * total_target, total_lines)
        targets[name] = share
        ranking.append((remainder, count, name))

    # Hand out the lines lost to flooring, largest remainder first
    # (ties: larger file first, then by name, for a reproducible result).
    leftover = total_target - sum(targets.values())
    ranking.sort(key=lambda item: (-item[0], -item[1], item[2]))
    for _, _, name in ranking[:leftover]:
        targets[name] += 1

    # Keep at least one line in every non-empty file when the budget
    # allows, by moving a single line over from the largest allocation.
    for name, count in line_counts.items():
        if count == 0 or targets[name] > 0:
            continue
        donor = max(targets, key=targets.get)
        if targets[donor] <= 1:
            break  # budget smaller than the number of non-empty files
        targets[donor] -= 1
        targets[name] = 1
    return targets


def main(root_path: str, total_target_lines: int = 20000) -> None:
    """Downsample all ``.txt`` ellipse data files in a directory in place.

    Every ``.txt`` file directly inside ``root_path`` is read. If together
    they hold more than ``total_target_lines`` lines, each file is reduced
    to a share of the target proportional to its line count and rewritten
    with evenly spaced lines in their original order.

    Files that end up with no allocation are left untouched on disk. This
    includes empty files and files that could not be read, so a failed
    read never causes a file to be overwritten.

    Args:
        root_path: Directory containing the ellipse data files.
        total_target_lines: Total number of lines to keep across all files.
    """
    # Sorted so the outcome does not depend on directory listing order.
    txt_files = sorted(f for f in os.listdir(root_path) if f.endswith('.txt'))
    if not txt_files:
        print(f"No txt files found in directory {root_path}")
        return

    # A zero or negative target cannot be allocated meaningfully; stop
    # before any file is modified.
    if total_target_lines <= 0:
        print(f"Total target lines must be positive, got {total_target_lines}")
        return

    # Read every file once and remember both its content and line count.
    file_lines: Dict[str, List[str]] = {}
    for txt_file in txt_files:
        file_lines[txt_file] = read_ellipse_data(os.path.join(root_path, txt_file))
    line_counts: Dict[str, int] = {
        name: len(lines) for name, lines in file_lines.items()
    }
    total_lines = sum(line_counts.values())

    print(f"Found {len(txt_files)} txt files, total lines: {total_lines}")
    print(f"Total target lines: {total_target_lines}")

    if total_lines <= total_target_lines:
        print("Total data lines already less than or equal to target, no sampling needed")
        return

    file_target_counts = _allocate_line_targets(line_counts, total_target_lines)

    # Sample and rewrite, handling the files with the most lines first.
    current_total = 0
    for file_name in sorted(line_counts, key=line_counts.get, reverse=True):
        target_lines = file_target_counts.get(file_name, 0)
        if target_lines <= 0:
            # Skipped without writing, so the file keeps its current content.
            print(f"Warning: File {file_name} was not allocated any lines to keep")
            continue

        sampled_lines = sample_ellipse_data(file_lines[file_name], target_lines)
        current_total += len(sampled_lines)
        write_sampled_data(os.path.join(root_path, file_name), sampled_lines)

    print(f"Sampling complete, final total lines: {current_total} (Target: {total_target_lines})")

    # Sanity check: current_total counts sampled lines, whether or not the
    # write to disk succeeded.
    if current_total != total_target_lines:
        print(f"Warning: Final line count differs from target, difference: {current_total - total_target_lines}")
        print("This may be due to some files having too few lines")
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run the sampler.
    cli = argparse.ArgumentParser(description='Ellipse data sampling tool')
    cli.add_argument(
        '--root_path',
        type=str,
        required=True,
        help='Directory containing ellipse data files',
    )
    cli.add_argument(
        '--nums',
        type=int,
        default=20000,
        help='Total number of lines to keep',
    )
    options = cli.parse_args()
    main(root_path=options.root_path, total_target_lines=options.nums)