Trains a memory network on the bAbI dataset.
References:
- Jason Weston, Antoine Bordes, Sumit Chopra, Tomas Mikolov, Alexander M. Rush,
  "Towards AI-Complete Question Answering: A Set of Prerequisite Toy Tasks",
  https://arxiv.org/abs/1502.05698
- Sainbayar Sukhbaatar, Arthur Szlam, Jason Weston, Rob Fergus,
  "End-To-End Memory Networks",
  https://arxiv.org/abs/1503.08895
Reaches 98.6% accuracy on task 'single_supporting_fact_10k' after 120 epochs.
Time per epoch: 3s on CPU (Core i7).
from __future__ import print_function
from keras.models import Sequential, Model
from keras.layers.embeddings import Embedding
from keras.layers import Input, Activation, Dense, Permute, Dropout
from keras.layers import add, dot, concatenate
from keras.layers import LSTM
from keras.utils.data_utils import get_file
from keras.preprocessing.sequence import pad_sequences
from functools import reduce
import tarfile
import numpy as np
import re
def tokenize(sent):
'''Return the tokens of a sentence, including punctuation.
>>> tokenize('Bob dropped the apple. Where is the apple?')
['Bob', 'dropped', 'the', 'apple', '.', 'Where', 'is', 'the', 'apple', '?']
'''
# split on runs of non-word characters; the pattern must not be able to
# match the empty string, or re.split breaks on Python 3
return [x.strip() for x in re.split(r'(\W+)', sent) if x.strip()]
def parse_stories(lines, only_supporting=False):
'''Parse stories provided in the bAbI task format.
If only_supporting is true,
only the sentences that support the answer are kept.
'''
data = []
story = []
for line in lines:
line = line.decode('utf-8').strip()
nid, line = line.split(' ', 1)
nid = int(nid)
if nid == 1:
story = []
if '\t' in line:
q, a, supporting = line.split('\t')
q = tokenize(q)
if only_supporting:
# only select the related substory
supporting = map(int, supporting.split())
substory = [story[i - 1] for i in supporting]
else:
# provide all the substories
substory = [x for x in story if x]
data.append((substory, q, a))
story.append('')
else:
sent = tokenize(line)
story.append(sent)
return data
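# Illustration of the bAbI line format parse_stories consumes (sample lines made
# up for this sketch): statement lines carry only text, question lines add a
# tab-separated answer plus supporting fact ids, and a line numbered 1 starts a
# new story.
#     _sample = [b'1 Mary moved to the bathroom.',
#                b'2 John went to the hallway.',
#                b'3 Where is Mary?\tbathroom\t1']
#     parse_stories(_sample)
#     # -> [([['Mary', 'moved', 'to', 'the', 'bathroom', '.'],
#     #       ['John', 'went', 'to', 'the', 'hallway', '.']],
#     #      ['Where', 'is', 'Mary', '?'], 'bathroom')]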
def get_stories(f, only_supporting=False, max_length=None):
'''Given a file name, read the file, retrieve the stories,
and then convert the sentences into a single story.
If max_length is supplied,
any stories longer than max_length tokens will be discarded.
'''
data = parse_stories(f.readlines(), only_supporting=only_supporting)
flatten = lambda data: reduce(lambda x, y: x + y, data)
data = [(flatten(story), q, answer) for story, q, answer in data
if not max_length or len(flatten(story)) < max_length]
return data
def vectorize_stories(data):
inputs, queries, answers = [], [], []
for story, query, answer in data:
inputs.append([word_idx[w] for w in story])
queries.append([word_idx[w] for w in query])
answers.append(word_idx[answer])
return (pad_sequences(inputs, maxlen=story_maxlen),
pad_sequences(queries, maxlen=query_maxlen),
np.array(answers))
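# Illustration with hypothetical values: vectorize_stories maps each word to its
# index in word_idx (built further below) and pad_sequences left-pads with 0,
# the index reserved for masking. E.g. with
#     word_idx = {'Mary': 1, 'Where': 2, 'is': 3, '?': 4} and query_maxlen = 6,
# the query ['Where', 'is', 'Mary', '?'] becomes [2, 3, 1, 4], padded to
#     [0, 0, 2, 3, 1, 4].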
try:
path = get_file('babi-tasks-v1-2.tar.gz',
origin='https://s3.amazonaws.com/text-datasets/'
'babi_tasks_1-20_v1-2.tar.gz')
except:
print('Error downloading dataset, please download it manually:\n'
'$ wget http://www.thespermwhale.com/jaseweston/babi/tasks_1-20_v1-2'
'.tar.gz\n'
'$ mv tasks_1-20_v1-2.tar.gz ~/.keras/datasets/babi-tasks-v1-2.tar.gz')
raise
challenges = {
# QA1 with 10,000 samples
'single_supporting_fact_10k': 'tasks_1-20_v1-2/en-10k/qa1_'
'single-supporting-fact_{}.txt',
# QA2 with 10,000 samples
'two_supporting_facts_10k': 'tasks_1-20_v1-2/en-10k/qa2_'
'two-supporting-facts_{}.txt',
}
challenge_type = 'single_supporting_fact_10k'
challenge = challenges[challenge_type]
print('Extracting stories for the challenge:', challenge_type)
with tarfile.open(path) as tar:
train_stories = get_stories(tar.extractfile(challenge.format('train')))
test_stories = get_stories(tar.extractfile(challenge.format('test')))
vocab = set()
for story, q, answer in train_stories + test_stories:
vocab |= set(story + q + [answer])
vocab = sorted(vocab)
# Reserve 0 for masking via pad_sequences
vocab_size = len(vocab) + 1
story_maxlen = max(map(len, (x for x, _, _ in train_stories + test_stories)))
query_maxlen = max(map(len, (x for _, x, _ in train_stories + test_stories)))
print('-')
print('Vocab size:', vocab_size, 'unique words')
print('Story max length:', story_maxlen, 'words')
print('Query max length:', query_maxlen, 'words')
print('Number of training stories:', len(train_stories))
print('Number of test stories:', len(test_stories))
print('-')
print('Here\'s what a "story" tuple looks like (input, query, answer):')
print(train_stories[0])
print('-')
print('Vectorizing the word sequences...')
word_idx = dict((c, i + 1) for i, c in enumerate(vocab))
inputs_train, queries_train, answers_train = vectorize_stories(train_stories)
inputs_test, queries_test, answers_test = vectorize_stories(test_stories)
print('-')
print('inputs: integer tensor of shape (samples, max_length)')
print('inputs_train shape:', inputs_train.shape)
print('inputs_test shape:', inputs_test.shape)
print('-')
print('queries: integer tensor of shape (samples, max_length)')
print('queries_train shape:', queries_train.shape)
print('queries_test shape:', queries_test.shape)
print('-')
print('answers: integer tensor of shape (samples,)')
print('answers_train shape:', answers_train.shape)
print('answers_test shape:', answers_test.shape)
print('-')
print('Compiling...')
# placeholders
input_sequence = Input((story_maxlen,))
question = Input((query_maxlen,))
# encoders
# embed the input sequence into a sequence of vectors
input_encoder_m = Sequential()
input_encoder_m.add(Embedding(input_dim=vocab_size,
output_dim=64))
input_encoder_m.add(Dropout(0.3))
# output: (samples, story_maxlen, embedding_dim)
# embed the input into a sequence of vectors of size query_maxlen
input_encoder_c = Sequential()
input_encoder_c.add(Embedding(input_dim=vocab_size,
output_dim=query_maxlen))
input_encoder_c.add(Dropout(0.3))
# output: (samples, story_maxlen, query_maxlen)
# embed the question into a sequence of vectors
question_encoder = Sequential()
question_encoder.add(Embedding(input_dim=vocab_size,
output_dim=64,
input_length=query_maxlen))
question_encoder.add(Dropout(0.3))
# output: (samples, query_maxlen, embedding_dim)
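# In the terminology of the End-To-End Memory Networks paper, input_encoder_m,
# input_encoder_c and question_encoder roughly correspond to the embedding
# matrices A, C and B: A builds the memory vectors that are matched against the
# question, C builds the output representation of each memory, and B embeds the
# question itself.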
# encode the input sequence and the questions (which are indices)
# into sequences of dense vectors
input_encoded_m = input_encoder_m(input_sequence)
input_encoded_c = input_encoder_c(input_sequence)
question_encoded = question_encoder(question)
# compute a 'match' between the first input vector sequence
# and the question vector sequence
# shape: `(samples, story_maxlen, query_maxlen)`
match = dot([input_encoded_m, question_encoded], axes=(2, 2))
match = Activation('softmax')(match)
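# In the terms of the End-To-End Memory Networks paper, this dot product plays
# the role of the attention weights p_i = softmax(u . m_i) between the question
# embedding u and each memory vector m_i. Here both story and question are kept
# as sequences, so the softmax (Keras's default, over the last axis) normalizes
# across the query positions of the match matrix.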
# add the match matrix with the second input vector sequence
response = add([match, input_encoded_c]) # (samples, story_maxlen, query_maxlen)
response = Permute((2, 1))(response) # (samples, query_maxlen, story_maxlen)
# concatenate the permuted response with the question vector sequence
answer = concatenate([response, question_encoded])
# the original paper uses a matrix multiplication for this reduction step.
# we choose to use an RNN instead.
answer = LSTM(32)(answer) # (samples, 32)
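# For comparison, a hypothetical sketch of the paper's linear readout (not part
# of this example): sum the permuted response over its memory axis, concatenate
# it with a single question embedding u, and feed that into Dense(vocab_size),
# i.e. roughly softmax(W(o + u)) in the paper's notation.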
# one regularization layer; more would probably be needed.
answer = Dropout(0.3)(answer)
answer = Dense(vocab_size)(answer) # (samples, vocab_size)
# we output a probability distribution over the vocabulary
answer = Activation('softmax')(answer)
# build the final model
model = Model([input_sequence, question], answer)
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
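# sparse_categorical_crossentropy lets the integer word indices in answers_train
# be used directly as targets, with no need to one-hot encode them into a
# (samples, vocab_size) matrix.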
# train
model.fit([inputs_train, queries_train], answers_train,
batch_size=32,
epochs=120,
validation_data=([inputs_test, queries_test], answers_test))
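# Optional sanity check (a sketch, not in the original script): report the
# held-out accuracy, which should be in the ballpark of the 98.6% quoted in the
# header after 120 epochs on the QA1 task.
loss, acc = model.evaluate([inputs_test, queries_test], answers_test,
                           batch_size=32, verbose=0)
print('Test loss / test accuracy = {:.4f} / {:.4f}'.format(loss, acc))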