Crea una pipeline TFX utilizzando i modelli

introduzione

Questo documento fornisce le istruzioni per creare un tensorflow Extended (TFX) gasdotto utilizzando i modelli che vengono forniti con il pacchetto TFX Python. Molte delle istruzioni sono comandi della shell Linux, che verranno eseguiti su un'istanza AI Platform Notebooks. Celle corrispondenti codici Notebook Jupyter che invocano i comandi utilizzando ! sono forniti.

Si costruirà un oleodotto utilizzando taxi Viaggi set di dati rilasciato dal Comune di Chicago. Ti consigliamo vivamente di provare a creare la tua pipeline utilizzando il tuo set di dati utilizzando questa pipeline come linea di base.

Passaggio 1. Configura il tuo ambiente.

AI Platform Pipelines preparerà un ambiente di sviluppo per costruire una pipeline e un cluster Kubeflow Pipeline per eseguire la pipeline di nuova costruzione.

Installare tfx pacchetto python con kfp requisito supplementare.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Controlliamo le versioni di TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

Nel AI Piattaforma Pipelines, TFX è in esecuzione in un ambiente hosted kubernetes utilizzando Kubeflow Condotte .

Impostiamo alcune variabili di ambiente per utilizzare Kubeflow Pipelines.

Innanzitutto, ottieni il tuo ID progetto GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

Dobbiamo anche accedere al tuo cluster KFP. Puoi accedervi nella tua Google Cloud Console nel menu "Piattaforma AI > Pipeline". L'"endpoint" del cluster KFP può essere trovato dall'URL del dashboard Pipelines oppure dall'URL della pagina Guida introduttiva in cui è stato avviato questo notebook. Creiamo un ENDPOINT variabile d'ambiente e impostarlo al cluster endpoint KFP. ENDPOINT deve contenere solo la parte del nome host dell'URL. Ad esempio, se l'URL del cruscotto KFP è <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> , valore ENDPOINT diventa 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Impostare il nome dell'immagine come tfx-pipeline nell'ambito del progetto GCP corrente.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Ed è fatto. Siamo pronti per creare una pipeline.

Passaggio 2. Copia il modello predefinito nella directory del progetto.

In questo passaggio, creeremo una directory di progetto della pipeline funzionante e file copiando file aggiuntivi da un modello predefinito.

Si può dare la pipeline un nome diverso cambiando il PIPELINE_NAME di seguito. Questo diventerà anche il nome della directory del progetto in cui verranno inseriti i tuoi file.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX include il taxi modello con il pacchetto TFX pitone. Se stai pianificando di risolvere un problema di previsione puntuale, inclusa la classificazione e la regressione, questo modello potrebbe essere utilizzato come punto di partenza.

I tfx template copy copie comando CLI predefinite file di template nella directory del progetto.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Modificare il contesto della directory di lavoro in questo notebook nella directory del progetto.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Passaggio 3. Sfoglia i file di origine copiati

Il modello TFX fornisce file di scaffold di base per creare una pipeline, inclusi codice sorgente Python, dati di esempio e Jupyter Notebook per analizzare l'output della pipeline. Il taxi modello utilizza lo stesso insieme di dati di Chicago di taxi e il modello ML come il flusso d'aria Tutorial .

Ecco una breve introduzione a ciascuno dei file Python.

  • pipeline - Questa directory contiene la definizione del gasdotto
    • configs.py - definisce costanti comuni per i corridori gasdotti
    • pipeline.py - definisce componenti TFX e un gasdotto
  • models - Questa directory contiene le definizioni ML modello.
    • features.py , features_test.py - definisce caratteristiche per il modello
    • preprocessing.py , preprocessing_test.py - definisce pre-elaborazione lavori utilizzando tf::Transform
    • estimator - Questa directory contiene un modello basato estimatore.
      • constants.py - definisce le costanti del modello
      • model.py , model_test.py - definisce modello di DNN utilizzando stimatore TF
    • keras - Questa directory contiene un modello basato Keras.
      • constants.py - definisce le costanti del modello
      • model.py , model_test.py - definisce modello di DNN utilizzando Keras
  • local_runner.py , kubeflow_runner.py - definire ripiani per ogni motore di orchestrazione

Si potrebbe notare che ci sono alcuni file con _test.py nel loro nome. Si tratta di unit test della pipeline e si consiglia di aggiungere altri unit test man mano che si implementano le proprie pipeline. È possibile eseguire unit test fornendo il nome del modulo di file di test con -m bandiera. Di solito è possibile ottenere un nome di modulo cancellando .py estensione e la sostituzione / con . . Per esempio:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Passaggio 4. Esegui la tua prima pipeline TFX

I componenti in cantiere TFX genereranno uscite per ciascun periodo, come artefatti ML metadati , e hanno bisogno di essere memorizzati da qualche parte. Puoi utilizzare qualsiasi spazio di archiviazione a cui può accedere il cluster KFP e per questo esempio utilizzeremo Google Cloud Storage (GCS). Un bucket GCS predefinito dovrebbe essere stato creato automaticamente. Il suo nome sarà <your-project-id>-kubeflowpipelines-default .

Carichiamo i nostri dati di esempio nel bucket GCS in modo da poterli utilizzare nella nostra pipeline in un secondo momento.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Creiamo un oleodotto TFX utilizzando il tfx pipeline create comando.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

Durante la creazione di una pipeline, Dockerfile verrà generato per costruire un'immagine Docker. Non dimenticare di aggiungerlo al sistema di controllo del codice sorgente (ad esempio git) insieme ad altri file sorgente.

Ora inizia un'esecuzione correre con il gasdotto appena creato utilizzando la tfx run create comando.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Oppure puoi anche eseguire la pipeline nel dashboard di KFP. La nuova esecuzione verrà elencata in Esperimenti nella dashboard di KFP. Facendo clic sull'esperimento sarà possibile monitorare l'avanzamento e visualizzare gli artefatti creati durante l'esecuzione.

Tuttavia, ti consigliamo di visitare il dashboard di KFP. Puoi accedere al dashboard di KFP dal menu Cloud AI Platform Pipelines in Google Cloud Console. Una volta visitata la dashboard, sarai in grado di trovare la pipeline e accedere a una vasta gamma di informazioni sulla pipeline. Ad esempio, è possibile trovare le tue corse sotto il menu esperimenti, e quando si apre la corsa di esecuzione sotto esperimenti si possono trovare tutti i manufatti del gasdotto sotto il menu Artefatti.

Una delle principali fonti di errore sono i problemi relativi ai permessi. Assicurati che il tuo cluster KFP disponga delle autorizzazioni per accedere alle API di Google Cloud. Questo può essere configurato quando si crea un cluster KFP in GCP , o vedere il documento Risoluzione dei problemi in GCP .

Passaggio 5. Aggiungi componenti per la convalida dei dati.

In questa fase, si aggiungerà componenti per la convalida dei dati tra cui StatisticsGen , SchemaGen e ExampleValidator . Se siete interessati a convalida dei dati, si prega di consultare Inizia con tensorflow Data Validation .

Fare doppio clic per cambiare la directory di pipeline e fare doppio clic di nuovo per aprire pipeline.py . Trovare e rimuovere il commento le 3 linee che aggiungono StatisticsGen , SchemaGen , e ExampleValidator alla pipeline. (TIP: Ricerca per i commenti che contengono TODO(step 5): ). Assicurati di salvare pipeline.py dopo la modifica.

È ora necessario aggiornare la pipeline esistente con la definizione della pipeline modificata. Utilizzare il tfx pipeline update comando per aggiornare il gasdotto, seguita dalla tfx run create comando per creare una nuova corsa esecuzione del gasdotto aggiornato.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Controllare gli output della pipeline

Visita la dashboard di KFP per trovare gli output della pipeline nella pagina per l'esecuzione della pipeline. Fare clic sulla scheda Esperimenti sulla sinistra, e Tutte le piste nella pagina Esperimenti. Dovresti essere in grado di trovare l'ultima esecuzione sotto il nome della pipeline.

Passaggio 6. Aggiungi componenti per la formazione.

In questa fase, si aggiungerà componenti per la formazione e la validazione dei modelli tra cui Transform , Trainer , Resolver , Evaluator , e Pusher .

Fare doppio clic per aprire pipeline.py . Trovare e rimuovere il commento le 5 linee che aggiungono Transform , Trainer , Resolver , Evaluator e Pusher alla pipeline. (Suggerimento: ricerca di TODO(step 6): )

Come in precedenza, ora è necessario aggiornare la pipeline esistente con la definizione della pipeline modificata. Le istruzioni sono le stesse di Fase 5. Aggiornare il gasdotto utilizzando tfx pipeline update , e creare una corsa di esecuzione utilizzando tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Quando questa esecuzione viene completata correttamente, ora hai creato ed eseguito la tua prima pipeline TFX in AI Platform Pipelines!

Passo 7. (opzionale) Prova BigQueryExampleGen

BigQuery è un serverless, altamente scalabile e data warehouse di cloud costo-efficacia. BigQuery può essere utilizzato come fonte per esempi di formazione in TFX. In questa fase, aggiungeremo BigQueryExampleGen alla pipeline.

Fare doppio clic per aprire pipeline.py . Commentare CsvExampleGen e rimuovere il commento dalla linea che crea un'istanza di BigQueryExampleGen . È inoltre necessario rimuovere il commento dalla query argomento della create_pipeline funzioni.

Abbiamo bisogno di specificare quale progetto GCP da utilizzare per BigQuery, e questo è fatto impostando --project in beam_pipeline_args durante la creazione di un oleodotto.

Fare doppio clic per aprire configs.py . Rimuovere il commento la definizione di GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS e BIG_QUERY_QUERY . Dovresti sostituire il valore della regione in questo file con i valori corretti per il tuo progetto GCP.

Cambia directory di un livello superiore. Fare clic sul nome della directory sopra l'elenco dei file. Il nome della directory è il nome del gasdotto, che è my_pipeline se non è stato modificato.

Fare doppio clic per aprire kubeflow_runner.py . Decommentate due argomenti, query e beam_pipeline_args , per la create_pipeline funzione.

Ora la pipeline è pronta per utilizzare BigQuery come origine di esempio. Aggiorna la pipeline come prima e crea una nuova esecuzione come abbiamo fatto nei passaggi 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Punto 8. (opzionale) Prova Dataflow con KFP

Diversi TFX Components utilizza Apache fascio di implementare pipeline di dati in parallelo, e significa che è possibile distribuire i carichi di lavoro di elaborazione di dati utilizzando Google Cloud Dataflow . In questo passaggio, imposteremo l'orchestratore Kubeflow in modo che utilizzi il flusso di dati come back-end di elaborazione dati per Apache Beam.

Fare doppio clic su pipeline a change directory, e fare doppio clic per aprire configs.py . Rimuovere il commento la definizione di GOOGLE_CLOUD_REGION e DATAFLOW_BEAM_PIPELINE_ARGS .

Cambia directory di un livello superiore. Fare clic sul nome della directory sopra l'elenco dei file. Il nome della directory è il nome del gasdotto, che è my_pipeline se non è stato modificato.

Fare doppio clic per aprire kubeflow_runner.py . Rimuovere il commento beam_pipeline_args . (Assicuratevi anche di commentare attuali beam_pipeline_args aggiunto nel passaggio 7.)

Ora la pipeline è pronta per utilizzare Dataflow. Aggiorna la pipeline e crea un'esecuzione eseguita come abbiamo fatto nei passaggi 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Si possono trovare i lavori Dataflow a flusso di dati in Cloud Console .

Passaggio 9. (Facoltativo) Prova cloud AI Training Platform e previsione con KFP

Interagisce TFX con diversi servizi gestiti GCP, come AI Cloud Platform per la Formazione e previsione . È possibile impostare il Trainer componente di utilizzare cloud AI piattaforma di formazione, un servizio gestito per la formazione di modelli ML. Inoltre, quando il modello è costruito e pronto per essere servito, si può spingere il vostro modello di Cloud Platform AI predizione per servire. In questa fase, imposteremo il nostro Trainer e Pusher componente di utilizzare i servizi Cloud Platform AI.

Prima di modificare i file, si potrebbe per prima cosa abilitare AI Platform Training & Prediction API.

Fare doppio clic su pipeline a change directory, e fare doppio clic per aprire configs.py . Rimuovere il commento la definizione di GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS e GCP_AI_PLATFORM_SERVING_ARGS . Useremo la nostra immagine contenitore su misura per la formazione di un modello in Cloud Platform AI Formazione, quindi dovremmo impostare masterConfig.imageUri in GCP_AI_PLATFORM_TRAINING_ARGS per lo stesso valore di CUSTOM_TFX_IMAGE sopra.

Cambiare la directory di livello superiore, e fare doppio clic per aprire kubeflow_runner.py . Rimuovere il commento ai_platform_training_args e ai_platform_serving_args .

Aggiorna la pipeline e crea un'esecuzione eseguita come abbiamo fatto nei passaggi 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Si possono trovare i lavori di formazione in cloud AI Platform Jobs . Se il gasdotto completato con successo, è possibile trovare il modello in modelli di cloud piattaforma IA .

Passaggio 10. Importa i TUOI dati nella pipeline

Abbiamo creato una pipeline per un modello utilizzando il dataset Chicago Taxi. Ora è il momento di mettere i tuoi dati nella pipeline.

I tuoi dati possono essere archiviati ovunque possa accedere la tua pipeline, inclusi GCS o BigQuery. Sarà necessario modificare la definizione della pipeline per accedere ai dati.

  1. Se i dati sono memorizzati in file, modificare il DATA_PATH in kubeflow_runner.py o local_runner.py e impostarlo sulla posizione dei file. Se i dati sono memorizzati in BigQuery, modificare BIG_QUERY_QUERY in pipeline/configs.py correttamente query per i dati.
  2. Aggiungi funzionalità in models/features.py .
  3. Modificare models/preprocessing.py per trasformare i dati di input per la formazione .
  4. Modificare models/keras/model.py e models/keras/constants.py per descrivere il vostro modello ML .
    • Puoi anche utilizzare un modello basato su stimatore. Cambiare RUN_FN costante per models.estimator.model.run_fn in pipeline/configs.py .

Si prega di consultare Trainer guida componente per più introduzione.

Pulire

Per pulire tutte le risorse di Google Cloud utilizzate in questo progetto, è possibile eliminare il progetto Google Cloud si è utilizzato per il tutorial.

In alternativa, puoi ripulire le singole risorse visitando ciascuna console: