Créer un pipeline TFX à l'aide de modèles avec l'orchestrateur Beam

introduction

Ce document fournit des instructions pour créer un tensorflow pipeline étendu (TFX) à l' aide des modèles qui sont fournis avec le package TFX Python. La plupart des instructions sont des commandes shell Linux, et Jupyter cellules correspondantes du code portable qui font appel à ces commandes à l' aide ! sont prévus.

Vous allez construire un pipeline à l' aide de taxi Trips ensemble de données publié par la ville de Chicago. Nous vous encourageons fortement à essayer de créer votre propre pipeline à l'aide de votre ensemble de données en utilisant ce pipeline comme référence.

Nous allons construire un pipeline en utilisant Apache faisceau Orchestrator . Si vous êtes intéressé à utiliser Kubeflow orchestrateur sur Google Cloud, s'il vous plaît voir TFX didacticiel AI - Cloud Platform Pipelines .

Conditions préalables

  • Linux/Mac OS
  • Python >= 3.5.3

Vous pouvez obtenir toutes les conditions préalables facilement en cours d' exécution sur ce portable Google Colab .

Étape 1. Configurez votre environnement.

Tout au long de ce document, nous présenterons deux fois les commandes. Une fois en tant que commande shell prête à copier-coller, une fois en tant que cellule de bloc-notes jupyter. Si vous utilisez Colab, ignorez simplement le bloc de script shell et exécutez les cellules du bloc-notes.

Vous devez préparer un environnement de développement pour créer un pipeline.

Installer tfx paquet python. Nous vous recommandons d' utiliser virtualenv dans l'environnement local. Vous pouvez utiliser l'extrait de script shell suivant pour configurer votre environnement.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Si vous utilisez Colab :

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ERREUR : un-package 0.some_version.1 a l'exigence other-package!=2.0.,<3,>=1.15, mais vous aurez un other-package 2.0.0 qui est incompatible.

Veuillez ignorer ces erreurs pour le moment.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Vérifions la version de TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

Et, c'est fait. Nous sommes prêts à créer un pipeline.

Étape 2. Copiez le modèle prédéfini dans votre répertoire de projet.

Dans cette étape, nous allons créer un répertoire et des fichiers de projet de pipeline de travail en copiant des fichiers supplémentaires à partir d'un modèle prédéfini.

Vous pouvez donner votre pipeline un nom différent en changeant le PIPELINE_NAME ci - dessous. Cela deviendra également le nom du répertoire du projet où vos fichiers seront placés.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX comprend le taxi , modèle avec le paquet python TFX. Si vous envisagez de résoudre un problème de prédiction ponctuelle, y compris la classification et la régression, ce modèle peut être utilisé comme point de départ.

La tfx template copy commande copie CLI fichiers de modèles prédéfinis dans le répertoire de votre projet.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Remplacez le contexte du répertoire de travail de ce bloc-notes par le répertoire du projet.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Étape 3. Parcourez vos fichiers source copiés.

Le modèle TFX fournit des fichiers d'échafaudage de base pour créer un pipeline, y compris le code source Python, des exemples de données et des blocs-notes Jupyter pour analyser la sortie du pipeline. Le taxi , modèle utilise le même ensemble de données Chicago taxi et modèle ML comme Airflow Tutorial .

Dans Google Colab, vous pouvez parcourir les fichiers en cliquant sur une icône de dossier sur la gauche. Les fichiers doivent être copiés dans le cadre du projet directoy, dont le nom est my_pipeline dans ce cas. Vous pouvez cliquer sur les noms de répertoires pour voir le contenu du répertoire et double-cliquer sur les noms de fichiers pour les ouvrir.

Voici une brève introduction à chacun des fichiers Python.

  • pipeline - Ce répertoire contient la définition de la canalisation
    • configs.py - définit des constantes communes pour les coureurs de pipeline
    • pipeline.py - définit des composants de TFX et un pipeline
  • models - Ce répertoire contient les définitions de modèle ML.
    • features.py , features_test.py - définit caractéristiques pour le modèle
    • preprocessing.py , preprocessing_test.py - définit prétraitements emplois en utilisant tf::Transform
    • estimator - Ce répertoire contient un modèle basé estimateur.
      • constants.py - définit les constantes du modèle
      • model.py , model_test.py - définit le modèle DNN en utilisant l' estimateur de TF
    • keras - Ce répertoire contient un modèle basé Keras.
      • constants.py - définit les constantes du modèle
      • model.py , model_test.py - définit le modèle DNN utilisant Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - définir les coureurs pour chaque moteur d'orchestration

Vous remarquerez peut - être qu'il ya des fichiers avec _test.py en leur nom. Ce sont des tests unitaires du pipeline et il est recommandé d'ajouter d'autres tests unitaires lorsque vous implémentez vos propres pipelines. Vous pouvez exécuter des tests unitaires en fournissant le nom du module de fichiers de test avec -m drapeau. Vous pouvez habituellement obtenir un nom de module en supprimant .py l' extension et le remplacement / avec . . Par example:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Étape 4. Exécutez votre premier pipeline TFX

Vous pouvez créer un pipeline à l' aide de pipeline create commande.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

Ensuite, vous pouvez exécuter le pipeline créé à l' aide run create commande.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

En cas de succès, vous verrez des Component CsvExampleGen is finished. Lorsque vous copiez le modèle, un seul composant, CsvExampleGen, est inclus dans le pipeline.

Étape 5. Ajoutez des composants pour la validation des données.

Dans cette étape, vous allez ajouter des composants pour la validation des données , y compris StatisticsGen , SchemaGen et ExampleValidator . Si vous êtes intéressé par la validation des données, s'il vous plaît voir Commencez avec tensorflow la validation des données .

Nous allons modifier la définition du pipeline copié dans pipeline/pipeline.py . Si vous travaillez sur votre environnement local, utilisez votre éditeur préféré pour éditer le fichier. Si vous travaillez sur Google Colab,

Cliquez sur l' icône de dossier sur la gauche pour ouvrir des Files vue.

Cliquez my_pipeline pour ouvrir le répertoire et cliquez sur pipeline répertoire pour ouvrir et double-cliquez sur pipeline.py pour ouvrir le fichier.

Trouvez et décommenter les 3 lignes qui ajoutent StatisticsGen , SchemaGen et ExampleValidator à la canalisation. (Astuce: trouver les commentaires contenant TODO(step 5): ).

Votre modification sera enregistrée automatiquement en quelques secondes. Assurez - vous que la * marque devant la pipeline.py a disparu dans le titre de l' onglet. Il n'y a pas de bouton d'enregistrement ni de raccourci pour l'éditeur de fichiers dans Colab. Fichiers Python dans l' éditeur de fichiers peuvent être enregistrés dans l'environnement d'exécution , même en playground de playground en mode.

Vous devez maintenant mettre à jour le pipeline existant avec la définition de pipeline modifiée. Utilisez la tfx pipeline update à tfx run create tfx pipeline update commande pour mettre à jour votre pipeline, suivi de la tfx run create commande pour créer une nouvelle course d'exécution de votre pipeline mis à jour.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Vous devriez pouvoir voir le journal de sortie des composants ajoutés. Notre pipeline crée des artefacts de sortie dans tfx_pipeline_output/my_pipeline répertoire.

Étape 6. Ajoutez des composants pour la formation.

Dans cette étape, vous allez ajouter des composants pour la formation et la validation des modèles , y compris Transform , Trainer , ResolverNode , Evaluator et Pusher .

Ouvrir pipeline/pipeline.py . Trouvez et décommentez 5 lignes qui ajoutent Transform , Trainer , ResolverNode , Evaluator et Pusher à la canalisation. (Conseil: La recherche TODO(step 6): )

Comme vous l'avez fait auparavant, vous devez maintenant mettre à jour le pipeline existant avec la définition de pipeline modifiée. Les instructions sont les mêmes que l' étape 5. Mettre à jour le pipeline à l' aide tfx pipeline update à tfx run create tfx pipeline update , et de créer une course d'exécution à l' aide tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Lorsque cette exécution se termine avec succès, vous avez maintenant créé et exécuté votre premier pipeline TFX à l'aide de l'orchestrateur Beam !

Étape 7. (Facultatif) Essayez BigQueryExampleGen.

[BigQuery] est un entrepôt de données cloud sans serveur, hautement évolutif et rentable. BigQuery peut être utilisé comme source d'exemples d'entraînement dans TFX. Dans cette étape, nous allons ajouter BigQueryExampleGen au pipeline.

Vous avez besoin d' une plate - forme Google Cloud compte utiliser BigQuery. Veuillez préparer un projet GCP.

Connectez - vous à votre projet en utilisant la bibliothèque colab auth ou gcloud utilitaire.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Vous devez spécifier le nom de votre projet GCP pour accéder aux ressources BigQuery à l'aide de TFX. Set GOOGLE_CLOUD_PROJECT variable d'environnement au nom de votre projet.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Ouvrir pipeline/pipeline.py . Commentez CsvExampleGen et la ligne uncomment qui créent une instance de BigQueryExampleGen . Vous devez également uncomment query argument de la create_pipeline fonction.

Nous devons préciser que le projet GCP à utiliser pour BigQuery à nouveau, et cela se fait par la mise en --project dans beam_pipeline_args lors de la création d' un pipeline.

Ouvrir pipeline/configs.py . La définition de Uncomment BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS et BIG_QUERY_QUERY . Vous devez remplacer l'ID du projet et la valeur de la région dans ce fichier par les valeurs correctes pour votre projet GCP.

Ouvrez beam_dag_runner.py . Décommenter deux arguments, la query et beam_pipeline_args , pour la méthode de create_pipeline ().

Le pipeline est maintenant prêt à utiliser BigQuery comme exemple de source. Mettez à jour le pipeline et créez une exécution comme nous l'avons fait aux étapes 5 et 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Prochaine étape : ingérez VOS données dans le pipeline.

Nous avons créé un pipeline pour un modèle à l'aide de l'ensemble de données Chicago Taxi. Il est maintenant temps de mettre vos données dans le pipeline.

Vos données peuvent être stockées partout où votre pipeline peut accéder, y compris GCS ou BigQuery. Vous devrez modifier la définition du pipeline pour accéder à vos données.

  1. Si vos données sont stockées dans des fichiers, modifier le DATA_PATH dans kubeflow_dag_runner.py ou beam_dag_runner.py et le mettre à l'emplacement de vos fichiers. Si vos données sont stockées dans BigQuery, modifier BIG_QUERY_QUERY en pipeline/configs.py pour interroger correctement vos données.
  2. Ajouter des fonctionnalités dans les models/features.py .
  3. Modifier les models/preprocessing.py pour transformer les données d'entrée pour la formation .
  4. Modifier les models/keras/model.py et models/keras/constants.py pour décrire votre modèle ML .
    • Vous pouvez également utiliser un modèle basé sur un estimateur. Changement RUN_FN constant models.estimator.model.run_fn dans pipeline/configs.py .

S'il vous plaît voir Guide élément formateur pour plus d' introduction.