Créer un pipeline TFX à l'aide de modèles

introduction

Ce document fournit des instructions pour créer un tensorflow pipeline étendu (TFX) à l' aide des modèles qui sont fournis avec le package TFX Python. La plupart des instructions sont des commandes shell Linux, qui s'exécuteront sur une instance AI Platform Notebooks. Jupyter cellules correspondantes du code portable qui font appel à ces commandes à l' aide ! sont prévus.

Vous allez construire un pipeline à l' aide de taxi Trips ensemble de données publié par la ville de Chicago. Nous vous encourageons vivement à essayer de créer votre propre pipeline à l'aide de votre ensemble de données en utilisant ce pipeline comme référence.

Étape 1. Configurez votre environnement.

AI Platform Pipelines préparera un environnement de développement pour créer un pipeline et un cluster Kubeflow Pipeline pour exécuter le pipeline nouvellement créé.

Installer tfx paquet python avec kfp exigence supplémentaire.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Vérifions les versions de TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

Dans la plate - forme AI Pipelines, TFX est en cours d' exécution dans un environnement hébergé à l' aide Kubernetes Kubeflow Pipelines .

Définissons quelques variables d'environnement pour utiliser Kubeflow Pipelines.

Tout d'abord, obtenez votre ID de projet GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

Nous devons également accéder à votre cluster KFP. Vous pouvez y accéder dans votre Google Cloud Console sous le menu "AI Platform > Pipeline". Le « point de terminaison » du cluster KFP se trouve à partir de l'URL du tableau de bord Pipelines, ou vous pouvez l'obtenir à partir de l'URL de la page de démarrage où vous avez lancé ce notebook. Créons une ENDPOINT variable d'environnement et réglez -le sur le point de terminaison de cluster KFP. ENDPOINT ne doit contenir que la partie nom d'hôte de l'URL. Par exemple, si l'URL du tableau de bord KFP est <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> , la valeur eNDPOINT devient 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Définissez le nom d'image tfx-pipeline dans le cadre du projet GCP en cours.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Et, c'est fait. Nous sommes prêts à créer un pipeline.

Étape 2. Copiez le modèle prédéfini dans votre répertoire de projet.

Dans cette étape, nous allons créer un répertoire et des fichiers de projet de pipeline de travail en copiant des fichiers supplémentaires à partir d'un modèle prédéfini.

Vous pouvez donner votre pipeline un nom différent en changeant le PIPELINE_NAME ci - dessous. Cela deviendra également le nom du répertoire du projet où vos fichiers seront placés.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX comprend le taxi , modèle avec le paquet python TFX. Si vous envisagez de résoudre un problème de prédiction ponctuelle, y compris la classification et la régression, ce modèle peut être utilisé comme point de départ.

La tfx template copy commande copie CLI fichiers de modèles prédéfinis dans le répertoire de votre projet.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Remplacez le contexte du répertoire de travail de ce bloc-notes par le répertoire du projet.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Étape 3. Parcourez vos fichiers source copiés

Le modèle TFX fournit des fichiers d'échafaudage de base pour créer un pipeline, y compris le code source Python, des exemples de données et des blocs-notes Jupyter pour analyser la sortie du pipeline. Le taxi , modèle utilise le même ensemble de données Chicago taxi et modèle ML comme Airflow Tutorial .

Voici une brève introduction à chacun des fichiers Python.

  • pipeline - Ce répertoire contient la définition de la canalisation
    • configs.py - définit des constantes communes pour les coureurs de pipeline
    • pipeline.py - définit des composants de TFX et un pipeline
  • models - Ce répertoire contient les définitions de modèle ML.
    • features.py , features_test.py - définit caractéristiques pour le modèle
    • preprocessing.py , preprocessing_test.py - définit prétraitements emplois en utilisant tf::Transform
    • estimator - Ce répertoire contient un modèle basé estimateur.
      • constants.py - définit les constantes du modèle
      • model.py , model_test.py - définit le modèle DNN en utilisant l' estimateur de TF
    • keras - Ce répertoire contient un modèle basé Keras.
      • constants.py - définit les constantes du modèle
      • model.py , model_test.py - définit le modèle DNN utilisant Keras
  • local_runner.py , kubeflow_runner.py - définir les coureurs pour chaque moteur d'orchestration

Vous remarquerez peut - être qu'il ya des fichiers avec _test.py en leur nom. Ce sont des tests unitaires du pipeline et il est recommandé d'ajouter d'autres tests unitaires lorsque vous implémentez vos propres pipelines. Vous pouvez exécuter des tests unitaires en fournissant le nom du module de fichiers de test avec -m drapeau. Vous pouvez habituellement obtenir un nom de module en supprimant .py l' extension et le remplacement / avec . . Par example:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Étape 4. Exécutez votre premier pipeline TFX

Les composants du pipeline TFX généreront des sorties pour chaque course comme artefacts ML métadonnées , et ils doivent être stockés quelque part. Vous pouvez utiliser n'importe quel stockage auquel le cluster KFP peut accéder, et pour cet exemple, nous utiliserons Google Cloud Storage (GCS). Un bucket GCS par défaut aurait dû être créé automatiquement. Son nom sera <your-project-id>-kubeflowpipelines-default .

Importons nos exemples de données dans le bucket GCS afin de pouvoir les utiliser ultérieurement dans notre pipeline.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Créons un pipeline de TFX en utilisant le tfx pipeline create commande.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

Lors de la création d' un pipeline, Dockerfile sera généré pour construire une image Docker. N'oubliez pas de l'ajouter au système de contrôle de source (par exemple, git) avec d'autres fichiers source.

Maintenant commencer une exécution courir avec le pipeline nouvellement créé à l' aide de la tfx run create commande.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Vous pouvez également exécuter le pipeline dans le tableau de bord KFP. La nouvelle exécution sera répertoriée sous Tests dans le tableau de bord KFP. Cliquer sur l'expérience vous permettra de surveiller les progrès et de visualiser les artefacts créés pendant l'exécution.

Cependant, nous vous recommandons de visiter le tableau de bord KFP. Vous pouvez accéder au tableau de bord KFP à partir du menu Cloud AI Platform Pipelines dans Google Cloud Console. Une fois que vous aurez visité le tableau de bord, vous pourrez trouver le pipeline et accéder à une multitude d'informations sur le pipeline. Par exemple, vous pouvez trouver vos courses sous le menu expériences, et lorsque vous ouvrez votre course d'exécution sous expériences que vous pouvez trouver tous vos objets de pipeline sous menu Artefacts.

Les problèmes liés aux autorisations sont l'une des principales sources d'échec. Veuillez vous assurer que votre cluster KFP dispose des autorisations nécessaires pour accéder aux API Google Cloud. Cela peut être configuré lorsque vous créez un cluster KFP dans GCP , ou voir le document Dépannage GCP .

Étape 5. Ajoutez des composants pour la validation des données.

Dans cette étape, vous allez ajouter des composants pour la validation des données , y compris StatisticsGen , SchemaGen et ExampleValidator . Si vous êtes intéressé par la validation des données, s'il vous plaît voir Commencez avec tensorflow la validation des données .

Double-cliquez sur le répertoire de changement de pipeline et double-cliquez à nouveau pour ouvrir pipeline.py . Trouvez et décommenter les 3 lignes qui ajoutent StatisticsGen , SchemaGen et ExampleValidator à la canalisation. (Astuce: recherche commentaires contenant TODO(step 5): ). Assurez - vous d'enregistrer pipeline.py après l' avoir modifié.

Vous devez maintenant mettre à jour le pipeline existant avec la définition de pipeline modifiée. Utilisez la tfx pipeline update à tfx run create tfx pipeline update commande pour mettre à jour votre pipeline, suivi de la tfx run create commande pour créer une nouvelle course d'exécution de votre pipeline mis à jour.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Vérifier les sorties du pipeline

Visitez le tableau de bord KFP pour trouver les sorties de pipeline dans la page de votre exécution de pipeline. Cliquez sur l'onglet Expériences sur la gauche, et toutes les pistes dans la page expériences. Vous devriez pouvoir trouver la dernière exécution sous le nom de votre pipeline.

Étape 6. Ajoutez des composants pour la formation.

Dans cette étape, vous allez ajouter des composants pour la formation et la validation des modèles , y compris Transform , Trainer , Resolver , Evaluator et Pusher .

Double-cliquez pour ouvrir pipeline.py . Trouvez et décommenter les 5 lignes qui ajoutent Transform , Trainer , Resolver , Evaluator et Pusher à la canalisation. (Astuce: recherche TODO(step 6): )

Comme vous l'avez fait auparavant, vous devez maintenant mettre à jour le pipeline existant avec la définition de pipeline modifiée. Les instructions sont les mêmes que l' étape 5. Mettre à jour le pipeline à l' aide tfx pipeline update à tfx run create tfx pipeline update , et de créer une course d'exécution à l' aide tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Lorsque cette exécution se termine avec succès, vous avez maintenant créé et exécuté votre premier pipeline TFX dans AI Platform Pipelines !

Étape 7. (Facultatif) Essayez BigQueryExampleGen

BigQuery est un serveur, hautement évolutive et l' entrepôt de données de cloud rentable. BigQuery peut être utilisé comme source d'exemples d'entraînement dans TFX. Dans cette étape, nous allons ajouter BigQueryExampleGen au pipeline.

Double-cliquez pour ouvrir pipeline.py . Commentez CsvExampleGen et décommenter la ligne qui crée une instance de BigQueryExampleGen . Vous devez également décommenter la query argument de la create_pipeline fonction.

Nous devons préciser que le projet GCP à utiliser pour BigQuery, et cela se fait par la mise en --project dans beam_pipeline_args lors de la création d' un pipeline.

Double-cliquez pour ouvrir configs.py . Décommentez la définition de GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS et BIG_QUERY_QUERY . Vous devez remplacer la valeur de région dans ce fichier par les valeurs correctes pour votre projet GCP.

Changez de répertoire au niveau supérieur. Cliquez sur le nom du répertoire au-dessus de la liste des fichiers. Le nom du répertoire est le nom du pipe - line qui est my_pipeline si vous n'avez pas changé.

Double-cliquez pour ouvrir kubeflow_runner.py . Décommentez deux arguments, query et beam_pipeline_args , pour la create_pipeline fonction.

Le pipeline est maintenant prêt à utiliser BigQuery comme exemple de source. Mettez à jour le pipeline comme auparavant et créez une nouvelle exécution comme nous l'avons fait aux étapes 5 et 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Étape 8. (Facultatif) Essayez avec Dataflow KFP

Plusieurs composants TFX utilise Apache faisceau pour mettre en œuvre des pipelines parallèles de données, et cela signifie que vous pouvez distribuer les charges de travail de traitement de données en utilisant Google Cloud Dataflow . Dans cette étape, nous allons configurer l'orchestrateur Kubeflow pour qu'il utilise le flux de données comme back-end de traitement des données pour Apache Beam.

Double-cliquez sur pipeline dans le répertoire de changement, et double-cliquez pour ouvrir configs.py . Décommentez la définition de GOOGLE_CLOUD_REGION et DATAFLOW_BEAM_PIPELINE_ARGS .

Changez de répertoire au niveau supérieur. Cliquez sur le nom du répertoire au-dessus de la liste des fichiers. Le nom du répertoire est le nom du pipe - line qui est my_pipeline si vous n'avez pas changé.

Double-cliquez pour ouvrir kubeflow_runner.py . Décommentez beam_pipeline_args . (Assurez - vous également de commenter en cours beam_pipeline_args que vous avez ajouté à l' étape 7.)

Le pipeline est maintenant prêt à utiliser Dataflow. Mettez à jour le pipeline et créez une exécution comme nous l'avons fait aux étapes 5 et 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Vous pouvez trouver votre emploi dans Dataflow Dataflow dans la Console Cloud .

Étape 9. (Facultatif) Essayez AI - Cloud Platform Formation et prévision avec KFP

TFX avec plusieurs services Interopérabilité GCP gérés, tels que Nuage AI Plate - forme pour la formation et la prévision . Vous pouvez configurer votre Trainer composant à utiliser la plate - forme de formation AI - Cloud, un service géré pour la formation de modèles ML. De plus, lorsque votre modèle est construit et prêt à être servi, vous pouvez pousser votre modèle de prévision AI - Cloud Platform pour le service. Dans cette étape, nous allons mettre notre Trainer et Pusher composant à utiliser les services cloud AI Platform.

Avant l' édition de fichiers, vous pouvez d' abord activer AI plate - forme de formation et API de prédiction.

Double-cliquez sur pipeline dans le répertoire de changement, et double-cliquez pour ouvrir configs.py . Décommentez la définition de GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS et GCP_AI_PLATFORM_SERVING_ARGS . Nous utiliserons notre image conteneur construit sur mesure pour former un modèle en nuage AI Plate - forme de formation, nous devons donc mettre masterConfig.imageUri en GCP_AI_PLATFORM_TRAINING_ARGS à la même valeur que CUSTOM_TFX_IMAGE ci - dessus.

Modifier le répertoire d' un niveau, et double-cliquez pour ouvrir kubeflow_runner.py . Décommentez ai_platform_training_args et ai_platform_serving_args .

Mettez à jour le pipeline et créez une exécution comme nous l'avons fait aux étapes 5 et 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Vous pouvez trouver votre emploi de formation en cloud AI Plate - forme d' emploi . Si votre pipeline terminé avec succès, vous pouvez trouver votre modèle dans les modèles Nuage AI Plate - forme .

Étape 10. Ingérez VOS données dans le pipeline

Nous avons créé un pipeline pour un modèle à l'aide de l'ensemble de données Chicago Taxi. Il est maintenant temps de mettre vos données dans le pipeline.

Vos données peuvent être stockées partout où votre pipeline peut accéder, y compris GCS ou BigQuery. Vous devrez modifier la définition du pipeline pour accéder à vos données.

  1. Si vos données sont stockées dans des fichiers, modifier le DATA_PATH dans kubeflow_runner.py ou local_runner.py et le mettre à l'emplacement de vos fichiers. Si vos données sont stockées dans BigQuery, modifier BIG_QUERY_QUERY en pipeline/configs.py pour interroger correctement vos données.
  2. Ajouter des fonctionnalités dans les models/features.py .
  3. Modifier les models/preprocessing.py pour transformer les données d'entrée pour la formation .
  4. Modifier les models/keras/model.py et models/keras/constants.py pour décrire votre modèle ML .
    • Vous pouvez également utiliser un modèle basé sur un estimateur. Changement RUN_FN constant models.estimator.model.run_fn dans pipeline/configs.py .

S'il vous plaît voir Guide élément formateur pour plus d' introduction.

Nettoyer

Pour nettoyer toutes les ressources Google Cloud utilisés dans ce projet, vous pouvez supprimer le projet Google Cloud vous avez utilisé pour le tutoriel.

Alternativement, vous pouvez nettoyer des ressources individuelles en visitant chaque console :