Training using REINFORCE for Mujoco

[Figure: agent-environment interaction diagram]
This tutorial serves 2 purposes:
  1. To understand how to implement REINFORCE [1] from scratch to solve Mujoco's InvertedPendulum-v4

  2. To implement a deep reinforcement learning algorithm with Gymnasium's v0.26+ step() function

We will be using REINFORCE, one of the earliest policy-gradient methods. Unlike going first through learning a value function and then deriving a policy from it, REINFORCE optimizes the policy directly. In other words, it is trained to maximize the probability of Monte-Carlo returns. More on that later.

The Inverted Pendulum is Mujoco's version of cartpole, but now powered by the Mujoco physics simulator - which allows more complex experiments (such as varying the effects of gravity). This environment involves a cart that can be moved linearly, with a pole fixed to one end of it while the other end of the pole is free. The cart can be pushed left or right, and the goal is to balance the pole on top of the cart by applying forces to the cart. More information on the environment can be found at https://gymnasium.dev.org.tw/environments/mujoco/inverted_pendulum/

Training objective: to balance the pole (inverted pendulum) on top of the cart

Actions: the agent takes a 1D vector for actions. The action space is continuous, with (action) in [-3, 3], where action represents the numerical force applied to the cart (the magnitude gives the amount of force and the sign gives the direction); see the short check after these notes.

Approach: we use PyTorch to code REINFORCE from scratch to train a neural-network policy to master the Inverted Pendulum.
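
As a quick sanity check of the observation and action spaces described above, you can inspect them directly. This is a minimal sketch using the standard Gymnasium API; the values shown in the comments are what the MuJoCo environment defines at the time of writing:

import gymnasium as gym

env = gym.make("InvertedPendulum-v4")
print(env.observation_space)  # e.g. Box(-inf, inf, (4,), float64)
print(env.action_space)       # e.g. Box(-3.0, 3.0, (1,), float32)
env.close()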

Explanation of the Gymnasium v0.26+ Env.step() function

env.step(A) allows us to take action 'A' in the current environment 'env'. The environment then executes the action and returns five variables:

  • next_obs: the observation that the agent will receive after taking the action.

  • reward: the reward that the agent will receive after taking the action.

  • terminated: a boolean variable indicating whether or not the environment has terminated.

  • truncated: a boolean variable that also indicates whether the episode ended by early truncation, i.e., a time limit was reached.

  • info: a dictionary that might contain additional information about the environment.
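
As a minimal sketch of this API (assuming Gymnasium and its MuJoCo dependencies are installed), a single interaction looks like this:

import gymnasium as gym

env = gym.make("InvertedPendulum-v4")
obs, info = env.reset(seed=0)

action = env.action_space.sample()  # a random force in [-3, 3]
next_obs, reward, terminated, truncated, info = env.step(action)

env.close()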

from __future__ import annotations

import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal

import gymnasium as gym


plt.rcParams["figure.figsize"] = (10, 5)

Policy Network

[Figure: policy network]

We start by building a policy that the agent will learn using REINFORCE. A policy is a mapping from the current environment observation to a probability distribution over the actions to be taken. The policy used in this tutorial is parameterized by a neural network. It consists of 2 linear layers that are shared between the predicted mean and standard deviation. Further, an individual linear layer is used to estimate the mean and the standard deviation, respectively. nn.Tanh is used as the non-linearity between the hidden layers. The following function estimates the mean and standard deviation of a normal distribution from which an action is sampled. Hence, the policy is expected to learn appropriate weights to output the mean and standard deviation based on the current observation.

class Policy_Network(nn.Module):
    """Parametrized Policy Network."""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initializes a neural network that estimates the mean and standard deviation
         of a normal distribution from which an action is sampled from.

        Args:
            obs_space_dims: Dimension of the observation space
            action_space_dims: Dimension of the action space
        """
        super().__init__()

        hidden_space1 = 16  # Nothing special with 16, feel free to change
        hidden_space2 = 32  # Nothing special with 32, feel free to change

        # Shared Network
        self.shared_net = nn.Sequential(
            nn.Linear(obs_space_dims, hidden_space1),
            nn.Tanh(),
            nn.Linear(hidden_space1, hidden_space2),
            nn.Tanh(),
        )

        # Policy Mean specific Linear Layer
        self.policy_mean_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )

        # Policy Std Dev specific Linear Layer
        self.policy_stddev_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Conditioned on the observation, returns the mean and standard deviation
         of a normal distribution from which an action is sampled from.

        Args:
            x: Observation from the environment

        Returns:
            action_means: predicted mean of the normal distribution
            action_stddevs: predicted standard deviation of the normal distribution
        """
        shared_features = self.shared_net(x.float())

        action_means = self.policy_mean_net(shared_features)
        # Softplus (log(1 + exp(x))) keeps the predicted standard deviation positive
        action_stddevs = torch.log(
            1 + torch.exp(self.policy_stddev_net(shared_features))
        )

        return action_means, action_stddevs
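
As an optional smoke test (not part of the training code), you can instantiate the network with InvertedPendulum-v4's dimensions (4 observations, 1 action) and verify the output shapes and the positivity of the standard deviation:

# Hypothetical quick check of the policy network defined above
policy = Policy_Network(obs_space_dims=4, action_space_dims=1)
dummy_obs = torch.randn(1, 4)          # a fake observation with batch size 1
means, stddevs = policy(dummy_obs)
print(means.shape, stddevs.shape)      # both torch.Size([1, 1])
print(bool((stddevs > 0).all()))       # True: softplus keeps stddevs positive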

Building an agent

[Figure: the REINFORCE algorithm]

Now that we are done building the policy, let us develop REINFORCE, which gives life to the policy network. The algorithm of REINFORCE can be found in the figure above. As mentioned earlier, REINFORCE aims to maximize the Monte-Carlo returns.
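
For reference, the standard policy-gradient estimate used by REINFORCE [1], which the update() method below approximates from a single sampled episode, can be written as

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^{T-1} G_t \,\nabla_\theta \log \pi_\theta(a_t \mid s_t)\right],
\qquad
G_t = \sum_{k=t}^{T-1} \gamma^{\,k-t}\, r_k,

where r_k is the reward received after the k-th action and \gamma is the discount factor. In code we minimize the negative of the sampled sum, so that gradient descent maximizes the expected return.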

Fun fact: REINFORCE is an acronym for "'RE'ward 'I'ncrement 'N'on-negative 'F'actor times 'O'ffset 'R'einforcement times 'C'haracteristic 'E'ligibility".

Note: The hyperparameters were chosen to train a decently performing agent; no extensive hyperparameter tuning was done.

class REINFORCE:
    """REINFORCE algorithm."""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initializes an agent that learns a policy via REINFORCE algorithm [1]
        to solve the task at hand (Inverted Pendulum v4).

        Args:
            obs_space_dims: Dimension of the observation space
            action_space_dims: Dimension of the action space
        """

        # Hyperparameters
        self.learning_rate = 1e-4  # Learning rate for policy optimization
        self.gamma = 0.99  # Discount factor
        self.eps = 1e-6  # small number for mathematical stability

        self.probs = []  # Stores probability values of the sampled action
        self.rewards = []  # Stores the corresponding rewards

        self.net = Policy_Network(obs_space_dims, action_space_dims)
        self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)

    def sample_action(self, state: np.ndarray) -> float:
        """Returns an action, conditioned on the policy and observation.

        Args:
            state: Observation from the environment

        Returns:
            action: Action to be performed
        """
        state = torch.tensor(np.array([state]))
        action_means, action_stddevs = self.net(state)

        # create a normal distribution from the predicted
        #   mean and standard deviation and sample an action
        distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
        action = distrib.sample()
        prob = distrib.log_prob(action)

        action = action.numpy()

        self.probs.append(prob)

        return action

    def update(self):
        """Updates the policy network's weights."""
        running_g = 0
        gs = []

        # Discounted return (backwards) - [::-1] will return an array in reverse
        for R in self.rewards[::-1]:
            running_g = R + self.gamma * running_g
            gs.insert(0, running_g)

        deltas = torch.tensor(gs)

        log_probs = torch.stack(self.probs).squeeze(-1)

        # REINFORCE loss: weight each action's log-probability by the
        # discounted return that followed it, then take the negative sum
        # so that gradient descent maximizes the expected return
        loss = -torch.sum(log_probs * deltas)

        # Update the policy network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Empty / zero out all episode-centric/related variables
        self.probs = []
        self.rewards = []
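
To make the backwards discounted-return computation in update() concrete, here is a tiny worked example with toy rewards and gamma = 0.99 (illustration only, not part of the training code):

# Toy illustration of the discounted-return loop used in update()
rewards = [1.0, 1.0, 1.0]
gamma = 0.99
running_g, gs = 0.0, []
for R in rewards[::-1]:
    running_g = R + gamma * running_g
    gs.insert(0, running_g)
print(gs)  # [2.9701, 1.99, 1.0], i.e. G_0 = 1 + 0.99 * 1 + 0.99**2 * 1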

Now let us train the policy using REINFORCE to master the task of the Inverted Pendulum.

The following is an overview of the training procedure:

for seed in random seeds:
    reinitialize the agent

    for episode in range of max number of episodes:
        until the episode is done:
            sample an action based on the current observation

            take the action and receive the reward and the next observation

            store the action taken, its probability, and the observed reward

        update the policy

Note: Deep RL is fairly brittle with respect to the random seed in a lot of common use cases (https://spinningup.openai.com/en/latest/spinningup/spinningup.html). Hence it is important to test out various seeds, which we will be doing.

# Create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50)  # Records episode-reward

total_num_episodes = int(5e3)  # Total number of episodes
# Observation-space of InvertedPendulum-v4 (4)
obs_space_dims = env.observation_space.shape[0]
# Action-space of InvertedPendulum-v4 (1)
action_space_dims = env.action_space.shape[0]
rewards_over_seeds = []

for seed in [1, 2, 3, 5, 8]:  # Fibonacci seeds
    # set seed
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    # Reinitialize agent every seed
    agent = REINFORCE(obs_space_dims, action_space_dims)
    reward_over_episodes = []

    for episode in range(total_num_episodes):
        # gymnasium v26 requires users to set seed while resetting the environment
        obs, info = wrapped_env.reset(seed=seed)

        done = False
        while not done:
            action = agent.sample_action(obs)

            # Step return type - `tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]`
            # These represent the next observation, the reward from the step,
            # if the episode is terminated, if the episode is truncated and
            # additional info from the step
            obs, reward, terminated, truncated, info = wrapped_env.step(action)
            agent.rewards.append(reward)

            # End the episode when either truncated or terminated is true
            #  - truncated: The episode duration reaches max number of timesteps
            #  - terminated: Any of the state space values is no longer finite.
            done = terminated or truncated

        reward_over_episodes.append(wrapped_env.return_queue[-1])
        agent.update()

        if episode % 1000 == 0:
            avg_reward = int(np.mean(wrapped_env.return_queue))
            print("Episode:", episode, "Average Reward:", avg_reward)

    rewards_over_seeds.append(reward_over_episodes)

Plot learning curve

rewards_to_plot = [[reward[0] for reward in rewards] for rewards in rewards_over_seeds]
df1 = pd.DataFrame(rewards_to_plot).melt()
df1.rename(columns={"variable": "episodes", "value": "reward"}, inplace=True)
sns.set(style="darkgrid", context="talk", palette="rainbow")
sns.lineplot(x="episodes", y="reward", data=df1).set(
    title="REINFORCE for InvertedPendulum-v4"
)
plt.show()
[Figure: output of the plotting code above - REINFORCE learning curves for InvertedPendulum-v4]

Author: Siddarth Chandrasekar

License: MIT License

References

[1] Williams, Ronald J. "Simple statistical gradient-following algorithms for connectionist reinforcement learning." Machine Learning 8 (1992): 229-256.