# ENSIGN rights
""" Adjusts labels of multiple tensors and/or CP decompositions such that
identical labels (e.g., "1/1/1970", "10.1.1.1") map to unique indices
(e.g., 5, 437).
"""
import copy
import os
import time
import numpy as np
import ensign.cp_decomp as cpd
import ensign.ensign_io.decomp_io as dio
import ensign.ensign_io.ensign_logging as logging
import ensign.ensign_io.sptensor_io as sio
import ensign.sptensor as spt
# Positions of the base and update tensors within paired arguments
# (e.g. modes_to_sync, clean_overlap) used by the streaming sync routines.
BASE_IDX = 0
UPDATE_IDX = 1
# Module-level logger shared by every synchronization routine in this file.
logger = logging.get_logger()
def synchronize_labels(tensor_dirs, modes_to_sync=None, modes_to_sort=None, output=None, in_place=False, verbose=False):
    """
    Synchronizes labels between tensors. Rewrites each tensor in terms of the new label indices.
    Does not preserve the ordering of any mode maps.

    Parameters
    ----------
    tensor_dirs : list of str or CPDecomp/SPTensor
        Paths to directories containing tensors to be synchronized.
        Or CPDecomp/SPTensor objects.
        The tensor in the zeroth place is treated as tensor 0, the tensor
        in place one will be treated as tensor 1, etc.
    modes_to_sync : list of lists of int
        Specifies which modes of each tensor to synchronize.
        e.g. [[0,1,2], [1,2,3]] synchronizes modes 0,1,2 of tensor 0
        to modes 1,2,3 of tensor 1 respectively.
        Default: synchronizes all modes seen in the initial tensor.
    modes_to_sort : list of int
        Specifies which of the synchronized mode tuples, indexed from zero,
        given in modes_to_sync will be sorted according to their labels. For
        example, if modes_to_sync is [[1,2,3],[4,5,6]] and modes_to_sort is [0,2]
        then the (first tensor mode, second tensor mode) synchronized tuples are
        (1,4), (2,5), and (3,6) and modes 1 and 3 of the first tensor and modes
        4 and 6 of the second tensor will be sorted. Default: no sorting.
    output : str
        Directory to save the synchronized tensors to. Directories named
        'tensor_<i>' in the output directory will be overwritten.
        Default: Do not save to disk.
    in_place : bool
        Overwrite the specified tensor directories. If toggled, the output
        option will be ignored. Default: False
    verbose : bool
        Display verbose output. Default: False

    Returns
    -------
    list of CPDecomp/SPTensor
        List of synchronized CPDecomp and/or SPTensor objects.
    """
    if verbose:
        logging._set_log_level('DEBUG', logger)
    else:
        logging._set_log_level('WARN', logger)
    # Normalize the default here instead of using a mutable default argument,
    # which would be shared across calls.
    if modes_to_sort is None:
        modes_to_sort = []
    start = time.time()

    logger.info('Reading in tensors/decompositions ...')
    # Reject duplicate inputs: synchronizing a tensor with itself is meaningless
    # and writing the same path twice would corrupt the output.
    for i, tensor1 in enumerate(tensor_dirs):
        for j, tensor2 in enumerate(tensor_dirs):
            if i != j and tensor1 == tensor2:
                msg = "Duplicate tensors/decompositions listed. They must be distinct."
                logger.error(msg)
                raise ValueError(msg)

    # Normalize every input (directory path or object) into the internal dict form.
    tensors = []
    for tensor_dir in tensor_dirs:
        if isinstance(tensor_dir, str):
            tensors.append(read_tensor_decomp_files(tensor_dir))
        else:
            tensors.append(convert_cpdecomp_or_sptensor_to_dict(tensor_dir))

    # Validate (or derive) the mode correspondence between tensors.
    if not modes_to_sync:
        for tensor in tensors:
            if tensor['order'] != tensors[0]['order']:
                msg = "If 'modes_to_sync' is not specified, all tensors must have the same order."
                logger.error(msg)
                raise ValueError(msg)
        # Default: synchronize mode i of every tensor with mode i of every other.
        # Independent lists per tensor (no shared aliases).
        modes_to_sync = [list(range(tensors[0]['order'])) for _ in tensor_dirs]
    else:
        for mode in modes_to_sync:
            if len(mode) != len(modes_to_sync[0]):
                msg = 'Must be synchronizing the same number of modes between all tensors/decompositions.'
                logger.error(msg)
                raise ValueError(msg)
        if len(modes_to_sync) != len(tensor_dirs):
            msg = 'Each directory to be synchronized must have a corresponding comma-separated list of modes to synchronize.'
            logger.error(msg)
            raise ValueError(msg)
    for mode_id in modes_to_sort:
        if mode_id < 0 or mode_id >= len(modes_to_sync[0]):
            msg = 'Specified modes to sort must be synchronized modes.'
            logger.error(msg)
            raise ValueError(msg)
    for modes, tensor in zip(modes_to_sync, tensors):
        for mode in modes:
            if mode < 0 or mode >= tensor['order']:
                msg = "Mode: {} in modes_to_sync invalid.".format(mode)
                logger.error(msg)
                raise ValueError(msg)

    local_to_global_mode_ids = []
    # One merged mode map per synchronized mode tuple. Built with a
    # comprehension so each element is an independent list (not `[[]] * n`,
    # which would alias a single list).
    synced_mode_maps = [[] for _ in range(len(modes_to_sync[0]))]
    if output and not in_place:
        if not os.path.exists(output):
            os.mkdir(output)

    # Loop through tensors and build globally synced mode maps
    logger.info('Building synchronized mode maps ...')
    for tensor_id, tensor in enumerate(tensors):
        local_to_global_mode_ids.append({})
        for synced_mode_id, mode_id in enumerate(modes_to_sync[tensor_id]):
            # Union of labels seen so far with this tensor's labels for the mode.
            merged = synced_mode_maps[synced_mode_id] + tensor['mode_maps'][mode_id]
            if synced_mode_id in modes_to_sort:
                synced_mode_maps[synced_mode_id] = sorted(set(merged))
            else:
                synced_mode_maps[synced_mode_id] = list(set(merged))
            local_to_global_mode_ids[tensor_id][mode_id] = synced_mode_id
    # Label -> new index lookup for each synchronized mode.
    synced_labels_dicts = [{label: idx for idx, label in enumerate(synced_mode_map)}
                           for synced_mode_map in synced_mode_maps]

    # Loop through tensors and rewrite
    logger.info('Rewriting tensors/decompositions ...')
    for tensor_id, (tensor, tensor_dir) in enumerate(zip(tensors, tensor_dirs)):
        logger.info(' Synchronizing tensor {} ...'.format(tensor_id))
        for mode_id in modes_to_sync[tensor_id]:
            synced_mode_map = synced_labels_dicts[local_to_global_mode_ids[tensor_id][mode_id]]
            old_labels = tensor['mode_maps'][mode_id]
            logger.info(' Incorporating {} new labels into mode {} ...'.format(len(synced_mode_map) - len(old_labels), mode_id))
            # Rewrite labels
            tensor['mode_maps'][mode_id] = synced_mode_maps[local_to_global_mode_ids[tensor_id][mode_id]]
            # Rewrite sptensor entries column
            if 'sptensor' in tensor.keys():
                tensor['sptensor'][:, mode_id] = \
                    sync_sptensor_entries(tensor['sptensor'][:, mode_id],
                                          old_labels, synced_mode_map)
            # Rewrite factor matrix
            if 'factor_matrices' in tensor.keys():
                tensor['factor_matrices'][mode_id] = \
                    sync_factor_matrix(tensor['factor_matrices'][mode_id],
                                       old_labels, synced_mode_map)
        # Write to disk
        if in_place:
            logger.info(' Overwriting {}'.format(tensor_dir))
            write_tensor_decomp_files(tensor, tensor_dir)
        elif output:
            save_path = os.path.join(output, 'tensor_{}'.format(tensor_id))
            logger.info(' Writing {} to {}'.format(tensor_dir, save_path))
            write_tensor_decomp_files(tensor, save_path)

    finish = time.time()
    logger.info('Finished synchronizing {} tensors/decompositions over {} modes in {} seconds.'.format(len(tensors), len(modes_to_sync[0]), finish - start))
    return [convert_dict_to_sptensor_or_cpdecomp(tensor) for tensor in tensors]
def _write_dropped_entries(directory, update, dropped_entries):
    # Record tensor entries removed during overlap cleaning to 'dropped.csv':
    # a header row of mode names plus entry value/index columns, then one
    # comma-separated row per dropped entry. Shared by the in-place and
    # output-directory write paths so the file format is identical for both
    # (previously the in-place path omitted the newline after the header).
    with open(os.path.join(directory, 'dropped.csv'), 'w') as f:
        f.write(','.join(list(update['mode_names'].values()) + ['entry_val', 'entry_idx']))
        f.write('\n')
        for row in dropped_entries:
            f.write(','.join(row))
            f.write('\n')

def synchronize_labels_stream(base_dir, update_dir, streaming_mode=0, output=None, in_place=False, ignore_base_tensor=False, verbose=False):
    """
    Synchronizes specified modes between a base decomposition and an update tensor.
    Will attempt to synchronize tensor_data.txt files, decomp_mode_<x>.txt files
    and map_mode_<x>.txt files. Preserves the ordering of the mode maps of the base
    tensor.

    Parameters
    ----------
    base_dir : str or CPDecomp/SPTensor
        Path to base directory containing a tensor and/or decomposition. New indices
        are appended to the mode maps and factor matrices of this decomposition to
        preserve the original order. Must contain mode maps.
    update_dir : str or CPDecomp/SPTensor
        Path to update directory containing a tensor and/or decomposition. Mode maps
        and tensor data files are rewritten in terms of the new label indices.
        Must contain mode maps.
        Note: Any decomposition files will be updated according to new label indices,
        however an existing decomposition will be invalidated if overlapping labels
        exist in the streaming mode between the update and base.
    streaming_mode : int
        Mode to stream along. Every other mode will be synchronized. Overlapping
        labels in the streaming mode will be removed from the update, along with
        any tensor entries that contain these labels. Removed entries will be
        logged to a file named 'dropped.csv'. Default: 0.
    output : str
        Directory to save the synchronized base and update to. Directories named
        'base' and 'update' in the output directory will be overwritten.
        Default: Do not save to disk.
    in_place : bool
        Overwrite the specified base_dir and update_dir. If toggled, the output
        option will be ignored. Default: False
    ignore_base_tensor : bool
        Performance option. Will only synchronize the base decomposition and the
        update tensor/decomposition. If a tensor_data.txt file exists in the base
        directory, it will not be updated to reflect the synchronization, and it will be
        invalid with respect to the map_mode_<x>.txt and decomp_mode_<x>.txt files.
        Default: False
    verbose : bool
        Display verbose output. Default: False.

    Returns
    -------
    (base, update) : (CPDecomp/SPTensor, CPDecomp/SPTensor)
        Returns either CPDecomp objects or SPTensor objects. If
        factor matrices are available, CPDecomp objects will be
        returned. Otherwise, SPTensor objects will be returned.
    """
    if verbose:
        logging._set_log_level('DEBUG', logger)
    else:
        logging._set_log_level('WARN', logger)
    start = time.time()

    logger.info('Loading decompositions and/or tensors ...')
    # If one is a CPDecomp instance, and the other an SPTensor instance,
    # testing equality can raise — treat any failure as "not equal".
    try:
        if base_dir == update_dir:
            print('Base and update tensors/decompositions are already the same. Nothing to synchronize.')
            return base_dir, update_dir
    except Exception:  # narrowed from a bare 'except:'; still best-effort
        pass

    # Normalize both inputs (path or object) into the internal dict form.
    if isinstance(base_dir, str):
        base = read_tensor_decomp_files(base_dir)
    else:
        base = convert_cpdecomp_or_sptensor_to_dict(base_dir)
    if isinstance(update_dir, str):
        update = read_tensor_decomp_files(update_dir)
    else:
        update = convert_cpdecomp_or_sptensor_to_dict(update_dir)

    if base['order'] != update['order']:
        msg = 'Base and update tensors must have the same order.'
        logger.error(msg)
        raise ValueError(msg)
    if streaming_mode >= base['order'] or streaming_mode < 0:
        msg = 'Streaming mode invalid.'
        logger.error(msg)
        raise ValueError(msg)

    # Synchronize every mode except the streaming mode; base and update use
    # the same mode ids in this streaming scenario.
    sync_mode_ids = [mode for mode in range(base['order']) if mode != streaming_mode]
    modes_to_sync = [sync_mode_ids, sync_mode_ids]
    clean_overlap = [streaming_mode, streaming_mode]

    # Create the save directory
    if output and not in_place:
        if not os.path.exists(output):
            os.mkdir(output)

    dropped_entries = None
    # Remove labels (and their tensor entries) in the streaming mode of the
    # update that already exist in the base — they cannot be streamed again.
    logger.info('Cleaning overlap between mode {} of base and mode {} of update ...'.format(clean_overlap[BASE_IDX], clean_overlap[UPDATE_IDX]))
    overlap = list(set(base['mode_maps'][clean_overlap[BASE_IDX]]).intersection(update['mode_maps'][clean_overlap[UPDATE_IDX]]))
    if len(overlap) == len(update['mode_maps'][clean_overlap[UPDATE_IDX]]):
        logger.warning('Every label in the streaming mode of the update tensor exists in the base, there are no new values to stream.')
    if overlap:
        logger.info(' Removing {} labels from mode {} of the update tensor ...'.format(len(overlap), clean_overlap[UPDATE_IDX]))
        clean_mode_map = copy.deepcopy(update['mode_maps'][clean_overlap[UPDATE_IDX]])
        for label in overlap:
            clean_mode_map.remove(label)
        update, dropped_entries = scrub_overlap_entries(update, overlap, clean_mode_map, clean_overlap[UPDATE_IDX])
        update['mode_maps'][clean_overlap[UPDATE_IDX]] = clean_mode_map

    # Synchronize mode by mode
    for base_mode_id, update_mode_id in zip(modes_to_sync[BASE_IDX], modes_to_sync[UPDATE_IDX]):
        base, update = synchronize_mode(base, update, base_mode_id, update_mode_id, ignore_base_tensor, overlap)

    logger.info('Writing to disk ...')
    if 'sptensor' in update.keys():
        if len(update['sptensor']) == 0:
            logger.warning("Update tensor is of size 0. Check input data or options.")
    if in_place:
        logger.info(' Writing base to {}'.format(base_dir))
        write_tensor_decomp_files(base, base_dir)
        logger.info(' Writing update to {}'.format(update_dir))
        write_tensor_decomp_files(update, update_dir)
        if dropped_entries:
            _write_dropped_entries(update_dir, update, dropped_entries)
    elif output:
        base_save_path = os.path.join(output, 'base')
        update_save_path = os.path.join(output, 'update')
        logger.info(' Writing base to {}'.format(base_save_path))
        write_tensor_decomp_files(base, base_save_path)
        logger.info(' Writing update to {}'.format(update_save_path))
        write_tensor_decomp_files(update, update_save_path)
        if dropped_entries:
            _write_dropped_entries(update_save_path, update, dropped_entries)

    finish = time.time()
    logger.info('Finished synchronization in {} seconds.'.format(finish - start))
    return convert_dict_to_sptensor_or_cpdecomp(base), convert_dict_to_sptensor_or_cpdecomp(update)
def convert_cpdecomp_or_sptensor_to_dict(tensor_decomp):
    # Normalize a CPDecomp or SPTensor object into the plain-dict form used
    # internally by the synchronization routines. 'order', 'mode_maps' and
    # 'mode_names' are always present; the remaining keys depend on what the
    # object carries.
    order = tensor_decomp.order
    result = {'order': order}
    if isinstance(tensor_decomp, cpd.CPDecomp):
        result['weights'] = tensor_decomp.weights
        result['factor_matrices'] = {mode: tensor_decomp.factors[mode] for mode in range(order)}
        result['mode_maps'] = {mode: tensor_decomp.labels[mode] for mode in range(order)}
        result['mode_names'] = {mode: tensor_decomp.mode_names[mode] for mode in range(order)}
        # Attached sparse tensor and streaming fit are optional on a CPDecomp.
        if tensor_decomp.sptensor is not None:
            result['sptensor'] = tensor_decomp.sptensor.entries.values
        if tensor_decomp.streaming_dataless_fit is not None:
            result['streaming'] = tensor_decomp.streaming_dataless_fit
    else:
        result['mode_maps'] = {mode: tensor_decomp.labels[mode] for mode in range(order)}
        result['mode_names'] = {mode: tensor_decomp.mode_names[mode] for mode in range(order)}
        result['sptensor'] = tensor_decomp.entries.values
    return result
def convert_dict_to_sptensor_or_cpdecomp(tensor_decomp):
    # Convert the internal dict representation back into a CPDecomp (when
    # factor matrices are present) or an SPTensor (otherwise).
    if 'factor_matrices' in tensor_decomp.keys():
        ret = cpd.CPDecomp()
        ret.rank = tensor_decomp['factor_matrices'][0].shape[1]
        ret.order = tensor_decomp['order']
        ret.mode_names = [tensor_decomp['mode_names'][i] for i in range(ret.order)]
        ret.labels = [tensor_decomp['mode_maps'][i] for i in range(ret.order)]
        ret.mode_sizes = [len(tensor_decomp['mode_maps'][i]) for i in range(ret.order)]
        ret.factors = [tensor_decomp['factor_matrices'][i] for i in range(ret.order)]
        ret.weights = tensor_decomp['weights']
        if 'sptensor' in tensor_decomp.keys():
            ret.sptensor = spt.SPTensor(
                order=ret.order,
                nnz=tensor_decomp['sptensor'].shape[0],
                mode_sizes=ret.mode_sizes,
                mode_names=ret.mode_names,
                labels=ret.labels,
                entries=tensor_decomp['sptensor'])
        # BUG FIX: restore the streaming fit *before* returning. Previously
        # this assignment sat after the return statement and never executed,
        # silently dropping the streaming_dataless_fit on round-trips.
        if 'streaming' in tensor_decomp.keys():
            ret.streaming_dataless_fit = tensor_decomp['streaming']
        return ret
    else:
        if 'sptensor' in tensor_decomp.keys():
            return spt.SPTensor(
                order=tensor_decomp['order'],
                mode_sizes=[len(tensor_decomp['mode_maps'][i]) for i in range(tensor_decomp['order'])],
                mode_names=[tensor_decomp['mode_names'][i] for i in range(tensor_decomp['order'])],
                labels=[tensor_decomp['mode_maps'][i] for i in range(tensor_decomp['order'])],
                entries=tensor_decomp['sptensor'],
                nnz=tensor_decomp['sptensor'].shape[0]
            )
        else:
            print('WARNING returned sptensor only contains label information, no entries!')
            return spt.SPTensor(
                order=tensor_decomp['order'],
                mode_sizes=[len(tensor_decomp['mode_maps'][i]) for i in range(tensor_decomp['order'])],
                mode_names=[tensor_decomp['mode_names'][i] for i in range(tensor_decomp['order'])],
                labels=[tensor_decomp['mode_maps'][i] for i in range(tensor_decomp['order'])]
            )
def synchronize_mode(base, update, base_mode_id, update_mode_id, ignore_base_tensor, overlap):
    # Merge the label sets of one base/update mode pair and rewrite both
    # tensors' factor matrices and sparse-tensor columns in terms of the
    # merged, sorted label map. Returns the (mutated) base and update dicts.
    logger.info('Synchronizing mode {} of base to mode {} of update ...'.format(base_mode_id, update_mode_id))
    base_labels = base['mode_maps'][base_mode_id]
    update_labels = update['mode_maps'][update_mode_id]
    # Sorted union of both label sets; index lookup maps label -> new index.
    merged_labels = sorted(set(base_labels + update_labels))
    label_to_idx = {label: idx for idx, label in enumerate(merged_labels)}

    # Factor matrices: a base decomposition is mandatory for streaming sync.
    if 'factor_matrices' not in base.keys():
        raise IOError('No decomposition files were found in base directory. Aborting.')
    logger.info(' Synchronizing base factor matrix ...')
    base['factor_matrices'][base_mode_id] = sync_factor_matrix(
        base['factor_matrices'][base_mode_id], base_labels, label_to_idx)
    if 'factor_matrices' in update.keys():
        if overlap:
            logger.warning('Decomposition files were found in update directory and '+
                'overlapping labels were removed from the update tensor, the update ' +
                'decomposition will no longer be valid.')
        logger.info(' Synchronizing update factor matrix ...')
        update['factor_matrices'][update_mode_id] = sync_factor_matrix(
            update['factor_matrices'][update_mode_id], update_labels, label_to_idx)

    # Sparse tensors: the base tensor is optional (and skippable for
    # performance); the update tensor is mandatory.
    if ('sptensor' in base.keys()) and (not ignore_base_tensor):
        logger.info(' Synchronizing base sparse tensor ...')
        base['sptensor'][:, base_mode_id] = sync_sptensor_entries(
            base['sptensor'][:, base_mode_id], base_labels, label_to_idx)
    if 'sptensor' not in update.keys():
        msg = 'No tensor_data.txt file was found in the update directory. Aborting.'
        logger.error(msg)
        raise IOError(msg)
    logger.info(' Synchronizing update sparse tensor ...')
    update['sptensor'][:, update_mode_id] = sync_sptensor_entries(
        update['sptensor'][:, update_mode_id], update_labels, label_to_idx)

    # Both modes now share the merged label map.
    logger.info(' Synchronizing labels ...')
    base['mode_maps'][base_mode_id] = merged_labels
    update['mode_maps'][update_mode_id] = merged_labels
    return base, update
def sync_factor_matrix(old_factor_matrix, old_labels, synced_labels_dict):
    # Re-home the rows of a factor matrix under the synchronized label indices.
    # Returns a new (len(synced_labels_dict) x rank) matrix in which the row
    # for each old label sits at that label's new index; rows for labels the
    # old matrix never saw remain zero.
    rank = old_factor_matrix.shape[1]  # rank is unchanged by synchronization
    synced = np.zeros((len(synced_labels_dict), rank))
    destinations = [synced_labels_dict[label] for label in old_labels]
    synced[destinations, :] = old_factor_matrix
    return synced
def sync_sptensor_entries(old_entries_column, old_labels, synced_labels_dict):
    # Map one coordinate column of sparse-tensor entries onto synchronized
    # indices: each old index is translated to its label via old_labels, then
    # to the label's new index via synced_labels_dict. Returns a plain list.
    if len(old_entries_column) == 0:
        return []
    # np.nditer gives fast element-wise iteration over the numpy column.
    return [synced_labels_dict[old_labels[int(idx)]]
            for idx in np.nditer(old_entries_column)]
def scrub_overlap_entries(update, overlap, clean_mode_map, mode_id):
    # Delete update-tensor rows whose streaming-mode label also appears in the
    # base ('overlap'), then re-number the surviving streaming-mode indices
    # against 'clean_mode_map'. Returns the scrubbed update dict and the
    # dropped rows (each a list of label/value/row-index strings) for logging.
    old_mode_map = update['mode_maps'][mode_id]
    overlap_set = set(overlap)  # O(1) membership tests instead of list scans
    delete_idxs = []
    dropped_entries = []
    for row_idx, label_idx in enumerate(update['sptensor'][:, mode_id]):
        if old_mode_map[int(label_idx)] in overlap_set:
            delete_idxs.append(row_idx)
            # Collect the dropped entry: one label per mode, then the entry's
            # value (last column) and its original row index.
            dropped_entry = [str(update['mode_maps'][mode_num][int(update['sptensor'][row_idx, mode_num])])
                             for mode_num in range(update['order'])]
            dropped_entry.append(str(update['sptensor'][row_idx, update['order']]))
            dropped_entry.append(str(row_idx))
            dropped_entries.append(dropped_entry)
    update['sptensor'] = np.delete(update['sptensor'], delete_idxs, axis=0)
    logger.info(' Removed {} tensor entries containing overlapping labels.'.format(len(delete_idxs)))
    if (len(clean_mode_map) > 0) and (len(update['sptensor']) > 0):
        # Re-number label indices in streaming mode now that overlapping
        # indices are gone. Build position lookups once — the previous
        # implementation called list.index() per label, which was O(n^2).
        old_positions = {label: idx for idx, label in enumerate(old_mode_map)}
        new_positions = {label: idx for idx, label in enumerate(clean_mode_map)}
        update_label_idx = {old_positions[label]: new_positions[label] for label in clean_mode_map}
        for row_idx, label_idx in enumerate(update['sptensor'][:, mode_id]):
            update['sptensor'][row_idx, mode_id] = update_label_idx[int(label_idx)]
    else:
        logger.info("No indices to re-label as clean mode map is empty.")
    return update, dropped_entries
def read_tensor_decomp_files(directory):
    # Load a tensor/decomposition directory into the internal dict form.
    # The result always carries 'order', 'mode_maps' and 'mode_names';
    # 'sptensor', 'weights', 'streaming' and 'factor_matrices' are included
    # only when the corresponding files exist in the directory.
    available_files = os.listdir(directory)
    map_files = [name for name in available_files
                 if name.startswith('map_mode_') and name.endswith('.txt')]
    # The mode-map files double as the source of truth for the tensor order.
    tensor_decomp = {'order': len(map_files)}
    if tensor_decomp['order'] == 0:
        msg = 'All directories must contain map_mode_<x>.txt files to synchronize: {}'.format(directory)
        logger.error(msg)
        raise IOError(msg)
    # Read in mode maps and their associated mode names.
    mode_name_dict, mode_map_dict = {}, {}
    for mode_id in range(tensor_decomp['order']):
        map_path = os.path.join(directory, 'map_mode_{}.txt'.format(mode_id))
        mode_name_dict[mode_id], mode_map_dict[mode_id] = sio.read_labels(map_path)
    tensor_decomp['mode_maps'] = mode_map_dict
    tensor_decomp['mode_names'] = mode_name_dict
    # Optional components.
    if 'tensor_data.txt' in available_files:
        tensor_decomp['sptensor'] = sio.read_sptensor_entries(os.path.join(directory, 'tensor_data.txt'))[3]
    if 'weights.txt' in available_files:
        tensor_decomp['weights'] = dio.read_weights(os.path.join(directory, 'weights.txt'))[0]
    if 'streaming.txt' in available_files:
        tensor_decomp['streaming'] = dio.read_streaming(os.path.join(directory, 'streaming.txt'))
    # Factor matrices, when any decomp_mode_<x>.txt file is present.
    if any('decomp_mode_' in name for name in available_files):
        tensor_decomp['factor_matrices'] = {
            mode_id: dio.read_factor_matrix(os.path.join(directory, 'decomp_mode_{}.txt'.format(mode_id)))[0]
            for mode_id in range(tensor_decomp['order'])}
    return tensor_decomp
def write_tensor_decomp_files(tensor_decomp, directory):
    # Persist whichever components the dict carries to the given directory.
    # Mode maps are always written; factor matrices, tensor entries, weights
    # and the streaming fit are written only when present.
    if not os.path.exists(directory):
        os.mkdir(directory)
    order = tensor_decomp['order']
    # Mode maps (always present).
    for mode_id in range(order):
        sio.write_labels(directory, mode_id, tensor_decomp['mode_names'][mode_id],
                         tensor_decomp['mode_maps'][mode_id])
    # Factor matrices, if available.
    if 'factor_matrices' in tensor_decomp.keys():
        for mode_id in range(order):
            dio.write_factor_matrix(directory, mode_id, tensor_decomp['factor_matrices'][mode_id])
    # Tensor data, if available; mode sizes are derived from the mode maps.
    if 'sptensor' in tensor_decomp.keys():
        mode_sizes = [len(tensor_decomp['mode_maps'][mode_id]) for mode_id in range(order)]
        sio.write_sptensor_entries(directory, mode_sizes, tensor_decomp['sptensor'])
    # Weights, if available.
    if 'weights' in tensor_decomp.keys():
        dio.write_weights(directory, tensor_decomp['weights'])
    # Streaming fit, if available.
    if 'streaming' in tensor_decomp.keys():
        dio.write_streaming(directory, tensor_decomp['streaming'])