Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Treiber

Ansicht auf TensorFlow.org In Google Colab ausführen Quelle auf GitHub anzeigen Notizbuch herunterladen

Einführung

Ein gängiges Muster beim verstärkten Lernen besteht darin, eine Richtlinie in einer Umgebung für eine bestimmte Anzahl von Schritten oder Episoden auszuführen. Dies geschieht beispielsweise während der Datenerfassung, Auswertung und Erstellung eines Videos des Agenten.

Während das Schreiben in Python relativ einfach ist, ist das Schreiben und Debuggen in TensorFlow viel komplexer, da es sich um tf.while Schleifen, tf.cond und tf.control_dependencies . Daher abstrahieren wir diesen Begriff einer Run-Schleife in eine Klasse namens driver und bieten gut getestete Implementierungen sowohl in Python als auch in TensorFlow.

Darüber hinaus werden die Daten, auf die der Treiber bei jedem Schritt stößt, in einem benannten Tupel namens Trajectory gespeichert und an eine Reihe von Beobachtern wie Wiedergabepuffer und Metriken gesendet. Diese Daten umfassen die Beobachtung aus der Umgebung, die von der Richtlinie empfohlenen Maßnahmen, die erhaltene Belohnung, die Art des aktuellen und den nächsten Schritt usw.

Installieren

Wenn Sie noch keine tf-Agenten oder ein Fitnessstudio installiert haben, führen Sie Folgendes aus:

pip install -q tf-agents
pip install -q gym
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf


from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.policies import random_py_policy
from tf_agents.policies import random_tf_policy
from tf_agents.metrics import py_metrics
from tf_agents.metrics import tf_metrics
from tf_agents.drivers import py_driver
from tf_agents.drivers import dynamic_episode_driver

tf.compat.v1.enable_v2_behavior()

Python-Treiber

Die PyDriver Klasse benötigt eine Python-Umgebung, eine Python-Richtlinie und eine Liste von Beobachtern, die bei jedem Schritt aktualisiert werden müssen. Die Hauptmethode ist run() , mit der die Umgebung mithilfe von Aktionen aus der Richtlinie max_steps , bis mindestens eines der folgenden Beendigungskriterien erfüllt ist: Die Anzahl der Schritte erreicht max_steps oder die Anzahl der Episoden erreicht max_episodes .

Die Implementierung ist ungefähr wie folgt:

class PyDriver(object):

  def __init__(self, env, policy, observers, max_steps=1, max_episodes=1):
    self._env = env
    self._policy = policy
    self._observers = observers or []
    self._max_steps = max_steps or np.inf
    self._max_episodes = max_episodes or np.inf

  def run(self, time_step, policy_state=()):
    num_steps = 0
    num_episodes = 0
    while num_steps < self._max_steps and num_episodes < self._max_episodes:

      # Compute an action using the policy for the given time_step
      action_step = self._policy.action(time_step, policy_state)

      # Apply the action to the environment and get the next step
      next_time_step = self._env.step(action_step.action)

      # Package information into a trajectory
      traj = trajectory.Trajectory(
         time_step.step_type,
         time_step.observation,
         action_step.action,
         action_step.info,
         next_time_step.step_type,
         next_time_step.reward,
         next_time_step.discount)

      for observer in self._observers:
        observer(traj)

      # Update statistics to check termination
      num_episodes += np.sum(traj.is_last())
      num_steps += np.sum(~traj.is_boundary())

      time_step = next_time_step
      policy_state = action_step.state

    return time_step, policy_state

Lassen Sie uns nun das Beispiel einer zufälligen Richtlinie in der CartPole-Umgebung durchgehen, die Ergebnisse in einem Wiedergabepuffer speichern und einige Metriken berechnen.

env = suite_gym.load('CartPole-v0')
policy = random_py_policy.RandomPyPolicy(time_step_spec=env.time_step_spec(), 
                                         action_spec=env.action_spec())
replay_buffer = []
metric = py_metrics.AverageReturnMetric()
observers = [replay_buffer.append, metric]
driver = py_driver.PyDriver(
    env, policy, observers, max_steps=20, max_episodes=1)

initial_time_step = env.reset()
final_time_step, _ = driver.run(initial_time_step)

print('Replay Buffer:')
for traj in replay_buffer:
  print(traj)

print('Average Return: ', metric.result())
Replay Buffer:
Trajectory(step_type=array(0, dtype=int32), observation=array([-0.02733938, -0.00415951,  0.00625636,  0.00037665], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.02742257, -0.19937062,  0.0062639 ,  0.29502696], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.03140998, -0.39458132,  0.01216444,  0.5896788 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.03930161, -0.58987147,  0.02395801,  0.8861686 ], dtype=float32), action=array(1), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.05109904, -0.3950828 ,  0.04168138,  0.6011125 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.05900069, -0.59076226,  0.05370363,  0.90662754], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.07081594, -0.78656864,  0.07183618,  1.2156949 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.08654731, -0.9825399 ,  0.09615008,  1.5299953 ], dtype=float32), action=array(1), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.10619811, -0.78869987,  0.12674999,  1.2688029 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.12197211, -0.98519194,  0.15212604,  1.5983413 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32))
Trajectory(step_type=array(1, dtype=int32), observation=array([-0.14167595, -1.1817541 ,  0.18409286,  1.9343323 ], dtype=float32), action=array(0), policy_info=(), next_step_type=array(2, dtype=int32), reward=array(1., dtype=float32), discount=array(0., dtype=float32))
Average Return:  11.0

TensorFlow-Treiber

Wir haben auch Treiber in TensorFlow, die Python-Treibern funktional ähnlich sind, jedoch TF-Umgebungen, TF-Richtlinien, TF-Beobachter usw. verwenden. Derzeit haben wir 2 TensorFlow-Treiber: DynamicStepDriver , der nach einer bestimmten Anzahl von (gültigen) Umgebungsschritten beendet wird, und DynamicEpisodeDriver . die nach einer bestimmten Anzahl von Folgen endet. Schauen wir uns ein Beispiel für die DynamicEpisode in Aktion an.

env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

tf_policy = random_tf_policy.RandomTFPolicy(action_spec=tf_env.action_spec(),
                                            time_step_spec=tf_env.time_step_spec())


num_episodes = tf_metrics.NumberOfEpisodes()
env_steps = tf_metrics.EnvironmentSteps()
observers = [num_episodes, env_steps]
driver = dynamic_episode_driver.DynamicEpisodeDriver(
    tf_env, tf_policy, observers, num_episodes=2)

# Initial driver.run will reset the environment and initialize the policy.
final_time_step, policy_state = driver.run()

print('final_time_step', final_time_step)
print('Number of Steps: ', env_steps.result().numpy())
print('Number of Episodes: ', num_episodes.result().numpy())
final_time_step TimeStep(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([0], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, observation=<tf.Tensor: shape=(1, 4), dtype=float32, numpy=
array([[ 0.01828483, -0.04844216,  0.0462279 ,  0.04172863]],
      dtype=float32)>)
Number of Steps:  48
Number of Episodes:  2

# Continue running from previous state
final_time_step, _ = driver.run(final_time_step, policy_state)

print('final_time_step', final_time_step)
print('Number of Steps: ', env_steps.result().numpy())
print('Number of Episodes: ', num_episodes.result().numpy())
final_time_step TimeStep(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([0], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, observation=<tf.Tensor: shape=(1, 4), dtype=float32, numpy=
array([[ 0.03592875,  0.01797493, -0.02492136,  0.04149204]],
      dtype=float32)>)
Number of Steps:  83
Number of Episodes:  4