1.2.6 实战:日期转换

Transformer模型的完整架构如图1-27所示。从功能角度来看,编码器的核心作用是从输入序列中提取特征,解码器的核心作用则是处理生成任务。从结构上看,编码器=嵌入层+位置编码+N×[(多头自注意力+残差连接+标准化)+(前馈神经网络+残差连接+标准化)],解码器=嵌入层+位置编码+N×[(带掩码的多头自注意力+残差连接+标准化)+(多头注意力+残差连接+标准化)+(前馈神经网络+残差连接+标准化)]。本节我们将通过Transformer模型再实现一次日期转换的功能。

图1-27 Transformer模型的完整架构

1.组件定义

首先定义多头注意力层。

python

class MultiHeadAttention(nn.Module):

  def __init__(self, n_head, model_dim, drop_rate):

    super().__init__()

    # 每个注意力头的维度

    self.head_dim=model_dim//n_head

    # 注意力头的数量

    self.n_head=n_head

    # 模型的维度

    self.model_dim=model_dim

    # 初始化线性变换层,用于生成query、key和value

    self.wq=nn.Linear(model_dim, n_head * self.head_dim)

    self.wk=nn.Linear(model_dim, n_head * self.head_dim)

    self.wv=nn.Linear(model_dim, n_head * self.head_dim)

    # 输出的全连接层

    self.output_dense=nn.Linear(model_dim, model_dim)

    # Dropout层,用于防止模型过拟合

    self.output_drop=nn.Dropout(drop_rate)

    # 层标准化,用于稳定神经网络的训练

    self.layer_norm=nn.LayerNorm(model_dim)

    self.attention=None


  def forward(self, q, k, v, mask):

    # 保存原始输入q,用于后续的残差连接

    residual=q

    # 分别对输入的q、k、v做线性变换,生成query、key和value

    query=self.wq(q)

    key=self.wk(k)

    value=self.wv(v)

    # 对生成的query、key和value进行头分割,以便进行多头注意力计算

    query=self.split_heads(query)

    key=self.split_heads(key)

    value=self.split_heads(value)

    # 计算上下文向量

    context=self.scaled_dot_product_attention(query, key, value, mask)

    # 对上下文向量进行线性变换

    output=self.output_dense(context)

    # 添加dropout

    output=self.output_drop(output)

    # 添加残差连接并进行层标准化

    output=self.layer_norm(residual+output)

    return output


  def split_heads(self, x):

    # 将输入x的形状(shape)变为(n, step, n_head, head_dim),然后重排,得到(n, n_head, step, head_dim)

    x=th.reshape(x, (x.shape[0], x.shape[1], self.n_head, self.head_dim))

    return x.permute(0, 2, 1, 3)


  def scaled_dot_product_attention(self, q, k, v, mask=None):

    # 计算缩放因子

    dk=th.tensor(k.shape[-1]).type(th.float)

    # 计算注意力分数

    score=th.matmul(q, k.permute(0, 1, 3, 2))/(th.sqrt(dk)+1e-8)

    if mask is not None:

      # 如果提供了mask,则将mask位置的分数设置为负无穷,使得这些位置的softmax值接近0

      score=score.masked_fill_(mask,-np.inf)

    # 应用softmax函数计算得到注意力权重

    self.attention=softmax(score,dim=-1)

    # 计算上下文向量

    context=th.matmul(self.attention,v)

    # 重排上下文向量的维度并进行维度合并

    context=context.permute(0, 2, 1, 3)

    context=context.reshape((context.shape[0], context.shape[1],-1))

     return context

重点介绍一下scaled_dot_product_attention函数,此函数实现了“缩放点积注意力机制”的注意力计算过程。这是Transformer模型中的核心部分。以下是函数执行的主要步骤。

1)计算缩放因子。函数计算了缩放因子dk,它等于k(对应key)的最后一个维度。这个缩放因子用于在计算注意力分数时,缓解可能因维度较高而导致的点积梯度消失或梯度爆炸问题。

2)计算注意力分数。函数计算了注意力分数score。注意力分数是通过对q(对应query)和k(对应key)进行点积运算并除以缩放因子dk的平方根来计算的。

3)应用mask。如果提供了mask,那么函数将在计算softmax值之前将mask位置的分数设置为负无穷。这将使得这些位置的softmax值接近0,也就是说,模型不会关注这些位置。

4)计算注意力权重。函数通过对score应用softmax函数,计算得到注意力权重self.attention。

5)计算上下文向量。使用注意力权重和v(对应value)进行矩阵乘法,计算出上下文向量context。

6)重排和合并维度。函数通过重排和合并维度,得到了最终的上下文向量。这个上下文向量将被用作多头注意力机制的输出。


注意:这个过程被多次应用于多头注意力机制中,每个头都会有自己的query、key和value,它们通过不同的线性变换得到,然后用于计算各自的注意力权重和上下文向量。


然后,我们定义注意力计算后的前馈神经网络,它在每个Transformer模型的编码器和解码器的层中使用,并且独立地应用于每个位置的输入。这个网络包含两个线性变换层,其中间隔一个ReLU激活函数,并且在输出之前使用了dropout和层标准化。

python

class PositionWiseFFN(nn.Module):

  def __init__(self, model_dim, dropout=0.0):

    super().__init__()

    # 前馈神经网络的隐藏层维度,设为模型维度的4倍

    ffn_dim=model_dim * 4

    # 第一个线性变换层,其输出维度为前馈神经网络的隐藏层维度

    self.linear1=nn.Linear(model_dim, ffn_dim)

    # 第二个线性变换层,其输出维度为模型的维度

    self.linear2=nn.Linear(ffn_dim, model_dim)

    # Dropout层,用于防止模型过拟合

    self.dropout=nn.Dropout(dropout)

    # 层标准化,用于稳定神经网络的训练

    self.layer_norm=nn.LayerNorm(model_dim)


  def forward(self, x):

    # 对输入x进行前馈神经网络的计算

    # 首先,通过第一个线性变换层并使用ReLU作为激活函数

    output=relu(self.linear1(x))

    # 然后,通过第二个线性变换层

    output=self.linear2(output)

    # 接着,对上述输出进行dropout操作

    output=self.dropout(output)

    # 最后,对输入x和前馈神经网络的输出做残差连接,然后进行层标准化

    output=self.layer_norm(x+output)

     return output # 返回结果,其形状为[n, step, dim]

2.实现编码器与解码器

之后我们定义Transformer模型的编码器。

python

class EncoderLayer(nn.Module):

  def __init__(self, n_head, emb_dim, drop_rate):

    super().__init__()

    # 多头注意力机制层

    self.mha=MultiHeadAttention(n_head, emb_dim, drop_rate)

    # 前馈神经网络层

    self.ffn=PositionWiseFFN(emb_dim, drop_rate)


  def forward(self, xz, mask):

    # xz的形状为 [n, step, emb_dim]

    # 通过多头注意力机制层处理xz,得到context,其形状也为 [n, step, emb_dim]

    context=self.mha(xz, xz, xz, mask)

    # 将context传入前馈神经网络层,得到输出

    output=self.ffn(context)

    return output


class Encoder(nn.Module):

  def __init__(self, n_head, emb_dim, drop_rate, n_layer):

    super().__init__()

    # 定义n_layer个EncoderLayer,保存在ModuleList中

    self.encoder_layers=nn.ModuleList(

      [EncoderLayer(n_head, emb_dim, drop_rate) for _ in range(n_layer)]

    )


  def forward(self, xz, mask):

    # 依次通过所有的EncoderLayer

    for encoder in self.encoder_layers:

      xz=encoder(xz, mask)

       return xz # 返回的xz形状为 [n, step, emb_dim]

再定义Transformer模型的解码器。

python

class DecoderLayer(nn.Module):

  def __init__(self, n_head, model_dim, drop_rate):

    super().__init__()

    # 定义两个多头注意力机制层

    self.mha=nn.ModuleList([MultiHeadAttention(n_head, model_dim, drop_rate) for _ in range(2)])

    # 定义一个前馈神经网络层

    self.ffn=PositionWiseFFN(model_dim, drop_rate)


  def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):

    # 执行第一个注意力层的计算,3个输入均为yz,使用自注意力机制

    dec_output=self.mha[0](yz, yz, yz, yz_look_ahead_mask) # [n, step, model_dim]

    # 执行第二个注意力层的计算,其中Q来自前一个注意力层的输出,K和V来自编码器的输出

    dec_output=self.mha[1](dec_output, xz, xz, xz_pad_mask) # [n, step, model_dim]

    # 通过前馈神经网络层

    dec_output=self.ffn(dec_output)  # [n, step, model_dim]

    return dec_output


class Decoder(nn.Module):

  def __init__(self, n_head, model_dim, drop_rate, n_layer):

    super().__init__()

    # 定义n_layer个DecoderLayer,保存在ModuleList中

    self.num_layers=n_layer

    self.decoder_layers=nn.ModuleList(

      [DecoderLayer(n_head, model_dim, drop_rate) for _ in range(n_layer)]

    )


  def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):

    # 依次通过所有的DecoderLayer

    for decoder in self.decoder_layers:

      yz=decoder(yz, xz, yz_look_ahead_mask, xz_pad_mask)

  return yz # 返回的yz形状为 [n, step, model_dim]

重点介绍一下解码器的两个注意力层。在解码器的前向传播过程中,输入的yz首先会传入第一个多头注意力机制层中。这个注意力层是一个自注意力机制层,也就是说其query、key和value都来自yz。并且,这个注意力层使用了一个look ahead mask方法,使得在计算注意力分数时,每个位置只能关注它之前的位置,而不能关注它之后的位置。这是因为在预测时,模型只能看到已经预测出的词,不能看到还未预测出的词。这个注意力层的输出dec_output也是一个序列,其形状为[n, step, model_dim]。

然后,dec_output和编码器的输出xz一起传入第二个多头注意力机制层中。这个注意力层的query来自dec_output,而key和value来自xz。并且,这个注意力层使用了一个padding mask方法,使得在计算注意力分数时,模型不会关注xz中的padding位置。这个注意力层的输出dec_output同样是一个序列,其形状为[n, step, model_dim]。

在解码器中,每个位置的输出不仅取决于当前位置的输入,还取决于前面位置的输入和编码器所有位置的输出。这意味着,解码器可以捕捉到输入和输出之间的复杂依赖关系。

3.组装Transformer

在处理输入时,还需要定义位置编码层,用于处理序列数据。这个层的作用是将序列中每个位置的词编码为一个固定大小的向量,这个向量包含了词的信息和它在序列中的位置信息。

python

class PositionEmbedding(nn.Module):

  def __init__(self, max_len, emb_dim, n_vocab):

    super().__init__()

    # 生成位置编码矩阵

    pos=np.expand_dims(np.arange(max_len), 1) # [max_len, 1]

    # 使用正弦和余弦函数生成位置编码

    pe=pos/np.power(1000, 2*np.expand_dims(np.arange(emb_dim)//2, 0)/emb_dim)

    pe[:, 0::2]=np.sin(pe[:, 0::2])

    pe[:, 1::2]=np.cos(pe[:, 1::2])

    pe=np.expand_dims(pe, 0) # [1, max_len, emb_dim]

    self.pe=th.from_numpy(pe).type(th.float32)


    # 定义词嵌入层

    self.embeddings=nn.Embedding(n_vocab, emb_dim)

    # 初始化词嵌入层的权重

    self.embeddings.weight.data.normal_(0, 0.1)


  def forward(self, x):

    # 确保位置编码在与词嵌入权重相同的设备上

    device=self.embeddings.weight.device

    self.pe=self.pe.to(device)

    # 计算输入的词嵌入权重,并加上位置编码

    x_embed=self.embeddings(x)+self.pe # [n, step, emb_dim]

    return x_embed # [n, step, emb_dim]

最后,我们将其组装成Transformer。

python

class Transformer(nn.Module):

  def __init__(self, n_vocab, max_len, n_layer=6, emb_dim=512, n_head=8, drop_rate=0.1, padding_idx=0):

    super().__init__()

    # 初始化最大长度、填充索引、词汇表大小

    self.max_len=max_len

    self.padding_idx=th.tensor(padding_idx)

    self.dec_v_emb=n_vocab

    # 初始化位置嵌入、编码器、解码器和输出层

    self.embed=PositionEmbedding(max_len, emb_dim, n_vocab)

    self.encoder=Encoder(n_head, emb_dim, drop_rate, n_layer)

    self.decoder=Decoder(n_head, emb_dim, drop_rate, n_layer)

    self.output=nn.Linear(emb_dim, n_vocab)

    # 初始化优化器

    self.opt=th.optim.Adam(self.parameters(), lr=0.002)


  def forward(self, x, y):

    # 对输入和目标进行嵌入

    x_embed, y_embed=self.embed(x), self.embed(y)

    # 创建填充掩码

    pad_mask=self._pad_mask(x)

    # 对输入进行编码

    encoded_z=self.encoder(x_embed, pad_mask)

    # 创建前瞻掩码

    yz_look_ahead_mask=self._look_ahead_mask(y)

    # 将编码后的输入和前瞻掩码传入解码器

    decoded_z=self.decoder(

      y_embed, encoded_z, yz_look_ahead_mask, pad_mask)

    # 通过输出层得到最终输出

    output=self.output(decoded_z)

    return output


  def step(self, x, y):

    # 清空梯度

    self.opt.zero_grad()

    # 计算输出和损失

    logits=self(x, y[:, :-1])

    loss=cross_entropy(logits.reshape(-1, self.dec_v_emb), y[:, 1:].reshape(-1))

    # 进行反向传播

    loss.backward()

    # 更新参数

    self.opt.step()

    return loss.cpu().data.numpy(), logits


  def _pad_bool(self, seqs):

    # 创建掩码,标记哪些位置是填充的

    return th.eq(seqs, self.padding_idx)


  def _pad_mask(self, seqs):

    # 将填充掩码扩展到合适的维度

    len_q=seqs.size(1)

    mask=self._pad_bool(seqs).unsqueeze(1).expand(-1, len_q,-1)

    return mask.unsqueeze(1)


  def _look_ahead_mask(self, seqs):

    # 创建前瞻掩码,防止在生成序列时看到未来位置的信息

    device=next(self.parameters()).device

    _, seq_len=seqs.shape

    mask=th.triu(th.ones((seq_len, seq_len), dtype=th.long),

           diagonal=1).to(device)

      mask=th.where(self._pad_bool(seqs)[:, None, None, :], 1, mask[None, None, :, :]).to(device)

       return mask>0

在处理序列数据(如文本或时间序列数据)时,通常会遇到一个问题,即序列的长度不一致。在大多数深度学习框架中,我们需要将一个批量的数据整理成相同的形状才能进行计算。因此,我们需要一种方法来处理长度不一的序列,这就是“填充”(padding)的用处。通过填充,我们可以将不同长度的序列转变为相同长度,具体来说,我们会找到批量中最长的序列,然后将其他较短的序列通过添加特殊的“填充值”(如0或特殊的标记)来扩展到相同的长度。

填充之后,我们就可以将序列数据整理成相同的形状,这样就可以用来训练模型了。然而,填充值是没有实际意义的,我们不希望它们对模型的训练造成影响。因此,我们通常会创建一个掩码(mask),用来告诉模型哪些位置是填充值,也就是Transformer模型定义中的_pad_mask和_look_ahead_mask函数。它们会返回一个布尔值矩阵,标记输入中哪些位置是填充值。

python

def pad_zero(seqs, max_len):

  # 初始化一个全是填充标识符PAD_token的二维矩阵,大小为(len(seqs), max_len)

  padded=np.full((len(seqs), max_len), fill_value=PAD_token, dtype=np.int32)

  for i, seq in enumerate(seqs):

    # 将seqs中的每个seq序列的元素填入padded对应的行中,未填满的部分仍为PAD_token

    padded[i, :len(seq)]=seq

return padded

4.训练与评估

接下来就可以开始训练了。

python

# 初始化一个Transformer模型,设置词汇表大小、最大序列长度、层数、嵌入维度、多头注意力的头数、dropout比率和填充标记的索引

model=Transformer(n_vocab=dataset.num_word, max_len=MAX_LENGTH, n_layer=3, emb_dim=32, n_head=8, drop_rate=0.1, padding_idx=0)

# 检测是否有可用的GPU,如果有,则使用GPU进行计算;如果没有,则使用CPU

device=th.device("cuda" if th.cuda.is_available() else "cpu")

# 将模型移动到相应的设备(CPU或GPU)

model=model.to(device)

# 创建一个数据集,包含1000个样本

dataset=DateDataset(1000)

# 创建一个数据加载器,设定批量大小为32,每个批量的数据会被打乱

dataloader=DataLoader(dataset, batch_size=32, shuffle=True)

# 执行10个训练周期

for i in range(10):

  # 对于数据加载器中的每批数据,对输入和目标张量进行零填充,使其长度达到最大,然后将其转换为PyTorch张量,并移动到相应的设备(CPU或GPU)

  for input_tensor, target_tensor, _ in dataloader:

    input_tensor=th.from_numpy(

      pad_zero(input_tensor, max_len=MAX_LENGTH)).long().to(device)

    target_tensor=th.from_numpy(

      pad_zero(target_tensor, MAX_LENGTH+1)).long().to(device)

    # 使用模型的step方法进行一步训练,并获取损失值

    loss, _=model.step(input_tensor, target_tensor)

  # 打印每个训练周期后的损失值

  print(f"epoch: {i+1}, \tloss: {loss}")

类似于Seq2Seq结构模型的日期转换,我们可以定义一个评估方法,查看Transformer模型能否正确地进行日期转换。

python

def evaluate(model, x, y):

  model.eval()

  x=th.from_numpy(pad_zero([x], max_len=MAX_LENGTH)).long().to(device)

  y=th.from_numpy(pad_zero([y], max_len=MAX_LENGTH)).long().to(device)

  decoder_outputs=model(x, y)

  _, topi=decoder_outputs.topk(1)

  decoded_ids=topi.squeeze()

  decoded_words=[]

  for idx in decoded_ids:

    decoded_words.append(dataset.index2word[idx.item()])

  return ''.join(decoded_words)

最终模型的输出如图1-28所示。

图1-28 Transformer模型的日期转换输出示例