Skip to content
0

文章发布较早,内容可能过时,阅读注意甄别。

numpy搭建神经网络

1、基础部分

主要介绍一下各网络层的搭建,前提是安装numpy的包,同时导入import numpy as np

1.1、全连接层

python
class FullyConnectedLayer(object):
    """Fully connected (affine) layer computing output = input @ weight + bias."""

    def __init__(self, num_input, num_output):  # layer construction
        """Record the layer shape; parameters are allocated later by init_param()."""
        self.num_input = num_input
        self.num_output = num_output
        # Parameters and cached activations/gradients, created on demand.
        self.weight = None
        self.bias = None
        self.input = None
        self.output = None
        self.d_weight = None
        self.d_bias = None
        print('\tFully connected layer with input %d, output %d.\n' % (self.num_input, self.num_output))

    def init_param(self, std=0.01):  # parameter initialization
        """Gaussian-initialise the weights (stddev=std) and zero the bias row.

        Alternatives such as zero / Xavier / He initialisation would only change
        the scale passed to np.random.normal.
        """
        shape = (self.num_input, self.num_output)
        self.weight = np.random.normal(loc=0.0, scale=std, size=shape)
        self.bias = np.zeros([1, self.num_output])

    def forward(self, input):  # forward pass
        """Affine forward pass; caches the input for use in backward()."""
        self.input = input
        self.output = np.dot(input, self.weight) + self.bias
        return self.output

    def backward(self, top_diff):  # backward pass
        """Compute parameter gradients and the gradient w.r.t. the layer input.

        From Y = X W + B:  dL/dW = X^T dL/dY,  dL/dB sums dL/dY over the batch,
        and dL/dX = dL/dY W^T is what flows to the previous layer.
        """
        self.d_weight = np.dot(self.input.T, top_diff)
        self.d_bias = np.sum(top_diff, axis=0, keepdims=True)
        bottom_diff = np.dot(top_diff, self.weight.T)
        return bottom_diff

    def update_param(self, lr):  # SGD step
        """Gradient-descent update: param <- param - lr * grad (new arrays, not in-place)."""
        self.weight = self.weight - lr * self.d_weight
        self.bias = self.bias - lr * self.d_bias

    def load_param(self, weight, bias):  # parameter loading
        """Replace parameters with externally supplied arrays (shapes must match)."""
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias

    def save_param(self):  # parameter export
        """Return the current (weight, bias) pair."""
        return self.weight, self.bias

1.2、激活层

1.2.1、ReLU

python
class ReLULayer(object):
    """Element-wise rectified linear activation: y = max(x, 0)."""

    def __init__(self):
        self.input = None  # cached forward input, needed for the gradient mask
        print('\tReLU layer.')

    def forward(self, input):  # forward pass
        """Clamp negative entries to zero and cache the input."""
        self.input = input
        output = np.maximum(self.input, 0)
        return output

    def backward(self, top_diff):  # backward pass
        """ReLU gradient is 1 where the forward input was positive, else 0,
        so the incoming gradient is simply masked by (input > 0)."""
        positive_mask = self.input > 0
        bottom_diff = top_diff * positive_mask
        return bottom_diff

1.2.2、Sigmoid

python
class SigmoidLayer(object):
    """Element-wise logistic sigmoid activation: y = 1 / (1 + exp(-x))."""

    def __init__(self):
        self.input = None   # cached forward input
        self.output = None  # cached forward output, reused by backward()
        print('\tSigmoid layer.')

    def forward(self, input):  # forward pass
        """Apply the sigmoid and cache the result so backward() need not
        recompute the (relatively expensive) exponential."""
        self.input = input
        self.output = 1 / (1 + np.exp(-self.input))
        return self.output

    def backward(self, top_diff):  # backward pass
        """Backward pass using sigmoid'(x) = y * (1 - y) with the cached y.

        The original implementation re-evaluated the sigmoid here; using the
        cached forward output gives identical values without the extra exp.
        """
        bottom_diff = top_diff * self.output * (1 - self.output)
        return bottom_diff

1.2.3、Softmax

python
class SoftmaxLossLayer(object):
    """Softmax activation fused with a mean cross-entropy loss."""

    def __init__(self):
        self.prob = None          # cached class probabilities from forward()
        self.batch_size = None    # set by get_loss()
        self.label_onehot = None  # one-hot labels built by get_loss()
        print('\tSoftmax loss layer.')

    def forward(self, input):  # forward pass
        """Numerically stable softmax over the class axis (axis=1):
        the row maximum is subtracted before exponentiation."""
        shifted = input - np.max(input, axis=1, keepdims=True)
        exp_scores = np.exp(shifted)
        self.prob = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
        return self.prob

    def get_loss(self, label):  # loss computation
        """Mean cross-entropy of the cached probabilities against integer labels."""
        self.batch_size = self.prob.shape[0]
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), label] = 1.0
        loss = -np.sum(self.label_onehot * np.log(self.prob)) / self.batch_size
        return loss

    def backward(self):  # backward pass
        """Gradient of the fused softmax + cross-entropy: (prob - onehot) / N."""
        bottom_diff = (self.prob - self.label_onehot) / self.batch_size
        return bottom_diff

1.3、卷积层

python
class ConvolutionalLayer(object):
    """2-D convolution layer over [N, C, H, W] feature maps.

    Weight layout is [channel_in, kernel_size, kernel_size, channel_out].
    Three forward implementations are provided with increasing speed:
    forward_raw_1 (naive loops), forward_raw_2 (per-window dot product) and
    forward (full img2col + one matrix multiply).  backward() pairs with
    forward() (it reuses self.img2col / self.weight_reshape); backward_raw()
    only needs self.input_pad, which every forward variant sets.
    """

    def __init__(self, kernel_size, channel_in, channel_out, padding, stride):
        self.kernel_size = kernel_size
        self.channel_in = channel_in
        self.channel_out = channel_out
        self.padding = padding
        self.stride = stride
        print('\tConvolutional layer with kernel size %d, input channel %d, output channel %d.' % (
            self.kernel_size, self.channel_in, self.channel_out))

    def init_param(self, std=0.01):
        """Gaussian-initialise weights (stddev=std) and zero the per-channel biases."""
        self.weight = np.random.normal(loc=0.0, scale=std,
                                       size=(self.channel_in, self.kernel_size, self.kernel_size, self.channel_out))
        self.bias = np.zeros([self.channel_out])
        show_matrix(self.weight, 'conv weight ')
        show_matrix(self.bias, 'conv bias ')

    def forward_raw_1(self, input):
        # FIX: the original source had "def (self, input):" — the method name was
        # missing, a syntax error.  The accompanying note names it forward_raw_1.
        """Naive forward: four nested Python loops over every output element."""
        start_time = time.time()
        self.input = input  # [N, C, H, W]
        # Zero-pad the spatial dimensions on all sides.
        height = self.input.shape[2] + self.padding * 2
        width = self.input.shape[3] + self.padding * 2
        self.input_pad = np.zeros([self.input.shape[0], self.input.shape[1], height, width])
        self.input_pad[:, :, self.padding:self.padding + self.input.shape[2],
        self.padding:self.padding + self.input.shape[3]] = self.input
        height_out = (height - self.kernel_size) // self.stride + 1
        width_out = (width - self.kernel_size) // self.stride + 1
        self.output = np.zeros([self.input.shape[0], self.channel_out, height_out, width_out])
        for idxn in range(self.input.shape[0]):
            for idxc in range(self.channel_out):
                for idxh in range(height_out):
                    for idxw in range(width_out):
                        # Elementwise product of the receptive field with the
                        # output channel's kernel, summed, plus the channel bias.
                        self.output[idxn, idxc, idxh, idxw] = np.sum(self.input_pad[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size,idxw * self.stride:idxw * self.stride + self.kernel_size] * self.weight[:, :, :, idxc]) + self.bias[idxc]
        show_matrix(self.output, 'conv out ')
        show_time(time.time() - start_time, 'conv forward time: ')
        return self.output

    def forward_raw_2(self, input):
        """Faster forward: flatten each receptive field and use one dot product
        per spatial position (all output channels at once)."""
        start_time = time.time()
        self.input = input  # [N, C, H, W]
        height = self.input.shape[2] + self.padding * 2
        width = self.input.shape[3] + self.padding * 2
        self.input_pad = np.zeros([self.input.shape[0], self.input.shape[1], height, width])
        self.input_pad[:, :, self.padding:self.padding + self.input.shape[2],
        self.padding:self.padding + self.input.shape[3]] = self.input
        height_out = (height - self.kernel_size) // self.stride + 1
        width_out = (width - self.kernel_size) // self.stride + 1
        self.output = np.zeros([self.input.shape[0], self.channel_out, height_out, width_out])
        # Kernel flattened to [C_in*k*k, C_out] so each window is a single matvec.
        self.weight_reshape = np.reshape(self.weight, [-1, self.channel_out])
        for idxn in range(self.input.shape[0]):
            for idxh in range(height_out):
                for idxw in range(width_out):
                    crop = self.input_pad[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size].reshape([-1])
                    self.output[idxn, :, idxh, idxw] = np.dot(crop, self.weight_reshape) + self.bias
        show_matrix(self.output, 'conv out ')
        show_time(time.time() - start_time, 'conv forward time: ')
        return self.output

    def forward(self, input):
        """Fastest forward: build the full img2col matrix once, then compute the
        whole convolution as a single matrix multiply."""
        start_time = time.time()
        self.input = input  # [N, C, H, W]
        height = self.input.shape[2] + self.padding * 2
        width = self.input.shape[3] + self.padding * 2
        self.input_pad = np.zeros([self.input.shape[0], self.input.shape[1], height, width])
        self.input_pad[:, :, self.padding:self.padding + self.input.shape[2],
        self.padding:self.padding + self.input.shape[3]] = self.input
        self.height_out = (height - self.kernel_size) // self.stride + 1
        self.width_out = (width - self.kernel_size) // self.stride + 1
        self.weight_reshape = np.reshape(self.weight, [-1, self.channel_out])
        # One row per (sample, output position); one column per kernel element.
        self.img2col = np.zeros([self.input.shape[0] * self.height_out * self.width_out,
                                 self.channel_in * self.kernel_size * self.kernel_size])
        for idxn in range(self.input.shape[0]):
            for idxh in range(self.height_out):
                for idxw in range(self.width_out):
                    self.img2col[idxn * self.height_out * self.width_out + idxh * self.width_out + idxw, :] = self.input_pad[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size].reshape([-1])
        output = np.dot(self.img2col, self.weight_reshape) + self.bias
        # Rows come back as [N, H_out, W_out, C_out]; transpose to [N, C, H, W].
        self.output = output.reshape([self.input.shape[0], self.height_out, self.width_out, -1]).transpose([0, 3, 1, 2])
        show_matrix(self.output, 'conv out ')
        show_time(time.time() - start_time, 'conv forward time: ')
        return self.output

    def backward(self, top_diff):
        """img2col-based backward pass; requires forward() to have run (uses
        self.img2col, self.weight_reshape, self.height_out, self.width_out)."""
        bottom_diff = np.zeros(self.input_pad.shape)
        # Reshape top_diff to match the img2col row layout: one row per window.
        top_diff = top_diff.transpose([0, 2, 3, 1]).reshape(
            [self.input.shape[0] * self.height_out * self.width_out, -1])
        d_img2col = np.dot(top_diff, self.weight_reshape.T)
        d_weight_reshape = np.dot(self.img2col.T, top_diff)
        self.d_weight = np.reshape(d_weight_reshape, self.weight.shape)
        # Bias gradient is the column sum of top_diff.
        self.d_bias = np.dot(np.ones([1, self.input.shape[0] * self.height_out * self.width_out]), top_diff).reshape(-1)
        # Scatter-add each window's gradient back onto the padded input
        # ('+=' because windows may overlap when stride < kernel_size).
        for idxn in range(self.input.shape[0]):
            for idxh in range(self.height_out):
                for idxw in range(self.width_out):
                    bottom_diff[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size,
                    idxw * self.stride:idxw * self.stride + self.kernel_size] += d_img2col[idxn * self.height_out * self.width_out + idxh * self.width_out + idxw, :].reshape([-1, self.kernel_size, self.kernel_size])
        # Crop the padding away before passing the gradient down.
        bottom_diff = bottom_diff[:, :, self.padding:self.padding + self.input.shape[2], self.padding:self.padding + self.input.shape[3]]
        show_matrix(top_diff, 'top_diff--------')
        show_matrix(self.d_weight, 'conv d_w ')
        show_matrix(self.d_bias, 'conv d_b ')
        show_matrix(bottom_diff, 'conv d_h ')
        return bottom_diff

    def backward_raw(self, top_diff):
        """Naive backward pass: accumulate gradients one output element at a time."""
        self.d_weight = np.zeros(self.weight.shape)
        self.d_bias = np.zeros(self.bias.shape)
        bottom_diff = np.zeros(self.input_pad.shape)
        for idxn in range(top_diff.shape[0]):
            for idxc in range(top_diff.shape[1]):
                for idxh in range(top_diff.shape[2]):
                    for idxw in range(top_diff.shape[3]):
                        self.d_weight[:, :, :, idxc] += top_diff[idxn, idxc, idxh, idxw] * self.input_pad[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size]
                        self.d_bias[idxc] += top_diff[idxn, idxc, idxh, idxw]
                        bottom_diff[idxn, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size] += top_diff[idxn, idxc, idxh, idxw] * self.weight[:, :, :, idxc]
        bottom_diff = bottom_diff[:, :, self.padding:self.padding + self.input.shape[2], self.padding:self.padding + self.input.shape[3]]
        show_matrix(top_diff, 'top_diff--------')
        show_matrix(self.d_weight, 'conv d_w ')
        show_matrix(self.d_bias, 'conv d_b ')
        show_matrix(bottom_diff, 'conv d_h ')
        return bottom_diff

    def get_gradient(self):
        """Return the most recently computed (d_weight, d_bias) pair."""
        return self.d_weight, self.d_bias

    def update_param(self, lr):
        """In-place SGD step on weights and biases."""
        self.weight += - lr * self.d_weight
        self.bias += - lr * self.d_bias
        show_matrix(self.weight, 'conv update weight ')
        show_matrix(self.bias, 'conv update bias ')

    def load_param(self, weight, bias):
        """Replace parameters with externally supplied arrays (shapes must match)."""
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias
        show_matrix(self.weight, 'conv weight ')
        show_matrix(self.bias, 'conv bias ')

NOTE

其中,前向传播有三种写法,分别对应forward_raw_1、forward_raw_2和forward,计算速度为

forward_raw_1 < forward_raw_2 < forward,反向传播也是类似。

1.4、池化层

python
class MaxPoolingLayer(object):
    """Max pooling over [N, C, H, W] feature maps with a square window.

    forward_raw is the naive loop version; forward is an img2col version that
    additionally records which element of each window was the maximum
    (self.max_index / self.argmax) so the backward passes can route the
    gradient to it.
    """

    def __init__(self, kernel_size, stride):
        self.kernel_size = kernel_size
        self.stride = stride
        print('\tMax pooling layer with kernel size %d, stride %d.' % (self.kernel_size, self.stride))

    def forward_raw(self, input):
        """Naive forward: take the max of every window with four nested loops.

        Side effect: builds self.max_index, a 0/1 mask over the input marking
        the argmax position of each pooling window (consumed by backward_raw).
        """
        start_time = time.time()
        self.input = input  # [N, C, H, W]
        self.max_index = np.zeros(self.input.shape)
        height_out = (self.input.shape[2] - self.kernel_size) // self.stride + 1
        width_out = (self.input.shape[3] - self.kernel_size) // self.stride + 1
        self.output = np.zeros([self.input.shape[0], self.input.shape[1], height_out, width_out])
        for idxn in range(self.input.shape[0]):
            for idxc in range(self.input.shape[1]):
                for idxh in range(height_out):
                    for idxw in range(width_out):
                        self.output[idxn, idxc, idxh, idxw] = np.max(self.input[idxn, idxc, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size])
                        # Remember which element inside the window was the max,
                        # converting the flat argmax back to 2-D coordinates.
                        curren_max_index = np.argmax(self.input[idxn, idxc, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size])
                        curren_max_index = np.unravel_index(curren_max_index, [self.kernel_size, self.kernel_size])
                        self.max_index[idxn, idxc, idxh * self.stride + curren_max_index[0], idxw * self.stride + curren_max_index[1]] = 1
        show_matrix(self.output, 'max pooling out ')
        show_time(time.time() - start_time, 'max pooling forward time: ')
        return self.output

    def forward(self, input):
        """img2col forward: gather every window into one matrix, then take the
        max (and argmax) along the last axis in a single vectorised step.

        Side effect: builds self.max_index as a one-hot mask in img2col layout
        [N, C, H_out*W_out, k*k] (consumed by backward()).
        """
        start_time = time.time()
        self.input = input  # [N, C, H, W]
        self.height_out = (self.input.shape[2] - self.kernel_size) // self.stride + 1
        self.width_out = (self.input.shape[3] - self.kernel_size) // self.stride + 1
        img2col = np.zeros([self.input.shape[0], self.input.shape[1], self.height_out * self.width_out, self.kernel_size * self.kernel_size])
        for idxh in range(self.height_out):
            for idxw in range(self.width_out):
                img2col[:, :, idxh * self.width_out + idxw] = self.input[:, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size].reshape([self.input.shape[0], self.input.shape[1], -1])
        self.output = np.max(img2col, axis=-1)
        self.output = np.reshape(self.output, [self.input.shape[0], self.input.shape[1], self.height_out, self.width_out])
        # One-hot encode the per-window argmax so backward() can mask gradients.
        self.argmax = np.argmax(img2col, axis=-1)
        self.argmax = self.argmax.reshape(-1)
        self.max_index = np.zeros([self.argmax.shape[0], img2col.shape[-1]])
        self.max_index[np.arange(self.argmax.shape[0]), self.argmax] = 1.0
        self.max_index = np.reshape(self.max_index, img2col.shape)
        show_matrix(self.output, 'max pooling out ')
        show_time(time.time() - start_time, 'max pooling forward time: ')
        return self.output

    def backward(self, top_diff):
        """img2col backward; requires forward() to have run (uses self.max_index
        in img2col layout and self.height_out/self.width_out).

        NOTE(review): the scatter below uses '=' rather than '+=', so it assumes
        pooling windows do not overlap (stride >= kernel_size) — confirm before
        using with overlapping configurations.
        """
        bottom_diff = np.zeros(self.input.shape)
        top_diff = top_diff.reshape([self.input.shape[0], self.input.shape[1], self.height_out * self.width_out])
        # Broadcast each output gradient across its window, masked by the argmax.
        top_diff = np.tile(np.expand_dims(top_diff, axis=-1), [1, 1, 1, self.kernel_size * self.kernel_size])
        d_img2col = top_diff * self.max_index
        for idxh in range(self.height_out):
            for idxw in range(self.width_out):
                bottom_diff[:, :, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size] = d_img2col[:, :, idxh * self.width_out + idxw].reshape([self.input.shape[0], self.input.shape[1], self.kernel_size, self.kernel_size])
        show_matrix(top_diff, 'top_diff--------')
        show_matrix(bottom_diff, 'max pooling d_h ')
        return bottom_diff

    def backward_raw(self, top_diff):
        """Naive backward; requires forward_raw() to have run (uses the
        input-shaped self.max_index mask).

        NOTE(review): like backward(), uses '=' per window — assumes
        non-overlapping windows (stride >= kernel_size).
        """
        bottom_diff = np.zeros(self.input.shape)
        for idxn in range(top_diff.shape[0]):
            for idxc in range(top_diff.shape[1]):
                for idxh in range(top_diff.shape[2]):
                    for idxw in range(top_diff.shape[3]):
                        bottom_diff[idxn, idxc, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size] = (top_diff[idxn, idxc, idxh, idxw] * self.max_index[idxn, idxc, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size])
        show_matrix(top_diff, 'top_diff--------')
        show_matrix(bottom_diff, 'max pooling d_h ')
        return bottom_diff

    def backward_raw_book(self, top_diff):
        """Textbook backward variant: recomputes each window's argmax directly
        from self.input instead of using a cached mask, and writes the gradient
        only at that single position."""
        bottom_diff = np.zeros(self.input.shape)
        for idxn in range(top_diff.shape[0]):
            for idxc in range(top_diff.shape[1]):
                for idxh in range(top_diff.shape[2]):
                    for idxw in range(top_diff.shape[3]):
                        max_index = np.argmax(self.input[idxn, idxc, idxh * self.stride:idxh * self.stride + self.kernel_size, idxw * self.stride:idxw * self.stride + self.kernel_size])
                        max_index = np.unravel_index(max_index, [self.kernel_size, self.kernel_size])
                        bottom_diff[idxn, idxc, idxh * self.stride + max_index[0], idxw * self.stride + max_index[1]] = top_diff[idxn, idxc, idxh, idxw]
        show_matrix(top_diff, 'top_diff--------')
        show_matrix(bottom_diff, 'max pooling d_h ')
        return bottom_diff

NOTE

池化层的前向传播有两种写法,分别对应forward_raw和forward,计算速度为:

forward_raw < forward,反向传播也是如此

另外,类中的函数定义如下:

python
import numpy as np
import time


def show_matrix(mat, name):
    """Debug helper: print an array's shape together with its mean and std."""
    stats = ' mean %f, std %f' % (mat.mean(), mat.std())
    print(name + str(mat.shape) + stats)


def show_time(time, name):
    """Debug helper: print a label followed by an elapsed-time value."""
    message = name + str(time)
    print(message)

1.5、Flatten

python
class FlattenLayer(object):
    def __init__(self, input_shape, output_shape):
        self.input_shape = input_shape
        self.output_shape = output_shape
        assert np.prod(self.input_shape) == np.prod(self.output_shape)
        print('\tFlatten layer with input shape %s, output shape %s.' % (str(self.input_shape), str(self.output_shape)))

    def forward(self, input):
        assert list(input.shape[1:]) == list(self.input_shape)
        # matconvnet feature map dim: [N, height, width, channel]
        # ours feature map dim: [N, channel, height, width]
        self.input = np.transpose(input, [0, 2, 3, 1])
        self.output = self.input.reshape([self.input.shape[0]] + list(self.output_shape))
        return self.output

    def backward(self, top_diff):
        assert list(top_diff.shape[1:]) == list(self.output_shape)
        top_diff = np.transpose(top_diff, [0, 3, 1, 2])
        bottom_diff = top_diff.reshape([top_diff.shape[0]] + list(self.input_shape))
        show_matrix(bottom_diff, 'flatten d_h ')
        return bottom_diff

2、实战部分

2.1、使用numpy分类Mnist

2.1.1、目录和数据集

本次实验需要用到数据集和两个文件,请提前下载好Mnist数据集,放在mnist_data文件夹的目录下。然后新建两个python文件,分别命名为main.py和layers.py

目录结构如下:

python
|-- mnist_data/
|   |-- t10k-images-idx3-ubyte
|   |-- t10k-labels-idx1-ubyte
|   |-- train-images-idx3-ubyte
|   |-- train-labels-idx1-ubyte
|-- main.py
|-- layers.py

数据集可以使用torchvision.datasets进行下载

python
from torchvision import datasets, transforms
# 准备数据集
    train_data = datasets.MNIST(root="./dataset", train=True, transform=trans, download=True)
    test_data = datasets.MNIST(root="./dataset", train=False, transform=trans, download=False)

然后将压缩包.gz文件删除,将剩下的文件放入对应的文件夹中

2.1.2、layers.py文件

主要存放各种神经网络层的numpy实现代码

python
# coding=utf-8
import numpy as np


class FullyConnectedLayer(object):
    """Affine layer: output = input @ weight + bias."""

    def __init__(self, num_input, num_output):  # layer construction
        """Store the in/out dimensions; init_param() allocates the parameters."""
        self.num_input = num_input
        self.num_output = num_output
        # Parameters plus cached forward/backward tensors, filled in later.
        self.weight = None
        self.bias = None
        self.input = None
        self.output = None
        self.d_weight = None
        self.d_bias = None
        print('\tFully connected layer with input %d, output %d.' % (self.num_input, self.num_output))

    def init_param(self, std=0.01):  # parameter initialization
        """Draw weights from N(0, std^2) and start the bias row at zero.

        (Zero / Xavier / He initialisation would only change the stddev used.)
        """
        weight_shape = (self.num_input, self.num_output)
        self.weight = np.random.normal(loc=0.0, scale=std, size=weight_shape)
        self.bias = np.zeros([1, self.num_output])

    def forward(self, input):  # forward pass
        """Compute the affine map and cache the input for backward()."""
        self.input = input
        self.output = np.dot(input, self.weight) + self.bias
        return self.output

    def backward(self, top_diff):  # backward pass
        """From Y = X W + B: dW = X^T dY, dB = column-sum of dY, dX = dY W^T."""
        self.d_weight = np.dot(self.input.T, top_diff)
        self.d_bias = np.sum(top_diff, axis=0, keepdims=True)
        bottom_diff = np.dot(top_diff, self.weight.T)
        return bottom_diff

    def update_param(self, lr):  # SGD step
        """Gradient descent: param <- param - lr * grad (fresh arrays, not in-place)."""
        self.weight = self.weight - lr * self.d_weight
        self.bias = self.bias - lr * self.d_bias

    def load_param(self, weight, bias):  # parameter loading
        """Swap in externally supplied parameters; shapes must match."""
        assert self.weight.shape == weight.shape
        assert self.bias.shape == bias.shape
        self.weight = weight
        self.bias = bias

    def save_param(self):  # parameter export
        """Return the (weight, bias) pair."""
        return self.weight, self.bias


class ReLULayer(object):
    """Rectified linear unit applied element-wise."""

    def __init__(self):
        self.input = None  # forward input, cached for the backward mask
        print('\tReLU layer.')

    def forward(self, input):  # forward pass
        """Return max(x, 0), caching the raw input."""
        self.input = input
        return np.maximum(self.input, 0)

    def backward(self, top_diff):  # backward pass
        """Gradient is passed through only where the forward input was > 0."""
        gate = self.input > 0
        return top_diff * gate


class SigmoidLayer(object):
    """Logistic sigmoid activation applied element-wise."""

    def __init__(self):
        self.input = None   # cached forward input
        self.output = None  # cached forward output, reused by backward()
        print('\tSigmoid layer.')

    def forward(self, input):  # forward pass
        """Compute y = 1 / (1 + exp(-x)) and cache it so backward() can reuse
        the value instead of recomputing the exponential."""
        self.input = input
        self.output = 1 / (1 + np.exp(-self.input))
        return self.output

    def backward(self, top_diff):  # backward pass
        """Use sigmoid'(x) = y * (1 - y) with the cached forward output.

        The original re-evaluated the sigmoid here; reusing the cache yields
        identical results without the redundant exp.
        """
        bottom_diff = top_diff * self.output * (1 - self.output)
        return bottom_diff


class SoftmaxLossLayer(object):
    """Softmax over the class axis fused with mean cross-entropy."""

    def __init__(self):
        self.prob = None          # probabilities cached by forward()
        self.batch_size = None    # filled in by get_loss()
        self.label_onehot = None  # one-hot targets built by get_loss()
        print('\tSoftmax loss layer.')

    def forward(self, input):  # forward pass
        """Stable softmax: subtract the per-row max before exponentiating."""
        scores = np.exp(input - np.max(input, axis=1, keepdims=True))
        self.prob = scores / np.sum(scores, axis=1, keepdims=True)
        return self.prob

    def get_loss(self, label):  # loss computation
        """Mean cross-entropy between cached probabilities and integer labels."""
        self.batch_size = self.prob.shape[0]
        self.label_onehot = np.zeros_like(self.prob)
        self.label_onehot[np.arange(self.batch_size), label] = 1.0
        return -np.sum(np.log(self.prob) * self.label_onehot) / self.batch_size

    def backward(self):  # backward pass
        """Fused softmax + cross-entropy gradient: (prob - onehot) / batch."""
        return (self.prob - self.label_onehot) / self.batch_size

2.1.3、main.py文件

定义主函数,包括数据集、模型、训练评估

2.1.3.1、加载数据集

python
def show_matrix(mat, name):
    """Debug helper: print the array's shape plus its mean and std."""
    summary = ' mean %f, std %f' % (mat.mean(), mat.std())
    print(name + str(mat.shape) + summary)


def load_mnist(file_dir, is_images=True):
    """Read one MNIST idx file and return it as a 2-D numpy array.

    file_dir: path to an idx-format file (big-endian header, then uint8 data).
    is_images: True for an images file (header: magic, count, rows, cols);
               False for a labels file (header: magic, count), where each
               "image" collapses to a single value.
    Returns an array of shape [num_images, num_rows * num_cols].
    """
    # FIX: the file was previously opened without a context manager; `with`
    # guarantees the handle is closed even if parsing raises.
    with open(file_dir, 'rb') as bin_file:
        bin_data = bin_file.read()

    if is_images:
        fmt_header = '>iiii'
        magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
    else:
        fmt_header = '>ii'
        magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
        num_rows, num_cols = 1, 1

    # The payload is raw unsigned bytes immediately after the header.
    data_size = num_images * num_rows * num_cols
    mat_data = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
    mat_data = np.reshape(mat_data, [num_images, num_rows * num_cols])
    print(f'Load images from {file_dir}, number: {num_images}, data shape: {str(mat_data.shape)}')
    show_matrix(mat_data, 'load_mnist')
    return mat_data


class Mnist:
    """Loads the four MNIST idx files and joins each split's labels onto its
    images as the final column."""

    def __init__(self, mnist_dir):
        self.mnist_dir = mnist_dir
        self.TRAIN_DATA = "train-images-idx3-ubyte"
        self.TRAIN_LABEL = "train-labels-idx1-ubyte"
        self.TEST_DATA = "t10k-images-idx3-ubyte"
        self.TEST_LABEL = "t10k-labels-idx1-ubyte"
        self.train_data = None
        self.test_data = None

    def load_data(self):
        """Read all four files via load_mnist and return (train, test), each a
        2-D array whose last column is the label."""
        print('Loading MNIST data from files...')
        images = load_mnist(os.path.join(self.mnist_dir, self.TRAIN_DATA), True)
        labels = load_mnist(os.path.join(self.mnist_dir, self.TRAIN_LABEL), False)
        test_images = load_mnist(os.path.join(self.mnist_dir, self.TEST_DATA), True)
        test_labels = load_mnist(os.path.join(self.mnist_dir, self.TEST_LABEL), False)
        self.train_data = np.append(images, labels, axis=1)
        self.test_data = np.append(test_images, test_labels, axis=1)
        return self.train_data, self.test_data

2.1.3.2、定义模型

python
class MLP(object):
    """Five-layer perceptron (input -> h1 -> h2 -> h3 -> h4 -> classes) with
    ReLU activations and a fused softmax / cross-entropy head.

    Improvements over the original: all commented-out experimental variants
    (2/3/8-layer networks, sigmoid activations, per-layer shape prints) are
    removed; load_model/save_model iterate update_layer_list instead of
    repeating per-layer code, and save_model no longer calls save_param()
    twice per layer.
    """

    def __init__(self, hidden1, hidden2, hidden3, hidden4, batch_size=64, input_size=784, out_classes=10, lr=0.01):
        self.batch_size = batch_size
        self.input_size = input_size
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.hidden3 = hidden3
        self.hidden4 = hidden4
        self.out_classes = out_classes
        self.lr = lr

    def build_model(self):  # construct the network
        """Instantiate the layer objects and record which hold parameters."""
        print('Building multi-layer perception model...')
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        self.fc2 = FullyConnectedLayer(self.hidden1, self.hidden2)
        self.relu2 = ReLULayer()
        self.fc3 = FullyConnectedLayer(self.hidden2, self.hidden3)
        self.relu3 = ReLULayer()
        self.fc4 = FullyConnectedLayer(self.hidden3, self.hidden4)
        self.relu4 = ReLULayer()
        self.fc5 = FullyConnectedLayer(self.hidden4, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        # Only the fully connected layers carry trainable parameters.
        self.update_layer_list = [self.fc1, self.fc2, self.fc3, self.fc4, self.fc5]

    def init_model(self):
        """Initialise the parameters of every trainable layer."""
        print('Initializing parameters of each layer in MLP...')
        for layer in self.update_layer_list:
            layer.init_param()

    def load_model(self, param_dir):
        """Load parameters saved by save_model() (keys w1/b1 ... w5/b5)."""
        print('Loading parameters from file ' + param_dir)
        params = np.load(param_dir, allow_pickle=True).item()
        for idx, fc in enumerate(self.update_layer_list, start=1):
            fc.load_param(params['w%d' % idx], params['b%d' % idx])

    def save_model(self, param_dir):
        """Save every trainable layer's parameters as a dict in a .npy file."""
        print('Saving parameters to file ' + param_dir)
        params = {}
        for idx, fc in enumerate(self.update_layer_list, start=1):
            weight, bias = fc.save_param()  # one call per layer, not two
            params['w%d' % idx] = weight
            params['b%d' % idx] = bias
        np.save(param_dir, params)

    def forward(self, input):  # forward pass through the whole network
        """Return class probabilities of shape [batch, out_classes]."""
        h = self.relu1.forward(self.fc1.forward(input))
        h = self.relu2.forward(self.fc2.forward(h))
        h = self.relu3.forward(self.fc3.forward(h))
        h = self.relu4.forward(self.fc4.forward(h))
        prob = self.softmax.forward(self.fc5.forward(h))
        return prob

    def backward(self):  # backward pass through the whole network
        """Propagate the loss gradient through the layers in reverse order."""
        diff = self.softmax.backward()
        diff = self.fc5.backward(diff)
        diff = self.fc4.backward(self.relu4.backward(diff))
        diff = self.fc3.backward(self.relu3.backward(diff))
        diff = self.fc2.backward(self.relu2.backward(diff))
        self.fc1.backward(self.relu1.backward(diff))

    def update(self, lr):
        """Apply one SGD step to every trainable layer."""
        for layer in self.update_layer_list:
            layer.update_param(lr)

2.1.3.3、训练和评估

python
def train(model, train_data, max_epoch=10, print_iter=100):
    """Mini-batch SGD training loop.

    Shuffles train_data in place at the start of every epoch, then runs
    consecutive mini-batches through forward / loss / backward / update.
    The last column of train_data holds the integer labels; the remaining
    columns are the flattened image pixels.
    """
    batches_per_epoch = train_data.shape[0] // model.batch_size
    print('Start training...')
    for epoch in range(max_epoch):
        print('Randomly shuffle MNIST data...')
        np.random.shuffle(train_data)
        epoch_losses = []
        for batch_idx in range(batches_per_epoch):
            start = batch_idx * model.batch_size
            end = start + model.batch_size
            batch_images = train_data[start:end, :-1]
            batch_labels = train_data[start:end, -1]
            model.forward(batch_images)
            loss = model.softmax.get_loss(batch_labels)
            epoch_losses.append(loss)
            model.backward()
            model.update(model.lr)

            if batch_idx % print_iter == 0:
                print(f'Epoch {epoch}, iter {batch_idx}, loss: {loss:.6f}')

        print(f"Epoch {epoch} Average loss: {np.mean(epoch_losses):.6f}")


def evaluate(model, test_data):
    """Run the model over test_data and report classification accuracy.

    test_data -- 2-D array whose last column is the integer label and whose
                 remaining columns are the flattened image pixels.
    Returns the accuracy as a float (in addition to printing it) so callers
    can log or compare runs programmatically; existing callers that ignore
    the return value are unaffected.
    """
    pred_results = np.zeros([test_data.shape[0]])
    num_full = test_data.shape[0] // model.batch_size
    for idx in range(num_full):
        batch_images = test_data[idx * model.batch_size:(idx + 1) * model.batch_size, :-1]
        prob = model.forward(batch_images)
        pred_labels = np.argmax(prob, axis=1)
        pred_results[idx * model.batch_size:(idx + 1) * model.batch_size] = pred_labels
    # Handle the leftover rows that do not fill a whole batch.  The previous
    # code forwarded test_data[-(num_full * batch_size):] — i.e. it re-ran the
    # model on almost the ENTIRE test set just to cover the tail.  Forwarding
    # only the unprocessed remainder produces identical predictions with a
    # fraction of the work.
    processed = num_full * model.batch_size
    if test_data.shape[0] % model.batch_size > 0:
        batch_images = test_data[processed:, :-1]
        prob = model.forward(batch_images)
        pred_results[processed:] = np.argmax(prob, axis=1)
    accuracy = np.mean(pred_results == test_data[:, -1])
    print(f'Accuracy in test set: {accuracy:.6f}')
    return accuracy

2.1.3.4、完整代码

python
import numpy as np
import time
import os
import struct
from layers import FullyConnectedLayer, ReLULayer, SoftmaxLossLayer


def show_matrix(mat, name):
    """Print a one-line debug summary of an array: name, shape, mean, std."""
    summary = f'{name}{mat.shape} mean {mat.mean():f}, std {mat.std():f}'
    print(summary)


def load_mnist(file_dir, is_images=True):
    """Parse one MNIST idx file into a (num_items, rows*cols) matrix.

    is_images -- True for idx3 image files (big-endian '>iiii' header),
                 False for idx1 label files ('>ii' header, treated as
                 1x1 "pixels" so labels come back as a column vector).
    """
    with open(file_dir, 'rb') as bin_file:
        bin_data = bin_file.read()

    # Header layout differs between image and label files.
    if is_images:
        fmt_header = '>iiii'
        magic, num_images, num_rows, num_cols = struct.unpack_from(fmt_header, bin_data, 0)
    else:
        fmt_header = '>ii'
        magic, num_images = struct.unpack_from(fmt_header, bin_data, 0)
        num_rows = num_cols = 1

    data_size = num_images * num_rows * num_cols
    payload = struct.unpack_from('>' + str(data_size) + 'B', bin_data, struct.calcsize(fmt_header))
    mat_data = np.reshape(payload, [num_images, num_rows * num_cols])
    print(f'Load images from {file_dir}, number: {num_images}, data shape: {str(mat_data.shape)}')
    show_matrix(mat_data, 'load_mnist')
    return mat_data


class Mnist:
    """Loader that joins MNIST images and labels into single matrices.

    Each returned array has one row per sample: 784 pixel columns followed
    by a final label column.
    """

    def __init__(self, mnist_dir):
        self.mnist_dir = mnist_dir
        self.TRAIN_DATA = "train-images-idx3-ubyte"
        self.TRAIN_LABEL = "train-labels-idx1-ubyte"
        self.TEST_DATA = "t10k-images-idx3-ubyte"
        self.TEST_LABEL = "t10k-labels-idx1-ubyte"
        self.train_data = None
        self.test_data = None

    def _read(self, filename, is_images):
        # Resolve the file under mnist_dir and delegate to load_mnist.
        return load_mnist(os.path.join(self.mnist_dir, filename), is_images)

    def load_data(self):
        """Read all four MNIST files; cache and return (train, test) matrices."""
        print('Loading MNIST data from files...')
        train_images = self._read(self.TRAIN_DATA, True)
        train_labels = self._read(self.TRAIN_LABEL, False)
        test_images = self._read(self.TEST_DATA, True)
        test_labels = self._read(self.TEST_LABEL, False)
        # Glue the label column onto the pixel columns.
        self.train_data = np.concatenate((train_images, train_labels), axis=1)
        self.test_data = np.concatenate((test_images, test_labels), axis=1)
        return self.train_data, self.test_data


class MLP(object):
    """Multi-layer perceptron for MNIST built from the numpy layers.

    The active wiring is a five-FC-layer network:
        input(784) -> fc1/relu1 -> fc2/relu2 -> fc3/relu3 -> fc4/relu4
                   -> fc5 -> softmax(out_classes)
    The commented-out lines keep shallower (2- and 3-FC-layer) and deeper
    (8-FC-layer) variants that were tried during experimentation.
    """

    def __init__(self, hidden1, hidden2, hidden3, hidden4, batch_size=64, input_size=784, out_classes=10, lr=0.01):
        self.batch_size = batch_size      # samples per mini-batch
        self.input_size = input_size      # flattened image size (28*28)
        self.hidden1 = hidden1            # width of hidden layer 1
        self.hidden2 = hidden2            # width of hidden layer 2
        self.hidden3 = hidden3            # width of hidden layer 3
        self.hidden4 = hidden4            # width of hidden layer 4
        self.out_classes = out_classes    # number of output classes
        self.lr = lr                      # learning rate for update()

    def build_model(self):  # build the network structure
        # TODO: build the multi-layer neural network structure
        print('Building multi-layer perception model...')
        self.fc1 = FullyConnectedLayer(self.input_size, self.hidden1)
        self.relu1 = ReLULayer()
        # self.sigmoid1 = SigmoidLayer()
        self.fc2 = FullyConnectedLayer(self.hidden1, self.hidden2)
        self.relu2 = ReLULayer()
        # self.sigmoid2 = SigmoidLayer()
        self.fc3 = FullyConnectedLayer(self.hidden2, self.hidden3)
        self.relu3 = ReLULayer()
        self.fc4 = FullyConnectedLayer(self.hidden3, self.hidden4)
        self.relu4 = ReLULayer()
        self.fc5 = FullyConnectedLayer(self.hidden4, self.out_classes)
        # self.relu5 = ReLULayer()
        # self.fc6 = FullyConnectedLayer(self.hidden5, self.hidden6)
        # self.relu6 = ReLULayer()
        # self.fc7 = FullyConnectedLayer(self.hidden6, self.hidden7)
        # self.relu7 = ReLULayer()
        # self.fc8 = FullyConnectedLayer(self.hidden7, self.out_classes)
        self.softmax = SoftmaxLossLayer()
        # Only the FC layers carry trainable parameters.
        # self.update_layer_list = [self.fc1, self.fc2]
        # self.update_layer_list = [self.fc1, self.fc2, self.fc3]
        self.update_layer_list = [self.fc1, self.fc2, self.fc3, self.fc4, self.fc5]
        # self.update_layer_list = [self.fc1, self.fc2, self.fc3, self.fc4, self.fc5, self.fc6, self.fc7, self.fc8]

    def init_model(self):
        """Initialize the parameters of every trainable layer."""
        print('Initializing parameters of each layer in MLP...')
        for layer in self.update_layer_list:
            layer.init_param()

    def load_model(self, param_dir):
        """Restore per-layer weights/biases from a .npy dict saved by save_model()."""
        print('Loading parameters from file ' + param_dir)
        params = np.load(param_dir, allow_pickle=True).item()
        self.fc1.load_param(params['w1'], params['b1'])
        self.fc2.load_param(params['w2'], params['b2'])
        self.fc3.load_param(params['w3'], params['b3'])
        self.fc4.load_param(params['w4'], params['b4'])
        self.fc5.load_param(params['w5'], params['b5'])
        # self.fc6.load_param(params['w6'], params['b6'])
        # self.fc7.load_param(params['w7'], params['b7'])
        # self.fc8.load_param(params['w8'], params['b8'])

    def save_model(self, param_dir):
        """Dump every FC layer's (weight, bias) pair into one .npy dict file."""
        print('Saving parameters to file ' + param_dir)
        params = {
            'w1': self.fc1.save_param()[0],
            'b1': self.fc1.save_param()[1],
            'w2': self.fc2.save_param()[0],
            'b2': self.fc2.save_param()[1],
            'w3': self.fc3.save_param()[0],
            'b3': self.fc3.save_param()[1],
            'w4': self.fc4.save_param()[0],
            'b4': self.fc4.save_param()[1],
            'w5': self.fc5.save_param()[0],
            'b5': self.fc5.save_param()[1]
        }
        # params['w6'], params['b6'] = self.fc6.save_param()
        # params['w7'], params['b7'] = self.fc7.save_param()
        # params['w8'], params['b8'] = self.fc8.save_param()
        np.save(param_dir, params)

    def forward(self, input):  # forward pass of the network
        # TODO: forward propagation of the network
        # h1 = self.fc1.forward(input)
        # h1 = self.relu1.forward(h1)
        # h2 = self.fc2.forward(h1)
        # prob = self.softmax.forward(h2)

        # h1 = self.fc1.forward(input)
        # h1 = self.relu1.forward(h1)
        # # h1 = self.sigmoid1.forward(h1)
        # h2 = self.fc2.forward(h1)
        # h2 = self.relu2.forward(h2)
        # # h2 = self.sigmoid2.forward(h2)
        # h3 = self.fc3.forward(h2)
        # prob = self.softmax.forward(h3)
        # print('input', input.shape)  # 100,784
        h1 = self.fc1.forward(input)
        # print('h1', h1.shape)  # 100,256
        h1 = self.relu1.forward(h1)
        # print('h1', h1.shape)  # 100,256
        h2 = self.fc2.forward(h1)
        # print('h2', h2.shape)  # 100,128
        h2 = self.relu2.forward(h2)
        # print('h2', h2.shape)  # 100,128
        h3 = self.fc3.forward(h2)
        # print('h3', h3.shape)  # 100,64
        h3 = self.relu3.forward(h3)
        # print('h3', h3.shape)  # 100,64
        h4 = self.fc4.forward(h3)
        # print('h4', h4.shape)  # 100,16
        h4 = self.relu4.forward(h4)
        # print('h4', h4.shape)  # 100,16
        h5 = self.fc5.forward(h4)
        # print('h5', h5.shape)  # 100,10
        prob = self.softmax.forward(h5)
        # print('prob', prob.shape)  # 100,10

        # h1 = self.fc1.forward(input)
        # h1 = self.relu1.forward(h1)
        # h2 = self.fc2.forward(h1)
        # h2 = self.relu2.forward(h2)
        # h3 = self.fc3.forward(h2)
        # h3 = self.relu3.forward(h3)
        # h4 = self.fc4.forward(h3)
        # h4 = self.relu4.forward(h4)
        # h5 = self.fc5.forward(h4)
        # h5 = self.relu5.forward(h5)
        # h6 = self.fc6.forward(h5)
        # h6 = self.relu6.forward(h6)
        # h7 = self.fc7.forward(h6)
        # h7 = self.relu7.forward(h7)
        # h8 = self.fc8.forward(h7)
        # prob = self.softmax.forward(h8)

        return prob

    def backward(self):  # backward pass of the network
        # TODO: backward propagation of the network

        # dloss = self.softmax.backward()
        # dh2 = self.fc2.backward(dloss)
        # dh1 = self.relu1.backward(dh2)
        # dh1 = self.fc1.backward(dh1)

        # dloss = self.softmax.backward()
        # dh3 = self.fc3.backward(dloss)
        # dh2 = self.relu2.backward(dh3)
        # dh2 = self.fc2.backward(dh2)
        # dh1 = self.relu1.backward(dh2)
        # dh1 = self.fc1.backward(dh1)

        # Gradients flow in reverse order of the forward pass.
        dloss = self.softmax.backward()
        dh5 = self.fc5.backward(dloss)
        dh4 = self.relu4.backward(dh5)
        dh4 = self.fc4.backward(dh4)
        dh3 = self.relu3.backward(dh4)
        dh3 = self.fc3.backward(dh3)
        dh2 = self.relu2.backward(dh3)
        dh2 = self.fc2.backward(dh2)
        dh1 = self.relu1.backward(dh2)
        dh1 = self.fc1.backward(dh1)
        # dloss = self.softmax.backward()
        # dh8 = self.fc8.backward(dloss)
        # dh7 = self.relu7.backward(dh8)
        # dh7 = self.fc7.backward(dh7)
        # dh6 = self.relu6.backward(dh7)
        # dh6 = self.fc6.backward(dh6)
        # dh5 = self.relu5.backward(dh6)
        # dh5 = self.fc5.backward(dh5)
        # dh4 = self.relu4.backward(dh5)
        # dh4 = self.fc4.backward(dh4)
        # dh3 = self.relu3.backward(dh4)
        # dh3 = self.fc3.backward(dh3)
        # dh2 = self.relu2.backward(dh3)
        # dh2 = self.fc2.backward(dh2)
        # dh1 = self.relu1.backward(dh2)
        # dh1 = self.fc1.backward(dh1)

    def update(self, lr):
        """Apply one gradient-descent step to every trainable (FC) layer."""
        for layer in self.update_layer_list:
            layer.update_param(lr)


def train(model, train_data, max_epoch=10, print_iter=100):
    """Mini-batch SGD training loop.

    Shuffles train_data in place at the start of every epoch, then runs
    consecutive mini-batches through forward / loss / backward / update.
    The last column of train_data holds the integer labels; the remaining
    columns are the flattened image pixels.
    """
    batches_per_epoch = train_data.shape[0] // model.batch_size
    print('Start training...')
    for epoch in range(max_epoch):
        print('Randomly shuffle MNIST data...')
        np.random.shuffle(train_data)
        epoch_losses = []
        for batch_idx in range(batches_per_epoch):
            start = batch_idx * model.batch_size
            end = start + model.batch_size
            batch_images = train_data[start:end, :-1]
            batch_labels = train_data[start:end, -1]
            model.forward(batch_images)
            loss = model.softmax.get_loss(batch_labels)
            epoch_losses.append(loss)
            model.backward()
            model.update(model.lr)

            if batch_idx % print_iter == 0:
                print(f'Epoch {epoch}, iter {batch_idx}, loss: {loss:.6f}')

        print(f"Epoch {epoch} Average loss: {np.mean(epoch_losses):.6f}")


def evaluate(model, test_data):
    """Run the model over test_data and report classification accuracy.

    test_data -- 2-D array whose last column is the integer label and whose
                 remaining columns are the flattened image pixels.
    Returns the accuracy as a float (in addition to printing it) so callers
    can log or compare runs programmatically; existing callers that ignore
    the return value are unaffected.
    """
    pred_results = np.zeros([test_data.shape[0]])
    num_full = test_data.shape[0] // model.batch_size
    for idx in range(num_full):
        batch_images = test_data[idx * model.batch_size:(idx + 1) * model.batch_size, :-1]
        prob = model.forward(batch_images)
        pred_labels = np.argmax(prob, axis=1)
        pred_results[idx * model.batch_size:(idx + 1) * model.batch_size] = pred_labels
    # Handle the leftover rows that do not fill a whole batch.  The previous
    # code forwarded test_data[-(num_full * batch_size):] — i.e. it re-ran the
    # model on almost the ENTIRE test set just to cover the tail.  Forwarding
    # only the unprocessed remainder produces identical predictions with a
    # fraction of the work.
    processed = num_full * model.batch_size
    if test_data.shape[0] % model.batch_size > 0:
        batch_images = test_data[processed:, :-1]
        prob = model.forward(batch_images)
        pred_results[processed:] = np.argmax(prob, axis=1)
    accuracy = np.mean(pred_results == test_data[:, -1])
    print(f'Accuracy in test set: {accuracy:.6f}')
    return accuracy


if __name__ == '__main__':
    # Hidden-layer widths for the four hidden layers, and the epoch count.
    h1, h2, h3, h4, e = 256, 128, 64, 16, 10
    mnist = Mnist(mnist_dir=r".\mnist_data")
    train_data, test_data = mnist.load_data()
    mlp = MLP(hidden1=h1, hidden2=h2, hidden3=h3, hidden4=h4)
    mlp.build_model()
    mlp.init_model()

    train(mlp, train_data, e)
    # NOTE(review): the saved filename encodes only h1/h2 although the model
    # has four hidden layers — confirm this naming is intentional.
    mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))

    # mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))

    # Time inference over the whole test set.
    start = time.time()
    evaluate(mlp, test_data)
    end = time.time()
    print(f"Inferencing time: {end - start:.6f}")

上面实现的是训练+保存+评估模型,如果要实现读取+评估模型,那么修改if __name__ == '__main__':的代码:

python
if __name__ == '__main__':
    # Hidden-layer widths for the four hidden layers, and the epoch count
    # (must match the run that produced the saved parameter file).
    h1, h2, h3, h4, e = 256, 128, 64, 16, 10
    mnist = Mnist(mnist_dir=r".\mnist_data")
    train_data, test_data = mnist.load_data()
    mlp = MLP(hidden1=h1, hidden2=h2, hidden3=h3, hidden4=h4)
    mlp.build_model()
    mlp.init_model()

    # train(mlp, train_data, e)
    # mlp.save_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))

    # Load previously trained parameters instead of training from scratch.
    mlp.load_model('mlp-%d-%d-%depoch.npy' % (h1, h2, e))

    # Time inference over the whole test set.
    start = time.time()
    evaluate(mlp, test_data)
    end = time.time()
    print(f"Inferencing time: {end - start:.6f}")
最近更新