Aide à protéger la Grande barrière de corail avec tensorflow sur Kaggle Rejoignez Défi

Tutoriel sur les bandits armés multiples dans TF-Agents

Commencer

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Installer

Si vous n'avez pas installé les dépendances suivantes, exécutez :

pip install tf-agents

Importations

import abc
import numpy as np
import tensorflow as tf

from tf_agents.agents import tf_agent
from tf_agents.drivers import driver
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.policies import tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.trajectories import trajectory
from tf_agents.trajectories import policy_step

nest = tf.nest

introduction

Le problème du bandit multi-bras (MAB) est un cas particulier de l'apprentissage par renforcement : un agent collecte des récompenses dans un environnement en effectuant certaines actions après avoir observé un état de l'environnement. La principale différence entre le RL général et le MAB est que dans le MAB, nous supposons que l'action entreprise par l'agent n'influence pas l'état suivant de l'environnement. Par conséquent, les agents ne modélisent pas les transitions d'état, ne créditent pas les récompenses des actions passées, ou ne « planifient pas à l'avance » pour atteindre des états riches en récompenses.

Comme dans d' autres domaines de RL, l'objectif d'un agent MAB consiste à trouver une politique qui recueille comme récompense autant que possible. Ce serait une erreur, cependant, de toujours essayer d'exploiter l'action qui promet la plus haute récompense, car il y a alors une chance que nous manquions de meilleures actions si nous n'explorons pas suffisamment. Ceci est le principal problème à résoudre dans (MAB) souvent appelé le dilemme exploration-exploitation.

Environnements Bandit, les politiques et les agents du MAB se trouvent dans les sous - répertoires de tf_agents / bandits .

Environnements

Dans TF-agents, la classe d'environnement joue le rôle de donner des informations sur l'état actuel (ce qu'on appelle l' observation ou de contexte), recevant en entrée une action, d' effectuer une transition d'état, et délivrer en sortie une récompense. Cette classe s'occupe également de réinitialiser lorsqu'un épisode se termine, afin qu'un nouvel épisode puisse commencer. Ceci est réalisé en appelant une reset fonction lorsqu'un état est étiqueté comme « dernier » de l'épisode.

Pour plus de détails, voir le tutoriel de TF-agents .

Comme mentionné ci-dessus, le MAB diffère du RL général en ce que les actions n'influencent pas l'observation suivante. Une autre différence est que dans Bandits, il n'y a pas d'« épisodes » : chaque pas de temps commence par une nouvelle observation, indépendamment des pas de temps précédents.

Pour faire des observations sûres se trouvent indépendant et de faire abstraction du concept d'épisodes RL, nous présentons les sous - classes de PyEnvironment et TFEnvironment : BanditPyEnvironment et BanditTFEnvironment . Ces classes exposent deux fonctions membres privées qui restent à implémenter par l'utilisateur :

@abc.abstractmethod
def _observe(self):

et

@abc.abstractmethod
def _apply_action(self, action):

La _observe fonction renvoie une observation. Ensuite, la politique choisit une action basée sur cette observation. Le _apply_action reçoit cette action en tant qu'entrée, et renvoie la récompense correspondante. Ces fonctions membres privées sont appelées par les fonctions reset à step reset et step , respectivement.

class BanditPyEnvironment(py_environment.PyEnvironment):

  def __init__(self, observation_spec, action_spec):
    self._observation_spec = observation_spec
    self._action_spec = action_spec
    super(BanditPyEnvironment, self).__init__()

  # Helper functions.
  def action_spec(self):
    return self._action_spec

  def observation_spec(self):
    return self._observation_spec

  def _empty_observation(self):
    return tf.nest.map_structure(lambda x: np.zeros(x.shape, x.dtype),
                                 self.observation_spec())

  # These two functions below should not be overridden by subclasses.
  def _reset(self):
    """Returns a time step containing an observation."""
    return ts.restart(self._observe(), batch_size=self.batch_size)

  def _step(self, action):
    """Returns a time step containing the reward for the action taken."""
    reward = self._apply_action(action)
    return ts.termination(self._observe(), reward)

  # These two functions below are to be implemented in subclasses.
  @abc.abstractmethod
  def _observe(self):
    """Returns an observation."""

  @abc.abstractmethod
  def _apply_action(self, action):
    """Applies `action` to the Environment and returns the corresponding reward.
    """

Les outils ci - dessus provisoires de classe abstraite PyEnvironment de _reset et _step fonctions et expose les fonctions abstraites _observe et _apply_action à mettre en œuvre par les sous - classes.

Un exemple simple de classe d'environnement

La classe suivante donne un environnement très simple pour lequel l'observation est un entier aléatoire compris entre -2 et 2, il y a 3 actions possibles (0, 1, 2), et la récompense est le produit de l'action et de l'observation.

class SimplePyEnvironment(BanditPyEnvironment):

  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-2, maximum=2, name='observation')
    super(SimplePyEnvironment, self).__init__(observation_spec, action_spec)

  def _observe(self):
    self._observation = np.random.randint(-2, 3, (1,), dtype='int32')
    return self._observation

  def _apply_action(self, action):
    return action * self._observation

Maintenant, nous pouvons utiliser cet environnement pour obtenir des observations et recevoir des récompenses pour nos actions.

environment = SimplePyEnvironment()
observation = environment.reset().observation
print("observation: %d" % observation)

action = 2

print("action: %d" % action)
reward = environment.step(action).reward
print("reward: %f" % reward)
observation: -1
action: 2
reward: -2.000000

Environnements TF

On peut définir un environnement de bandit par le sous - classement BanditTFEnvironment , ou, de façon similaire aux environnements RL, on peut définir un BanditPyEnvironment et l' envelopper avec TFPyEnvironment . Par souci de simplicité, nous allons avec cette dernière option dans ce tutoriel.

tf_environment = tf_py_environment.TFPyEnvironment(environment)

Stratégies

Une politique dans un problème de bandit fonctionne de la même manière que dans un problème de RL: il fournit une action (ou une distribution d'actions), compte tenu d' une observation en entrée.

Pour plus de détails, voir le tutoriel TF-agents politiques .

Comme avec les environnements, il y a deux façons de construire une politique: On peut créer un PyPolicy et l' envelopper avec TFPyPolicy ou créer directement un TFPolicy . Ici, nous choisissons d'aller avec la méthode directe.

Cet exemple étant assez simple, nous pouvons définir manuellement la politique optimale. L'action ne dépend que du signe de l'observation, 0 quand est négatif et 2 quand est positif.

class SignPolicy(tf_policy.TFPolicy):
  def __init__(self):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-2, maximum=2)
    time_step_spec = ts.time_step_spec(observation_spec)

    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)

    super(SignPolicy, self).__init__(time_step_spec=time_step_spec,
                                     action_spec=action_spec)
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return ()

  def _action(self, time_step, policy_state, seed):
    observation_sign = tf.cast(tf.sign(time_step.observation[0]), dtype=tf.int32)
    action = observation_sign + 1
    return policy_step.PolicyStep(action, policy_state)

Maintenant, nous pouvons demander une observation à l'environnement, appeler la politique pour choisir une action, puis l'environnement affichera la récompense :

sign_policy = SignPolicy()

current_time_step = tf_environment.reset()
print('Observation:')
print (current_time_step.observation)
action = sign_policy.action(current_time_step).action
print('Action:')
print (action)
reward = tf_environment.step(action).reward
print('Reward:')
print(reward)
Observation:
tf.Tensor([[2]], shape=(1, 1), dtype=int32)
Action:
tf.Tensor([2], shape=(1,), dtype=int32)
Reward:
tf.Tensor([[4.]], shape=(1, 1), dtype=float32)

La façon dont les environnements de bandits sont mis en œuvre garantit que chaque fois que nous faisons un pas, nous recevons non seulement la récompense pour l'action que nous avons entreprise, mais également la prochaine observation.

step = tf_environment.reset()
action = 1
next_step = tf_environment.step(action)
reward = next_step.reward
next_observation = next_step.observation
print("Reward: ")
print(reward)
print("Next observation:")
print(next_observation)
Reward: 
tf.Tensor([[-2.]], shape=(1, 1), dtype=float32)
Next observation:
tf.Tensor([[0]], shape=(1, 1), dtype=int32)

Agents

Maintenant que nous avons des environnements et des politiques de bandit, il est temps de définir également des agents de bandit, qui se chargent de modifier la politique en fonction des échantillons d'entraînement.

L'API pour les agents de bandit ne diffère pas de celle des agents RL: l'agent a juste besoin de mettre en œuvre la _initialize et _train méthodes et définir une policy et un collect_policy .

Un environnement plus compliqué

Avant d'écrire notre agent bandit, nous devons avoir un environnement un peu plus difficile à comprendre. Pour pimenter les choses un peu, l'environnement prochain soit toujours donner la reward = observation * action ou reward = -observation * action . Cela sera décidé lors de l'initialisation de l'environnement.

class TwoWayPyEnvironment(BanditPyEnvironment):

  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-2, maximum=2, name='observation')

    # Flipping the sign with probability 1/2.
    self._reward_sign = 2 * np.random.randint(2) - 1
    print("reward sign:")
    print(self._reward_sign)

    super(TwoWayPyEnvironment, self).__init__(observation_spec, action_spec)

  def _observe(self):
    self._observation = np.random.randint(-2, 3, (1,), dtype='int32')
    return self._observation

  def _apply_action(self, action):
    return self._reward_sign * action * self._observation[0]

two_way_tf_environment = tf_py_environment.TFPyEnvironment(TwoWayPyEnvironment())
reward sign:
-1

Une politique plus compliquée

Un environnement plus compliqué appelle une politique plus compliquée. Nous avons besoin d'une politique qui détecte le comportement de l'environnement sous-jacent. La stratégie doit gérer trois situations :

  1. L'agent n'a pas encore détecté la version de l'environnement en cours d'exécution.
  2. L'agent a détecté que la version d'origine de l'environnement est en cours d'exécution.
  3. L'agent a détecté que la version inversée de l'environnement est en cours d'exécution.

Nous définissons un tf_variable nommé _situation pour stocker ces informations codées sous forme de valeurs dans [0, 2] , puis fait fonctionner la politique en conséquence.

class TwoWaySignPolicy(tf_policy.TFPolicy):
  def __init__(self, situation):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-2, maximum=2)
    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)
    time_step_spec = ts.time_step_spec(observation_spec)
    self._situation = situation
    super(TwoWaySignPolicy, self).__init__(time_step_spec=time_step_spec,
                                           action_spec=action_spec)
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return [self._situation]

  def _action(self, time_step, policy_state, seed):
    sign = tf.cast(tf.sign(time_step.observation[0, 0]), dtype=tf.int32)
    def case_unknown_fn():
      # Choose 1 so that we get information on the sign.
      return tf.constant(1, shape=(1,))

    # Choose 0 or 2, depending on the situation and the sign of the observation.
    def case_normal_fn():
      return tf.constant(sign + 1, shape=(1,))
    def case_flipped_fn():
      return tf.constant(1 - sign, shape=(1,))

    cases = [(tf.equal(self._situation, 0), case_unknown_fn),
             (tf.equal(self._situation, 1), case_normal_fn),
             (tf.equal(self._situation, 2), case_flipped_fn)]
    action = tf.case(cases, exclusive=True)
    return policy_step.PolicyStep(action, policy_state)

L'agent

Il est maintenant temps de définir l'agent qui détecte le signe de l'environnement et définit la politique de manière appropriée.

class SignAgent(tf_agent.TFAgent):
  def __init__(self):
    self._situation = tf.Variable(0, dtype=tf.int32)
    policy = TwoWaySignPolicy(self._situation)
    time_step_spec = policy.time_step_spec
    action_spec = policy.action_spec
    super(SignAgent, self).__init__(time_step_spec=time_step_spec,
                                    action_spec=action_spec,
                                    policy=policy,
                                    collect_policy=policy,
                                    train_sequence_length=None)

  def _initialize(self):
    return tf.compat.v1.variables_initializer(self.variables)

  def _train(self, experience, weights=None):
    observation = experience.observation
    action = experience.action
    reward = experience.reward

    # We only need to change the value of the situation variable if it is
    # unknown (0) right now, and we can infer the situation only if the
    # observation is not 0.
    needs_action = tf.logical_and(tf.equal(self._situation, 0),
                                  tf.not_equal(reward, 0))


    def new_situation_fn():
      """This returns either 1 or 2, depending on the signs."""
      return (3 - tf.sign(tf.cast(observation[0, 0, 0], dtype=tf.int32) *
                          tf.cast(action[0, 0], dtype=tf.int32) *
                          tf.cast(reward[0, 0], dtype=tf.int32))) / 2

    new_situation = tf.cond(needs_action,
                            new_situation_fn,
                            lambda: self._situation)
    new_situation = tf.cast(new_situation, tf.int32)
    tf.compat.v1.assign(self._situation, new_situation)
    return tf_agent.LossInfo((), ())

sign_agent = SignAgent()

Dans le code ci - dessus, l'agent définit la politique et la variable la situation est partagée par l'agent et la politique.

En outre, le paramètre experience de la _train fonction est une trajectoire:

Trajectoires

Dans TF-agents, les trajectories sont nommées tuples qui contiennent des échantillons provenant des étapes précédentes prises. Ces exemples sont ensuite utilisés par l'agent pour former et mettre à jour la stratégie. Dans RL, les trajectoires doivent contenir des informations sur l'état actuel, l'état suivant et si l'épisode actuel est terminé. Puisque dans le monde Bandit nous n'avons pas besoin de ces choses, nous avons mis en place une fonction d'aide pour créer une trajectoire :

# We need to add another dimension here because the agent expects the
# trajectory of shape [batch_size, time, ...], but in this tutorial we assume
# that both batch size and time are 1. Hence all the expand_dims.

def trajectory_for_bandit(initial_step, action_step, final_step):
  return trajectory.Trajectory(observation=tf.expand_dims(initial_step.observation, 0),
                               action=tf.expand_dims(action_step.action, 0),
                               policy_info=action_step.info,
                               reward=tf.expand_dims(final_step.reward, 0),
                               discount=tf.expand_dims(final_step.discount, 0),
                               step_type=tf.expand_dims(initial_step.step_type, 0),
                               next_step_type=tf.expand_dims(final_step.step_type, 0))

Formation d'un agent

Maintenant, toutes les pièces sont prêtes pour la formation de notre agent bandit.

step = two_way_tf_environment.reset()
for _ in range(10):
  action_step = sign_agent.collect_policy.action(step)
  next_step = two_way_tf_environment.step(action_step.action)
  experience = trajectory_for_bandit(step, action_step, next_step)
  print(experience)
  sign_agent.train(experience)
  step = next_step
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[0]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[1.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[2.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-2]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[4.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>})

D'après la sortie, on peut voir qu'après la deuxième étape (sauf si l'observation était 0 à la première étape), la politique choisit l'action de la bonne manière et donc la récompense collectée est toujours non négative.

Un vrai exemple de bandit contextuel

Dans le reste de ce tutoriel, nous utilisons les pré-mis en œuvre des environnements et des agents de la TF-agents Bandits bibliothèque.

# Imports for example.
from tf_agents.bandits.agents import lin_ucb_agent
from tf_agents.bandits.environments import stationary_stochastic_py_environment as sspe
from tf_agents.bandits.metrics import tf_metrics
from tf_agents.drivers import dynamic_step_driver
from tf_agents.replay_buffers import tf_uniform_replay_buffer

import matplotlib.pyplot as plt

Environnement stochastique stationnaire avec fonctions de gain linéaire

L'environnement utilisé dans cet exemple est le StationaryStochasticPyEnvironment . Cet environnement prend comme paramètre une fonction (généralement bruyante) pour donner des observations (contexte), et pour chaque bras prend une fonction (également bruyante) qui calcule la récompense en fonction de l'observation donnée. Dans notre exemple, nous échantillonnons le contexte uniformément à partir d'un cube de dimension d, et les fonctions de récompense sont des fonctions linéaires du contexte, plus un peu de bruit gaussien.

batch_size = 2 # @param
arm0_param = [-3, 0, 1, -2] # @param
arm1_param = [1, -2, 3, 0] # @param
arm2_param = [0, 0, 1, 1] # @param
def context_sampling_fn(batch_size):
  """Contexts from [-10, 10]^4."""
  def _context_sampling_fn():
    return np.random.randint(-10, 10, [batch_size, 4]).astype(np.float32)
  return _context_sampling_fn

class LinearNormalReward(object):
  """A class that acts as linear reward function when called."""
  def __init__(self, theta, sigma):
    self.theta = theta
    self.sigma = sigma
  def __call__(self, x):
    mu = np.dot(x, self.theta)
    return np.random.normal(mu, self.sigma)

arm0_reward_fn = LinearNormalReward(arm0_param, 1)
arm1_reward_fn = LinearNormalReward(arm1_param, 1)
arm2_reward_fn = LinearNormalReward(arm2_param, 1)

environment = tf_py_environment.TFPyEnvironment(
    sspe.StationaryStochasticPyEnvironment(
        context_sampling_fn(batch_size),
        [arm0_reward_fn, arm1_reward_fn, arm2_reward_fn],
        batch_size=batch_size))

L'agent LinUCB

L'agent ci - dessous met en œuvre le LinUCB algorithme.

observation_spec = tensor_spec.TensorSpec([4], tf.float32)
time_step_spec = ts.time_step_spec(observation_spec)
action_spec = tensor_spec.BoundedTensorSpec(
    dtype=tf.int32, shape=(), minimum=0, maximum=2)

agent = lin_ucb_agent.LinearUCBAgent(time_step_spec=time_step_spec,
                                     action_spec=action_spec)

Métrique de regret

Mesure la plus importante de Bandits est regret, calculé comme la différence entre la récompense recueillie par l'agent et la récompense attendue d'une politique d'oracle qui a l' accès aux fonctions de récompense de l ' environnement. Le RegretMetric a donc besoin d' une fonction baseline_reward_fn qui calcule la meilleure récompense attendue réalisable compte tenu d' une observation. Pour notre exemple, nous devons prendre le maximum des équivalents sans bruit des fonctions de récompense que nous avons déjà définies pour l'environnement.

def compute_optimal_reward(observation):
  expected_reward_for_arms = [
      tf.linalg.matvec(observation, tf.cast(arm0_param, dtype=tf.float32)),
      tf.linalg.matvec(observation, tf.cast(arm1_param, dtype=tf.float32)),
      tf.linalg.matvec(observation, tf.cast(arm2_param, dtype=tf.float32))]
  optimal_action_reward = tf.reduce_max(expected_reward_for_arms, axis=0)
  return optimal_action_reward

regret_metric = tf_metrics.RegretMetric(compute_optimal_reward)

Entraînement

Maintenant, nous rassemblons tous les composants que nous avons introduits ci-dessus : l'environnement, la politique et l'agent. Nous courons la politique sur l'environnement et les données de formation de sortie avec l'aide d'un conducteur, et former l'agent sur les données.

Notez qu'il existe deux paramètres qui spécifient ensemble le nombre de pas effectués. num_iterations précise , nous courons combien de fois la boucle d'entraînement, tandis que le conducteur prendra steps_per_loop étapes par itération. La principale raison pour laquelle ces deux paramètres sont conservés est que certaines opérations sont effectuées par itération, tandis que d'autres sont effectuées par le pilote à chaque étape. Par exemple, l'agent train fonction est appelée une seule fois par itération. Le compromis ici est que si nous nous entraînons plus souvent, notre politique est « plus fraîche », d'autre part, la formation en lots plus importants pourrait être plus efficace en termes de temps.

num_iterations = 90 # @param
steps_per_loop = 1 # @param

replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.policy.trajectory_spec,
    batch_size=batch_size,
    max_length=steps_per_loop)

observers = [replay_buffer.add_batch, regret_metric]

driver = dynamic_step_driver.DynamicStepDriver(
    env=environment,
    policy=agent.collect_policy,
    num_steps=steps_per_loop * batch_size,
    observers=observers)

regret_values = []

for _ in range(num_iterations):
  driver.run()
  loss_info = agent.train(replay_buffer.gather_all())
  replay_buffer.clear()
  regret_values.append(regret_metric.result())

plt.plot(regret_values)
plt.ylabel('Average Regret')
plt.xlabel('Number of Iterations')
WARNING:tensorflow:From /tmp/ipykernel_11392/3138849230.py:21: ReplayBuffer.gather_all (from tf_agents.replay_buffers.replay_buffer) is deprecated and will be removed in a future version.
Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.
Text(0.5, 0, 'Number of Iterations')

png

Après avoir exécuté le dernier extrait de code, le graphique résultant (espérons-le) montre que le regret moyen diminue à mesure que l'agent est formé et que la politique s'améliore pour déterminer quelle est la bonne action, compte tenu de l'observation.

Et après?

Pour voir des exemples plus de travail, s'il vous plaît voir les bandits / agents / exemples répertoire qui a des exemples prêts à fonctionner pour différents agents et environnements.

La bibliothèque TF-Agents est également capable de gérer les bandits multi-bras avec des fonctionnalités par bras. À cette fin, nous renvoyons le lecteur au bandit par le bras tutoriel .