Source code for ensign.sptensor

# ENSIGN rights
"""Sparse tensor tools. 

This module contains functions for reading, writing, and representing sparse 
tensors.
"""
import json
import os
import re

import numpy as np
import pandas as pd

from ensign.constants import *
import ensign.ensign_io.ensign_logging as ensign_logging
import ensign.ensign_io.sptensor_io as sio

# Module-level logger; the reader functions in this file use it to report
# errors before raising.
log = ensign_logging.get_logger()

class SPTensor:
    """Represents a sparse tensor.

    Attributes
    ----------
    order : int
        Order of sparse tensor.
    nnz : int
        Number of nonzero tensor entries.
    mode_sizes : list of int
        The number of indices in each mode accessed by ``mode_id``.
    mode_names : list of str
        The name of each mode indexed by ``mode_id``.
    entries : Pandas dataframe
        2D dataframe of tensor entries. This dataframe has ``order + 1``
        columns and ``nnz`` rows. The first ``order`` columns represent
        64-bit ``int`` tensor indices. The final column represents a
        ``float64`` nonzero tensor entry corresponding to the tensor
        indices in its row.
    labels : list of list of str, optional
        An optional list of per-mode index labels accessed by ``mode_id``.
        Each list of labels corresponds to the labels for each index of the
        mode. Suppose we have a mode with ``mode_id = 3`` and ``"Yes"``,
        ``"No"``, ``"Maybe"`` mapped to indices 0-2. For this mode,
        ``labels[3]`` is equal to ``["Yes", "No", "Maybe"]``. SPTensors are
        not required to have labels.
    spt_backtrack : list of lists of tuples
        An optional list with as many elements as non-zero tensor entries.
        Each element is a list containing tuples that indicate all
        file-line pairs in the CSV file that contributed to that entry.
        For example, given a tensor with three entries such that the first
        tracks to CSV 0, line 15 and CSV 1, line 7, the second tracks to
        CSV 0, line 45, and the third tracks to CSV 1, line 23, the list
        would be: [[(0,15), (1,7)], [(0,45)], [(1,23)]]
    queries : list of list of strings
        An optional list with as many elements as number of modes. Each of
        these lists has as many entries as non-zero tensor entries. Each
        entry is a string describing selection criteria for the bin
        corresponding to the given entry and mode. For example, in a tensor
        with float64, int64, and ip modes binned by round=1, log10, and
        ipsubnet=255.0.0.0, respectively, the list could be:
        [["c1 >= 0.1 AND c1 < 0.2", ...], ["c2 >=10 AND c2 < 100", ...],
        ["c3 & 255.0.0.0 == 10.0.0.0", ...]]
    """

    # Defining __eq__ already makes instances unhashable; stated explicitly
    # because instances are mutable and compared by value.
    __hash__ = None

    def __init__(self, order=0, nnz=0, mode_sizes=None, entries=None,
                 mode_names=None, labels=None, spt_backtrack=None,
                 queries=None, directory=None):
        """Build a sparse tensor from explicit fields or from a directory.

        Parameters
        ----------
        order, nnz, mode_sizes, entries, mode_names, labels, spt_backtrack, queries
            Stored as the attributes of the same name (see class docstring).
        directory : str, optional
            If truthy, the tensor is read from this directory with
            ``read_sptensor`` and every other argument is overridden by the
            values that were read.
        """
        self.order = order
        self.nnz = nnz
        self.mode_sizes = mode_sizes
        self.mode_names = mode_names
        self.entries = entries
        self.labels = labels
        self.spt_backtrack = spt_backtrack
        self.queries = queries

        if directory:
            sptensor = read_sptensor(directory)
            self.order = sptensor.order
            self.nnz = sptensor.nnz
            self.mode_sizes = sptensor.mode_sizes
            self.mode_names = sptensor.mode_names
            self.entries = sptensor.entries
            self.labels = sptensor.labels
            self.spt_backtrack = sptensor.spt_backtrack
            self.queries = sptensor.queries

    @staticmethod
    def _sorted_backtrack(backtrack):
        """Return a copy of ``backtrack`` with each per-entry list sorted.

        ``None`` is passed through unchanged. The input is never modified.
        """
        if backtrack is None:
            return None
        return [sorted(bt) for bt in backtrack]

    def __eq__(self, t):
        """Compare two tensors field by field.

        Backtrack lists are compared without regard to the order of the
        file-line pairs inside each entry. ``queries`` is not part of the
        comparison. Neither operand is modified.
        """
        if type(self) is not type(t):
            return False
        # Compare order-normalized copies so equality has no side effects
        # on either tensor's spt_backtrack.
        own_backtrack = self._sorted_backtrack(self.spt_backtrack)
        other_backtrack = self._sorted_backtrack(t.spt_backtrack)
        if self.order != t.order or \
                self.nnz != t.nnz or \
                self.mode_sizes != t.mode_sizes or \
                not np.array_equal(self.entries, t.entries) or \
                self.mode_names != t.mode_names or \
                self.labels != t.labels or \
                own_backtrack != other_backtrack:
            return False
        return True

    def __ne__(self, t):
        """Logical negation of ``__eq__``."""
        return not self.__eq__(t)

    def __str__(self):
        """Return a JSON summary of order, nnz, mode sizes and mode names."""
        ret = {'order': self.order,
               'nnz': self.nnz,
               'mode_sizes': self.mode_sizes,
               'mode_names': self.mode_names}
        return json.dumps(ret)

    def write(self, outdir):
        """Writes a text representation of the sparse tensor to the given
        directory. If the directory does not exist, it will be created. If
        the directory exists, it will be replaced.

        Parameters
        ----------
        outdir : str
            Absolute or relative path of the directory that will contain
            the sparse tensor
        """
        write_sptensor(outdir, self)
def write_sptensor(directory, sptensor, tensor_filename='tensor_data.txt'):
    """Writes a sparse tensor to the filesystem.

    Parameters
    ----------
    directory : str
        Directory to write the tensor to. This directory (including any
        missing parent directories) will be created if nonexistent and
        **will be overwritten** if it exists.
    sptensor : SPTensor
        The sparse tensor to be written.
    tensor_filename : str
        Name of the tensor entries file inside ``directory``.
        Default: 'tensor_data.txt'

    Raises
    ------
    Exception
        If the sparse tensor could not be written.

    See also
    --------
    ensign.sptensor.SPTensor : Sparse tensor class
    """
    # makedirs creates missing parents and, with exist_ok, avoids the
    # check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(directory, exist_ok=True)

    for mode_id, mode_name in enumerate(sptensor.mode_names):
        sio.write_labels(directory, mode_id, mode_name,
                         sptensor.labels[mode_id])

    sio.write_sptensor_entries(directory, sptensor.mode_sizes,
                               sptensor.entries.values, tensor_filename)

    if sptensor.spt_backtrack is not None:
        sio.write_sptensor_backtrack(directory, sptensor.spt_backtrack)
    else:
        # Remove a backtrack file left by a previously written tensor so it
        # is not picked up when this directory is read back.
        stale_backtrack = os.path.join(directory, 'spt_backtrack.txt')
        if os.path.isfile(stale_backtrack):
            os.remove(stale_backtrack)

    # NOTE(review): when queries is None, queries_mode_*.txt files from an
    # earlier write are left in place — confirm whether they should be
    # removed like the backtrack file above.
    if sptensor.queries is not None:
        for mode_id, mode_name in enumerate(sptensor.mode_names):
            sio.write_labels(directory, mode_id, mode_name,
                             sptensor.queries[mode_id],
                             prefix='queries_mode_')
def _list_mode_files(directory, prefix):
    """Return files named ``<prefix><i>.txt`` in ``directory``, ordered by the integer ``i``."""
    pattern = re.compile(re.escape(prefix) + r'([0-9]+)\.txt')
    matches = [pattern.fullmatch(name) for name in os.listdir(directory)]
    # Sort numerically so that e.g. mode 10 follows mode 9 rather than
    # mode 1; the filename breaks ties such as '01' vs '1'.
    found = sorted((int(m.group(1)), m.group(0)) for m in matches if m)
    return [name for _, name in found]


def read_sptensor(directory, tensor_file='tensor_data.txt', read_map_modes=True):
    """Reads a sparse tensor file from the filesystem.

    Looks for mode maps formatted as map_mode_<i>.txt and reads the
    metadata into ``sptensor`` if found.

    Parameters
    ----------
    directory : str
        Name of directory containing tensor data and mode maps.
    tensor_file : str
        Filename of sparse tensor file. Default: 'tensor_data.txt'
    read_map_modes : bool
        If True, read_sptensor will look for and read from existing mode
        map files to determine mode names and labels. Otherwise, or if no
        mode map files are found, mode names ``mode_<i>`` and labels
        ``label_<i>-<j>`` are generated.

    Returns
    -------
    sptensor : SPTensor
        The sparse tensor in ``directory``

    Raises
    ------
    TypeError
        If ``directory`` is not a string.
    ValueError
        If ``directory`` is an empty string.
    Exception
        If the sparse tensor cannot be read or is not well formed.

    See also
    --------
    ensign.sptensor.SPTensor : Sparse tensor class
    """
    # Error Checking
    if not isinstance(directory, str):
        msg = "directory is not of type String."
        log.error(msg)
        raise TypeError(msg)
    if len(directory) < 1:
        msg = "Argument 'directory' is not a valid directory."
        log.error(msg)
        raise ValueError(msg)

    # The path is passed on with a trailing separator, as before.
    if directory[-1] != '/':
        directory += '/'

    # Get data from tensor_data.txt
    with open(directory + tensor_file, "r") as f:
        order, nnz, mode_sizes, entries = sio.read_sptensor_entries(f)

    # Get backtrack data
    spt_backtrack = sio.read_sptensor_backtrack(directory)

    # Get queries
    queries_fns = _list_mode_files(directory, 'queries_mode_')
    if queries_fns:
        _, queries = sio.read_many_labels(directory, queries_fns)
    else:
        queries = None

    # Get labels
    label_fns = _list_mode_files(directory, 'map_mode_') if read_map_modes else []
    if label_fns:
        mode_names, labels = sio.read_many_labels(directory, label_fns)
    else:
        # No mode maps available: synthesize names and labels from sizes.
        mode_names = ['mode_' + str(i) for i in range(len(mode_sizes))]
        labels = [['label_{}-{}'.format(str(i), str(x)) for x in range(mode_size)]
                  for i, mode_size in enumerate(mode_sizes)]

    df_entries = pd.DataFrame(data=entries, columns=mode_names + ['val_idx'])
    # Index columns are cast to int; the trailing value column is left as is.
    for mode_id in range(order):
        column = mode_names[mode_id]
        df_entries[column] = df_entries[column].astype(int)

    return SPTensor(order, nnz, mode_sizes, df_entries, mode_names, labels,
                    spt_backtrack, queries)
def read_sptensor_file(filename):
    """Reads a sparse tensor file from the filesystem.

    Looks for mode maps formatted as map_mode_<i>.txt and reads the
    metadata into ``sptensor`` if found.

    Parameters
    ----------
    filename : str
        Path to a directory containing sptensor files or to a
        tensor_data.txt file.

    Returns
    -------
    sptensor : SPTensor
        The sparse tensor in ``filename``

    Raises
    ------
    IOError
        If ``filename`` is neither an existing directory nor an existing
        file.
    Exception
        If the sparse tensor cannot be read or is not well formed.

    See also
    --------
    ensign.sptensor.SPTensor : Sparse tensor class
    """
    if os.path.isdir(filename):
        return read_sptensor(filename)
    if os.path.isfile(filename):
        # dirname() is '' for a bare filename; appending '/' to that would
        # point at the filesystem root, so fall back to the current directory.
        parent = os.path.dirname(filename) or '.'
        return read_sptensor(parent + '/', os.path.basename(filename))

    msg = '{} is not a valid sptensor file or directory.'.format(filename)
    log.error(msg)
    raise IOError(msg)


def write_sptensor_file(filename, sptensor):
    """Writes a sparse tensor to the filesystem.

    Parameters
    ----------
    filename : str
        Directory/filename to write the tensor to. If a filename, the
        directory will be inferred. This directory will be created if
        nonexistent and **will be overwritten** if it exists.
    sptensor : SPTensor
        The sparse tensor to be written.

    Raises
    ------
    Exception
        If the sparse tensor could not be written.

    See also
    --------
    ensign.sptensor.SPTensor : Sparse tensor class
    """
    if os.path.isdir(filename):
        return write_sptensor(filename, sptensor)

    parent, tensor_filename = os.path.split(filename)
    if not tensor_filename:
        # Path ends with a separator (a directory that does not exist yet):
        # write there using the default tensor filename rather than ''.
        return write_sptensor(filename, sptensor)
    # An empty parent means a bare filename: write into the current
    # directory instead of the filesystem root.
    return write_sptensor((parent or '.') + '/', sptensor, tensor_filename)