Implementing seq2seq with TensorFlow

The previous post went over the basic ideas behind seq2seq and the attention mechanism; this one strikes while the iron is hot and looks at how to implement them in code.

Some TF2 API operations

A recommended tutorial for learning TensorFlow: https://github.com/lyhue1991/eat_tensorflow2_in_30_days

import tensorflow as tf

Concatenation (tf.concat)

t1 = [[1, 2, 3], [4, 5, 6]]  # 2, 3
t2 = [[7, 8, 9], [10, 11, 12]]
tf.concat([t1, t2], axis=0)

<tf.Tensor: shape=(4, 3), dtype=int32, numpy=
array([[ 1,  2,  3],
       [ 4,  5,  6],
       [ 7,  8,  9],
       [10, 11, 12]])>

tf.concat([t1, t2], axis=1)

<tf.Tensor: shape=(2, 6), dtype=int32, numpy=
array([[ 1,  2,  3,  7,  8,  9],
       [ 4,  5,  6, 10, 11, 12]])>

Adding a dimension (tf.expand_dims)

t3 = [[1, 2, 3], [4, 5, 6]]  # shape [2, 3]

tf.expand_dims(t3, axis=0)
<tf.Tensor: shape=(1, 2, 3), dtype=int32, numpy=
array([[[1, 2, 3],
        [4, 5, 6]]])>

tf.expand_dims(t3, 1)
<tf.Tensor: shape=(2, 1, 3), dtype=int32, numpy=
array([[[1, 2, 3]],

       [[4, 5, 6]]])>

tf.expand_dims(t3, 2)
<tf.Tensor: shape=(2, 3, 1), dtype=int32, numpy=
array([[[1],
        [2],
        [3]],

       [[4],
        [5],
        [6]]])>

Removing a dimension (tf.squeeze)

t4 = tf.expand_dims(t3, 2)
tf.squeeze(t4, axis=2)
<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]])>

Note: tf.squeeze can only remove dimensions of size 1 from a tensor's shape; if axis is set to 0 or 1 above, it raises an error.
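
For example, with t4 of shape (2, 3, 1) only the last axis can be squeezed; calling tf.squeeze without an axis also works, because it simply drops every size-1 dimension:

tf.squeeze(t4)             # no axis given: all size-1 dims are removed, result shape (2, 3)
# tf.squeeze(t4, axis=0)   # error: dimension 0 has size 2, not 1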

Reshaping (tf.reshape)

tf.reshape(t3, [3, 2])

<tf.Tensor: shape=(3, 2), dtype=int32, numpy=
array([[1, 2],
       [3, 4],
       [5, 6]])>

Type casting (tf.cast)

x = tf.constant([1.8, 2.2], dtype=tf.float32)
tf.dtypes.cast(x, tf.int32)

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([1, 2])>

Stacking (tf.stack)

x = tf.constant([1, 4])
y = tf.constant([2, 5])
z = tf.constant([3, 6])

tf.stack([x, y, z], axis=0)
<tf.Tensor: shape=(3, 2), dtype=int32, numpy=
array([[1, 4],
       [2, 5],
       [3, 6]])>

tf.stack([x, y, z], axis=1)
<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]])>

Seq2seq without attention

Encoder

class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        """
        :param vocab_size: vocabulary size
        :param embedding_dim: word-vector dimension
        :param enc_units: number of encoder units
        :param batch_sz: batch size
        """
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        # Here the Embedding layer is learnable; it is instantiated with (vocab size, embedding dim).
        # The vocab size is the number of words, and the embedding dim is the length of each word vector.
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.enc_units,
                                       recurrent_initializer='glorot_uniform')

    def call(self, x, hidden):
        """
        :param x: input, shape (batch_sz, input_sequence_length)
        :param hidden: hidden state, shape (batch_sz, enc_units)
        :return:
        """
        # After the embedding layer, x has shape (batch_sz, input_sequence_length, embedding_dim)
        x = self.embedding(x)
        # Without attention we only need the context vector, i.e. the final output for each sentence.
        # output shape (batch_sz, enc_units)
        output = self.gru(x, initial_state=hidden)
        return output

    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.enc_units))

Decoder

class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        """
        :param vocab_size: vocabulary size
        :param embedding_dim: word-vector dimension
        :param dec_units: number of decoder units
        :param batch_sz: batch size
        """
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # We need the output of every time step, but not the state
        self.gru = tf.keras.layers.GRU(self.dec_units,
                                       return_sequences=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size, activation="softmax")

    def call(self, x, context_vector):
        """
        :param x: decoder input, shape (batch_sz, 1)
        :param context_vector: encoder output, shape (batch_sz, enc_units)
        :return:
        """
        # After the embedding layer, x has shape (batch_sz, 1, embedding_dim)
        x = self.embedding(x)

        # After concatenation, x has shape (batch_sz, 1, embedding_dim + enc_units)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # Feed the concatenated vector into the GRU, output shape (batch_sz, 1, dec_units)
        output = self.gru(x)

        # output shape == (batch_sz * 1, dec_units)
        output = tf.reshape(output, (-1, output.shape[2]))

        # prediction shape == (batch_sz, vocab_size)
        prediction = self.fc(output)

        return prediction

Test code

if __name__ == "__main__":

    Vocab_Size = 10000
    Embedding_Dim = 300
    Enc_Units = 128
    Dec_Units = 128
    Batch_Sz = 64
    input_sequences_length = 256
    encoder = Encoder(Vocab_Size, Embedding_Dim, Enc_Units, Batch_Sz)
    decoder = Decoder(Vocab_Size, Embedding_Dim, Dec_Units, Batch_Sz)

    # Fake some input data
    example_input_batch = tf.ones(shape=(Batch_Sz, input_sequences_length), dtype=tf.int32)
    sample_hidden = encoder.initialize_hidden_state()

    sample_output = encoder(example_input_batch, sample_hidden)
    print(f'encoder output shape (batch_size, enc_units): {sample_output.shape}')

    decoder_output = decoder(tf.random.uniform((64, 1)), sample_output)

    print(f'Decoder output shape: (batch_size, vocab size) {decoder_output.shape}')


encoder output shape (batch_size, enc_units): (64, 128)
Decoder output shape: (batch_size, vocab size) (64, 10000)
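
At inference time the decoder above is called one step at a time: the encoder's final output is reused as the context vector at every step, and each predicted word id is fed back in as the next input. Below is a minimal greedy-decoding sketch under that setup; start_token_id and max_len are made-up values for illustration, not part of the original code.

# Greedy decoding sketch (start_token_id and max_len are illustrative assumptions)
start_token_id = 2   # hypothetical id of the <start> token
max_len = 10         # hypothetical maximum output length

context_vector = encoder(example_input_batch, encoder.initialize_hidden_state())
dec_input = tf.fill((Batch_Sz, 1), start_token_id)        # shape (batch_sz, 1)

predicted_ids = []
for _ in range(max_len):
    prediction = decoder(dec_input, context_vector)       # (batch_sz, vocab_size)
    # pick the most probable word id and feed it back as the next input
    predicted_id = tf.argmax(prediction, axis=-1, output_type=tf.int32)  # (batch_sz,)
    predicted_ids.append(predicted_id)
    dec_input = tf.expand_dims(predicted_id, 1)           # (batch_sz, 1)

decoded_ids = tf.stack(predicted_ids, axis=1)             # (batch_sz, max_len)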

Seq2seq with attention

Encoder

class Encoder(tf.keras.Model):
    def __init__(self, embedding_matrix, enc_units, batch_sz):
        """
        :param embedding_matrix: embedding matrix, i.e. the word vector of every word
        :param enc_units: number of encoder units
        :param batch_sz: batch size
        """
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        # embedding_matrix is an embedding matrix we trained ourselves (a downloaded pre-trained matrix also works)
        vocab_size, embedding_dim = embedding_matrix.shape
        # Here the embedding layer is not learnable: it is loaded from the given embedding matrix
        # (you could also load pre-trained vectors first and then fine-tune them)
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim,
                                                   weights=[embedding_matrix],
                                                   trainable=False)
        # We need the hidden state of every time step, hence return_sequences=True
        self.gru = tf.keras.layers.GRU(self.enc_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')

    def call(self, x, hidden):
        """
        :param x: input, shape (batch_size, input_length)
        :param hidden: hidden state, shape (batch_size, enc_units)
        :return:
        """
        # After the embedding layer, x has shape (batch_size, input_length, embedding_dim)
        x = self.embedding(x)
        # output shape (batch_size, input_length, enc_units)
        # state shape (batch_size, enc_units)
        output, state = self.gru(x, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        # Initialize the hidden state to zeros
        return tf.zeros((self.batch_sz, self.enc_units))
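
If you only want to smoke-test this class without the word-vector files from the earlier posts, a randomly initialized matrix works as a stand-in for embedding_matrix; the sizes below are arbitrary placeholders, not values from this project.

import numpy as np

# hypothetical stand-in: 30000 words, 500-dimensional vectors
embedding_matrix = np.random.uniform(-0.1, 0.1, size=(30000, 500)).astype('float32')
encoder = Encoder(embedding_matrix, 1024, 64)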

Decoder

class Decoder(tf.keras.Model):
    def __init__(self, embedding_matrix, dec_units, batch_sz):
        """
        :param embedding_matrix: embedding matrix
        :param dec_units: number of decoder units
        :param batch_sz: batch size
        """
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        vocab_size, embedding_dim = embedding_matrix.shape

        # Embedding layer loaded from the pre-trained word vectors
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim,
                                                   weights=[embedding_matrix],
                                                   trainable=False)

        self.gru = tf.keras.layers.GRU(self.dec_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        # Final fc layer, used to predict the probability of each word
        self.fc = tf.keras.layers.Dense(vocab_size, activation='softmax')
        # Attention mechanism
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        """
        :param x: decoder input, shape (batch_size, 1)
        :param hidden: previous hidden state (the encoder's hidden state on the first step), shape (batch_size, enc_units)
        :param enc_output: encoder output, shape (batch_size, input_length, enc_units)
        :return:
        """
        # Compute the attention weights from the previous hidden state
        # (the encoder's hidden state on the first step) and the encoder output
        context_vector, attention_weights = self.attention(hidden, enc_output)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # Concatenate the previous step's prediction with the context vector as this step's GRU input
        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # output shape (batch_size, 1, dec_units)
        # state shape (batch_size, dec_units); this state is used as the hidden input of the next decoder step
        output, state = self.gru(x, initial_state=hidden)

        # output shape == (batch_size * 1, dec_units)
        output = tf.reshape(output, (-1, output.shape[2]))

        # prediction shape == (batch_size, vocab_size)
        prediction = self.fc(output)
        return prediction, state, attention_weights

Attention implementation

class BahdanauAttention(tf.keras.Model):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, hidden_state, enc_output):
        """
        :param hidden_state: hidden state, shape (batch_size, enc_units)
        :param enc_output: encoder output, shape (batch_size, input_length, enc_units)
        :return:
        """
        # hidden_with_time_axis shape == (batch_size, 1, enc_units)
        hidden_with_time_axis = tf.expand_dims(hidden_state, 1)

        # Compute the attention score, shape == (batch_size, input_length, 1)
        score = self.V(
            tf.nn.tanh(self.W1(enc_output) + self.W2(hidden_with_time_axis))
        )
        # Normalize the score to get attention_weights, same shape as score
        attention_weights = tf.nn.softmax(score, axis=1)

        # Multiply the encoder output by the attention weights; the result will later be fed to the decoder
        # context_vector shape (batch_size, input_length, enc_units)
        context_vector = attention_weights * enc_output
        # context_vector shape after sum == (batch_size, enc_units)
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights
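
In formula form, what this class computes is the additive (Bahdanau) score, the normalized attention weights and the context vector, where $h_t$ is the decoder hidden state, $\bar{h}_s$ is the encoder output at source position $s$, and $W_1$, $W_2$, $v$ correspond to self.W1, self.W2 and self.V:

$\mathrm{score}(h_t, \bar{h}_s) = v^{\top}\tanh(W_1 \bar{h}_s + W_2 h_t), \qquad \alpha_{ts} = \mathrm{softmax}_s\big(\mathrm{score}(h_t, \bar{h}_s)\big), \qquad c_t = \sum_s \alpha_{ts}\,\bar{h}_s$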

Testing with randomly initialized input data and the necessary parameters

import numpy as np
# config_gpu, get_params, Vocab and embedding_matrix_path come from the project code of the earlier posts

# Load the trained embedding matrix
def load_embedding_matrix(filepath=embedding_matrix_path, max_vocab_size=50000):
    """Load the matrix stored at embedding_matrix_path"""
    embedding_matrix = np.load(filepath + '.npy')
    flag_matrix = np.zeros_like(embedding_matrix[:Vocab.MASK_COUNT])
    return np.concatenate([flag_matrix, embedding_matrix])[:max_vocab_size]


if __name__ == '__main__':
    # GPU setup (not core to this part, feel free to ignore)
    config_gpu(use_cpu=True)
    # Get the parameters (not core to this part, feel free to ignore)
    params = get_params()
    # Load the vocab (Vocab was covered in the earlier post on training word vectors)
    vocab = Vocab(params["vocab_path"], params["vocab_size"])
    # Vocab size, i.e. the number of words
    vocab_size = vocab.count
    # Embedding matrix trained with GenSim
    embedding_matrix = load_embedding_matrix()

    input_sequence_len = 250  # length of each input sentence
    batch_size = 64
    embedding_dim = 500       # word-vector dimension
    units = 1024              # number of encoder/decoder units

    # Encoder: embedding_matrix, enc_units, batch_sz
    encoder = Encoder(embedding_matrix, units, batch_size)
    # example_input (64, 250)
    example_input_batch = tf.ones(shape=(batch_size, input_sequence_len), dtype=tf.int32)
    # sample_hidden (64, 1024), initialized to zeros
    sample_hidden = encoder.initialize_hidden_state()

    sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
    # Print the results
    print('Encoder output shape: (batch size, sequence length, hidden_dim) {}'.format(sample_output.shape))
    print('Encoder Hidden state shape: (batch size, hidden_dim) {}'.format(sample_hidden.shape))

    attention_layer = BahdanauAttention(10)
    attention_result, attention_weights = attention_layer(sample_hidden, sample_output)

    print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
    print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

    decoder = Decoder(embedding_matrix, units, batch_size)
    sample_decoder_output, state, attention_weights = decoder(tf.random.uniform((64, 1)),
                                                              sample_hidden, sample_output)

    print('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))

Output

Encoder output shape: (batch size, sequence length, hidden_dim) (64, 250, 1024)
Encoder Hidden state shape: (batch size, hidden_dim) (64, 1024)
Attention result shape: (batch size, units) (64, 1024)
Attention weights shape: (batch_size, sequence_length, 1) (64, 250, 1)
Decoder output shape: (batch_size, vocab size) (64, 28693)

Here hidden_dim means the same thing as enc_units.
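
To tie the encoder, attention and decoder together, training usually runs the decoder step by step with teacher forcing. The sketch below is only a rough illustration, not code from this post: the Adam optimizer, the loss masking, start_token_id, the padding id 0 and the target_batch tensor of shape (batch_size, target_length) are all assumptions.

optimizer = tf.keras.optimizers.Adam()
# fc already applies softmax, so from_logits stays False; reduction='none' keeps per-example losses for masking
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none')

def train_step(input_batch, target_batch, start_token_id=2, pad_id=0):
    loss = 0.0
    with tf.GradientTape() as tape:
        enc_output, enc_hidden = encoder(input_batch, encoder.initialize_hidden_state())
        dec_hidden = enc_hidden
        # first decoder input: the <start> token for every sentence in the batch
        dec_input = tf.fill((target_batch.shape[0], 1), start_token_id)
        for t in range(1, target_batch.shape[1]):
            prediction, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            real = target_batch[:, t]
            # mask out padding positions so they do not contribute to the loss
            mask = tf.cast(tf.not_equal(real, pad_id), tf.float32)
            loss += tf.reduce_mean(loss_object(real, prediction) * mask)
            # teacher forcing: feed the ground-truth token as the next decoder input
            dec_input = tf.expand_dims(real, 1)
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return loss / int(target_batch.shape[1] - 1)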