# Source code for yanat.utils

from __future__ import annotations
import numpy as np
from typing import Generator, Optional, Union
import pandas as pd
import _pickle as pk
from datetime import datetime
from pathlib import Path
from sklearn.model_selection import ParameterGrid
from tqdm import tqdm
from copy import deepcopy


def find_density(adjacency_matrix: np.ndarray) -> float:
    """Find the density of the given adjacency matrix.

    Density is the ratio of the number of existing edges to the number of
    possible edges. Note that the denominator is n**2, i.e. self-loops are
    counted among the possible edges.

    Args:
        adjacency_matrix (np.ndarray): The adjacency matrix of the network.

    Returns:
        float: The density of the network.
    """
    # count_nonzero counts the edges directly, without materializing the
    # intermediate 0/1 matrix that np.where(...).sum() would allocate.
    return np.count_nonzero(adjacency_matrix) / adjacency_matrix.shape[0] ** 2
def minmax_normalize(
    data: Union[pd.DataFrame, np.ndarray],
) -> Union[pd.DataFrame, np.ndarray]:
    """Linearly rescale data so its values span the [0, 1] interval.

    Args:
        data (Union[pd.DataFrame, np.ndarray]): Data to be normalized. Can
            be a DataFrame or an np array, but it should be at most 2D.

    Returns:
        Union[pd.DataFrame, np.ndarray]: Normalized data with the same
        shape as the input.
    """
    lowest = data.min()
    span = data.max() - lowest
    return (data - lowest) / span
def log_normalize(adjacency_matrix: np.ndarray) -> np.ndarray:
    """Return the elementwise natural log of the data, with infinities
    (and NaNs) mapped to 0.

    Args:
        adjacency_matrix (np.ndarray): Adjacency matrix of the network.
            Technically can be any matrix but intended for adjacency
            matrices.

    Returns:
        np.ndarray: Normalized data with the same shape as the input.
    """
    # Adjacency matrices routinely contain zeros, for which np.log emits a
    # divide-by-zero RuntimeWarning (and an invalid-value one for negative
    # entries). Silence the warnings; the inf/nan results are cleaned up
    # by nan_to_num right below (nan -> 0.0 is its default behavior).
    with np.errstate(divide="ignore", invalid="ignore"):
        logged = np.log(adjacency_matrix)
    return np.nan_to_num(logged, neginf=0, posinf=0)
def log_minmax_normalize(adjacency_matrix: np.ndarray) -> np.ndarray:
    """Log-transform the data, then min-max scale it into [0, 1].

    Infinities produced by the log are handled, the diagonal is zeroed,
    and any entries that land exactly on 1.0 are also set to 0.

    Args:
        adjacency_matrix (np.ndarray): Adjacency matrix of the network.
            Technically can be any matrix but intended for adjacency
            matrices.

    Returns:
        np.ndarray: Normalized data with the same shape as the input.
    """
    scaled = minmax_normalize(log_normalize(adjacency_matrix))
    np.fill_diagonal(scaled, 0.0)
    is_extreme = scaled == 1.0
    return np.where(is_extreme, 0.0, scaled)
def spectral_normalization(
    target_radius: float, adjacency_matrix: np.ndarray
) -> np.ndarray:
    """Rescale the adjacency matrix so its spectral radius equals
    ``target_radius``. Good for keeping the system stable.

    Args:
        target_radius (float): A value below 1.0 — the spectral radius you
            want to achieve. Use 1.0 if you plan to change the global
            coupling strength elsewhere.
        adjacency_matrix (np.ndarray): Adjacency matrix of the network.

    Returns:
        np.ndarray: Normalized adjacency matrix with the same shape as the
        input.
    """
    eigenvalues = np.linalg.eigvals(adjacency_matrix)
    current_radius = np.abs(eigenvalues).max()
    return adjacency_matrix * (target_radius / current_radius)
def strength_normalization(adjacency_matrix: np.ndarray) -> np.ndarray:
    """Normalize the adjacency matrix to subside the effect of
    high-strength (or high-degree) nodes.

    Implements the strength-normalization scheme described in [1]:
    A_norm = D^(-1/2) @ A @ D^(-1/2), where D is the diagonal matrix of
    node strengths (row sums).

    Parameters:
        adjacency_matrix (np.ndarray): The adjacency matrix of the
            network, a square matrix of shape (n, n) where n is the
            number of nodes.

    Returns:
        np.ndarray: The normalized adjacency matrix with the same shape
        as the input.

    References:
        [1] https://royalsocietypublishing.org/doi/full/10.1098/rsif.2008.0484
    """
    node_strength: np.ndarray = adjacency_matrix.sum(1)
    # Inverse square root of each node's strength on the diagonal.
    scaling: np.ndarray = np.diag(np.power(node_strength, -0.5))
    return scaling @ adjacency_matrix @ scaling
def optimal_influence_default_values(
    adjacency_matrix: np.ndarray,
    location: str = "adjacency_matrices_for_oi",
    random_seed: int = 11,
) -> dict:
    """
    Returns the default values for the parameters of the optimal_influence
    function.

    Pickles the adjacency matrix to disk (under ``location``) and builds
    the default model parameters plus a seeded noise input.

    Parameters:
        adjacency_matrix (np.ndarray): The adjacency matrix representing
            the network structure.
        location (str, optional): The location to save the adjacency
            matrix file. Defaults to "adjacency_matrices_for_oi".
        random_seed (int, optional): The random seed for generating input
            noise. Defaults to 11.

    Returns:
        dict: Default values for the parameters of the optimal_influence
        function, with keys "adjacency_matrix" (path to the pickle),
        "input_noise", and "model_params".
    """
    # Annotation fixed: default_rng returns a numpy Generator, not a
    # typing.Generator.
    rng: np.random.Generator = np.random.default_rng(seed=random_seed)

    # Default simulation constants for the model.
    NOISE_STRENGTH: float = 1
    DELTA: float = 0.01  # integration time step (dt)
    TAU: float = 0.02  # node time constant
    G: float = 0.5  # global coupling strength
    DURATION: int = 10  # total simulated duration

    # One noise time series per node: (n_nodes, DURATION / DELTA) samples.
    input_noise: np.ndarray = rng.normal(
        0, NOISE_STRENGTH, (adjacency_matrix.shape[0], int(DURATION / DELTA))
    )
    model_params: dict = {
        "dt": DELTA,
        "timeconstant": TAU,
        "coupling": G,
        "duration": DURATION,
    }

    # Timestamped filename so repeated calls never overwrite each other.
    timestamp: str = datetime.now().strftime("%Y_%m_%d-%I_%M_%S_%p")
    base_path: Path = Path(location)
    base_path.mkdir(parents=True, exist_ok=True)
    # Annotation fixed: this is a Path, not a str.
    file_location: Path = (
        base_path / f"adjmat_{adjacency_matrix.shape[0]}_nodes_{timestamp}"
    )
    with open(f"{file_location}.pkl", "wb") as f:
        pk.dump(adjacency_matrix, f)

    game_params: dict = {
        "adjacency_matrix": f"{file_location}.pkl",
        "input_noise": input_noise,
        "model_params": model_params,
    }
    # TODO: allow already pickled adjacency matrices to be used as input.
    # TODO: allow the user to specify an arbitrary parameter while keeping the rest as default.
    return game_params
def simple_fit(
    parameter_space: list[ParameterGrid],
    target_matrix: np.ndarray,
    model: callable,
    model_kwargs: Optional[dict] = None,
    normalize: Union[bool, callable] = False,
) -> ParameterGrid:
    """Simple (and honestly, ugly) fitting function to find the best
    parameters for a (communication) model.

    Runs a plain for-loop over the parameter space — not efficient, but
    it doesn't need to be at the moment!

    Args:
        parameter_space (list): Parameter space to search in; as of now
            it should be a list[ParameterGrid] and that ought to be fixed!
        target_matrix (np.ndarray): Matrix to compare the model's output
            to, usually FC or OI.
        model (callable): Which (communication) model to use.
        model_kwargs (Optional[dict], optional): Extra things that the
            model wants. Defaults to None.
        normalize (Union[bool, callable], optional): Callable to normalize
            the output before taking the correlation, or False to skip.
            Defaults to False.

    Returns:
        list: Updated copy of the parameter space with the correlation
        values.
    """
    extra_kwargs = model_kwargs or {}
    searched = deepcopy(parameter_space)
    progress = tqdm(searched, total=len(parameter_space), desc="C3PO noises: ")
    for candidate in progress:
        prediction = model(**candidate, **extra_kwargs)
        if normalize:
            prediction = normalize(prediction)
        candidate.update(
            {"correlation": _matrix_correlation(target_matrix, prediction)}
        )
    # TODO: This should be parallelized and sklearn compatible.
    return searched
def _matrix_correlation(one_matrix: np.ndarray, another_matrix: np.ndarray) -> float:
    """Compute Pearson's correlation between two matrices, using every
    entry (not just the upper triangle).

    Args:
        one_matrix (np.ndarray): One of the matrices.
        another_matrix (np.ndarray): Guess what, the other matrix.

    Returns:
        float: Pearson's correlation between the two matrices.
    """
    # ravel gives a flat view (no copy needed — corrcoef doesn't mutate).
    flat_one = one_matrix.ravel()
    flat_other = another_matrix.ravel()
    correlation_matrix = np.corrcoef(flat_one, flat_other)
    return correlation_matrix[0, 1]


# TODO: add a function to create example adjacency matrices for demonstration purposes.