Agents/Policies

The Batcher, Agent, and EpisodeBatcher classes are now deprecated (but still functional). The simplified alternative is to use RL_Agent and RL_Batcher instead.

An agent is a parameterized policy

An agent is the (only) abstraction needed to allow rlstructures to collect interactions at scale. One Agent corresponds to a set of policies (formally \(\pi_z\)).

  • An Agent class represents a policy (or multiple policies through the agent_info argument) acting on a batch of environments

  • An Agent may include one or multiple PyTorch modules (or none at all)

  • An Agent is stateless, and only implements a __call__ method

  • The __call__(agent_state,observation,agent_info=None,history=None) method takes as input:

    • agent_state is the state of the agent at time t-1 (as a DictTensor)

    • observation comes from the rlstructures.VecEnv environment

    • agent_info corresponds to additional information (the \(z\) in \(\pi_z\)) provided to the agent (e.g. the value of epsilon for epsilon-greedy policies)

    • history may be a TemporalDictTensor representing a set of previous transitions (e.g. used to implement Transformer-based methods; its value is always None in the default implementation of an agent). It is provided only if Agent.require_history()==True.

  • Note that agent_state.n_elems()==observation.n_elems(), which is the number of environments on which the agent is computed.

  • agent_info=None and history=None are mandatory defaults in the method definition, and the agent must itself initialize the value of agent_info when agent_info is None

As an output, the __call__ method returns a triplet (old_state,action,new_state) where:

  • action is the action output by the agent as a DictTensor. Note that action.n_elems()==observation.n_elems(). This information will be transmitted to the environment through the env.step method. Note also that the action may contain any additional information that you would like to store in the resulting trajectory, e.g. debugging information such as the agent step.

  • new_state is the updated state of the agent at time t+1. This new state is the information transmitted to the agent at the next call.

  • old_state is the state of the agent before action/new_state computation

    • Conceptually, (old_state,observation,action,new_state) corresponds to a transition in the underlying MDP

    • In most cases, old_state is strictly equal to agent_state

    • When agent_state is None, the agent has to initialize itself, such that old_state is the initial state of the agent

Please refer to the tutorial examples to see different agent implementations.

We provide here an example of an Agent:

from rlstructures import Agent,DictTensor
import torch

class UniformAgent(Agent):
    def __init__(self,n_actions):
        super().__init__()
        self.n_actions=n_actions

    def __call__(self,state,observation,agent_info=None,history=None):
        # B is the number of environments on which the agent is computed
        B=observation.n_elems()

        # Initialize the state of the agent at the first call
        agent_state=None
        if state is None:
            agent_state=DictTensor({"timestep":torch.zeros(B).long()})
        else:
            agent_state=state

        # Sample one action per environment through random scores
        scores=torch.randn(B,self.n_actions)
        probabilities=torch.softmax(scores,dim=1)
        actions=torch.distributions.Categorical(probabilities).sample()

        # The new state is the old state with an incremented timestep
        new_state=DictTensor({"timestep":agent_state["timestep"]+1})
        return agent_state,DictTensor({"action":actions}),new_state
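
To see the shapes involved, such an agent can be called directly on a hand-built batch of observations (a minimal sketch; the "x" observation key is arbitrary and does not come from any particular environment):

agent=UniformAgent(n_actions=2)
obs=DictTensor({"x":torch.randn(4,1)})    # a batch of 4 observations
old_state,action,new_state=agent(None,obs)
print(action["action"].shape)             # torch.Size([4]): one action per environment
print(new_state["timestep"])              # tensor([1, 1, 1, 1])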

Agent and Batcher

An Agent and a VecEnv are used together through a Batcher to collect episodes or trajectories (a trajectory is a piece of an episode). The simplest Batcher is the MonoThreadEpisodeBatcher, which runs in the main process. rlstructures also provides:

  • EpisodeBatcher, a multi-process batcher sampling full episodes

  • Batcher, a multi-process batcher sampling the N next timesteps

The multi-process batchers are described later in the documentation.

Creating a batcher involves providing functions able to build the rlstructures.VecEnv and rlstructures.Agent objects, passed as (picklable) functions and arguments.

import gym
from gym.wrappers import TimeLimit
from rlstructures.env_wrappers import GymEnv

def create_env(max_episode_steps=100):
    envs=[]
    for k in range(4):
        e=gym.make("CartPole-v0")
        e=TimeLimit(e, max_episode_steps=max_episode_steps)
        envs.append(e)
    return GymEnv(envs,seed=10)

def create_agent(n_actions):
    return UniformAgent(n_actions)

The creation of the batcher is quite simple.

from rlstructures.batchers import MonoThreadEpisodeBatcher
batcher=MonoThreadEpisodeBatcher(
        create_agent=create_agent,
        agent_args={"n_actions":2},
        create_env=create_env,
        env_args={"max_episode_steps":100}
)

Depending on the batcher, one may then use different acquisition functions. In the mono-process case, one can use:

  • the execute(agent_info=None,env_info=None) function, which acquires env.n_envs() episodes

  • Acquired episodes are accessible by calling the get method, which returns a TemporalDictTensor

  • Note that, at each execution, the user may provide an agent_info value that will be transmitted to each of the agents, and an env_info value that will be transmitted to each environment (through its reset function). This allows the execution of multiple policies on multiple environments in a single batcher call.

Finally, note that each agent implements the Agent.update function, which allows one to update the parameters of the agent.
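
As a minimal acquisition sketch, using the batcher and agent defined above (agent_info and env_info are left to their None defaults here):

batcher.execute()              # acquires env.n_envs() episodes
trajectories=batcher.get()     # returns a TemporalDictTensor
print(trajectories.n_elems())  # number of acquired episodes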

Batchers

Batchers are objects allowing the execution of multiple policies over multiple environments, using multiple processes. We provide two batchers:

  • EpisodeBatcher, to acquire complete episodes (until a done is reached)

  • Batcher, to acquire the n next timesteps over multiple environments

Examples about how to use batchers are given in the other sections.

Important: all the information (e.g. DictTensor) produced by both the Agent and the Env will be available to the user in the returned TemporalDictTensor.
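
For instance, with the trajectories acquired above, the available keys can be inspected directly (a sketch; the exact key names depend on your environment and agent):

print(trajectories.keys())      # environment keys and agent keys (e.g. "action")
actions=trajectories["action"]  # tensor of shape (n_episodes, T)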

Parallelization Schema

The generic parallelization schema is illustrated in the following picture.

[Figure: Parallelization Schema]

  • One batcher creates multiple processes

  • Each process contains a copy of the Agent and a copy of the rlstructures.VecEnv (the copies are made through the create_agent and create_env functions provided as arguments at batcher creation)

    • In our case, each Agent has its own copy of the PyTorch model (note that it can be a shared-memory model to avoid using extra memory)

  • Each VecEnv corresponds to multiple simple environments

  • At execute, the processes start to acquire information by simulating the interaction between the agent and the environments

  • At get, the information collected by the processes is merged into a TemporalDictTensor that is the output of get

  • A call to Batcher.update will call Agent.update in all the processes, typically to update the model of each Agent (a sketch is given below)
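
A sketch of such an update step, assuming an Agent whose update method loads a state_dict into its internal model (how update interprets its argument is entirely up to your Agent implementation):

learner_model=torch.nn.Linear(4,2)          # stand-in for your policy model
# ... one learning step on previously acquired trajectories ...
batcher.update(learner_model.state_dict())  # forwarded to Agent.update in each process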

Multiprocess Batcher

A trajectory batcher just acquires the N next timesteps (and not full episodes):

  • n_timesteps is the number of steps to acquire at each call

  • n_slots is the number of simultaneous acquisitions, which is typically n_slots=n_threads*n_envs

batcher=Batcher(
        n_timesteps=100,
        n_slots=16,
        n_threads=4,
        seeds=[1,2,3,4],
        create_agent=create_agent,
        agent_args={"n_actions":2},
        create_env=create_env,
        env_args={"max_episode_steps":100}
)

A trajectory batcher has to be reset with corresponding agent_info and env_info values.

Calling execute will then acquire the next T steps (over environment instances that are still running). The get method returns None once all environments have stopped:

batcher.reset(
    agent_info=DictTensor({"agent_id":torch.arange(16)}),
    env_info=DictTensor({"env_id":torch.arange(16)})
)
batcher.execute()
t=batcher.get()

while t is not None:
    batcher.execute()
    t=batcher.get(blocking=True)
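
Each t returned in such a loop is a TemporalDictTensor holding variable-length sequences, since environments stop at different times. A sketch of inspecting one (assuming the lengths attribute and mask() method of TemporalDictTensor):

# For a non-None t obtained from batcher.get:
print(t.lengths)  # number of acquired timesteps per slot
mask=t.mask()     # (n_slots, n_timesteps) validity mask for padded entries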

Multiprocess Episode Batcher

Let us consider that we define multiple environments identified by an environment_id, such that two environments with different ids do not behave exactly the same. This can easily be implemented by using the env_info argument of the reset function:

from rlstructures import Agent,DictTensor
import torch
import os
import sys
import gym
from gym.wrappers import TimeLimit
from rlstructures.env_wrappers import GymEnv
from rlstructures.batchers import EpisodeBatcher,Batcher
from gym.utils import seeding

class MyEnv(gym.Env):
    def __init__(self):
        super().__init__()

    def seed(self,seed=None):
        self.np_random,seed=seeding.np_random(seed)
        return [seed]

    def reset(self,env_info={"env_id":0}):
        assert "env_id" in env_info
        self.env_id=env_info["env_id"]
        # Random initial position in [-1,1] and a random identifier
        self.x=self.np_random.rand()*2.0-1.0
        self.identifier=self.np_random.rand()
        obs={"x":self.x,"identifier":self.identifier,"env_id":self.env_id}
        return obs

    def step(self,action):
        if action==0:
            self.x-=0.3
        else:
            self.x+=0.3
        done = self.x<-1 or self.x>1

        obs={"x":self.x,"identifier":self.identifier,"env_id":self.env_id}
        return obs,self.x,done,{}
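
A quick manual check of this environment outside of any batcher (a sketch; the env_id value is arbitrary):

env=MyEnv()
env.seed(0)
obs=env.reset(env_info={"env_id":3})
print(obs["env_id"])              # 3
obs,reward,done,info=env.step(1)  # moves x by +0.3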

As you can see, env_info can be used as an input parameter of the environment, allowing one to model multiple environments through a single class.

We can do the same with agents and implement an Agent that is parametrized by an agent_info. In our case, the agent is a random agent that also outputs its agent_id alongside each action. Advanced examples are shown in the rlalgos directory (e.g. stochastic/deterministic policies, epsilon-greedy policies, …)

class UniformAgent(Agent):
    def __init__(self,n_actions):
        super().__init__()
        self.n_actions=n_actions

    def __call__(self,state,observation,agent_info=None,history=None):
        B=observation.n_elems()
        agent_state=None

        # Initialize agent_info if not specified
        if agent_info is None:
            agent_info=DictTensor({"agent_id":torch.tensor([0]).repeat(B)})

        # Initialize the state of the agent if not specified
        if state is None:
            agent_state=DictTensor({"timestep":torch.zeros(B).long()})
        else:
            agent_state=state

        scores=torch.randn(B,self.n_actions)
        probabilities=torch.softmax(scores,dim=1)
        actions=torch.distributions.Categorical(probabilities).sample()
        new_state=DictTensor({"timestep":agent_state["timestep"]+1})
        # We also output the action probabilities and the agent_id
        return agent_state,DictTensor({
            "action":actions,
            "action_probabilities":probabilities,
            "agent_id":agent_info["agent_id"]
        }),new_state
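
As with the earlier agent, the parametrization can be sanity-checked by a direct call (a sketch; the agent_id values are arbitrary):

agent=UniformAgent(n_actions=2)
obs=DictTensor({"x":torch.randn(4,1)})
info=DictTensor({"agent_id":torch.tensor([0,1,2,3])})
_,action,_=agent(None,obs,agent_info=info)
print(action["agent_id"])  # tensor([0, 1, 2, 3]) is copied into the output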

By specifying particular values of env_info and agent_info when calling the batcher.execute method, the user may control which agent interacts with which environment. Let us illustrate this using multi-process batchers.

def create_env(seed=0,max_episode_steps=100):
    envs=[]
    for k in range(4):
        e=MyEnv()
        e=TimeLimit(e, max_episode_steps=max_episode_steps)
        envs.append(e)
    return GymEnv(envs,seed=seed)

def create_agent(buffer=None,n_actions=None):
    # The multi-process batcher may pass a buffer argument here;
    # it is not needed by this simple agent
    return UniformAgent(n_actions)

Since we are using multi-process batchers, we have to switch to spawn mode.

if __name__ == "__main__":
    import torch.multiprocessing as mp
    mp.set_start_method("spawn")

The EpisodeBatcher will sample full episodes (until the environment returns done==True). If one considers a rlstructures.VecEnv env and n_threads (or processes), the batcher will sample n_episodes = N * env.n_envs() * n_threads episodes at each execution (where N is chosen by the user).

  • seeds is a list of environment seeds, one seed per process

  • n_timesteps is the maximum size of the episodes

  • n_slots is the maximum number of episodes simultaneously acquired

batcher=EpisodeBatcher(
        n_timesteps=100,
        n_slots=128,
        n_threads=4,
        seeds=[1,2,3,4],
        create_agent=create_agent,
        agent_args={"n_actions":2},
        create_env=create_env,
        env_args={"max_episode_steps":100}
)


Since env.n_envs()*n_threads = 4*4 = 16 and we will sample 32 episodes (i.e. N=2), we need to configure the 32 agents and 32 environments that will interact:

agent_info=DictTensor({"agent_id":torch.arange(32)})
env_info=DictTensor({"env_id":torch.arange(32)})

Executing the batcher will start the acquisition process. It is a non-blocking function that launches the acquisition:

batcher.execute(n_episodes=32,agent_info=agent_info,env_info=env_info)

Getting episodes is done by using get. Note that when blocking=True, the process will wait until the end of the acquisition (examples of blocking=False are given in the tutorials).

trajectories=batcher.get(blocking=True)

The reexecute method is a shortcut to call execute again with the same arguments:

batcher.reexecute()
trajectories=batcher.get()
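
Finally, a sketch of how a non-blocking acquisition might be overlapped with other work, assuming that get(blocking=False) returns None while the acquisition is still running (see the tutorials for the reference pattern):

batcher.execute(n_episodes=32,agent_info=agent_info,env_info=env_info)
trajectories=batcher.get(blocking=False)
while trajectories is None:
    # e.g. run a learning step on previously acquired data here
    trajectories=batcher.get(blocking=False)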