gitignore + first sources

This commit is contained in:
i.ortega 2020-05-04 18:27:12 +02:00
parent 5224f846a8
commit 659f3ff853
4 changed files with 416 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
.DS_Store
.idea
*.log
tmp/
*.py[cod]
*.egg
build
htmlcov
*/__pycache__/

184
src/model.py Normal file
View File

@ -0,0 +1,184 @@
import random
from typing import Tuple
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch import Tensor
import torch
#This file is based on the code developed by Sean Robertson. Go to https://github.com/spro/practical-pytorch for more information.
class Encoder(nn.Module):
    """Bidirectional GRU encoder.

    Embeds the source token ids, runs them through a bidirectional GRU and
    projects the final forward/backward hidden states into a single vector
    of size ``dec_hid_dim`` that is used as the decoder's initial hidden
    state.

    Args:
        input_dim: Size of the source vocabulary (number of embeddings).
        emb_dim: Embedding dimension.
        enc_hid_dim: Hidden size of each GRU direction.
        dec_hid_dim: Hidden size expected by the decoder.
        dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self,
                 input_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float):
        super().__init__()
        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional=True)
        # Both directions are concatenated, hence the ``* 2``.
        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
        # ``self.dropout`` is the dropout *layer*; the raw probability is not
        # kept separately (the original assigned it and immediately
        # overwrote it with this module).
        self.dropout = nn.Dropout(dropout)

    def forward(self,
                src: Tensor) -> Tuple[Tensor, Tensor]:
        """Encode a batch of source sequences.

        Args:
            src: Token ids laid out sequence-first, ``(src_len, batch)``
                (the GRU is created with the default ``batch_first=False``).

        Returns:
            A pair ``(outputs, hidden)``: ``outputs`` is the GRU output for
            every position, ``(src_len, batch, 2 * enc_hid_dim)``; ``hidden``
            is ``tanh(fc([h_forward; h_backward]))`` with shape
            ``(batch, dec_hid_dim)``.
        """
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        # hidden[-2] / hidden[-1] are the last layer's forward and backward
        # final states; merge them into one decoder-sized vector.
        last_states = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        hidden = torch.tanh(self.fc(last_states))
        return outputs, hidden
class Attention(nn.Module):
    """Attention over encoder outputs, conditioned on the decoder state.

    A single linear layer followed by ``tanh`` scores the concatenation of
    the decoder hidden state and each encoder output; the ``attn_dim``
    energies are summed per source position and normalised with softmax.

    Args:
        enc_hid_dim: Hidden size of one encoder direction.
        dec_hid_dim: Hidden size of the decoder.
        attn_dim: Output size of the scoring layer.
    """

    def __init__(self,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 attn_dim: int):
        super().__init__()
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        # Width of [decoder_hidden ; encoder_output] for a bidirectional
        # encoder; the Decoder also reads this to size its output layer.
        self.attn_in = (enc_hid_dim * 2) + dec_hid_dim
        self.attn = nn.Linear(self.attn_in, attn_dim)

    def forward(self,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tensor:
        """Return attention weights of shape ``(batch, src_len)``.

        Args:
            decoder_hidden: Current decoder state, ``(batch, dec_hid_dim)``.
            encoder_outputs: Encoder outputs, sequence-first
                ``(src_len, batch, 2 * enc_hid_dim)``.
        """
        n_positions = encoder_outputs.shape[0]
        # Copy the decoder state once per source position:
        # (batch, dec_hid) -> (batch, src_len, dec_hid).
        hidden_per_position = decoder_hidden.unsqueeze(1).repeat(1, n_positions, 1)
        # Switch to batch-first so both tensors line up on dims 0 and 1.
        batch_first_outputs = encoder_outputs.permute(1, 0, 2)
        scorer_input = torch.cat((hidden_per_position, batch_first_outputs), dim=2)
        energy = torch.tanh(self.attn(scorer_input))
        # Collapse the attn_dim axis into one score per source position.
        scores = torch.sum(energy, dim=2)
        return F.softmax(scores, dim=1)
class Decoder(nn.Module):
    """Single-step GRU decoder with attention.

    Each call consumes one target token per batch element, attends over the
    encoder outputs and produces unnormalised scores over the target
    vocabulary together with the updated hidden state.

    Args:
        output_dim: Size of the target vocabulary.
        emb_dim: Embedding dimension.
        enc_hid_dim: Hidden size of one encoder direction.
        dec_hid_dim: Hidden size of the decoder GRU.
        dropout: Dropout probability applied to the embeddings.
        attention: Module called as ``attention(decoder_hidden,
            encoder_outputs)`` returning ``(batch, src_len)`` weights; it
            must expose an ``attn_in`` attribute.
    """

    def __init__(self,
                 output_dim: int,
                 emb_dim: int,
                 enc_hid_dim: int,
                 dec_hid_dim: int,
                 dropout: float,
                 attention: nn.Module):
        super().__init__()
        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim)
        # GRU input is [embedded token ; attention-weighted encoder context].
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
        # Output layer sees [GRU output ; context ; embedded token], i.e.
        # dec_hid_dim + 2 * enc_hid_dim (= attn_in) plus emb_dim features.
        self.out = nn.Linear(self.attention.attn_in + emb_dim, output_dim)
        # ``self.dropout`` is the dropout layer; the raw probability is not
        # stored separately (it was overwritten by this module anyway).
        self.dropout = nn.Dropout(dropout)

    def _weighted_encoder_rep(self,
                              decoder_hidden: Tensor,
                              encoder_outputs: Tensor) -> Tensor:
        """Return the attention-weighted sum of encoder outputs.

        The result has shape ``(1, batch, 2 * enc_hid_dim)`` so it can be
        concatenated with the single-step embedded input.
        """
        a = self.attention(decoder_hidden, encoder_outputs)
        a = a.unsqueeze(1)  # (batch, 1, src_len)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)  # batch-first
        weighted_encoder_rep = torch.bmm(a, encoder_outputs)
        # Back to sequence-first with a sequence length of 1.
        weighted_encoder_rep = weighted_encoder_rep.permute(1, 0, 2)
        return weighted_encoder_rep

    def forward(self,
                input: Tensor,
                decoder_hidden: Tensor,
                encoder_outputs: Tensor) -> Tuple[Tensor, Tensor]:
        """Run one decoding step.

        Args:
            input: Current token ids, one per batch element, ``(batch,)``.
            decoder_hidden: Previous hidden state, ``(batch, dec_hid_dim)``.
            encoder_outputs: Encoder outputs, sequence-first
                ``(src_len, batch, 2 * enc_hid_dim)``.

        Returns:
            ``(output, decoder_hidden)`` where ``output`` holds vocabulary
            scores ``(batch, output_dim)`` and ``decoder_hidden`` is the new
            hidden state ``(batch, dec_hid_dim)``.
        """
        # Add a sequence axis of length 1 for the GRU.
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        weighted_encoder_rep = self._weighted_encoder_rep(decoder_hidden,
                                                          encoder_outputs)
        rnn_input = torch.cat((embedded, weighted_encoder_rep), dim=2)
        output, decoder_hidden = self.rnn(rnn_input, decoder_hidden.unsqueeze(0))
        # Drop the length-1 sequence axis before the output projection.
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted_encoder_rep = weighted_encoder_rep.squeeze(0)
        output = self.out(torch.cat((output,
                                     weighted_encoder_rep,
                                     embedded), dim=1))
        return output, decoder_hidden.squeeze(0)
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes a whole target sequence.

    Args:
        encoder: Module returning ``(encoder_outputs, hidden)`` for a source
            batch.
        decoder: Module called as ``decoder(tokens, hidden,
            encoder_outputs)`` returning ``(scores, hidden)``; it must expose
            ``output_dim``.
        device: Device on which the result tensor is placed.
    """

    def __init__(self,
                 encoder: nn.Module,
                 decoder: nn.Module,
                 device: torch.device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self,
                src: Tensor,
                trg: Tensor,
                teacher_forcing_ratio: float = 0.5) -> Tensor:
        """Decode ``trg.shape[0] - 1`` steps and collect the scores.

        Args:
            src: Source batch, sequence-first (batch size is ``src.shape[1]``).
            trg: Target batch, sequence-first; row 0 seeds the decoder.
            teacher_forcing_ratio: Probability, drawn independently at each
                step, of feeding the ground-truth token instead of the
                model's own top prediction.

        Returns:
            Tensor ``(trg_len, batch, output_dim)``; row 0 is left at zero
            because decoding starts at step 1.
        """
        n_steps = trg.shape[0]
        n_examples = src.shape[1]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(n_steps, n_examples, vocab_size).to(self.device)

        encoder_outputs, hidden = self.encoder(src)

        # The decoder is primed with the first target row (the <sos> tokens).
        decoder_input = trg[0, :]
        for t in range(1, n_steps):
            step_scores, hidden = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[t] = step_scores
            use_ground_truth = random.random() < teacher_forcing_ratio
            _, predicted = step_scores.max(1)
            if use_ground_truth:
                decoder_input = trg[t]
            else:
                decoder_input = predicted
        return outputs

74
src/test.py Normal file
View File

@ -0,0 +1,74 @@
from torchtext.data.utils import _basic_english_normalize
import torch
import random
from argparse import ArgumentParser
from model import *
from spacy.tokenizer import Tokenizer
from spacy.lang.eu import Basque
# Blank Basque spaCy pipeline, used only for tokenisation below.
nlp = Basque()


def tokenizer(s):
    """Split ``s`` with the Basque spaCy pipeline and return the token texts."""
    return [token.text for token in nlp(s)]
# Command-line interface: the only option selects how the next token is
# picked from the decoder scores (see decode()).
parser = ArgumentParser(description='Azpitituluetan oinarritutako elkarrizketa \
sistemaren proba')
parser.add_argument('-decoding_strategy', type=str, default='top1', choices=['top1', 'topk', 'multinomial'])
# Parsed at import time; argparse exits the process on invalid arguments.
args = parser.parse_args()
def decode(logits, decoding_strategy='max', k=3, temp=0.4):
    """Pick the next token id from decoder scores.

    Args:
        logits: Decoder scores for a batch of one, ``(1, vocab_size)``.
        decoding_strategy: One of
            ``'top1'`` (alias ``'max'``) - the highest-scoring token;
            ``'topk'`` - uniformly at random among the ``k`` best tokens of
            the first row;
            ``'multinomial'`` - sample from ``softmax(logits / temp)``.
        k: Number of candidates for ``'topk'``.
        temp: Temperature for ``'multinomial'``; lower is greedier.

    Returns:
        A tensor of shape ``(1,)`` holding the chosen token id.

    Raises:
        ValueError: If ``decoding_strategy`` is not a known strategy.
    """
    # 'max' is the historical default of this function; it used to match no
    # branch and silently fell through to multinomial sampling.
    if decoding_strategy in ('top1', 'max'):
        return logits.max(1)[1]
    if decoding_strategy == 'topk':
        candidates = logits.topk(k)[1][0]
        return candidates[random.randint(0, k - 1)].unsqueeze(-1)
    if decoding_strategy == 'multinomial':
        # softmax gives the same sampling distribution as exp(logits / temp)
        # but does not overflow to inf when a logit is large.
        probs = torch.softmax(logits.squeeze().div(temp), dim=-1)
        return torch.multinomial(probs.cpu(), 1)
    raise ValueError(
        "unknown decoding_strategy {!r}; expected 'top1', 'topk' or "
        "'multinomial'".format(decoding_strategy))
def evaluate(sentence):
    """Generate a reply for ``sentence`` with the loaded model.

    ``sentence`` is expected to be whitespace-separated tokens. It is wrapped
    in ``<sos>``/``<eos>``, lower-cased and mapped to vocabulary ids, then
    decoded token by token for at most ``MAX_LENGTH`` steps using the
    module-level ``decoding_strategy``.

    Returns:
        The generated words, each preceded by a single space; generation
        stops early when ``<eos>`` is produced.
    """
    with torch.no_grad():
        wrapped = '<sos> ' + sentence + ' <eos>'
        n_tokens = len(wrapped.split())
        token_ids = [text_field.vocab.stoi[tok] for tok in wrapped.lower().split()]
        # Column vector: sequence-first with a batch of one.
        source = torch.Tensor(token_ids).long().view(n_tokens, 1)

        # The first input to the decoder is the <sos> token.
        target = torch.Tensor([text_field.vocab.stoi['<sos>']]).long()
        encoder_outputs, hidden = model.encoder(source)

        words = []
        for _ in range(MAX_LENGTH):
            output, hidden = model.decoder(target, hidden, encoder_outputs)
            target = decode(output, decoding_strategy)
            word = text_field.vocab.itos[target.numpy()[0]]
            if word == '<eos>':
                break
            words.append(word)
        return ''.join(' ' + w for w in words)
#Load model and fields
# NOTE(review): torch.load unpickles arbitrary Python objects; only point
# these paths at files you trust.
text_field = torch.load('../model/text_field.Field')
model = torch.load('../model/model.pt', map_location=torch.device('cpu'))
# NOTE(review): this is set after the load above, so it cannot affect it;
# kept as in the original - confirm whether it is still needed.
torch.nn.Module.dump_patches = True
MAX_LENGTH = 10
#Print welcome message
print('-----------------------------------------')
print(' Ongi etorri elkarrizketara.')
print("Idatz ezazu 'Agur' elkarrizketa bukatzeko.")
print('------------------------------------------')
#Main system loop
user = input('- ')
model.eval()
decoding_strategy = args.decoding_strategy
# Every user line gets a reply, including the farewell itself; the loop
# stops after answering the farewell. The original tested 'Agur' twice,
# evidently meaning to accept a second spelling, so 'agur' is accepted too.
while True:
    sentence = evaluate(' '.join(tokenizer(user)))
    print('-' + sentence.strip().capitalize())
    if user in ('Agur', 'agur'):
        break
    user = input('-')

147
src/train.py Normal file
View File

@ -0,0 +1,147 @@
from torchtext.data import Field, BucketIterator, TabularDataset
import torch
from torchtext import data
from model import Seq2Seq, Encoder, Decoder, Attention
import math
import time
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from spacy.tokenizer import Tokenizer
from spacy.lang.eu import Basque
# Blank Basque spaCy pipeline, used only for tokenisation below.
nlp = Basque()


def tokenizer(s):
    """Split ``s`` with the Basque spaCy pipeline and return the token texts."""
    return [token.text for token in nlp(s)]
# One shared field: queries and answers are tokenised the same way and use
# a single vocabulary.
text_field = Field(
    init_token='<sos>',
    eos_token='<eos>',
    lower=True,
    tokenize=tokenizer,
    tokenizer_language='eu',
)
fields = [('query', text_field), ('answer', text_field)]
train_data = TabularDataset(
    path='../data/eu_train.tsv',
    format='tsv',
    fields=fields,
)
# min_freq=5 is passed to build_vocab as the frequency threshold.
text_field.build_vocab(train_data, min_freq=5)
print("Vocabulary has been built")
print("Vocab len is {}".format(len(text_field.vocab)))
# Persist the field so the test script can map words to ids and back.
torch.save(text_field, '../model/text_field.Field')
# Train on the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BATCH_SIZE = 32


def _pair_length_key(example):
    """Sort key built from the query and answer lengths of ``example``."""
    return data.interleave_keys(len(example.query), len(example.answer))


train_iterator = BucketIterator(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    sort_key=_pair_length_key,
    device=device,
)
# Adjust these sizes to your needs.
INPUT_DIM = len(text_field.vocab)
OUTPUT_DIM = len(text_field.vocab)
ENC_EMB_DIM = 32
DEC_EMB_DIM = 32
ENC_HID_DIM = 64
DEC_HID_DIM = 64
ATTN_DIM = 8
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

# Assemble the encoder, attention and decoder into the full model.
enc = Encoder(
    input_dim=INPUT_DIM,
    emb_dim=ENC_EMB_DIM,
    enc_hid_dim=ENC_HID_DIM,
    dec_hid_dim=DEC_HID_DIM,
    dropout=ENC_DROPOUT,
)
attn = Attention(
    enc_hid_dim=ENC_HID_DIM,
    dec_hid_dim=DEC_HID_DIM,
    attn_dim=ATTN_DIM,
)
dec = Decoder(
    output_dim=OUTPUT_DIM,
    emb_dim=DEC_EMB_DIM,
    enc_hid_dim=ENC_HID_DIM,
    dec_hid_dim=DEC_HID_DIM,
    dropout=DEC_DROPOUT,
    attention=attn,
)
model = Seq2Seq(enc, dec, device).to(device)
def init_weights(m: nn.Module):
    """Re-initialise every parameter of ``m`` in place.

    Parameters whose name contains ``'weight'`` are drawn from a normal
    distribution with mean 0 and std 0.01; all others (e.g. biases) are set
    to zero.
    """
    for param_name, tensor in m.named_parameters():
        if 'weight' not in param_name:
            nn.init.constant_(tensor.data, 0)
            continue
        nn.init.normal_(tensor.data, mean=0, std=0.01)
# Module.apply calls init_weights on every submodule and on the model itself.
model.apply(init_weights)
# Adam with its default hyperparameters.
optimizer = optim.Adam(model.parameters())
def count_parameters(model: nn.Module):
    """Return the number of scalar parameters of ``model`` that require grad."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
print(f'The model has {count_parameters(model):,} trainable parameters')
# Padding positions are excluded from the loss via ignore_index.
PAD_IDX = text_field.vocab.stoi['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
def train(model: nn.Module,
          iterator: BucketIterator,
          optimizer: optim.Optimizer,
          criterion: nn.Module,
          clip: float):
    """Run one training epoch and return the mean loss per batch.

    Args:
        model: Called as ``model(src, trg)``; returns scores shaped
            ``(trg_len, batch, vocab)``.
        iterator: Yields batches exposing ``query`` and ``answer`` tensors.
        optimizer: Optimiser stepped once per batch.
        criterion: Loss applied to flattened scores and targets.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    for batch in tqdm(iterator, total=len(iterator)):
        source, target = batch.query, batch.answer

        optimizer.zero_grad()
        scores = model(source, target)

        # Position 0 is skipped on both sides (the model leaves row 0 of its
        # output unfilled); flatten the rest for the loss.
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_target = target[1:].view(-1)

        loss = criterion(flat_scores, flat_target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()
    return running_loss / len(iterator)
def epoch_time(start_time: float,
               end_time: float) -> tuple:
    """Split an elapsed interval into whole minutes and whole seconds.

    Args:
        start_time: Start timestamp in seconds (e.g. from ``time.time()``).
        end_time: End timestamp in seconds.

    Returns:
        ``(minutes, seconds)`` as ints, both truncated toward zero.
    """
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

# Train for N_EPOCHS, reporting the time, loss and perplexity of each epoch.
for epoch in tqdm(range(N_EPOCHS)):
    start_time = time.time()
    train_loss = train(
        model,
        train_iterator,
        optimizer,
        criterion,
        CLIP,
    )
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')

    # Save checkpoint: the whole model object is written after every epoch,
    # overwriting the previous file.
    torch.save(model, '../model/model.pt')