-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathAntColonyOptimizer.py
More file actions
272 lines (242 loc) · 11.8 KB
/
AntColonyOptimizer.py
File metadata and controls
272 lines (242 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import numpy as np
import matplotlib.pyplot as plt
import time
import warnings
warnings.filterwarnings("ignore")
class AntColonyOptimizer:
    """
    Ant colony optimizer for closed tours over a square, distance-like matrix.

    Every iteration each ant builds a tour that visits every node once and then returns to its
    start node. The best tour of the iteration is reinforced with pheromone, all pheromone
    evaporates a little, and the transition weights are recomputed.
    """

    def __init__(self, ants, evaporation_rate, intensification, alpha=1.0, beta=0.0, beta_evaporation_rate=0,
                 choose_best=.1):
        """
        Ant colony optimizer. Traverses a graph and finds either the max or min distance between nodes.
        :param ants: number of ants to traverse the graph
        :param evaporation_rate: rate at which pheromone evaporates
        :param intensification: constant added to the best path
        :param alpha: weighting of pheromone
        :param beta: weighting of heuristic (1/distance)
        :param beta_evaporation_rate: rate at which beta decays (optional)
        :param choose_best: probability to choose the best route
        """
        # Parameters
        self.ants = ants
        self.evaporation_rate = evaporation_rate
        self.pheromone_intensification = intensification
        self.heuristic_alpha = alpha
        self.heuristic_beta = beta
        self.beta_evaporation_rate = beta_evaporation_rate
        self.choose_best = choose_best
        # Internal representations (created by _initialize once a map is known)
        self.pheromone_matrix = None
        self.heuristic_matrix = None
        self.probability_matrix = None
        self.map = None
        self.set_of_available_nodes = None
        # Internal stats
        self.best_series = []
        self.best = None
        self.fitted = False
        self.best_path = None
        self.fit_time = None
        # Plotting values
        self.stopped_early = False

    def __str__(self):
        """
        Builds a human readable summary of the hyper-parameters, usage notes and fitted state.
        :return: the multi-line description
        """
        separator = "--------------------"
        fitted_note = "This optimizer has been fitted." if self.fitted else "This optimizer has NOT been fitted."
        lines = [
            "Ant Colony Optimizer",
            separator,
            "Designed to optimize either the minimum or maximum distance between nodes in a square matrix that behaves like a distance matrix.",
            separator,
            "Number of ants:\t\t\t\t{}".format(self.ants),
            "Evaporation rate:\t\t\t{}".format(self.evaporation_rate),
            "Intensification factor:\t\t{}".format(self.pheromone_intensification),
            "Alpha Heuristic:\t\t\t{}".format(self.heuristic_alpha),
            "Beta Heuristic:\t\t\t\t{}".format(self.heuristic_beta),
            "Beta Evaporation Rate:\t\t{}".format(self.beta_evaporation_rate),
            "Choose Best Percentage:\t\t{}".format(self.choose_best),
            separator,
            "USAGE:",
            "Number of ants influences how many paths are explored each iteration.",
            "The alpha and beta heuristics affect how much influence the pheromones or the distance heuristic weigh an ants' decisions.",
            "Beta evaporation reduces the influence of the heuristic over time.",
            "Choose best is a percentage of how often an ant will choose the best route over probabilistically choosing a route based on pheromones.",
            separator,
            "",  # yields the blank line in front of the fitted note
            fitted_note,
        ]
        return "\n".join(lines)

    def _initialize(self):
        """
        Initializes the model by creating the various matrices and generating the list of available nodes
        """
        assert self.map.shape[0] == self.map.shape[1], "Map is not a distance matrix!"
        num_nodes = self.map.shape[0]
        self.pheromone_matrix = np.ones((num_nodes, num_nodes))
        # Remove the diagonal since there is no pheromone from node i to itself
        self.pheromone_matrix[np.eye(num_nodes) == 1] = 0
        # Zero entries of the map (e.g. its diagonal) become inf here; silence the floating point
        # warning locally instead of depending on a process-wide warnings filter.
        with np.errstate(divide='ignore', invalid='ignore'):
            self.heuristic_matrix = 1 / self.map
        self._update_probabilities()
        self.set_of_available_nodes = list(range(num_nodes))

    def _reinstate_nodes(self):
        """
        Resets available nodes to all nodes for the next iteration
        """
        self.set_of_available_nodes = list(range(self.map.shape[0]))

    def _update_probabilities(self):
        """
        Recomputes the unnormalised transition weights as pheromone ** alpha * heuristic ** beta
        (element by element). Needs to be called after evaporation and intensification.
        """
        # 0 * inf on the diagonal produces nan; the tour construction never reads those entries
        # because the current node is removed from the available nodes before a choice is made.
        with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
            self.probability_matrix = (self.pheromone_matrix ** self.heuristic_alpha) * (
                self.heuristic_matrix ** self.heuristic_beta)

    def _choose_next_node(self, from_node):
        """
        Chooses the next node based on probabilities. If p < p_choose_best, then the best path is chosen, otherwise
        it is selected from a probability distribution weighted by the transition weights.
        :param from_node: the node the ant is coming from
        :return: position inside self.set_of_available_nodes of the node the ant is going to (not the node id)
        """
        weights = self.probability_matrix[from_node, self.set_of_available_nodes]
        if np.random.random() < self.choose_best:
            return np.argmax(weights)
        probabilities = weights / np.sum(weights)
        return np.random.choice(len(probabilities), p=probabilities)

    def _remove_node(self, node):
        """
        Marks a node as visited by removing it from the list of available nodes.
        :param node: the node to remove
        """
        self.set_of_available_nodes.remove(node)

    def _build_tour(self):
        """
        Walks a single ant from a random start node through every node once and back to the start.
        Leaves the list of available nodes fully restored.
        :return: the visited nodes in order; the first and last entry are the start node
        """
        current_node = self.set_of_available_nodes[np.random.randint(0, len(self.set_of_available_nodes))]
        start_node = current_node
        path = []
        while True:
            path.append(current_node)
            self._remove_node(current_node)
            if not self.set_of_available_nodes:
                break
            position = self._choose_next_node(current_node)
            current_node = self.set_of_available_nodes[position]
        path.append(start_node)  # go back to start
        self._reinstate_nodes()
        return path

    def _evaluate(self, paths, mode):
        """
        Evaluates the solutions of the ants by adding up the distances between nodes.
        :param paths: solutions from the ants
        :param mode: max or min
        :return: x and y coordinates of the best path as a tuple, the best path, and the best score
        :raises ValueError: if mode is neither 'min' nor 'max'
        """
        if mode == 'min':
            pick_best = np.argmin
        elif mode == 'max':
            pick_best = np.argmax
        else:
            raise ValueError("Invalid mode! Choose 'min' or 'max'.")
        scores = np.zeros(len(paths))
        for index, path in enumerate(paths):
            # Sequential sum over consecutive (from, to) pairs of the tour
            scores[index] = sum(self.map[a, b] for a, b in zip(path, path[1:]))
        best = pick_best(scores)
        winner = paths[best]
        # Row/column index lists of the edges of the winning tour, used to index the pheromone matrix
        return (list(winner[:-1]), list(winner[1:])), winner, scores[best]

    def _evaporation(self):
        """
        Scales all pheromone by (1 - evaporation_rate). Also scales beta by (1 - beta_evaporation_rate);
        this changes self.heuristic_beta in place, so the decay persists after fitting.
        """
        self.pheromone_matrix *= (1 - self.evaporation_rate)
        self.heuristic_beta *= (1 - self.beta_evaporation_rate)

    def _intensify(self, best_coords):
        """
        Increases the pheromone by some scalar for the best route.
        :param best_coords: x and y (i and j) coordinates of the best route
        """
        i = best_coords[0]
        j = best_coords[1]
        # Only the entries [i, j] in the direction the tour was walked are reinforced
        self.pheromone_matrix[i, j] += self.pheromone_intensification

    def fit(self, map_matrix, iterations=100, mode='min', early_stopping_count=20, verbose=True):
        """
        Fits the ACO to a specific map. This was designed with the Traveling Salesman problem in mind.
        Statistics of a previous fit (score series, best path, early-stop flag) are discarded first.
        :param map_matrix: Distance matrix or some other square array-like with similar properties
        :param iterations: number of iterations (at least 1)
        :param mode: whether to get the minimum path or maximum path
        :param early_stopping_count: how many iterations of the same score to make the algorithm stop early
        :param verbose: whether to print progress messages
        :return: the best score
        :raises ValueError: if mode is neither 'min' nor 'max' or iterations is smaller than 1
        """
        # Validate up front so a bad argument fails before any work is done
        if mode not in ('min', 'max'):
            raise ValueError("Invalid mode! Choose 'min' or 'max'.")
        if iterations < 1:
            raise ValueError("iterations must be at least 1.")
        if verbose:
            print("Beginning ACO Optimization with {} iterations...".format(iterations))
        self.map = np.asarray(map_matrix)
        # Start from clean statistics so a refit does not mix in results of an earlier map
        self.best_series = []
        self.best_path = None
        self.stopped_early = False
        start = time.time()
        self._initialize()
        num_equal = 0
        best_score_so_far = None
        for i in range(iterations):
            start_iter = time.time()
            paths = [self._build_tour() for _ in range(self.ants)]
            best_path_coords, best_path, best_score = self._evaluate(paths, mode)
            if best_score_so_far is None:
                improved = True  # the first iteration always provides the initial best path
            elif mode == 'min':
                improved = best_score < best_score_so_far
            else:
                improved = best_score > best_score_so_far
            if improved:
                best_score_so_far = best_score
                self.best_path = best_path
            # Count consecutive iterations whose best score equals the overall best
            if best_score == best_score_so_far:
                num_equal += 1
            else:
                num_equal = 0
            self.best_series.append(best_score)
            self._evaporation()
            self._intensify(best_path_coords)
            self._update_probabilities()
            if verbose:
                print("Best score at iteration {}: {}; overall: {} ({}s)"
                      "".format(i, round(best_score, 2), round(best_score_so_far, 2),
                                round(time.time() - start_iter)))
            if best_score == best_score_so_far and num_equal == early_stopping_count:
                self.stopped_early = True
                if verbose:
                    print("Stopping early due to {} iterations of the same score.".format(early_stopping_count))
                break
        self.fit_time = round(time.time() - start)
        self.fitted = True
        # The series was reset above, so the running best is the min/max of best_series
        self.best = best_score_so_far
        if verbose:
            print("ACO fitted.  Runtime: {} minutes.  Best score: {}".format(self.fit_time // 60, self.best))
        return self.best

    def plot(self):
        """
        Plots the score over time after the model has been fitted.
        :return: None if the model isn't fitted yet
        """
        if not self.fitted:
            print("Ant Colony Optimizer not fitted!  There exists nothing to plot.")
            return None
        early_note = "\nStopped Early!" if self.stopped_early else ""
        fig, ax = plt.subplots(figsize=(20, 15))
        ax.plot(self.best_series, label="Best Run")
        ax.set_xlabel("Iteration")
        ax.set_ylabel("Performance")
        ax.text(.8, .6,
                'Ants: {}\nEvap Rate: {}\nIntensify: {}\nAlpha: {}\nBeta: {}\nBeta Evap: {}\nChoose Best: {}\n\nFit Time: {}m{}'.format(
                    self.ants, self.evaporation_rate, self.pheromone_intensification, self.heuristic_alpha,
                    self.heuristic_beta, self.beta_evaporation_rate, self.choose_best, self.fit_time // 60,
                    early_note),
                bbox={'facecolor': 'gray', 'alpha': 0.8, 'pad': 10}, transform=ax.transAxes)
        ax.legend()
        plt.title("Ant Colony Optimization Results (best: {})".format(np.round(self.best, 2)))
        plt.show()