# Source code for ucas_dm.prediction_algorithms.content_based_algo
from .base_algo import BaseAlgo
import numpy as np
import pandas as pd
import faiss
import multiprocessing as mp
from multiprocessing import cpu_count
class ContentBasedAlgo(BaseAlgo):
    """
    Content-based prediction algorithm.

    Each user is modelled as the per-dimension average of the vectors of the
    items in his/her log; recommendations are served by querying a faiss
    inner-product index built over the normalized item vectors.
    """

    def __init__(self, item_vector, dimension):
        """
        :param item_vector: Should be a pd.DataFrame contains item_id(integer) and its vector([float]) | id | vector |
        :param dimension: Vector's dimensions.
        """
        super().__init__()
        self._dimension = dimension
        self._item_vector = pd.DataFrame(item_vector)
        self._item_vector.columns = ['item_id', 'vec']
        # The faiss index is built eagerly from the item vectors.
        self._retrieval_model = self._generate_retrieval_model()
        self._user_vector = {}  # user_id -> profile vector, filled by train()
        self._user_log = None   # DataFrame['user_id', 'item_id'], set by train()

    def train(self, train_set):
        """
        Main job is calculating user model for every user. Use multi-process to speed up the training.
        See :meth:`BaseAlog.train <base_algo.BaseAlgo.train>` for more details.
        """

        class TrainJob(mp.Process):
            # Worker process: runs func(*args) once and appends the result to
            # the shared (manager-backed) result list.
            def __init__(self, func, result_list, *args):
                super().__init__()
                self.func = func
                self.args = args
                self.res = result_list

            def run(self):
                self.res.append(self.func(*self.args))

        self._user_log = pd.DataFrame(train_set)
        self._user_log.columns = ['user_id', 'item_id']
        self._user_log.drop_duplicates(inplace=True)
        '''Calculate user model'''
        manager = mp.Manager()
        res_list = manager.list()
        user_ids = self._user_log['user_id'].drop_duplicates().values.tolist()
        part = 3
        cpus = cpu_count()
        job_list = []
        jobs = int(cpus / part)  # Use 1/3 of the cpus
        if jobs <= 0:
            jobs = 1
        # Ceiling division: split the user ids into `jobs` equal-sized slices.
        part_ids_num = int((len(user_ids) + jobs - 1) / jobs)
        for i in range(jobs):
            part_ids = user_ids[i * part_ids_num:i * part_ids_num + part_ids_num]
            j = TrainJob(self._build_user_model, res_list, part_ids)
            job_list.append(j)
            j.start()
        for job in job_list:
            job.join()
        # Merge the per-worker {user_id: vector} dicts into one lookup table.
        for ids_dict in res_list:
            for key in ids_dict.keys():
                self._user_vector[key] = ids_dict[key]
        return self

    def top_k_recommend(self, u_id, k):
        """
        See :meth:`BaseAlog.top_k_recommend <base_algo.BaseAlgo.top_k_recommend>` for more details.
        """
        # NOTE: the guard used to test `_retrieval_model`, which is already
        # built in __init__ and therefore never None; `_user_log` is the
        # attribute that stays None until train() runs.
        if self._user_log is None:
            raise RuntimeError('Run method train() first.')
        specific_user_log = self._user_log[self._user_log['user_id'] == u_id]
        viewed_num = specific_user_log.shape[0]
        assert (viewed_num != 0), "User id doesn't exist."
        specific_user_vec = self._user_vector[u_id]
        normal_specific_user_vec = ContentBasedAlgo._vector_normalize(
            np.array([specific_user_vec]).astype('float32'))
        ''' k+viewed_num make sure that we have at least k unseen items '''
        distance, index = self._retrieval_model.search(normal_specific_user_vec, k + viewed_num)
        item_res = self._item_vector.loc[index[0]]
        res = pd.DataFrame({'dis': distance[0], 'item_id': item_res['item_id']})
        # Drop items the user has already seen, then keep the k best matches.
        res = res[~res['item_id'].isin(specific_user_log['item_id'])]
        res = res[:k]
        ''' return top-k smallest cosine distance and the ids of the items which hold that distance. '''
        return res['dis'].values.tolist(), res['item_id'].values.tolist()

    @classmethod
    def load(cls, fname):
        """
        See :meth:`BaseAlog.load <base_algo.BaseAlgo.load>` for more details.
        """
        res = super(ContentBasedAlgo, cls).load(fname)
        assert (hasattr(res, '_retrieval_model')), 'Not a standard ContentBasedAlgo class.'
        # The faiss index is persisted separately by save(); restore it here.
        setattr(res, '_retrieval_model', faiss.read_index(fname + ".retrieval"))
        return res

    def save(self, fname, ignore=None):
        """
        See :meth:`BaseAlog.save <base_algo.BaseAlgo.save>` for more details.

        The faiss index is written to ``fname + ".retrieval"`` and excluded
        from the base-class save via the ``ignore`` list.
        """
        # Work on a copy: the previous implementation appended to the
        # caller-supplied list, mutating the caller's argument.
        ignore = [] if ignore is None else list(ignore)
        ignore.append('_retrieval_model')
        faiss.write_index(self._retrieval_model, fname + ".retrieval")
        super().save(fname, ignore)

    def _generate_retrieval_model(self):
        """
        Use the retrieval model(faiss) to speed up the vector indexing
        :return: Ready-to-work retrieval model from faiss
        """
        real_vecs = self._item_vector['vec'].values.tolist()
        item_vector_array = np.array(real_vecs)
        # Vectors are L2-normalized so faiss' inner product equals cosine similarity.
        item_vector_array = ContentBasedAlgo._vector_normalize(item_vector_array.astype('float32'))
        retrieval_model = faiss.IndexFlatIP(self._dimension)
        retrieval_model.add(item_vector_array)
        return retrieval_model

    def _build_user_model(self, user_ids):
        """
        This method will calculate user model for all users in user_ids.
        :param user_ids: users' id list
        :return: A dict contains user's id and vector.
        """
        res_dict = {}
        for user_id in user_ids:
            specific_user_log = self._user_log[self._user_log['user_id'] == user_id]
            # Left-join the user's items onto their vectors; a null vector
            # means the log references an item missing from the vector sheet.
            log_vecs = pd.merge(specific_user_log, self._item_vector, how='left', on=['item_id'])
            assert (sum(log_vecs['vec'].notnull()) == log_vecs.shape[0]), 'Item vector sheet has null values'
            res_dict[user_id] = ContentBasedAlgo._calc_dim_average(np.array(log_vecs['vec'].values.tolist()))
        return res_dict

    def to_dict(self):
        # Not implemented for this algorithm.
        pass

    @staticmethod
    def _calc_dim_average(vectors_array):
        """
        This func calculate the average value on every dimension of vectors_array, but it only count non-zero values.
        :param vectors_array: np.array contains a list of vectors.
        :return: A vector has the average value in every dimension.
        """
        array = np.array(vectors_array)
        threshold = 0.001
        res = array.sum(axis=0, dtype='float32')
        # Denominator counts only entries above the threshold per dimension
        # (values <= 0.001, including negatives, are treated as "zero").
        valid_count = (array > threshold).sum(axis=0)
        valid_count[valid_count == 0] = 1  # avoid division by zero
        res /= valid_count
        return res

    @staticmethod
    def _vector_normalize(vectors_array):
        # L2-normalize each row vector.
        vector_len_list = np.sqrt((vectors_array ** 2).sum(axis=1, keepdims=True))
        # handle all-zero vectors
        vector_len_list[vector_len_list == 0] = 1
        res = vectors_array / vector_len_list
        return res