Source code for ast_toolbox.algos.ga

import gym.utils.seeding as seeding
import numpy as np
from dowel import logger
from dowel import tabular
from garage.misc import tensor_utils as np_tensor_utils
from garage.tf.algos.batch_polopt import BatchPolopt
from garage.tf.misc import tensor_utils


[docs]class GA(BatchPolopt): """Deep Genetic Algorithm from Such et al. [1]_. Parameters ---------- top_paths : :py:class:`ast_toolbox.mcts.BoundedPriorityQueues`, optional The bounded priority queue to store top-rewarded trajectories. step_size : float, optional Standard deviation for each mutation. step_size_anneal : float, optional The linear annealing rate of step_size after each iteration. pop_size : int, optional The population size truncation_size : int, optional The number of top-performed individuals that are chosen as parents. keep_best : int, optional The number of top-performed individuals that remain unchanged for next generation. f_F : string, optional The method used to calculate fitness: 'mean' for the average return, 'max' for the max return. log_interval : int, optional The log interval in terms of environment calls. kwargs : Keyword arguments passed to :doc:`garage.tf.algos.BatchPolopt <garage:_apidoc/garage.tf.algos>`. References ---------- .. [1] Such, Felipe Petroski, et al. "Deep neuroevolution: Genetic algorithms are a competitive alternative for training deep neural networks for reinforcement learning." arXiv:1712.06567 (2017). """ def __init__( self, top_paths=None, n_itr=2, batch_size=500, step_size=0.01, step_size_anneal=1.0, pop_size=5, truncation_size=2, keep_best=1, f_F="mean", log_interval=4000, init_step=1.0, **kwargs): self.top_paths = top_paths self.best_mean = -np.inf self.best_var = 0.0 self.n_itr = n_itr self.batch_size = batch_size self.step_size = step_size self.step_size_anneal = step_size_anneal self.pop_size = pop_size self.truncation_size = truncation_size self.keep_best = keep_best self.f_F = f_F self.log_interval = log_interval self.init_step = init_step self.seeds = np.zeros([n_itr, pop_size], dtype=int) self.magnitudes = np.zeros([n_itr, pop_size]) self.parents = np.zeros(pop_size, dtype=int) self.np_random, seed = seeding.np_random() # used in set_params super(GA, self).__init__(**kwargs)
[docs] def initial(self): """Initiate trainer internal parameters. """ self.seeds[0, :] = np.random.randint(low=0, high=int(2**16), size=(1, self.pop_size)) self.magnitudes[0, :] = self.init_step * np.ones(self.pop_size) self.policy.set_param_values(self.policy.get_param_values()) self.stepNum = 0
[docs] def init_opt(self): """Initiate trainer internal tensorflow operations. """ return dict()
[docs] def train(self, runner): """Start training. Parameters ---------- runner : :py:class:`garage.experiment.LocalRunner <garage:garage.experiment.LocalRunner>` ``LocalRunner`` is passed to give algorithm the access to ``runner.step_epochs()``, which provides services such as snapshotting and sampler control. """ self.initial() for itr in runner.step_epochs(): all_paths = {} for p in range(self.pop_size): with logger.prefix('idv #%d | ' % p): logger.log("Updating Params") self.set_params(itr, p) logger.log("Obtaining samples...") paths = self.obtain_samples(itr, runner) logger.log("Processing samples...") samples_data = self.process_samples(itr, paths) # all_paths[p]=paths all_paths[p] = samples_data # logger.log("Logging diagnostics...") # self.log_diagnostics(paths) logger.log("Optimizing Population...") self.optimize_policy(itr, all_paths) self.step_size = self.step_size * self.step_size_anneal self.record_tabular(itr) runner.step_itr += 1 return None
[docs] def record_tabular(self, itr): """Record training performace per-iteration. Parameters ---------- itr : int The iteration number. """ tabular.record('Itr', itr) tabular.record('StepNum', self.stepNum) # This causes tabular logging inconsistant # if self.top_paths is not None: # for (topi, path) in enumerate(self.top_paths): # tabular.record('reward ' + str(topi), path[0]) tabular.record('BestMean', self.best_mean) tabular.record('BestVar', self.best_var) tabular.record('StepSize', self.step_size) tabular.record('Max Magnitude', np.max(self.magnitudes[itr, :])) tabular.record('Min Magnitude', np.min(self.magnitudes[itr, :])) tabular.record('Mean Magnitude', np.mean(self.magnitudes[itr, :])) self.extra_recording(itr)
[docs] def extra_recording(self, itr): """Record extra training statistics per-iteration. Parameters ---------- itr : int The iteration number. """ return None
[docs] def set_params(self, itr, p): """Set the current policy paramter to the specified iteration and individual. Parameters ---------- itr : int The iteration number. p : int The individual index. """ for i in range(itr + 1): # print("seed: ", self.seeds[i,p]) self.np_random.seed(int(self.seeds[i, p])) if i == 0: # first generation param_values = self.policy.get_param_values(trainable=True) param_values = self.magnitudes[i, p] * self.np_random.normal(size=param_values.shape) # param_values = init_policy_np(self.policy, self.np_random) # params = self.policy.get_params() # sess = tf.get_default_session() # sess.run(tf.variables_initializer(params)) # param_values = self.policy.get_param_values() elif self.seeds[i, p] != 0: param_values = param_values + self.magnitudes[i, p] * self.np_random.normal(size=param_values.shape) self.policy.set_param_values(param_values, trainable=True)
[docs] def get_fitness(self, itr, all_paths): """Calculate the fitness of the collexted paths. Parameters ---------- itr : int The iteration number. all_paths : list[dict] The collected paths from the sampler. Returns ---------- fitness : list[float] The list of fitness of each individual. """ fitness = np.zeros(self.pop_size) for p in range(self.pop_size): rewards = all_paths[p]["rewards"] valid_rewards = rewards * all_paths[p]["valids"] path_rewards = np.sum(valid_rewards, -1) if self.f_F == "max": fitness[p] = np.max(path_rewards) else: fitness[p] = np.mean(path_rewards) return fitness
[docs] def select_parents(self, fitness): """Select the individuals to be the parents of the next generation. Parameters ---------- fitness : list[float] The list of fitness of each individual. """ sort_indx = np.flip(np.argsort(fitness), axis=0) self.parents[0:self.truncation_size] = sort_indx[0:self.truncation_size] self.parents[self.truncation_size:self.pop_size] = \ sort_indx[np.random.randint(low=0, high=self.truncation_size, size=self.pop_size - self.truncation_size)]
[docs] def mutation(self, itr, new_seeds, new_magnitudes, all_paths): """Generate new random seeds and magnitudes for the next generation. The first self.keep_best seeds are set to no-mutation value (0). Parameters ---------- itr : int The iteration number. new_seeds : :py:class:`numpy.ndarry` The original seeds. new_magnitudes : :py:class:`numpy.ndarry` The original magnitudes. all_paths : list[dict] The collected paths from the sampler. Returns ------- new_seeds : :py:class:`numpy.ndarry` The new seeds. new_magnitudes : :py:class:`numpy.ndarry` The new magnitudes. """ if itr + 1 < self.n_itr: new_seeds[itr + 1, :] = np.random.randint(low=0, high=int(2**32), size=(1, self.pop_size)) new_magnitudes[itr + 1, :] = self.step_size for i in range(0, self.keep_best): new_seeds[itr + 1, i] = 0 return new_seeds, new_magnitudes
[docs] def optimize_policy(self, itr, all_paths): """Update the population represented by self.seeds and self.parents. Parameters ---------- itr : int The iteration number. all_paths : list[dict] The collected paths from the sampler. """ fitness = self.get_fitness(itr, all_paths) self.select_parents(fitness) new_seeds = np.zeros_like(self.seeds) new_seeds[:, :] = self.seeds[:, self.parents] new_magnitudes = np.zeros_like(self.magnitudes) new_magnitudes[:, :] = self.magnitudes[:, self.parents] if itr + 1 < self.n_itr: new_seeds, new_magnitudes = self.mutation(itr, new_seeds, new_magnitudes, all_paths) self.seeds = new_seeds self.magnitudes = new_magnitudes return dict()
[docs] def obtain_samples(self, itr, runner): """Collect rollout samples using the current policy paramter. Parameters ---------- itr : int The iteration number. runner : :py:class:`garage.experiment.LocalRunner <garage:garage.experiment.LocalRunner>` ``LocalRunner`` is passed to give algorithm the access to ``runner.obtain_samples()``, which collects rollout paths from the sampler. Returns ------- paths : list[dict] The collected paths from the sampler. """ self.stepNum += self.batch_size # paths = self.sampler.obtain_samples(itr) paths = runner.obtain_samples(runner.step_itr) undiscounted_returns = [sum(path["rewards"]) for path in paths] if np.mean(undiscounted_returns) > self.best_mean: self.best_mean = np.mean(undiscounted_returns) self.best_var = np.var(undiscounted_returns) if not (self.top_paths is None): action_seqs = [path["actions"] for path in paths] [self.top_paths.enqueue( action_seq, R, make_copy=True) for (action_seq, R) in zip(action_seqs, undiscounted_returns)] return paths
[docs] def process_samples(self, itr, paths): """Return processed sample data based on the collected paths. Parameters ---------- itr : int The iteration number. paths : list[dict] The collected paths from the sampler. Returns ------- samples_data : dict Processed sample data with same trajectory length (padded with 0) """ baselines = [] returns = [] max_path_length = self.max_path_length if self.flatten_input: paths = [ dict( observations=(self.env_spec.observation_space.flatten_n( path['observations'])), actions=( self.env_spec.action_space.flatten_n( # noqa: E126 path['actions'])), rewards=path['rewards'], env_infos=path['env_infos'], agent_infos=path['agent_infos']) for path in paths ] else: paths = [ dict( observations=path['observations'], actions=( self.env_spec.action_space.flatten_n( # noqa: E126 path['actions'])), rewards=path['rewards'], env_infos=path['env_infos'], agent_infos=path['agent_infos']) for path in paths ] if hasattr(self.baseline, 'predict_n'): all_path_baselines = self.baseline.predict_n(paths) else: all_path_baselines = [ self.baseline.predict(path) for path in paths ] for idx, path in enumerate(paths): path_baselines = np.append(all_path_baselines[idx], 0) deltas = (path['rewards'] + self.discount * path_baselines[1:] - path_baselines[:-1]) path['advantages'] = np_tensor_utils.discount_cumsum( deltas, self.discount * self.gae_lambda) path['deltas'] = deltas for idx, path in enumerate(paths): # baselines path['baselines'] = all_path_baselines[idx] baselines.append(path['baselines']) # returns path['returns'] = np_tensor_utils.discount_cumsum( path['rewards'], self.discount) returns.append(path['returns']) # make all paths the same length obs = [path['observations'] for path in paths] obs = tensor_utils.pad_tensor_n(obs, max_path_length) actions = [path['actions'] for path in paths] actions = tensor_utils.pad_tensor_n(actions, max_path_length) rewards = [path['rewards'] for path in paths] rewards = tensor_utils.pad_tensor_n(rewards, max_path_length) returns = [path['returns'] for path in paths] returns = tensor_utils.pad_tensor_n(returns, max_path_length) baselines = tensor_utils.pad_tensor_n(baselines, max_path_length) agent_infos = [path['agent_infos'] for path in paths] agent_infos = tensor_utils.stack_tensor_dict_list([ tensor_utils.pad_tensor_dict(p, max_path_length) for p in agent_infos ]) env_infos = [path['env_infos'] for path in paths] env_infos = tensor_utils.stack_tensor_dict_list([ tensor_utils.pad_tensor_dict(p, max_path_length) for p in env_infos ]) valids = [np.ones_like(path['returns']) for path in paths] valids = tensor_utils.pad_tensor_n(valids, max_path_length) # average_discounted_return = (np.mean( # [path['returns'][0] for path in paths])) undiscounted_returns = [sum(path['rewards']) for path in paths] self.episode_reward_mean.extend(undiscounted_returns) # ent = np.sum(self.policy.distribution.entropy(agent_infos) * # valids) / np.sum(valids) samples_data = dict( observations=obs, actions=actions, rewards=rewards, baselines=baselines, returns=returns, valids=valids, agent_infos=agent_infos, env_infos=env_infos, paths=paths, average_return=np.mean(undiscounted_returns), ) return samples_data
[docs] def get_itr_snapshot(self, itr, samples_data): """Get the snapshot of the current population. Parameters ---------- itr : int The iteration number. samples_data : dict The processed data samples. Returns ------- snaposhot : dict The training snapshot. """ return dict( itr=itr, policy=self.policy, seeds=self.seeds, magnitudes=self.magnitudes, )