Get shapley table

Calculates Shapley values based on the filled contribution_space. Briefly, for a permutation (A,B,C) it will be:

(A,B,C) - (B,C) = Contribution of A to the coalition (B,C).
(B,C) - (C) = Contribution of B to the coalition formed with (C).
(C) = Contribution of C alone.

This will repeat over all permutations, and the result is a distribution of Shapley values for each element. Note that the estimation method used here is an unbiased estimator, so the variance is fairly large.
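
As a minimal illustration of this decomposition (and not the library's internal implementation), the marginal contributions along a single permutation can be computed like this, using a toy value function that scores a coalition by its size:

    def value(coalition):
        # toy game: the score of a coalition is just its size
        return len(coalition)

    permutation = ("A", "B", "C")
    marginals = {}
    for i, element in enumerate(permutation):
        remaining = set(permutation[i:])   # {A,B,C}, then {B,C}, then {C}
        rest = set(permutation[i + 1:])    # {B,C}, then {C}, then {}
        # contribution of `element` to the coalition formed by the remaining elements
        marginals[element] = value(remaining) - value(rest)

    print(marginals)  # {'A': 1, 'B': 1, 'C': 1}

Averaging such marginal contributions over many sampled permutations yields the distribution of Shapley values described above.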

Parameters:

contributions (Dict, default None):
    Filled dictionary of coalition:result pairs.

permutation_space (list, required):
    Should be the same list that was passed to make_combination_space.

lesioned (Optional[any], default None):
    Lesioned element that will not be present in any combination.

objective_function (Callable, default None):
    The game (in-silico experiment). It should take the complement set and return a single numeric
    value, either int or float. It is called as: objective_function(complement, **objective_function_params)

    An example using networkx, with a tip: you sometimes need to specify what should happen during
    edge cases, such as an all-lesioned network.

        def local_efficiency(complements, graph):
            if len(complements) == 0:
                # the network is intact so:
                return nx.local_efficiency(graph)

            elif len(complements) == len(graph):
                # the network is fully lesioned so:
                return 0.0

            else:
                # lesion the system, calculate things
                lesioned = graph.copy()
                lesioned.remove_nodes_from(complements)
                return nx.local_efficiency(lesioned)

objective_function_params (Dict, default None):
    Kwargs for the objective_function.

multiprocessing_method (str, default 'joblib'):
    Two parallelization methods are implemented so far, 'joblib' and 'ray'; the default is joblib.
    If using ray, you need to decorate your objective function with the @ray.remote decorator
    (see the sketch after this parameter list, and visit the ray documentation for details).
    Ray probably works better on HPC clusters (if they support it) and likely doesn't suffer from
    joblib's sneaky memory leakage, but in practice joblib is faster for tasks that are themselves
    small. Remedies are here:
    https://docs.ray.io/en/latest/auto_examples/tips-for-first-time.html

    Note: multiprocessing isn't always faster, as explained above. Use it when each game takes
    longer than roughly 0.5 seconds. For example, a function that sleeps for a second on a set of
    10 elements with 1000 permutations (1024 games) performs as follows:

        - no parallel: 1020 sec
        - joblib: 63 sec
        - ray: 65 sec

    That makes sense since I have 16 cores and 1000/16 is around 62.

rng (Optional[Generator], default None):
    Numpy random generator object used for reproducible results.

random_seed (Optional[int], default None):
    Sets the random seed of the sampling process. Only used when rng is None.

n_parallel_games (int, required):
    Number of parallel jobs (number of to-be-occupied cores); -1 means all CPU cores and 1 means a
    serial process. I suggest using 1 for debugging, since things get messy in parallel!

lazy (bool, default False):
    If set to True, the objective function is called lazily during the calculation instead of being
    called for every coalition up front and storing the outputs in a dict. Setting it to True saves
    a lot of memory and might even be faster in certain cases.

save_permutations (bool, default False):
    If set to False (the default), Shapley values for multi-dimensional contributions are
    accumulated as a running mean over permutations instead of storing every permutation, which
    saves memory. If set to True, the per-permutation contributions are stored. This parameter is
    ignored if the objective function returns a scalar.

dual_progress_bars (bool, default True):
    If set to True, you will get two progress bars: a parent bar that tracks the permutations and a
    child bar that tracks the elements. Ignored if mbar is provided.

mbar (MasterBar, default None):
    A fastprogress MasterBar. Use it when calling the interface multiple times to get a nested
    progress bar.
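
As mentioned in the multiprocessing_method entry, the 'ray' backend expects the game to be a ray remote function. The following is only a minimal sketch of that decoration, using a hypothetical game called my_game; how the library schedules the remote calls internally is not shown here:

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def my_game(complements):
        # hypothetical game: score the system with `complements` lesioned
        return float(len(complements))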

Returns:

pd.DataFrame: Shapley table or a dict of Shapley tables. Columns will be elements and indices will
be samples (permutations). It will be a Multi-Index DataFrame if the contributions are a timeseries;
the index at level=1 will be the timestamps.
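
Putting the pieces together, a hypothetical end-to-end call could look like the sketch below. The toy graph, the permutation space built with itertools, and the lazy=True setting are assumptions made for illustration; only get_shapley_table and its keyword arguments come from this page.

    import itertools
    import networkx as nx
    from msapy import msa

    # toy system: a small graph whose nodes are the elements of the game
    G = nx.complete_graph(4)
    elements = list(G.nodes)

    # a permutation space is a list of orderings of the elements;
    # here we enumerate all of them (larger systems would sample instead)
    permutation_space = list(itertools.permutations(elements))

    def local_efficiency(complements, graph):
        if len(complements) == 0:
            return nx.local_efficiency(graph)   # intact network
        if len(complements) == len(graph):
            return 0.0                          # fully lesioned network
        lesioned = graph.copy()
        lesioned.remove_nodes_from(complements)
        return nx.local_efficiency(lesioned)

    shapley_table = msa.get_shapley_table(
        permutation_space=permutation_space,
        objective_function=local_efficiency,
        objective_function_params={"graph": G},
        lazy=True,  # call the game on the fly instead of passing a precomputed contributions dict
    )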

Source code in msapy/msa.py
@typechecked
def get_shapley_table(*,
                      permutation_space: list,
                      contributions: Optional[Dict] = None,
                      lesioned: Optional[any] = None,
                      objective_function: Optional[Callable] = None,
                      objective_function_params: Optional[Dict] = None,
                      lazy: bool = False,
                      save_permutations: bool = False,
                      dual_progress_bars: bool = True,
                      mbar: Optional[MasterBar] = None,) -> pd.DataFrame:
    """
    Calculates Shapley values based on the filled contribution_space.
    Briefly, for a permutation (A,B,C) it will be:

    (A,B,C) - (B,C) = Contribution of A to the coalition (B,C).
    (B,C) - (C) = Contribution of B to the coalition formed with (C).
    (C) = Contribution of C alone.

    This will repeat over all permutations, and the result is a distribution of Shapley values for each element.
    Note that the estimation method used here is an unbiased estimator, so the variance is fairly large.

    Args:
        contributions (Dict):
            Filled Dictionary of coalition:result

        permutation_space (list):
            Should be the same passed to make_combination_space.

        lesioned (Optional[any]):
            lesioned element that will not be present in any combination

        objective_function (Callable):
            The game (in-silico experiment). It should get the complement set and return one numeric value
            either int or float.
            This function is just calling it as: objective_function(complement, **objective_function_params)

            An example using networkx with some tips:
            (you sometimes need to specify what should happen during edge-cases like an all-lesioned network)

            def local_efficiency(complements, graph):
                if len(complements) == 0:
                    # the network is intact so:
                    return nx.local_efficiency(graph)

                elif len(complements) == len(graph):
                    # the network is fully lesioned so:
                    return 0.0

                else:
                    # lesion the system, calculate things
                    lesioned = graph.copy()
                    lesioned.remove_nodes_from(complements)
                    return nx.local_efficiency(lesioned)

        objective_function_params (Dict):
            Kwargs for the objective_function.

        multiprocessing_method (str):
            So far, two methods of parallelization are implemented, 'joblib' and 'ray'; the default method is joblib.
            If using ray, you need to decorate your objective function with the @ray.remote decorator. Visit their
            documentation to see how to go about it. Ray probably works better on HPC clusters (if they support it)
            and likely doesn't suffer from the sneaky "memory leakage" of joblib, but in practice
            joblib is faster for tasks that are themselves small. Remedies are here:
            https://docs.ray.io/en/latest/auto_examples/tips-for-first-time.html

            Note: Generally, multiprocessing isn't always faster, as explained above. Use it when each game
            takes longer than roughly 0.5 seconds. For example, a function that sleeps for a
            second on a set of 10 elements with 1000 permutations (1024 games) performs as follows:

                - no parallel: 1020 sec
                - joblib: 63 sec
                - ray: 65 sec

            That makes sense since I have 16 cores and 1000/16 is around 62.

        rng (Optional[np.random.Generator]): Numpy random generator object used for reproducible results. Default is None.

        random_seed (Optional[int]):
            sets the random seed of the sampling process. Only used when `rng` is None. Default is None.

        n_parallel_games (int):
            Number of parallel jobs (number of to-be-occupied cores),
            -1 means all CPU cores and 1 means a serial process.
            I suggest using 1 for debugging since things get messy in parallel!

        lazy (bool): if set to True, objective function will be called lazily instead of calling it all at once and storing the outputs in a dict.
            Setting it to True saves a lot of memory and might even be faster in certain cases.

        save_permutations (bool): If set to False (the default), Shapley values for multi-dimensional contributions are accumulated
            as a running mean over permutations instead of storing every permutation. If set to True, the per-permutation
            contributions are stored. This parameter is ignored in case the objective function returns a scalar.

        dual_progress_bars (bool): If set to True, you will have two progress bars: a parent that tracks the permutations and a child
            that tracks the elements. It is ignored if mbar is provided.

        mbar (MasterBar): A fastprogress MasterBar. Use it when calling the interface multiple times to get a nested progress bar.

    Returns:
        pd.DataFrame: Shapley table or a dict of Shapley tables, columns will be
        elements and indices will be samples (permutations). 
        It will be a Multi-Index DataFrame if the contributions are a timeseries.
        The index at `level=1` will be the timestamps
    """
    _check_get_shapley_table_args(contributions, objective_function, lazy)
    _check_valid_permutation_space(permutation_space)

    lesioned = {lesioned} if lesioned else set()
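    # in lazy mode, seed the contributions dict with only the baseline game (just the permanently
    # lesioned element removed); every other coalition is evaluated on the fly inside the loop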
    contributions = {tuple(lesioned): objective_function(tuple(lesioned), **objective_function_params)} if lazy else contributions

    contribution_type, intact_contributions_in_case_lazy = _get_contribution_type(contributions)
    contrib_shape = intact_contributions_in_case_lazy.shape if contribution_type == "nd" else []

    sorted_elements = sorted(permutation_space[0])
    permutation_space = set(permutation_space)

    if not lazy:
        parent_bar = enumerate(permutation_space)
    elif (not dual_progress_bars) or mbar:
        parent_bar = progress_bar(enumerate(permutation_space), total=len(
            permutation_space), leave=False, parent=mbar)
    elif lazy:
        parent_bar = master_bar(
            enumerate(permutation_space), total=len(permutation_space))

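    # with nd contributions and save_permutations=False, accumulate a running mean (the scalar 0
    # broadcasts on the first update); otherwise preallocate one row per permutation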
    shapley_table = 0 if (contribution_type == 'nd' and not save_permutations) else np.zeros((len(permutation_space), len(sorted_elements), *contrib_shape), dtype=float)

    for i, permutation in parent_bar:
        isolated_contributions = np.zeros((len(permutation), *intact_contributions_in_case_lazy.shape), dtype=float) if contribution_type=="nd" else ([None] * len(permutation))  # got to be a better way!
        child_bar = enumerate(permutation) if not (dual_progress_bars and lazy) else progress_bar(
            enumerate(permutation), total=len(permutation), leave=False, parent=parent_bar)
        # iterate over all elements in the permutation to calculate their isolated contributions

        contributions_including = intact_contributions_in_case_lazy
        for index, element in child_bar:
            including = frozenset(permutation[:index + 1])
            excluding = frozenset(permutation[:index])

            # the isolated contribution of an element is the difference of contribution with that element and without that element
            if lazy:
                contributions_excluding = objective_function(tuple(including.union(lesioned)), **objective_function_params)
                isolated_contributions[sorted_elements.index(element)] = contributions_including - contributions_excluding
                contributions_including = contributions_excluding
            else:
                isolated_contributions[sorted_elements.index(element)] = contributions[including - lesioned] - contributions[excluding - lesioned]

        if contribution_type == 'nd' and not save_permutations:
            shapley_table += (isolated_contributions - shapley_table) / (i + 1)
        else:
            shapley_table[i] = np.array(isolated_contributions)

    # post processing of shapley values based on what type of contribution it is. The format of output will vary based on if the
    # values are multi-scores, timeseries, etc.
    if contribution_type == 'nd' and not save_permutations:
        shapley_table = shapley_table.reshape(shapley_table.shape[0], -1).T
        shapley_table = pd.DataFrame(
            shapley_table, columns=sorted_elements)
        return ShapleyModeND(shapley_table, intact_contributions_in_case_lazy.shape)

    if contribution_type == "scaler":
        return ShapleyTable(pd.DataFrame(shapley_table, columns=sorted_elements))

    return ShapleyTableND.from_ndarray(shapley_table, columns=sorted_elements)