Source code for chmncc.dataset.parser

Original parser
This code was adapted from

import numpy as np
import torch
import networkx as nx
import keras
from itertools import chain
from typing import Tuple, Dict, Tuple

# Skip the root nodes
to_skip = ["root", "GO0003674", "GO0005575", "GO0008150"]

[docs]class arff_data: """All the datasets they provide are in arff, this is the class""" def __init__(self, arff_file, is_GO, is_test=False): """Initialize the arff_data Args: arf_file [string]: arff file isGO [boolean]: whether it is the GO dataset is_test [boolean]: whether the dataset is test """ self.X, self.Y, self.A, self.terms, self.g = parse_arff( arff_file=arff_file, is_GO=is_GO, is_test=is_test ) # set all the non-skippable elements self.to_eval = [t not in to_skip for t in self.terms] r_, c_ = np.where(np.isnan(self.X)) m = np.nanmean(self.X, axis=0) # compute the mean ignoring the nans for i, j in zip(r_, c_): # set the mean values for the nans self.X[i, j] = m[j]
[docs]def parse_arff( arff_file: str, is_GO=False, is_test=False ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, nx.DiGraph]: """Parse the arff data Args: arf_file [str]: arff file isGO [bool]: whether it is the GO dataset is_test [bool]: whether the dataset is test Return: X [torch.Tensor] data instances Y [torch.Tensor] labels R [torch.Tensor[torch.Tensor]] adjacency matrix g [nx.DiGraph] graph """ with open(arff_file) as f: read_data = False X = [] Y = [] # create the graph g = nx.DiGraph() feature_types = [] d = [] cats_lens = [] for num_line, l in enumerate(f): if l.startswith("@ATTRIBUTE"): if l.startswith("@ATTRIBUTE class"): h = l.split("hierarchical")[1].strip() for branch in h.split(","): terms = branch.split("/") if is_GO: # GO add edge g.add_edge(terms[1], terms[0]) else: if len(terms) == 1: # add edge from root to term g.add_edge(terms[0], "root") else: # create the children terms for i in range(2, len(terms) + 1): g.add_edge( ".".join(terms[:i]), ".".join(terms[: i - 1]) ) # sort the nodes with respect to the distances of the root nodes = sorted( g.nodes(), key=lambda x: (nx.shortest_path_length(g, x, "root"), x) if is_GO else (len(x.split(".")), x), ) # get the nodes list nodes_idx = dict(zip(nodes, range(len(nodes)))) # reverse g_t = g.reverse() else: _, f_name, f_type = l.split() if f_type == "numeric" or f_type == "NUMERIC": d.append([]) cats_lens.append(1) feature_types.append( lambda x, i: [float(x)] if x != "?" else [np.nan] ) else: cats = f_type[1:-1].split(",") cats_lens.append(len(cats)) d.append( { key: keras.utils.to_categorical(i, len(cats)).tolist() for i, key in enumerate(cats) } ) feature_types.append( lambda x, i: d[i].get(x, [0.0] * cats_lens[i]) ) elif l.startswith("@DATA"): read_data = True elif read_data: y_ = np.zeros(len(nodes)) d_line = l.split("%")[0].strip().split(",") lab = d_line[len(feature_types)].strip() X.append( list( chain( *[ feature_types[i](x, i) for i, x in enumerate(d_line[: len(feature_types)]) ] ) ) ) # build the labels for t in lab.split("@"): y_[ [ nodes_idx.get(a) for a in nx.ancestors(g_t, t.replace("/", ".")) ] ] = 1 y_[nodes_idx[t.replace("/", ".")]] = 1 Y.append(y_) X = np.array(X) Y = np.stack(Y) return X, Y, np.array(nx.to_numpy_matrix(g, nodelist=nodes)), nodes, g
[docs]def initialize_dataset( name: str, datasets: Dict[str, Tuple[bool, str, str, str]] ) -> Tuple[arff_data, arff_data, arff_data]: """Initialize the dataset Args: name [str]: name of the dataset to prepare datasets Dict[List[bool, str, str, str]]: whether the dataset is GO, the train, validation and test data location Returns: train dataset [arff_data] validation dataset [arff_data] test dataset [arff_data] """ is_GO, train, val, test = datasets[name] return arff_data(train, is_GO), arff_data(val, is_GO), arff_data(test, is_GO, True)
[docs]def initialize_other_dataset( name: str, datasets: Dict[str, Tuple[bool, str, str]] ) -> Tuple[arff_data, arff_data]: """Initialize the dataset Args: name [str]: name of the dataset to prepare datasets [bool, str, str]: whether the dataset is go (?), the train and test data location Returns: train dataset [arff_data] test dataset [arff_data] """ is_GO, train, test = datasets[name] return arff_data(train, is_GO), arff_data(test, is_GO, True)