tensorflow(三)-学习笔记整理
多分类图像识别案例CIFAR-10CIFAR-10数据集由10个类别的60000 32x32彩色图像组成,每个类别有6000张图像。有50000个训练图像和10000个测试图像。数据集分为五个训练集和一个测试集,每个集有10000个图像。测试集包含来自每个类的正好1000个随机选择的图像。训练集的每个类别5000个图像。图像类别如下:数据集下载里面有很多种版本,这里下载 CIFAR-10二进制版本
多分类图像识别案例
CIFAR-10
CIFAR-10数据集由10个类别的60000 32x32彩色图像组成,每个类别有6000张图像。有50000个训练图像和10000个测试图像。数据集分为五个训练集和一个测试集,每个集有10000个图像。测试集包含来自每个类的正好1000个随机选择的图像。训练集的每个类别5000个图像。图像类别如下:
数据集下载
里面有很多种版本,这里下载 CIFAR-10二进制版本。
二进制版本格式
二进制版本包含文件data_batch_1.bin,data_batch_2.bin,data_batch_4.bin,data_batch_5.bin以及test_batch.bin。这些文件的格式如下:
<1 x label> <3072 x像素>
...
<1 x label> <3072 x像素>
第一个字节是第一个图像的标签,它是0-9范围内的数字。接下来的3072个字节是图像像素的值。前1024个字节是红色通道值,接下来是1024个绿色,最后1024个是蓝色。
所以每个文件包含10000个这样的3073字节的“行”的图像,还有一个名为batches.meta.txt的文件。这是一个ASCII文件,将范围为0-9的数字标签映射到有意义的类名。
图片信息的读取与写入
二进制文件的读取
使用tf.FixedLengthRecordReader去读取,将其保存到TFRecords文件当中,以这种文件格式保存当作模型训练数据的来源
在此设计一个CifarRead类去完成。将会初始化每个图片的大小数据
def __init__(self, filelist=None):
# 文件列表
self.filelist = filelist
# 每张图片大小数据初始化
self.height = 32
self.width = 32
self.channel = 3
self.label_bytes = 1
self.image_bytes = self.height * self.width * self.channel
self.bytes = self.image_bytes + self.label_bytes
读取代码:
def read_decode(self):
"""
读取数据并转换成张量
:return: 图片数据,标签值
"""
# 1、构造文件队列
file_queue = tf.train.string_input_producer(self.filelist)
# 2、构造二进制文件的阅读器,解码成张量
reader = tf.FixedLengthRecordReader(self.bytes)
key, value = reader.read(file_queue)
# 解码成张量
image_label = tf.decode_raw(value, tf.uint8)
# 分割标签与数据
label_tensor = tf.cast(tf.slice(image_label, [0], [self.label_bytes]), tf.int32)
image = tf.slice(image_label, [self.label_bytes], [self.image_bytes])
print(image)
# 3、图片数据格式转换
image_tensor = tf.reshape(image, [self.height, self.width, self.channel])
# 4、图片数据批处理,一次从二进制文件中读取多少数据出来
image_batch, label_batch = tf.train.batch([image_tensor, label_tensor], batch_size=5000, num_threads=1, capacity=50000)
return image_batch, label_batch
保存和读取TFRecords文件当中
def write_to_tfrecords(self, image_batch, label_batch):
"""
把读取出来的数据进行存储(tfrecords)
:param image_batch: 图片RGB值
:param label_batch: 图片标签
:return:
"""
# 1、构造存储器
writer = tf.python_io.TFRecordWriter(FLAGS.image_dir)
# 2、每张图片进行example协议化,存储
for i in range(5000):
print(i)
# 图片的张量要转换成字符串才能写进去,否则大小格式不对
image = image_batch[i].eval().tostring()
# 标签值
label = int(label_batch[i].eval())
# 构造example协议快,存进去的名字是提供给取的时候使用
example = tf.train.Example(features=tf.train.Features(feature={
"image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
"label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
}))
writer.write(example.SerializeToString())
return None
def read_tfrecords(self):
# 1、构建文件的队列
file_queue = tf.train.string_input_producer([FLAGS.image_dir])
# 2、构建tfrecords文件阅读器
reader = tf.TFRecordReader()
key, value = reader.read(file_queue)
# 3、解析example协议块,返回字典数据,feature["image"],feature["label"]
feature = tf.parse_single_example(value, features={
"image": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.int64)
})
# 4、解码图片数据,标签数据不用
# 图片数据处理
image = tf.decode_raw(feature["image"], tf.uint8)
# 处理一下形状
image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
# 改变数据类型
image_tensor = tf.cast(image_reshape, tf.float32)
# 标签数据处理
label_tensor = tf.cast(feature["label"], tf.int32)
# 批处理图片数据,训练数据每批次读取多少
image_batch, label_batch = tf.train.batch([image_tensor, label_tensor], batch_size=10, num_threads=1, capacity=10)
return image_batch, label_batch
将数据读取的代码放入cifar_data.py文件当中,当作原始数据读取,完整代码如下
import tensorflow as tf
import os
"""用于获取Cifar TFRecords数据文件的程序"""
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("image_dir", "./cifar10.tfrecords","数据文件目录")
class CifarRead(object):
def __init__(self, filelist=None):
# 文件列表
self.filelist = filelist
# 每张图片大小数据初始化
self.height = 32
self.width = 32
self.channel = 3
self.label_bytes = 1
self.image_bytes = self.height * self.width * self.channel
self.bytes = self.image_bytes + self.label_bytes
def read_decode(self):
"""
读取数据并转换成张量
:return: 图片数据,标签值
"""
# 1、构造文件队列
file_queue = tf.train.string_input_producer(self.filelist)
# 2、构造二进制文件的阅读器,解码成张量
reader = tf.FixedLengthRecordReader(self.bytes)
key, value = reader.read(file_queue)
# 解码成张量
image_label = tf.decode_raw(value, tf.uint8)
# 分割标签与数据
label_tensor = tf.cast(tf.slice(image_label, [0], [self.label_bytes]), tf.int32)
image = tf.slice(image_label, [self.label_bytes], [self.image_bytes])
print(image)
# 3、图片数据格式转换
image_tensor = tf.reshape(image, [self.height, self.width, self.channel])
# 4、图片数据批处理
image_batch, label_batch = tf.train.batch([image_tensor, label_tensor], batch_size=5000, num_threads=1, capacity=50000)
return image_batch, label_batch
def write_to_tfrecords(self, image_batch, label_batch):
"""
把读取出来的数据进行存储(tfrecords)
:param image_batch: 图片RGB值
:param label_batch: 图片标签
:return:
"""
# 1、构造存储器
writer = tf.python_io.TFRecordWriter(FLAGS.image_dir)
# 2、每张图片进行example协议化,存储
for i in range(5000):
print(i)
# 图片的张量要转换成字符串才能写进去,否则大小格式不对
image = image_batch[i].eval().tostring()
# 标签值
label = int(label_batch[i].eval())
# 构造example协议快,存进去的名字是提供给取的时候使用
example = tf.train.Example(features=tf.train.Features(feature={
"image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
"label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
}))
writer.write(example.SerializeToString())
return None
def read_tfrecords(self):
# 1、构建文件的队列
file_queue = tf.train.string_input_producer([FLAGS.image_dir])
# 2、构建tfrecords文件阅读器
reader = tf.TFRecordReader()
key, value = reader.read(file_queue)
# 3、解析example协议块,返回字典数据,feature["image"],feature["label"]
feature = tf.parse_single_example(value, features={
"image": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.int64)
})
# 4、解码图片数据,标签数据不用
# 图片数据处理
image = tf.decode_raw(feature["image"], tf.uint8)
# 处理一下形状
image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
# 改变数据类型
image_tensor = tf.cast(image_reshape, tf.float32)
# 标签数据处理
label_tensor = tf.cast(feature["label"], tf.int32)
# 批处理图片数据
image_batch, label_batch = tf.train.batch([image_tensor, label_tensor], batch_size=10, num_threads=1, capacity=10)
return image_batch, label_batch
if __name__ == "__main__":
# 生成文件名列表(路径+文件名)
filename = os.listdir("./cifar10/cifar-10-batches-bin")
filelist = [os.path.join("./cifar10/cifar-10-batches-bin", file) for file in filename if file[-3:] == "bin"]
# 实例化
cfr = CifarRead(filelist)
# 生成张量
image_batch, label_batch = cfr.read_decode()
# image_batch, label_batch = cfr.read_tfrecords()
# 会话
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord, start=True)
print(sess.run([image_batch, label_batch]))
# 写进tfrecords文件
cfr.write_to_tfrecords(image_batch, label_batch)
coord.request_stop()
coord.join(threads)
模型接口的建立
将模型接口都放在cifar_omdel.py文件当中,设计了四个函数,input()作为从cifar_data文件中数据的获取,inference()作为神经网络模型的建立,total_loss()计算模型的损失,train()来通过梯度下降训练减少损失
input代码
def input():
"""
获取输入数据
:return: image,label
"""
# 实例化
cfr = cifar_data.CifarRead()
# 生成张量
image_batch, lab_batch = cfr.read_tfrecords()
# 将目标值转换为one-hot编码格式
label = tf.one_hot(label_batch, depth=10, on_value=1.0)
return image_batch, label, label_batch
inference代码
在这里使用的卷积神经网络模型,需要修改图像的通道数以及经过两次卷积池化变换后的图像大小。
def inference(image_batch):
"""
得到模型的输出
:return: 预测概率输出以及占位符
"""
# 1、数据占位符建立
with tf.variable_scope("data"):
# 样本标签值
# y_label = tf.placeholder(tf.float32, [None, 10])
# 样本特征值
# x = tf.placeholder(tf.float32, [None, IMAGE_HEIGHT * IMAGE_WIDTH * IMAGE_DEPTH])
# 改变形状,以提供给卷积层使用
x_image = tf.reshape(image_batch, [-1, 32, 32, 3])
# 2、卷积池化第一层
with tf.variable_scope("conv1"):
# 构建权重, 5*5, 3个输入通道,32个输出通道
w_conv1 = weight_variable([5, 5, 3, 32])
# 构建偏置, 个数位输出通道数
b_conv1 = bias_variable([32])
# 进行卷积,激活,指定滑动窗口,填充类型
y_relu1 = tf.nn.relu(tf.nn.conv2d(x_image, w_conv1, strides=[1, 1, 1, 1], padding="SAME") + b_conv1)
y_conv1 = tf.nn.max_pool(y_relu1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# 3、卷积池化第二层
with tf.variable_scope("conv_pool2"):
# 构建权重, 5*5, 一个输入通道,32个输出通道
w_conv2 = weight_variable([5, 5, 32, 64])
# 构建偏置, 个数位输出通道数
b_conv2 = bias_variable([64])
# 进行卷积,激活,指定滑动窗口,填充类型
y_relu2 = tf.nn.relu(tf.nn.conv2d(y_conv1, w_conv2, strides=[1, 1, 1, 1], padding="SAME") + b_conv2)
y_conv2 = tf.nn.max_pool(y_relu2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# 4、全连接第一层
with tf.variable_scope("FC1"):
# 构建权重,[7*7*64, 1024],根据前面的卷积池化后一步步计算的大小变换是32->16->8
w_fc1 = weight_variable([8 * 8 * 64, 1024])
# 构建偏置,个数位第一次全连接层输出个数
b_fc1 = bias_variable([1024])
y_reshape = tf.reshape(y_conv2, [-1, 8 * 8 * 64])
# 全连接结果激活
y_fc1 = tf.nn.relu(tf.matmul(y_reshape, w_fc1) + b_fc1)
# 5、全连接第二层
with tf.variable_scope("FC2"):
# droupout层
droup = tf.nn.dropout(y_fc1, 1.0)
# 构建权重,[1024, 10]
w_fc2 = weight_variable([1024, 10])
# 构建偏置 [10]
b_fc2 = bias_variable([10])
# 最后的全连接层
y_logit = tf.matmul(droup, w_fc2) + b_fc2
return y_logit
total_loss代码
def total_loss(y_label, y_logit):
"""
计算训练损失
:param y_label: 目标值
:param y_logit: 计算值
:return: 损失
"""
with tf.variable_scope("loss"):
# softmax回归,以及计算交叉损失熵
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_label, logits=y_logit)
# 计算损失平均值
loss = tf.reduce_mean(cross_entropy)
return loss
train代码
def train(loss, y_label, y_logit, global_step):
"""
训练数据得出准确率
:param loss: 损失大小
:return:
"""
with tf.variable_scope("train"):
# 让学习率根据步伐,自动变换学习率,指定了每10步衰减基数为0.99,0.001为初始的学习率
lr = tf.train.exponential_decay(0.001,
global_step,
10,
0.99,
staircase=True)
# 优化器
train_op = tf.train.GradientDescentOptimizer(lr).minimize(loss, global_step=global_step)
# 计算准确率
equal_list = tf.equal(tf.argmax(y_logit, 1), tf.argmax(y_label, 1))
accuracy = tf.reduce_mean(tf.cast(equal_list, tf.float32))
return train_op, accuracy
完整代码
import tensorflow as tf
import os
import cifar_data
#
#
from tensorflow.examples.tutorials.mnist import input_data
IMAGE_HEIGHT = 32
IMAGE_WIDTH = 32
IMAGE_DEPTH = 3
# 按照指定形状构建权重变量
def weight_variable(shape):
init = tf.truncated_normal(shape=shape, mean=0.0, stddev=1.0, dtype=tf.float32)
weight = tf.Variable(init)
return weight
# 按照制定形状构建偏置变量
def bias_variable(shape):
bias = tf.constant([1.0], shape=shape)
return tf.Variable(bias)
def inference(image_batch):
"""
得到模型的输出
:return: 预测概率输出以及占位符
"""
# 1、数据占位符建立
with tf.variable_scope("data"):
# 样本标签值
# y_label = tf.placeholder(tf.float32, [None, 10])
# 样本特征值
# x = tf.placeholder(tf.float32, [None, IMAGE_HEIGHT * IMAGE_WIDTH * IMAGE_DEPTH])
# 改变形状,以提供给卷积层使用
x_image = tf.reshape(image_batch, [-1, 32, 32, 3])
# 2、卷积池化第一层
with tf.variable_scope("conv1"):
# 构建权重, 5*5, 3个输入通道,32个输出通道
w_conv1 = weight_variable([5, 5, 3, 32])
# 构建偏置, 个数位输出通道数
b_conv1 = bias_variable([32])
# 进行卷积,激活,指定滑动窗口,填充类型
y_relu1 = tf.nn.relu(tf.nn.conv2d(x_image, w_conv1, strides=[1, 1, 1, 1], padding="SAME") + b_conv1)
y_conv1 = tf.nn.max_pool(y_relu1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# 3、卷积池化第二层
with tf.variable_scope("conv_pool2"):
# 构建权重, 5*5, 一个输入通道,32个输出通道
w_conv2 = weight_variable([5, 5, 32, 64])
# 构建偏置, 个数位输出通道数
b_conv2 = bias_variable([64])
# 进行卷积,激活,指定滑动窗口,填充类型
y_relu2 = tf.nn.relu(tf.nn.conv2d(y_conv1, w_conv2, strides=[1, 1, 1, 1], padding="SAME") + b_conv2)
y_conv2 = tf.nn.max_pool(y_relu2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# 4、全连接第一层
with tf.variable_scope("FC1"):
# 构建权重,[7*7*64, 1024],根据前面的卷积池化后一步步计算的大小变换是32->16->8
w_fc1 = weight_variable([8 * 8 * 64, 1024])
# 构建偏置,个数位第一次全连接层输出个数
b_fc1 = bias_variable([1024])
y_reshape = tf.reshape(y_conv2, [-1, 8 * 8 * 64])
# 全连接结果激活
y_fc1 = tf.nn.relu(tf.matmul(y_reshape, w_fc1) + b_fc1)
# 5、全连接第二层
with tf.variable_scope("FC2"):
# droupout层
droup = tf.nn.dropout(y_fc1, 1.0)
# 构建权重,[1024, 10]
w_fc2 = weight_variable([1024, 10])
# 构建偏置 [10]
b_fc2 = bias_variable([10])
# 最后的全连接层
y_logit = tf.matmul(droup, w_fc2) + b_fc2
return y_logit
def total_loss(y_label, y_logit):
"""
计算训练损失
:param y_label: 目标值
:param y_logit: 计算值
:return: 损失
"""
with tf.variable_scope("loss"):
# 将y_label转换为one-hot编码形式
# y_onehot = tf.one_hot(y_label, depth=10, on_value=1.0)
# softmax回归,以及计算交叉损失熵
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_label, logits=y_logit)
# 计算损失平均值
loss = tf.reduce_mean(cross_entropy)
return loss
def train(loss, y_label, y_logit, global_step):
"""
训练数据得出准确率
:param loss: 损失大小
:return:
"""
with tf.variable_scope("train"):
# 让学习率根据步伐,自动变换学习率,指定了每10步衰减基数为0.99,0.001为初始的学习率
lr = tf.train.exponential_decay(0.001,
global_step,
10,
0.99,
staircase=True)
# 优化器
train_op = tf.train.GradientDescentOptimizer(lr).minimize(loss, global_step=global_step)
# 计算准确率
equal_list = tf.equal(tf.argmax(y_logit, 1), tf.argmax(y_label, 1))
accuracy = tf.reduce_mean(tf.cast(equal_list, tf.float32))
return train_op, accuracy
def input():
"""
获取输入数据
:return: image,label
"""
# 实例化
cfr = cifar_data.CifarRead()
# 生成张量
image_batch, lab_batch = cfr.read_tfrecords()
# 将目标值转换为one-hot编码格式
label = tf.one_hot(label_batch, depth=10, on_value=1.0)
return image_batch, label, label_batch
训练以及高级会话函数
主训练逻辑
在cifar_train.py文件实现主要训练逻辑。在这里使用一个新的会话函数,叫tf.train.MonitoredTrainingSession
优点: 1、它自动的建立events文件、checkpoint文件,以记录重要的信息。 2、可以定义钩子函数,可以自定义每批次的训练信息,训练的限制等等
**注意:**在这个里面需要添加一个全局步数,这个步数是每批次训练的时候进行+1计数,内部使用.
import tensorflow as tf
import cifar_model
import time
from datetime import datetime
def train():
# 在图中进行训练
with tf.Graph().as_default():
# 定义全局步数,必须得使用这个,否则会出现StopCounterHook错误
global_step = tf.contrib.framework.get_or_create_global_step()
# 获取数据
image, label, label_1 = cifar_model.input()
# 通过模型进行类别预测
y_logit = cifar_model.inference(image)
# 计算损失
loss = cifar_model.total_loss(label, y_logit)
# 进行优化器减少损失
train_op, accuracy = cifar_model.train(loss, label, y_logit, global_step)
# 通过钩子定义模型输出
class _LoggerHook(tf.train.SessionRunHook):
"""Logs loss and runtime."""
def begin(self):
self._step = -1
self._start_time = time.time()
def before_run(self, run_context):
self._step += 1
return tf.train.SessionRunArgs(loss, float(accuracy.eval())) # Asks for loss value.
def after_run(self, run_context, run_values):
if self._step % 10 == 0:
current_time = time.time()
duration = current_time - self._start_time
self._start_time = current_time
loss_value = run_values.results
examples_per_sec = 10 * 10 / duration
sec_per_batch = float(duration / 10)
format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
'sec/batch)')
print(format_str % (datetime.now(), self._step, loss_value,
examples_per_sec, sec_per_batch))
with tf.train.MonitoredTrainingSession(
checkpoint_dir="./cifartrain/train",
hooks=[tf.train.StopAtStepHook(last_step=500),# 定义执行的训练轮数也就是max_step,超过了就会报错
tf.train.NanTensorHook(loss),
_LoggerHook()],
config=tf.ConfigProto(
log_device_placement=False)) as mon_sess:
while not mon_sess.should_stop():
mon_sess.run(train_op)
def main(argv):
train()
if __name__ == "__main__":
tf.app.run()
分布式Tensorflow
Tensorflow的一个特色就是分布式计算。分布式Tensorflow是由高性能的gRPC框架作为底层技术来支持的。这是一个通信框架gRPC(google remote procedure call),是一个高性能、跨平台的RPC框架。RPC协议,即远程过程调用协议,是指通过网络从远程计算机程序上请求服务。
分布式原理
Tensorflow分布式是由多个服务器进程和客户端进程组成。有几种部署方式,列如单机多卡和多机多卡(分布式)。
单机多卡
单机多卡是指单台服务器有多块GPU设备。假设一台机器上有4块GPU,单机多GPU的训练过程如下:
- 在单机单GPU的训练中,数据是一个batch一个batch的训练。 在单机多GPU中,数据一次处理4个batch(假设是4个GPU训练), 每个GPU处理一个batch的数据计算。
- 变量,或者说参数,保存在CPU上。数据由CPU分发给4个GPU,在GPU上完成计算,得到每个批次要更新的梯度
- 在CPU上收集完4个GPU上要更新的梯度,计算一下平均梯度,然后更新。
- 循环进行上面步骤
多机多卡(分布式)
而分布式是指有多台计算机,充分使用多台计算机的性能,处理数据的能力。可以根据不同计算机划分不同的工作节点。当数据量或者计算量达到超过一台计算机处理能力的上上限的话,必须使用分布式
分布式的架构
分布式架构的组成可以说是一个集群的组成方式。那么一般我们在进行Tensorflow分布式时,需要建立一个集群。通常是分布式的作业集合。一个作业中又包含了很多的任务(工作结点),每个任务由一个工作进程来执行。
节点之间的关系
一般来说,在分布式机器学习框架中,将作业分成参数作业(parameter job)和工作结点作业(worker job)。运行参数作业的服务器称为参数服务器(parameter server,PS),负责管理参数的存储和更新,工作结点作业负责主要从事计算的任务,如运行操作。
参数服务器,当模型越来越大时,模型的参数越来越多,多到一台机器的性能不够完成对模型参数的更新的时候,就需要把参数分开放到不同的机器去存储和更新。参数服务器可以是由多台机器组成的集群。工作节点是进行模型的计算的。Tensorflow的分布式实现了作业间的数据传输,也就是参数作业到工作结点作业的前向传播,以及工作节点到参数作业的反向传播。
分布式的模式
数据并行
数据并总的原理很简单。其中CPU主要负责梯度平均和参数更新,而GPU主要负责训练模型副本。
- 模型副本定义在GPU上
- 对于每一个GPU,都是从CPU获得数据,前向传播进行计算,得到损失,并计算出梯度
- CPU接到GPU的梯度,取平均值,然后进行梯度更新
每一个设备的计算速度不一样,有的快有的满,那么CPU在更新变量的时候,是应该等待每一个设备的一个batch进行完成,然后求和取平均来更新呢?还是让一部分先计算完的就先更新,后计算完的将前面的覆盖呢?这就由同步更新和异步更新的问题。
同步更新和异步更新
更新参数分为同步和异步两种方式,即异步随机梯度下降法(Async-SGD)和同步随机梯度下降法(Sync-SGD)
- 同步随即梯度下降法的含义是在进行训练时,每个节点的工作任务需要读入共享参数,执行并行的梯度计算,同步需要等待所有工作节点把局部的梯度算好,然后将所有共享参数进行合并、累加,再一次性更新到模型的参数;下一个批次中,所有工作节点拿到模型更新后的参数再进行训练。这种方案的优势是,每个训练批次都考虑了所有工作节点的训练情况,损失下降比较稳定;劣势是,性能瓶颈在于最慢的工作结点上。
- 异步随机梯度下降法的含义是每个工作结点上的任务独立计算局部梯度,并异步更新到模型的参数中,不需要执行协调和等待操作。这种方案的优势是,性能不存在瓶颈;劣势是,每个工作节点计算的梯度值发送回参数服务器会有参数更新的冲突,一定程度上会影响算法的收敛速度,在损失下降的过程中抖动较大。
分布式接口
创建集群的方法是为每一个任务启动一个服务,这些任务可以分布在不同的机器上,也可以同一台机器上启动多个任务,使用不同的GPU等来运行。每个任务都会创建完成一下工作
- 创建一个tf.train.ClusterSpec,用于对集群中的所有任务进行描述,该描述内容对所有任务应该是相同的
- 创建一个tf.train.Server,用于创建一个任务,并运行相应作业上的计算任务。
Tensorflow的分布式API使用如下:
- tf.train.ClusterSpec()
创建ClusterSpec,表示参与分布式TensorFlow计算的一组进程
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", /job:worker/task:0
"worker1.example.com:2222", /job:worker/task:1
"worker2.example.com:2222"], /job:worker/task:2
"ps": ["ps0.example.com:2222", /job:ps/task:0
"ps1.example.com:2222"]}) /job:ps/task:1
创建Tensorflow的集群描述信息,其中ps和worker为作业名称,通过指定ip地址加端口创建,
tf.train.Server(server_or_cluster_def, job_name=None, task_index=None, protocol=None, config=None, start=True)
- server_or_cluster_def: 集群描述
- job_name: 任务类型名称
- task_index: 任务数
创建一个服务(主节点或者工作节点服务),用于运行相应作业上的计算任务,运行的任务在task_index指定的机器上启动,例如在不同的ip+端口上启动两个工作任务
# 第一个任务
cluster = tf.train.ClusterSpec({"worker": ["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="worker", task_index=0)
# 第二个任务
cluster = tf.train.ClusterSpec({"worker": ["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="worker", task_index=1)
属性:target
- 返回tf.Session连接到此服务器的目标
方法:join()
- 参数服务器端等待接受参数任务,直到服务器关闭
tf.device(device_name_or_function)
工作人任务端的代码在指定的设备上执行张量运算,指定代码运行在CPU或者GPU上
with tf.device("/job:ps/task:0"):
weights = tf.Variable(...)
词向量-word2vec
图像和音频处理系统采用的是庞大的高维度数据集,对于图像数据来说,此类数据集会编码为单个原始像素强度的向量。不过,自然语言处理系统一直以来都将字词视为离散的原子符号,将字词表示为唯一的离散 ID 还会导致数据稀疏性,并且通常意味着需要更多数据才能成功训练统计模型。使用向量表示法可以扫除其中一些障碍。
- 计算相似度:寻找相似词、或者用于文章之间的相似度
- 文本生成、机器翻译等
词向量
定义:将文字通过一串数字向量表示
词的独热表示:One-hot Representation
- 采用稀疏方式 存储,简单易实现
- 灯泡:[0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0]、灯管:[0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0]
维度过大词汇鸿沟现象:任意两个词之间都是孤立的。光从这两个向量中看不出两个词是否有关系,哪怕”灯泡”和”灯管”这两个词是同义词也不行
词的分布式表示:Distributed representation
- 传统的独热表示( one-hot representation)仅仅将词符号化,不包含任何语义信息
- Distributed representation 最早由 Hinton在 1986 年提出。它是一种低维实数向量,这种向量一般长成这个样子: [0.792, −0.177, −0.107, 0.109, −0.542, …]
- 最大的贡献就是让相关或者相似的词,在距离上更接近了
词向量来源思想
统计语言模型: 统计语言模型把语言(词的序列)看作一个随机事件,并赋予相应的概率来描述其属于某种语言集合的可能性
例如:一个句子由w1,w2,w3,w4,w5,…….这些词组,使得P(w1,w2,w3,w4,w5……)概率大(可以从训练语料中得出)
- N-Gram(CBOW方式)
- 语言是一种序列,词与词之间并不是相互独立
- 一元模型(unigram model):假设某个出现的概率与前面所有词无关
- P(s) = P(w1)P(w2)P(w3)…P(w4)
- 二元模型(bigram model):假设某个出现的概率与前面一个词相关
- P(s) = P(w1)P(w2|w1)P(w3|w2)…P(w_i|w_i-1)
- 三元模型(trigram model):假设某个出现的概率与前面两个词相关
- P(s) = P(w1)P(w2|w1)P(w3|w1,w2)…P(w_i|w_i-2,w_i-1)
注:目前使用较多的是三元模型,由于训练语料限制,无法追求更大的N,并且N越大导致计算量越来越大
模型:CBOW模型、Skip-gram都是能够训练出词向量,cbow和skip-gram都是在word2vec中用于将文本进行向量表示的实现方法。
训练词向量的神经网络模型
神经概率语言模型一直以来都使用最大似然率 (ML) 原则进行训练,以最大限度地提高使用 softmax函数根据之前的字词 h(表示“历史”字词)正确预测出下一个字词 wt(表示“目标”字词)的概率。
通过一个三层神经网络得出,由约书亚.本吉奥(Yoshua Bengio)提出word2vec模型神经网络雏形
模型解释:
输入层:将context(w)每个词映射成一个长度为m的词向量(长度训练者指定),词向量在开始是随机的,也参与网络训练
使用随机初始化的方法建立一个|m|×N个词大小的查找表(lookup table)
context(w):可以称之为上下文窗口长度,类似N-gram取多少个词作为添加
投影层:将所有的上下文此项来给你拼接成一个长向量,作为目标w的特征向量。长度为m(n-1)
隐藏层:拼接后的向量会经过一个规模为h的隐藏层,论文中使用tanh
y = U\cdot tanh(Wx+b1) + b2y=U⋅tanh(Wx+b1)+b2
输出层:最后输出会通过softmax输出所有词个数大小比如N的大小概率分布
训练过程:
训练时,使用交叉熵作为损失函数,反向传播算法进行训练
当训练完成时,就得到了 N-gram 神经语言模型,以及副产品词向量
初始化的矩阵查找表是和神经网络的参数同时训练更新
通过窗口输入句子中的连续三个词,w1,w2,w3
- 输入网络中已是随机初始化的向量,如w1:[0,0,0,0,……,0],值的向量长度自定义,三个词向量,输入到网络中。(词向量作为参数)
- 目标值为原句子的后面一个w4,通过onehot编码定义
- 网络训练,网络参数更新,自动调整w1,w2,w3的向量值,达到经过最后的softmax(多分类概率),输出预测概率,与目标值计算损失
以下面的数据集为例
the quick brown fox jumped over the lazy dog
首先形成一个数据集,其中包含字词以及字词在其中出现的上下文。我们可以通过任何有意义的方式定义“上下文”,事实上人们研究了语法上下文(、目标左侧的字词、目标右侧的字词等。暂时我们使用 vanilla 定义,将“上下文”定义为目标字词左侧和右侧的字词窗口。使用大小为 1 的窗口,我们将获得以下数据集
([the, brown], quick), ([quick, fox], brown), ([brown, jumped], fox), ...
分布式词向量并不是word2vec的作者发明的,word2vec就是实现词向量的一种工具。具体实现通过cbow等方式
案例:IMDB电影评论文本的神经网络分类
目的:对IMDB电影评论数据进行训练,预测分类
步骤:
1、电影评论数据读取
2、模型输入特征列指定
3、模型训练与保存
电影评论数据读取
我们将要使用的数据集是 IMDB Large Movie Review Dataset,包含用于训练的 25000 段带有明显情感倾向的电影评论,测试集有 25000 段。我们将会用此数据集训练一个二分类模型,用于判断一篇评论是积极的还是消极的。
比如一个负面评论(2 颗星)的片段:
Now, I LOVE Italian horror films. The cheesier they are, the better. However, this is not cheesy Italian. This is week-old spaghetti sauce with rotting meatballs. It is amateur hour on every level. There is no suspense, no horror, with just a few drops of blood scattered around to remind you that you are in fact watching a horror film.
用 0 将所有句子补齐到相同长度(这里是 200),这样对于训练集和测试集就分别有一个两维的 25000×200 的数组。
# 指定总共多少不同的词,每个样本的序列长度最大多少
vocab_size = 5000
sentence_size = 200
imdb = keras.datasets.imdb
(x_train_variable, y_train), (x_test_variable, y_test) = imdb.load_data(num_words=vocab_size)
x_train = keras.preprocessing.sequence.pad_sequences(
x_train_variable,
maxlen=sentence_size,
padding='post',
value=0)
x_test = keras.preprocessing.sequence.pad_sequences(
x_test_variable,
maxlen=sentence_size,
padding='post',
value=0)
填充序列pad_sequences
keras.preprocessing.sequence.pad_sequences(sequences, maxlen=None, dtype='int32',
padding='pre', truncating='pre', value=0.)
将长为nb_samples的序列转化为形如(nb_samples,nb_timesteps)2D numpy array。如果提供了参数maxlen,nb_timesteps=maxlen,否则其值为最长序列的长度。其他短于该长度的序列都会在后部填充0以达到该长度。长于nb_timesteps的序列将会被截断,以使其匹配目标长度。padding和截断发生的位置分别取决于padding和truncating.
参数
sequences:浮点数或整数构成的两层嵌套列表
maxlen:None或整数,为序列的最大长度。大于此长度的序列将被截短,小于此长度的序列将在后部填0.
dtype:返回的numpy array的数据类型
padding:‘pre’或‘post’,确定当需要补0时,在序列的起始还是结尾补
truncating:‘pre’或‘post’,确定当需要截断序列时,从起始还是结尾截断
value:浮点数,此值将在填充时代替默认的填充值0
Input Functions的定义
def parser(x, y):
features = {"x": x}
return features, y
def train_input_fn():
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shuffle(buffer_size=len(x_train_variable))
dataset = dataset.batch(100)
dataset = dataset.map(parser)
dataset = dataset.repeat()
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def eval_input_fn():
dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
dataset = dataset.batch(100)
dataset = dataset.map(parser)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
注:要在 input_fn 中使用 Dataset(input_fn 属于 tf.estimator.Estimator),只需返回 Dataset 即可,框架将负责创建和初始化迭代器。
模型输入特征列指定
指定特征列
column = tf.feature_column.categorical_column_with_identity('feature', vocab_size)
embedding_size = 50
word_embedding_column = tf.feature_column.embedding_column(
column, dimension=embedding_size
)
进行模型训练
指定模型的神经网络的神经元数量,以及几层;特征列;模型输出目录
classifier = tf.estimator.DNNClassifier(
hidden_units=[100],
feature_columns=[word_embedding_column],
model_dir='./tmp/embeddings'
)
classifier.train(input_fn=train_input_fn, steps=25000)
eval_results = classifier.evaluate(input_fn=eval_input_fn)
print(eval_results)
推荐算法
推荐模型构建流程
Data(数据)->Features(特征)->ML Algorithm(选择算法训练模型)->Prediction Output(预测输出)
- 数据清洗/数据处理
- 数据来源
- 显性数据
- Rating 打分
- Comments 评论/评价
- 隐形数据
- Order history 历史订单
- Cart events 加购物车
- Page views 页面浏览
- Click-thru 点击
- Search log 搜索记录
- 数据量/数据能否满足要求
- 显性数据
- 数据来源
- 特征工程
- 从数据中筛选特征
- 一个给定的商品,可能被拥有类似品味或需求的用户购买
- 使用用户行为数据描述商品
- 从数据中筛选特征
用数据表示特征
- 将所有用户行为合并在一起 ,形成一个user-item 矩阵
选择合适的算法 - 协同过滤
- 基于内容
产生推荐结果
对推荐结果进行评估,评估通过后上线
协同过滤推荐算法(Collaborative Filtering)
基本的协同过滤推荐算法基于以下假设:
- “跟你喜好相似的人喜欢的东西你也很有可能喜欢” :基于用户的协同过滤推荐(User-based CF)
- “跟你喜欢的东西相似的东西你也很有可能喜欢 ”:基于物品的协同过滤推荐(Item-based CF)
实现协同过滤推荐有以下几个步骤:
- 找出最相似的人或物品:TOP-N相似的人或物品
通过计算两两的相似度来进行排序,即可找出TOP-N相似的人或物品 - 根据相似的人或物品产生推荐结果
利用TOP-N结果生成初始推荐结果,然后过滤掉用户已经有过记录的物品或明确表示不感兴趣的物品
以下是一个简单的示例,数据集相当于一个用户对物品的购买记录表:打勾表示用户对物品的有购买记录
关于相似度计算这里先用一个简单的思想:如有两个同学X和Y,X同学爱好[足球、篮球、乒乓球],Y同学爱好[网球、足球、篮球、羽毛球],可见他们的共同爱好有2个,那么他们的相似度可以用:2/3 * 2/4 = 1/3 ≈ 0.33 来表示。
User-Based CF
Item-Based CF
相似度计算(Similarity Calculation)
- 相似度的计算方法
- 欧氏距离, 是一个欧式空间下度量距离的方法. 两个物体, 都在同一个空间下表示为两个点, 假如叫做p,q, 分别都是n个坐标, 那么欧式距离就是衡量这两个点之间的距离. 欧氏距离不适用于布尔向量之间
欧氏距离的值是一个非负数, 最大值正无穷, 通常计算相似度的结果希望是[-1,1]或[0,1]之间,一般可以使用
余弦相似度 - 度量的是两个向量之间的夹角, 用夹角的余弦值来度量相似的情况
- 两个向量的夹角为0是,余弦值为1, 当夹角为90度是余弦值为0,为180度是余弦值为-1
- 余弦相似度在度量文本相似度, 用户相似度 物品相似度的时候较为常用
- 余弦相似度的特点, 与向量长度无关,余弦相似度计算要对向量长度归一化, 两个向量只要方向一致,无论程度强弱, 都可以视为’相似’
皮尔逊相关系数Pearson - 实际上也是余弦相似度, 不过先对向量做了中心化, 向量a b各自减去向量的均值后, 再计算余弦相似度
- 皮尔逊相似度计算结果在-1,1之间 -1表示负相关, 1表示正相关
- 度量两个变量是不是同增同减
- 皮尔逊相关系数度量的是两个变量的变化趋势是否一致, 不适合计算布尔值向量之间的相关度
杰卡德相似度 Jaccard - 两个集合的交集元素个数在并集中所占的比例, 非常适用于布尔向量表示
- 分子是两个布尔向量做点积计算, 得到的就是交集元素的个数
- 分母是两个布尔向量做或运算, 再求元素和
如何选择余弦相似度 - 余弦相似度/皮尔逊相关系数适合用户评分数据(实数值),
- 杰卡德相似度适用于隐式反馈数据(0,1布尔值 是否收藏,是否点击,是否加购物车)
- 欧氏距离, 是一个欧式空间下度量距离的方法. 两个物体, 都在同一个空间下表示为两个点, 假如叫做p,q, 分别都是n个坐标, 那么欧式距离就是衡量这两个点之间的距离. 欧氏距离不适用于布尔向量之间
协同过滤推荐算法代码实现
- 构建数据集:
users = ["User1", "User2", "User3", "User4", "User5"]
items = ["Item A", "Item B", "Item C", "Item D", "Item E"]
# 构建数据集
datasets = [
["buy",None,"buy","buy",None],
["buy",None,None,"buy","buy"],
["buy",None,"buy",None,None],
[None,"buy",None,"buy","buy"],
["buy","buy","buy",None,"buy"],
]
- 计算时数据通常都需要对数据进行处理,或者编码,目的是为了便于我们对数据进行运算处理,比如这里是比较简单的情形,我们用1、0分别来表示用户的是否购买过该物品,则我们的数据集其实应该是这样的:
users = ["User1", "User2", "User3", "User4", "User5"]
items = ["Item A", "Item B", "Item C", "Item D", "Item E"]
# 用户购买记录数据集
datasets = [
[1,0,1,1,0],
[1,0,0,1,1],
[1,0,1,0,0],
[0,1,0,1,1],
[1,1,1,0,1],
]
import pandas as pd
df = pd.DataFrame(datasets,
columns=items,
index=users)
print(df)
- 进行相似度的计算,不过对于相似度的计算其实是有很多专门的相似度计算方法的,比如余弦相似度、皮尔逊相关系数、杰卡德相似度等等。这里选择使用杰卡德相似系数[0,1]
from sklearn.metrics import jaccard_similarity_score
# 直接计算某两项的杰卡德相似系数
# 计算Item A 和Item B的相似度
print(jaccard_similarity_score(df["Item A"], df["Item B"]))
# 计算所有的数据两两的杰卡德相似系数
from sklearn.metrics.pairwise import pairwise_distances
# 计算用户间相似度
user_similar = 1 - pairwise_distances(df, metric="jaccard")
user_similar = pd.DataFrame(user_similar, columns=users, index=users)
print("用户之间的两两相似度:")
print(user_similar)
# 计算物品间相似度
item_similar = 1 - pairwise_distances(df.T, metric="jaccard")
item_similar = pd.DataFrame(item_similar, columns=items, index=items)
print("物品之间的两两相似度:")
print(item_similar)
有了两两的相似度,接下来就可以筛选TOP-N相似结果,并进行推荐了
- User-Based CF
import pandas as pd
import numpy as np
from pprint import pprint
users = ["User1", "User2", "User3", "User4", "User5"]
items = ["Item A", "Item B", "Item C", "Item D", "Item E"]
# 用户购买记录数据集
datasets = [
[1,0,1,1,0],
[1,0,0,1,1],
[1,0,1,0,0],
[0,1,0,1,1],
[1,1,1,0,1],
]
df = pd.DataFrame(datasets,
columns=items,
index=users)
# 计算所有的数据两两的杰卡德相似系数
from sklearn.metrics.pairwise import pairwise_distances
# 计算用户间相似度 1-杰卡德距离=杰卡德相似度
user_similar = 1 - pairwise_distances(df, metric="jaccard")
user_similar = pd.DataFrame(user_similar, columns=users, index=users)
print("用户之间的两两相似度:")
print(user_similar)
topN_users = {}
# 遍历每一行数据
for i in user_similar.index:
# 取出每一列数据,并删除自身,然后排序数据
_df = user_similar.loc[i].drop([i])
#sort_values 排序 按照相似度降序排列
_df_sorted = _df.sort_values(ascending=False)
# 从排序之后的结果中切片 取出前两条(相似度最高的两个)
top2 = list(_df_sorted.index[:2])
topN_users[i] = top2
print("Top2相似用户:")
pprint(topN_users)
# 准备空白dict用来保存推荐结果
rs_results = {}
#遍历所有的最相似用户
for user, sim_users in topN_users.items():
rs_result = set() # 存储推荐结果
for sim_user in sim_users:
# 构建初始的推荐结果
rs_result = rs_result.union(set(df.ix[sim_user].replace(0,np.nan).dropna().index))
# 过滤掉已经购买过的物品
rs_result -= set(df.ix[user].replace(0,np.nan).dropna().index)
rs_results[user] = rs_result
print("最终推荐结果:")
pprint(rs_results)
- Item-Based CF
import pandas as pd
import numpy as np
from pprint import pprint
users = ["User1", "User2", "User3", "User4", "User5"]
items = ["Item A", "Item B", "Item C", "Item D", "Item E"]
# 用户购买记录数据集
datasets = [
[1,0,1,1,0],
[1,0,0,1,1],
[1,0,1,0,0],
[0,1,0,1,1],
[1,1,1,0,1],
]
df = pd.DataFrame(datasets,
columns=items,
index=users)
# 计算所有的数据两两的杰卡德相似系数
from sklearn.metrics.pairwise import pairwise_distances
# 计算物品间相似度
item_similar = 1 - pairwise_distances(df.T, metric="jaccard")
item_similar = pd.DataFrame(item_similar, columns=items, index=items)
print("物品之间的两两相似度:")
print(item_similar)
topN_items = {}
# 遍历每一行数据
for i in item_similar.index:
# 取出每一列数据,并删除自身,然后排序数据
_df = item_similar.loc[i].drop([i])
_df_sorted = _df.sort_values(ascending=False)
top2 = list(_df_sorted.index[:2])
topN_items[i] = top2
print("Top2相似物品:")
pprint(topN_items)
rs_results = {}
# 构建推荐结果
for user in df.index: # 遍历所有用户
rs_result = set()
for item in df.ix[user].replace(0,np.nan).dropna().index: # 取出每个用户当前已购物品列表
# 根据每个物品找出最相似的TOP-N物品,构建初始推荐结果
rs_result = rs_result.union(topN_items[item])
# 过滤掉用户已购的物品
rs_result -= set(df.ix[user].replace(0,np.nan).dropna().index)
# 添加到结果中
rs_results[user] = rs_result
print("最终推荐结果:")
pprint(rs_results)
关于协同过滤推荐算法使用的数据集
在前面的demo中,我们只是使用用户对物品的一个购买记录,类似也可以是比如浏览点击记录、收听记录等等。这样数据我们预测的结果其实相当于是在预测用户是否对某物品感兴趣,对于喜好程度不能很好的预测。
因此在协同过滤推荐算法中其实会更多的利用用户对物品的“评分”数据来进行预测,通过评分数据集,我们可以预测用户对于他没有评分过的物品的评分。其实现原理和思想和都是一样的,只是使用的数据集是用户-物品的评分数据。
关于用户-物品评分矩阵
用户-物品的评分矩阵,根据评分矩阵的稀疏程度会有不同的解决方案
- 稠密评分矩阵
- 稀疏评分矩阵
使用协同过滤推荐算法对用户进行评分预测
数据集:
目的:预测用户1对物品E的评分
构建数据集:注意这里构建评分数据时,对于缺失的部分我们需要保留为None,如果设置为0那么会被当作评分值为0去对待
users = ["User1", "User2", "User3", "User4", "User5"]
items = ["Item A", "Item B", "Item C", "Item D", "Item E"]
# 用户购买记录数据集
datasets = [
[5,3,4,4,None],
[3,1,2,3,3],
[4,3,4,3,5],
[3,3,1,5,4],
[1,5,5,2,1],
]
计算相似度:对于评分数据这里我们采用皮尔逊相关系数[-1,1]来计算,-1表示强负相关,+1表示强正相关
pandas中corr方法可直接用于计算皮尔逊相关系数
df = pd.DataFrame(datasets,
columns=items,
index=users)
print("用户之间的两两相似度:")
# 直接计算皮尔逊相关系数
# 默认是按列进行计算,因此如果计算用户间的相似度,当前需要进行转置
user_similar = df.T.corr()
print(user_similar.round(4))
print("物品之间的两两相似度:")
item_similar = df.corr()
print(item_similar.round(4))
运行结果:
用户之间的两两相似度:
User1 User2 User3 User4 User5
User1 1.0000 0.8528 0.7071 0.0000 -0.7921
User2 0.8528 1.0000 0.4677 0.4900 -0.9001
User3 0.7071 0.4677 1.0000 -0.1612 -0.4666
User4 0.0000 0.4900 -0.1612 1.0000 -0.6415
User5 -0.7921 -0.9001 -0.4666 -0.6415 1.0000
物品之间的两两相似度:
Item A Item B Item C Item D Item E
Item A 1.0000 -0.4767 -0.1231 0.5322 0.9695
Item B -0.4767 1.0000 0.6455 -0.3101 -0.4781
Item C -0.1231 0.6455 1.0000 -0.7206 -0.4276
Item D 0.5322 -0.3101 -0.7206 1.0000 0.5817
Item E 0.9695 -0.4781 -0.4276 0.5817 1.0000
可以看到与用户1最相似的是用户2和用户3;与物品A最相似的物品分别是物品E和物品D。
注意:我们在预测评分时,往往是通过与其有正相关的用户或物品进行预测,如果不存在正相关的情况,那么将无法做出预测。这一点尤其是在稀疏评分矩阵中尤为常见,因为稀疏评分矩阵中很难得出正相关系数。
评分预测:
User-Based CF 评分预测:使用用户间的相似度进行预测
关于评分预测的方法也有比较多的方案,下面介绍一种效果比较好的方案,该方案考虑了用户本身的评分评分以及近邻用户的加权平均相似度打分来进行预测
我们要预测用户1对物品E的评分,那么可以根据与用户1最近邻的用户2和用户3进行预测,计算如下:
最终预测出用户1对物品5的评分为3.91
Item-Based CF 评分预测:使用物品间的相似度进行预测
这里利用物品相似度预测的计算同上,同样考虑了用户自身的平均打分因素,结合预测物品与相似物品的加权平均相似度打分进行来进行预测
我们要预测用户1对物品E的评分,那么可以根据与物品E最近邻的物品A和物品D进行预测,计算如下:
对比可见,User-Based CF预测评分和Item-Based CF的评分结果也是存在差异的,因为严格意义上他们其实应当属于两种不同的推荐算法,各自在不同的领域不同场景下,都会比另一种的效果更佳,但具体哪一种更佳,必须经过合理的效果评估,因此在实现推荐系统时这两种算法往往都是需要去实现的,然后对产生的推荐效果进行评估分析选出更优方案。
案例–基于协同过滤的电影推荐
User-Based CF 预测电影评分
数据集下载
下载地址
建议下载ml-latest-small.zip,数据量小,便于我们单机使用和运行
加载ratings.csv,转换为用户-电影评分矩阵并计算用户之间相似度
import os
import pandas as pd
import numpy as np
DATA_PATH = "./datasets/ml-latest-small/ratings.csv"
dtype = {"userId": np.int32, "movieId": np.int32, "rating": np.float32}
# 加载数据,我们只用前三列数据,分别是用户ID,电影ID,已经用户对电影的对应评分
ratings = pd.read_csv(data_path, dtype=dtype, usecols=range(3))
# 透视表,将电影ID转换为列名称,转换成为一个User-Movie的评分矩阵
ratings_matrix = ratings.pivot_table(index=["userId"], columns=["movieId"],values="rating")
#计算用户之间相似度
user_similar = ratings_matrix.T.corr()
预测用户对物品的评分 (以用户1对电影1评分为例)
评分公式
# 1. 找出uid用户的相似用户
similar_users = user_similar[1].drop([1]).dropna()
# 相似用户筛选规则:正相关的用户
similar_users = similar_users.where(similar_users>0).dropna()
# 2. 从用户1的近邻相似用户中筛选出对物品1有评分记录的近邻用户
ids = set(ratings_matrix[1].dropna().index)&set(similar_users.index)
finally_similar_users = similar_users.ix[list(1)]
# 3. 结合uid用户与其近邻用户的相似度预测uid用户对iid物品的评分
numerator = 0 # 评分预测公式的分子部分的值
denominator = 0 # 评分预测公式的分母部分的值
for sim_uid, similarity in finally_similar_users.iteritems():
# 近邻用户的评分数据
sim_user_rated_movies = ratings_matrix.ix[sim_uid].dropna()
# 近邻用户对iid物品的评分
sim_user_rating_for_item = sim_user_rated_movies[1]
# 计算分子的值
numerator += similarity * sim_user_rating_for_item
# 计算分母的值
denominator += similarity
# 4 计算预测的评分值
predict_rating = numerator/denominator
print("预测出用户<%d>对电影<%d>的评分:%0.2f" % (1, 1, predict_rating))
封装成方法 预测任意用户对任意电影的评分
def predict(uid, iid, ratings_matrix, user_similar):
'''
预测给定用户对给定物品的评分值
:param uid: 用户ID
:param iid: 物品ID
:param ratings_matrix: 用户-物品评分矩阵
:param user_similar: 用户两两相似度矩阵
:return: 预测的评分值
'''
print("开始预测用户<%d>对电影<%d>的评分..."%(uid, iid))
# 1. 找出uid用户的相似用户
similar_users = user_similar[uid].drop([uid]).dropna()
# 相似用户筛选规则:正相关的用户
similar_users = similar_users.where(similar_users>0).dropna()
if similar_users.empty is True:
raise Exception("用户<%d>没有相似的用户" % uid)
# 2. 从uid用户的近邻相似用户中筛选出对iid物品有评分记录的近邻用户
ids = set(ratings_matrix[iid].dropna().index)&set(similar_users.index)
finally_similar_users = similar_users.ix[list(ids)]
# 3. 结合uid用户与其近邻用户的相似度预测uid用户对iid物品的评分
numerator = 0 # 评分预测公式的分子部分的值
denominator = 0 # 评分预测公式的分母部分的值
for sim_uid, similarity in finally_similar_users.iteritems():
# 近邻用户的评分数据
sim_user_rated_movies = ratings_matrix.ix[sim_uid].dropna()
# 近邻用户对iid物品的评分
sim_user_rating_for_item = sim_user_rated_movies[iid]
# 计算分子的值
numerator += similarity * sim_user_rating_for_item
# 计算分母的值
denominator += similarity
# 计算预测的评分值并返回
predict_rating = numerator/denominator
print("预测出用户<%d>对电影<%d>的评分:%0.2f" % (uid, iid, predict_rating))
return round(predict_rating, 2)
为某一用户预测所有电影评分
def predict_all(uid, ratings_matrix, user_similar):
'''
预测全部评分
:param uid: 用户id
:param ratings_matrix: 用户-物品打分矩阵
:param user_similar: 用户两两间的相似度
:return: 生成器,逐个返回预测评分
'''
# 准备要预测的物品的id列表
item_ids = ratings_matrix.columns
# 逐个预测
for iid in item_ids:
try:
rating = predict(uid, iid, ratings_matrix, user_similar)
except Exception as e:
print(e)
else:
yield uid, iid, rating
if __name__ == '__main__':
for i in predict_all(1, ratings_matrix, user_similar):
pass
根据评分为指定用户推荐topN个电影
def top_k_rs_result(k):
results = predict_all(1, ratings_matrix, user_similar)
return sorted(results, key=lambda x: x[2], reverse=True)[:k]
if __name__ == '__main__':
from pprint import pprint
result = top_k_rs_result(20)
pprint(result)
Item-Based CF 预测电影评分
加载ratings.csv,转换为用户-电影评分矩阵并计算用户之间相似度
import os
import pandas as pd
import numpy as np
DATA_PATH = "./datasets/ml-latest-small/ratings.csv"
dtype = {"userId": np.int32, "movieId": np.int32, "rating": np.float32}
# 加载数据,我们只用前三列数据,分别是用户ID,电影ID,已经用户对电影的对应评分
ratings = pd.read_csv(data_path, dtype=dtype, usecols=range(3))
# 透视表,将电影ID转换为列名称,转换成为一个User-Movie的评分矩阵
ratings_matrix = ratings.pivot_table(index=["userId"], columns=["movieId"],values="rating")
#计算用户之间相似度
item_similar = ratings_matrix.corr()
预测用户对物品的评分 (以用户1对电影1评分为例)
评分公式
# 1. 找出iid物品的相似物品
similar_items = item_similar[1].drop([1]).dropna()
# 相似物品筛选规则:正相关的物品
similar_items = similar_items.where(similar_items>0).dropna()
# 2. 从iid物品的近邻相似物品中筛选出uid用户评分过的物品
ids = set(ratings_matrix.ix[1].dropna().index)&set(similar_items.index)
finally_similar_items = similar_items.ix[list(ids)]
# 3. 结合iid物品与其相似物品的相似度和uid用户对其相似物品的评分,预测uid对iid的评分
numerator = 0 # 评分预测公式的分子部分的值
denominator = 0 # 评分预测公式的分母部分的值
for sim_iid, similarity in finally_similar_items.iteritems():
# 近邻物品的评分数据
sim_item_rated_movies = ratings_matrix[sim_iid].dropna()
# 1用户对相似物品物品的评分
sim_item_rating_from_user = sim_item_rated_movies[1]
# 计算分子的值
numerator += similarity * sim_item_rating_from_user
# 计算分母的值
denominator += similarity
# 计算预测的评分值并返回
predict_rating = sum_up/sum_down
print("预测出用户<%d>对电影<%d>的评分:%0.2f" % (uid, iid, predict_rating))
封装成方法 预测任意用户对任意电影的评分
def predict(uid, iid, ratings_matrix, user_similar):
'''
预测给定用户对给定物品的评分值
:param uid: 用户ID
:param iid: 物品ID
:param ratings_matrix: 用户-物品评分矩阵
:param user_similar: 用户两两相似度矩阵
:return: 预测的评分值
'''
print("开始预测用户<%d>对电影<%d>的评分..."%(uid, iid))
# 1. 找出uid用户的相似用户
similar_users = user_similar[uid].drop([uid]).dropna()
# 相似用户筛选规则:正相关的用户
similar_users = similar_users.where(similar_users>0).dropna()
if similar_users.empty is True:
raise Exception("用户<%d>没有相似的用户" % uid)
# 2. 从uid用户的近邻相似用户中筛选出对iid物品有评分记录的近邻用户
ids = set(ratings_matrix[iid].dropna().index)&set(similar_users.index)
finally_similar_users = similar_users.ix[list(ids)]
# 3. 结合uid用户与其近邻用户的相似度预测uid用户对iid物品的评分
numerator = 0 # 评分预测公式的分子部分的值
denominator = 0 # 评分预测公式的分母部分的值
for sim_uid, similarity in finally_similar_users.iteritems():
# 近邻用户的评分数据
sim_user_rated_movies = ratings_matrix.ix[sim_uid].dropna()
# 近邻用户对iid物品的评分
sim_user_rating_for_item = sim_user_rated_movies[iid]
# 计算分子的值
numerator += similarity * sim_user_rating_for_item
# 计算分母的值
denominator += similarity
# 计算预测的评分值并返回
predict_rating = numerator/denominator
print("预测出用户<%d>对电影<%d>的评分:%0.2f" % (uid, iid, predict_rating))
return round(predict_rating, 2)
为某一用户预测所有电影评分
def predict_all(uid, ratings_matrix, item_similar):
'''
预测全部评分
:param uid: 用户id
:param ratings_matrix: 用户-物品打分矩阵
:param item_similar: 物品两两间的相似度
:return: 生成器,逐个返回预测评分
'''
# 准备要预测的物品的id列表
item_ids = ratings_matrix.columns
# 逐个预测
for iid in item_ids:
try:
rating = predict(uid, iid, ratings_matrix, item_similar)
except Exception as e:
print(e)
else:
yield uid, iid, rating
if __name__ == '__main__':
for i in predict_all(1, ratings_matrix, item_similar):
pass
根据评分为指定用户推荐topN个电影
def top_k_rs_result(k):
results = predict_all(1, ratings_matrix, item_similar)
return sorted(results, key=lambda x: x[2], reverse=True)[:k]
if __name__ == '__main__':
from pprint import pprint
result = top_k_rs_result(20)
pprint(result)
推荐系统评估
评估数据来源显示反馈和隐式反馈
常用评估指标
• 准确性 • 信任度 • 满意度 • 实时性 • 覆盖率 • 鲁棒性 • 多样性 • 可扩展性 • 新颖性 • 商业⽬标 • 惊喜度 • ⽤户留存
-
准确性 (理论角度) Netflix 美国录像带租赁
- 评分预测
- RMSE MAE
- topN推荐
- 召回率 精准率
- 评分预测
-
准确性 (业务角度)
-
覆盖度
- 信息熵 对于推荐越大越好
- 覆盖率
-
多样性&新颖性&惊喜性
- 多样性:推荐列表中两两物品的不相似性。
- 新颖性:未曾关注的类别、作者;推荐结果的平均流⾏度
- 惊喜性:历史不相似(惊)但很满意(喜)
- 往往需要牺牲准确性
- 使⽤历史⾏为预测⽤户对某个物品的喜爱程度
- 系统过度强调实时性
-
Exploitation & Exploration 探索与利用问题
- Exploitation(开发 利用):选择现在可能最佳的⽅案
- Exploration(探测 搜索):选择现在不确定的⼀些⽅案,但未来可能会有⾼收益的⽅案
- 在做两类决策的过程中,不断更新对所有决策的不确定性的认知,优化 长期的⽬标
-
EE问题实践
- 兴趣扩展: 相似话题, 搭配推荐
- 人群算法: userCF 用户聚类
- 平衡个性化推荐和热门推荐比例
- 随机丢弃用户行为历史
- 随机扰动模型参数
-
EE可能带来的问题
- 探索伤害用户体验, 可能导致用户流失
- 探索带来的长期收益(留存率)评估周期长, KPI压力大
- 如何平衡实时兴趣和长期兴趣
- 如何平衡短期产品体验和长期系统生态
- 如何平衡大众口味和小众需求
推荐系统评估方法
- 评估方法
- 问卷调查: 成本高
- 离线评估:
- 只能在用户看到过的候选集上做评估, 且跟线上真实效果存在偏差
- 只能评估少数指标
- 速度快, 不损害用户体验
- 在线评估: 灰度发布 & A/B测试 50% 全量上线
- 实践: 离线评估和在线评估结合, 定期做问卷调查
更多推荐
所有评论(0)