In this notebook, we define the backbone code for RL environments, following OpenAI Gym.
Then, we create some example environments that we shall use in subsequent coding sessions throughout the course: three gridworld environments (GridWorld, an episodic GridWorld, and Windy GridWorld) and a Qubit environment. We also take a brief look at some OpenAI Gym environments.
import numpy as np
from scipy.linalg import expm
class MyEnv():
"""
Gym style environment for RL. You may also inherit the class structure from OpenAI Gym.
Parameters:
n_time_steps: int
Total number of time steps within each episode
seed: int
seed of the RNG (for reproducibility)
"""
def __init__(self, n_time_steps, seed):
"""
Initialize the environment.
"""
### define action space variables
### define state space variables
pass
def step(self, action):
"""
Interface between environment and agent. Performs one step in the environment.
Parameters:
action: int
the index of the respective action in the action array
Returns:
output: ( object, float, bool)
information provided by the environment about its current state:
(state, reward, done)
"""
pass
return self.state, reward, done
def set_seed(self,seed=0):
"""
Sets the seed of the RNG.
"""
pass
def reset(self):
"""
Resets the environment to its initial values.
Returns:
state: object
the initial state of the environment
"""
pass
return self.state
def render(self):
"""
Plots the state of the environment. For visualization purposes only.
"""
pass
# ... add extra private and public functions as necessary
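Before moving on to concrete examples, here is a minimal toy instance of this interface, purely for illustration: the RandomWalkEnv class below (a one-dimensional random walk with a reward for staying near the origin) is our own toy choice, not one of the course environments. It also shows the generic agent-environment interaction loop we will reuse for all environments below.
# toy example of the Gym-style interface: a one-dimensional random walk
class RandomWalkEnv():
    """The state is an integer position on a line; the episode ends after n_time_steps."""
    def __init__(self, n_time_steps=10, seed=0):
        self.n_time_steps = n_time_steps
        self.actions = np.array([0, 1])     # 0: step left, 1: step right
        self.set_seed(seed)
        self.reset()
    def step(self, action):
        self.state += 1 if action == 1 else -1
        reward = -abs(self.state)           # reward for staying close to the origin
        self.current_step += 1
        done = self.current_step >= self.n_time_steps
        return self.state, reward, done
    def set_seed(self, seed=0):
        np.random.seed(seed)
    def reset(self):
        self.current_step = 0
        self.state = 0
        return self.state

# generic agent-environment interaction loop
env = RandomWalkEnv(n_time_steps=5, seed=0)
state = env.reset()
done = False
while not done:
    action = np.random.choice(env.actions)  # equiprobable (random) policy
    state, reward, done = env.step(action)
    print("s={}, r={}, done={}".format(state, reward, done))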
Consider the GridWorld problem Example 3.5 from Sutton & Barto's "Reinforcement Learning: an Introduction", (MIT Press, 2018):
A $5\times 5$ grid with open boundary conditions has two pairs of special states, $(A,A')$ and $(B,B')$, such that from state $A$ ($B$) the environment always goes into state $A'$ ($B'$). The state transitions receive the rewards $r(s,s')$: the transition $A\to A'$ gives reward $+10$, the transition $B\to B'$ gives $+5$, actions that would take the agent off the grid leave the state unchanged and give reward $-1$, and all other transitions give reward $0$.
From each state $s$, the RL agent can take four possible actions $a$: $north$, $south$, $east$, and $west$.
The action space is the discrete four-element set $\mathcal{A}=\{north, south, east, west\}$.
The state space is the two-dimensional grid $\mathcal{S}=\mathbb{Z}_5^2$: each state $s=(m,n)$ is labeled by two integers $m,n\in\{0,1,2,3,4\}$. The special states have the coordinates $A=(1,4)$, $A'=(1,0)$, $B=(3,4)$, and $B'=(3,2)$.
Finally, the reward space is given by the discrete set $\mathcal{R}=\{-1,0,5,10\}$.
class GridWorldEnv():
"""
Gym style environment for GridWorld
Parameters:
n_time_steps: int
Total number of time steps within each episode
seed: int
seed of the RNG (for reproducibility)
"""
def __init__(self, n_time_steps=10, seed=0):
"""
Initialize the environment.
"""
self.n_time_steps = n_time_steps
### define action space variables
self.actions=np.array([0,1,2,3])
#['north', 'south', 'east', 'west'] in coordinate form
self.action_space = [np.array([0,1]), np.array([0,-1]), np.array([1,0]), np.array([-1,0])]
### define state space variables
self.state_A = np.array([1,4])
self.state_Ap = np.array([1,0])
self.state_B = np.array([3,4])
self.state_Bp = np.array([3,2])
# set seed
self.set_seed(seed)
self.reset()
def step(self, action):
"""
Interface between environment and agent. Performs one step in the environment.
Parameters:
action: int
the index of the respective action in the action array
Returns:
output: ( np.array, float, bool)
information provided by the environment about its current state:
(state, reward, done)
"""
# check if action tries to take state across the grid boundary
bdry_bool= (self.state[0]==0 and action==3) or (self.state[0]==4 and action==2) \
or (self.state[1]==0 and action==1) or (self.state[1]==4 and action==0)
# environment dynamics (deterministic)
if np.linalg.norm(self.state - self.state_A) < 1E-14:
self.state=self.state_Ap.copy()
reward=10
elif np.linalg.norm(self.state - self.state_B) < 1E-14:
self.state=self.state_Bp.copy()
reward=5
elif bdry_bool:
reward=-1
else:
self.state+=self.action_space[action]
reward=0
done=False # infinite-horizon task
self.current_step += 1
return self.state, reward, done
def set_seed(self,seed=0):
"""
Sets the seed of the RNG.
"""
np.random.seed(seed)
def reset(self):
"""
Resets the environment to its initial values.
Returns:
state: np.array
the initial state of the environment
"""
self.current_step = 0
self.state = np.array([2,2]) #initialize to some state on the grid
return self.state
def sample(self):
"""
Returns a randomly sampled action.
"""
return np.random.choice(self.actions) # equiprobable policy
Let us now test the GridWorld environment. We do so by fixing the number of time steps, n_time_steps, and the seed. We then create the environment and reset it. Finally, we loop over the n_time_steps time steps: at every step we sample a random action, take an environment step, and print the resulting transition $(s,a,r,s')$.
n_time_steps=20
seed=0
env=GridWorldEnv(n_time_steps=n_time_steps,seed=seed)
env.reset()
for _ in range(n_time_steps):
# pick a random action
action=env.sample() # equiprobable policy
# take an environment step
state=env.state.copy()
state_p, reward, done = env.step(action)
print("{}. s={}, a={}, r={}, s'={}".format(_, state, env.action_space[action], reward, state_p))
This is a finite-horizon, i.e. episodic, GridWorld environment. We consider the $4\times 4$ grid from Example 4.1 in Sutton & Barto.
state space: $\mathcal{S} = \{0,1,2,\dots,15\}$, where the two corner cells $s=0$ and $s=15$ are the terminal states.
action space: $\mathcal{A} = \{north,south,east,west\}$. Actions trying to take the agent off the grid leave the state unchanged: to implement this behavior, we will define smaller action spaces $\mathcal{A}(s_\mathrm{boundary})$ for all states $s_\mathrm{boundary}$ at the boundary of the grid (we inspect these restricted action sets after the test loop below).
reward space: $\mathcal{R}=\{-1\}$; $r(s,s',a)=-1$ for all states $s,s'\in\mathcal{S}$ and all allowed actions $a\in\mathcal{A}(s)$.
class Episodic_GridWorldEnv():
"""
Gym style environment for episodic GridWorld
Parameters:
n_time_steps: int
Total number of time steps within each episode
seed: int
seed of the RNG (for reproducibility)
"""
def __init__(self, n_time_steps=10, seed=0):
"""
Initialize the environment.
"""
self.n_time_steps = n_time_steps
### define action space variables
#['north', 'south', 'east', 'west']
self.action_space = [np.array([0,1]), np.array([0,-1]), np.array([1,0]), np.array([-1,0])]
# define the allowed actions from every state s, taking into account the boundary
self.actions={}
for m in range(4):
for n in range(4):
if m==0:
if n==0:
self.actions[m,n]=np.array([0,2])
elif n==3:
self.actions[m,n]=np.array([1,2])
else:
self.actions[m,n]=np.array([0,1,2])
elif m==3:
if n==0:
self.actions[m,n]=np.array([0,3])
elif n==3:
self.actions[m,n]=np.array([1,3])
else:
self.actions[m,n]=np.array([0,1,3])
elif 0<m<3:
if n==0:
self.actions[m,n]=np.array([0,2,3])
elif n==3:
self.actions[m,n]=np.array([1,2,3])
else:
self.actions[m,n]=np.array([0,1,2,3])
### define state space variables
# the two terminal states
self.state_T1 = np.array([0,0])
self.state_T2 = np.array([3,3])
# set seed
self.set_seed(seed)
self.reset()
def step(self, action):
"""
Interface between environment and agent. Performs one step in the environment.
Parameters:
action: int
the index of the respective action in the action array
Returns:
output: ( np.array, float, bool)
information provided by the environment about its current state:
(state, reward, done)
"""
# check if action tries to take state across the grid boundary
bdry_bool= (self.state[0]==0 and action==3) or (self.state[0]==3 and action==2) \
or (self.state[1]==0 and action==1) or (self.state[1]==3 and action==0)
# environment dynamics (deterministic)
reward=-1 # all transitions have reward -1
# if state is not at the boundary, update the state
if not bdry_bool:
self.state+=self.action_space[action]
done=False
if np.linalg.norm(self.state - self.state_T1) < 1E-14 or np.linalg.norm(self.state - self.state_T2) < 1E-14:
done=True
self.current_step += 1
return self.state, reward, done
def set_seed(self,seed=0):
"""
Sets the seed of the RNG.
"""
np.random.seed(seed)
def reset(self, random=False):
"""
Resets the environment to its initial values.
Parameters:
random: bool
controls whether the initial state is a random non-terminal state on the grid or the fixed initial state (2,2)
Returns:
state: np.array
the initial state of the environment
"""
self.current_step = 0
if random:
self.state = np.random.randint(4,size=(2))
while np.linalg.norm(self.state - self.state_T1) < 1E-14 or np.linalg.norm(self.state - self.state_T2) < 1E-14:
self.state = np.random.randint(4,size=(2))
else:
self.state = np.array([2,2]) #initialize to some state on the grid
return self.state
Let us test the environment to make sure it is implemented properly. Note that we are fixing the seed, so if you want to see a different output, you should change the value of seed.
env=Episodic_GridWorldEnv()
seed=4
env.set_seed(seed)
env.reset()
done=False
j=0
while not done:
state=env.state.copy()
#print(env.actions[state[0],state[1]])
# pick a random action
action=np.random.choice(env.actions[state[0],state[1]]) # equiprobable policy from state s
# take an environment step
state_p, reward, done = env.step(action)
print("{0:2d}. s={1}, a={2:}, r={3:2d}, s'={4}".format(j, state, env.action_space[action], reward, state_p))
j+=1
if done:
print('\nreached terminal state!')
break
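We can also inspect the restricted action sets $\mathcal{A}(s)$ constructed in __init__ for a few representative corner and bulk cells (a quick, optional check, not part of the original notebook code):
# print the allowed actions for a few representative grid cells
env = Episodic_GridWorldEnv()
for s in [(0,0), (0,3), (3,0), (3,3), (0,1), (2,2)]:
    print("allowed actions from s={}: {}".format(s, env.actions[s]))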
This is a finite-horizon, i.e. episodic, Windy GridWorld environment. We consider the $10\times 7$ grid from Example 6.5 in Sutton & Barto, where an upward crosswind running through the middle of the grid shifts the agent north by a column-dependent number of cells (wind strengths $(0,0,0,1,1,1,2,2,1,0)$ for columns $0,\dots,9$).
state space: $\mathcal{S} = \{(m,n)|m=0,\dots,9, n=0,\dots,6\}$, where the start state is $S=(0,3)$ and the terminal state is $G=(7,3)$.
action space: $\mathcal{A} = \{north,south, east,west\}$; actions trying to take the agent off the grid leave the state unchanged.
reward space: $\mathcal{R}=\{-1\}$; $r(s,s',a)=-1$ for all states $s,s'\in\mathcal{S}$ and allowed actions $a\in\mathcal{A}(s)$.
class WindyGridWorldEnv():
"""
Gym style environment for Windy GridWorld
Parameters:
n_time_steps: int
Total number of time steps within each episode
seed: int
seed of the RNG (for reproducibility)
"""
def __init__(self, n_time_steps=10, seed=0):
"""
Initialize the environment.
"""
self.n_time_steps = n_time_steps
### define action space variables
#['north', 'south', 'east', 'west']
self.action_space = [np.array([0,1]), np.array([0,-1]), np.array([1,0]), np.array([-1,0])]
# wind shift
self.wind = np.array([0,0,0,1,1,1,2,2,1,0])
### define state space variables
# the initial and terminal states
self.state_S = np.array([0,3]) # initial state
self.state_G = np.array([7,3]) # terminal state
# set seed
self.set_seed(seed)
self.reset()
def step(self, action):
"""
Interface between environment and agent. Performs one step in the environment.
Parameters:
action: int
the index of the respective action in the action array
Returns:
output: ( np.array, float, bool)
information provided by the environment about its current state:
(state, reward, done)
"""
# check if action tries to take state across the grid boundary
bdry_bool= (self.state[0]==0 and action==3) or (self.state[0]==9 and action==2) \
or (self.state[1]==0 and action==1) or (self.state[1]==6 and action==0)
# environment dynamics (deterministic)
reward=-1 # all transitions have reward -1
if not bdry_bool:
# check if wind pushes state outside the boundary
if self.state[1]+self.wind[self.state[0]]+self.action_space[action][1]<=6:
self.state[1]+=self.wind[self.state[0]]
self.state+=self.action_space[action]
# check if state is terminal
done=False
if np.linalg.norm(self.state - self.state_G) < 1E-14:
done=True
self.current_step += 1
return self.state, reward, done
def set_seed(self,seed=0):
"""
Sets the seed of the RNG.
"""
np.random.seed(seed)
def reset(self, random=False):
"""
Resets the environment to its initial values.
Parameters:
random: bool
unused here: the environment always starts from the fixed initial state S
Returns:
state: np.array
the initial state of the environment
"""
self.current_step = 0
self.state = self.state_S.copy() #initialize to S
return self.state
Let us now test the Windy GridWorld environment.
env=WindyGridWorldEnv()
env.reset()
done=False
j=0
while not done:
# pick a random action
action=np.random.choice([0,1,2,3]) # equiprobable policy
# take an environment step
state=env.state.copy()
state_p, reward, done = env.step(action)
print("{}. s={}, a={}, r={}, s'={}".format(j, state, env.action_space[action], reward, state_p))
j+=1
if done:
print('\nreached terminal state!')
break
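To see the wind at work explicitly, we can place the agent in a windy column by hand and step east: in addition to the action, the wind in that column pushes the state one cell up. The snippet below is a quick, optional check using the environment defined above.
# the wind in column 3 has strength 1: stepping east from (3,3) lands on (4,4)
env = WindyGridWorldEnv()
env.reset()
env.state = np.array([3,3])           # place the agent in a windy column by hand
state_p, reward, done = env.step(2)   # action 2: east
print(state_p)                        # expected: [4 4]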
We now define an environment for a quantum bit of information (qubit).
The state of a qubit $|\psi\rangle\in\mathbb{C}^2$ is modeled by a two-dimensional complex-valued vector with unit norm: $\langle\psi|\psi\rangle:=|\psi_1|^2+|\psi_2|^2=1$. Every qubit state is uniquely described by two angles $\theta\in[0,\pi]$ and $\varphi\in[0,2\pi)$:
\begin{eqnarray} |\psi\rangle= \begin{pmatrix} \psi_1 \\ \psi_2 \end{pmatrix}= \mathrm{e}^{i\alpha} \begin{pmatrix} \cos\frac{\theta}{2} \\ \mathrm{e}^{i\varphi}\sin\frac{\theta}{2} \end{pmatrix} \end{eqnarray}The overall phase $\alpha$ of a single quantum state has no physical meaning. Thus, any qubit state can be pictured as an arrow on the unit sphere (called the Bloch sphere) with coordinates $(\theta,\varphi)$.
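As a quick numerical illustration of this parametrization (a minimal sketch, independent of the environment class defined below), we can build $|\psi\rangle$ from a pair of Bloch sphere angles and check that it has unit norm:
# construct a qubit state from Bloch sphere angles and verify its normalization
theta, phi = 0.3*np.pi, 1.2*np.pi     # arbitrary angles
psi = np.array([np.cos(0.5*theta), np.exp(1j*phi)*np.sin(0.5*theta)])
print(np.abs(psi[0])**2 + np.abs(psi[1])**2)   # prints 1.0 (up to floating-point rounding)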
To operate on qubits, we use quantum gates. Quantum gates are represented as unitary transformations $U\in \mathrm{U(2)}$, where $\mathrm{U(2)}$ is the unitary group. Gates act on qubit states by matrix multiplication to transform an input state $|\psi\rangle$ to the output state $|\psi'\rangle$: $|\psi'\rangle=U|\psi\rangle$. For this problem, we consider four gates
\begin{equation} U_0=\boldsymbol{1},\qquad U_x=\mathrm{exp}(-i\delta t \sigma^x/2),\qquad U_y=\mathrm{exp}(-i\delta t \sigma^y/2),\qquad U_z=\mathrm{exp}(-i\delta t \sigma^z/2), \end{equation}where $\delta t$ is a fixed time step, $\mathrm{exp}(\cdot)$ is the matrix exponential, $\boldsymbol{1}$ is the identity, and the Pauli matrices are defined as
\begin{equation} \boldsymbol{1}=\begin{pmatrix} 1 & 0 \\ 0 & 1 \end{pmatrix} ,\qquad \sigma^x=\begin{pmatrix} 0 & 1 \\ 1 & 0 \end{pmatrix} ,\qquad \sigma^y=\begin{pmatrix} 0 & -i \\ i & 0 \end{pmatrix} ,\ \qquad \sigma^z=\begin{pmatrix} 1 & 0 \\ 0 & -1 \end{pmatrix} \end{equation}To determine if a qubit, described by the state $|\psi\rangle$, is in a desired target state $|\psi_\mathrm{target}\rangle$, we compute the fidelity
\begin{eqnarray} F=|\langle\psi_\mathrm{target}|\psi\rangle|^2 = |(\psi_\mathrm{target})^\ast_1 \psi_1 + (\psi_\mathrm{target})^\ast_2 \psi_2|^2,\qquad F\in[0,1] \end{eqnarray}where $\ast$ stands for complex conjugation. Physically, the fidelity is set by the angle $\Theta$ between the two arrows representing the states on the Bloch sphere, $F=\cos^2(\Theta/2)$: maximizing the fidelity amounts to minimizing this angle.
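The short check below (a sketch, not part of the environment itself) verifies two facts used here: the matrix exponential of a Pauli matrix has the closed form $\mathrm{exp}(-i\alpha\sigma)=\cos(\alpha)\boldsymbol{1}-i\sin(\alpha)\sigma$ (since $\sigma^2=\boldsymbol{1}$), and two states whose Bloch vectors enclose an angle $\Theta$ have fidelity $F=\cos^2(\Theta/2)$.
# 1) closed form of the Pauli matrix exponential
sigma_x = np.array([[0.0, 1.0], [1.0, 0.0]])
alpha = 0.3
U = expm(-1j*alpha*sigma_x)
U_closed = np.cos(alpha)*np.identity(2) - 1j*np.sin(alpha)*sigma_x
print(np.allclose(U, U_closed))           # True

# 2) fidelity vs. Bloch sphere angle: F = cos^2(Theta/2)
def bloch_state(theta, phi):
    return np.array([np.cos(0.5*theta), np.exp(1j*phi)*np.sin(0.5*theta)])

psi_target = bloch_state(0.0, 0.0)        # north pole: |psi_target> = (1,0)
psi = bloch_state(0.5*np.pi, 0.0)         # state on the equator, Theta = pi/2
F = np.abs(psi_target.conj().dot(psi))**2
print(F, np.cos(0.25*np.pi)**2)           # both equal 0.5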
Now, let us define an RL environment, which contains the laws of physics that govern the dynamics of the qubit (i.e. the application of the gate operations to the qubit state). Our RL agent will later interact with this environment to learn how to control the qubit to bring it from an initial state to a prescribed target state.
We define the RL states $s=(\theta,\varphi)$ as an array containing the Bloch sphere angles of the quantum state. At each step within an episode, the agent can choose to apply one of four actions, corresponding to the gates $(\boldsymbol{1},U_x,U_y,U_z)$. We use the instantaneous fidelity w.r.t. the target state as a reward: $r_t=F=|\langle\psi_\mathrm{target}|\psi(t)\rangle|^2$:
state space: $\mathcal{S} = \{(\theta,\varphi)|\theta\in[0,\pi],\varphi\in[0,2\pi)\}$. The terminal states form a region of the Bloch sphere around the target state $|\psi_\mathrm{target}\rangle=(1,0)^t$ (i.e. the qubit state we want to prepare): the target state has the Bloch sphere coordinates $s_\mathrm{terminal}=(0,0)$, so this region is a polar cap around the north pole, whose size is set by the small number cap_size=1E-2.
action space: $\mathcal{A} = \{\boldsymbol{1},U_x,U_y,U_z\}$. Actions act on RL states as follows: the chosen gate $U_a$ is applied to the underlying quantum state, $|\psi(s')\rangle=U_a|\psi(s)\rangle$, and the new RL state $s'$ is given by the Bloch sphere angles of $|\psi(s')\rangle$.
reward space: $\mathcal{R}=[0,1]$. We use the fidelity between the next state $s'$ and the terminal state $s_\mathrm{terminal}$ as a reward at every episode step:
$$r(s,s',a)= F = |\langle\psi_\mathrm{target}|U_a|\psi(s)\rangle|^2=|\langle\psi_\mathrm{target}|\psi(s')\rangle|^2$$for all states $s,s'\in\mathcal{S}$ and actions $a\in\mathcal{A}$.
class QubitEnv():
"""
Gym style environment for single-qubit control. You may also inherit the class structure from OpenAI Gym.
Parameters:
n_time_steps: int
Total number of time steps within each episode
seed: int
seed of the RNG (for reproducibility)
"""
def __init__(self, n_time_steps, seed):
"""
Initialize the environment.
"""
self.n_time_steps = n_time_steps
### define action space variables
delta_t = 2*np.pi/n_time_steps # set a value for the time step
# define Pauli matrices
Id =np.array([[1.0,0.0 ], [0.0 ,+1.0]])
sigma_x=np.array([[0.0,1.0 ], [1.0 , 0.0]])
sigma_y=np.array([[0.0,-1.0j], [1.0j, 0.0]])
sigma_z=np.array([[1.0,0.0 ], [0.0 ,-1.0]])
self.action_space=[]
for generator in [Id, sigma_x, sigma_y, sigma_z]:
self.action_space.append( expm(-0.5j*delta_t*generator) ) # U = exp(-i*delta_t*sigma/2), matching the gate definitions above (for the identity generator this is the 1 gate up to a global phase)
### define state space variables
self.S_terminal = np.array([0.0,0.0])
self.psi_terminal = self.RL_to_qubit_state(self.S_terminal)
self.cap_size = 1E-2
# set seed
self.set_seed(seed)
self.reset()
def step(self, action):
"""
Interface between environment and agent. Performs one step in the environment.
Parameters:
action: int
the index of the respective action in the action array
Returns:
output: ( object, float, bool)
information provided by the environment about its current state:
(state, reward, done)
"""
# apply gate to quantum state
self.psi = self.action_space[action].dot(self.psi)
# compute RL state
self.state = self.qubit_to_RL_state(self.psi)
# compute reward
reward = np.abs( self.psi_terminal.conj().dot(self.psi) )**2
# check if state is terminal
done=False
if np.abs(reward - 1.0) < self.cap_size:
done=True
return self.state, reward, done
def set_seed(self,seed=0):
"""
Sets the seed of the RNG.
"""
np.random.seed(seed)
def reset(self, random=True):
"""
Resets the environment to its initial values.
Parameters:
random: bool
controls whether the initial state is a random state on the Bloch sphere or the fixed initial state at the south pole
Returns:
state: np.array
the initial state of the environment (Bloch sphere angles)
"""
if random:
theta = np.pi*np.random.uniform(0.0,1.0)
phi = 2*np.pi*np.random.uniform(0.0,1.0)
else:
# start from south pole of Bloch sphere
theta=np.pi
phi=0.0
self.state=np.array([theta,phi])
self.psi=self.RL_to_qubit_state(self.state)
return self.state
def render(self):
"""
Plots the state of the environment. For visualization purposes only.
"""
pass
def RL_to_qubit_state(self,s):
"""
Take as input the RL state s, and return the quantum state |psi>
"""
theta, phi = s
psi = np.array([np.cos(0.5*theta), np.exp(1j*phi)*np.sin(0.5*theta)] )
return psi
def qubit_to_RL_state(self,psi):
"""
Take as input the quantum state |psi>, and return the RL state s=(theta, phi)
"""
# take away unphysical global phase
alpha = np.angle(psi[0])
psi_new = np.exp(-1j*alpha) * psi
# find Bloch sphere angles
theta = 2.0*np.arccos(psi_new[0]).real
phi = np.angle(psi_new[1])
return np.array([theta, phi])
np.set_printoptions(suppress=True,precision=2)
n_time_steps = 100
seed=6
env=QubitEnv(n_time_steps,seed)
env.reset(random=True)
done=False
j=0
while not done:
# pick a random action
action=np.random.choice([0,1,2,3]) # equiprobable policy
# take an environment step
state=env.state.copy()
state_p, reward, done = env.step(action)
print("{}. s={}, a={}, r={}, s'={}\n".format(j, state, action, np.round(reward,6), state_p))
j+=1
if done:
print('\nreached terminal state!')
break
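Finally, a quick optional consistency check of the two conversion helpers: mapping an RL state to a qubit state and back should return the same angles, with $\varphi$ folded into the interval $(-\pi,\pi]$.
# round trip s -> |psi> -> s
env = QubitEnv(n_time_steps=100, seed=0)
s = np.array([0.3*np.pi, 1.2*np.pi])
s_roundtrip = env.qubit_to_RL_state(env.RL_to_qubit_state(s))
print(s, s_roundtrip)   # theta agrees; phi is returned modulo 2*pi, in (-pi, pi]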
Next, we shall look at some OpenAI Gym environments: Atari video games, the Cart Pole problem, and the Mountain Car problem.
import gym
import matplotlib
import matplotlib.pyplot as plt
from IPython import display
%matplotlib inline
#env = gym.make('BreakoutDeterministic-v4')
#env = gym.make('SpaceInvaders-v0')
#env = gym.make('CartPole-v1')
env = gym.make('MountainCar-v0')
env.reset()
img = plt.imshow(env.render(mode='rgb_array')) # only call this once
n_time_steps=100
for _ in range(n_time_steps):
# plot frame
img.set_data(env.render(mode='rgb_array')) # just update the data
display.display(plt.gcf())
display.clear_output(wait=True)
# choose action
action = env.action_space.sample()
# take action
frame, reward, is_done, _ = env.step(action)
print(frame.shape, reward, is_done, _)
print(env.__dir__() )
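A more targeted way to inspect an environment is via the standard Gym attributes (a brief illustration for the MountainCar environment created above):
# inspect the spaces of the environment created above
print(env.action_space)        # Discrete(3): push left, no push, push right
print(env.observation_space)   # 2-dimensional Box: (position, velocity)
env.close()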