手写数字识别是属于机器学习中的分类问题,它有许多的机器学习的算法可以解决,如SVM,CNN等。最近在Kaggle上看到这一问题,并且恰巧之前有学过一些卷积神经网络(Convolutional Neural Network),于是打算用Python写一个简单CNN,实现手写数字识别器。

      首先建立一个手写识别器的模型(如下图),它由输入层(input),采样层(max pooling),特征提取层(convolution)和连接层(connected)构成。

DR0

      手写数字的数据来源于Kaggle上的题目:Digit Recognizer。它的每一项数据是由一个2828个像素点组成。因此我们可将输入层的数据定义为一个2828的矩阵A,然后通过处理矩阵A来最终判断它可能代表哪一个数字。

digit

      一般情况下,一张图片缩小一定的倍数是不会影响到识别的。而考虑到输入的每一个矩阵A需要处理的数据有784项。在训练时,这么多数据需要很长时间才能处理完。因此我们可以先做一次采样,将2828的矩阵A转化为一个1414的矩阵B,这样需要处理的数据就缩减为原来的1/4。即,max pooling层所做的就是采样,它的具体实现算法如下图,取每个2*2矩阵中最大的那个数。

DR1

      对于convolution层,它做的主要工作是对图片的某一个特征进行提取。具体的算法如下图,对数据矩阵B特定部分与特征矩阵F进行相乘(卷积运算),若原始数据与特征矩阵越匹配,则计算出的结果越大。通过不同的特征矩阵F分别提起出图片的不同特征,而F类似是一个滤波器,只留下特定的信息。在不同的问题中,特征矩阵F一般是不同的,可以通过对数据的不断学习,得出一些比较好的特征矩阵F。但是在下面所实现的Python代码中,直接定义了四个值固定的特征矩阵,而并没有通过学习得到特征矩阵F,主要是简化实现CNN的代码。

DR2

      当特征提取完成后,我们可以根据提取的数据进行分类了。即我们通过connected层计算出来分类结果。connected内部的结构如下图所示,它有十个神经元分别表示0到9,它们各自都有144个输入,通过144个输入与不同的权值计算,然后通过一个激励函数得出结果,结果最高的那个神经元的序号为手写数字识别的最终输出。

DR3

      我们对图中的结构通过数学公式来定义,输入x与输出y之间的关系如下式。因此,需要机器学习的就是参数w和参数b。对于这两个参数可以借用梯度下降的算法来实现。首先定义损失函数E,然后需要分别对w和b求导,最后根据导数值不断更新参数w和b,使得损失函数E取到最小值。

DR4

      通过Python实现上述模型,使用Kaggle上Digit Recognizer的训练集进行学习,最终在训练集上得到的正确率为86.04%。在Kaggle上的测试集所得到的正确率为85.37%。对于上述模型,有几个方面可以进一步优化,从而提高正确率:一是特征矩阵可以通过机器学习的方式来进行选择,此时需要借用一些算法计算梯度来更新特征矩阵,如BP算法;二是可以增加多层convolution和max pooling层;三是本模型中的connected层只有一层结构,可以增加多层神经元,而在一般CNN中Fully Connected Feedforward Network层,就是通过多层神经网络来实现。

DRS


源代码:

import numpy as np
import math
import pandas

def max_num(a,b) :
        if a > b :
                return a
        else :
                return b

def max_pooling(A, n) :
        B = np.zeros((n,n))
        for i in range(n) :
                for j in range(n) :
                        a = max_num(A[2*i] [ j*2],  A[2*i] [ j*2+1])
                        b = max_num(A[2*i+1] [ j*2], A[2*i+1 ] [j*2+1])
                        B[i,j] = max_num(a,b)
        return B
        
def converlution(A, B, n, s) :
        C = np.zeros((n,n))
        for i in range(n) :
                for j in range(n) :
                        C[i][j] = np.sum(A[i:i+s, j:j+s] * B)
        return C
     
def sigmoid(x) :
        c = math.e**(-x)
        return 1 / (1 + c)
        
def f_sigmoid(y) :
        for i in range(len(y)) :
                y[i] = sigmoid(y[i])
        return y
        
def cnetwork(x, w, b) :
        y = np.zeros(10)
        for i in range(10) :
                for h in range(144) :
                        y[i] += x[h] * w[h][i] 
                y[i] += b[i]        
        y = f_sigmoid(y)
        return y
 
def max_array(y) :
        n = len(y)
        t = y[0]
        j = 0
        for i in range(n) :
                if t < y[i] :
                        t = y[i]
                        j = i
        return j               
         
def update_w(w, step, z, y, r) :
        d = y - r
        for i in range(144) :
                for h in range(10) :
                        #并未乘上sigmoid的导数值
                        #原因是sigmoid函数除了在0附近的导数之外,其余地方的导数都很小,导致收敛会非常慢,而sigmoid的导数都大于零,因此忽略其数值
                        w[i][h] = w[i][h] - step * d[h] * x[i]
        return w

def update_b(b, step, y, r) :
        d = y - r
        for h in range(10) :
                b[h] = b[h] - step * d[h]
        return b
        
if __name__ == "__main__" :
        
        step1 = 0.01
        step2 = 0.01
        data = pandas.DataFrame(pandas.read_csv("train.csv"))
        Y = data.values[:, 0]
        X = data.values[:, 1:]
        num = len(Y) 
        
    
        f1 = np.array([[0,1,0],[0,1,0],[0,1,0]])
        f2 = np.array([[0,0,0],[1,1,1],[0,0,0]])
        f3 = np.array([[1,0,0],[0,1,0],[0,0,1]])   
        f4 = np.array([[0,0,1],[0,1,0],[1,0,0]])   
     
        w = np.random.random((144, 10))
        b = np.random.random(10)
        
        print("Training ...")
        for j in range(3) :
                print(j)
                for i in range(num) :
                        A = max_pooling(X[i,:].reshape((28,28)), 14)
                        B1 = converlution(A, f1, 12, 3)
                        B2 = converlution(A, f2, 12, 3)
                        B3 = converlution(A, f3, 12, 3)
                        B4 = converlution(A, f4, 12, 3)
                        C1 = max_pooling(B1, 6)
                        C2 = max_pooling(B2, 6)
                        C3 = max_pooling(B3, 6)
                        C4 = max_pooling(B4, 6)
                        x = np.append(np.append(C1.flatten(),C2.flatten()), np.append(C3.flatten(),C4.flatten()))
                        y = cnetwork(x, w, b)                            
                        r = np.zeros(10)
                        r[Y[i]] = 1
                        b = update_b(b, step1, y,r)
                        w = update_w(w, step2, x, y,r)
                step1 = step1/10
                step2 = step2/10
                
        error = 0
        for j in range(num) :
                A = max_pooling(X[j,:].reshape((28,28)), 14)
                B1 = converlution(A, f1, 12, 3)
                B2 = converlution(A, f2, 12, 3)
                B3 = converlution(A, f3, 12, 3)
                B4 = converlution(A, f4, 12, 3)
                C1 = max_pooling(B1, 6)
                C2 = max_pooling(B2, 6)
                C3 = max_pooling(B3, 6)
                C4 = max_pooling(B4, 6)
                x = np.append(np.append(C1.flatten(),C2.flatten()), np.append(C3.flatten(),C4.flatten()))
                y = cnetwork(x, w, b)                                          
                if Y[j] != max_array(y) :
                        error += 1
                        
                        
        print("error:")
        print(error)
        print("Scored:")                       
        print(1- error/num) 

         
        print("Testing ...")
        test = pandas.DataFrame(pandas.read_csv("test.csv"))
        T = test.values
        num = len(T)
        id = []
        label = []
        for k in range(num) :
                A = max_pooling(T[k,:].reshape((28,28)), 14)
                B1 = converlution(A, f1, 12, 3)
                B2 = converlution(A, f2, 12, 3)
                B3 = converlution(A, f3, 12, 3)
                B4 = converlution(A, f4, 12, 3)
                C1 = max_pooling(B1, 6)
                C2 = max_pooling(B2, 6)
                C3 = max_pooling(B3, 6)
                C4 = max_pooling(B4, 6)
                x = np.append(np.append(C1.flatten(),C2.flatten()), np.append(C3.flatten(),C4.flatten()))
                y = cnetwork(x, w, b)                                          
                id.append(k+1)
                label.append(max_array(y))
        
        save = pandas.DataFrame({'ImageId': id, 'Label': label})
        save.to_csv('submission.csv',index = False,)