更多精彩,请移驾『基础技术』继续阅读原文
http://www.jichujishu.com/articles/2018/07/11/1531302822077.html

Tensorflow:基于 LSTM 生成藏头诗

最近在学习 TensorFlow,学习到了 RNN 这一块,相关的资料不是很多,了解到使用 RNN 可以生成藏头诗之后,我就决定拿这个下手啦!
本文不介绍 RNN 以及 LSTM 的相关基本知识,如要了解,请自行百度。
本文是在学习了 TensorFlow7: 基于 RNN 生成古诗词
这一篇博客之后继续在其基础上修改的代码,若要了解相关内容可以先去上面的博客去看一下。

【注:本博客所使用的数据、代码、模型文件均已放在百度云上:
链接: https://pan.baidu.com/s/1qY4mt1y 密码: 47y2】

模型生成

首先我们要训练好模型。这里采用的是 2 层的 LSTM 框架,每层有 128 个隐藏层节点,batch_size 设为 64。训练数据来源于全唐诗(可在上面百度云资源分享当中找到)。特别注意到的一点是这里每训练完一次就对训练数据做 shuffle。
源代码如下:

#!/usr/bin/python3
#-*- coding: UTF-8 -*-
import collections  
import numpy as np  
import tensorflow as tf  

#------------------------------- 数据预处理 ---------------------------#  

poetry_file ='poetry.txt'  

# 诗集  
poetrys = []  
with open(poetry_file, "r") as f:  
    for line in f:  
        try:  
            line = line.decode('UTF-8')
            line = line.strip(u'\n')
            title, content = line.strip(u' ').split(u':')  
            content = content.replace(u' ',u'')  
            if u'_' in content or u'(' in content or u'(' in content or u'《' in content or u'[' in content:  
                continue  
            if len(content) < 5 or len(content) > 79:  
                continue  
            content = u'[' + content + u']'  
            poetrys.append(content)  
        except Exception as e:   
            pass  

# 按诗的字数排序  
poetrys = sorted(poetrys,key=lambda line: len(line))  
print('唐诗总数:', len(poetrys))  

# 统计每个字出现次数  
all_words = []  
for poetry in poetrys:  
    all_words += [word for word in poetry]  
counter = collections.Counter(all_words)  
count_pairs = sorted(counter.items(), key=lambda x: -x[1])  
words, _ = zip(*count_pairs)  

# 取前多少个常用字  
words = words[:len(words)] + (' ',)  
# 每个字映射为一个数字 ID  
word_num_map = dict(zip(words, range(len(words))))  
# 把诗转换为向量形式,参考 TensorFlow 练习 1  
to_num = lambda word: word_num_map.get(word, len(words))  
poetrys_vector = [ list(map(to_num, poetry)) for poetry in poetrys]  
#[[314, 3199, 367, 1556, 26, 179, 680, 0, 3199, 41, 506, 40, 151, 4, 98, 1],  
#[339, 3, 133, 31, 302, 653, 512, 0, 37, 148, 294, 25, 54, 833, 3, 1, 965, 1315, 377, 1700, 562, 21, 37, 0, 2, 1253, 21, 36, 264, 877, 809, 1]  
#....]  

# 每次取 64 首诗进行训练  
batch_size = 64
n_chunk = len(poetrys_vector) // batch_size  

class DataSet(object):
    def __init__(self,data_size):
        self._data_size = data_size
        self._epochs_completed = 0
        self._index_in_epoch = 0
        self._data_index = np.arange(data_size)

    def next_batch(self,batch_size):
        start = self._index_in_epoch
        if start + batch_size > self._data_size:
            np.random.shuffle(self._data_index)
            self._epochs_completed = self._epochs_completed + 1
            self._index_in_epoch = batch_size
            full_batch_features ,full_batch_labels = self.data_batch(0,batch_size)
            return full_batch_features ,full_batch_labels 
        else:
            self._index_in_epoch += batch_size
            end = self._index_in_epoch
            full_batch_features ,full_batch_labels = self.data_batch(start,end)
            if self._index_in_epoch == self._data_size:
                self._index_in_epoch = 0
                self._epochs_completed = self._epochs_completed + 1
                np.random.shuffle(self._data_index)
            return full_batch_features,full_batch_labels

    def data_batch(self,start,end):
        batches = []
        for i in range(start,end):
            batches.append(poetrys_vector[self._data_index[i]])

        length = max(map(len,batches))

        xdata = np.full((end - start,length), word_num_map[' '], np.int32)  
        for row in range(end - start):  
            xdata[row,:len(batches[row])] = batches[row]  
        ydata = np.copy(xdata)  
        ydata[:,:-1] = xdata[:,1:]  
        return xdata,ydata

#---------------------------------------RNN--------------------------------------#  

input_data = tf.placeholder(tf.int32, [batch_size, None])  
output_targets = tf.placeholder(tf.int32, [batch_size, None])  
# 定义 RNN  
def neural_network(model='lstm', rnn_size=128, num_layers=2):  
    if model == 'rnn':  
        cell_fun = tf.nn.rnn_cell.BasicRNNCell  
    elif model == 'gru':  
        cell_fun = tf.nn.rnn_cell.GRUCell  
    elif model == 'lstm':  
        cell_fun = tf.nn.rnn_cell.BasicLSTMCell  

    cell = cell_fun(rnn_size, state_is_tuple=True)  
    cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)  

    initial_state = cell.zero_state(batch_size, tf.float32)  

    with tf.variable_scope('rnnlm'):  
        softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)])  
        softmax_b = tf.get_variable("softmax_b", [len(words)])  
        with tf.device("/cpu:0"):  
            embedding = tf.get_variable("embedding", [len(words), rnn_size])  
            inputs = tf.nn.embedding_lookup(embedding, input_data)  

    outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state, scope='rnnlm')  
    output = tf.reshape(outputs,[-1, rnn_size])  

    logits = tf.matmul(output, softmax_w) + softmax_b  
    probs = tf.nn.softmax(logits)  
    return logits, last_state, probs, cell, initial_state 

def load_model(sess, saver,ckpt_path):
    latest_ckpt = tf.train.latest_checkpoint(ckpt_path)
    if latest_ckpt:
        print ('resume from', latest_ckpt)
        saver.restore(sess, latest_ckpt)
        return int(latest_ckpt[latest_ckpt.rindex('-') + 1:])
    else:
        print ('building model from scratch')
        sess.run(tf.global_variables_initializer())
        return -1

#训练  
def train_neural_network():  
    logits, last_state, _, _, _ = neural_network()  
    targets = tf.reshape(output_targets, [-1])  
    loss = tf.nn.seq2seq.sequence_loss_by_example([logits], [targets], [tf.ones_like(targets, dtype=tf.float32)], len(words))  
    cost = tf.reduce_mean(loss)  
    learning_rate = tf.Variable(0.0, trainable=False)  
    tvars = tf.trainable_variables()  
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), 5)  
    #optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    optimizer = tf.train.AdamOptimizer(learning_rate)   
    train_op = optimizer.apply_gradients(zip(grads, tvars))  

    Session_config = tf.ConfigProto(allow_soft_placement=True)
    Session_config.gpu_options.allow_growth = True  

    trainds = DataSet(len(poetrys_vector))

    with tf.Session(config=Session_config) as sess:
        with tf.device('/gpu:2'):  
            sess.run(tf.initialize_all_variables())  

            saver = tf.train.Saver(tf.all_variables())
            last_epoch = load_model(sess, saver,'model/') 

            for epoch in range(last_epoch + 1,100):
                sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))  
                #sess.run(tf.assign(learning_rate, 0.01))  

                all_loss = 0.0 
                for batche in range(n_chunk): 
                    x,y = trainds.next_batch(batch_size)
                    train_loss, _ , _ = sess.run([cost, last_state, train_op], feed_dict={input_data: x, output_targets: y})  

                    all_loss = all_loss + train_loss 

                    if batche % 50 == 1:
                        #print(epoch, batche, 0.01,train_loss) 
                        print(epoch, batche, 0.002 * (0.97 ** epoch),train_loss) 

                saver.save(sess, 'model/poetry.module', global_step=epoch) 
                print (epoch,'Loss:', all_loss * 1.0 / n_chunk) 

train_neural_network()  

使用该代码会将训练好的模型参数保存在 “model” 文件夹下。经过 100 个 epoch 之后,平均 loss 会降到 2.6 左右。训练好的模型也已经放在了上面分享的百度云资源当中。

生成古诗

使用训练好的模型可以轻松生成各种古诗。
下面就是几个例子:

生成藏头诗

上代码:

#!/usr/bin/python3
#-*- coding: UTF-8 -*-
import collections  
import numpy as np  
import tensorflow as tf  
'''
This one will produce a poetry with heads.
'''

#------------------------------- 数据预处理 ---------------------------#  

poetry_file ='poetry.txt'  

# 诗集  
poetrys = []  
with open(poetry_file, "r") as f:  
    for line in f:  
        try:  
            line = line.decode('UTF-8')
            line = line.strip(u'\n')
            title, content = line.strip(u' ').split(u':')  
            content = content.replace(u' ',u'')  
            if u'_' in content or u'(' in content or u'(' in content or u'《' in content or u'[' in content:  
                continue  
            if len(content) < 5 or len(content) > 79:  
                continue  
            content = u'[' + content + u']'  
            poetrys.append(content)  
        except Exception as e:   
            pass  

# 按诗的字数排序  
poetrys = sorted(poetrys,key=lambda line: len(line))  
print('唐诗总数:', len(poetrys))  

# 统计每个字出现次数  
all_words = []  
for poetry in poetrys:  
    all_words += [word for word in poetry]  
counter = collections.Counter(all_words)  
count_pairs = sorted(counter.items(), key=lambda x: -x[1])  
words, _ = zip(*count_pairs)  

# 取前多少个常用字  
words = words[:len(words)] + (' ',)  
# 每个字映射为一个数字 ID  
word_num_map = dict(zip(words, range(len(words))))  
# 把诗转换为向量形式,参考 TensorFlow 练习 1  
to_num = lambda word: word_num_map.get(word, len(words))  
poetrys_vector = [ list(map(to_num, poetry)) for poetry in poetrys]  
#[[314, 3199, 367, 1556, 26, 179, 680, 0, 3199, 41, 506, 40, 151, 4, 98, 1],  
#[339, 3, 133, 31, 302, 653, 512, 0, 37, 148, 294, 25, 54, 833, 3, 1, 965, 1315, 377, 1700, 562, 21, 37, 0, 2, 1253, 21, 36, 264, 877, 809, 1]  
#....]  

# 每次取 64 首诗进行训练  
batch_size = 1
n_chunk = len(poetrys_vector) // batch_size  

class DataSet(object):
    def __init__(self,data_size):
        self._data_size = data_size
        self._epochs_completed = 0
        self._index_in_epoch = 0
        self._data_index = np.arange(data_size)

    def next_batch(self,batch_size):
        start = self._index_in_epoch
        if start + batch_size > self._data_size:
            np.random.shuffle(self._data_index)
            self._epochs_completed = self._epochs_completed + 1
            self._index_in_epoch = batch_size
            full_batch_features ,full_batch_labels = self.data_batch(0,batch_size)
            return full_batch_features ,full_batch_labels 
        else:
            self._index_in_epoch += batch_size
            end = self._index_in_epoch
            full_batch_features ,full_batch_labels = self.data_batch(start,end)
            if self._index_in_epoch == self._data_size:
                self._index_in_epoch = 0
                self._epochs_completed = self._epochs_completed + 1
                np.random.shuffle(self._data_index)
            return full_batch_features,full_batch_labels

    def data_batch(self,start,end):
        batches = []
        for i in range(start,end):
            batches.append(poetrys_vector[self._data_index[i]])

        length = max(map(len,batches))

        xdata = np.full((end - start,length), word_num_map[' '], np.int32)  
        for row in range(end - start):  
            xdata[row,:len(batches[row])] = batches[row]  
        ydata = np.copy(xdata)  
        ydata[:,:-1] = xdata[:,1:]  
        return xdata,ydata

#---------------------------------------RNN--------------------------------------#  

input_data = tf.placeholder(tf.int32, [batch_size, None])  
output_targets = tf.placeholder(tf.int32, [batch_size, None])  
# 定义 RNN  
def neural_network(model='lstm', rnn_size=128, num_layers=2):  
    if model == 'rnn':  
        cell_fun = tf.nn.rnn_cell.BasicRNNCell  
    elif model == 'gru':  
        cell_fun = tf.nn.rnn_cell.GRUCell  
    elif model == 'lstm':  
        cell_fun = tf.nn.rnn_cell.BasicLSTMCell  

    cell = cell_fun(rnn_size, state_is_tuple=True)  
    cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)  

    initial_state = cell.zero_state(batch_size, tf.float32)  

    with tf.variable_scope('rnnlm'):  
        softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)])  
        softmax_b = tf.get_variable("softmax_b", [len(words)])  
        with tf.device("/cpu:0"):  
            embedding = tf.get_variable("embedding", [len(words), rnn_size])  
            inputs = tf.nn.embedding_lookup(embedding, input_data)  

    outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state, scope='rnnlm')  
    output = tf.reshape(outputs,[-1, rnn_size])  

    logits = tf.matmul(output, softmax_w) + softmax_b  
    probs = tf.nn.softmax(logits)  
    return logits, last_state, probs, cell, initial_state 

#------------------------------- 生成古诗 ---------------------------------#  
# 使用训练完成的模型  

def gen_head_poetry(heads, type):
    if type != 5 and type != 7:
        print 'The second para has to be 5 or 7!'
        return   
    def to_word(weights):  
        t = np.cumsum(weights)  
        s = np.sum(weights)  
        sample = int(np.searchsorted(t, np.random.rand(1)*s))  
        return words[sample]  

    _, last_state, probs, cell, initial_state = neural_network()
    Session_config = tf.ConfigProto(allow_soft_placement = True)
    Session_config.gpu_options.allow_growth=True 

    with tf.Session(config=Session_config) as sess: 
        with tf.device('/gpu:1'): 

            sess.run(tf.initialize_all_variables())  

            saver = tf.train.Saver(tf.all_variables())  
            saver.restore(sess, 'model/poetry.module-99')
            poem = ''
            for head in  heads:
                flag = True
                while flag:

                    state_ = sess.run(cell.zero_state(1, tf.float32)) 

                    x = np.array([list(map(word_num_map.get, u'['))])  
                    [probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})  

                    sentence = head

                    x = np.zeros((1,1))  
                    x[0,0] = word_num_map[sentence]  
                    [probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})  
                    word = to_word(probs_)
                    sentence += word  

                    while word != u'。':  
                        x = np.zeros((1,1))  
                        x[0,0] = word_num_map[word]  
                        [probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})  
                        word = to_word(probs_)
                        sentence += word  

                    if len(sentence) == 2 + 2 * type:
                        sentence += u'\n'
                        poem += sentence
                        flag = False

            return poem

print(gen_head_poetry(u'天下之大',5)) 

最后从函数接口可以看到,除了可以自己定义诗的头外,还可以定义是五言绝句还是七言绝句。
来看几个五言绝句的例子:

再来看几个七言绝句的例子:

那么是不是可以用它来写情诗呢?
当然可以啦!

当然对机器学习不熟悉的朋友,如果对机器学习有兴趣,推荐以下两本书:

《机器学习实战》-> 《机器学习实战》点击查看

《TensorFlow 机器学习实战指南》 -> 《TensorFlow 机器学习实战指南》点击查看

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:PipeSoloSymWide 等,欢迎大家加入,贡献开源。

    3131 引用 • 3886 回帖 • 656 关注
  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    45 引用 • 18 回帖
  • TensorFlow

    TensorFlow 是一个采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。

    9 引用 • 4 回帖 • 2 关注
感谢    关注    收藏    赞同    反对    举报    分享