import numpy as np
import numba as nb
from numba import njit, prange
from numba.extending import overload
from typing import Optional, Tuple, Union, Callable, Dict, Any, List
from tqdm import tqdm
__all__ = [
"propagation_distance",
"resistance_distance",
"heat_kernel_distance",
"shortest_path_distance",
"topological_distance",
"compute_node_payoff",
"simulate_network_evolution",
"find_optimal_alpha",
"validate_parameters"
]
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
@njit(fastmath=True)
def _power_iteration(A: np.ndarray, num_iters: int = 100, eps: float = 1e-6) -> float:
"""
Computes spectral radius using power iteration.
Robust to asymmetric matrices by tracking norm growth.
"""
n = A.shape[0]
v = np.ones(n, dtype=np.float64)
for i in range(n):
v[i] += 1e-3 * (i % 2)
norm_v = np.linalg.norm(v)
if norm_v == 0: return 0.0
v = v / norm_v
prev_rho = 0.0
rho = 0.0
for _ in range(num_iters):
v_next = A @ v
norm_next = np.linalg.norm(v_next)
if norm_next < 1e-12:
return 0.0
v = v_next / norm_next
rho = norm_next
if np.abs(rho - prev_rho) < eps * rho:
return rho
prev_rho = rho
return rho
@njit(fastmath=True)
def _spectral_normalization(adjacency: np.ndarray, target_radius: float = 1.0) -> np.ndarray:
# Use power iteration for speed (O(N^2) vs O(N^3))
spec_rad = _power_iteration(adjacency)
if spec_rad > 1e-9:
return adjacency * (target_radius / spec_rad)
return adjacency
@njit(fastmath=True)
def _apply_weighting(adjacency: np.ndarray, distance_matrix: np.ndarray, weight_coefficient: float) -> np.ndarray:
if weight_coefficient == 0.0:
return adjacency.copy()
weights = np.exp(-weight_coefficient * distance_matrix)
return adjacency * weights
@njit(fastmath=True)
def _log_normalize_neg(matrix: np.ndarray) -> np.ndarray:
"""Computes -log(matrix) safely."""
n = matrix.shape[0]
res = np.zeros((n, n), dtype=np.float64)
for i in range(n):
for j in range(n):
val = matrix[i, j]
if val > 1e-20:
res[i, j] = -np.log(val)
else:
res[i, j] = 0.0
return res
@njit(fastmath=True)
def _get_component_size(adjacency: np.ndarray, node: int) -> int:
"""
Computes the size of the connected component containing the given node using BFS.
"""
n = len(adjacency)
visited = np.zeros(n, dtype=np.bool_)
q = np.zeros(n, dtype=np.int64)
head = 0
tail = 0
q[tail] = node
tail += 1
visited[node] = True
count = 0
while head < tail:
u = q[head]
head += 1
count += 1
for v in range(n):
if adjacency[u, v] > 0 and not visited[v]:
visited[v] = True
q[tail] = v
tail += 1
return count
def get_tolerance(tolerance, index):
pass
@overload(get_tolerance)
def ol_get_tolerance(tolerance, index):
if isinstance(tolerance, nb.types.Array):
return lambda tolerance, index: tolerance[index]
else:
return lambda tolerance, index: tolerance
# -----------------------------------------------------------------------------
# Distance Functions
# -----------------------------------------------------------------------------
[docs]
@njit(fastmath=True)
def propagation_distance(
adjacency_matrix: np.ndarray,
spatial_decay: float = 0.8,
symmetric: bool = True,
distance_matrix: Optional[np.ndarray] = None,
weight_coefficient: float = 0.0
) -> np.ndarray:
"""
Computes the propagation distance matrix using Numba.
NOTE: This is a bit unstable under Numba with larger spatial decay values.
Try heat kernel distance for symmetric graphs, they're highly correlated.
The propagation distance is defined as the negative log of the influence matrix.
The influence matrix is computed using either the LAM (Linear Attenuation Model)
or SAR (Spatial Autoregressive) model.
Args:
adjacency_matrix: The adjacency matrix (n_nodes, n_nodes).
spatial_decay: Decay parameter (0 < spatial_decay < 1/spectral_radius).
symmetric: If True, uses SAR model (symmetric influence).
If False, uses LAM model (directed influence).
distance_matrix: The distance matrix (n_nodes, n_nodes). Required if weight_coefficient > 0.
weight_coefficient: The decay coefficient for distance weighting.
Returns:
The propagation distance matrix (n_nodes, n_nodes).
"""
if distance_matrix is not None:
weighted_adj = _apply_weighting(adjacency_matrix, distance_matrix, weight_coefficient)
else:
weighted_adj = adjacency_matrix.copy()
weighted_adj = _spectral_normalization(weighted_adj, 1.0)
n = len(adjacency_matrix)
I = np.eye(n, dtype=np.float64)
A_sys = I - spatial_decay * weighted_adj
# Invert
inv_matrix = np.linalg.solve(A_sys, I)
if symmetric:
influence = inv_matrix @ inv_matrix.T
else:
influence = inv_matrix
return _log_normalize_neg(influence)
[docs]
@njit(fastmath=True)
def resistance_distance(
adjacency: np.ndarray,
distance_matrix: Optional[np.ndarray] = None,
weight_coefficient: float = 0.0
) -> np.ndarray:
"""
Computes resistance distances between all pairs of nodes using Numba.
The resistance distance is computed using the Moore-Penrose pseudoinverse
of the Laplacian matrix. It treats the graph as an electrical network
where edges are resistors.
Args:
adjacency: The adjacency matrix (n_nodes, n_nodes).
distance_matrix: The distance matrix (n_nodes, n_nodes). Required if weight_coefficient > 0.
weight_coefficient: The decay coefficient for distance weighting.
Returns:
The resistance distance matrix (n_nodes, n_nodes).
"""
if distance_matrix is not None:
weighted_adj = _apply_weighting(adjacency, distance_matrix, weight_coefficient)
else:
weighted_adj = adjacency.copy()
# Symmetrize
W = (weighted_adj + weighted_adj.T) / 2.0
degree = np.sum(W, axis=1)
L = np.diag(degree) - W
# Pseudoinverse
pinv = np.linalg.pinv(L)
diag_pinv = np.diag(pinv)
n = len(adjacency)
resistance = np.zeros((n, n), dtype=np.float64)
for i in range(n):
for j in range(n):
val = diag_pinv[i] + diag_pinv[j] - 2 * pinv[i, j]
if val < 0: val = 0.0
if i == j: val = 0.0
resistance[i, j] = val
return resistance
[docs]
@njit(fastmath=True)
def heat_kernel_distance(
adjacency_matrix: np.ndarray,
t: float = 0.5,
eps: float = 1e-10,
distance_matrix: Optional[np.ndarray] = None,
weight_coefficient: float = 0.0
) -> np.ndarray:
"""
Computes the heat kernel distance matrix at diffusion time t using Numba.
The heat kernel distance is defined as -log(exp(-t * L)), where L is the Laplacian.
Args:
adjacency_matrix: The adjacency matrix (n_nodes, n_nodes).
t: Diffusion time parameter.
eps: Small constant to avoid log(0).
distance_matrix: The distance matrix (n_nodes, n_nodes). Required if weight_coefficient > 0.
weight_coefficient: The decay coefficient for distance weighting.
Returns:
The heat kernel distance matrix (n_nodes, n_nodes).
"""
if distance_matrix is not None:
weighted_adj = _apply_weighting(adjacency_matrix, distance_matrix, weight_coefficient)
else:
weighted_adj = adjacency_matrix.copy()
degree = np.sum(weighted_adj, axis=1)
L = np.diag(degree) - weighted_adj
# Use eigh for symmetric matrices (Laplacian is symmetric for undirected)
# If directed, this might be wrong, but heat kernel is typically for undirected.
# Numba supports eigh.
vals, vecs = np.linalg.eigh(L)
# exp(-t * L) = V * diag(exp(-t * vals)) * V.T
exp_vals = np.exp(-t * vals)
# kernel = vecs @ diag(exp_vals) @ vecs.T
# Optimized: (vecs * exp_vals) @ vecs.T
kernel = (vecs * exp_vals) @ vecs.T
# -log
n = len(adjacency_matrix)
dist = np.zeros((n, n), dtype=np.float64)
for i in range(n):
for j in range(n):
val = kernel[i, j]
if val < eps: val = eps
dist[i, j] = -np.log(val)
return dist
[docs]
@njit(fastmath=True)
def shortest_path_distance(
adjacency_matrix: np.ndarray,
distance_matrix: Optional[np.ndarray] = None,
weight_coefficient: float = 0.0
) -> np.ndarray:
"""
Computes shortest-path distances between all pairs of nodes using Floyd-Warshall (Numba).
Args:
adjacency_matrix: The adjacency matrix (n_nodes, n_nodes).
distance_matrix: The distance matrix (n_nodes, n_nodes). Required if weight_coefficient > 0.
weight_coefficient: The decay coefficient for distance weighting.
Returns:
The shortest path distance matrix (n_nodes, n_nodes).
"""
if distance_matrix is not None:
weighted_adj = _apply_weighting(adjacency_matrix, distance_matrix, weight_coefficient)
else:
weighted_adj = adjacency_matrix.copy()
n = len(adjacency_matrix)
dist = np.full((n, n), np.inf, dtype=np.float64)
# Initialize
for i in range(n):
dist[i, i] = 0.0
for j in range(n):
w = weighted_adj[i, j]
if w > 0:
dist[i, j] = 1.0 / w
# Floyd-Warshall
for k in range(n):
for i in range(n):
for j in range(n):
d = dist[i, k] + dist[k, j]
if d < dist[i, j]:
dist[i, j] = d
return dist
[docs]
@njit(fastmath=True)
def topological_distance(
adj_matrix: np.ndarray,
distance_matrix: Optional[np.ndarray] = None,
weight_coefficient: float = 0.0
) -> np.ndarray:
"""
Computes pairwise topological distance based on cosine similarity of neighbors using Numba.
Returns 1 - cosine_similarity. Nodes with similar connectivity patterns
will have small topological distance.
Args:
adj_matrix: The adjacency matrix (n_nodes, n_nodes).
distance_matrix: The distance matrix (n_nodes, n_nodes). Required if weight_coefficient > 0.
weight_coefficient: The decay coefficient for distance weighting.
Returns:
The topological distance matrix (n_nodes, n_nodes).
"""
if distance_matrix is not None:
weighted_adj = _apply_weighting(adj_matrix, distance_matrix, weight_coefficient)
else:
weighted_adj = adj_matrix.copy()
n = len(adj_matrix)
dist = np.zeros((n, n), dtype=np.float64)
# Precompute norms
norms = np.zeros(n, dtype=np.float64)
for i in range(n):
norms[i] = np.sqrt(np.sum(weighted_adj[i]**2))
for i in range(n):
for j in range(i, n):
if norms[i] == 0 or norms[j] == 0:
sim = 0.0
else:
dot = np.sum(weighted_adj[i] * weighted_adj[j])
sim = dot / (norms[i] * norms[j])
d = 1.0 - sim
dist[i, j] = d
dist[j, i] = d
return dist
# -----------------------------------------------------------------------------
# Simulation Logic
# -----------------------------------------------------------------------------
[docs]
@njit(fastmath=True)
def compute_node_payoff(
node: int,
adjacency: np.ndarray,
distance_matrix: np.ndarray,
distance_fn_type: int, # 0: prop, 1: res, 2: heat, 3: sp, 4: topo
alpha: float,
beta: float,
connectivity_penalty: float,
node_resources: Optional[np.ndarray],
# Distance fn params
spatial_decay: float,
symmetric: bool,
weight_coefficient: float,
t_param: float
) -> float:
"""
Computes the payoff for a single node based on distance, wiring cost, and connectivity.
The payoff is defined as:
Payoff = - (alpha * distance_term + beta * wiring_cost + connectivity_penalty * disconnected_nodes)
Args:
node: Index of the node.
adjacency: The current adjacency matrix (n_nodes, n_nodes).
distance_matrix: Pre-computed Euclidean distance matrix (n_nodes, n_nodes).
distance_fn_type: Integer code for distance function (0: prop, 1: res, 2: heat, 3: sp, 4: topo).
alpha: Weight of the distance term.
beta: Weight of the wiring cost term.
connectivity_penalty: Penalty for disconnected components.
node_resources: Optional vector of node resources.
spatial_decay: Parameter for propagation distance.
symmetric: Parameter for propagation distance.
weight_coefficient: Parameter for distance weighting.
t_param: Parameter for heat kernel distance.
Returns:
The calculated payoff value.
"""
# Dispatch
if distance_fn_type == 0:
dists = propagation_distance(adjacency, spatial_decay, symmetric, distance_matrix, weight_coefficient)
elif distance_fn_type == 1:
dists = resistance_distance(adjacency, distance_matrix, weight_coefficient)
elif distance_fn_type == 2:
dists = heat_kernel_distance(adjacency, t_param, 1e-10, distance_matrix, weight_coefficient)
elif distance_fn_type == 3:
dists = shortest_path_distance(adjacency, distance_matrix, weight_coefficient)
else:
dists = topological_distance(adjacency, distance_matrix, weight_coefficient)
# Payoff
comm_cost = np.sum(dists[node])
euclidean = distance_matrix[node]
if node_resources is not None:
effective_dist = np.maximum(0.0, euclidean - node_resources[node])
wiring_cost = np.sum(adjacency[node] * effective_dist)
else:
wiring_cost = np.sum(adjacency[node] * euclidean)
payoff = - (alpha * comm_cost + beta * wiring_cost)
if connectivity_penalty != 0:
comp_size = _get_component_size(adjacency, node)
payoff -= connectivity_penalty * (len(adjacency) - comp_size)
return payoff
@njit(parallel=True, fastmath=True)
def _evaluate_candidates_batch(
current_adj: np.ndarray,
current_payoffs: np.ndarray,
candidates_u: np.ndarray,
candidates_v: np.ndarray,
distance_matrix: np.ndarray,
alpha: float,
beta: float,
connectivity_penalty: float,
node_resources: np.ndarray,
distance_fn_type: int,
spatial_decay: float,
symmetric: bool,
weight_coefficient: float,
t_param: float,
tolerance: Union[float, np.ndarray]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
n_candidates = len(candidates_u)
is_beneficial = np.zeros(n_candidates, dtype=np.bool_)
n_nodes = len(current_adj)
for k in prange(n_candidates):
u = candidates_u[k]
v = candidates_v[k]
# Copy adj
cand_adj = current_adj.copy()
val = 1.0 - cand_adj[u, v]
cand_adj[u, v] = val
if symmetric:
cand_adj[v, u] = val
# Compute new payoffs for u and v
payoff_u = 0.0
payoff_v = 0.0
# Wiring
if beta != 0:
euclidean_u = distance_matrix[u]
euclidean_v = distance_matrix[v]
effective_dist_u = np.maximum(0.0, euclidean_u - node_resources[u])
effective_dist_v = np.maximum(0.0, euclidean_v - node_resources[v])
wiring_u = np.sum(cand_adj[u] * effective_dist_u)
wiring_v = np.sum(cand_adj[v] * effective_dist_v)
payoff_u -= beta * wiring_u
payoff_v -= beta * wiring_v
# Comm
if alpha != 0:
if distance_fn_type == 0:
dists = propagation_distance(cand_adj, spatial_decay, symmetric, distance_matrix, weight_coefficient)
elif distance_fn_type == 1:
dists = resistance_distance(cand_adj, distance_matrix, weight_coefficient)
elif distance_fn_type == 2:
dists = heat_kernel_distance(cand_adj, t_param, 1e-10, distance_matrix, weight_coefficient)
elif distance_fn_type == 3:
dists = shortest_path_distance(cand_adj, distance_matrix, weight_coefficient)
else:
dists = topological_distance(cand_adj, distance_matrix, weight_coefficient)
comm_u = np.sum(dists[u])
comm_v = np.sum(dists[v])
payoff_u -= alpha * comm_u
payoff_v -= alpha * comm_v
new_payoff_u = payoff_u
new_payoff_v = payoff_v
# Connectivity penalty
if connectivity_penalty != 0:
comp_size_u = _get_component_size(cand_adj, u)
new_payoff_u -= connectivity_penalty * (n_nodes - comp_size_u)
comp_size_v = _get_component_size(cand_adj, v)
new_payoff_v -= connectivity_penalty * (n_nodes - comp_size_v)
diff_u = new_payoff_u - current_payoffs[u]
diff_v = new_payoff_v - current_payoffs[v]
tol_u = get_tolerance(tolerance, u)
tol_v = get_tolerance(tolerance, v)
if diff_u > tol_u or diff_v > tol_v:
is_beneficial[k] = True
return candidates_u, candidates_v, is_beneficial
@njit(fastmath=True)
def _compute_all_payoffs(
adj: np.ndarray,
d_mat: np.ndarray,
a: float,
b: float,
cp: float,
node_res: np.ndarray,
d_type: int,
s_decay: float,
sym: bool,
w_coef: float,
t_par: float
) -> np.ndarray:
n = len(adj)
payoffs = np.zeros(n, dtype=np.float64)
if a != 0:
if d_type == 0:
dists = propagation_distance(adj, s_decay, sym, d_mat, w_coef)
elif d_type == 1:
dists = resistance_distance(adj, d_mat, w_coef)
elif d_type == 2:
dists = heat_kernel_distance(adj, t_par, 1e-10, d_mat, w_coef)
elif d_type == 3:
dists = shortest_path_distance(adj, d_mat, w_coef)
else:
dists = topological_distance(adj, d_mat, w_coef)
comm = np.sum(dists, axis=1)
payoffs -= a * comm
# Wiring with resources
if b != 0:
wiring = np.zeros(n, dtype=np.float64)
for i in range(n):
eff_dist = np.maximum(0.0, d_mat[i] - node_res[i])
wiring[i] = np.sum(adj[i] * eff_dist)
payoffs -= b * wiring
if cp != 0:
for i in range(n):
sz = _get_component_size(adj, i)
payoffs[i] -= cp * (n - sz)
return payoffs
[docs]
def simulate_network_evolution(
distance_matrix: np.ndarray,
n_iterations: int,
distance_fn: Union[str, Callable] = "propagation", # String or dummy
alpha: Union[float, np.ndarray] = 1.0,
beta: Union[float, np.ndarray] = 1.0,
connectivity_penalty: Union[float, np.ndarray] = 0.0,
initial_adjacency: Optional[np.ndarray] = None,
n_jobs: int = -1, # Ignored, used for compat
batch_size: Union[int, np.ndarray] = 32,
node_resources: Optional[np.ndarray] = None,
payoff_tolerance: Union[float, np.ndarray] = 0.0,
random_seed: Optional[int] = None,
symmetric: bool = True,
# Distance fn specific args
spatial_decay: float = 0.8,
weight_coefficient: float = 0.0,
t: float = 0.5, # for heat kernel
verbose: bool = True,
**kwargs
) -> np.ndarray:
"""
Simulates the evolution of a network through game-theoretic payoff optimization using Numba.
At each step, random edges are selected and "flipped" (added or removed).
The change is accepted if it improves the payoff for at least one of the nodes involved
(unilateral consent), subject to a tolerance threshold.
Args:
distance_matrix: Pre-computed Euclidean distance matrix (n_nodes, n_nodes).
n_iterations: Total number of simulation steps.
distance_fn: Name of the distance function ("propagation", "resistance", "heat", "shortest", "topological") or the function itself.
alpha: Weight of the distance term (float or trajectory array).
beta: Weight of the wiring cost term (float or trajectory array).
connectivity_penalty: Penalty for disconnected components (float or trajectory array).
initial_adjacency: Starting adjacency matrix. If None, starts with a ring lattice.
n_jobs: Ignored in Numba implementation (uses internal parallelism).
batch_size: Number of potential edge flips to evaluate per iteration.
node_resources: Optional resources for each node to subsidize wiring costs.
payoff_tolerance: Minimum payoff improvement required to accept a change.
random_seed: Seed for random number generator.
symmetric: If True, enforces undirected edges (symmetry).
spatial_decay: Decay parameter for propagation distance.
weight_coefficient: Weight coefficient for distance weighting.
t: Time parameter for heat kernel distance.
verbose: If True, shows progress bar.
**kwargs: Additional arguments (ignored).
Returns:
A 3D array of shape (n_nodes, n_nodes, n_iterations) containing the
adjacency matrix at each time step.
"""
if random_seed is not None:
np.random.seed(random_seed)
# Map distance_fn to int
# 0: prop, 1: res, 2: heat, 3: sp, 4: topo
if isinstance(distance_fn, str):
name = distance_fn.lower()
else:
name = distance_fn.__name__.lower()
if "propagation" in name: dist_type = 0
elif "resistance" in name: dist_type = 1
elif "heat" in name: dist_type = 2
elif "shortest" in name: dist_type = 3
elif "topological" in name: dist_type = 4
else: dist_type = 0 # Default
n_nodes = distance_matrix.shape[0]
if initial_adjacency is None:
adjacency = np.zeros((n_nodes, n_nodes), dtype=np.float64)
idx = np.arange(n_nodes)
adjacency[idx, (idx + 1) % n_nodes] = 1.0
adjacency[(idx + 1) % n_nodes, idx] = 1.0
else:
adjacency = initial_adjacency.astype(np.float64).copy()
if node_resources is None:
node_resources_arr = np.zeros(n_nodes, dtype=np.float64)
else:
node_resources_arr = node_resources.astype(np.float64)
history = np.zeros((n_nodes, n_nodes, n_iterations))
history[:, :, 0] = adjacency
def get_val(param, idx):
if isinstance(param, (np.ndarray, list)):
return param[idx]
return param
# Initial payoffs
current_payoffs = _compute_all_payoffs(
adj=adjacency,
d_mat=distance_matrix,
a=get_val(alpha, 0),
b=get_val(beta, 0),
cp=get_val(connectivity_penalty, 0),
node_res=node_resources_arr,
d_type=dist_type,
s_decay=spatial_decay,
sym=symmetric,
w_coef=weight_coefficient,
t_par=t
)
for step in tqdm(range(1, n_iterations), desc="Simulating network evolution", disable=not verbose):
a_t = get_val(alpha, step)
b_t = get_val(beta, step)
cp_t = get_val(connectivity_penalty, step)
bs_t = int(get_val(batch_size, step))
tol_t = get_val(payoff_tolerance, step)
# Recompute current payoffs if params changed
current_payoffs = _compute_all_payoffs(
adj=adjacency,
d_mat=distance_matrix,
a=a_t,
b=b_t,
cp=cp_t,
node_res=node_resources_arr,
d_type=dist_type,
s_decay=spatial_decay,
sym=symmetric,
w_coef=weight_coefficient,
t_par=t
)
u_indices = np.random.randint(0, n_nodes, bs_t)
v_indices = np.random.randint(0, n_nodes, bs_t)
mask = u_indices == v_indices
while np.any(mask):
v_indices[mask] = np.random.randint(0, n_nodes, np.sum(mask))
mask = u_indices == v_indices
_, _, beneficial = _evaluate_candidates_batch(
current_adj=adjacency,
current_payoffs=current_payoffs,
candidates_u=u_indices,
candidates_v=v_indices,
distance_matrix=distance_matrix,
alpha=a_t,
beta=b_t,
connectivity_penalty=cp_t,
node_resources=node_resources_arr,
distance_fn_type=dist_type,
spatial_decay=spatial_decay,
symmetric=symmetric,
weight_coefficient=weight_coefficient,
t_param=t,
tolerance=tol_t
)
if np.any(beneficial):
idx_ben = np.where(beneficial)[0]
for k in idx_ben:
u = u_indices[k]
v = v_indices[k]
val = 1.0 - adjacency[u, v]
adjacency[u, v] = val
if symmetric:
adjacency[v, u] = val
history[:, :, step] = adjacency
return history
@njit(parallel=True, fastmath=True)
def _compute_impact_batch(
adjacency: np.ndarray,
distance_matrix: np.ndarray,
edges: np.ndarray,
dist_type: int,
alpha: float,
beta: float,
spatial_decay: float,
weight_coefficient: float,
t_param: float
) -> np.ndarray:
n_edges = len(edges)
n_nodes = len(adjacency)
impact_matrix = np.zeros((n_nodes, n_nodes), dtype=np.float64)
for k in prange(n_edges):
u = edges[k, 0]
v = edges[k, 1]
payoff_u_old = compute_node_payoff(
u, adjacency, distance_matrix, dist_type, alpha, beta, 0.0, None,
spatial_decay, True, weight_coefficient, t_param
)
payoff_v_old = compute_node_payoff(
v, adjacency, distance_matrix, dist_type, alpha, beta, 0.0, None,
spatial_decay, True, weight_coefficient, t_param
)
adj_lesioned = adjacency.copy()
adj_lesioned[u, v] = 0.0
adj_lesioned[v, u] = 0.0
payoff_u_new = compute_node_payoff(
u, adj_lesioned, distance_matrix, dist_type, alpha, beta, 0.0, None,
spatial_decay, True, weight_coefficient, t_param
)
payoff_v_new = compute_node_payoff(
v, adj_lesioned, distance_matrix, dist_type, alpha, beta, 0.0, None,
spatial_decay, True, weight_coefficient, t_param
)
impact_matrix[u, v] = payoff_u_new - payoff_u_old
impact_matrix[v, u] = payoff_v_new - payoff_v_old
return impact_matrix
[docs]
def find_optimal_alpha(
distance_matrix: np.ndarray,
empirical_connectivity: np.ndarray,
distance_fn: Callable,
n_iterations: int = 10_000,
beta: float = 1.0,
alpha_range: tuple[float, float] = (1.0, 100.0),
tolerance: float = 0.01,
max_search_iterations: int = 20,
random_seed: int = 11,
batch_size: int = 16,
symmetric: bool = True,
n_jobs: int = -1,
connectivity_penalty: float = 0.0,
payoff_tolerance: float = 0.0,
verbose: bool = True,
**kwargs
) -> Dict[str, Any]:
"""
Finds the optimal alpha value that produces a network with density closest to empirical.
This function uses a bisection search (with linear interpolation) to find the alpha
parameter that results in a generated network with the same density as the provided
empirical network.
Args:
distance_matrix: Precomputed distance matrix (n_nodes, n_nodes).
empirical_connectivity: Target connectivity matrix to match density with.
distance_fn: Distance metric function.
n_iterations: Number of iterations for each simulation.
beta: Wiring cost parameter.
alpha_range: Range for alpha search (min, max).
tolerance: Acceptable difference between densities.
max_search_iterations: Maximum number of search iterations.
random_seed: Random seed for reproducibility.
batch_size: Batch size for parallel processing.
symmetric: If True, enforces symmetry in generated networks.
n_jobs: Number of parallel jobs.
connectivity_penalty: Penalty for connectivity.
payoff_tolerance: Threshold for accepting new configuration.
verbose: If True, prints search progress.
**kwargs: Additional arguments passed to `simulate_network_evolution`.
Returns:
Dictionary containing:
- 'alpha': Optimal alpha value.
- 'density': Density of the resulting network.
- 'evolution': Full history of adjacency matrices.
Example:
>>> dist_mat = np.random.rand(10, 10)
>>> emp_conn = np.random.randint(0, 2, (10, 10))
>>> result = find_optimal_alpha(
... distance_matrix=dist_mat,
... empirical_connectivity=emp_conn,
... distance_fn=shortest_path_distance
... )
>>> print(f"Optimal alpha: {result['alpha']}")
"""
# Calculate empirical density
n_nodes = empirical_connectivity.shape[0]
empirical_density = np.sum(empirical_connectivity.astype(bool).astype(int)) / (n_nodes * (n_nodes - 1))
# Set up fixed parameters as vectors
beta_vec = np.full(n_iterations, beta)
penalty_vec = np.full(n_iterations, connectivity_penalty)
batch_size_vec = np.full(n_iterations, batch_size)
# Function to simulate network and get density
def simulate_with_alpha(alpha_value):
alpha_vec = np.full(n_iterations, alpha_value)
network = simulate_network_evolution(
distance_matrix=distance_matrix,
n_iterations=n_iterations,
distance_fn=distance_fn,
alpha=alpha_vec,
beta=beta_vec,
connectivity_penalty=penalty_vec,
n_jobs=n_jobs,
random_seed=random_seed,
batch_size=batch_size_vec,
symmetric=symmetric,
payoff_tolerance=payoff_tolerance,
verbose=verbose,
**kwargs
)
# Get final network
final_adj = network[:, :, -1]
density = np.sum(final_adj.astype(bool).astype(int)) / (n_nodes * (n_nodes - 1))
return density, network
# Initialize search with provided range
alpha_min, alpha_max = alpha_range
min_density, min_net = simulate_with_alpha(alpha_min)
max_density, max_net = simulate_with_alpha(alpha_max)
if verbose:
print(f"Initial range: alpha=[{alpha_min}, {alpha_max}], density=[{min_density}, {max_density}]")
print(f"Target density: {empirical_density}")
# Track all tested points
tested_points = [
{'alpha': alpha_min, 'density': min_density, 'evolution': min_net},
{'alpha': alpha_max, 'density': max_density, 'evolution': max_net}
]
# Check if range brackets the target
if (min_density > empirical_density and max_density > empirical_density) or \
(min_density < empirical_density and max_density < empirical_density):
if verbose:
print("Warning: Initial range does not bracket the target density!")
# Return best of the two
return min(tested_points, key=lambda p: abs(p['density'] - empirical_density))
# Bisection search with linear interpolation
iterations = 0
while iterations < max_search_iterations:
# Use linear interpolation to estimate next alpha
alpha_mid = alpha_min + (alpha_max - alpha_min) * \
(empirical_density - min_density) / (max_density - min_density)
# Fallback to bisection if interpolation gives unusual values
if not (alpha_min < alpha_mid < alpha_max):
alpha_mid = (alpha_min + alpha_max) / 2
if verbose:
print(f"Iteration {iterations+1}: Testing alpha={alpha_mid}")
mid_density, mid_net = simulate_with_alpha(alpha_mid)
tested_points.append({'alpha': alpha_mid, 'density': mid_density, 'evolution': mid_net})
if verbose:
print(f"Alpha={alpha_mid} → density={mid_density}, diff from target={mid_density-empirical_density}")
# Check if we're close enough
if abs(mid_density - empirical_density) < tolerance:
return {'alpha': alpha_mid, 'density': mid_density, 'evolution': mid_net}
# Update range
if mid_density < empirical_density:
alpha_min = alpha_mid
min_density = mid_density
else:
alpha_max = alpha_mid
max_density = mid_density
# Check if range is small enough
if alpha_max - alpha_min < tolerance:
return min(tested_points, key=lambda p: abs(p['density'] - empirical_density))
iterations += 1
# If we reach max iterations, return best point found
return min(tested_points, key=lambda p: abs(p['density'] - empirical_density))