Transformer Code Breakdown block by block - beyondnlp/nlp GitHub Wiki

source : https://www.k-a.in/transformers.html

  • embedding class
import torch
import torch.nn as nn
import math
class InputEmbeddings(nn.Module):
    # The constructor takes the model dimension and the vocabulary size as parameters.
    def __init__(self, d_model:int, vocab_size:int):
        super().__init__()
        # Store the model dimension and vocabulary size as class attributes.
        self.d_model = d_model
        self.vocab_size = vocab_size
        # Create an embedding layer that maps token indices to d_model-dimensional vectors.
        self.embedding = nn.Embedding(vocab_size,d_model)

    # Forward pass that processes the input x.
    def forward(self,x):
        # Apply the embedding and scale by sqrt(d_model) to keep the variance stable, as described in the paper.
        return self.embedding(x) * math.sqrt(self.d_model)
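
A minimal usage sketch of the embedding layer (d_model=512, vocab_size=1000, and the random token batch are illustrative assumptions, not values from the source):

emb = InputEmbeddings(d_model=512, vocab_size=1000)
tokens = torch.randint(0, 1000, (2, 10))   # (batch, seq_len) of token indices
print(emb(tokens).shape)                   # torch.Size([2, 10, 512])
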
  • positional encoding class
# μž„λ² λ”©μ— μœ„μΉ˜ 정보λ₯Ό μΆ”κ°€ν•˜κΈ° μœ„ν•œ ν΄λž˜μŠ€μž…λ‹ˆλ‹€.
class PositionalEncoding(nn.Module):
    # μƒμ„±μžλŠ” λͺ¨λΈ 차원, μ΅œλŒ€ μ‹œν€€μŠ€ 길이, λ“œλ‘­μ•„μ›ƒ λΉ„μœ¨μ„ λ§€κ°œλ³€μˆ˜λ‘œ λ°›μŠ΅λ‹ˆλ‹€.
    def __init__(self, d_model:int, seq_len:int, dropout:float) -> None:
        # 차원을 클래슀 μ†μ„±μœΌλ‘œ μ €μž₯ν•©λ‹ˆλ‹€.
        self.d_model = d_model
        self.seq_len = seq_len
        # λ“œλ‘­μ•„μ›ƒ λ ˆμ΄μ–΄λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€
        self.droput = nn.Dropout(dropout)
        # μœ„μΉ˜ 인코딩을 μœ„ν•œ ν…μ„œλ₯Ό 0으둜 μ΄ˆκΈ°ν™”ν•©λ‹ˆλ‹€.
        pe = torch.zeros(seq_len,d_model)
        position = torch.arange(0,seq_len,dtype=torch.float).unsqueeze(1)
        # 0λΆ€ν„° seq_len-1κΉŒμ§€μ˜ μœ„μΉ˜ 인덱슀λ₯Ό κ°–λŠ” μ—΄ 벑터λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€.
        div_term = torch.exp(torch.arange(0,d_model,2).float()*(-math.log(10000.0)/d_model))
        # λ…Όλ¬Έ 곡식에 따라 사인 ν•¨μˆ˜μ— λŒ€ν•œ λ‚˜λˆ—μ…ˆ 항을 μƒμ„±ν•©λ‹ˆλ‹€.
        pe[:,0::2] = torch.sin(position*div_term)
        pe[:,1::2] = torch.cos(position*div_term)
        # μœ„μΉ˜ μΈμ½”λ”©μ˜ ν™€μˆ˜ μΈλ±μŠ€μ— 코사인을 μ μš©ν•©λ‹ˆλ‹€.
        pe = pe.unsqueeze(0)
        # μœ„μΉ˜ 인코딩 ν…μ„œλ₯Ό 버퍼(λ§€κ°œλ³€μˆ˜κ°€ μ•„λ‹Œ 영ꡬ μƒνƒœ)둜 λ“±λ‘ν•©λ‹ˆλ‹€.
        self.register_buffer('pe',pe)
    # μž…λ ₯ x에 λŒ€ν•œ 순방ν–₯ 패슀 방법.
    def forward(self,x):
        # μž…λ ₯에 μœ„μΉ˜ 인코딩을 μΆ”κ°€ν•˜κ³ , μž…λ ₯ μ‹œν€€μŠ€ 길이에 맞게 νŠΈλ¦¬λ°ν•˜λ©°, κ·Έλž˜λ””μ–ΈνŠΈλŠ” λΉ„ν™œμ„±ν™”λ©λ‹ˆλ‹€.
        x = x + (self.pe[:,:x.shape[1],:]).requires_grad_(False)
        # μž…λ ₯κ³Ό μœ„μΉ˜ μΈμ½”λ”©μ˜ 합계에 λ“œλ‘­μ•„μ›ƒμ„ μ μš©ν•©λ‹ˆλ‹€.
        return self.dropput(x)
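
A minimal sketch of how the positional encoding composes with the embedding above (seq_len=64 and dropout=0.1 are assumed values; emb and tokens come from the previous sketch):

pos = PositionalEncoding(d_model=512, seq_len=64, dropout=0.1)
x = pos(emb(tokens))      # pe is trimmed to the actual sequence length (10)
print(x.shape)            # torch.Size([2, 10, 512])
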
  • multi head attention block class
# Implements the multi-head attention mechanism.
class MultiHeadAttentionBlock(nn.Module):
    # Constructor that takes the model dimension, the number of heads, and the dropout rate.
    def __init__(self,d_model:int,h:int,dropout:float)->None:
        super().__init__()
        # Store the dimensions as attributes.
        self.d_model = d_model
        self.h = h
        # Make sure the model dimension is divisible by the number of heads.
        assert d_model % h == 0, "d_model is not divisible by h"
        # Dimension of each head's query/key/value vectors.
        self.d_k = d_model // h
        # Linear projections (without bias) for the queries, keys, values, and output.
        self.w_q = nn.Linear(d_model,d_model,bias=False)
        self.w_k = nn.Linear(d_model,d_model,bias=False)
        self.w_v = nn.Linear(d_model,d_model,bias=False)
        self.w_o = nn.Linear(d_model,d_model,bias=False)

        self.dropout = nn.Dropout(dropout)

    # Static method that implements scaled dot-product attention.
    @staticmethod
    def attention(query,key,value,mask,dropout:nn.Dropout):
        # Get the dimension of the key/query vectors.
        d_k = query.shape[-1]
        # Compute the attention scores with a matrix multiplication followed by scaling.
        attention_scores = (query @ key.transpose(-2,-1))/math.sqrt(d_k)
        # Apply the mask by setting masked positions to a very large negative value (effectively negative infinity).
        if mask is not None:    attention_scores.masked_fill_(mask==0,-1e9)
        # Normalize the scores into attention weights with softmax.
        attention_scores = attention_scores.softmax(dim=-1)
        # Apply dropout to the attention scores.
        if dropout is not None:    attention_scores = dropout(attention_scores)
        # Return the weighted values along with the attention scores.
        return (attention_scores @ value), attention_scores

    def forward(self,q,k,v,mask):
        # Project the inputs into query, key, and value spaces.
        query = self.w_q(q)
        key = self.w_k(k)
        value = self.w_v(v)

        # Reshape and transpose the tensors for multi-head processing: (batch, seq_len, d_model) -> (batch, h, seq_len, d_k).
        query = query.view(query.shape[0],query.shape[1],self.h,self.d_k).transpose(1,2)
        key = key.view(key.shape[0],key.shape[1],self.h,self.d_k).transpose(1,2)
        value = value.view(value.shape[0],value.shape[1],self.h,self.d_k).transpose(1,2)

        # Compute attention and keep the scores for later inspection.
        x,self.attention_scores = MultiHeadAttentionBlock.attention(query,key,value,mask,self.dropout)

        # Reshape the output back to its original size: (batch, h, seq_len, d_k) -> (batch, seq_len, d_model).
        x = x.transpose(1,2).contiguous().view(x.shape[0],-1,self.h*self.d_k)

        # Apply the final output projection.
        return self.w_o(x)
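
A minimal self-attention sketch (h=8 and the causal mask are illustrative assumptions):

mha = MultiHeadAttentionBlock(d_model=512, h=8, dropout=0.1)
x = torch.randn(2, 10, 512)                          # (batch, seq_len, d_model)
causal_mask = torch.tril(torch.ones(1, 1, 10, 10))   # broadcasts over batch and heads
out = mha(x, x, x, causal_mask)                      # self-attention: q = k = v = x
print(out.shape)                                     # torch.Size([2, 10, 512])
print(mha.attention_scores.shape)                    # torch.Size([2, 8, 10, 10])
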
  • layer norm class

λ„€νŠΈμ›Œν¬ ν™œμ„±ν™”λ₯Ό μ•ˆμ •ν™”ν•˜κΈ° μœ„ν•΄ λ ˆμ΄μ–΄ μ •κ·œν™”λ₯Ό κ΅¬ν˜„ν•©λ‹ˆλ‹€.

class LayerNormalization(nn.Module): # 0으둜 λ‚˜λˆ„λŠ” 것을 λ°©μ§€ν•˜κΈ° μœ„ν•΄ κΈ°λŠ₯ κ°œμˆ˜μ™€ μž‘μ€ 엑싀둠을 μ‚¬μš©ν•œ μƒμ„±μžμž…λ‹ˆλ‹€. def init(self,features: int,eps:float=10**-6) -> None:

    self.eps = eps
    # ν•™μŠ΅ κ°€λŠ₯ν•œ μŠ€μΌ€μΌλ§ λ§€κ°œλ³€μˆ˜λŠ” 1둜 μ΄ˆκΈ°ν™”λ©λ‹ˆλ‹€.
    self.alpha = nn.Parameter(torch.ones(features))
    # ν•™μŠ΅ κ°€λŠ₯ν•œ 편ν–₯ λ§€κ°œλ³€μˆ˜λŠ” 0으둜 μ΄ˆκΈ°ν™”λ©λ‹ˆλ‹€.
    self.bias = nn.Parameter(torch.zeros(features))
# μž…λ ₯ x에 λŒ€ν•œ 순방ν–₯ 패슀 방법.
def forward(self,x):
    # νŠΉμ„± 차원 전체에 걸쳐 평균을 κ³„μ‚°ν•©λ‹ˆλ‹€.
    mean = x.mean(dim = -1, keepdim = True)
     # νŠΉμ„± 차원 전체에 걸쳐 ν‘œμ€€ 편차λ₯Ό κ³„μ‚°ν•©λ‹ˆλ‹€.
    std = x.std(dim = -1, keepdim = True)
    # μž…λ ₯을 μ •κ·œν™”ν•˜κ³ , 크기 μ‘°μ • 및 편ν–₯을 μ μš©ν•©λ‹ˆλ‹€.
    return self.alpha * (x-mean)/(std+self.eps) + self.bias
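
A quick sanity check of the normalization (shapes and values are illustrative):

ln = LayerNormalization(features=512)
x = torch.randn(2, 10, 512)
y = ln(x)
print(y.shape)                        # torch.Size([2, 10, 512])
print(y.mean(dim=-1).abs().max())     # near 0, since alpha starts at 1 and bias at 0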

  • feedforward block
class FeedForwardBlock(nn.Module):
    # Constructor that takes the model dimension, the inner feed-forward dimension, and the dropout rate.
    def __init__(self,d_model:int,d_ff:int,dropout:float) -> None:
        super().__init__()
        # First linear transformation from d_model to d_ff dimensions.
        self.linear_1 = nn.Linear(d_model,d_ff)
        # Dropout layer for regularization.
        self.dropout = nn.Dropout(dropout)
        # Second linear transformation back from d_ff to d_model dimensions.
        self.linear_2 = nn.Linear(d_ff,d_model)

    def forward(self,x):
        # Apply the first linear transformation, ReLU activation, dropout, and the second linear transformation.
        return self.linear_2(self.dropout(torch.relu(self.linear_1(x))))
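
A minimal sketch of the position-wise feed-forward block (d_ff=2048 follows the paper's default; the other values are illustrative):

ffn = FeedForwardBlock(d_model=512, d_ff=2048, dropout=0.1)
x = torch.randn(2, 10, 512)
print(ffn(x).shape)   # torch.Size([2, 10, 512]); applied independently at each position
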
  • Residual connection class
class ResidualConnection(nn.Module):
    # Constructor that takes the number of features and the dropout rate.
    def __init__(self,features: int,dropout:float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self,x,sublayer):
        # Apply layer normalization, the sublayer, dropout, and the residual connection.
        return x + self.dropout(sublayer(self.norm(x)))
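
A minimal sketch wiring a pre-norm residual connection around the feed-forward block from the previous sketch:

res = ResidualConnection(features=512, dropout=0.1)
x = torch.randn(2, 10, 512)
print(res(x, ffn).shape)   # torch.Size([2, 10, 512]); computes x + dropout(ffn(norm(x)))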



  • encoder block
class EncoderBlock(nn.Module):
    # Constructor that takes the feature size, a self-attention block, a feed-forward block, and the dropout rate.
    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        # Two residual connections: one around self-attention, one around the feed-forward block.
        self.residual_connections = nn.ModuleList([ResidualConnection(features,dropout) for _ in range(2)])

    def forward(self,x,src_mask):
        # Self-attention sublayer with a residual connection, using the source mask.
        x = self.residual_connections[0](x,lambda x: self.self_attention_block(x,x,x,src_mask))
        # Feed-forward sublayer with a residual connection.
        x = self.residual_connections[1](x, self.feed_forward_block)
        return x
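
A minimal end-to-end sketch assembling a single encoder block from the pieces above (all hyperparameters are illustrative assumptions; a full encoder would stack several such blocks and apply a final normalization):

d_model, h, d_ff, dropout = 512, 8, 2048, 0.1
attn = MultiHeadAttentionBlock(d_model, h, dropout)
ffn = FeedForwardBlock(d_model, d_ff, dropout)
block = EncoderBlock(d_model, attn, ffn, dropout)

src = torch.randint(0, 1000, (2, 10))    # token ids
src_mask = torch.ones(1, 1, 1, 10)       # padding mask; every position is visible here
x = PositionalEncoding(d_model, seq_len=64, dropout=dropout)(InputEmbeddings(d_model, vocab_size=1000)(src))
print(block(x, src_mask).shape)          # torch.Size([2, 10, 512])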