Source code for ucas_dm.utils

from .prediction_algorithms.base_algo import BaseAlgo
import pandas as pd
import json
import multiprocessing as mp
import time
import os


class Evaluator:
    """
    This class provides methods to evaluate the performance of a recommendation algorithm.
    Two measures are supported for now: recall and precision.
    """
    def __init__(self, data_set):
        """
        :param data_set: user view log with three columns: user id, item id and view time.
        """
        self.__data_set = pd.DataFrame(data_set)
        self.__data_set.columns = ['user_id', 'item_id', 'view_time']
        self.__data_set['view_time'] = pd.to_datetime(self.__data_set['view_time'])
        self.__data_set.drop_duplicates(inplace=True)
    def evaluate(self, algo=BaseAlgo(), k=[], n_jobs=1, split_date='2000-1-1', debug=False, verbose=False,
                 auto_log=False):
        """
        :param algo: recommendation algorithm to evaluate.
        :param k: list of integers, each giving a number of items to recommend.
        :param n_jobs: maximum number of evaluation jobs run in parallel. Multiple processes are used to
            speed up the evaluation.
        :param split_date: date on which the log data is split into train and test sets.
        :param debug: if True, the evaluator only uses the first 5000 records of the data set.
        :param verbose: whether to print the total time the evaluation takes.
        :param auto_log: if True, the Evaluator automatically appends performance data to './performance.log'.
        :return: a list of (recall, precision) pairs, one for each value in k.
        """
        assert (n_jobs > 0), 'n_jobs must be greater than 0.'
        if debug:
            data_set = self.__data_set[:5000]
        else:
            data_set = self.__data_set
        train_set = data_set[data_set['view_time'] < split_date][['user_id', 'item_id']]
        test_set = data_set[data_set['view_time'] >= split_date][['user_id', 'item_id']]
        res = []
        start_time = time.time()
        algo.train(train_set)
        end_time1 = time.time()
        for _k in k:
            s_time = time.time()
            recall, precision = Evaluator._job_dispatch(algo, _k, train_set, test_set, n_jobs)
            e_time = time.time()
            res.append((recall, precision))
            if verbose:
                print("Total cost: %.1f(s)" % (e_time - s_time + end_time1 - start_time))
            if auto_log:
                Evaluator._log_to_file(algo.to_dict(), recall=recall, precision=precision, k_recommend=_k,
                                       train_time=end_time1 - start_time, recommend_time=e_time - s_time,
                                       n_jobs=n_jobs, debug=debug)
        return res
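    # Usage sketch (illustrative only, not part of the module): assuming 'MyAlgo' is a
    # hypothetical subclass of BaseAlgo that implements train(), top_k_recommend() and
    # to_dict(), an evaluation run could look like this:
    #
    #     view_log = pd.read_csv('user_view_log.csv')   # three columns: user id, item id, view time
    #     evaluator = Evaluator(view_log)
    #     results = evaluator.evaluate(algo=MyAlgo(), k=[5, 10], n_jobs=4,
    #                                  split_date='2018-1-1', verbose=True)
    #     # results -> [(recall@5, precision@5), (recall@10, precision@10)]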
    @staticmethod
    def _log_to_file(algo_dict, **kwargs):
        """
        Save the algorithm's dict representation together with its performance data to ./performance.log
        in JSON format.

        :param algo_dict: dict representation of the algorithm.
        :param kwargs: performance data of the algorithm.
        """
        file_path = "./performance.log"
        print("Saving to " + file_path)
        for k in kwargs.keys():
            algo_dict[k] = kwargs[k]
        if os.path.exists(file_path):
            with open(file_path, 'r+', encoding='utf-8') as f:
                logs = json.load(f)
                f.seek(0)
                # Append a new record keyed with the next index after the last existing one.
                keys = logs.keys()
                last_index = int(list(keys)[-1].replace("record", ''))
                new_key = "record" + str(last_index + 1)
                logs[new_key] = algo_dict
                json.dump(logs, f, ensure_ascii=False, indent=4)
        else:
            with open(file_path, 'w', encoding='utf-8') as f:
                logs = {"record1": algo_dict}
                json.dump(logs, f, ensure_ascii=False, indent=4)
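    # Sketch of the resulting ./performance.log (illustrative values; the exact fields depend on
    # what algo.to_dict() returns). Each call appends one numbered record:
    #
    #     {
    #         "record1": {
    #             ... keys from algo.to_dict() ...,
    #             "recall": 0.21, "precision": 0.05, "k_recommend": 10,
    #             "train_time": 12.3, "recommend_time": 4.5,
    #             "n_jobs": 4, "debug": false
    #         },
    #         "record2": { ... }
    #     }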
    @staticmethod
    def _job_dispatch(algo, k, train_set, test_set, n_jobs):
        class TestJob(mp.Process):
            def __init__(self, func, result_list, *args):
                super().__init__()
                self.func = func
                self.args = args
                self.res = result_list

            def run(self):
                self.res.append(self.func(*self.args))

        manager = mp.Manager()
        res_list = manager.list()
        user_ids = train_set[['user_id']].drop_duplicates()
        user_num = user_ids.shape[0]
        part_num = int((user_num + n_jobs - 1) / n_jobs)  # ceil(user_num / n_jobs) users per process
        job_list = []
        recall = []
        precision = []
        for i in range(n_jobs):
            # Split the users into n_jobs disjoint parts and evaluate each part in its own process.
            part_users = user_ids[i * part_num:i * part_num + part_num]
            part_train_set = train_set[train_set['user_id'].isin(part_users['user_id'])]
            part_test_set = test_set[test_set['user_id'].isin(part_users['user_id'])]
            j = TestJob(Evaluator._one_round_test, res_list, algo, k, part_train_set, part_test_set)
            job_list.append(j)
            j.start()
        for job in job_list:
            job.join()
        # All processes have finished here; average the per-process results.
        for i in range(len(res_list)):
            recall.append(res_list[i][0])
            precision.append(res_list[i][1])
        return sum(recall) / len(recall), sum(precision) / len(precision)

    @staticmethod
    def _one_round_test(algo, k, train_set, test_set):
        user_id_series = train_set['user_id'].drop_duplicates()
        recall_log = []
        precision_log = []
        for user_id in user_id_series:
            user_viewed_in_test = test_set[test_set['user_id'] == user_id].drop_duplicates()
            if user_viewed_in_test.shape[0] == 0:
                continue
            _, recommend = algo.top_k_recommend(user_id, k)
            recommend = pd.DataFrame({'item_id': recommend})
            assert (recommend.shape[0] > 0), 'Recommend error'
            # Number of recommended items that the user actually viewed in the test period.
            cover_number = (recommend[recommend['item_id'].isin(user_viewed_in_test['item_id'])]).shape[0]
            recall_log.append(cover_number / user_viewed_in_test.shape[0])
            precision_log.append(cover_number / recommend.shape[0])
        recall = sum(recall_log) / len(recall_log)
        precision = sum(precision_log) / len(precision_log)
        return recall, precision
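
# A minimal, self-contained sketch of the recall/precision arithmetic used in
# _one_round_test, on made-up data: for one user, 2 of the 5 recommended items appear
# among the 4 items the user actually viewed in the test period, so recall = 2/4 = 0.5
# and precision = 2/5 = 0.4. The item ids below are purely illustrative.
if __name__ == "__main__":
    recommend = pd.DataFrame({'item_id': [1, 2, 3, 4, 5]})          # top-5 recommendation
    user_viewed_in_test = pd.DataFrame({'item_id': [2, 5, 8, 9]})   # items viewed after split_date
    cover_number = recommend[recommend['item_id'].isin(user_viewed_in_test['item_id'])].shape[0]
    print("recall = %.2f" % (cover_number / user_viewed_in_test.shape[0]))   # 0.50
    print("precision = %.2f" % (cover_number / recommend.shape[0]))          # 0.40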