[代码]基于RNN的文本生成算法

RNN

Posted by 王维维 on October 15, 2016

“能让机器自动生成blog就好了!”

前言

跳过废话,直接看正文

与传统的神经网络相比,RNN更擅长把握上下文之间的关系,因此现在被大量用于自然语言处理的相关任务中,例如生成与训练语料相似的文字、序列标注、中文分词等。

此文列出两种基于RNN的文本生成算法,以供参考。


正文

基于字符的文本生成算法

'''Example script to generate text from Nietzsche's writings.
At least 20 epochs are required before the generated text
starts sounding coherent.
It is recommended to run this script on GPU, as recurrent
networks are quite computationally intensive.
If you try this script on new data, make sure your corpus
has at least ~100k characters. ~1M is better.
'''

from __future__ import print_function

import random
import sys
import time

import numpy as np
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.models import Sequential
from keras.optimizers import RMSprop
from keras.utils.data_utils import get_file

# Remember when the run began; the training loop reports elapsed time from it.
start_time = time.time()
# From here on everything printed goes to out.log instead of the console.
sys.stdout = output_file_handler = open('out.log', 'w')

# Fetch the corpus (get_file returns the path of the downloaded file) and
# lower-case it so the character vocabulary stays small.
path = get_file('nietzsche.txt', origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
# Read through a context manager so the file handle is closed right away
# instead of being left for the garbage collector.
with open(path) as corpus_file:
    text = corpus_file.read().lower()
print('corpus length:', len(text))

# Sorted list of the distinct characters; a character's position in this
# list is its one-hot index.
chars = sorted(set(text))
print('total chars:', len(chars))
# Lookup tables in both directions: character -> index and index -> character.
char_indices = {c: i for i, c in enumerate(chars)}
indices_char = {i: c for i, c in enumerate(chars)}

# cut the text in semi-redundant sequences of maxlen characters
maxlen = 40
step = 3
# Every window starts `step` characters after the previous one; the last
# start is chosen so that the character following the window still exists.
window_starts = range(0, len(text) - maxlen, step)
# sentences[k] is a maxlen-character window, next_chars[k] the character
# that immediately follows it (the prediction target).
sentences = [text[start: start + maxlen] for start in window_starts]
next_chars = [text[start + maxlen] for start in window_starts]
print('nb sequences:', len(sentences))

print('Vectorization...')
# One-hot encode the training data:
#   X[i, t, c] is set when character c sits at position t of window i
#   y[i, c]    is set when character c follows window i
# The builtin `bool` is used as dtype because the `np.bool` alias was
# deprecated in NumPy 1.20 and removed in 1.24.
X = np.zeros((len(sentences), maxlen, len(chars)), dtype=bool)
y = np.zeros((len(sentences), len(chars)), dtype=bool)
for i, sentence in enumerate(sentences):
    for t, char in enumerate(sentence):
        X[i, t, char_indices[char]] = 1
    y[i, char_indices[next_chars[i]]] = 1


# build the model: a single LSTM
print('Build model...')
# A 128-unit LSTM consumes a (maxlen, len(chars)) one-hot window; the dense
# layer has one output per character and softmax normalises those outputs.
model = Sequential([
    LSTM(128, input_shape=(maxlen, len(chars))),
    Dense(len(chars)),
    Activation('softmax'),
])

# Targets are one-hot rows, hence categorical cross-entropy.
optimizer = RMSprop(lr=0.01)
model.compile(optimizer=optimizer, loss='categorical_crossentropy')


def sample(preds, temperature=1.0):
    """Draw one index from a probability array after temperature scaling.

    Each probability is raised to the power ``1 / temperature`` and the
    result is renormalised, so temperatures below 1 sharpen the distribution
    and temperatures above 1 flatten it.

    Args:
        preds: array-like of non-negative weights (typically a softmax output).
        temperature: positive scaling factor applied in log space.

    Returns:
        The index selected by a single multinomial draw.
    """
    preds = np.asarray(preds).astype('float64')
    # log(0) is -inf, which maps back to probability 0 after exp(); silence
    # the divide-by-zero warning NumPy would otherwise emit for such entries.
    with np.errstate(divide='ignore'):
        scaled = np.log(preds) / temperature
    # Shift by the maximum before exponentiating. The shift cancels out in the
    # normalisation, but it guarantees the largest term is exp(0) == 1, so a
    # small temperature can no longer underflow every term to 0 and produce a
    # 0/0 (NaN) distribution.
    exp_preds = np.exp(scaled - np.max(scaled))
    probas = exp_preds / np.sum(exp_preds)
    draw = np.random.multinomial(1, probas, 1)
    return np.argmax(draw)

# train the model, output generated text after each iteration
for iteration in range(1, 60):
    # Report the wall-clock time elapsed since the script started. This must
    # be a print() call: the file enables print_function, so the statement
    # form is a syntax error.
    end_time = time.time()
    print('training used time : ' + str(end_time - start_time))

    print()
    print('-' * 50)
    print('Iteration', iteration)
    # NOTE(review): `nb_epoch` is the Keras 1.x spelling; later Keras releases
    # call this argument `epochs` -- confirm against the installed version.
    model.fit(X, y, batch_size=128, nb_epoch=1)

    # Pick a random maxlen-character slice of the corpus as the seed; the same
    # seed is reused for every diversity value below.
    start_index = random.randint(0, len(text) - maxlen - 1)

    for diversity in [0.2, 0.5, 1.0, 1.2]:
        print()
        print('----- diversity:', diversity)

        generated = ''
        sentence = text[start_index: start_index + maxlen]
        generated += sentence
        print('----- Generating with seed: "' + sentence + '"')
        sys.stdout.write(generated)

        # Generate 400 characters, one at a time.
        for i in range(400):
            # One-hot encode the current window as a batch of size 1.
            x = np.zeros((1, maxlen, len(chars)))
            for t, char in enumerate(sentence):
                x[0, t, char_indices[char]] = 1.

            preds = model.predict(x, verbose=0)[0]
            # `diversity` is passed to sample() as the temperature.
            next_index = sample(preds, diversity)
            next_char = indices_char[next_index]

            generated += next_char
            # Slide the window: drop the oldest character, append the new one.
            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()
        print()

此代码为keras的官方例子

基于词向量(word2vec)的文本生成算法

'''Example script to generate text using keras and word2vec

At least 20 epochs are required before the generated text
starts sounding coherent.

It is recommended to run this script on GPU, as recurrent
networks are quite computationally intensive.

'''

from __future__ import print_function
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.optimizers import RMSprop
from keras.utils.data_utils import get_file
from nltk import tokenize
import numpy as np
import random
import sys
import os
import nltk

import gensim, logging
import os
# Configure the root logger to show timestamped records at INFO level and above
# (presumably so gensim's progress messages are visible -- confirm).
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

# a memory-friendly iterator
class MySentences(object):
    """Re-iterable stream of tokenized lines from every file in a directory.

    Each iteration re-reads the files line by line, so the corpus is never
    held in memory as a whole and the object can be iterated more than once.
    """

    def __init__(self, dirname, min_word_count_in_sentence=1):
        """Store the corpus directory and the minimum tokens a line must have.

        Args:
            dirname: directory whose entries are all opened as text files.
            min_word_count_in_sentence: lines with fewer tokens are skipped.
        """
        self.dirname = dirname
        self.min_word_count_in_sentence = min_word_count_in_sentence

    def process_line(self, line):
        """Return the whitespace-separated tokens of one raw line."""
        return line.split()

    def __iter__(self):
        """Yield the token list of every sufficiently long line, file by file."""
        for fname in os.listdir(self.dirname):
            # The context manager closes each file once it has been consumed
            # (or when the generator is discarded) instead of leaking handles.
            with open(os.path.join(self.dirname, fname)) as corpus_file:
                for line in corpus_file:
                    words = self.process_line(line)
                    if len(words) >= self.min_word_count_in_sentence:
                        yield words

def generate_word2vec_train_files(input_dir, output_dir, sentence_start_token, sentence_end_token, unkown_token, word_min_count, word2vec_size):
    """Rewrite a raw text corpus as one marked-up sentence per line.

    A throw-away Word2Vec model is used only to build a vocabulary from
    ``input_dir``. Every file in ``input_dir`` is then copied to a file of the
    same name in ``output_dir``, with each line split into sentences, words
    missing from the vocabulary replaced by ``unkown_token``, and every
    sentence wrapped in ``sentence_start_token`` / ``sentence_end_token``.

    Args:
        input_dir: directory holding the raw text files.
        output_dir: directory receiving the rewritten files (created if absent).
        sentence_start_token: marker written before every sentence.
        sentence_end_token: marker written after every sentence.
        unkown_token: replacement for out-of-vocabulary words.
        word_min_count: ``min_count`` passed to Word2Vec for the vocabulary.
        word2vec_size: ``size`` passed to Word2Vec.
    """
    print('generate_word2vec_train_files...')
    tmp_word2vec_model = gensim.models.Word2Vec(min_count = word_min_count, size = word2vec_size)
    original_sentences = MySentences(input_dir)
    tmp_word2vec_model.build_vocab(original_sentences)
    original_word2vec_vocab = tmp_word2vec_model.vocab

    make_dir_if_not_exist(output_dir)
    for fname in os.listdir(input_dir):
        line_count = 0
        # Both handles are released even if tokenizing or writing fails.
        with open(os.path.join(output_dir, fname), 'w') as output_file, \
                open(os.path.join(input_dir, fname)) as input_file:
            for line in input_file:
                # Trim decoration characters (dashes, quotes, asterisks, ...)
                # from both ends; lines that end up empty are skipped.
                line = line.strip(' -=:\"\'_*\n')
                if not line:
                    continue
                sentences = tokenize.sent_tokenize(line)
                for idx, sentence in enumerate(sentences):
                    # TODO: out-of-vocabulary handling is a plain substitution.
                    words = [
                        word if word in original_word2vec_vocab else unkown_token
                        for word in sentence.split()
                    ]
                    sentences[idx] = sentence_start_token + ' ' + " ".join(words) + ' ' + sentence_end_token + '\n'
                line_count += len(sentences)
                output_file.writelines(sentences)
        # Number of sentences written for this file.
        print("line_count", line_count)

def train_word2vec_model(dataset_dir, save_model_file, word_min_count, word2vec_size):
    """Train a Word2Vec model on the files in ``dataset_dir`` and save it.

    Args:
        dataset_dir: directory of training text, streamed through MySentences.
        save_model_file: path the trained model is saved to.
        word_min_count: ``min_count`` passed to Word2Vec.
        word2vec_size: ``size`` passed to Word2Vec.

    Returns:
        The trained Word2Vec model.
    """
    print('train_word2vec_model...')
    model = gensim.models.Word2Vec(size=word2vec_size, min_count=word_min_count)
    # MySentences starts a fresh pass over the files on every iteration, so a
    # single instance serves both the vocabulary scan and the training pass.
    corpus = MySentences(dataset_dir)
    model.build_vocab(corpus)
    model.train(corpus)
    model.save(save_model_file)
    return model

def load_existing_word2vec_model(model_file_path):
    """Load a saved Word2Vec model, or return None when the file is absent.

    Args:
        model_file_path: path of a model previously written with ``save``.

    Returns:
        The loaded model, or ``None`` if ``model_file_path`` does not exist.
    """
    if not os.path.exists(model_file_path):
        return None
    print("load existing model...")
    return gensim.models.Word2Vec.load(model_file_path)

def generate_rnn_train_files(input_dir, output_dir, fixed_sentence_len, unkown_token, sentence_start_token, sentence_end_token):
    """Copy sentences to ``output_dir``, padded to a fixed number of words.

    Every line of every file in ``input_dir`` is treated as one sentence.
    Sentences with more than ``fixed_sentence_len`` words are dropped; shorter
    ones are right-padded with ``sentence_end_token`` until they reach that
    length. A summary of how many sentences were dropped is printed at the end.

    Args:
        input_dir: directory with one sentence per line.
        output_dir: directory receiving the padded files (created if absent).
        fixed_sentence_len: number of words every written sentence has.
        unkown_token: unused here; kept for signature compatibility.
        sentence_start_token: unused here; kept for signature compatibility.
        sentence_end_token: token appended as padding.
    """
    print('generate_rnn_train_files...')
    make_dir_if_not_exist(output_dir)

    long_than_fixed_len_count = 0
    total_sentence_count = 0
    padding = ' ' + sentence_end_token
    for fname in os.listdir(input_dir):
        # Context managers close both files even if a write fails.
        with open(os.path.join(output_dir, fname), 'w') as output_file, \
                open(os.path.join(input_dir, fname)) as input_file:
            for sentence in input_file:
                sentence = sentence.strip('\n')
                total_sentence_count += 1
                len_of_sentence = len(sentence.split())
                if len_of_sentence > fixed_sentence_len:
                    long_than_fixed_len_count += 1
                    continue
                # Repeating the padding zero times yields '', so sentences that
                # already have the target length are written unchanged.
                missing = fixed_sentence_len - len_of_sentence
                output_file.write(sentence + padding * missing + '\n')
    print ("sentence longer than fixed_len : %d / %d" %(long_than_fixed_len_count, total_sentence_count))

def train_rnn_model(dataset_dir, fixed_sentence_len, word2vec_size, word2vec_model):
    """Build the LSTM model and assemble word-vector training data.

    The network is compiled but not trained: the ``fit`` call is still
    disabled, and the assembled data is only printed.

    Args:
        dataset_dir: directory of fixed-length sentences, one per line.
        fixed_sentence_len: number of time steps of the LSTM input.
        word2vec_size: dimensionality of a word vector.
        word2vec_model: mapping from a word to its vector (indexed with ``[]``).

    Returns:
        The compiled (untrained) Keras model.
    """
    # build the model: a single LSTM
    print('Build RNN model...')
    rnn_model = Sequential()
    rnn_model.add(LSTM(128, input_shape=(fixed_sentence_len, word2vec_size)))
    rnn_model.add(Dense(word2vec_size))
    rnn_model.add(Activation('softmax'))

    optimizer = RMSprop(lr=0.01)
    rnn_model.compile(loss='categorical_crossentropy', optimizer=optimizer)

    print('Generating RNN train data...')
    X = []
    y = []
    sentences = MySentences(dataset_dir)
    for sentence in sentences:
        # Input: one row per word, zero-filled where the sentence is shorter
        # than fixed_sentence_len.
        tmp_x = np.zeros((fixed_sentence_len, word2vec_size), dtype=np.float32)
        for idx, word in enumerate(sentence):
            tmp_x[idx] = word2vec_model[word]
        # Target: the vectors of the words shifted one position to the left.
        tmp_y = np.asarray([word2vec_model[w] for w in sentence[1:]])
        # Store exactly one sample per sentence. The earlier argument-less
        # append inside the per-word loop raised TypeError on the first word.
        X.append(tmp_x)
        y.append(tmp_y)
    # NOTE(review): the X/y pairing has not been reconciled with the network's
    # output shape -- presumably why fit() is still disabled; confirm before
    # enabling training.
    print(X)
    print(y)
    print('Generate RNN train data end!')

    # rnn_model.fit()
    print('Build RNN model over!')

    return rnn_model

class Config:
    """Paths, special tokens and hyper-parameters shared by the pipeline."""

    # Where the trained word2vec model is saved to / loaded from.
    WORD2VEC_MODE_FILE = "./word2vec_model.model"

    # Corpus directories, in the order the pipeline stages produce them.
    ORIGINAL_TRAIN_DATASET_DIR = "./small_train_text"
    WORD2VEC_TRAIN_DATASET_DIR = "./small_word2vec_train_text"
    RNN_TRAIN_DATASET_DIR = "./small_rnn_train_text"

    # Marker words inserted into the rewritten corpus.
    SENTENCE_START_TOKEN = "SENTENCE_START_TOKEN"
    SENTENCE_END_TOKEN = "SENTENCE_END_TOKEN"
    UNKNOWN_TOKEN = "UNKNOWN_TOKEN"

    # Words per sentence fed to the RNN.
    FIXED_SENTENCE_LEN = 30
    # Passed to Word2Vec as min_count.
    MIN_COUNT = 2
    # Passed to Word2Vec as size (word-vector dimensionality).
    WORD2VEC_SIZE = 20

def make_dir_if_not_exist(dirpath):
    """Create ``dirpath`` (including missing parents) unless it already exists.

    Args:
        dirpath: directory path to ensure.

    Raises:
        OSError: if the directory cannot be created.
    """
    if os.path.exists(dirpath):
        return
    try:
        # makedirs also creates missing parent directories, which a plain
        # mkdir does not.
        os.makedirs(dirpath)
    except OSError:
        # Another process may have created the directory between the check
        # and the call; that is fine. Anything else is a real failure.
        if not os.path.isdir(dirpath):
            raise

def main():
    """Run the pipeline: word2vec corpus/model, then RNN corpus and model.

    A previously saved word2vec model is reused when its file exists;
    otherwise the word2vec training corpus is generated and a new model is
    trained and saved. The RNN training corpus is regenerated on every run.
    """

    # word2vec train
    word2vec_model = load_existing_word2vec_model(Config.WORD2VEC_MODE_FILE)

    # Identity test: `== None` would dispatch to the model's own __eq__.
    if word2vec_model is None:
        generate_word2vec_train_files(
            Config.ORIGINAL_TRAIN_DATASET_DIR, Config.WORD2VEC_TRAIN_DATASET_DIR,
            Config.SENTENCE_START_TOKEN, Config.SENTENCE_END_TOKEN, Config.UNKNOWN_TOKEN, Config.MIN_COUNT, Config.WORD2VEC_SIZE)

        word2vec_model = train_word2vec_model(Config.WORD2VEC_TRAIN_DATASET_DIR, Config.WORD2VEC_MODE_FILE, Config.MIN_COUNT, Config.WORD2VEC_SIZE)

    # rnn train
    generate_rnn_train_files(
        Config.WORD2VEC_TRAIN_DATASET_DIR, Config.RNN_TRAIN_DATASET_DIR,
        Config.FIXED_SENTENCE_LEN, Config.UNKNOWN_TOKEN,
        Config.SENTENCE_START_TOKEN, Config.SENTENCE_END_TOKEN)

    # The returned model is not used any further yet.
    train_rnn_model(Config.RNN_TRAIN_DATASET_DIR, Config.FIXED_SENTENCE_LEN, Config.WORD2VEC_SIZE, word2vec_model)

# Run the pipeline unconditionally; note this also executes when the file is
# imported as a module.
main()

# Import-safe alternative, currently disabled:
# if __name__ == "__main__":
#     main()
#

此代码还未完成,将来我再抽空将它完成,这里只是给一个思路。

更多代码参考github

后记

目前利用基于RNN的文本生成算法能够生成通顺的句子,但是不能用来创作文章,因此RNN这种暴力模仿的文本生成算法终究不是解决之道,这个方向还有更多工作要做。