TensorFlow RL Basics
Reinforcement Learning (RL) represents one of the most exciting paradigms in machine learning, mimicking how humans learn through trial and error. In this tutorial, we'll explore how to implement basic reinforcement learning algorithms using TensorFlow, Google's powerful open-source machine learning library.
Introduction to Reinforcement Learning
Reinforcement Learning is a type of machine learning where an agent learns to make decisions by taking actions in an environment to maximize some notion of cumulative reward. Unlike supervised learning, the agent is not told which actions to take but must discover which actions yield the most reward through exploration.
Key Components of RL
- Agent: The learner or decision-maker
- Environment: The world the agent interacts with
- State: The current situation of the agent
- Action: What the agent can do
- Reward: Feedback from the environment
- Policy: Strategy the agent employs to decide actions
- Value Function: Prediction of future rewards
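These pieces interact in a simple loop: the agent observes a state, picks an action, and the environment returns a reward and the next state. The sketch below is schematic only; agent.select_action and agent.learn are placeholder names, not a real API:

# The core agent-environment interaction loop (schematic)
state = env.reset()
done = False
while not done:
    action = agent.select_action(state)                     # policy maps state -> action
    next_state, reward, done, info = env.step(action)       # environment gives feedback
    agent.learn(state, action, reward, next_state, done)    # agent improves from experience
    state = next_state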
Setting Up Your Environment
Before diving into code, let's set up our TensorFlow environment:
# Install required packages (this tutorial uses the classic Gym API, so keep gym below 0.26)
# !pip install tensorflow "gym>=0.25,<0.26" matplotlib
import tensorflow as tf
import numpy as np
import gym
import matplotlib.pyplot as plt
print(f"TensorFlow version: {tf.__version__}")
Expected output (your version may differ):
TensorFlow version: 2.11.0
Creating a Simple Environment
For simplicity, we'll use OpenAI Gym, which provides a standard API for reinforcement learning environments. Note that this tutorial follows the classic Gym API (versions before 0.26), where env.reset() returns only the observation and env.step() returns four values; newer gym and gymnasium releases return (observation, info) from reset() and five values from step(). Let's start with a simple environment called "CartPole-v1":
# Create the environment
env = gym.make('CartPole-v1')
# Let's examine the environment
print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")
# Reset the environment and get the initial state
state = env.reset()
print(f"Initial state: {state}")
Expected output:
Action space: Discrete(2)
Observation space: Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
Initial state: [ 0.03073904 -0.00145001 -0.03240252 -0.00474306]
The CartPole environment consists of a pole attached to a cart. The agent receives a reward of +1 for every timestep the pole stays upright, and the goal is to prevent the pole from falling by pushing the cart left or right.
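To get a feel for this before adding any learning, here is a short random-policy rollout using the env created above (a minimal sketch; episode lengths will vary):

# Run one episode with random actions to see the interaction loop in practice
state = env.reset()
done = False
steps = 0
while not done:
    action = env.action_space.sample()          # random push left or right
    state, reward, done, _ = env.step(action)   # classic Gym API: four return values
    steps += 1
print(f"Random policy kept the pole up for {steps} steps")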
Building a Simple Q-Learning Agent
Let's implement one of the simplest RL algorithms: Q-Learning. We'll discretize the continuous state space to make it more manageable.
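At each step the agent updates a table of action values with the standard Q-learning rule, where alpha is the learning_rate and gamma the discount_factor used in the code below:

Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))

This is exactly what the update_q_table method in the class below implements: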
class QLearningAgent:
def __init__(self, env, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.995):
self.env = env
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.exploration_rate = exploration_rate
self.exploration_decay = exploration_decay
# Discretize the state space for simplicity
self.bins = [
np.linspace(-2.4, 2.4, 10), # Cart position
np.linspace(-3.0, 3.0, 10), # Cart velocity
np.linspace(-0.5, 0.5, 10), # Pole angle
np.linspace(-2.0, 2.0, 10) # Pole angular velocity
]
# Initialize Q-table with zeros
self.q_table = np.zeros((10, 10, 10, 10, env.action_space.n))
    def discretize_state(self, state):
        """Convert a continuous state into a tuple of discrete bin indices"""
        discretized = []
        for i, s in enumerate(state):
            # Clip so values outside the bin edges still map to a valid index (0-9)
            discretized.append(int(np.clip(np.digitize(s, self.bins[i]) - 1, 0, 9)))
        return tuple(discretized)
def select_action(self, state):
"""Select action using epsilon-greedy strategy"""
if np.random.random() < self.exploration_rate:
# Explore: choose random action
return self.env.action_space.sample()
else:
# Exploit: choose best action based on Q-values
return np.argmax(self.q_table[state])
def update_q_table(self, state, action, reward, next_state, done):
"""Update Q-values using the Q-learning update rule"""
current_q = self.q_table[state][action]
if done:
max_next_q = 0
else:
max_next_q = np.max(self.q_table[next_state])
new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)
self.q_table[state][action] = new_q
def decay_exploration(self):
"""Decay exploration rate to gradually shift from exploration to exploitation"""
self.exploration_rate *= self.exploration_decay
self.exploration_rate = max(0.01, self.exploration_rate) # Don't let it go below 1%
Now, let's train our agent:
def train_agent(agent, episodes=1000):
"""Train the Q-learning agent"""
rewards = []
for episode in range(episodes):
state = agent.env.reset()
discretized_state = agent.discretize_state(state)
episode_reward = 0
done = False
while not done:
action = agent.select_action(discretized_state)
next_state, reward, done, _ = agent.env.step(action)
discretized_next_state = agent.discretize_state(next_state)
agent.update_q_table(discretized_state, action, reward, discretized_next_state, done)
discretized_state = discretized_next_state
episode_reward += reward
agent.decay_exploration()
rewards.append(episode_reward)
if episode % 50 == 0:
print(f"Episode {episode}, Reward: {episode_reward}, Exploration rate: {agent.exploration_rate:.4f}")
return rewards
# Create and train agent
env = gym.make('CartPole-v1')
agent = QLearningAgent(env)
rewards = train_agent(agent, episodes=500)
# Plot learning progress
plt.figure(figsize=(10, 6))
plt.plot(rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Learning Progress')
plt.savefig('q_learning_progress.png')
plt.show()
Expected output (will vary due to randomness):
Episode 0, Reward: 14.0, Exploration rate: 0.9950
Episode 50, Reward: 27.0, Exploration rate: 0.7778
Episode 100, Reward: 42.0, Exploration rate: 0.6063
Episode 150, Reward: 65.0, Exploration rate: 0.4725
...
Episode 450, Reward: 195.0, Exploration rate: 0.1053
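As a quick sanity check (an optional snippet, not part of the original training loop), you can run the trained agent greedily by switching off exploration:

# Evaluate the trained agent with exploration disabled
agent.exploration_rate = 0.0
eval_rewards = []
for _ in range(10):
    state = agent.env.reset()
    done = False
    total = 0
    while not done:
        action = agent.select_action(agent.discretize_state(state))
        state, reward, done, _ = agent.env.step(action)
        total += reward
    eval_rewards.append(total)
print(f"Average greedy reward over 10 episodes: {np.mean(eval_rewards):.1f}")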
Building a Policy Gradient Agent with TensorFlow
Now let's implement a more advanced RL algorithm using TensorFlow: a Policy Gradient agent. This method optimizes the policy directly instead of estimating value functions.
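Concretely, REINFORCE-style policy gradients increase the log-probability of actions in proportion to the return that followed them. The gradient estimate is roughly

grad_theta J(theta) ~ sum_t G_t * grad_theta log pi_theta(a_t | s_t)

where G_t is the discounted (and, in our code, normalized) return from step t. The train method below minimizes the corresponding weighted negative log-likelihood: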
class PolicyGradientAgent:
def __init__(self, env, learning_rate=0.01):
self.env = env
self.state_size = env.observation_space.shape[0]
self.action_size = env.action_space.n
# Build the policy network
self.model = self._build_model(learning_rate)
# Store episode data
self.states = []
self.actions = []
self.rewards = []
def _build_model(self, learning_rate):
"""Build the policy network using TensorFlow"""
model = tf.keras.Sequential([
tf.keras.layers.Dense(24, input_shape=(self.state_size,), activation='relu'),
tf.keras.layers.Dense(24, activation='relu'),
tf.keras.layers.Dense(self.action_size, activation='softmax')
])
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        # The compiled loss is never used directly; compiling simply attaches the optimizer,
        # which the custom policy-gradient training step below applies manually.
        model.compile(optimizer=optimizer, loss='categorical_crossentropy')
        return model
def select_action(self, state):
"""Select action based on the policy network"""
state = np.reshape(state, [1, self.state_size])
action_probs = self.model.predict(state, verbose=0)[0]
return np.random.choice(self.action_size, p=action_probs)
def store_transition(self, state, action, reward):
"""Store state, action, reward for this episode"""
self.states.append(state)
self.actions.append(action)
self.rewards.append(reward)
def train(self):
"""Train the policy network using the policy gradient method"""
episode_length = len(self.rewards)
# Calculate discounted rewards
discounted_rewards = np.zeros_like(self.rewards, dtype=np.float32)
running_reward = 0
for t in reversed(range(episode_length)):
running_reward = running_reward * 0.95 + self.rewards[t]
discounted_rewards[t] = running_reward
# Normalize rewards
discounted_rewards = (discounted_rewards - np.mean(discounted_rewards)) / (np.std(discounted_rewards) + 1e-7)
# Prepare training data
states = np.vstack(self.states)
actions_one_hot = tf.keras.utils.to_categorical(self.actions, self.action_size)
# Custom training step
        with tf.GradientTape() as tape:
            # Forward pass: softmax action probabilities from the policy network
            probs = self.model(states)
            # Loss: negative log-likelihood of the taken actions, weighted by the returns
            neg_log_prob = tf.reduce_sum(
                -tf.math.log(tf.clip_by_value(probs, 1e-10, 1.0)) * actions_one_hot,
                axis=1
            )
            loss = tf.reduce_mean(neg_log_prob * discounted_rewards)
# Get gradients and apply them
grads = tape.gradient(loss, self.model.trainable_variables)
self.model.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
# Clear episode data
self.states = []
self.actions = []
self.rewards = []
return loss.numpy()
Let's train our Policy Gradient agent:
def train_policy_gradient(agent, episodes=1000, max_steps=500):
"""Train the policy gradient agent"""
rewards = []
for episode in range(episodes):
state = agent.env.reset()
episode_reward = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, _ = agent.env.step(action)
agent.store_transition(state, action, reward)
state = next_state
episode_reward += reward
if done:
break
loss = agent.train()
rewards.append(episode_reward)
if episode % 10 == 0:
print(f"Episode: {episode}, Reward: {episode_reward}, Loss: {loss:.6f}")
return rewards
# Create and train Policy Gradient agent
env = gym.make('CartPole-v1')
pg_agent = PolicyGradientAgent(env)
pg_rewards = train_policy_gradient(pg_agent, episodes=200)
# Plot learning progress
plt.figure(figsize=(10, 6))
plt.plot(pg_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Policy Gradient Learning Progress')
plt.savefig('pg_learning_progress.png')
plt.show()
Expected output (will vary; because the returns are mean-normalized, this loss can be positive or negative and is not a reliable progress signal, so focus on the reward trend):
Episode: 0, Reward: 24.0, Loss: 0.693147
Episode: 10, Reward: 29.0, Loss: 0.523041
Episode: 20, Reward: 42.0, Loss: 0.348721
...
Episode: 190, Reward: 187.0, Loss: 0.024516
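To check the learned policy deterministically (a small optional sketch, not part of the original tutorial), you can act greedily on the network's output probabilities instead of sampling:

# Greedy rollout with the trained policy network
state = env.reset()
done = False
total_reward = 0
while not done:
    probs = pg_agent.model.predict(np.reshape(state, [1, pg_agent.state_size]), verbose=0)[0]
    action = int(np.argmax(probs))   # most probable action instead of a random sample
    state, reward, done, _ = env.step(action)
    total_reward += reward
print(f"Greedy rollout reward: {total_reward}")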
Visualizing Agent Performance
Let's visualize how our trained agent performs:
def visualize_agent(agent, env_name, episodes=5, max_steps=500):
"""Visualize agent performance"""
env = gym.make(env_name, render_mode='human')
for episode in range(episodes):
state = env.reset()
total_reward = 0
for step in range(max_steps):
# For the Q-learning agent
if hasattr(agent, 'discretize_state'):
discretized_state = agent.discretize_state(state)
action = agent.select_action(discretized_state)
else: # For the Policy Gradient agent
action = agent.select_action(state)
state, reward, done, _ = env.step(action)
total_reward += reward
if done:
break
print(f"Episode {episode + 1}: Total reward: {total_reward}")
env.close()
# To visualize, uncomment these lines when running in an environment that supports rendering:
# print("Q-Learning Agent Performance:")
# visualize_agent(agent, 'CartPole-v1')
#
# print("Policy Gradient Agent Performance:")
# visualize_agent(pg_agent, 'CartPole-v1')
Real-World Applications
Reinforcement Learning with TensorFlow has numerous real-world applications:
- Robotics: Training robots to perform complex tasks through trial and error
- Game AI: Creating intelligent game characters that adapt to player behavior
- Autonomous Vehicles: Teaching cars to navigate and make driving decisions
- Resource Management: Optimizing resource allocation in data centers
- Financial Trading: Developing automated trading strategies
Example: Training a Trading Agent
Here's a simplified example of how you might structure an RL trading agent:
class SimpleTradingEnvironment:
def __init__(self, price_data):
self.price_data = price_data
self.current_step = 0
self.action_space = gym.spaces.Discrete(3) # Buy, Hold, Sell
self.observation_space = gym.spaces.Box(
low=-np.inf, high=np.inf, shape=(5,), dtype=np.float32
)
        self.portfolio_value = 10000  # Cash balance; shares are tracked separately
        self.shares_held = 0
def reset(self):
self.current_step = 0
self.portfolio_value = 10000
self.shares_held = 0
return self._next_observation()
def _next_observation(self):
# Create observation with price data and portfolio info
obs = np.array([
self.price_data[self.current_step], # Current price
self.price_data[self.current_step-1] if self.current_step > 0 else 0, # Previous price
self.price_data[self.current_step-2] if self.current_step > 1 else 0, # Price 2 steps ago
self.portfolio_value,
self.shares_held
])
return obs
def step(self, action):
# Execute action (0=Buy, 1=Hold, 2=Sell)
current_price = self.price_data[self.current_step]
if action == 0: # Buy
shares_to_buy = int(self.portfolio_value * 0.2 / current_price)
self.shares_held += shares_to_buy
self.portfolio_value -= shares_to_buy * current_price
elif action == 2: # Sell
self.portfolio_value += self.shares_held * current_price
self.shares_held = 0
# Move to next time step
self.current_step += 1
        # Calculate reward: total value (cash + shares) relative to the starting capital
        new_portfolio_value = self.portfolio_value + self.shares_held * self.price_data[self.current_step]
        reward = new_portfolio_value - 10000
# Check if done
done = self.current_step >= len(self.price_data) - 1
# Get next observation
obs = self._next_observation()
return obs, reward, done, {}
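To try the environment out, here is a minimal smoke test using a synthetic price series and a random policy (both the sine-wave prices and the random actions are illustrative assumptions, not part of the original example):

# Quick smoke test: synthetic prices, random Buy/Hold/Sell actions
prices = 100 + 10 * np.sin(np.linspace(0, 10, 200))   # illustrative price series
trading_env = SimpleTradingEnvironment(prices)
obs = trading_env.reset()
done = False
while not done:
    action = trading_env.action_space.sample()         # random action
    obs, reward, done, _ = trading_env.step(action)
print(f"Final profit relative to starting capital: {reward:.2f}")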
Summary
In this tutorial, we've covered the basics of Reinforcement Learning using TensorFlow:
- We introduced the core concepts of Reinforcement Learning
- We implemented a simple Q-Learning agent for discrete state spaces
- We built a more advanced Policy Gradient agent using TensorFlow
- We visualized our agents' performance
- We discussed real-world applications of RL
This is just the beginning of what you can do with TensorFlow and Reinforcement Learning. As you progress, you can explore more advanced algorithms like Deep Q-Networks (DQN), Proximal Policy Optimization (PPO), or Soft Actor-Critic (SAC).
Exercises for Practice
- CartPole Mastery: Modify the Q-learning agent to consistently achieve the maximum score (500) in the CartPole environment.
- Different Environment: Apply the Policy Gradient agent to a different environment like MountainCar-v0 or Acrobot-v1.
- Hyperparameter Tuning: Experiment with different learning rates, network architectures, and discount factors.
- Custom Environment: Create your own custom environment and train an agent on it.
- Implement DQN: Extend your learning by implementing a Deep Q-Network algorithm (a possible starting point is sketched after this list).
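For that last exercise, the sketch below shows one possible shape of a DQN agent: a Q-network with a linear output, an experience-replay buffer, and a periodically synced target network. It is a hedged starting point under the same classic-Gym assumptions as the rest of this tutorial, not a tuned implementation; names like q_network, remember, and replay are our own choices:

import random
from collections import deque

class DQNAgent:
    """Minimal DQN sketch: epsilon-greedy Q-network + replay buffer + target network."""
    def __init__(self, state_size, action_size, gamma=0.95, lr=1e-3):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.epsilon = 1.0
        self.memory = deque(maxlen=10000)            # replay buffer of transitions
        self.q_network = self._build_network(lr)
        self.target_network = self._build_network(lr)
        self.target_network.set_weights(self.q_network.get_weights())

    def _build_network(self, lr):
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(24, input_shape=(self.state_size,), activation='relu'),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(self.action_size)  # linear output: one Q-value per action
        ])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr), loss='mse')
        return model

    def act(self, state):
        """Epsilon-greedy action selection (state is a 1-D numpy array)."""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_size)
        q_values = self.q_network.predict(state[np.newaxis], verbose=0)[0]
        return int(np.argmax(q_values))

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size=32):
        """Train on a random minibatch of stored transitions."""
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        states = np.array([t[0] for t in batch])
        next_states = np.array([t[3] for t in batch])
        targets = self.q_network.predict(states, verbose=0)
        next_q = self.target_network.predict(next_states, verbose=0)  # frozen target net
        for i, (_, action, reward, _, done) in enumerate(batch):
            targets[i][action] = reward if done else reward + self.gamma * np.max(next_q[i])
        self.q_network.fit(states, targets, epochs=1, verbose=0)
        self.epsilon = max(0.01, self.epsilon * 0.995)

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target_network.set_weights(self.q_network.get_weights())

In a training loop you would call remember after every step, replay every step or every few steps, and update_target every few episodes.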
Happy Reinforcement Learning!