TensorFlow Environments
Introduction
In reinforcement learning (RL), an environment is the world in which an agent operates. It defines the rules of interaction, the states the agent can observe, the actions it can take, and the rewards it receives. TensorFlow provides several tools and libraries to create, manage, and interact with these environments for reinforcement learning tasks.
In this tutorial, we'll explore how to work with environments in TensorFlow-based reinforcement learning. We'll cover the basics of environment interfaces, how to create custom environments, and how to use pre-built environments from popular libraries like OpenAI Gym and TF-Agents.
Understanding RL Environments in TensorFlow
Environments in reinforcement learning follow a standard pattern of interaction:
- The agent observes the current state of the environment
- The agent selects an action based on the observed state
- The environment transitions to a new state based on the action
- The environment provides a reward signal to the agent
- The process repeats until a terminal state is reached
In TensorFlow's reinforcement learning ecosystem, environments are typically implemented as Python classes that follow specific interfaces to ensure compatibility with RL algorithms.
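To make this interaction pattern concrete, here is a minimal sketch that runs one episode with random actions, using the Gym CartPole environment that the rest of this tutorial introduces properly (the random-action "agent" is just a stand-in for a real policy):
import numpy as np
from tf_agents.environments import suite_gym

# One episode of the observe-act-reward loop with a random "agent".
env = suite_gym.load('CartPole-v1')
time_step = env.reset()                 # agent observes the initial state
total_reward = 0.0
while not time_step.is_last():          # repeat until a terminal state
    action = np.random.randint(0, 2)    # pick a random valid action (0 or 1)
    time_step = env.step(action)        # environment transitions and rewards
    total_reward += time_step.reward
print("Episode reward:", total_reward)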
TF-Agents Environment Interface
TF-Agents is TensorFlow's official library for reinforcement learning. It provides a standardized environment interface; conceptually, every environment exposes methods like the following:
class Environment(object):

    def reset(self):
        """Resets the environment and returns the initial time step."""
        pass

    def step(self, action):
        """Applies an action and returns the next time step (step_type, reward, discount, observation)."""
        pass

    def observation_spec(self):
        """Returns the observation spec."""
        pass

    def action_spec(self):
        """Returns the action spec."""
        pass
The key methods are:
- reset(): Initializes or resets the environment to its starting state and returns the first time step
- step(action): Applies an action and returns the next time step containing the new observation, reward, discount, and step type
- observation_spec(): Describes the shape and type of observations
- action_spec(): Describes the shape and type of valid actions
Using OpenAI Gym Environments with TensorFlow
One of the most popular collections of reinforcement learning environments is OpenAI Gym. TF-Agents provides wrappers to use Gym environments seamlessly.
First, let's install the necessary libraries:
pip install tensorflow tf-agents gym
Here's how to use a Gym environment with TF-Agents:
import gym
import numpy as np
import tensorflow as tf
from tf_agents.environments import suite_gym
from tf_agents.environments.tf_py_environment import TFPyEnvironment
# Create a Gym environment
gym_env = suite_gym.load('CartPole-v1')
# Convert to TF environment
tf_env = TFPyEnvironment(gym_env)
# Print environment specs
print("Observation Spec:")
print(tf_env.observation_spec())
print("\nAction Spec:")
print(tf_env.action_spec())
# Reset the environment and get initial time step
time_step = tf_env.reset()
print("\nInitial time step:")
print(time_step)
# Take an action (1 = push the cart to the right)
action = tf.constant([1], dtype=tf.int32)
next_time_step = tf_env.step(action)
print("\nNext time step after action:")
print(next_time_step)
Output (may vary):
Observation Spec:
BoundedTensorSpec(shape=(4,), dtype=tf.float32, name='observation', minimum=[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], maximum=[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38])
Action Spec:
BoundedTensorSpec(shape=(), dtype=tf.int32, name='action', minimum=0, maximum=1)
Initial time step:
TimeStep(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([0], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, observation=<tf.Tensor: shape=(1, 4), dtype=float32, numpy=array([[ 0.03073904, -0.00145001, -0.03449621, 0.0095754 ]], dtype=float32)>)
Next time step after action:
TimeStep(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, observation=<tf.Tensor: shape=(1, 4), dtype=float32, numpy=array([[ 0.03070999, 0.19338598, -0.0343473, -0.28843832]], dtype=float32)>)
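Instead of hard-coding an action, you can also sample a random valid action directly from the action spec; here is a short sketch reusing the tf_env created above:
# Sample random valid actions from the action spec and step the environment.
action_spec = tf_env.action_spec()
for _ in range(3):
    random_action = tf.random.uniform(
        (1,), minval=action_spec.minimum, maxval=action_spec.maximum + 1,
        dtype=action_spec.dtype)
    time_step = tf_env.step(random_action)
    print("Reward:", time_step.reward.numpy()[0])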
Creating a Custom Environment
To create your own environment, you can implement the TF-Agents environment interface. Here's a simple example of a custom grid-world environment:
import numpy as np
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
class SimpleGridWorldEnv(py_environment.PyEnvironment):
    """A simple 4x4 grid world environment with a target location."""

    def __init__(self):
        # 0: up, 1: right, 2: down, 3: left
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=3, name='action')
        # Environment is a 4x4 grid represented as (x, y) coordinates
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(2,), dtype=np.int32, minimum=0, maximum=3, name='observation')
        self._state = [0, 0]   # Starting position at top-left
        self._target = [3, 3]  # Target at bottom-right
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._state = [0, 0]
        self._episode_ended = False
        return ts.restart(np.array(self._state, dtype=np.int32))

    def _step(self, action):
        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and
            # start a new episode.
            return self.reset()

        # Make sure the action is valid
        if action < 0 or action > 3:
            raise ValueError(f'`action` should be between 0 and 3, got {action}')

        # Apply the action to update the state
        if action == 0:    # up
            self._state[1] = max(0, self._state[1] - 1)
        elif action == 1:  # right
            self._state[0] = min(3, self._state[0] + 1)
        elif action == 2:  # down
            self._state[1] = min(3, self._state[1] + 1)
        elif action == 3:  # left
            self._state[0] = max(0, self._state[0] - 1)

        # Check if we reached the target
        if self._state[0] == self._target[0] and self._state[1] == self._target[1]:
            self._episode_ended = True
            return ts.termination(np.array(self._state, dtype=np.int32), reward=10)
        else:
            return ts.transition(
                np.array(self._state, dtype=np.int32), reward=-1, discount=0.9)
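Before plugging a custom environment into an agent, it is worth checking that it actually conforms to its own specs. TF-Agents provides a small validation utility for this; a quick sketch using the class defined above:
from tf_agents.environments import utils

# Runs a few episodes with random actions and raises an error if observations,
# rewards, or step types do not match the declared specs.
utils.validate_py_environment(SimpleGridWorldEnv(), episodes=5)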
Now let's test our custom environment:
import tensorflow as tf
from tf_agents.environments.tf_py_environment import TFPyEnvironment
# Create an instance of our environment
env = SimpleGridWorldEnv()
# Convert to a TensorFlow environment
tf_env = TFPyEnvironment(env)
# Reset the environment
time_step = tf_env.reset()
print("Initial state:", time_step.observation.numpy())
# Take a sequence of actions to reach the target
actions = [1, 1, 1, 2, 2, 2] # right, right, right, down, down, down
rewards = []
for action in actions:
    time_step = tf_env.step(tf.constant([action]))
    print(f"Action: {action}, New state: {time_step.observation.numpy()[0]}, Reward: {time_step.reward.numpy()[0]}")
    rewards.append(time_step.reward.numpy()[0])
print(f"Total reward: {sum(rewards)}")
Output:
Initial state: [[0 0]]
Action: 1, New state: [1 0], Reward: -1.0
Action: 1, New state: [2 0], Reward: -1.0
Action: 1, New state: [3 0], Reward: -1.0
Action: 2, New state: [3 1], Reward: -1.0
Action: 2, New state: [3 2], Reward: -1.0
Action: 2, New state: [3 3], Reward: 10.0
Total reward: 5.0
Vectorized Environments
For more efficient training, TF-Agents supports running multiple environment instances in parallel. This is especially useful when using algorithms like PPO or A3C:
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.environments import parallel_py_environment
# Create 4 environments in parallel
env_constructors = [lambda: suite_gym.load('CartPole-v1') for _ in range(4)]
parallel_env = parallel_py_environment.ParallelPyEnvironment(env_constructors)
tf_parallel_env = tf_py_environment.TFPyEnvironment(parallel_env)
# Check the batch size
print("Batch size:", tf_parallel_env.batch_size)
print("Observation shape:", tf_parallel_env.observation_spec().shape)
Output:
Batch size: 4
Observation shape: (4,)
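Stepping a batched environment works just like before, except that you supply one action per sub-environment. A short sketch reusing the tf_parallel_env created above (and the tf import from earlier):
import tensorflow as tf

# Reset all four environments at once; observations are batched along axis 0.
time_step = tf_parallel_env.reset()
print("Batched observation shape:", time_step.observation.shape)  # (4, 4)

# Step every sub-environment with its own action (CartPole actions are 0 or 1).
actions = tf.constant([0, 1, 0, 1], dtype=tf.int32)
next_time_step = tf_parallel_env.step(actions)
print("Rewards:", next_time_step.reward.numpy())  # one reward per environment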
Environment Wrappers
TF-Agents provides environment wrappers that can modify the behavior of existing environments:
from tf_agents.environments import suite_gym
from tf_agents.environments.wrappers import TimeLimit

# Load a Gym environment with a time limit of 100 steps
env = suite_gym.load('MountainCar-v0', max_episode_steps=100)

# Wrap it again with a tighter limit of 50 steps per episode
limited_env = TimeLimit(env, 50)

# Run one episode with a fixed action to see the new limit take effect
# (the unwrapped env, limited to 100 steps, would run twice as long here)
time_step = limited_env.reset()
steps = 0
while not time_step.is_last():
    time_step = limited_env.step(0)  # MountainCar action 0: push left
    steps += 1
print("Episode ended after", steps, "steps")
Output:
Episode ended after 50 steps
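TimeLimit is only one of several built-in wrappers. As another example, TF-Agents includes an ActionRepeat wrapper that repeats each chosen action for a fixed number of environment steps (the repeat count of 4 below is an arbitrary illustrative choice):
from tf_agents.environments import suite_gym
from tf_agents.environments import wrappers

# Repeat every action for 4 consecutive steps, which coarsens the control
# frequency and can speed up learning in some tasks.
env = suite_gym.load('MountainCar-v0')
repeated_env = wrappers.ActionRepeat(env, 4)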
Real-World Application: Training a DQN Agent
Let's put everything together and train a simple Deep Q-Network (DQN) agent on the CartPole environment:
from tf_agents.agents.dqn import dqn_agent
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.networks import q_network
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.utils import common
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
# Set up the environment
env_name = 'CartPole-v1'
train_py_env = suite_gym.load(env_name)
eval_py_env = suite_gym.load(env_name)
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)
# Set up the Q-network
fc_layer_params = (100,)
q_net = q_network.QNetwork(
    train_env.observation_spec(),
    train_env.action_spec(),
    fc_layer_params=fc_layer_params)
# Set up the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
# Set up the DQN agent
train_step_counter = tf.Variable(0)
agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)
agent.initialize()
# Define a function to collect data
def collect_step(environment, policy, buffer):
    time_step = environment.current_time_step()
    action_step = policy.action(time_step)
    next_time_step = environment.step(action_step.action)
    traj = trajectory.from_transition(time_step, action_step, next_time_step)
    buffer.add_batch(traj)
# Define a simple evaluation function
def compute_avg_return(environment, policy, num_episodes=10):
    total_return = 0.0
    for _ in range(num_episodes):
        time_step = environment.reset()
        episode_return = 0.0
        while not time_step.is_last():
            action_step = policy.action(time_step)
            time_step = environment.step(action_step.action)
            episode_return += time_step.reward
        total_return += episode_return
    avg_return = total_return / num_episodes
    return avg_return.numpy()[0]
# Set up the replay buffer
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=10000)
# Collect initial data with a random policy
random_policy = random_tf_policy.RandomTFPolicy(
    train_env.time_step_spec(), train_env.action_spec())

for _ in range(1000):
    collect_step(train_env, random_policy, replay_buffer)
# Set up the training loop
num_iterations = 1000
eval_interval = 100
batch_size = 64
# (Optional) Initialize the dataset once for better performance
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3, sample_batch_size=batch_size,
    num_steps=2).prefetch(3)
iterator = iter(dataset)
# Training loop
returns = []
train_losses = []
for iteration in range(num_iterations):
    # Collect a few steps with the agent's exploration policy and add them to
    # the replay buffer (DqnAgent's collect_policy is epsilon-greedy)
    for _ in range(4):
        collect_step(train_env, agent.collect_policy, replay_buffer)

    # Sample a batch of data from the buffer and update the agent
    experience, unused_info = next(iterator)
    train_loss = agent.train(experience).loss
    train_losses.append(train_loss.numpy())

    step = agent.train_step_counter.numpy()
    if step % eval_interval == 0:
        avg_return = compute_avg_return(eval_env, agent.policy)
        print(f'step = {step}: Average Return = {avg_return}')
        returns.append(avg_return)
# Plot results
plt.figure(figsize=(12, 4))
plt.subplot(121)
plt.plot(range(0, num_iterations, eval_interval), returns)
plt.title('Average Return')
plt.xlabel('Iterations')
plt.ylabel('Average Return')
plt.subplot(122)
plt.plot(range(num_iterations), train_losses)
plt.title('Training Loss')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.tight_layout()
plt.show()
This example demonstrates the complete workflow of using TensorFlow environments for reinforcement learning:
- Setting up the environment
- Creating a DQN agent with a Q-network
- Collecting experiences in a replay buffer
- Training the agent by sampling from the buffer
- Evaluating the agent's performance (a short policy-saving sketch follows this list)
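Once training looks good, you will usually want to keep the learned policy around. Here is a small sketch of one common approach using TF-Agents' PolicySaver with the agent from the example above (the directory name is arbitrary):
from tf_agents.policies import policy_saver

# Export the greedy policy as a SavedModel so it can be reused without
# rebuilding the agent.
saver = policy_saver.PolicySaver(agent.policy)
saver.save('saved_policy')

# Later, load it back; the loaded object can be used like agent.policy.
loaded_policy = tf.saved_model.load('saved_policy')
The loaded policy exposes the same action(time_step) interface, so it can drive evaluation or deployment without the original training code.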
Common Environment Types in TensorFlow
TF-Agents comes with a variety of environment suites that you can use. Note that most of them rely on extra backend packages installed separately (for example, an Atari emulator package for suite_atari, and dm_control plus MuJoCo for suite_dm_control):
from tf_agents.environments import suite_gym # OpenAI Gym
from tf_agents.environments import suite_atari # Atari games
from tf_agents.environments import suite_dm_control # DeepMind Control Suite
from tf_agents.environments import suite_mario # Super Mario Bros
# Examples of loading environments from different suites:
gym_env = suite_gym.load('CartPole-v1')
atari_env = suite_atari.load('PongNoFrameskip-v4')
dm_env = suite_dm_control.load('cartpole', 'balance')
Summary
In this tutorial, we've covered the fundamentals of working with environments in TensorFlow for reinforcement learning:
- Environment Interfaces: Understanding the core methods like reset() and step(), along with the observation and action specifications
- Using Pre-built Environments: Working with OpenAI Gym and other environment suites
- Creating Custom Environments: Building your own environments by implementing the environment interface
- Vectorized Environments: Running multiple environments in parallel for more efficient training
- Environment Wrappers: Modifying environment behavior with wrappers
- Practical Application: Training a DQN agent on a standard environment
By mastering these concepts, you'll be well-equipped to develop and train reinforcement learning agents using TensorFlow on a variety of problems.
Additional Resources
- TF-Agents Documentation
- OpenAI Gym Documentation
- DeepMind Control Suite Paper
- TensorFlow RL Tutorial
Exercises
- Modify the SimpleGridWorldEnv to include obstacles that the agent must avoid.
- Create a custom wrapper that normalizes observations to have a mean of 0 and a standard deviation of 1.
- Implement a different reward function for the CartPole environment that encourages the pole to stay perfectly vertical.
- Train a reinforcement learning agent on a different Gym environment like LunarLander-v2.
- Extend the grid world example to a larger grid size and visualize the agent's path.