编码实现Dropout正则化编码

wuchangjian2021-11-05 19:04:56编程学习

dropout_layers.py

包含了Dropout前向传播以及反向传播,组合Dropout传播层

#-*- coding: utf-8 -*-
import numpy as np
from .layers import *

def dropout_forward(x, dropout_param):
    """
    执行dropout前向传播过程。
    Inputs:
    - x: 输入数据
    - dropout_param: 字典类型的dropout参数,使用下列键值:
        - p: dropout激活参数,每个神经元的激活概率p。
        - mode: 'test'或'train',train:使用激活概率p与神经元进行"and"运算;
                                                            test:去除激活概率p仅仅返回输入值。
        - seed: 随机数生成种子. 
    Outputs:
    - out: 和输入数据形状相同。
    - cache:元组(dropout_param, mask). 
                    训练模式:掩码mask用于激活该层神经元为“1”激活,为“0”抑制。
                    测试模式:去除掩码操作。
    """
    p, mode = dropout_param['p'], dropout_param['mode']
    # 设置随机种子,控制输出稳定
    if 'seed' in dropout_param:
        np.random.seed(dropout_param['seed'])

    mask = None  # 掩码层,用于筛选神经元
    out = None  # 输出层结果

    # 训练阶段
    if mode == 'train':
        ###########################################################################
        #                                任务:执行训练阶段dropout前向传播。                                            #
        ###########################################################################
        # 使用mask=mask/p,这样就可以保证输出均值与输入均值相同
        mask = (np.random.rand(*x.shape)<p)/p
        out = x*mask
        ###########################################################################
        #                                                     结束编码                                                                            #
        ###########################################################################
    # 测试阶段
    elif mode == 'test':
        ###########################################################################
        #                             任务: 执行测试阶段dropout前向传播。                                            #
        ###########################################################################
        out = x
        ###########################################################################
        #                                                     结束编码                                                                            #
        ###########################################################################
    # 输入参数与掩码层
    cache = (dropout_param, mask)
    out = out.astype(x.dtype, copy=False)
    return out, cache


def dropout_backward(dout, cache):
    """
    dropout反向传播过程。
    Inputs:
    - dout: 上层梯度,形状和其输入相同。
    - cache: 前向传播中的缓存(dropout_param, mask)。
    """
    dropout_param, mask = cache
    mode = dropout_param['mode']
    
    dx = None
    if mode == 'train':
        ###########################################################################
        #                                            任务:实现dropout反向传播                                                    #
        ###########################################################################
        dx = dout*mask
        ###########################################################################
        #                                                        结束编码                                                                         #
        ###########################################################################
    elif mode == 'test':
        dx = dout
    return dx

def affine_relu_dropout_forward(x,w,b,dropout_param):
    """
    组合affine_relu_dropout前向传播过程。
    Inputs:
    - x: 输入数据,其形状为(N, d_1, ..., d_k)的numpy数组。
    - w: 权重矩阵,其形状为(D,M)的numpy数组,
             D表示输入数据维度,M表示输出数据维度。
             可以将D看成输入的神经元个数,M看成输出神经元个数。
    - b: 偏置向量,其形状为(M,)的numpy数组。
    
    - dropout_param: 字典类型的dropout参数,使用下列键值:
        - p: dropout激活参数,每个神经元的激活概率p。
        - mode: 'test'或'train',train:使用激活概率p与神经元进行"and"运算;
                                                            test:去除激活概率p仅仅返回输入值。
        - seed: 随机数生成种子.    

    Outputs:
    - out: 和输入数据形状相同。
    - cache:缓存包含(cache_affine,cache_relu,cache_dropout)
                    cache_affine:仿射前向传播的各项缓存;
                    cache_relu:ReLU前向传播的各项缓存;
                    cache_dropout:dropout前向传播的各项缓存。
    """ 
    out_dropout = None
    cache = None
    #############################################################################
    #                             任务: 实现 affine_relu_dropout 神经元前向传播.                            #
    #                 注意:你需要调用affine_forward以及relu_forward函数,                            #
    #                            并将各自的缓存保存在cache中                                                                    #
    #############################################################################    
    out_affine, cache_affine = affine_forward(x, w, b)
    out_relu, cache_relu = relu_forward(out_affine)
    out_dropout, cache_dropout = dropout_forward(out_relu, dropout_param)
    cache = (cache_affine, cache_relu, cache_dropout)
    ###########################################################################
    #                                                        结束编码                                                                         #
    ###########################################################################        
    return out_dropout,cache

def affine_relu_dropout_backward(dout,cache):
    """
     affine_relu_dropout神经元的反向传播过程。
     
    Input:
    - dout: 形状为(N, M)的上层梯度。
    - cache: 缓存(cache_affine,cache_relu,cache_dropout)。

    Returns:
    - dx: 输入数据x的梯度,其形状为(N, d1, ..., d_k)
    - dw: 权重矩阵w的梯度,其形状为(D,M)
    - db: 偏置项b的梯度,其形状为(M,)
    """    
    cache_affine,cache_relu,cache_dropout = cache
    dx,dw,db=None,None,None
    ###########################################################################
    #                             任务:实现affine_relu_dropout反向传播                                         #
    ###########################################################################    
    ddropout = dropout_backward(dout, cache_dropout)
    drelu = relu_backward(ddropout, cache_relu)
    dx, dw, db = affine_backward(drelu, cache_affine)     
    ###########################################################################
    #                                                    结束编码                                                                            #
    ###########################################################################
    return dx,dw,db
    

layers.py

之前已经写好的前向传播与后向传播代码以及softmax的损失函数。

#-*- coding: utf-8 -*-
import numpy as np

def affine_forward(x, w, b):
    """
    计算神经网络当前层的前馈传播,该方法计算在全连接情况下的得分函数。
    注:如果不理解affine仿射变换,简单的理解为在全连接情况下的得分函数即可。

    输入数据x的形状为(N, d_1, ..., d_k),其中N表示数据量,(d_1, ..., d_k)表示
    每一通道的数据维度,如果是图片数据就为(长,宽,色道)。数据的总维度为
    D = d_1 * ... * d_k,因此我们需要数据整合成完整的(N,D)形式再进行仿射变换。
    
    Inputs:
    - x: 输入数据,其形状为(N, d_1, ..., d_k)的numpy数组。
    - w: 权重矩阵,其形状为(D,M)的numpy数组,D表示输入数据维度,M表示输出数据维度
             可以将D看成输入的神经元个数,M看成输出神经元个数。
    - b: 偏置向量,其形状为(M,)的numpy数组。
    
    Returns 元组:
    - out: 形状为(N, M)的输出结果。
    - cache: 将输入进行缓存(x, w, b)。
    """
    out = None
    #############################################################################
    #                                            任务: 实现全连接前向传播                                                         #
    #                                     注:首先你需要将输入数据重塑成行。                                            #
    #############################################################################
    N=x.shape[0]
    x_new=x.reshape(N,-1)#将x重塑成2维向量
    out=np.dot(x_new,w)+b
    #############################################################################
    #                                                         结束编码                                                                            #
    #############################################################################
    cache = (x, w, b)
    return out, cache


def affine_backward(dout, cache):
    """
 计算仿射层的反向传播.

    Inputs:
    - dout: 形状为(N, M)的上层梯度
    - cache: 元组:
        - x: (N, d_1, ... d_k)的输入数据
        - w: 形状为(D, M)的权重矩阵

    Returns 元组:
    - dx: 输入数据x的梯度,其形状为(N, d1, ..., d_k)
    - dw: 权重矩阵w的梯度,其形状为(D,M)
    - db: 偏置项b的梯度,其形状为(M,)
    """
    x, w, b = cache
    dx, dw, db = None, None, None
    #############################################################################
    #                                    任务: 实现仿射层反向传播                                                                 #
    #                 注意:你需要将x重塑成(N,D)后才能计算各梯度,                                            #
    #                            求完梯度后你需要将dx的形状与x重塑成一样                                            #
    #############################################################################
    db = np.sum(dout,axis=0)
    xx= x.reshape(x.shape[0],-1)
    dw = np.dot(xx.T,dout)
    dx = np.dot(dout,w.T)
    dx=np.reshape(dx,x.shape)
    #############################################################################
    #                                                         结束编码                                                                            #
    #############################################################################
    return dx, dw, db


def relu_forward(x):
    """
    计算rectified linear units (ReLUs)激活函数的前向传播,并保存相应缓存

    Input:
    - x: 输入数据

    Returns 元组:
    - out: 和输入数据x形状相同
    - cache: x
    """
    out = None
    #############################################################################
    #                         任务: 实现ReLU 的前向传播.                                                                        #
    #                        注意:你只需要1行代码即可完成                                                                    #
    #############################################################################
    out =np.maximum(0,x)
    #############################################################################
    #                                                         结束编码                                                                            #
    #############################################################################
    cache = x
    return out, cache


def relu_backward(dout, cache):
    """
    计算 rectified linear units (ReLUs)激活函数的反向传播.

    Input:
    - dout: 上层误差梯度
    - cache: 输入 x,其形状应该和dout相同

    Returns:
    - dx: x的梯度
    """
    dx, x = None, cache
    #############################################################################
    #                             任务: 实现 ReLU 反向传播.                                                                     #
    #############################################################################
    dx=dout
    dx[x<=0]=0
    #############################################################################
    #                                                        结束编码                                                                             #
    #############################################################################
    return dx

def affine_relu_forward(x, w, b):
    """
     ReLU神经元前向传播

    Inputs:
    - x: 输入到 affine层的数据
    - w, b:    affine层的权重矩阵和偏置向量

    Returns 元组:
    - out:    ReLU的输出结果
    - cache: 前向传播的缓存
    """
    ######################################################################
    #                             任务: 实现 ReLU神经元前向传播.                                             #
    #                注意:你需要调用affine_forward以及relu_forward函数,                #
    #                            并将各自的缓存保存在cache中                                                     #
    ######################################################################
    a, fc_cache = affine_forward(x, w, b)
    out, relu_cache = relu_forward(a)
    cache = (fc_cache, relu_cache)
    ######################################################################
    #                                         结束编码                                                                             #
    ######################################################################
    return out, cache


def affine_relu_backward(dout, cache):
    """
     ReLU神经元的反向传播
     
    Input:
    - dout: 上层误差梯度
    - cache: affine缓存,以及relu缓存

    Returns:
    - dx: 输入数据x的梯度
    - dw: 权重矩阵w的梯度
    - db: 偏置向量b的梯度
    """
    #############################################################################
    #                             任务: 实现 ReLU神经元反向传播.                                                            #
    #############################################################################
    fc_cache, relu_cache = cache
    da = relu_backward(dout, relu_cache)
    dx, dw, db = affine_backward(da, fc_cache)
    #############################################################################
    #                                     结束编码                                                                                             #
    #############################################################################
    return dx, dw, db

def softmax_loss(x, y):
    probs = np.exp(x - np.max(x, axis=1, keepdims=True))
    probs /= np.sum(probs, axis=1, keepdims=True)
    N = x.shape[0]
    loss = -np.sum(np.log(probs[np.arange(N), y])) / N
    dx = probs.copy()
    dx[np.arange(N), y] -= 1
    dx /= N

    return loss, dx

fc_net.py

实现了深层全连接神经网络。

#-*- coding: utf-8 -*-
import sys, os
sys.path.append(os.path.realpath(os.path.dirname(os.path.realpath(__file__))))

import numpy as np
from layers import *
from dropout_layers import *


class FullyConnectedNet(object):
    """
    深层全连接神经网络,其中隐藏层使用ReLU作为激活函数,输出层使用softmax作为分类器.
    该网络结构应该为:{affine - relu- [dropout]}x(L - 1) - affine - softmax
    """

    def __init__(self, input_dim=3*32*32,hidden_dims=[100,100],    num_classes=10,
                             dropout=0, reg=0.0, weight_scale=1e-2, seed=None):
        """
        初始化全连接网络.
        
        Inputs:
        - input_dim: 输入维度
        - hidden_dims:    隐藏层各层维度,如[100,100]
        - num_classes: 分类数量.
        - dropout: 如果dropout = 0 表示不使用dropout.
        - reg:正则化衰减因子.
        - weight_scale:权重范围,给予初始化权重的标准差.
        - seed: 使用seed产生相同的随机数.
        """
        self.use_dropout = dropout > 0
        self.reg = reg
        self.num_layers = 1 + len(hidden_dims)
        self.params = {}

        layers_dims = [input_dim] + hidden_dims + [num_classes]
        # 初始化每一层的参数
        for i in range(self.num_layers):
                self.params['W'+str(i+1)]=weight_scale*np.random.randn(
                        layers_dims[i],layers_dims[i+1])
                self.params['b' + str(i + 1)] = np.zeros(
                        (1, layers_dims[i + 1]))
        
        # 初始化与dropout有关的参数
        self.dropout_param = {}
        if self.use_dropout:
            self.dropout_param = {'mode': 'train', 'p': dropout}
            if seed is not None:
                self.dropout_param['seed'] = seed
        

    def loss(self, X, y=None):
        '''
        计算损失函数,有两种模式Dropout传播与无Dropout传播
        '''
        # 首先判断执行模式
        mode = 'test' if y is None else 'train'
        # 设置执行模式
        if self.dropout_param is not None:
            self.dropout_param['mode'] = mode     
        
        scores = None
        ############################################################################
        #                                    任务:执行全连接网络的前馈过程。                                                #
        #                             计算数据的分类得分,将结果保存在scores中。                                 #
        #            当使用dropout时,你需要使用self.dropout_param进行dropout前馈。            #
        #                     例如 if self.use_dropout: dropout传播     else:正常传播                 #
        ############################################################################
        outs, cache = {}, {}
        outs[0] = X
        num_h = self.num_layers-1
        # 隐藏层
        for i in range(num_h):
            if self.use_dropout:
                outs[i+1], cache[i+1] = affine_relu_dropout_forward(
                    outs[i], self.params['W'+str(i+1)], self.params['b'+str(i+1)],
                    self.dropout_param)
            else:
                outs[i+1], cache[i+1] = affine_relu_forward(
                    outs[i], self.params['W'+str(i+1)], self.params['b'+str(i+1)])
        # 输出层
        scores, cache[num_h+1] = affine_forward(outs[num_h], 
                    self.params['W'+str(num_h+1)], self.params['b'+str(num_h+1)])
        ############################################################################
        #                                                         结束编码                                                                         #
        ############################################################################
        if mode == 'test':
            return scores

        loss, grads = 0.0, {}
        ############################################################################
        #            任务:实现全连接网络的反向传播。                                                                        #
        #                将损失值储存在loss中,梯度值储存在grads字典中                                         #
        #         注意网络需要设置两种模式:有dropout,无dropout                                             #
        #             例如if self.use_dropout: dropout传播,else:正常传播                             #
        ############################################################################
        dout = {}
        loss, dy = softmax_loss(scores, y)
        h = self.num_layers-1
        for i in range(self.num_layers):
            loss += 0.5*self.reg*(np.sum(self.params['W'+str(i+1)]
                    * self.params['W'+str(i+1)]))
        dout[h], grads['W'+str(h+1)], grads['b'+str(h+1)] = affine_backward(dy, 
                    cache[h+1])
        grads['W'+str(h+1)]	+= self.reg * self.params['W'+str(h+1)]
        for i in range(h):
            if self.use_dropout:
                dout[h-1-i], grads['W'+str(h-i)], grads['b'+str(h-i)] =  affine_relu_dropout_backward(dout[h-i], cache[h-i])
            else:
                dout[h-1-i], grads['W'+str(h-i)], grads['b'+str(h-i)] = affine_relu_backward(dout[h-i], cache[h-i])
            grads['W'+str(h-i)]+=self.reg*self.params['W'+str(h-i)]
        ############################################################################
        #                                                         结束编码                                                                         #
        ############################################################################
        return loss, grads

trainer.py

解耦训练器的实现

#-*- coding: utf-8 -*-
import sys, os
sys.path.append(os.path.realpath(os.path.dirname(os.path.realpath(__file__))))

import numpy as np
import updater


class Trainer(object):
    """
    使用形式:
    
    data = {
        'X_train': # 训练数据
        'y_train': # 训练类标
        'X_val': # 验证数据
        'X_train': # 验证类标
    }
    model = MyAwesomeModel(hidden_size=100, reg=10)
    Trainer = Trainer(model, data,
                                    update_rule='sgd',
                                    updater_config={
                                        'learning_rate': 1e-3,
                                    },
                                    lr_decay=0.95,
                                    num_epochs=10, batch_size=100,
                                    print_every=100)
    Trainer.train()
    """

    def __init__(self, model, data, **kwargs):
        """
        初始化训练器各项配置。
        必须参数:
        - model: 神经网络模型,如:DNN,CNN,RNN
        - data: 数据字典,其中:
            'X_train':    形状为(N_train, d_1, ..., d_k)的训练数据
            'X_val':    形状为(N_val, d_1, ..., d_k)的验证数据
            'y_train':    形状为(N_train,)的训练数据类标
            'y_val':    形状为(N_val,)的验证数据类标
            
        可选参数:
        - update_rule: 更新规则,其存放在updater.py文件中,默认选项为'sgd'。
        - updater_config:更新规则所对应的超参数配置,同见updater.py文件。
        - lr_decay: 学习率衰减系数。
        - batch_size: 批量数据大小。
        - num_epochs: 训练周期。
        - print_every: 整数型,迭代训练print_every次模型,打印一次中间结果。
        - verbose: 布尔型; 是否在训练期间打印中间结果
        """
        self.model = model
        self.X_train = data[ 'X_train']
        self.y_train = data[ 'y_train']
        self.X_val = data[ 'X_val']
        self.y_val = data[ 'y_val']
        
        # 弹出可选参数,进行相关配置。
        self.update_rule = kwargs.pop('update_rule', 'sgd')
        self.updater_config = kwargs.pop('updater_config', {})
        # 默认不采用学习衰减
        self.lr_decay = kwargs.pop('lr_decay', 1.0)
        self.batch_size = kwargs.pop('batch_size', 100)
        self.num_epochs = kwargs.pop('num_epochs', 10)

        self.print_every = kwargs.pop('print_every', 10)
        self.verbose = kwargs.pop('verbose', True)

        # 若可选参数错误,抛出异常
        if len(kwargs) > 0:
            # 获取不符合规定的key
            extra = ', '.join('"%s"' % k for k in kwargs.keys())
            # 报错
            raise ValueError('Unrecognized arguments %s' % extra)


        #确认updater中含有更新规则
        if not hasattr(updater, self.update_rule):
            raise ValueError('Invalid update_rule "%s"' % self.update_rule)
        self.update_rule = getattr(updater, self.update_rule)

        # 初始化相关变量
        self.epoch = 0
        # 最佳的验证准确率
        self.best_val_acc = 0
        # 最佳参数
        self.best_params = {}
        # 训练过程中的中间数据
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

        # 对updater_config中的参数进行深拷贝
        self.updater_configs = {}
        # 获取各层的权重
        for p in self.model.params:
            d = {k: v for k, v in self.updater_config.items()}
            self.updater_configs[p] = d


    def _step(self):
        """
        执行单步梯度更新
        """
        # 采样批量数据
        num_train = self.X_train.shape[0]
        batch_mask = np.random.choice(num_train, self.batch_size)
        X_batch = self.X_train[batch_mask]
        y_batch = self.y_train[batch_mask]

        # 计算损失及梯度
        loss, grads = self.model.loss(X_batch, y_batch)
        self.loss_history.append(loss)

        # 更新参数
        for p, w in self.model.params.items():
            dw = grads[p]
            config = self.updater_configs[p]
            # 计算出更新后的值
            next_w, next_config = self.update_rule(w, dw, config)
            # 参数更新
            self.model.params[p] = next_w
            self.updater_configs[p] = next_config

    

    def check_accuracy(self, X, y, num_samples=None, batch_size=100):
        """
        根据提供的数据检验精度,若数据集过大,可进行采样测试。
        
        Inputs:
        - X: 形状为(N, d_1, ..., d_k)的数据
        - y: 形状为 (N,)的数据类标
        - num_samples: 采样次数
        - batch_size:批量数据大小
            
        Returns:
        - acc: 测试数据正确率
        """
        
        # 对数据进行采样
        N = X.shape[0]
        # 采样次数为空或者训练数据大于采样数据
        if num_samples is not None and N > num_samples:
            # 随机选择训练数据
            mask = np.random.choice(N, num_samples)
            N = num_samples
            X = X[mask]
            y = y[mask]

        # 计算精度
        num_batches = N / batch_size
        if N % batch_size != 0:
            num_batches += 1
        y_pred = []
        # 预测所有的数据
        for i in range(int(num_batches)):
            start = i * batch_size
            end = (i + 1) * batch_size
            scores = self.model.loss(X[start:end])
            y_pred.append(np.argmax(scores, axis=1))
        # 水平方向进行叠加
        y_pred = np.hstack(y_pred)
        # 计算准确率
        acc = np.mean(y_pred == y)
        return acc


    def train(self):
        """
        根据配置训练模型
        """
        num_train = self.X_train.shape[0]  #训练集大小
        # 每个周期的迭代次数
        iterations_per_epoch = max(num_train / self.batch_size, 1)
        # 总的迭代数
        num_iterations = self.num_epochs * iterations_per_epoch

        for t in range(int(num_iterations)):
            self._step()

            # 打印损失值
            if self.verbose and t % self.print_every == 0:
                print('(迭代 %d / %d) 损失值: %f' % (t + 1, num_iterations, self.loss_history[-1]))

            # 更新学习率
            epoch_end = (t + 1) % iterations_per_epoch == 0
            if epoch_end:
                self.epoch += 1
                # 学习率衰减
                for k in self.updater_configs:
                    self.updater_configs[k]['learning_rate'] *= self.lr_decay


            #在训练的开始,末尾,每一轮训练周期检验精度
            first_it = (t == 0)
            last_it = (t == num_iterations + 1)
            if first_it or last_it or epoch_end:
                train_acc = self.check_accuracy(self.X_train, self.y_train,
                                                                                num_samples=1000)
                val_acc = self.check_accuracy(self.X_val, self.y_val)
                self.train_acc_history.append(train_acc)
                self.val_acc_history.append(val_acc)

                if self.verbose:
                    print('(周期 %d / %d) 训练精度: %f; 验证精度: %f' % (
                                 self.epoch, self.num_epochs, train_acc, val_acc))

                # 记录最佳模型
                if val_acc > self.best_val_acc:
                    self.best_val_acc = val_acc
                    self.best_params = {}
                    for k, v in self.model.params.items():
                        self.best_params[k] = v.copy()

        # 训练结束后返回最佳模型
        self.model.params = self.best_params


updater.py

解耦更新器,主要负责更新神经网络的权重,其传入参数有神经网络的权重 w w w、当前权重的梯度 d w dw dw及相应的更新配置。

#-*- coding: utf-8 -*-
import numpy as np

"""

频繁使用的神经网络一阶梯度更新规则。每次更新接收:当前的网络权重,
训练获得的梯度,以及相关配置进行权重更新。
def update(w, dw, config = None):
Inputs:
  - w:当前权重.
  - dw: 和权重形状相同的梯度.
  - config: 字典型超参数配置,比如学习率,动量值等。如果更新规则需要用到缓存,
    在配置中需要保存相应的缓存。
Returns:
  - next_w: 更新后的权重.
  - config: 更新规则相应的配置.
"""


def sgd(w, dw, config=None):
  """
  随机梯度下降更新规则.

  config :
  - learning_rate: 学习率.
  """
  if config is None: 
      config = {}
  config.setdefault('learning_rate', 1e-2)

  w -= config['learning_rate'] * dw
  return w, config


相关文章

win11任务管理器_win11系统怎么打开任务管理器

win11任务管理器_win11系统怎么打开任务管理器

  升级win11系统之后右击任务栏启动任务管理器的入口被关闭了,任务管理...

JetpackCompose-列表的使用

在Jetpackcompose中使用列表要是用LazyColumn和item @...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。