Skip to content
10 changes: 10 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,13 @@ in Python containing the following methods:
size: O(1)
remove: O(n**2)
traversal: O(k) + O(2n)


## K-Means Classifier
Establish *k* centroids, each one representing a cluster of the data. The algorithm starts from randomly placed centroids, then repeatedly assigns every point to its nearest centroid and moves each centroid to the mean of the points assigned to it. It stops after a maximum number of iterations, or earlier once no centroid moves by more than an optional minimum step.

Public methods:
clf.fit(self, data, k): Generates k centroids with which to classify the given data
clf.predict(self, data): Returns classes for some data if clf.fit has already been called.

Work done in collaboration with [Julien Wilson](https://github.com/julienawilson), [Ted Callahan](https://github.com/CCallahanIV), [Patrick Saunders](https://github.com/pasaunders) and [Avery Pratt](https://github.com/AveryPratt).
92 changes: 92 additions & 0 deletions src/kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Implementation of the K-Means Classifier."""
import random
from math import sqrt


class KMeansClassifier(object):
    """K-Means clusterer operating on the rows of a pandas DataFrame.

    Layout convention shared by every method: a data row, and a centroid
    once it has been fitted, looks like
    ``[feature_1, ..., feature_n, <label column>, group]``.  The last two
    entries are bookkeeping and are ignored by ``_calc_distance``.
    """

    def __init__(self, max_iter=5, min_step=None):
        """Initialize a K-Means Classifier object.

        Args:
            max_iter: maximum number of classify/update passes run by
                ``fit``.
            min_step: optional convergence threshold.  When set, ``fit``
                stops early once the largest centroid movement of a pass
                is below this value.
        """
        self.max_iter = max_iter
        self.min_step = min_step
        # Flipped to True by ``fit``; ``predict`` refuses to run before that.
        self.fitted = False
        # List of k centroids (each a list of numbers), set by ``fit``.
        self.centroids = None

    def fit(self, data, k=2):
        """Fit K centroids to given data.

        ``data`` is modified in place: a ``'group'`` column is added (or
        overwritten) and ends up holding the index of the centroid each row
        was assigned to on the last pass.  Every column except the last two
        (after ``'group'`` has been added) is treated as a feature.

        Args:
            data: pandas DataFrame of points to cluster.
            k: number of centroids; must satisfy ``1 <= k <= len(data)``.

        Raises:
            ValueError: if ``k`` is outside that range.
        """
        # ``k < 1`` (not ``k < 0``): zero centroids cannot classify anything.
        if k < 1 or k > len(data):
            raise ValueError("K must be a positive integer less than the length of data.")

        data['group'] = None
        # Pad the features-only random centroids to the row layout so that
        # every centroid has the same shape from the first pass onward.  The
        # final slot carries the centroid's own group index, which is what
        # ``predict`` returns, even if the centroid never attracts a point.
        self.centroids = [
            list(start) + [float('nan'), float(idx)]
            for idx, start in enumerate(self._random_centroids(data, k))
        ]
        iteration = 0
        old_centroids = None

        while not self._should_stop(old_centroids, iteration, k):
            # Snapshot, not alias: ``_assign_centroids`` updates
            # ``self.centroids`` in place, and an alias would always show
            # zero movement.
            old_centroids = [list(centroid) for centroid in self.centroids]
            iteration += 1
            data = self._classify(data)
            self._assign_centroids(data, k)
        self.fitted = True

    def predict(self, data):
        """Predict the class of given test data after fit.

        Args:
            data: a single point in the row layout described on the class;
                its last two entries are ignored by the distance
                calculation.

        Returns:
            The last entry of the nearest centroid, i.e. its group value.

        Raises:
            RuntimeError: if ``fit`` has not been run yet.
        """
        if not self.fitted:
            raise RuntimeError('Run KMeansClassifier.fit before running predict.')
        nearest = min(
            self.centroids,
            key=lambda centroid: self._calc_distance(data, centroid[:-1]),
        )
        return nearest[-1]

    def _calc_distance(self, pt1, pt2):
        """Calculate the Euclidean distance between two points.

        Only the first ``len(pt1) - 2`` entries take part; the last two
        entries of ``pt1`` are treated as non-feature bookkeeping.

        Args:
            pt1: sequence (list, pandas Series, ...) that sets how many
                coordinates are compared.
            pt2: sequence with at least as many leading coordinates.

        Returns:
            The distance as a float (0.0 when no coordinates are compared).
        """
        # Materialise both points so access is purely positional; integer
        # indexing into a string-labelled Series is not reliable.
        first = list(pt1)
        second = list(pt2)
        n_features = max(len(first) - 2, 0)
        return sqrt(sum((first[i] - second[i]) ** 2 for i in range(n_features)))

    def _classify(self, data):
        """Assign each datapoint to the nearest centroid.

        Writes the index of the nearest centroid into the ``'group'`` column
        of ``data`` (in place) and returns ``data``.
        """
        assignments = []
        for position in range(len(data)):
            row = data.iloc[position]
            # The row goes first so its length, not the centroid's, decides
            # how many feature columns are compared.
            distances = [self._calc_distance(row, centroid) for centroid in self.centroids]
            assignments.append(distances.index(min(distances)))
        # One positional column assignment replaces the per-cell
        # ``set_value`` calls (removed from pandas) and is independent of
        # the frame's index labels.
        data['group'] = assignments
        return data

    def _find_mean(self, points):
        """Find the mean coordinates of points.

        Returns a list with the mean of every column of ``points``, in
        column order.
        """
        return [points[column].mean() for column in points]

    def _assign_centroids(self, data, k):
        """Set centroid coordinates to mean of their assigned datapoints.

        A centroid with no assigned rows keeps its previous position;
        averaging an empty group would turn it into NaNs.
        """
        for idx in range(k):
            members = data[data["group"] == idx]
            if len(members):
                self.centroids[idx] = self._find_mean(members)

    def _should_stop(self, old_centroids, iteration, k):
        """Determine if the fit should stop running.

        Args:
            old_centroids: centroids before the latest pass, or None before
                the first pass.
            iteration: number of passes completed so far.
            k: number of centroids.

        Returns:
            True once ``max_iter`` passes have run, or when ``min_step`` is
            set and no centroid moved by at least that much; else False.
        """
        # ``>=`` so that exactly ``max_iter`` passes run.
        if iteration >= self.max_iter:
            return True
        if not old_centroids or not self.min_step:
            return False
        largest_move = max(
            self._calc_distance(old_centroids[i], self.centroids[i])
            for i in range(k)
        )
        return largest_move < self.min_step

    def _random_centroids(self, data, k):
        """Return randomly generated centroids.

        Each centroid is a list with one value per feature column (all
        columns of ``data`` except the last two), drawn uniformly between
        that column's minimum and maximum.
        """
        feature_columns = data.columns.values[:-2]
        # Column bounds do not change between centroids; compute them once.
        bounds = [(min(data[column]), max(data[column])) for column in feature_columns]
        return [
            [random.uniform(low, high) for low, high in bounds]
            for _ in range(k)
        ]
81 changes: 81 additions & 0 deletions src/test_kmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Tests for the K-Means Classifier."""

from math import sqrt
import pandas as pd
import pytest
import numpy as np


@pytest.fixture
def kmc():
    """Provide a KMeansClassifier built with its default arguments."""
    import kmeans
    return kmeans.KMeansClassifier()


@pytest.fixture
def some_data():
    """Provide a small two-column DataFrame of dummy points."""
    points = [[2, 3], [4, 5], [6, 7], [8, 9], [1, 1], [2, 2], [3, 3], [4, 4]]
    return pd.DataFrame(data=np.array(points))


def test_calc_distance_rows(kmc):
    """Check _calc_distance on DataFrame rows; the last two columns are ignored."""
    frame = pd.DataFrame(
        data=[[2, 2, 1, 0], [0, 0, 1, 0]],
        columns=['x', 'y', 'class', 'dummy'],
    )
    first_row, second_row = frame.loc[0], frame.loc[1]
    assert kmc._calc_distance(first_row, second_row) == sqrt(8)


def test_calc_distance(kmc):
    """Check _calc_distance on plain lists using a 3-4-5 triangle."""
    origin = [0, 0, 0, 0]
    target = [3, 4, 0, 0]
    assert kmc._calc_distance(origin, target) == 5.0


def test_find_mean(kmc, some_data):
    """Check that _find_mean returns the per-column means of the dummy data."""
    expected_means = [3.75, 4.25]
    assert kmc._find_mean(some_data) == expected_means


def test_should_stop_false_max_iter(kmc):
    """Test that a default KMC keeps iterating while below its max_iter of 5."""
    # Use the requested fixture instead of building a second, identical
    # classifier; no min_step is set, so only the iteration count matters.
    old_centroids = [[2, 3], [7, 8]]
    kmc.centroids = [[2, 4], [90, 55]]
    assert not kmc._should_stop(old_centroids, 3, 2)


def test_should_stop_true_max_iter(kmc):
    """Test that a default KMC stops once the iteration count passes its max_iter of 5."""
    # Use the requested fixture instead of building a second, identical
    # classifier; iteration 6 is past the default limit.
    old_centroids = [[2, 3, 0, 0], [7, 8, 0, 0]]
    kmc.centroids = [[2, 4], [90, 55]]
    assert kmc._should_stop(old_centroids, 6, 2)


def test_classify():
    """Unit test for classifying datapoints.

    Each row sits next to exactly one of two hand-placed centroids, so the
    'group' column must end up as 0 for the first row and 1 for the second.
    """
    from kmeans import KMeansClassifier
    clusters = KMeansClassifier()
    rows = [[4, 4, 1, 0], [0, 0, 1, 0]]
    data = pd.DataFrame(data=rows, columns=['x', 'y', 'class', 'group'])
    # Centroids use the same row layout as the data (features first).
    clusters.centroids = [[4, 5, 0, 0], [0, 1, 0, 0]]
    clusters._classify(data)
    assert data.iloc[0, 3] == 0 and data.iloc[1, 3] == 1


def test_fit():
    """Integration test: fitting with defaults marks the classifier as fitted."""
    import kmeans
    frame = pd.DataFrame(
        data=[[20, 20, 1, 0], [0, 0, 1, 0]],
        columns=['x', 'y', 'class', 'group'],
    )
    classifier = kmeans.KMeansClassifier()
    classifier.fit(frame)
    assert classifier.fitted


# TODO: add a test for KMeansClassifier._random_centroids.