Source code for algorithms.qsom.qsom

"""
This module defines a Q-SOM helper that is used as an entrypoint to simplify
the instantiation of Q-SOM Agents from a Gym Environment.

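It handles creating the correct structures (one State-SOM and one Action-SOM
per agent, plus the action selector and action perturbator) and passing the
correct hyperparameters to each of them.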
"""
import numpy as np

from algorithms.model import Model
from algorithms.qsom.qsom_agent import QsomAgent
from algorithms.qsom.som import SOM
from algorithms.util.action_perturbator import EpsilonActionPerturbator
from algorithms.util.action_selector import BoltzmannActionSelector
from smartgrid.environment import SmartGrid


class QSOM(Model):
    """
    The Q-SOM learning algorithm: based on Q-Learning + Self-Organizing Maps.

    Two SOMs are used, a State-SOM that learns to handle the observation
    (state) space, i.e., to map continuous observations to discrete space
    identifiers; and an Action-SOM that learns to handle the action space,
    i.e., to map discrete actions identifiers to continuous action
    parameters. A Q-Table learns the interests of (discrete) actions in
    (discrete) states.

    List of hyperparameters that this model expects:

    initial_tau
        Initial value of the Boltzmann temperature, which controls the
        exploration-exploitation trade-off.

    tau_decay
        Whether to decrease (decay) the Boltzmann temperature over the time
        steps, so as to encourage exploitation rather than exploration in
        later time steps. See also the ``tau_decay_coeff`` below.

    tau_decay_coeff
        Coefficient of reduction of the Boltzmann temperature, each step, if
        the decay is enabled. Applied multiplicatively to the current tau
        each time step, i.e., ``tau = tau * tau_decay_coeff``.

    noise
        The noise parameter that controls the random distribution when
        perturbing actions. The higher the noise, the more the action will
        be perturbed (i.e., far from its original, unperturbed version).

    sigma_state
        Size of the neighborhood for the State-SOM.

    sigma_action
        Size of the neighborhood for the Action-SOM.

    lr_state
        Learning rate for the State-SOM.

    lr_action
        Learning rate for the Action-SOM.

    q_learning_rate
        Learning rate for the Q-Table.

    q_discount_factor
        The gamma value controls the horizon of expected rewards: the higher
        it is, the more the agent will take into account the future states,
        and rewards that can be expected from these future states, when
        determining its policy. If set to 0, the agent will simply maximize
        the current expected reward (greedy policy).

    update_all
        Whether to update all Q-Values (Smith's optimization) at each step.
        This speeds up the learning of interests.

    use_neighborhood
        Whether to use the State- and Action-SOMs neighborhoods when
        updating the Q-Values.

    Any hyperparameter that is not explicitly given to the constructor falls
    back to its value in ``default_hyperparameters``.
    """

    default_hyperparameters = {
        "q_learning_rate": 0.7,
        "q_discount_factor": 0.9,
        "update_all": True,
        "use_neighborhood": True,
        "sigma_state": 1.0,
        "lr_state": 0.8,
        "sigma_action": 1.0,
        "lr_action": 0.7,
        "initial_tau": 0.5,
        "tau_decay": False,
        "tau_decay_coeff": 1.0,
        "noise": 0.08,
    }

    def __init__(self, env: SmartGrid, hyper_parameters: dict = None):
        """
        Create one Q-SOM agent per agent of the environment.

        :param env: The environment; its ``n_agent``, ``observation_space``
            and ``action_space`` attributes are read to size the agents.
        :param hyper_parameters: Optional dict of hyperparameters (see the
            class documentation for the expected keys). Missing keys, or a
            ``None`` value, fall back to ``QSOM.default_hyperparameters``.
            Extra keys are kept untouched.
        """
        # Always build a fresh dict: this avoids aliasing the class-level
        # defaults (which would let one instance's changes leak into every
        # other instance), and lets callers override only a few keys.
        merged_hyper_parameters = dict(QSOM.default_hyperparameters)
        if hyper_parameters is not None:
            merged_hyper_parameters.update(hyper_parameters)
        super().__init__(env, merged_hyper_parameters)

        params = self.hyper_parameters
        self.qsom_agents = []

        # The same selector and perturbator instances are shared by all
        # agents.
        # NOTE(review): if the selector keeps a mutable temperature that is
        # decayed on each call, sharing it means the decay advances once per
        # agent rather than once per step -- confirm this is intended.
        action_selector = BoltzmannActionSelector(
            params["initial_tau"],
            params["tau_decay"],
            params["tau_decay_coeff"],
        )
        action_perturbator = EpsilonActionPerturbator(params["noise"])

        for num_agent in range(env.n_agent):
            obs_space = env.observation_space[num_agent]
            assert len(obs_space.shape) == 1, 'Observation space must be 1D'
            action_space = env.action_space[num_agent]
            assert len(action_space.shape) == 1, 'Action space must be 1D'

            # 12x12 map over the observation vector.
            state_som = SOM(
                12, 12, obs_space.shape[0],
                sigma=params["sigma_state"],
                learning_rate=params["lr_state"],
            )
            # 3x3 map over the action vector.
            action_som = SOM(
                3, 3, action_space.shape[0],
                sigma=params["sigma_action"],
                learning_rate=params["lr_action"],
            )

            qsom_agent = QsomAgent(
                obs_space,
                action_space,
                state_som,
                action_som,
                action_selector,
                action_perturbator,
                q_learning_rate=params["q_learning_rate"],
                q_discount_factor=params["q_discount_factor"],
                update_all=params["update_all"],
                use_neighborhood=params["use_neighborhood"],
            )
            self.qsom_agents.append(qsom_agent)

    def _flatten_observations(self, observations_per_agent):
        """
        Build one flat observation vector per agent.

        For each agent index ``i`` in ``range(self.env.n_agent)``, the
        agent's ``'local'`` observation is concatenated with the shared
        ``'global'`` observation (local first, global second).

        :param observations_per_agent: Mapping with a ``'local'`` entry
            (indexable by agent index) and a ``'global'`` entry.
        :return: A list of arrays, one per agent.
        """
        local_observations = observations_per_agent['local']
        global_observation = observations_per_agent['global']
        return [
            np.concatenate((local_observations[i], global_observation))
            for i in range(self.env.n_agent)
        ]

    def forward(self, observations_per_agent):
        """
        Choose an action for each agent, based on their observations.

        :param observations_per_agent: Mapping with ``'local'`` (per-agent)
            and ``'global'`` (shared) observations.
        :return: A list with the result of each agent's ``forward`` call,
            in the same order as ``self.qsom_agents``.
        """
        observations = self._flatten_observations(observations_per_agent)
        assert len(observations) == len(self.qsom_agents)
        return [
            agent.forward(observation)
            for agent, observation in zip(self.qsom_agents, observations)
        ]

    def backward(self, new_observations_per_agent, reward_per_agent):
        """
        Make each agent learn, based on their rewards and observations.

        :param new_observations_per_agent: Mapping with ``'local'``
            (per-agent) and ``'global'`` (shared) observations, received
            after the agents' actions.
        :param reward_per_agent: Sequence of rewards, indexed by agent.
        """
        new_observations = self._flatten_observations(
            new_observations_per_agent
        )
        assert len(reward_per_agent) == len(self.qsom_agents)
        assert len(new_observations) == len(self.qsom_agents)
        for i, agent in enumerate(self.qsom_agents):
            agent.backward(new_observations[i], reward_per_agent[i])