-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimagestacker
More file actions
executable file
·174 lines (157 loc) · 7.15 KB
/
imagestacker
File metadata and controls
executable file
·174 lines (157 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
import argparse
import sys
import warnings
from PIL import Image
import tempfile
import subprocess
import os
from scipy.stats import multivariate_normal
import math
import numpy as np
import itertools
from multiprocessing import Pool, cpu_count
def run_rawtherapee(image_list, tempfile, pp3=None):
"""
Run rawtherapee on the list of .nef files, using the given pp3 file as an input.
Save the outputs as a tif in the specified temporary file.
If no pp3 file is specified, continue with rawtherapee defaults.
Return a list of the names of the processed TIF files
"""
proc_name = ['rawtherapee-cli']
proc_args = [] if pp3 is None else ['-p', pp3]
proc_args += ['-o', tempfile,
'-q',
'-t',
'-c', *image_list]
subprocess.run(proc_name + proc_args)
tempfile_contents = os.listdir(tempfile)
tempfile+='/'
return [tempfile+tif_name for tif_name in tempfile_contents
if tif_name.split('.')[-1]=='tif']
def load_images(image_names):
"""
For a given image name, load the image.
If a string is passed, return the image object corresponding to the loaded image.
If a list of strings are given, return a list of image objects.
"""
if isinstance(image_names, str):
image_names = [image_names]
warnings.simplefilter('error', Image.DecompressionBombWarning) # raise exception for decompression bomb
try:
ret = [Image.open(s).convert('RGB') for s in image_names]
except IOError as error_message:
sys.exit(error_message + "\nExiting.")
return ret[0] if len(ret)==1 else ret
def is_consistent_sizes(image_objects):
"""
Take a list of image objects. Return true if all images in the list are the
same size, false otherwise.
"""
sizes_set = set([im.size for im in image_objects])
return len(sizes_set)==1
def align_images(image_objects):
"""
End goal: take a list of images, return the same list of aligned images
"""
# TODO implement
warnings.warn("Image alignment is not currently implemented, continuing without alignment")
return image_objects
def build_pdf(pixel_list):
"""
Take an n-by-k array of values. Let each row be an RGB tuple.
Return a callable pdf function, with the pdf given by a
sum of gaussians centered at each pixel's colour tuple.
"""
pix_array = np.array(pixel_list)
covariance = np.cov(pix_array.T)
def probability_density(x):
return sum([multivariate_normal.pdf(x, mean=pix, cov=covariance, allow_singular=True)
for pix in pixel_list])
return probability_density
def find_max_likelihood_pixel(pixel_list, pdf):
"""
Given a k-by-3 array of pixel values and a pixel PDF,
find the pixel in the list that maximises the pdf.
Return the maximum likelihood pixel in the list.
"""
return max(pixel_list, key=pdf)
class progress_bar:
iteration = 0
max_iter = 0
last_floor = -1
def __init__(self,iters):
self.max_iter = iters
def print(self):
progress=100*self.iteration/self.max_iter
print("\r[ {0} ] {1}%".format('#'*math.floor(progress/5)+'-'*(20-math.floor(progress/5)),math.floor(progress)),end="",flush=True)
def update(self, n_iterations=1):
self.iteration = self.iteration+n_iterations
progress=100*self.iteration/self.max_iter
if math.floor(progress) != self.last_floor:
self.last_floor = math.floor(progress)
print("\r[ {0} ] {1}%".format('#'*math.floor(progress/5)+'-'*(20-math.floor(progress/5)),self.last_floor),end="",flush=True)
if __name__ == "__main__":
# parse args
parser = argparse.ArgumentParser(description="Take a set of RAW files. Process them using rawtherapee. Align the resulting images. Use the set of images to construct a single maximum likelihood photograph, hopefully removing any people from the photo.")
parser.add_argument("image_names", nargs="+",
help="Name of RAW files to process")
parser.add_argument("-a", "--align", action="store_true",
help="Align the images before stacking them, if set (default no alignment)")
parser.add_argument("-n", "--nodevelop", action="store_true",
help="Don't develop RAW files, and instead run the stacker on unprocessed input files")
parser.add_argument("-m", "--multicore", action="store_true",
help="Share image stacking between multiple cores, if set (default false)")
parser.add_argument("-p", "--pp3", type=str, default=None,
help="pp3 file for processing RAWs. Rawtherapee defaults used, if pp3 file is omitted (default)")
parser.add_argument("-g", "--gaussian", action="store_true",
help="Use a gaussian mixture model to determine optimal pixel value (very slow!)")
parser.add_argument("-o", "--output", type=str, default="output.jpg",
help="Output file name and extension, default output.jpg")
args = parser.parse_args()
with tempfile.TemporaryDirectory() as td:
# Load images
if args.nodevelop:
imagename_list = args.image_names
image_objects = load_images(imagename_list)
else:
processed_images = run_rawtherapee(args.image_names, td, args.pp3)
image_objects = load_images(processed_images)
print("Found {0} files for stacking".format(len(image_objects)))
# Check all images are the same size
if not is_consistent_sizes(image_objects):
sys.exit("Error: images are of different sizes, so cannot be stacked. Exiting.")
if args.align:
align_images(image_objects)
# Stack
image_size = image_objects[0].size
output_image_object = Image.new(mode = 'RGB', size=image_size, color=(255,0,0))
output_image_pixels = output_image_object.load()
pixel_coords = itertools.product(range(image_size[0]), range(image_size[1]))
progress = progress_bar(image_size[0]*image_size[1])
def stack_pixel(coord, n_progress_updates=1):
pixel_list = [im.getpixel(coord) for im in image_objects]
if args.gaussian:
pdf = build_pdf(current_pixel_list)
output_val = find_max_likelihood_pixel(pixel_list, pdf)
else:
pix_array = np.array(pixel_list)
medians = np.median(pix_array, axis=0)
medians = np.round(medians)
output_val = tuple([int(x) for x in medians])
progress.update(n_progress_updates)
return output_val
print("Starting stack")
if args.multicore:
with Pool() as pool:
n_cores = cpu_count()
outvals = pool.starmap(stack_pixel, ((c,n_cores) for c in pixel_coords))
else:
outvals = []
for pixel_coord in pixel_coords:
outvals.append(stack_pixel(pixel_coord))
pixel_coords = itertools.product(range(image_size[0]), range(image_size[1]))
for (x,y), val in zip(pixel_coords, outvals):
output_image_pixels[x,y] = val
output_image_object.save(args.output)
print("Saved stack to {0}".format(args.out))