Source code for clinamen2.cmaes.params_and_state

"""The core Cholesky CMA-ES implementation.


    References:
        [1] O. Krause, D. R. Arbonès, C. Igel, "CMA-ES with Optimal Covariance
        Update and Storage Complexity", part of [2], 2016.

        [2] D. Lee, M. Sugiyama, U. Luxburg, I. Guyon, R. Garnett (eds.),
        "Advances in Neural Information Processing Systems 29", 2016.
"""
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, Union

import numpy as np
import numpy.typing as npt
import scipy as sp
import scipy.stats
from scipy.linalg import solve_triangular

from clinamen2.runner.basic_runner import Runner, WorkerResult


@dataclass(frozen=True)
class AlgorithmParameters:
    """Class for keeping track of the initial, immutable parameters of a run.

    You are unlikely to need to manually instantiate this class. Use
    `init_default_algorithm_parameters` instead, which provides a simpler
    interface covering common use cases and uses recommended defaults
    wherever possible. Directly instantiating this class offers more
    flexibility, but deviating from the literature recommendations for
    parameter defaults without good reason is not encouraged.

    Basic checks on parameters are performed upon direct instantiation, but
    no defaults are used. Every single parameter is therefore required. See
    References for more information on individual parameters.

    Args:
        dimension: The dimensionality of the problem.
        initial_step_size: Global step size at the beginning of a run.
        random_seed: Random seed to start the run with.
        pop_size: The size of the "population", i.e., of each sample of
            random deviates.
        mu: Parent number, equal to the number of positive weights. In the
            standard flavor of CMA-ES (with no negative weights), it is the
            mu best-ranked individuals that contribute to the update of the
            normal distribution.
        weights: The pop_size weights used in the algorithm.
        c_sigma: The learning rate for the conjugate evolution path used
            for step-size control. It must lie in (0, 1).
        d_sigma: The damping term, which must be positive.
        c_m: The learning rate for updating the mean. Typically 1; it must
            lie in [0, 1].
        c_c: The learning rate for the evolution path used in the
            cumulation procedure. It must lie in [0, 1].
        c_1: The learning rate for the rank-1 update of the covariance
            matrix. It must lie in [0, 1 - c_mu].
        c_mu: The learning rate for the rank-mu update of the covariance
            matrix. It must lie in [0, 1 - c_1].
    """

    dimension: int
    initial_step_size: float
    random_seed: int
    pop_size: int
    mu: int
    weights: npt.ArrayLike
    c_sigma: float
    d_sigma: float
    c_m: float
    c_c: float
    c_1: float
    c_mu: float

    def __post_init__(self):
        if self.dimension <= 0:
            raise ValueError("'dimension' must be a positive integer.")
        if self.initial_step_size <= 0:
            raise ValueError("'initial_step_size' must be positive.")
        if self.pop_size < 2:
            raise ValueError("'pop_size' must be at least two.")
        if not 0 < self.mu <= self.pop_size:
            raise ValueError("'mu' must be in (0, 'pop_size'].")
        object.__setattr__(self, "weights", np.asarray(self.weights))
        if self.weights.shape != (self.pop_size,):
            raise ValueError(
                "'weights' must be 1-d array_like of length 'pop_size'"
            )
        if np.any(self.weights[: self.mu] < 0):
            raise ValueError("First mu weights must all be nonnegative.")
        if not np.isclose(np.sum(self.weights[: self.mu]), 1):
            raise ValueError("First mu weights must sum to one.")
        if np.any(self.weights[: self.mu - 1] - self.weights[1 : self.mu] < 0):
            raise ValueError(
                "First mu weights must be in non-ascending order."
            )
        object.__setattr__(
            self, "mu_eff", 1 / np.sum(self.weights[: self.mu] ** 2)
        )
        if not 0 < self.c_sigma < 1:
            raise ValueError("'c_sigma' must be in (0, 1).")
        if self.d_sigma <= 0:
            raise ValueError("'d_sigma' must be positive.")
        if not 0 <= self.c_m <= 1:
            raise ValueError("'c_m' must be in [0, 1].")
        if not 0 <= self.c_c <= 1:
            raise ValueError("'c_c' must be in [0, 1].")
        if not 0 <= self.c_1 <= 1 - self.c_mu:
            raise ValueError("'c_1' must be in [0, 1 - c_mu]")
        if not 0 <= self.c_mu <= 1 - self.c_1:
            raise ValueError("'c_mu' must be in [0, 1 - c_1]")


def init_default_algorithm_parameters(
    dimension: int,
    pop_size: Optional[int] = None,
    alpha_cov: float = 2.0,
    initial_step_size: float = 1.0,
    random_seed: int = 0,
) -> AlgorithmParameters:
    """Initialize CMA-ES parameters to recommended defaults wherever possible.

    This function is the recommended way of instantiating an
    AlgorithmParameters object. It exposes only a minimal set of options to
    the user and computes all other parameters from these using literature
    defaults. The only strictly required argument is the dimension.

    Weights for ranks > mu are initialized to zero, i.e., no negative
    weights as in Ref. [1] are used.

    Args:
        dimension: The dimensionality of the problem, i.e., the number of
            degrees of freedom.
        pop_size: The number of samples drawn from the Gaussian at each
            generation. If not given, it is calculated from the dimension
            according to `pop_size = 4 + floor(3 * log(dimension))`. Using
            smaller values than this default is not recommended.
        alpha_cov: A parameter that enters into the calculation of defaults
            for the learning rates used in the covariance matrix update.
        initial_step_size: Global step size at the beginning of a run.
        random_seed: Random seed to start the run with.

    Returns:
        An initialized AlgorithmParameters object.
    """
    # Population size and weights
    if pop_size is None:
        lamb = 4 + int(np.floor(3 * np.log(dimension)))
    else:
        lamb = pop_size
    mu = int(np.floor(lamb / 2))
    weights_prime = np.log((lamb + 1) / 2) - np.log(np.arange(1, lamb + 1))
    weights = np.zeros(lamb)
    weights[:mu] = weights_prime[:mu] / np.sum(weights_prime[:mu])
    mu_eff = 1 / np.sum(weights[:mu] ** 2)

    # Step-size control
    c_sigma = (mu_eff + 2) / (dimension + mu_eff + 5)
    d_sigma = (
        1
        + 2 * np.max([0, np.sqrt((mu_eff - 1) / (dimension + 1)) - 1])
        + c_sigma
    )

    # Covariance matrix adaptation
    c_c = (4 + mu_eff / dimension) / (dimension + 4 + 2 * mu_eff / dimension)
    c_1 = alpha_cov / ((dimension + 1.3) ** 2 + mu_eff)
    num = mu_eff - 2 + 1 / mu_eff
    den = (dimension + 2) ** 2 + alpha_cov * mu_eff / 2
    c_mu = np.min([1 - c_1, alpha_cov * num / den])

    return AlgorithmParameters(
        dimension=dimension,
        initial_step_size=initial_step_size,
        random_seed=random_seed,
        pop_size=lamb,
        mu=mu,
        weights=weights,
        c_sigma=c_sigma,
        d_sigma=d_sigma,
        c_m=1.0,
        c_c=c_c,
        c_1=c_1,
        c_mu=c_mu,
    )


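# Example (illustrative, not part of the library): the factory computes all
# remaining CMA-ES parameters from the dimension alone.
#
#     from clinamen2.cmaes.params_and_state import (
#         init_default_algorithm_parameters,
#     )
#
#     params = init_default_algorithm_parameters(dimension=10)
#     params.pop_size  # 10 == 4 + floor(3 * log(10))
#     params.mu        # 5 == floor(pop_size / 2)
#     params.weights   # five descending positive weights summing to one,
#                      # followed by five zeros

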
@dataclass(frozen=True)
class AlgorithmState:
    """Class for keeping track of the state of the run.

    Includes the CMA-ES data (e.g. ``mean``) and keeps track of the random
    state.

    Args:
        random_state: Dictionary representing the state of the random
            generator numpy.random.default_rng().
        mean: Mean of the Gaussian.
        cholesky_factor: Cholesky factor of the covariance matrix.
        step_size: Global standard deviation of the Gaussian.
        generation: The generation number.
        p_sigma: Evolution path of the step size.
        p_c: Evolution path of the covariance.
    """

    random_state: dict
    mean: npt.ArrayLike
    cholesky_factor: npt.ArrayLike
    step_size: float
    generation: int
    p_sigma: npt.ArrayLike
    p_c: npt.ArrayLike


def create_init_algorithm_state(parameters: AlgorithmParameters) -> Callable:
    """Create function that initializes an AlgorithmState.

    Args:
        parameters: Initial, immutable parameters of the CMA-ES run.

    Returns:
        A function for initializing an AlgorithmState.
    """

    def init_algorithm_state(
        mean: Optional[npt.ArrayLike] = None,
        cholesky_factor: Optional[npt.ArrayLike] = None,
        generation: Optional[int] = None,
        p_sigma: Optional[npt.ArrayLike] = None,
        p_c: Optional[npt.ArrayLike] = None,
    ) -> AlgorithmState:
        """Function for creating a new AlgorithmState. Performs all checks.

        Args:
            mean: Mean of the Gaussian. Default is zeros of shape
                (parameters.dimension, ).
            cholesky_factor: Cholesky factor of the covariance matrix.
                Default is the identity matrix of shape
                (parameters.dimension, parameters.dimension).
            generation: The generation number. Default is 0.
            p_sigma: Evolution path of the step size. Default is zeros
                with shape of the mean.
            p_c: Evolution path of the covariance. Default is zeros with
                shape of the mean.

        Returns:
            An AlgorithmState with default settings.
        """
        rng = np.random.default_rng(seed=parameters.random_seed)
        random_state = rng.bit_generator.state

        if mean is None:
            mean = np.zeros(parameters.dimension)
        elif mean.shape != (parameters.dimension,):
            raise AttributeError(
                f"The mean vector must have shape "
                f"{(parameters.dimension,)}"
            )

        if cholesky_factor is None:
            cholesky_factor = np.eye(parameters.dimension)
        elif cholesky_factor.shape != (
            parameters.dimension,
            parameters.dimension,
        ):
            raise AttributeError(
                f"The cholesky_factor must have shape "
                f"{(parameters.dimension, parameters.dimension)}"
            )

        step_size = parameters.initial_step_size

        if generation is None:
            generation = 0
        elif not isinstance(generation, int):
            raise TypeError(
                f"The generation must be an integer >= 0, got {generation}."
            )
        elif generation < 0:
            raise ValueError(
                f"The generation must be an integer >= 0, got {generation}."
            )

        if p_sigma is None:
            p_sigma = np.zeros_like(mean)
        elif p_sigma.shape != mean.shape:
            raise AttributeError(
                f"The p_sigma vector must have shape {mean.shape}"
            )

        if p_c is None:
            p_c = np.zeros_like(mean)
        elif p_c.shape != mean.shape:
            raise AttributeError(
                f"The p_c vector must have shape {mean.shape}"
            )

        return AlgorithmState(
            random_state=random_state,
            step_size=step_size,
            mean=mean,
            cholesky_factor=cholesky_factor,
            generation=generation,
            p_sigma=p_sigma,
            p_c=p_c,
        )

    if not isinstance(parameters, AlgorithmParameters):
        raise TypeError("Parameters must be of type AlgorithmParameters.")

    return init_algorithm_state


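# Example (illustrative): the closure returned above validates its inputs
# against the bound parameters and fills in defaults for whatever is not
# supplied.
#
#     params = init_default_algorithm_parameters(dimension=10)
#     init_state = create_init_algorithm_state(params)
#     state = init_state(mean=np.full(10, 0.5))
#     state.generation       # 0
#     state.step_size        # params.initial_step_size
#     state.cholesky_factor  # identity matrix of shape (10, 10)

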
def create_sample_and_evaluate(
    sample_individuals: Callable,
    evaluate_loss: Callable,
    input_pipeline: Callable = lambda x: x,
) -> Callable:
    """Create function that samples a population and evaluates its loss.

    Args:
        sample_individuals: Function that samples a number of individuals
            from a state.
        evaluate_loss: Function that returns the loss of one individual or
            a population of individuals.
        input_pipeline: Transform dof such that evaluate_loss can handle
            the data. Default is identity.

    Returns:
        A function to sample and evaluate a population from a state.
    """

    def sample_and_evaluate(
        state: AlgorithmState,
    ) -> Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike]:
        """Function for sampling a population from a state and evaluating
        the loss.

        Args:
            state: State of the previous CMA step.

        Returns:
            tuple containing

            - A population of individuals sampled from the AlgorithmState.
            - The new AlgorithmState.
            - The loss of all individuals.
        """
        population, state = sample_individuals(state)
        loss = evaluate_loss(input_pipeline(population))

        return population, state, loss

    return sample_and_evaluate


def create_sample_and_sequential_evaluate(
    sample_individuals: Callable,
    evaluate_loss: Callable,
    input_pipeline: Callable = lambda x: x,
) -> Callable:
    """Create function that samples a population and evaluates its loss
    one individual at a time.

    Args:
        sample_individuals: Function that samples a number of individuals
            from a state.
        evaluate_loss: Function that returns the loss of a single
            individual.
        input_pipeline: Transform dof such that evaluate_loss can handle
            the data. Default is identity.

    Returns:
        A function to sample and sequentially evaluate a population from a
        state.
    """

    def sample_and_evaluate(
        state: AlgorithmState,
    ) -> Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike]:
        """Function for sampling a population from a state and evaluating
        the loss.

        Args:
            state: State of the previous CMA step.

        Returns:
            tuple containing

            - A population of individuals sampled from the AlgorithmState.
            - The new AlgorithmState.
            - The loss of all individuals.
        """
        population, state = sample_individuals(state)
        loss = []
        for individual in population:
            loss.append(evaluate_loss(input_pipeline(individual)))
        loss = np.array(loss)

        return population, state, loss

    return sample_and_evaluate


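# Example (illustrative sketch with a hypothetical sphere loss): wiring a
# sampler and a per-individual loss function into a single step. The same
# pattern applies to create_sample_and_evaluate with a batched loss.
#
#     params = init_default_algorithm_parameters(dimension=5)
#     state = create_init_algorithm_state(params)()
#     sample = create_sample_from_state(params)  # defined further below
#
#     def sphere(individual):
#         return float(np.sum(individual**2))
#
#     step = create_sample_and_sequential_evaluate(sample, sphere)
#     population, state, loss = step(state)
#     loss.shape  # (params.pop_size,)

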
def create_resample_and_evaluate(
    sample_individuals: Callable,
    evaluate_batch: Callable,
    evaluate_single: Callable,
    input_pipeline_batch: Callable = lambda x: x,
    input_pipeline_single: Callable = lambda x: x,
) -> Callable:
    """Create function that samples a population and evaluates its loss.

    Samples that are rejected, i.e., whose evaluation raised an exception,
    are resampled.

    Args:
        sample_individuals: Function that samples a number of individuals
            from a state.
        evaluate_batch: Function that returns a tuple containing the loss
            of a batch of individuals and additional information in a
            dictionary (at least the exception, if applicable).
        evaluate_single: Function that returns a tuple containing the loss
            of an individual and additional information in a dictionary
            (at least the exception, if applicable).
        input_pipeline_batch: Transform dof such that evaluate_batch can
            handle the data. Default is identity.
        input_pipeline_single: Transform dof such that evaluate_single can
            handle the data. Default is identity.

    Returns:
        A function to sample (with resampling) and evaluate a population
        from a state.
    """

    def resample_and_evaluate(
        state: AlgorithmState,
        n_samples: int,
        n_attempts: int = int(1e6),
        return_failures: bool = False,
    ) -> Union[
        Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike, list],
        Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike, list, list],
    ]:
        """Function for sampling a population from a state and evaluating
        the loss.

        Args:
            state: State of the previous CMA step.
            n_samples: Number of successfully evaluated individuals to be
                returned.
            n_attempts: Maximum number of attempts to reach n_samples.
                Default is 1e6 to avoid infinite loops.
            return_failures: If set to True, individuals for which the
                evaluation failed are returned in a list of dictionaries.

        Returns:
            tuple containing

            - A population of individuals sampled from the AlgorithmState.
            - The new AlgorithmState.
            - An array containing the loss of all passing individuals.
            - Additional information returned by the evaluation function in
              a list of dictionaries, e.g. uncertainty or forces if
              applicable.
            - Optional: A list of dictionaries containing individuals
              sampled from the AlgorithmState where the evaluation failed.
              Includes at least the associated exception.
        """
        population = []
        loss = []
        information = []
        failed = []

        # sample and batch evaluate the required number of samples
        returned_population, state = sample_individuals(state, n_samples)
        returned_loss, returned_information, _ = evaluate_batch(
            input_pipeline_batch(returned_population)
        )
        attempts = n_samples
        for i, info in enumerate(returned_information):
            if "exception" in info.keys():
                failed.append(
                    {
                        "individual": returned_population[i],
                        "loss": returned_loss[i],
                        **info,
                    }
                )
            else:
                population.append(returned_population[i])
                loss.append(returned_loss[i])
                information.append(info)

        # resample and single evaluate individuals
        while attempts <= n_attempts and len(population) < n_samples:
            attempts += 1
            resampled_population, state = sample_individuals(
                state, n_samples=1
            )
            resampled_loss, resampled_information, _ = evaluate_single(
                input_pipeline_single(resampled_population)
            )
            if "exception" in resampled_information.keys():
                failed.append(
                    {
                        "individual": resampled_population[0],
                        "loss": resampled_loss,
                        **resampled_information,
                    }
                )
            else:
                population.append(resampled_population[0])
                loss.append(resampled_loss)
                information.append(resampled_information)

        if len(population) < n_samples:
            raise OverflowError(
                f"Evaluation attempt limit of {n_attempts} reached."
            )

        if return_failures:
            return (
                np.asarray(population),
                state,
                np.asarray(loss),
                information,
                failed,
            )
        return (
            np.asarray(population),
            state,
            np.asarray(loss),
            information,
        )

    return resample_and_evaluate


def create_sample_and_run(
    sample_individuals: Callable,
    runner: Runner,
) -> Callable:
    """Create function that samples a population and evaluates its loss.

    Utilizes an instance of Runner. With resampling for failures.

    Args:
        sample_individuals: Function that samples a number of individuals
            from a state.
        runner: Runner instance to which the individuals are submitted and
            from which the results are popped.

    Returns:
        A function to sample and evaluate a population from a state.
    """

    def sample_and_run(
        state: AlgorithmState,
        n_samples: int,
        n_attempts: int = int(1e6),
        return_failures: bool = False,
    ) -> Union[
        Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike],
        Tuple[npt.ArrayLike, AlgorithmState, npt.ArrayLike, npt.ArrayLike],
    ]:
        """Function for sampling a population from a state and evaluating
        the loss.

        Args:
            state: State of the previous CMA step.
            n_samples: Number of successfully evaluated individuals to be
                returned.
            n_attempts: Maximum number of attempts to reach n_samples.
                Default is 1e6 to avoid infinite loops.
            return_failures: If set to True, individuals for which the
                evaluation failed are returned in an additional dictionary.

        Returns:
            tuple containing

            - A dictionary of individuals sampled from the AlgorithmState
              that were successfully evaluated.
            - The new AlgorithmState.
            - A dictionary containing the loss of all individuals.
            - Additional information returned by the evaluation function in
              a dictionary, e.g. atoms object if applicable.
            - Optional: A dictionary containing individuals sampled from
              the AlgorithmState where the evaluation failed. Includes the
              associated exception.
        """
        sampled_dict = {}
        evaluated_dict = {}
        failed_dict = {}
        loss_dict = {}
        information_dict = {}

        population, state = sample_individuals(state, n_samples=n_samples)
        for individual in population:
            key = runner.submit(individual)
            sampled_dict[key] = individual

        for attempt_index, future in enumerate(runner.pop()):
            if attempt_index == n_attempts:
                raise OverflowError(
                    f"Evaluation attempt limit of {n_attempts} reached."
                )
            if future.exception() is None:
                evaluated_dict[future.key] = sampled_dict[future.key]
                result = WorkerResult(*future.result())
                loss_dict[future.key] = result.loss
                information_dict[future.key] = result.information
            else:
                if return_failures:
                    failed_dict[future.key] = (
                        future.exception(),
                        sampled_dict[future.key],
                    )
                resampled, state = sample_individuals(state, n_samples=1)
                key = runner.submit(resampled[0])
                sampled_dict[key] = resampled[0]

        if return_failures:
            return (
                evaluated_dict,
                state,
                loss_dict,
                information_dict,
                failed_dict,
            )
        return evaluated_dict, state, loss_dict, information_dict

    return sample_and_run


def create_sample_from_state(parameters: AlgorithmParameters) -> Callable:
    """Create function that samples a population.

    Args:
        parameters: Initial, immutable parameters of the CMA-ES run.

    Returns:
        A function to sample individuals from a state.
    """

    def sample_from_state(
        state: AlgorithmState, n_samples: Optional[int] = None
    ) -> Tuple[npt.ArrayLike, AlgorithmState]:
        """Function for sampling a population from a state.

        Args:
            state: State of the previous CMA step.
            n_samples: Number of individuals to be sampled. Default is
                parameters.pop_size.

        Returns:
            tuple containing

            - A population of individuals sampled from the AlgorithmState.
            - The new AlgorithmState.
        """
        if n_samples is None:
            n_samples = parameters.pop_size

        rng = np.random.default_rng()
        rng.bit_generator.state = state.random_state

        # sample from the multivariate normal distribution N(0, identity)
        base_population = rng.multivariate_normal(
            np.zeros(parameters.dimension),
            np.eye(parameters.dimension),
            n_samples,
        )
        # transform each base sample with the Cholesky factor, scale it by
        # the step size, and shift it by the mean
        population = []
        for p in range(n_samples):
            individual = (
                state.step_size
                * (state.cholesky_factor @ base_population[p])
                + state.mean
            )
            population.append(individual)

        new_random_state = rng.bit_generator.state
        new_state = AlgorithmState(
            random_state=new_random_state,
            mean=state.mean,
            cholesky_factor=state.cholesky_factor,
            step_size=state.step_size,
            generation=state.generation,
            p_sigma=state.p_sigma,
            p_c=state.p_c,
        )

        return np.array(population), new_state

    return sample_from_state


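# Example (illustrative): drawing populations reproducibly. The returned
# state carries the advanced generator state, so consecutive calls yield
# fresh deviates while a rerun from the same seed gives identical results.
#
#     params = init_default_algorithm_parameters(dimension=3, random_seed=7)
#     state = create_init_algorithm_state(params)()
#     sample = create_sample_from_state(params)
#     population, state = sample(state)          # shape (params.pop_size, 3)
#     extra, state = sample(state, n_samples=2)  # shape (2, 3)

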
def calculate_new_mean(
    parameters: AlgorithmParameters,
    original_state: AlgorithmState,
    population: npt.ArrayLike,
) -> npt.ArrayLike:
    """Calculate the new mean of the distribution.

    Args:
        parameters: Initial, immutable parameters of the CMA-ES run.
        original_state: State of the previous CMA step.
        population: Individuals of the current generation.

    Returns:
        The new mean of the distribution.
    """
    centered_population = (
        population - original_state.mean
    ) / original_state.step_size
    weighted_population = (
        parameters.weights[: parameters.mu, np.newaxis]
        * centered_population[: parameters.mu]
    )
    y_w = np.sum(weighted_population, axis=0)

    return (
        original_state.mean + parameters.c_m * original_state.step_size * y_w
    )


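# The function above implements the standard CMA-ES mean shift
#
#     m' = m + c_m * sigma * sum_{i=1}^{mu} w_i * (x_i - m) / sigma,
#
# where x_1, ..., x_mu are the mu best-ranked individuals (the population
# is expected to be sorted by ascending loss). Since the first mu weights
# sum to one, c_m = 1 reduces this to the weighted recombination
# m' = sum_i w_i * x_i of the selected parents.

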
def rank_one_update(
    A: npt.ArrayLike, beta: float, v: npt.ArrayLike
) -> npt.ArrayLike:
    """Perform the rank-one update of the Cholesky factor as described in [1].

    Args:
        A: Cholesky factor of the covariance matrix.
        beta: Pre-factor (expected: learning rate * weight).
        v: Centered individual.

    Returns:
        Cholesky factor A' of (C + beta * v @ v.T), where C = A @ A.T.
    """
    alpha = v
    b_factor = 1.0
    n_cols = A.shape[0]
    cols = []
    for j in range(n_cols):
        column_old = A[j:, j]
        left = alpha[0]
        top_left_old = column_old[0]
        top_left_new = np.sqrt(
            top_left_old**2 + (beta / b_factor) * left**2
        )
        gamma = top_left_old**2 * b_factor + beta * left**2
        alpha = alpha[1:] - (left / top_left_old) * column_old[1:]
        new_col = np.concatenate(
            [
                np.zeros(j),
                [top_left_new],
                (top_left_new / top_left_old) * column_old[1:]
                + (top_left_new * beta * left / gamma) * alpha,
            ]
        )
        cols.append(new_col)
        b_factor += beta * left**2 / top_left_old**2

    return np.asarray(cols).T


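# Numeric check (illustrative) of the documented property
# A' @ A'.T == A @ A.T + beta * np.outer(v, v):
#
#     A = np.linalg.cholesky(np.array([[4.0, 1.0], [1.0, 3.0]]))
#     v = np.array([0.5, -1.0])
#     beta = 0.3
#     A_new = rank_one_update(A, beta, v)
#     assert np.allclose(A_new @ A_new.T, A @ A.T + beta * np.outer(v, v))

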
def create_update_algorithm_state(parameters: AlgorithmParameters) -> Callable:
    """Create function that creates an updated AlgorithmState.

    Args:
        parameters: Initial, immutable parameters of the CMA-ES run.

    Returns:
        A function for getting an updated AlgorithmState.
    """

    def update_algorithm_state(
        original_state: AlgorithmState,
        population: npt.ArrayLike,
    ) -> AlgorithmState:
        """Function for creating an updated AlgorithmState.

        Args:
            original_state: The original AlgorithmState to calculate the
                new one from.
            population: An array of all individuals of the original
                population.

        Returns:
            A new AlgorithmState.
        """
        # check input for consistency
        if population.shape[0] != parameters.pop_size:
            raise ValueError(
                f"Size of population does not match expected population "
                f"size of {parameters.pop_size}."
            )
        if population.shape[1] != parameters.dimension:
            raise ValueError(
                f"Dimension {population.shape[1]} of individuals does not "
                f"match expected dimension of {parameters.dimension}."
            )

        mean = calculate_new_mean(parameters, original_state, population)

        # cumulation: update the evolution path of the covariance
        p_c = (1.0 - parameters.c_c) * original_state.p_c + np.sqrt(
            parameters.c_c * (2 - parameters.c_c) * parameters.mu_eff
        ) * (mean - original_state.mean) / original_state.step_size

        # rank-1 and rank-mu updates of the Cholesky factor
        cholesky_factor = (
            np.sqrt(1.0 - parameters.c_1 - parameters.c_mu)
            * original_state.cholesky_factor
        )
        cholesky_factor = rank_one_update(cholesky_factor, parameters.c_1, p_c)
        for i in range(parameters.mu):
            cholesky_factor = rank_one_update(
                cholesky_factor,
                parameters.c_mu * parameters.weights[i],
                (population[i] - original_state.mean)
                / original_state.step_size,
            )

        # update the conjugate evolution path used for step-size control
        cholesky_inv = solve_triangular(
            original_state.cholesky_factor,
            np.eye(parameters.dimension),
            lower=True,
        )
        p_sigma = (
            1.0 - parameters.c_sigma
        ) * original_state.p_sigma + np.sqrt(
            parameters.c_sigma * (2.0 - parameters.c_sigma) * parameters.mu_eff
        ) * cholesky_inv @ (
            (mean - original_state.mean) / original_state.step_size
        )

        step_size = original_state.step_size * np.exp(
            parameters.c_sigma
            / parameters.d_sigma
            * (
                np.linalg.norm(p_sigma)
                / sp.stats.chi.mean(parameters.dimension)
                - 1.0
            )
        )

        return AlgorithmState(
            random_state=original_state.random_state,
            mean=mean,
            cholesky_factor=cholesky_factor,
            step_size=step_size,
            generation=original_state.generation + 1,
            p_sigma=p_sigma,
            p_c=p_c,
        )

    return update_algorithm_state


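# Example (illustrative sketch): a minimal optimization loop on the
# hypothetical sphere loss, tying the factories of this module together.
# update_algorithm_state expects the population sorted by ascending loss,
# since only the first mu (weighted) individuals drive the update.
#
#     params = init_default_algorithm_parameters(dimension=5, random_seed=1)
#     state = create_init_algorithm_state(params)(mean=np.full(5, 3.0))
#     sample = create_sample_from_state(params)
#     update = create_update_algorithm_state(params)
#
#     for _ in range(200):
#         population, state = sample(state)
#         loss = np.sum(population**2, axis=1)       # sphere loss
#         population = population[np.argsort(loss)]  # rank individuals
#         state = update(state, population)
#
#     state.mean  # approaches the optimum at the origin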