module emote.memory.memory

The sequence builder collates observations into sequences stored in the memory.

The sequence builder is the API between "instant" based APIs such as the agent proxy and the episode-based functionality of the memory implementation. The goal of the sequence builder is to consume individual timesteps per agent and collate them into episodes before submission into the memory.
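
The collation can be pictured with a small self-contained sketch; plain dictionaries stand in for the real observation types here, and the key names are illustrative only:

timesteps = [
    {"obs": [0.1, 0.2], "reward": 0.0},
    {"obs": [0.3, 0.4], "reward": 1.0},
]
# Per-agent timesteps arrive one at a time and are grouped into per-key
# sequences (an "episode") before being submitted to the memory.
episode = {key: [step[key] for step in timesteps] for key in timesteps[0]}
# episode == {"obs": [[0.1, 0.2], [0.3, 0.4]], "reward": [0.0, 1.0]}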

Classes

class Episode:

An episode of data being constructed.

Fields

  • data: Dict[str, List[Matrix]] = field(default_factory=lambda : defaultdict(list))

Methods

def append(self, observation) -> Tuple
def complete(self, observation) -> Mapping[str, Matrix]
def from_initial(observation) -> Episode
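
A sketch of the intended life cycle; the observation variables are illustrative placeholders for whatever per-key array data the caller tracks:

episode = Episode.from_initial(first_observation)  # start a new episode from the first timestep
episode.append(next_observation)                   # collate one more timestep
result = episode.complete(final_observation)       # -> Mapping[str, Matrix]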

class MemoryTableProxy:

The sequence builder wraps a sequence-based memory to build full episodes from [identity, observation] data.

Not thread safe.

Methods

def __init__(
    self,
    memory_table,
    minimum_length_threshold,
    use_terminal,
    *,
    name
) -> None

Arguments:

  • memory_table(MemoryTable)
  • minimum_length_threshold(Optional[int])
  • use_terminal(bool)
  • name(str)
def name(self) -> str
def size(self) -> int
def resize(self, new_size) -> None
def store(self, path) -> None
def is_initial(self, identity) -> bool

Returns true if the identity is not already used in a partial sequence. Does not validate whether the identity is associated with a completed episode.

Arguments:

  • identity(int)
def add(self, observations, responses) -> None
def timers(self) -> None
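
A hedged usage sketch, assuming `table` is an existing MemoryTable and that `observations` and `responses` are per-agent dictionaries as in the add signatures documented further down; all names and values are illustrative:

proxy = MemoryTableProxy(table, minimum_length_threshold=8, use_terminal=True)

if proxy.is_initial(agent_id):
    ...  # the next observation for agent_id starts a fresh partial sequence

proxy.add(observations, responses)  # collated into episodes, then stored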

class MemoryProxyWrapper:

Base class for memory proxy wrappers. This class forwards non-existing method accesses to the inner MemoryProxy or MemoryProxyWrapper.

Methods

def __init__(self, inner) -> None

Arguments:

  • inner('MemoryProxyWrapper' | MemoryProxy)
def state_dict(self) -> dict[str, Any]
def load_state_dict(
    self,
    state_dict,
    load_network,
    load_optimizer,
    load_hparams
) -> None
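
The forwarding behaviour can be pictured with a minimal sketch; this is an illustration of the idea, not the actual implementation:

class ForwardingSketch:
    def __init__(self, inner):
        self._inner = inner

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails, i.e. for methods
        # the wrapper does not define itself; those fall through to inner.
        return getattr(self._inner, name)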

class MemoryTableProxyWrapper(MemoryProxyWrapper):

Methods

def __init__(self, *, inner) -> None
def store(self, path) -> None

class LoggingProxyWrapper(LoggingMixin, MemoryTableProxyWrapper):

Methods

def __init__(self, inner, writer, log_interval) -> None
def state_dict(self) -> dict[str, Any]
def load_state_dict(
    self,
    state_dict,
    load_network,
    load_optimizer,
    load_hparams
) -> None
def add(self, observations, responses) -> None
def report(self, metrics, metrics_lists) -> None
def get_report(
    self,
    keys
) -> Tuple[dict[str, int | float | list[float]], dict[str, list[float]]]

class MemoryExporterProxyWrapper(LoggingMixin, MemoryTableProxyWrapper):

Export the memory at regular intervals.

Methods

def __init__(
    self,
    memory,
    target_memory_name,
    inf_steps_per_memory_export,
    experiment_root_path,
    min_time_per_export
) -> None

Arguments:

  • memory(MemoryTableProxy | MemoryTableProxyWrapper)
  • target_memory_name
  • inf_steps_per_memory_export
  • experiment_root_path(str)
  • min_time_per_export(int) (default: 600)
def add(self, observations, responses) -> None

First add the new batch to the memory.

Arguments:

  • observations(Dict[AgentId, DictObservation])
  • responses(Dict[AgentId, DictResponse])
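
A hypothetical wiring, reusing the proxy built earlier; the memory name, paths, and interval values are illustrative assumptions:

exporting_proxy = MemoryExporterProxyWrapper(
    memory=proxy,
    target_memory_name="replay",
    inf_steps_per_memory_export=10_000,
    experiment_root_path="/tmp/experiment",
    min_time_per_export=600,
)
exporting_proxy.add(observations, responses)  # stored, and exported on schedule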

class MemoryLoader:

Methods

def __init__(
    self,
    memory_table,
    rollout_count,
    rollout_length,
    size_key,
    data_group
) -> None
def is_ready(self) -> bool

True if the data loader has enough data to start providing data.
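
A sketch of constructing a loader and waiting for readiness; the argument values and the `table` variable are assumptions:

import time

loader = MemoryLoader(
    memory_table=table,
    rollout_count=4,
    rollout_length=8,
    size_key="batch_size",
    data_group="default",
)

while not loader.is_ready():
    time.sleep(0.1)  # wait until enough complete sequences have been stored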

class JointMemoryLoader:

A memory loader capable of loading data from multiple MemoryLoaders.

Methods

def __init__(self, loaders, size_key) -> None

Arguments:

  • loaders(list[MemoryLoader])
  • size_key(str) (default: batch_size)
def is_ready(self) -> bool
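
A sketch combining two hypothetical per-table loaders (`policy_loader` and `value_loader` are assumptions):

joint_loader = JointMemoryLoader(loaders=[policy_loader, value_loader], size_key="batch_size")

if joint_loader.is_ready():
    ...  # enough data is available to start sampling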

class JointMemoryLoaderWithDataGroup(JointMemoryLoader):

A JointMemoryLoader that places its data inside of a user-specified datagroup.

Methods

def __init__(self, loaders, data_group, size_key) -> None

Arguments:

  • loaders(list[MemoryLoader])
  • data_group(str)
  • size_key(str) (default: batch_size)
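
The data-group variant only differs in where the sampled data is nested; a sketch with an illustrative group name, reusing the hypothetical loaders from above:

grouped_loader = JointMemoryLoaderWithDataGroup(
    loaders=[policy_loader, value_loader],
    data_group="off_policy",
    size_key="batch_size",
)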

class MemoryWarmup(Callback):

A blocker that ensures the memory has enough data when training starts, as the memory will otherwise panic. This is useful if you use an async data generator.

If you do not use an async data generator, this can deadlock your training loop and prevent progress.

Methods

def __init__(self, loader, exporter, shutdown_signal) -> None

Arguments:

  • loader(MemoryLoader)
  • exporter(Optional[OnnxExporter])
  • shutdown_signal(Optional[Callable[[], bool]])
def begin_training(self) -> None
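
A minimal usage sketch; the surrounding trainer wiring is assumed and omitted, and `joint_loader` is the hypothetical loader from the earlier sketches:

warmup = MemoryWarmup(loader=joint_loader, exporter=None, shutdown_signal=None)
warmup.begin_training()  # blocks until the loader reports is_ready()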