def relu(z):
    """Element-wise Rectified Linear Unit: max(z, 0)."""
    return np.maximum(z, 0)


def relu_derivative(z):
    """Derivative of ReLU: 1.0 where z > 0, else 0.0 (subgradient 0 at z == 0)."""
    return np.greater(z, 0).astype(float)


def softmax(z):
    """
    Column-wise softmax activation.

    Shifts each column by its maximum before exponentiating so that
    np.exp never overflows for large logits.
    """
    exps = np.exp(z - z.max(axis=0, keepdims=True))
    return exps / exps.sum(axis=0, keepdims=True)


def cross_entropy_loss(y_pred, y_true):
    """
    Average cross-entropy between predicted probabilities and one-hot labels.

    Args:
        y_pred: (num_classes, batch_size) predicted probabilities.
        y_true: (num_classes, batch_size) one-hot encoded true labels.

    Returns:
        Scalar mean loss over the batch.
    """
    batch = y_true.shape[1]
    # Clip away exact zeros so log never produces -inf.
    clipped = np.clip(y_pred, 1e-12, 1.0)
    return np.sum(y_true * -np.log(clipped)) / batch
+ """ + + def __init__(self, layer_sizes, learning_rate=0.1, seed=42): + np.random.seed(seed) + self.layer_sizes = layer_sizes + self.learning_rate = learning_rate + self.num_layers = len(layer_sizes) - 1 # number of weight matrices + self._init_params() + + # ------------------------------------------------------------------ + # Parameter initialisation + # ------------------------------------------------------------------ + + def _init_params(self): + """He initialisation for weights; zeros for biases.""" + self.params = {} + for l in range(1, self.num_layers + 1): + fan_in = self.layer_sizes[l - 1] + fan_out = self.layer_sizes[l] + self.params[f"W{l}"] = np.random.randn(fan_out, fan_in) * np.sqrt(2.0 / fan_in) + self.params[f"b{l}"] = np.zeros((fan_out, 1)) + + # ------------------------------------------------------------------ + # Forward propagation + # ------------------------------------------------------------------ + + def forward(self, X): + """ + Compute forward pass through the network. + + Args: + X: (input_size, batch_size) input matrix. + + Returns: + Tuple (output probabilities, cache dict for backprop). + """ + cache = {"A0": X} + A = X + for l in range(1, self.num_layers + 1): + W = self.params[f"W{l}"] + b = self.params[f"b{l}"] + Z = W @ A + b + if l < self.num_layers: + A = relu(Z) + else: + A = softmax(Z) + cache[f"Z{l}"] = Z + cache[f"A{l}"] = A + return A, cache + + # ------------------------------------------------------------------ + # Backpropagation + # ------------------------------------------------------------------ + + def backward(self, y_true, cache): + """ + Compute gradients via backpropagation. + + Args: + y_true: (num_classes, batch_size) one-hot encoded labels. + cache: Dict produced by forward(). + + Returns: + Dict of gradients keyed by 'W1', 'b1', …, 'WL', 'bL'. + """ + grads = {} + m = y_true.shape[1] + L = self.num_layers + + # Gradient of loss w.r.t. 
softmax output (combined softmax + CE) + dA = (cache[f"A{L}"] - y_true) / m + + for l in reversed(range(1, L + 1)): + A_prev = cache[f"A{l - 1}"] + W = self.params[f"W{l}"] + Z = cache[f"Z{l}"] + + if l < L: + dZ = dA * relu_derivative(Z) + else: + dZ = dA # Already the combined gradient for softmax + CE + + grads[f"W{l}"] = dZ @ A_prev.T + grads[f"b{l}"] = np.sum(dZ, axis=1, keepdims=True) + dA = W.T @ dZ + + return grads + + # ------------------------------------------------------------------ + # Parameter update + # ------------------------------------------------------------------ + + def update_params(self, grads): + """Gradient descent parameter update.""" + for l in range(1, self.num_layers + 1): + self.params[f"W{l}"] -= self.learning_rate * grads[f"W{l}"] + self.params[f"b{l}"] -= self.learning_rate * grads[f"b{l}"] + + # ------------------------------------------------------------------ + # Training + # ------------------------------------------------------------------ + + def train(self, X_train, y_train, epochs=20, batch_size=64, verbose=True): + """ + Train the network using mini-batch gradient descent. + + Args: + X_train: (input_size, num_samples) training data. + y_train: (num_classes, num_samples) one-hot labels. + epochs: Number of passes over the training set. + batch_size: Mini-batch size. + verbose: Print loss/accuracy each epoch when True. + + Returns: + Dict with 'loss' and 'accuracy' lists (one value per epoch). 
+ """ + history = {"loss": [], "accuracy": []} + m = X_train.shape[1] + + for epoch in range(1, epochs + 1): + # Shuffle + permutation = np.random.permutation(m) + X_shuffled = X_train[:, permutation] + y_shuffled = y_train[:, permutation] + + epoch_loss = 0.0 + num_batches = 0 + + for start in range(0, m, batch_size): + X_batch = X_shuffled[:, start: start + batch_size] + y_batch = y_shuffled[:, start: start + batch_size] + + probs, cache = self.forward(X_batch) + loss = cross_entropy_loss(probs, y_batch) + grads = self.backward(y_batch, cache) + self.update_params(grads) + + epoch_loss += loss + num_batches += 1 + + epoch_loss /= num_batches + epoch_acc = self.evaluate(X_train, y_train) + history["loss"].append(epoch_loss) + history["accuracy"].append(epoch_acc) + + if verbose: + print( + f"Epoch {epoch:>3}/{epochs} " + f"loss: {epoch_loss:.4f} " + f"train_acc: {epoch_acc:.4f}" + ) + + return history + + # ------------------------------------------------------------------ + # Inference + # ------------------------------------------------------------------ + + def predict(self, X): + """ + Return predicted class indices for each sample. + + Args: + X: (input_size, num_samples) input matrix. + + Returns: + 1-D array of predicted class labels. + """ + probs, _ = self.forward(X) + return np.argmax(probs, axis=0) + + def evaluate(self, X, y_true): + """ + Compute classification accuracy. + + Args: + X: (input_size, num_samples) input matrix. + y_true: (num_classes, num_samples) one-hot labels. + + Returns: + Accuracy in [0, 1]. 
+ """ + predictions = self.predict(X) + labels = np.argmax(y_true, axis=0) + return np.mean(predictions == labels) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e0485aa --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +numpy>=1.21 diff --git a/test_neural_network.py b/test_neural_network.py new file mode 100644 index 0000000..c52fe21 --- /dev/null +++ b/test_neural_network.py @@ -0,0 +1,272 @@ +""" +Unit tests for the NumPy neural network implementation. + +Tests cover: + - Activation functions (relu, softmax) + - Loss computation (cross_entropy_loss) + - Parameter initialisation + - Forward propagation shapes and output validity + - Backward propagation (gradient shapes, numerical gradient check) + - Parameter update + - Train / evaluate helpers on a toy dataset +""" + +import numpy as np +import pytest + +from neural_network import ( + NeuralNetwork, + cross_entropy_loss, + relu, + relu_derivative, + softmax, +) + +# --------------------------------------------------------------------------- +# Activation function tests +# --------------------------------------------------------------------------- + +class TestRelu: + def test_positive_values_unchanged(self): + x = np.array([1.0, 2.0, 3.0]) + np.testing.assert_array_equal(relu(x), x) + + def test_negative_values_zeroed(self): + x = np.array([-1.0, -0.5, 0.0]) + np.testing.assert_array_equal(relu(x), np.array([0.0, 0.0, 0.0])) + + def test_mixed_values(self): + x = np.array([-2.0, 0.0, 3.0]) + expected = np.array([0.0, 0.0, 3.0]) + np.testing.assert_array_equal(relu(x), expected) + + def test_derivative_positive(self): + x = np.array([1.0, 2.0, 0.1]) + np.testing.assert_array_equal(relu_derivative(x), np.ones(3)) + + def test_derivative_negative(self): + x = np.array([-1.0, -0.5]) + np.testing.assert_array_equal(relu_derivative(x), np.zeros(2)) + + def test_derivative_zero(self): + # Subgradient at 0 should be 0 (our implementation) + assert 
class TestSoftmax:
    """Unit tests for the softmax activation."""

    def test_output_sums_to_one(self):
        logits = np.random.randn(5, 10)
        probs = softmax(logits)
        np.testing.assert_allclose(probs.sum(axis=0), np.ones(10), atol=1e-6)

    def test_all_outputs_positive(self):
        logits = np.random.randn(5, 10)
        probs = softmax(logits)
        assert (probs > 0).all()

    def test_numerical_stability_large_values(self):
        # Without max-shifting, exp(1000) would overflow to inf.
        logits = np.array([[1000.0], [1001.0], [999.0]])
        np.testing.assert_allclose(softmax(logits).sum(), 1.0, atol=1e-6)

    def test_uniform_input(self):
        # Identical logits in every column must give a uniform distribution.
        logits = np.zeros((4, 3))
        np.testing.assert_allclose(softmax(logits), np.full((4, 3), 0.25), atol=1e-6)
class TestForwardProp:
    """Shape and validity checks for NeuralNetwork.forward."""

    def setup_method(self):
        # Fresh deterministic network before every test.
        self.nn = NeuralNetwork([4, 8, 3], seed=0)

    def test_output_shape(self):
        batch = np.random.randn(4, 16)
        probs, _ = self.nn.forward(batch)
        assert probs.shape == (3, 16)

    def test_output_sums_to_one(self):
        batch = np.random.randn(4, 16)
        probs, _ = self.nn.forward(batch)
        np.testing.assert_allclose(probs.sum(axis=0), np.ones(16), atol=1e-6)

    def test_output_all_positive(self):
        batch = np.random.randn(4, 16)
        probs, _ = self.nn.forward(batch)
        assert (probs > 0).all()

    def test_cache_keys_present(self):
        # Two weight layers -> cache entries A0 plus Z/A for layers 1 and 2.
        _, cache = self.nn.forward(np.random.randn(4, 5))
        assert set(cache.keys()) == {"A0", "Z1", "A1", "Z2", "A2"}
class TestParamUpdate:
    """Tests for the gradient-descent parameter update."""

    def test_params_change_after_update(self):
        net = NeuralNetwork([4, 8, 3], seed=2)
        original_w1 = net.params["W1"].copy()
        unit_grads = {
            "W1": np.ones((8, 4)),
            "b1": np.ones((8, 1)),
            "W2": np.ones((3, 8)),
            "b2": np.ones((3, 1)),
        }
        net.update_params(unit_grads)
        assert not np.allclose(net.params["W1"], original_w1)

    def test_update_magnitude(self):
        # One step must move W1 by exactly learning_rate * gradient.
        lr = 0.5
        net = NeuralNetwork([2, 2], learning_rate=lr, seed=3)
        original_w1 = net.params["W1"].copy()
        gradient = np.ones((2, 2))
        net.update_params({"W1": gradient, "b1": np.zeros((2, 1))})
        np.testing.assert_allclose(net.params["W1"], original_w1 - lr * gradient)
_make_xor_data(self, n=200, seed=0): + """Simple 2-class linearly-inseparable toy dataset.""" + rng = np.random.default_rng(seed) + X = rng.standard_normal((2, n)) + labels = ((X[0] * X[1]) > 0).astype(int) + Y = np.eye(2)[:, labels] + return X, Y + + def test_loss_decreases(self): + X, Y = self._make_xor_data() + nn = NeuralNetwork([2, 16, 2], learning_rate=0.1, seed=7) + history = nn.train(X, Y, epochs=50, batch_size=32, verbose=False) + # Loss should be lower at the end than at the start + assert history["loss"][-1] < history["loss"][0] + + def test_accuracy_above_random(self): + X, Y = self._make_xor_data() + nn = NeuralNetwork([2, 32, 2], learning_rate=0.1, seed=7) + nn.train(X, Y, epochs=100, batch_size=32, verbose=False) + acc = nn.evaluate(X, Y) + # A trained network should do better than random (0.5 for 2 classes) + assert acc > 0.5 + + def test_predict_shape(self): + X, Y = self._make_xor_data(n=30) + nn = NeuralNetwork([2, 8, 2], seed=5) + preds = nn.predict(X) + assert preds.shape == (30,) + assert set(preds).issubset({0, 1}) + + def test_evaluate_returns_scalar_in_range(self): + X, Y = self._make_xor_data(n=50) + nn = NeuralNetwork([2, 8, 2], seed=5) + acc = nn.evaluate(X, Y) + assert 0.0 <= acc <= 1.0 diff --git a/train.py b/train.py new file mode 100644 index 0000000..372b40c --- /dev/null +++ b/train.py @@ -0,0 +1,131 @@ +""" +Train a NumPy neural network on the MNIST dataset and report accuracy. + +Usage: + python train.py + +The script: + 1. Downloads / loads the MNIST dataset. + 2. Pre-processes the images (normalise, flatten). + 3. Trains the network for a configurable number of epochs. + 4. Evaluates and prints the final test accuracy. 
+""" + +import numpy as np + +from neural_network import NeuralNetwork + +# --------------------------------------------------------------------------- +# Hyper-parameters +# --------------------------------------------------------------------------- +LAYER_SIZES = [784, 128, 64, 10] +LEARNING_RATE = 0.1 +EPOCHS = 20 +BATCH_SIZE = 64 +SEED = 42 + + +# --------------------------------------------------------------------------- +# Data loading helpers +# --------------------------------------------------------------------------- + +def load_mnist(): + """ + Load MNIST returning train/test splits as NumPy arrays. + + The function tries several backends in order: + 1. tensorflow.keras (if TensorFlow is installed) + 2. sklearn fetch_openml (downloads the dataset on first run) + + Returns: + (X_train, y_train, X_test, y_test) where + X_*: float32 arrays of shape (60000/10000, 784) in [0, 1] + y_*: int arrays of shape (60000/10000,) with class labels 0-9 + """ + # ---- Option 1: TensorFlow / Keras ---- + try: + from tensorflow.keras.datasets import mnist # type: ignore + (X_train, y_train), (X_test, y_test) = mnist.load_data() + X_train = X_train.reshape(-1, 784).astype(np.float32) / 255.0 + X_test = X_test.reshape(-1, 784).astype(np.float32) / 255.0 + print("Loaded MNIST via TensorFlow/Keras.") + return X_train, y_train, X_test, y_test + except Exception: + pass + + # ---- Option 2: scikit-learn openml ---- + try: + from sklearn.datasets import fetch_openml # type: ignore + print("Downloading MNIST via scikit-learn (this may take a moment)…") + mnist = fetch_openml("mnist_784", version=1, as_frame=False, parser="auto") + X = mnist.data.astype(np.float32) / 255.0 + y = mnist.target.astype(np.int32) + X_train, y_train = X[:60000], y[:60000] + X_test, y_test = X[60000:], y[60000:] + print("Loaded MNIST via scikit-learn.") + return X_train, y_train, X_test, y_test + except Exception as exc: + raise RuntimeError( + "Could not load MNIST. 
def one_hot_encode(y, num_classes=10):
    """
    Convert an integer label vector to a one-hot matrix.

    Args:
        y: Sequence of integer class labels in [0, num_classes).
        num_classes: Number of output classes (default 10).

    Returns:
        float32 array of shape (num_classes, len(y)) with a single 1.0 per
        column, placed at the row given by the corresponding label.
    """
    num_samples = len(y)
    encoded = np.zeros((num_classes, num_samples), dtype=np.float32)
    # Fancy indexing sets one (label-row, sample-column) entry per sample.
    encoded[y, np.arange(num_samples)] = 1.0
    return encoded