Source code for ensign.synchronize_labels

# ENSIGN rights
""" Adjusts labels of multiple tensors and/or CP decompositions such that 
identical labels (e.g., "1/1/1970", "10.1.1.1") map to unique indices
(e.g., 5, 437).
"""

import copy
import os
import time

import numpy as np

import ensign.cp_decomp as cpd
import ensign.ensign_io.decomp_io as dio
import ensign.ensign_io.ensign_logging as logging
import ensign.ensign_io.sptensor_io as sio
import ensign.sptensor as spt

# Positions of the base and update tensors in the two-element lists used by
# synchronize_labels_stream (e.g. modes_to_sync, clean_overlap).
BASE_IDX = 0
UPDATE_IDX = 1

# Module-level logger shared by every function in this module.
logger = logging.get_logger()

def synchronize_labels(tensor_dirs, modes_to_sync=None, modes_to_sort=None, output=None, in_place=False, verbose=False):
    """ Synchronizes labels between tensors. Rewrites each tensor in terms of
    the new label indices. Does not preserve the ordering of any mode maps.

    Parameters
    ----------
    tensor_dirs : list of str or CPDecomp/SPTensor
        Paths to directories containing tensors to be synchronized. Or
        CPDecomp/SPTensor objects. The tensor in the zeroth place is treated
        as tensor 0, the tensor in place one is treated as tensor 1, etc.
    modes_to_sync : list of lists of int
        Specifies which modes of each tensor to synchronize. e.g.
        [[0,1,2], [1,2,3]] synchronizes modes 0,1,2 of tensor 0 to modes
        1,2,3 of tensor 1 respectively.
        Default: synchronizes all modes seen in the initial tensor.
    modes_to_sort : list of int
        Specifies which of the synchronized mode tuples, indexed from zero,
        given in modes_to_sync will be sorted according to their labels. For
        example, if modes_to_sync is [[1,2,3],[4,5,6]] and modes_to_sort is
        [0,2] then the (first tensor mode, second tensor mode) synchronized
        tuples are (1,4), (2,5), and (3,6) and modes 1 and 3 of the first
        tensor and modes 4 and 6 of the second tensor will be sorted.
        Default: no modes are sorted.
    output : str
        Directory to save the synchronized tensors to. Subdirectories named
        'tensor_<i>' in the output directory will be overwritten.
        Default: Do not save to disk.
    in_place : bool
        Overwrite the specified tensor directories. If toggled, the output
        option will be ignored.
        Default: False
    verbose : bool
        Display verbose output.
        Default: False

    Returns
    -------
    list of CPDecomp/SPTensor
        List of synchronized CPDecomp and/or SPTensor objects.

    Raises
    ------
    ValueError
        If duplicate tensors are listed, tensor orders are inconsistent, or
        modes_to_sync/modes_to_sort are invalid.
    """
    if verbose:
        logging._set_log_level('DEBUG', logger)
    else:
        logging._set_log_level('WARN', logger)
    start = time.time()

    # Avoid the mutable-default-argument pitfall; [] was the original default.
    if modes_to_sort is None:
        modes_to_sort = []

    logger.info('Reading in tensors/decompositions ...')
    for i, tensor1 in enumerate(tensor_dirs):
        for j, tensor2 in enumerate(tensor_dirs):
            if i == j:
                continue
            try:
                # Comparing a CPDecomp to an SPTensor can raise; treat a
                # failed comparison as "not a duplicate".
                duplicate = tensor1 == tensor2
            except Exception:
                duplicate = False
            if duplicate:
                msg = "Duplicate tensors/decompositions listed. They must be distinct."
                logger.error(msg)
                raise ValueError(msg)

    # Normalize every input (path or object) to the internal dict form.
    tensors = []
    for tensor_dir in tensor_dirs:
        if isinstance(tensor_dir, str):
            tensors.append(read_tensor_decomp_files(tensor_dir))
        else:
            tensors.append(convert_cpdecomp_or_sptensor_to_dict(tensor_dir))

    if not modes_to_sync:
        # Default: synchronize every mode; requires all tensors share an order.
        for tensor in tensors:
            if tensor['order'] != tensors[0]['order']:
                msg = "If 'modes_to_sync' is not specified, all tensors must have the same order."
                logger.error(msg)
                raise ValueError(msg)
        modes_to_sync = [list(range(tensors[0]['order']))] * len(tensor_dirs)
    else:
        for mode in modes_to_sync:
            if len(mode) != len(modes_to_sync[0]):
                msg = 'Must be synchronizing the same number of modes between all tensors/decompositions.'
                logger.error(msg)
                raise ValueError(msg)
        if len(modes_to_sync) != len(tensor_dirs):
            msg = 'Each directory to be synchronized must have a corresponding comma-separated list of modes to synchronize.'
            logger.error(msg)
            raise ValueError(msg)

    for mode_id in modes_to_sort:
        if mode_id < 0 or mode_id >= len(modes_to_sync[0]):
            msg = 'Specified modes to sort must be synchronized modes.'
            logger.error(msg)
            raise ValueError(msg)
    for modes, tensor in zip(modes_to_sync, tensors):
        for mode in modes:
            if mode < 0 or mode >= tensor['order']:
                msg = "Mode: {} in modes_to_sync invalid.".format(mode)
                logger.error(msg)
                raise ValueError(msg)

    local_to_global_mode_ids = []
    # Shared empty list is never mutated; each slot is reassigned below.
    synced_mode_maps = [[]] * len(modes_to_sync[0])

    if output and not in_place:
        if not os.path.exists(output):
            os.mkdir(output)

    # Loop through tensors and build globally synced mode maps
    logger.info('Building synchronized mode maps ...')
    for tensor_id, tensor in enumerate(tensors):
        local_to_global_mode_ids.append({})
        for synced_mode_id, mode_id in enumerate(modes_to_sync[tensor_id]):
            synced_mode_map = synced_mode_maps[synced_mode_id]
            synced_mode_map = synced_mode_map + tensor['mode_maps'][mode_id]
            if synced_mode_id in modes_to_sort:
                synced_mode_maps[synced_mode_id] = sorted(list(set(synced_mode_map)))
            else:
                synced_mode_maps[synced_mode_id] = list(set(synced_mode_map))
            local_to_global_mode_ids[tensor_id][mode_id] = synced_mode_id
    # Map each synchronized label to its global index.
    synced_labels_dicts = [{label: idx for idx, label in enumerate(synced_mode_map)}
                           for synced_mode_map in synced_mode_maps]

    # Loop through tensors and rewrite
    logger.info('Rewriting tensors/decompositions ...')
    for tensor_id, (tensor, tensor_dir) in enumerate(zip(tensors, tensor_dirs)):
        logger.info(' Synchronizing tensor {} ...'.format(tensor_id))
        for mode_id in modes_to_sync[tensor_id]:
            synced_mode_map = synced_labels_dicts[local_to_global_mode_ids[tensor_id][mode_id]]
            old_labels = tensor['mode_maps'][mode_id]
            logger.info(' Incorporating {} new labels into mode {} ...'.format(len(synced_mode_map) - len(old_labels), mode_id))

            # Rewrite labels
            tensor['mode_maps'][mode_id] = synced_mode_maps[local_to_global_mode_ids[tensor_id][mode_id]]

            # Rewrite sptensor entries column
            if 'sptensor' in tensor.keys():
                tensor['sptensor'][:, mode_id] = \
                    sync_sptensor_entries(tensor['sptensor'][:, mode_id], old_labels, synced_mode_map)

            # Rewrite factor matrix
            if 'factor_matrices' in tensor.keys():
                tensor['factor_matrices'][mode_id] = \
                    sync_factor_matrix(tensor['factor_matrices'][mode_id], old_labels, synced_mode_map)

        # Write to disk
        if in_place:
            logger.info(' Overwriting {}'.format(tensor_dir))
            write_tensor_decomp_files(tensor, tensor_dir)
        elif output:
            save_path = os.path.join(output, 'tensor_{}'.format(tensor_id))
            logger.info(' Writing {} to {}'.format(tensor_dir, save_path))
            write_tensor_decomp_files(tensor, save_path)

    finish = time.time()
    logger.info('Finished synchronizing {} tensors/decompositions over {} modes in {} seconds.'.format(len(tensors), len(modes_to_sync[0]), finish - start))
    return [convert_dict_to_sptensor_or_cpdecomp(tensor) for tensor in tensors]
def _write_dropped_entries(path, update, dropped_entries):
    # Write the entries removed from the update tensor (overlapping
    # streaming-mode labels) as a CSV: one header row, then one row per entry.
    with open(path, 'w') as f:
        f.write(','.join(list(update['mode_names'].values()) + ['entry_val', 'entry_idx']))
        f.write('\n')
        for row in dropped_entries:
            f.write(','.join(row))
            f.write('\n')


def synchronize_labels_stream(base_dir, update_dir, streaming_mode=0, output=None, in_place=False, ignore_base_tensor=False, verbose=False):
    """ Synchronizes specified modes between a base decomposition and an
    update tensor. Will attempt to synchronize tensor_data.txt files,
    decomp_mode_<x>.txt files and map_mode_<x>.txt files. Preserves the
    ordering of the mode maps of the base tensor.

    Parameters
    ----------
    base_dir : str or CPDecomp/SPTensor
        Paths to base directory containing a tensor and/or decomposition.
        New indices are appended to the mode maps and factor matrices of
        this decomposition to preserve the original order. Must contain
        mode maps.
    update_dir : str or CPDecomp/SPTensor
        Paths to update directory containing a tensor and/or decomposition.
        Mode maps and tensor data files are rewritten in terms of the new
        label indices. Must contain mode maps. Note: Any decomposition files
        will be updated according to new label indices, however an existing
        decomposition will be invalidated if overlapping labels exist in the
        streaming mode between the update and base.
    streaming_mode : int
        Mode to stream along. Every other mode will be synchronized.
        Overlapping labels in the streaming mode will be removed from the
        update, along with any tensor entries that contain these labels.
        Removed entries will be logged to a file named 'dropped.csv'.
        Default: 0.
    output : str
        Directory to save the synchronized base and update to. Directories
        named 'base' and 'update' in the output directory will be
        overwritten.
        Default: Do not save to disk.
    in_place : bool
        Overwrite the specified base_dir and update_dir. If toggled, the
        output option will be ignored.
        Default: False
    ignore_base_tensor : bool
        Performance option. Will only synchronize the base decomposition and
        the update tensor/decomposition. If a tensor_data.txt file exists in
        the base directory, it will not be updated to reflect the
        synchronization, and it will be invalid with respect to the
        map_mode_<x>.txt and decomp_mode_<x>.txt files.
        Default: False
    verbose : bool
        Display verbose output.
        Default: False.

    Returns
    -------
    (base, update) : (CPDecomp/SPTensor, CPDecomp/SPTensor)
        Returns either CPDecomp objects or SPTensor objects. If factor
        matrices are available, CPDecomp objects will be returned. Otherwise,
        SPTensor objects will be returned.

    Raises
    ------
    ValueError
        If the tensor orders differ or streaming_mode is out of range.
    """
    if verbose:
        logging._set_log_level('DEBUG', logger)
    else:
        logging._set_log_level('WARN', logger)
    start = time.time()

    logger.info('Loading decompositions and/or tensors ...')
    try:
        # If one is a CPDecomp instance, and the other an SPTensor instance,
        # testing equality will raise an exception.
        if base_dir == update_dir:
            print('Base and update tensors/decompositions are already the same. Nothing to synchronize.')
            return base_dir, update_dir
    except Exception:
        pass

    # Normalize each input (path or object) to the internal dict form.
    if isinstance(base_dir, str):
        base = read_tensor_decomp_files(base_dir)
    else:
        base = convert_cpdecomp_or_sptensor_to_dict(base_dir)
    if isinstance(update_dir, str):
        update = read_tensor_decomp_files(update_dir)
    else:
        update = convert_cpdecomp_or_sptensor_to_dict(update_dir)

    if base['order'] != update['order']:
        msg = 'Base and update tensors must have the same order.'
        logger.error(msg)
        raise ValueError(msg)
    if streaming_mode >= base['order'] or streaming_mode < 0:
        msg = 'Streaming mode invalid.'
        logger.error(msg)
        raise ValueError(msg)

    # Synchronize every mode except the streaming one. Two independent lists
    # (base modes, update modes) replace the original aliased [lst]*2 trick.
    non_streaming_modes = [m for m in range(base['order']) if m != streaming_mode]
    modes_to_sync = [list(non_streaming_modes), list(non_streaming_modes)]
    clean_overlap = [streaming_mode, streaming_mode]

    # Create the save directory
    if output and not in_place:
        if not os.path.exists(output):
            os.mkdir(output)

    dropped_entries = None
    logger.info('Cleaning overlap between mode {} of base and mode {} of update ...'.format(clean_overlap[BASE_IDX], clean_overlap[UPDATE_IDX]))
    overlap = list(set(base['mode_maps'][clean_overlap[BASE_IDX]]).intersection(update['mode_maps'][clean_overlap[UPDATE_IDX]]))
    if len(overlap) == len(update['mode_maps'][clean_overlap[UPDATE_IDX]]):
        logger.warning('Every label in the streaming mode of the update tensor exists in the base, there are no new values to stream.')
    if overlap:
        logger.info(' Removing {} labels from mode {} of the update tensor ...'.format(len(overlap), clean_overlap[UPDATE_IDX]))
        clean_mode_map = copy.deepcopy(update['mode_maps'][clean_overlap[UPDATE_IDX]])
        for label in overlap:
            clean_mode_map.remove(label)
        update, dropped_entries = scrub_overlap_entries(update, overlap, clean_mode_map, clean_overlap[UPDATE_IDX])
        update['mode_maps'][clean_overlap[UPDATE_IDX]] = clean_mode_map

    # Synchronize mode by mode
    for base_mode_id, update_mode_id in zip(modes_to_sync[BASE_IDX], modes_to_sync[UPDATE_IDX]):
        base, update = synchronize_mode(base, update, base_mode_id, update_mode_id, ignore_base_tensor, overlap)

    logger.info('Writing to disk ...')
    if 'sptensor' in update.keys():
        if len(update['sptensor']) == 0:
            logger.warning("Update tensor is of size 0. Check input data or options.")
    if in_place:
        logger.info(' Writing base to {}'.format(base_dir))
        write_tensor_decomp_files(base, base_dir)
        logger.info(' Writing update to {}'.format(update_dir))
        write_tensor_decomp_files(update, update_dir)
        if dropped_entries:
            # Shared helper also fixes the previously missing newline after
            # the CSV header in this branch.
            _write_dropped_entries(os.path.join(update_dir, 'dropped.csv'), update, dropped_entries)
    elif output:
        base_save_path = os.path.join(output, 'base')
        update_save_path = os.path.join(output, 'update')
        logger.info(' Writing base to {}'.format(base_save_path))
        write_tensor_decomp_files(base, base_save_path)
        logger.info(' Writing update to {}'.format(update_save_path))
        write_tensor_decomp_files(update, update_save_path)
        if dropped_entries:
            _write_dropped_entries(os.path.join(update_save_path, 'dropped.csv'), update, dropped_entries)

    finish = time.time()
    logger.info('Finished synchronization in {} seconds.'.format(finish - start))
    return convert_dict_to_sptensor_or_cpdecomp(base), convert_dict_to_sptensor_or_cpdecomp(update)
def convert_cpdecomp_or_sptensor_to_dict(tensor_decomp):
    # Normalize a CPDecomp or SPTensor object into the plain-dict form used
    # internally by this module ('order', 'mode_maps', 'mode_names', and
    # optionally 'sptensor', 'factor_matrices', 'weights', 'streaming').
    ret = {}
    if isinstance(tensor_decomp, cpd.CPDecomp):
        ret['order'] = tensor_decomp.order
        ret['weights'] = tensor_decomp.weights
        ret['factor_matrices'] = {i: tensor_decomp.factors[i] for i in range(ret['order'])}
        ret['mode_maps'] = {i: tensor_decomp.labels[i] for i in range(ret['order'])}
        ret['mode_names'] = {i: tensor_decomp.mode_names[i] for i in range(ret['order'])}
        if tensor_decomp.sptensor is not None:
            ret['sptensor'] = tensor_decomp.sptensor.entries.values
        if tensor_decomp.streaming_dataless_fit is not None:
            ret['streaming'] = tensor_decomp.streaming_dataless_fit
    else:
        ret['order'] = tensor_decomp.order
        ret['mode_maps'] = {i: tensor_decomp.labels[i] for i in range(ret['order'])}
        ret['mode_names'] = {i: tensor_decomp.mode_names[i] for i in range(ret['order'])}
        ret['sptensor'] = tensor_decomp.entries.values
    return ret


def convert_dict_to_sptensor_or_cpdecomp(tensor_decomp):
    # Inverse of convert_cpdecomp_or_sptensor_to_dict: rebuild a CPDecomp if
    # factor matrices are present, otherwise an SPTensor.
    if 'factor_matrices' in tensor_decomp.keys():
        ret = cpd.CPDecomp()
        ret.rank = tensor_decomp['factor_matrices'][0].shape[1]
        ret.order = tensor_decomp['order']
        ret.mode_names = [tensor_decomp['mode_names'][i] for i in range(ret.order)]
        ret.labels = [tensor_decomp['mode_maps'][i] for i in range(ret.order)]
        ret.mode_sizes = [len(tensor_decomp['mode_maps'][i]) for i in range(ret.order)]
        ret.factors = [tensor_decomp['factor_matrices'][i] for i in range(ret.order)]
        # 'weights' is optional (weights.txt may be absent); avoid KeyError.
        ret.weights = tensor_decomp.get('weights')
        if 'sptensor' in tensor_decomp.keys():
            ret.sptensor = spt.SPTensor(
                order=ret.order,
                nnz=tensor_decomp['sptensor'].shape[0],
                mode_sizes=ret.mode_sizes,
                mode_names=ret.mode_names,
                labels=ret.labels,
                entries=tensor_decomp['sptensor'])
        if 'streaming' in tensor_decomp.keys():
            ret.streaming_dataless_fit = tensor_decomp['streaming']
        # Single exit point so the streaming fit is kept even when an
        # sptensor is also attached, and a CPDecomp is always returned.
        return ret
    else:
        if 'sptensor' in tensor_decomp.keys():
            return spt.SPTensor(
                order=tensor_decomp['order'],
                mode_sizes=[len(tensor_decomp['mode_maps'][i]) for i in range(tensor_decomp['order'])],
                mode_names=[tensor_decomp['mode_names'][i] for i in range(tensor_decomp['order'])],
                labels=[tensor_decomp['mode_maps'][i] for i in range(tensor_decomp['order'])],
                entries=tensor_decomp['sptensor'],
                nnz=tensor_decomp['sptensor'].shape[0]
            )
        else:
            # Use the module logger instead of a bare print for consistency.
            logger.warning('Returned sptensor only contains label information, no entries!')
            return spt.SPTensor(
                order=tensor_decomp['order'],
                mode_sizes=[len(tensor_decomp['mode_maps'][i]) for i in range(tensor_decomp['order'])],
                mode_names=[tensor_decomp['mode_names'][i] for i in range(tensor_decomp['order'])],
                labels=[tensor_decomp['mode_maps'][i] for i in range(tensor_decomp['order'])]
            )


def synchronize_mode(base, update, base_mode_id, update_mode_id, ignore_base_tensor, overlap):
    # Synchronize one (base mode, update mode) pair: merge the two label
    # sets, then rewrite factor matrices, sparse tensor columns, and mode
    # maps of both tensors in terms of the merged label indices.
    logger.info('Synchronizing mode {} of base to mode {} of update ...'.format(base_mode_id, update_mode_id))

    # Get labels
    base_labels = base['mode_maps'][base_mode_id]
    update_labels = update['mode_maps'][update_mode_id]

    # Get synced mode map
    synced_labels = sorted(list(set(base_labels + update_labels)))
    synced_labels_dict = {label: idx for idx, label in enumerate(synced_labels)}

    # Synchronize Factor Matrices ---------------------------------------------
    if 'factor_matrices' in base.keys():
        logger.info(' Synchronizing base factor matrix ...')
        base['factor_matrices'][base_mode_id] = \
            sync_factor_matrix(base['factor_matrices'][base_mode_id], base_labels, synced_labels_dict)
    else:
        # With sync_stream, base decomposition must exist. Log before raising
        # for consistency with every other error path in this module.
        msg = 'No decomposition files were found in base directory. Aborting.'
        logger.error(msg)
        raise IOError(msg)
    if 'factor_matrices' in update.keys():
        if overlap:
            logger.warning('Decomposition files were found in update directory and ' +
                           'overlapping labels were removed from the update tensor, the update ' +
                           'decomposition will no longer be valid.')
        logger.info(' Synchronizing update factor matrix ...')
        update['factor_matrices'][update_mode_id] = \
            sync_factor_matrix(update['factor_matrices'][update_mode_id], update_labels, synced_labels_dict)
    # -------------------------------------------------------------------------

    # Synchronize Sparse Tensors ----------------------------------------------
    if ('sptensor' in base.keys()) and (not ignore_base_tensor):
        logger.info(' Synchronizing base sparse tensor ...')
        base['sptensor'][:, base_mode_id] = sync_sptensor_entries(base['sptensor'][:, base_mode_id], base_labels, synced_labels_dict)
    if 'sptensor' in update.keys():
        logger.info(' Synchronizing update sparse tensor ...')
        update['sptensor'][:, update_mode_id] = \
            sync_sptensor_entries(update['sptensor'][:, update_mode_id], update_labels, synced_labels_dict)
    else:
        # With sync_stream, update tensor must exist
        msg = 'No tensor_data.txt file was found in the update directory. Aborting.'
        logger.error(msg)
        raise IOError(msg)
    # -------------------------------------------------------------------------

    logger.info(' Synchronizing labels ...')
    base['mode_maps'][base_mode_id] = synced_labels
    update['mode_maps'][update_mode_id] = synced_labels
    return base, update


def sync_factor_matrix(old_factor_matrix, old_labels, synced_labels_dict):
    # Reorder rows of a factor matrix according to new mode indices. Rows for
    # labels new to this tensor are left as zeros.
    _, rank = old_factor_matrix.shape  # Rank does not change
    new_factor_matrix = np.zeros((len(synced_labels_dict), rank))  # New (bigger) factor matrix
    sync_indices = [synced_labels_dict[label] for label in old_labels]  # New row index for each original row
    new_factor_matrix[sync_indices, :] = old_factor_matrix  # Scatter old rows into their new positions
    return new_factor_matrix


def sync_sptensor_entries(old_entries_column, old_labels, synced_labels_dict):
    # Replace one column of sparse tensor coordinates with new mode indices.
    # Returns a plain list of the remapped indices.
    new_entries_column = []
    if len(old_entries_column) > 0:
        for entry in np.nditer(old_entries_column):  # nditer simply allows for fast iteration
            new_entries_column.append(synced_labels_dict[old_labels[int(entry)]])
    return new_entries_column


def scrub_overlap_entries(update, overlap, clean_mode_map, mode_id):
    # Delete rows whose streaming-mode label overlaps with the base, then
    # re-number the surviving label indices against clean_mode_map.
    old_mode_map = update['mode_maps'][mode_id]
    overlap_set = set(overlap)  # O(1) membership tests in the row scan
    delete_idxs = []
    dropped_entries = []
    if len(update['sptensor']) > 0:  # Guard: nditer over an empty column
        for row_idx, label_idx in enumerate(np.nditer(update['sptensor'][:, mode_id])):
            if old_mode_map[int(label_idx)] in overlap_set:
                delete_idxs.append(row_idx)
                # Collect dropped entries: one human-readable label per mode,
                # then the entry value and the original row index.
                dropped_entry = []
                for mode_num in range(update['order']):
                    dropped_entry.append(str(update['mode_maps'][mode_num][int(update['sptensor'][row_idx, mode_num])]))
                dropped_entry.append(str(update['sptensor'][row_idx, update['order']]))
                dropped_entry.append(str(row_idx))
                dropped_entries.append(dropped_entry)
    update['sptensor'] = np.delete(update['sptensor'], delete_idxs, axis=0)
    logger.info(' Removed {} tensor entries containing overlapping labels.'.format(len(delete_idxs)))
    if (len(clean_mode_map) > 0) and (len(update['sptensor']) > 0):
        # Re-number label indices in the streaming mode now that overlapping
        # indices are gone. Precompute old positions once instead of calling
        # list.index per label (labels are unique within a mode map).
        old_positions = {label: idx for idx, label in enumerate(update['mode_maps'][mode_id])}
        update_label_idx = {old_positions[label]: new_idx for new_idx, label in enumerate(clean_mode_map)}
        for row_idx, label_idx in enumerate(np.nditer(update['sptensor'][:, mode_id])):
            update['sptensor'][row_idx, mode_id] = update_label_idx[int(label_idx)]
    else:
        logger.info("No indices to re-label as clean mode map is empty.")
    return update, dropped_entries


def read_tensor_decomp_files(directory):
    # Reads in mode maps and the tensor/decomposition if either are available.
    # Returns a dictionary containing a subset of the following potential keys:
    # 'order', 'mode_maps', 'mode_names', 'sptensor', 'factor_matrices',
    # 'weights', 'streaming'.
    # Order, mode_maps and mode_names should always be available.
    tensor_decomp = {}

    # Get available files in directory
    available_files = os.listdir(directory)
    mode_map_files = [f for f in available_files if f.startswith('map_mode_') and f.endswith('.txt')]

    # Verify that mode map files exist.
    # If so, use them to deduce the order of the tensor
    tensor_decomp['order'] = len(mode_map_files)
    if tensor_decomp['order'] == 0:
        msg = 'All directories must contain map_mode_<x>.txt files to synchronize: {}'.format(directory)
        logger.error(msg)
        raise IOError(msg)

    # Read in mode maps and associated mode names
    mode_map_dict, mode_name_dict = {}, {}
    for mode_id in range(tensor_decomp['order']):
        map_mode_file_path = os.path.join(directory, 'map_mode_{}.txt'.format(mode_id))
        mode_name_dict[mode_id], mode_map_dict[mode_id] = sio.read_labels(map_mode_file_path)
    tensor_decomp['mode_maps'] = mode_map_dict
    tensor_decomp['mode_names'] = mode_name_dict

    # Read in tensor data if available
    if 'tensor_data.txt' in available_files:
        tensor_decomp['sptensor'] = sio.read_sptensor_entries(os.path.join(directory, 'tensor_data.txt'))[3]
    if 'weights.txt' in available_files:
        tensor_decomp['weights'] = dio.read_weights(os.path.join(directory, 'weights.txt'))[0]
    if 'streaming.txt' in available_files:
        tensor_decomp['streaming'] = dio.read_streaming(os.path.join(directory, 'streaming.txt'))

    # Read in factor matrices if available (prefix match per file, rather
    # than a substring scan over the joined file list).
    if any(f.startswith('decomp_mode_') for f in available_files):
        decomp_mode_dict = {}
        for mode_id in range(tensor_decomp['order']):
            decomp_mode_file_path = os.path.join(directory, 'decomp_mode_{}.txt'.format(mode_id))
            decomp_mode_dict[mode_id] = dio.read_factor_matrix(decomp_mode_file_path)[0]
        tensor_decomp['factor_matrices'] = decomp_mode_dict
    return tensor_decomp


def write_tensor_decomp_files(tensor_decomp, directory):
    # Writes the mode maps and any other available components to disk.
    if not os.path.exists(directory):
        os.mkdir(directory)

    # Write mode maps
    for mode_id in range(tensor_decomp['order']):
        sio.write_labels(directory, mode_id, tensor_decomp['mode_names'][mode_id], tensor_decomp['mode_maps'][mode_id])

    # Write factor matrices if available
    if 'factor_matrices' in tensor_decomp.keys():
        for mode_id in range(tensor_decomp['order']):
            dio.write_factor_matrix(directory, mode_id, tensor_decomp['factor_matrices'][mode_id])

    # Write tensor data if available
    if 'sptensor' in tensor_decomp.keys():
        mode_sizes = [len(tensor_decomp['mode_maps'][i]) for i in range(tensor_decomp['order'])]
        sio.write_sptensor_entries(directory, mode_sizes, tensor_decomp['sptensor'])

    # Write weights if available
    if 'weights' in tensor_decomp.keys():
        dio.write_weights(directory, tensor_decomp['weights'])

    # Write streaming if available
    if 'streaming' in tensor_decomp.keys():
        dio.write_streaming(directory, tensor_decomp['streaming'])