Cree una canalización TFX usando plantillas

Introducción

Este documento proporciona instrucciones para crear un TensorFlow Plus (TFX) de tuberías utilizando plantillas que se proporcionan con el paquete TFX Python. Muchas de las instrucciones son comandos de shell de Linux, que se ejecutarán en una instancia de AI Platform Notebooks. Correspondientes células de código Notebook Jupyter que invocan estos comandos usando ! están provistos.

Usted va a construir una tubería usando Taxi Viajes conjunto de datos dado a conocer por la ciudad de Chicago. Le recomendamos encarecidamente que intente crear su propia canalización utilizando su conjunto de datos utilizando esta canalización como referencia.

Paso 1. Configure su entorno.

AI Platform Pipelines preparará un entorno de desarrollo para crear una canalización y un clúster de Kubeflow Pipeline para ejecutar la canalización recién creada.

Instalar tfx paquete python con kfp requisito adicional.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Comprobemos las versiones de TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

En AI Plataforma Tuberías, TFX se ejecuta en un entorno de servidor con Kubernetes Kubeflow tuberías .

Configuremos algunas variables de entorno para usar Kubeflow Pipelines.

Primero, obtenga su ID de proyecto de GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

También necesitamos acceder a su grupo KFP. Puede acceder a él en su Google Cloud Console en el menú "AI Platform > Pipeline". El "punto final" del clúster de KFP se puede encontrar en la URL del panel de control de Pipelines, o puede obtenerlo en la URL de la página de inicio donde inició este cuaderno. Vamos a crear un ENDPOINT variable de entorno y la pusieron punto final a la agrupación KFP. ENDPOINT debe contener solo la parte del nombre de host de la URL. Por ejemplo, si la URL del salpicadero KFP es <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> , el valor se convierte PUNTO FINAL 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Establecer el nombre de la imagen como tfx-pipeline marco del proyecto GCP actual.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Y está hecho. Estamos listos para crear una tubería.

Paso 2. Copie la plantilla predefinida en el directorio de su proyecto.

En este paso, crearemos un directorio y archivos de proyecto de canalización de trabajo copiando archivos adicionales de una plantilla predefinida.

Usted puede dar su canal de un nombre diferente cambiando la PIPELINE_NAME a continuación. Este también se convertirá en el nombre del directorio del proyecto donde se colocarán sus archivos.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX incluye el taxi plantilla con el paquete TFX pitón. Si planea resolver un problema de predicción puntual, incluida la clasificación y la regresión, esta plantilla podría usarse como punto de partida.

Las tfx template copy de la CLI comando copia los archivos de plantillas predefinidas en el directorio del proyecto.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Cambie el contexto del directorio de trabajo en este cuaderno al directorio del proyecto.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Paso 3. Explore sus archivos fuente copiados

La plantilla TFX proporciona archivos de andamio básicos para construir una canalización, incluido el código fuente de Python, datos de muestra y Jupyter Notebooks para analizar la salida de la canalización. El taxi plantilla utiliza el mismo conjunto de datos de Chicago taxi y el modelo ML como el Tutorial de flujo de aire .

Aquí hay una breve introducción a cada uno de los archivos de Python.

  • pipeline - Este directorio contiene la definición de la tubería
    • configs.py - define constantes comunes para los corredores de tuberías
    • pipeline.py - define los componentes TFX y una tubería
  • models - Este directorio contiene definiciones del modelo de ML.
    • features.py , features_test.py - define características para el modelo
    • preprocessing.py , preprocessing_test.py - preprocesamiento define los trabajos que utilizan tf::Transform
    • estimator - Este directorio contiene un modelo basado Estimador.
      • constants.py - define constantes del modelo
      • model.py , model_test.py - define modelo DNN usando estimador TF
    • keras - Este directorio contiene un modelo basado Keras.
      • constants.py - define constantes del modelo
      • model.py , model_test.py - define modelo DNN usando Keras
  • local_runner.py , kubeflow_runner.py - definir los corredores para cada motor de orquestación

Usted puede notar que hay algunos archivos con _test.py en su nombre. Estas son pruebas unitarias de la canalización y se recomienda agregar más pruebas unitarias a medida que implementa sus propias canalizaciones. Puede ejecutar pruebas unitarias suministrando el nombre del módulo de archivos de prueba con -m bandera. Generalmente, usted puede obtener un nombre de módulo mediante la supresión de .py extensión y la sustitución de / con . . Por ejemplo:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Paso 4. Ejecute su primera canalización TFX

Los componentes de la tubería TFX generarán salidas para cada corrida como artefactos de metadatos ML , y tienen que ser almacenados en algún lugar. Puede usar cualquier almacenamiento al que pueda acceder el clúster de KFP y, para este ejemplo, usaremos Google Cloud Storage (GCS). Se debería haber creado automáticamente un segmento de GCS predeterminado. Su nombre será <your-project-id>-kubeflowpipelines-default .

Carguemos nuestros datos de muestra en el depósito de GCS para que podamos usarlos en nuestra canalización más adelante.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Vamos a crear una tubería TFX utilizando la tfx pipeline create comandos.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

Si bien la creación de una tubería, Dockerfile se generará a construir una imagen de estibador. No olvide agregarlo al sistema de control de código fuente (por ejemplo, git) junto con otros archivos fuente.

Ahora empieza una ejecución correr con la tubería recién creado utilizando la tfx run create comandos.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

O bien, también puede ejecutar la canalización en el Panel de KFP. La nueva ejecución de ejecución aparecerá en Experimentos en el panel de KFP. Hacer clic en el experimento le permitirá monitorear el progreso y visualizar los artefactos creados durante la ejecución.

Sin embargo, recomendamos visitar el panel de control de KFP. Puede acceder al panel de control de KFP desde el menú Pipelines de Cloud AI Platform en Google Cloud Console. Una vez que visite el tablero, podrá encontrar la tubería y acceder a una gran cantidad de información sobre la tubería. Por ejemplo, usted puede encontrar sus carreras en el menú de experimentos, y cuando se abre el plazo de ejecución en virtud de experimentos se puede encontrar todos los objetos de la tubería en el menú de artefactos.

Una de las principales fuentes de fallas son los problemas relacionados con los permisos. Asegúrese de que su clúster de KFP tenga permisos para acceder a las API de Google Cloud. Esto se puede configurar cuando se crea un grupo de KFP en BPC , o ver el documento Solución de problemas en la BPC .

Paso 5. Agregue componentes para la validación de datos.

En este paso, va a agregar componentes para la validación de datos, incluyendo StatisticsGen , SchemaGen y ExampleValidator . Si usted está interesado en la validación de datos, consulte Comience con Tensorflow validación de datos .

Haga doble clic para cambiar directorio para pipeline y haga doble clic de nuevo para abrir pipeline.py . Encuentra y elimine las 3 líneas que se suman StatisticsGen , SchemaGen y ExampleValidator a la tubería. (Consejo: búsqueda de los comentarios que contengan TODO(step 5): ). Asegúrese de guardar pipeline.py después de modificarlo.

Ahora necesita actualizar la canalización existente con la definición de canalización modificada. Utilice la tfx pipeline update comando para actualizar su tubería, seguido por la tfx run create comando para crear una nueva ejecución de ejecución de la canalización actualizada.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Verifique las salidas de la canalización

Visite el panel de control de KFP para encontrar salidas de canalización en la página para su ejecución de canalización. Haga clic en la pestaña Experimentos de la izquierda, y todas las carreras en la página experimentos. Debería poder encontrar la última ejecución bajo el nombre de su canalización.

Paso 6. Agregue componentes para la capacitación.

En este paso, va a agregar componentes para la formación y la validación de modelos incluyendo Transform , Trainer , Resolver , Evaluator y Pusher .

Haga doble clic para abrir el pipeline.py . Encuentra y elimine las 5 líneas que se suman Transform , Trainer , Resolver , Evaluator y Pusher de la tubería. (Consejo: búsqueda de TODO(step 6): )

Como hizo antes, ahora necesita actualizar la canalización existente con la definición de canalización modificada. Las instrucciones son las mismas que en el Paso 5. Actualización de la tubería mediante tfx pipeline update , y crean un plazo de ejecución utilizando tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Cuando esta ejecución de ejecución termine con éxito, habrá creado y ejecutado su primera canalización TFX en AI Platform Pipelines.

Paso 7. (Opcional) Pruebe BigQueryExampleGen

BigQuery es un serverless, altamente escalable, y el almacén de datos de nube rentable. BigQuery se puede usar como fuente para ejemplos de entrenamiento en TFX. En este paso, vamos a añadir BigQueryExampleGen a la tubería.

Haga doble clic para abrir el pipeline.py . Comentar CsvExampleGen y elimine la línea que crea una instancia de BigQueryExampleGen . También es necesario comentar la query argumento de la create_pipeline función.

Tenemos que especificar qué proyecto GCP a utilizar para BigQuery, y esto se hace mediante el establecimiento de --project en beam_pipeline_args al crear una tubería.

Haga doble clic para abrir el configs.py . Descomentar la definición de GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS y BIG_QUERY_QUERY . Debe reemplazar el valor de la región en este archivo con los valores correctos para su proyecto de GCP.

Cambia de directorio un nivel más arriba. Haga clic en el nombre del directorio encima de la lista de archivos. El nombre del directorio es el nombre de la tubería que es my_pipeline si no ha cambiado.

Haga doble clic para abrir el kubeflow_runner.py . Descomente dos argumentos, query y beam_pipeline_args , para el create_pipeline función.

Ahora la canalización está lista para usar BigQuery como fuente de ejemplo. Actualice la canalización como antes y cree una nueva ejecución como lo hicimos en los pasos 5 y 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Paso 8. (Opcional) Intente Dataflow con KFP

Varios Componentes TFX utiliza Apache Beam para implementar procesos de datos en paralelo, y significa que puede distribuir las cargas de trabajo de procesamiento de datos utilizando Google Cloud flujo de datos . En este paso, configuraremos el orquestador de Kubeflow para que use el flujo de datos como back-end de procesamiento de datos para Apache Beam.

Haga doble clic en pipeline de cambio de directorio, y doble clic para abrir el configs.py . Descomentar la definición de GOOGLE_CLOUD_REGION y DATAFLOW_BEAM_PIPELINE_ARGS .

Cambia de directorio un nivel más arriba. Haga clic en el nombre del directorio encima de la lista de archivos. El nombre del directorio es el nombre de la tubería que es my_pipeline si no ha cambiado.

Haga doble clic para abrir el kubeflow_runner.py . Descomente beam_pipeline_args . (También asegúrese de comentar actuales beam_pipeline_args que agregó en el paso 7.)

Ahora la canalización está lista para usar Dataflow. Actualice la tubería y cree una ejecución de ejecución como lo hicimos en los pasos 5 y 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Usted puede encontrar sus trabajos de flujo de datos de flujo de datos en la nube de la consola .

Paso 9. (Opcional) Pruebe Nube AI plataforma de formación y predicción con KFP

Interopera con varios servicios TFX GCP administrados, como Cloud Platform AI para la Formación y predicción . Puede configurar su Trainer componente a utilizar la nube AI plataforma de formación, un servicio gestionado para la formación de modelos ML. Por otra parte, cuando el modelo está construido y listo para ser servido, puede llevar a su modelo de Cloud Platform AI Predicción para servir. En este paso, vamos a configurar nuestro Trainer y Pusher componente para utilizar los servicios de la nube de la plataforma IA.

Antes de editar archivos, puede que primero tenga que habilitar AI plataforma de formación y Prediction API.

Haga doble clic en pipeline de cambio de directorio, y doble clic para abrir el configs.py . Descomentar la definición de GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS y GCP_AI_PLATFORM_SERVING_ARGS . Haremos uso de nuestra imagen Envase hecha a la medida para entrenar un modelo en la nube AI plataforma de formación, por lo que debemos establecer masterConfig.imageUri en GCP_AI_PLATFORM_TRAINING_ARGS al mismo valor que CUSTOM_TFX_IMAGE anteriormente.

Cambie el directorio a un nivel superior, y doble clic para abrir el kubeflow_runner.py . Descomente ai_platform_training_args y ai_platform_serving_args .

Actualice la tubería y cree una ejecución de ejecución como lo hicimos en los pasos 5 y 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Usted puede encontrar sus puestos de trabajo de formación en la nube AI Plataforma Jobs . Si su tubería completada con éxito, puede encontrar su modelo en modelos de nube de la plataforma IA .

Paso 10. Ingiera SUS datos en la canalización

Hicimos una canalización para un modelo utilizando el conjunto de datos de Chicago Taxi. Ahora es el momento de poner sus datos en proceso.

Sus datos se pueden almacenar en cualquier lugar al que pueda acceder su canalización, incluidos GCS o BigQuery. Deberá modificar la definición de la canalización para acceder a sus datos.

  1. Si los datos se almacenan en archivos, modificar el DATA_PATH en kubeflow_runner.py o local_runner.py y ponerlo a la ubicación de los archivos. Si los datos se almacenan en BigQuery, modificar BIG_QUERY_QUERY en pipeline/configs.py a consulta correctamente para sus datos.
  2. Añadir características de models/features.py .
  3. Modificar models/preprocessing.py para transformar los datos de entrada para el entrenamiento .
  4. Modificar models/keras/model.py y models/keras/constants.py que describen el modelo ML .
    • También puede utilizar un modelo basado en estimadores. Cambio RUN_FN constante para models.estimator.model.run_fn en pipeline/configs.py .

Por favor, véase el entrenador guía de componentes para más introducción.

Limpiar

Para limpiar todos los recursos de Google Cloud utilizados en este proyecto, se puede eliminar el proyecto de Google Cloud que utilizó para el tutorial.

Alternativamente, puede limpiar recursos individuales visitando cada consola: