package emote.models

Classes

class DynamicModel(nn.Module):

Wrapper class for dynamics models. DynamicModel functions as a wrapper for models, including ensembles. It also provides data manipulations that are common when using dynamics models with observations and actions (e.g., predicting delta observations, input normalization).

Methods

def __init__(self, *, model, learned_rewards, obs_process_fn, no_delta_list) -> None

Arguments:

  • model(nn.Module): the model to wrap.
  • learned_rewards(bool): if True, the wrapper considers the last output of the model to correspond to reward predictions.
  • obs_process_fn(Optional[nn.Module]): if provided, observations will be passed through this function before being given to the model.
  • no_delta_list(Optional[list[int]]): if provided, represents a list of dimensions over which the model predicts the actual observation and not just a delta.
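
A minimal construction sketch, wrapping an ensemble; the sizes, flag values, and device below are illustrative assumptions, not prescribed defaults.

import torch
from emote.models import DynamicModel, EnsembleOfGaussian

obs_size, action_size = 17, 6  # assumed environment dimensions

# An ensemble predicting next-observation deltas plus a learned reward.
ensemble = EnsembleOfGaussian(
    in_size=obs_size + action_size,
    out_size=obs_size + 1,  # +1 output for the reward head
    device="cpu",
    num_layers=4,
    ensemble_size=5,
    hidden_size=200,
    learn_logvar_bounds=False,
    deterministic=False,
)

model = DynamicModel(
    model=ensemble,
    learned_rewards=True,
    obs_process_fn=None,
    no_delta_list=None,
)
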
def forward(self, x) -> tuple[torch.Tensor, ...]

Computes the output of the dynamics model.

Arguments:

  • x(torch.Tensor): the input to the model.

Returns:

  • (tuple of tensors): the tensors predicted by the wrapped model.
def loss(self, obs, next_obs, action, reward) -> tuple[torch.Tensor, dict[str, any]]

Computes the model loss over a batch of transitions.

Arguments:

  • obs(torch.Tensor): current observations
  • next_obs(torch.Tensor): next observations
  • action(torch.Tensor): actions
  • reward(torch.Tensor): rewards

Returns:

  • (tuple[torch.Tensor, dict[str, any]]): the loss tensor and a dict with extra info.
def sample(
    self,
    action,
    observation,
    rng
) -> tuple[torch.Tensor, Optional[torch.Tensor]]

Samples a simulated transition from the dynamics model. The function first normalizes the inputs to the model, then denormalizes the model output to produce the final result.

Arguments:

  • action(torch.Tensor): the action a_t.
  • observation(torch.Tensor): the observation/state s_t.
  • rng(torch.Generator): a random number generator.

Returns:

  • the predicted next observation and reward.
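
A usage sketch, assuming the DynamicModel constructed above and a leading batch dimension:

rng = torch.Generator()
rng.manual_seed(0)

obs = torch.randn(32, obs_size)        # batch of states s_t
action = torch.randn(32, action_size)  # batch of actions a_t

# reward is None when learned_rewards is False.
next_obs, reward = model.sample(action=action, observation=obs, rng=rng)
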
def get_model_input(self, obs, action) -> torch.Tensor

Prepares the input to the neural network model by concatenating observations and actions. If obs_process_fn is given, the observations are processed by that function prior to concatenation.

Arguments:

  • obs(torch.Tensor): observation tensor
  • action(torch.Tensor): action tensor

Returns:

  • the concatenation of the (processed) observations and actions.
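
Conceptually, the preparation reduces to an optional observation transform followed by a feature-wise concatenation. The standalone function below is an illustrative sketch, not the class internals:

def prepare_model_input(obs, action, obs_process_fn=None):
    # Optionally pre-process the observations first.
    if obs_process_fn is not None:
        obs = obs_process_fn(obs)
    # Concatenate along the feature dimension: (batch, obs_dim + action_dim).
    return torch.cat([obs, action], dim=-1)
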
def process_batch(
    self,
    obs,
    next_obs,
    action,
    reward
) -> tuple[torch.Tensor, torch.Tensor]

Processes the given batch: normalizes inputs and targets and prepares them for training.

Arguments:

  • obs(torch.Tensor): the observations tensor
  • next_obs(torch.Tensor): the next observation tensor
  • action(torch.Tensor): the actions tensor
  • reward(torch.Tensor): the rewards tensor

Returns:

  • (tuple[torch.Tensor, torch.Tensor]): the training input and target tensors
def save(self, save_dir) -> None

Saves the model to the given directory.

Arguments:

  • save_dir(str): the directory to save the model
def load(self, load_dir) -> None

Loads the model from the given directory.

Arguments:

  • load_dir(str): the directory to load the model
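
A round-trip sketch; the checkpoint directory is an arbitrary example path:

model.save(save_dir="./checkpoints/dynamic_model")
# ... later, restore the weights into an identically constructed wrapper:
model.load(load_dir="./checkpoints/dynamic_model")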

class ModelLoss(LossCallback):

Trains a dynamic model by minimizing the model loss.

Methods

def __init__(
    self,
    *,
    model,
    opt,
    lr_schedule,
    max_grad_norm,
    name,
    data_group,
    input_key
) -> None

Arguments:

  • model(DynamicModel): A dynamic model.
  • opt(optim.Optimizer): An optimizer.
  • lr_schedule(Optional[optim.lr_scheduler._LRScheduler]): A learning rate scheduler.
  • max_grad_norm(float): Clip the norm of the gradient during backprop using this value.
  • name(str): The name of the module. Used e.g. while logging.
  • data_group(str): The name of the data group from which this Loss takes its data.
  • input_key(str)
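
A construction sketch, assuming the DynamicModel from above; the optimizer settings, name, and keys are illustrative:

from torch import optim
from emote.models import ModelLoss

model_loss = ModelLoss(
    model=model,
    opt=optim.Adam(model.parameters(), lr=1e-3),
    lr_schedule=None,
    max_grad_norm=10.0,
    name="dynamic_model",
    data_group="default",
    input_key="obs",
)
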
def loss(self, observation, next_observation, actions, rewards) -> None

class ModelEnv:

Wraps a dynamics model into a gym-like environment.

Methods

def __init__(
    self,
    *,
    num_envs,
    model,
    termination_fn,
    reward_fn,
    generator,
    input_key
) -> None

Arguments:

  • num_envs(int): the number of envs to simulate in parallel (batch_size).
  • model(DynamicModel): the dynamic model to wrap.
  • termination_fn(TermFnType): a function that receives observations, and returns a boolean flag indicating whether the episode should end or not.
  • reward_fn(Optional[RewardFnType]): a function that receives actions and observations and returns the value of the resulting reward in the environment.
  • generator(Optional[torch.Generator]): a torch random number generator
  • input_key(str)
def reset(self, initial_obs_batch, len_rollout) -> None

Resets the model environment.

Arguments:

  • initial_obs_batch(torch.Tensor): a batch of initial observations.
  • len_rollout(int): the max length of the model rollout
def step(self, actions) -> tuple[Tensor, Tensor, Tensor, dict[str, Tensor]]

Steps the model environment with the given batch of actions.

Arguments:

  • actions(np.ndarray): the actions for each "episode" to roll out. Shape must be batch_size x dim_actions. If an np.ndarray is given, it is converted to a torch.Tensor and sent to the model device.

Returns:

  • (tuple[Tensor, Tensor, Tensor, dict[str, Tensor]]): the predicted next observation, reward, done flag, and an info dict. The done flag and rewards are computed using the termination_fn and reward_fn passed to the constructor. The rewards can also be predicted by the model.
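
A rollout sketch, assuming the DynamicModel and tensors from above; the never-terminating termination_fn and the random actions are illustrative placeholders:

from emote.models import ModelEnv

model_env = ModelEnv(
    num_envs=32,
    model=model,
    termination_fn=lambda obs: torch.zeros(obs.shape[0], 1, dtype=torch.bool),
    reward_fn=None,  # fall back to the model's learned rewards
    generator=rng,
    input_key="obs",
)

model_env.reset(initial_obs_batch=obs, len_rollout=5)
for _ in range(5):
    actions = torch.randn(32, action_size)
    next_obs, reward, done, info = model_env.step(actions)
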
def dict_step(
    self,
    actions
) -> tuple[dict[AgentId, DictObservation], dict[str, float]]

Steps the Gym-like model environment with dict actions.

Arguments:

  • actions(dict[AgentId, DictResponse]): the dict actions.

Returns:

  • (tuple[dict[AgentId, DictObservation], dict[str, float]]): the predicted next dict observation, reward, and done flag.
def dict_reset(self, obs, len_rollout) -> dict[AgentId, DictObservation]

Resets the model env.

Arguments:

  • obs(torch.Tensor): the initial observations.
  • len_rollout(int): the max rollout length

Returns:

  • the formatted initial observation.

class EnsembleOfGaussian(nn.Module):

Methods

def __init__(
    self,
    *,
    in_size,
    out_size,
    device,
    num_layers,
    ensemble_size,
    hidden_size,
    learn_logvar_bounds,
    deterministic
) -> None
def default_forward(self, x) -> tuple[torch.Tensor, torch.Tensor]
def forward(self, x) -> tuple[torch.Tensor, torch.Tensor]

Computes mean and logvar predictions for the given input.

Arguments:

  • x(torch.Tensor): the input to the model.

Returns:

  • (tuple of two tensors): the predicted mean and log variance of the output.
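
Assuming the ensemble constructed in the DynamicModel example above, a forward pass is a single call:

x = torch.randn(32, obs_size + action_size)  # concatenated observation and action
mean, logvar = ensemble(x)
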
def loss(self, model_in, target) -> tuple[torch.Tensor, dict[str, any]]

Computes Gaussian NLL loss.

Arguments:

  • model_in(torch.Tensor): input tensor.
  • target(Optional[torch.Tensor]): target tensor.

Returns:

  • (tuple[torch.Tensor, dict[str, any]]): the loss tensor and a dict with extra info.
def sample(self, model_input, rng) -> torch.Tensor

Samples next observation, reward and terminal from the model using the ensemble.

Arguments:

  • model_input(torch.Tensor): the observation and action.
  • rng(torch.Generator): a random number generator.

Returns:

  • predicted observation, rewards, terminal indicator and model state dictionary.
def save(self, save_dir) -> None

Saves the model to the given directory.

Arguments:

  • save_dir(str)
def load(self, load_dir) -> None

Loads the model from the given path.

Arguments:

  • load_dir(str)

class ModelBasedCollector(LoggingMixin, BatchCallback):

The ModelBasedCollector class is used to sample rollouts from the trained dynamic model. The rollouts are stored in a replay buffer memory.

Arguments:

  • model_env: the Gym-like dynamic model.
  • agent: the policy used to sample actions.
  • memory: the memory to store the new synthetic samples.
  • rollout_scheduler: a scheduler used to set the rollout length when unrolling the dynamic model.
  • num_bp_to_retain_buffer: the number of BP steps to keep samples. Samples will be overwritten (first in, first out) for BP steps larger than this.
  • data_group: the data group to receive data from. This must be set to get real (Gym) samples.

Methods

def __init__(
    self,
    model_env,
    agent,
    memory,
    rollout_scheduler,
    num_bp_to_retain_buffer,
    data_group,
    input_key
) -> None

Arguments:

  • model_env(ModelEnv)
  • agent(AgentProxy)
  • memory(MemoryProxy)
  • rollout_scheduler(BPStepScheduler)
  • num_bp_to_retain_buffer(int) (default: 1000000)
  • data_group(str) (default: default)
  • input_key(str) (default: obs)
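
A wiring sketch; agent, memory, and rollout_scheduler are assumed to be pre-built AgentProxy, MemoryProxy, and BPStepScheduler instances (their construction is outside this module):

from emote.models import ModelBasedCollector

collector = ModelBasedCollector(
    model_env=model_env,
    agent=agent,                          # pre-built AgentProxy (assumed)
    memory=memory,                        # pre-built MemoryProxy (assumed)
    rollout_scheduler=rollout_scheduler,  # pre-built BPStepScheduler (assumed)
    num_bp_to_retain_buffer=1_000_000,
    data_group="default",
    input_key="obs",
)
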
def begin_batch(self) -> None
def get_batch(self, observation) -> None
def collect_sample(self) -> None

Collects a single rollout.

def update_rollout_size(self) -> None

class BatchSampler(BatchCallback):

The BatchSampler class provides batches of data for the RL training callbacks. In every BP step, it samples one batch from either the Gym buffer or the model buffer, based on a Bernoulli probability distribution. It outputs the batch to a separate data group, which is then used by the other RL training callbacks.

Arguments:

  • dataloader(MemoryLoader): the dataloader to load data from the model buffer.
  • prob_scheduler(BPStepScheduler): the scheduler to update the probability of data samples coming from the model buffer vs. the Gym buffer.
  • data_group(str): the data group to receive data.
  • rl_data_group(str): the data group to upload data for RL training.
  • generator(Optional[torch.Generator]): an optional random generator.

Methods

def __init__(
    self,
    dataloader,
    prob_scheduler,
    data_group,
    rl_data_group,
    generator
) -> None

Arguments:

  • dataloader(MemoryLoader)
  • prob_scheduler(BPStepScheduler)
  • data_group(str) (default: default)
  • rl_data_group(str) (default: rl_buffer)
  • generator(Optional[torch.Generator])
def begin_batch(self) -> None

Generates a batch of data either by sampling from the model buffer or by cloning the input batch

Returns:

  • the batch of data
def sample_model_batch(self) -> None

Samples a batch of data from the model buffer

Returns:

  • batch samples
def use_model_batch(self) -> None

Decides whether the batch should come from the model-generated buffer.

Returns:

  • True if model samples should be used, False otherwise.
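
Conceptually, the decision is a single Bernoulli draw per BP step. The standalone function below is an illustrative sketch, not the class internals:

def choose_model_buffer(prob_model: float, generator: torch.Generator) -> bool:
    # True -> sample from the model buffer; False -> sample from the Gym buffer.
    draw = torch.bernoulli(torch.tensor(prob_model), generator=generator)
    return bool(draw.item())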

class LossProgressCheck(LoggingMixin, BatchCallback):

Methods

def __init__(self, model, num_bp, data_group, input_key) -> None
def begin_batch(self) -> None
def end_cycle(self) -> None
def get_batch(self, observation, next_observation, actions, rewards) -> None

class DeterministicModel(nn.Module):

Methods

def __init__(self, in_size, out_size, device, hidden_size, num_hidden_layers) -> None
def forward(self, x) -> torch.Tensor
def loss(self, model_in, target) -> tuple[torch.Tensor, dict[str, any]]
def sample(self, model_input, rng) -> torch.Tensor

Samples next observation, reward and terminal from the model.

Arguments:

  • model_input(torch.Tensor): the observation and action.
  • rng(torch.Generator): a random number generator.

Returns:

  • predicted observation, rewards, terminal indicator and model state dictionary.
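
A usage sketch mirroring the ensemble example above; the sizes are assumptions, and obs, action, and rng are the tensors and generator defined earlier:

from emote.models import DeterministicModel

det_model = DeterministicModel(
    in_size=obs_size + action_size,
    out_size=obs_size + 1,
    device="cpu",
    hidden_size=200,
    num_hidden_layers=4,
)

model_input = torch.cat([obs, action], dim=-1)  # as built by get_model_input
prediction = det_model.sample(model_input, rng)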