AsyncVectorEnv¶
- class gymnasium.vector.AsyncVectorEnv(env_fns: Sequence[Callable[[], Env]], shared_memory: bool = True, copy: bool = True, context: str | None = None, daemon: bool = True, worker: Callable[[int, Callable[[], Env], Connection, Connection, bool, Queue], None] | None = None, observation_mode: str | Space = 'same', autoreset_mode: str | AutoresetMode = AutoresetMode.NEXT_STEP)[source]¶
向量化環境,並行運行多個環境。
它使用
multiprocessing
進程和管道進行通信。範例
>>> import gymnasium as gym >>> envs = gym.make_vec("Pendulum-v1", num_envs=2, vectorization_mode="async") >>> envs AsyncVectorEnv(Pendulum-v1, num_envs=2) >>> envs = gym.vector.AsyncVectorEnv([ ... lambda: gym.make("Pendulum-v1", g=9.81), ... lambda: gym.make("Pendulum-v1", g=1.62) ... ]) >>> envs AsyncVectorEnv(num_envs=2) >>> observations, infos = envs.reset(seed=42) >>> observations array([[-0.14995256, 0.9886932 , -0.12224312], [ 0.5760367 , 0.8174238 , -0.91244936]], dtype=float32) >>> infos {} >>> _ = envs.action_space.seed(123) >>> observations, rewards, terminations, truncations, infos = envs.step(envs.action_space.sample()) >>> observations array([[-0.1851753 , 0.98270553, 0.714599 ], [ 0.6193494 , 0.7851154 , -1.0808398 ]], dtype=float32) >>> rewards array([-2.96495728, -1.00214607]) >>> terminations array([False, False]) >>> truncations array([False, False]) >>> infos {}
- 參數:
env_fns – 創建環境的函數。
shared_memory – 如果為
True
,則來自工作進程的觀測結果將通過共享變量傳回。如果觀測結果很大(例如圖像),這可以提高效率。copy – 如果為
True
,則AsyncVectorEnv.reset()
和AsyncVectorEnv.step()
方法會返回觀測結果的副本。context – multiprocessing 的上下文。如果為
None
,則使用預設上下文。daemon – 如果為
True
,則子進程會啟用daemon
標誌;也就是說,如果主進程退出,它們將退出。但是,daemon=True
會阻止子進程產生子進程,因此對於某些環境,您可能希望將其設置為False
。worker – 如果設定,則在子進程中使用該工作進程而不是預設的工作進程。這對於覆蓋某些內部向量環境邏輯很有用,例如,如何處理終止或截斷時的重置。
observation_mode – 定義環境觀測空間應如何批次處理。‘same’ 定義應該有
n
個相同空間的副本。‘different’ 定義可以有多個具有不同參數的觀測空間,但需要相同的形狀和 dtype,警告,可能會引發意外錯誤。傳遞Tuple[Space, Space]
物件允許定義自訂的single_observation_space
和observation_space
,警告,可能會引發意外錯誤。autoreset_mode – 使用的自動重置模式,請參閱 https://farama.org/Vector-Autoreset-Mode 以獲取更多資訊。
警告
worker 是一個進階模式選項。它提供了高度的靈活性和很高的自找麻煩的機會;因此,如果您要編寫自己的 worker,建議從
_worker
(或_async_worker
) 方法的程式碼開始,並添加變更。- 引發:
RuntimeError – 如果某些子環境的觀測空間與 observation_space(或預設情況下,第一個子環境的觀測空間)不符。
ValueError – 如果 observation_space 是自訂空間(即不是 Gym 中的預設空間,例如 gymnasium.spaces.Box、gymnasium.spaces.Discrete 或 gymnasium.spaces.Dict)且 shared_memory 為 True。
- reset(*, seed: int | list[int] | None = None, options: dict[str, Any] | None = None) tuple[ObsType, dict[str, Any]] [source]¶
並行重置所有子環境,並返回串聯的觀測結果和資訊批次。
- 參數:
seed – 環境重置種子
options – 是否返回選項
- 返回:
來自向量化環境的一批觀測結果和資訊。
- step(actions: ActType) tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]] [source]¶
為每個並行環境採取一個動作。
- 參數:
actions –
action_space
動作批次的元素。- 返回:
Batch of (observations, rewards, terminations, truncations, infos)
- close(**kwargs: Any)¶
關閉所有並行環境並釋放資源。
它還關閉所有現有的圖像查看器,然後調用
close_extras()
並將closed
設置為True
。警告
此函數本身不關閉環境,應在
close_extras()
中處理。這對於同步和異步向量化環境都是通用的。注意
這將在垃圾回收或程式結束時自動調用。
- 參數:
**kwargs – 傳遞給
close_extras()
的關鍵字參數
- call(name: str, *args: Any, **kwargs: Any) tuple[Any, ...] [source]¶
使用 args 和 kwargs 從每個並行環境調用一個方法。
- 參數:
name (str) – 要調用的方法或屬性的名稱。
*args – 應用於方法調用的位置參數。
**kwargs – 應用於方法調用的關鍵字參數。
- 返回:
每個環境的方法或屬性的個別調用結果列表。
- get_attr(name: str) tuple[Any, ...] [source]¶
從每個並行環境獲取一個屬性。
- 參數:
name (str) – 要從每個個別環境獲取的屬性的名稱。
- 返回:
具有名稱的屬性
- set_attr(name: str, values: list[Any] | tuple[Any] | object)[source]¶
設置子環境的屬性。
- 參數:
name – 要在每個個別環境中設置的屬性的名稱。
values – 要設置的屬性的值。如果
values
是列表或元組,則它對應於每個個別環境的值,否則為所有環境設置單個值。
- 引發:
ValueError – 值必須是列表或元組,長度等於環境數量。
AlreadyPendingCallError – 在等待未決調用完成時調用
set_attr()
。
附加方法¶
- property AsyncVectorEnv.np_random: tuple[Generator, ...]¶
返回包裝環境的 numpy 隨機數生成器元組。
- property AsyncVectorEnv.np_random_seed: tuple[int, ...]¶
返回所有包裝環境的 np_random 種子元組。