RNN(LSTM) による系列ラベル予測 - you1025/my_something_flagments GitHub Wiki

LSTM を用いて品詞の予測タスクを試してみる。
用いるデータは参考書籍の 著者サイト から流用する。

データの準備

書籍用データの解凍を作業したディレクトリを path_to_download_dir とする。

import os
import numpy as np

# データ格納ディレクトリの指定
ROOT = "/path_to_download_dir/pytorch-nlp/Chapter3/LSTM"

dic = np.load(os.path.join(ROOT, "dic.pkl"), allow_pickle=True)
labels = np.load(os.path.join(ROOT, "label.pkl"), allow_pickle=True)

x_train = np.load(os.path.join(ROOT, "xtrain.pkl"), allow_pickle=True)[:100]
x_valid = np.load(os.path.join(ROOT, "xtest.pkl"),  allow_pickle=True)[:100]
y_train = np.load(os.path.join(ROOT, "ytrain.pkl"), allow_pickle=True)[:100]
y_valid = np.load(os.path.join(ROOT, "ytest.pkl"),  allow_pickle=True)[:100]

辞書データ

登録単語をキーに単語 ID が値となっている。
全部で 39,500 アイテムが格納されている。

ID が 1 始まりとなっている事に注意。
0 はパディング用に予約済み(という事だと思われる)。

dic
#{'万能': 1,
# 'で': 2,
# 'は': 3,
#︙
#}

単語と ID を逆にしたものも用意しておく。

id2word = { id:key for key, id in dic.items() }
id2word
#{1: '万能',
# 2: 'で',
# 3: 'は',
#:
#}

# 学習データの最初のサンプルを単語の組に変換
[ id2word[id] for id in x_train[0] ]
#['万能', 'で', 'は', 'ない', 'です', '。']

ラベルデータ

ラベルは普通に 0 始まり。
全部で 16 アイテムが格納されている。

labels
#{'名詞': 0,
# '助詞': 1,
# '形容詞': 2,
#︙
#}

こちらも品詞と ID を逆にしたものを用意しておく。

id2label = { id:key for key, id in labels.items() }
id2label
#{0: '名詞',
# 1: '助詞',
# 2: '形容詞',
# 3: '助動詞',
#︙
#}

# 学習データの最初のサンプルを単語の組に変換
[ id2label[id] for id in y_train[0] ]
#['名詞', '助詞', '助詞', '形容詞', '助動詞', '補助記号']

Dataset の作成

今回のデータはサンプル毎にサイズが異なり、まとめて Tensor に変換する事が出来ないためオリジナル Dataset を作成して 1 サンプルずつ Tensor として返すようにする。

import torch
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self, x_data, y_data):
        self.data  = x_data
        self.label = y_data

    def __len__(self):
        return len(self.label)

    def __getitem__(self, index):
        # 1 サンプルずつ Tensor に変換
        # サンプル毎にサイズが異なるのでまとめて変換するとエラー発生
        x = torch.tensor(self.data[index],  dtype=torch.long)
        y = torch.tensor(self.label[index], dtype=torch.long)

        return (x, y)

データセットを作成し最初の 2 件のサンプルを取り出してみる。
2 サンプルともサイズが異なる事が確認できる。

my_dataset = MyDataset(x_train, y_train)

# 最初のサンプル(単語 ID と品詞 ID の対)
my_dataset[0]
#(tensor([1, 2, 3, 4, 5, 6]), tensor([0, 1, 1, 2, 3, 4]))

# 2 番目のサンプル
my_dataset[1]
#(tensor([ 2,  7,  8,  9, 10, 11,  2, 12, 13, 14, 15, 14, 16, 17, 18, 19,  8, 20,
#         21, 19, 22, 14, 23, 10,  2,  8, 24,  5,  6]),
# tensor([1, 1, 4, 0, 1, 0, 1, 0, 5, 1, 5, 1, 5, 3, 0, 1, 4, 2, 0, 3, 5, 1, 5, 1,
#         3, 4, 0, 3, 4]))

サンプル毎にサイズが異なるとまとめて推論処理を行う際に都合が悪いので、後でデータを用いる際にパディング(単語は 0, 品詞は -1 で埋める)処理を行う。

DataLoader の作成

DataLoader のコンストラクタに collate_fn を指定しないとミニバッチ全体で x, y 毎に 1 つの Tensor に変換しようとしてエラー(サンプル毎にサイズが異なる事による)が発生するため、my_collate_fn を用いて x, y 毎に Tensor から成るリストを返す事で解決している。

from torch.utils.data import DataLoader

# ((x1, y1), (x2, y2), …) を ((x1, x2, …), (y1, y2, …)) に変換(左記 x, y は Tensor)
def my_collate_fn(batch):
    x_data, y_data = list(zip(*batch))
    return (x_data, y_data)

# DataLoader の作成
BATCH_SIZE = 2
my_loader = DataLoader(my_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=my_collate_fn)

# ミニバッチを 1 つ取り出す
# 同サイズの単語 ID の組と品詞 ID の組が 2(=BATCH_SIZE) セット出力される
batch_data = next(iter(my_loader))
batch_data
#((tensor([126,  10, 293,   5,  34,   8, 239, 289,  29, 294,  34, 295,  13,   4,
#           81,   7, 296, 193,  44,   6]),
#  tensor([213,  34, 214,  17,  10,   2, 215,  14, 216,  50,  17,   6])),
# (tensor([6, 1, 0, 3, 1, 4, 0, 0, 1, 0, 1, 0, 5, 3, 0, 1, 8, 5, 3, 4]),
#  tensor([0, 1, 2, 3, 1, 3, 5, 1, 5, 3, 3, 4])))

# ミニバッチ内の最初のサンプルを文章に戻してみる
word_ids = batch_data[0][0].numpy()
"".join([ id2word[id] for id in word_ids ])
#'私の実感ですが、企業価値と株価が比例しないこともたびたびあります。'

モデルの定義

from torch.nn import Module, Embedding, LSTM, Linear

class MyLSTM(Module):
    def __init__(self, vocabulary_size, label_size, hidden_state_size, layer_size, bidirectional, dropout):
        super(MyLSTM, self).__init__()
        self.embd = Embedding(vocabulary_size, hidden_state_size, padding_idx=0)

        # num_layers: LSTM レイヤ数
        # bidirectional: 双方向
        # dropout: LSTM レイヤの直後(上)に挟む Dropout 層の適用率
        self.lstm = LSTM(input_size=hidden_state_size, hidden_size=hidden_state_size, batch_first=True, num_layers=layer_size, bidirectional=bidirectional, dropout=dropout)

        # bidirectional によって入力サイズが規定される
        self.ln = Linear(in_features=hidden_state_size * (2 if bidirectional else 1), out_features=label_size)

    def forward(self, x):
        x = self.embd(x)
        x, _ = self.lstm(x)
        y = self.ln(x)
        return y
LAYER_SIZE = 1
BIDIRECTIONAL = False
DROPOUT = 0.5
net = MyLSTM(vocabulary_size=len(dic)+1, label_size=len(labels), hidden_state_size=100, layer_size=LAYER_SIZE, bidirectional=BIDIRECTIONAL, dropout=DROPOUT)
net
#MyLSTM(
#  (embd): Embedding(39501, 100, padding_idx=0)
#  (lstm): LSTM(100, 100, batch_first=True, dropout=0.5)
#  (ln): Linear(in_features=100, out_features=16, bias=True)
#)

# サンプルデータを適用してみる
batch_id = 0
x = next(iter(my_loader))[batch_id][0]
net(x)
#tensor([[ 9.4292e-02, -5.4811e-02,  2.9660e-02, -2.0182e-02, -1.4442e-02,
#         -1.1534e-02,  1.0821e-01, -2.6894e-02, -2.6246e-02,  2.0228e-02,
#          1.0994e-01,  1.1693e-01, -1.7964e-02, -1.4456e-02, -4.3143e-02,
#         -7.2171e-02],
#        [-6.0301e-02, -2.0494e-01,  8.1875e-02,  1.6491e-01,  3.8714e-02,
#         -1.2241e-01,  8.2848e-02, -5.1826e-03,  4.9655e-02, -9.5035e-03,
#          8.3253e-02,  2.5401e-01, -1.4762e-02, -8.7707e-02, -4.8405e-02,
#         -5.4874e-02],
#︙
#)

学習

評価関数の定義

データ全体(≠ミニバッチ)でロスと正答率を算出する。
ロスは全サンプルにおける平均値を算出している。

def calc_evaluations(x_data, y_data, net, criterion, device):
    net.eval()
    with torch.no_grad():
        results = []
        losses  = []
        for x, y in zip(x_data, y_data):
            x = torch.tensor([x], dtype=torch.long).to(device)
            y = torch.tensor(y,   dtype=torch.long).to(device)

            output = net(x)

            # ロスの算出
            loss = criterion(output[0], y)
            losses.append(loss.item())

            # サンプル毎に正解かどうかを判定
            y_pred = output.cpu().numpy()[0].argmax(axis=1)
            y_actual = y.cpu().numpy()
            results.extend(y_actual == y_pred)

    avg_loss = np.mean(losses)
    accuracy = np.mean(results)
    return (avg_loss, accuracy)

ロスの定義

ミニバッチ毎のロスの総和を算出。

def calc_minibatch_loss(outputs, labels, criterion):
    # データ全体のロスの総和を算出
    loss = None
    for output, label in zip(outputs, labels):
        if loss is None:
            loss = criterion(output, label)
        else:
            loss += criterion(output, label)

    return loss

学習の実行

from time import time
from torch.nn import CrossEntropyLoss
from torch.nn.utils.rnn import pad_sequence
from torch.optim import SGD

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Dataset
train_dataset = MyDataset(x_train, y_train)
valid_dataset = MyDataset(x_valid, y_valid)

# DataLoader
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=my_collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=my_collate_fn)

# model 定義
LAYER_SIZE = 1
BIDIRECTIONAL = False
DROPOUT = 0.5
net = MyLSTM(
    vocabulary_size=len(dic)+1,
    label_size=len(labels),
    hidden_state_size=100,
    layer_size=LAYER_SIZE,
    bidirectional=BIDIRECTIONAL,
    dropout=DROPOUT
)
net.to(device)

optimizer = SGD(net.parameters(), lr=0.01)
criterion = CrossEntropyLoss(ignore_index=-1)

num_epochs = 10
for epoch in range(num_epochs):
    # 各 100 イテレーションの実行を計測するための開始時刻
    iter_start_time = time()

    iteration = 0
    avg_losses = []

    net.train()
    for x, y in train_loader:
        # padding でミニバッチ内のサンプル毎のサイズを揃える
        x = pad_sequence(x, batch_first=True)
        y = pad_sequence(y, batch_first=True, padding_value=-1)

        x = x.to(device)
        y = y.to(device)

        outputs = net(x)

        # ミニバッチにおけるロスの総和を算出
        loss = calc_minibatch_loss(outputs, y, criterion)
        # 平均ロスを算出
        avg_loss = loss.item() / outputs.size(dim=0)
        avg_losses.append(avg_loss)

        # 100 イテレーション毎にログを出力
        if (iteration+1) % 100 == 0:
            iter_duration = time() - iter_start_time

            # ミニバッチ内平均ロスの更に 100 イテレーション平均である事に注意
            avg_avg_loss = np.mean(avg_losses)

            print(f"epoch: {epoch+1:2d}, iteration: {iteration+1:5d}, loss: {avg_avg_loss:.3f}, 1000 iter: {iter_duration:.1f} sec.")

            # 平均ロスの一覧および開始時刻をリセット
            avg_losses = []
            iter_start_time = time()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        iteration += 1

    # ロスの算出
    train_loss, train_accuracy = calc_evaluations(x_train, y_train, net, criterion, device)
    test_loss,  test_accuracy  = calc_evaluations(x_valid, y_valid, net, criterion, device)
    print(f"epoch: {epoch+1:2d}, train_loss: {train_loss:.3f}, train_accuracy: {train_accuracy:.3f}, test_loss: {test_loss:.3f}, test_accuracy: {test_accuracy:.3f}")

参考