Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added __pycache__/mutation.cpython-312.pyc
Binary file not shown.
Binary file added __pycache__/utils.cpython-312.pyc
Binary file not shown.
Binary file added __pycache__/utils.cpython-313.pyc
Binary file not shown.
9 changes: 9 additions & 0 deletions ga/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""GA package: operators, population, evaluator for Rute SPKLU problem."""
from .operators import selection, tournament_selection, get_elites, bcrc_crossover, apply_mutation, apply_mutation_advanced
from .evaluator import fitness_function
from .population import generate_random_population

__all__ = [
'selection', 'tournament_selection', 'get_elites',
'bcrc_crossover', 'apply_mutation', 'apply_mutation_advanced', 'fitness_function', 'generate_random_population'
]
Binary file added ga/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file added ga/__pycache__/__init__.cpython-313.pyc
Binary file not shown.
Binary file added ga/__pycache__/evaluator.cpython-312.pyc
Binary file not shown.
Binary file added ga/__pycache__/evaluator.cpython-313.pyc
Binary file not shown.
Binary file added ga/__pycache__/operators.cpython-312.pyc
Binary file not shown.
Binary file added ga/__pycache__/operators.cpython-313.pyc
Binary file not shown.
Binary file added ga/__pycache__/population.cpython-312.pyc
Binary file not shown.
Binary file added ga/__pycache__/population.cpython-313.pyc
Binary file not shown.
59 changes: 59 additions & 0 deletions ga/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Fitness evaluator: simulates battery and charging for delimiter-flat chromosomes."""
from typing import List, Dict, Tuple, Any
from utils import split_routes, get_distance, INFEASIBLE_PENALTY


def fitness_function(chromosome: List[str], distance_matrix: Dict[Tuple[str, str], float],
battery_capacity: float = 100.0, consumption_rate: float = 1.0,
charging_rate: float = 1.0, w_distance: float = 0.6, w_charging: float = 0.4) -> float:
"""Compute fitness: weighted sum of total distance and charging time.

Returns a smaller-is-better fitness. Infeasible solutions get a large penalty.
Chromosome encoding: delimiter-flat list with 'S*' representing charging stations,
'D*' depots and 'C*' customers.
"""
total_distance = 0.0
total_charging_time = 0.0

routes = split_routes(chromosome)

for route in routes:
battery = battery_capacity
for i in range(len(route) - 1):
a, b = route[i], route[i + 1]
dist = get_distance(a, b, distance_matrix)
if dist == float('inf'):
return INFEASIBLE_PENALTY
total_distance += dist
needed_energy = dist * consumption_rate

# If current node is charging station, charge minimally to reach next station/depot
if a.startswith('S'):
# lookahead to next S or D
energy_needed_to_next = 0.0
for j in range(i, len(route) - 1):
u, v = route[j], route[j + 1]
d2 = get_distance(u, v, distance_matrix)
if d2 == float('inf'):
return INFEASIBLE_PENALTY
energy_needed_to_next += d2 * consumption_rate
if route[j + 1].startswith('S') or route[j + 1].startswith('D'):
break
required_energy = max(0.0, energy_needed_to_next - battery)
charge_time = required_energy / charging_rate
total_charging_time += charge_time
battery += required_energy
if battery > battery_capacity:
battery = battery_capacity

if battery < needed_energy:
# insufficient battery to travel (invalid)
return INFEASIBLE_PENALTY

battery -= needed_energy

# normalization
D = total_distance / 100.0
C = total_charging_time / 100.0
fitness = w_distance * D + w_charging * C
return fitness
129 changes: 129 additions & 0 deletions ga/operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Selection, crossover and mutation operators for GA.

Functions expect delimiter-flat chromosome encoding (list[str]) where '|' separates routes.
"""
from typing import List, Tuple, Dict, Any
import random
import copy
from utils import split_routes, join_routes, get_distance


def get_elites(population: List[List[str]], fitness_scores: List[float], elite_size: int) -> List[List[str]]:
paired = list(zip(population, fitness_scores))
paired.sort(key=lambda x: x[1])
return [p for p, _ in paired[:elite_size]]


def tournament_selection(population: List[List[str]], fitness_scores: List[float], tournament_size: int) -> List[str]:
indices = random.sample(range(len(population)), k=min(tournament_size, len(population)))
best = None
best_score = float('inf')
for i in indices:
if fitness_scores[i] < best_score:
best_score = fitness_scores[i]
best = population[i]
return best


def selection(population: List[List[str]], fitness_scores: List[float], elite_size: int, tournament_size: int) -> List[List[str]]:
parents: List[List[str]] = []
parents.extend(get_elites(population, fitness_scores, elite_size))
remaining = len(population) - elite_size
for _ in range(remaining):
parents.append(tournament_selection(population, fitness_scores, tournament_size))
return parents


# ---- BCRC crossover (from notebook) adapted ----
def remove_customer_routes(routes: List[List[str]], customer: str) -> List[List[str]]:
return [[x for x in r if x != customer] for r in routes]


def insertion_cost(route: List[str], customer: str, dist: Dict[Tuple[str, str], float]) -> Tuple[float, int]:
if len(route) < 2:
return 0.0, 0
best_cost = float('inf')
best_pos = 0
for i in range(len(route) - 1):
a, b = route[i], route[i+1]
cost_before = get_distance(a, b, dist)
cost_after = get_distance(a, customer, dist) + get_distance(customer, b, dist)
delta = cost_after - cost_before
if delta < best_cost:
best_cost = delta
best_pos = i + 1
return best_cost, best_pos


def bcrc_crossover(chrom1: List[str], chrom2: List[str], dist: Dict[Tuple[str, str], float]) -> List[str]:
p1 = copy.deepcopy(chrom1)
p2 = copy.deepcopy(chrom2)
routes1 = split_routes(p1)
routes2 = split_routes(p2)
all_customers = [x for r in routes1 for x in r if x.startswith('C')]
if not all_customers:
return chrom2.copy()
customer = random.choice(all_customers)
child_routes = remove_customer_routes(routes2, customer)
best_global_cost = float('inf')
best_route_idx = None
best_insert_pos = None
for idx, route in enumerate(child_routes):
if len(route) < 2:
continue
cost, pos = insertion_cost(route, customer, dist)
if cost < best_global_cost:
best_global_cost = cost
best_route_idx = idx
best_insert_pos = pos
if best_route_idx is not None:
child_routes[best_route_idx].insert(best_insert_pos, customer)
else:
for idx, route in enumerate(child_routes):
if len(route) >= 2:
child_routes[idx].insert(1, customer)
break
return join_routes(child_routes)


# ---- Mutation (simpler variant) ----
def apply_mutation(chromosome: List[str], mutation_rate: float = 0.1) -> List[str]:
# simple swap mutation within a route
if random.random() > mutation_rate:
return chromosome
routes = split_routes(chromosome)
# pick a random route with at least two customers (excluding depots)
route_idx = None
for _ in range(5):
i = random.randrange(len(routes))
customers = [x for x in routes[i] if x.startswith('C')]
if len(customers) >= 2:
route_idx = i
break
if route_idx is None:
return chromosome
# perform swap on customer positions within chosen route
route = routes[route_idx]
cust_positions = [i for i, g in enumerate(route) if g.startswith('C')]
a, b = random.sample(cust_positions, 2)
route[a], route[b] = route[b], route[a]
routes[route_idx] = route
return join_routes(routes)


# ---- Advanced mutation wrapper (uses legacy `mutation.py` implementation) ----
def apply_mutation_advanced(chromosome: List[str], depot_locations: Dict[str, tuple], customer_locations: Dict[str, tuple], mutation_probability: float = 0.1, beta: float = 0.2) -> List[str]:
"""Use existing sophisticated mutation logic from `mutation.py` if available.

This function imports the original `apply_mutation` from `mutation.py` which
implements inter-depot and intra-depot strategies. It provides a thin wrapper
so callers can choose between the simple mutation above or the advanced one.
"""
try:
import mutation as mut
except Exception:
# fallback to simple mutation if module not importable
return apply_mutation(chromosome, mutation_rate=mutation_probability)

# the legacy mutation expects depot/customer dicts and returns a new chromosome
return mut.apply_mutation(chromosome, depot_locations, customer_locations, mutation_probability=mutation_probability, beta=beta)
32 changes: 32 additions & 0 deletions ga/population.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Population generation utilities for GA.

This module contains a small random population generator that creates delimiter-flat
chromosomes for use in smoke runs. For production, replace with the more advanced
generator in `populasi_awal_final.py` adapted to delimiter encoding.
"""
from typing import List, Dict, Any
import random
from utils import flatten_from_nested


def simple_route_from_depot_customers(depot: str, customers: List[str]) -> List[str]:
# simple route: depot -> customers in order -> depot
return [depot] + customers + [depot]


def generate_random_population(depots: List[str], customers: List[str], population_size: int = 10) -> List[List[str]]:
population: List[List[str]] = []
for _ in range(population_size):
remaining = customers.copy()
random.shuffle(remaining)
# split into random chunks (random number of routes)
num_routes = random.randint(1, max(1, min(len(depots), len(customers))))
# split customers roughly evenly
chunk_size = max(1, len(customers) // num_routes)
routes = []
for i in range(0, len(customers), chunk_size):
chunk = remaining[i:i+chunk_size]
depot = random.choice(depots)
routes.append(simple_route_from_depot_customers(depot, chunk))
population.append(flatten_from_nested(routes))
return population
143 changes: 143 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Minimal GA runner (smoke test) for Rute SPKLU problem.

This script builds a tiny synthetic problem and runs a few GA generations to
demonstrate end-to-end wiring of utils, ga.population, ga.operators and ga.evaluator.
"""
from ga.population import generate_random_population
from ga.operators import selection, bcrc_crossover, apply_mutation
from ga.evaluator import fitness_function
from utils import build_distance_matrix, load_nodes_from_csv

import argparse
import random
import os


def synthetic_nodes():
# create small synthetic nodes. IDs: D1, S1, C1..C5
nodes = {
'D1': {'x': 0.0, 'y': 0.0, 'demand': 0},
'S1': {'x': 5.0, 'y': 0.0, 'demand': 0},
'C1': {'x': 2.0, 'y': 1.0, 'demand': 1},
'C2': {'x': 2.5, 'y': -1.0, 'demand': 1},
'C3': {'x': 4.0, 'y': 1.5, 'demand': 1},
'C4': {'x': -1.0, 'y': -0.5, 'demand': 1},
'C5': {'x': -2.0, 'y': 1.0, 'demand': 1},
}
return nodes


def run_smoke():
random.seed(1)
nodes = synthetic_nodes()
dist = build_distance_matrix(nodes)

depots = ['D1']
customers = ['C1', 'C2', 'C3', 'C4', 'C5']

pop = generate_random_population(depots, customers, population_size=8)

generations = 20
elite_size = 1
tournament_size = 3

best = None
best_score = float('inf')

for g in range(generations):
fitnesses = [fitness_function(ch, dist) for ch in pop]
# track best
for ch, f in zip(pop, fitnesses):
if f < best_score:
best_score = f
best = ch

print(f"Gen {g}: best={best_score:.6f}")

parents = selection(pop, fitnesses, elite_size=elite_size, tournament_size=tournament_size)

# crossover
next_pop = []
while len(next_pop) < len(pop):
a = random.choice(parents)
b = random.choice(parents)
child = bcrc_crossover(a, b, dist)
child = apply_mutation(child, mutation_rate=0.2)
next_pop.append(child)

pop = next_pop

print('\nBest chromosome:', best)
print('Best fitness:', best_score)


def run_from_csv(csv_path: str):
"""Run GA using nodes loaded from a CSV file.

The loader expects node IDs in the CSV that start with 'D' for depot,
'S' for charging stations and 'C' for customers. If those prefixes are
not present, the function will attempt to infer depots/customers heuristically.
"""
nodes = load_nodes_from_csv(csv_path)
if not nodes:
print(f"No nodes loaded from {csv_path}; falling back to synthetic dataset.")
return run_smoke()

dist = build_distance_matrix(nodes)

# infer roles by prefix (D=depot, S=spklu, C=customer)
depots = [n for n in nodes.keys() if str(n).upper().startswith('D')]
spklus = [n for n in nodes.keys() if str(n).upper().startswith('S')]
customers = [n for n in nodes.keys() if str(n).upper().startswith('C')]

# fallback heuristics
if not depots:
# if no D* found, take first node as depot
depots = [next(iter(nodes.keys()))]
if not customers:
# all non-depot/non-spklu nodes are customers
customers = [n for n in nodes.keys() if n not in depots + spklus]

print(f"Using {len(depots)} depots, {len(spklus)} SPKLU, {len(customers)} customers from {csv_path}")

pop = generate_random_population(depots, customers, population_size=20)

generations = 50
elite_size = 2
tournament_size = 3

best = None
best_score = float('inf')

for g in range(generations):
fitnesses = [fitness_function(ch, dist) for ch in pop]
for ch, f in zip(pop, fitnesses):
if f < best_score:
best_score = f
best = ch
print(f"Gen {g}: best={best_score:.6f}")
parents = selection(pop, fitnesses, elite_size=elite_size, tournament_size=tournament_size)
next_pop = []
while len(next_pop) < len(pop):
a = random.choice(parents)
b = random.choice(parents)
child = bcrc_crossover(a, b, dist)
child = apply_mutation(child, mutation_rate=0.2)
next_pop.append(child)
pop = next_pop

print('\nBest chromosome:', best)
print('Best fitness:', best_score)


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run GA for Rute SPKLU (smoke or real CSV)')
parser.add_argument('--csv', '-c', help='Path to nodes CSV (optional). If omitted, runs synthetic smoke example.')
args = parser.parse_args()

if args.csv and os.path.exists(args.csv):
run_from_csv(args.csv)
else:
if args.csv:
print(f"CSV path provided but not found: {args.csv}. Running synthetic smoke instead.")
run_smoke()
Loading