Crie um pipeline TFX usando modelos

Introdução

Este documento fornecerá instruções para criar um TensorFlow Extensão (TFX) gasoduto usando modelos que são fornecidos com o pacote de TFX Python. Muitas das instruções são comandos do shell do Linux, que serão executados em uma instância do AI Platform Notebooks. Correspondentes células código Notebook Jupyter que invocam os comandos usando ! são fornecidos.

Você vai construir um gasoduto usando Taxi Viagens conjunto de dados liberado pela cidade de Chicago. Recomendamos fortemente que você tente construir seu próprio pipeline usando seu conjunto de dados, utilizando este pipeline como uma linha de base.

Etapa 1. Configure seu ambiente.

O AI Platform Pipelines preparará um ambiente de desenvolvimento para construir um pipeline e um cluster Kubeflow Pipeline para executar o pipeline recém-construído.

Instale tfx pacote de python com kfp exigência extra.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Vamos verificar as versões do TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

Em AI Platform Pipelines, TFX está sendo executado em um ambiente Kubernetes hospedado usando Kubeflow Pipelines .

Vamos definir algumas variáveis ​​de ambiente para usar Kubeflow Pipelines.

Primeiro, obtenha seu ID de projeto do GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

Também precisamos acessar seu cluster KFP. Você pode acessá-lo em seu Google Cloud Console no menu "AI Platform> Pipeline". O "ponto de extremidade" do cluster KFP pode ser encontrado na URL do painel Pipelines, ou você pode obtê-lo na URL da página de Introdução onde você iniciou este bloco de notas. Vamos criar um ENDPOINT variável de ambiente e configurá-lo para o ponto final aglomerado KFP. ENDPOINT deve conter apenas a parte do nome do host do URL. Por exemplo, se o URL do dashboard KFP é <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> , valor ENDPOINT se torna 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Definir o nome da imagem como tfx-pipeline no âmbito do projecto GCP atual.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

E está feito. Estamos prontos para criar um pipeline.

Etapa 2. Copie o modelo predefinido para o diretório do projeto.

Nesta etapa, criaremos um diretório e arquivos de projeto de pipeline de trabalho, copiando arquivos adicionais de um modelo predefinido.

Você pode dar o seu pipeline de um nome diferente, alterando a PIPELINE_NAME abaixo. Este também se tornará o nome do diretório do projeto onde seus arquivos serão colocados.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX inclui o taxi modelo com o pacote de TFX python. Se você está planejando resolver um problema de previsão baseada em pontos, incluindo classificação e regressão, este modelo pode ser usado como ponto de partida.

Os tfx template copy CLI comando copia predefinidos arquivos de modelo para o diretório do projeto.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Altere o contexto do diretório de trabalho neste bloco de notas para o diretório do projeto.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Etapa 3. Navegue pelos seus arquivos de origem copiados

O modelo TFX fornece arquivos scaffold básicos para construir um pipeline, incluindo código-fonte Python, dados de amostra e Notebooks Jupyter para analisar a saída do pipeline. O taxi modelo usa o mesmo conjunto de dados Chicago Taxi e modelo ML como o Tutorial do fluxo de ar .

Aqui está uma breve introdução a cada um dos arquivos Python.

  • pipeline - Este diretório contém a definição do gasoduto
    • configs.py - define constantes comuns para os corredores de oleodutos
    • pipeline.py - define componentes TFX e um oleoduto
  • models - Esta pasta contém definições de modelo ML.
    • features.py , features_test.py - define recursos para o modelo
    • preprocessing.py , preprocessing_test.py - define pré-processamento trabalhos usando tf::Transform
    • estimator - Este diretório contém um modelo baseado Estimador.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando TF estimador
    • keras - Esta pasta contém um modelo baseado Keras.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando Keras
  • local_runner.py , kubeflow_runner.py - definir os corredores para cada mecanismo de orquestração

Você pode notar que existem alguns arquivos com _test.py em seu nome. Esses são testes de unidade do pipeline e é recomendável adicionar mais testes de unidade à medida que você implementa seus próprios pipelines. Você pode executar testes de unidade, fornecendo o nome do módulo de arquivos de teste com -m bandeira. Você normalmente pode obter um nome de módulo, eliminando .py extensão e substituindo / com . . Por exemplo:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Etapa 4. Execute seu primeiro pipeline TFX

Componentes no pipeline TFX irá gerar saídas para cada execução como Artifacts ML metadados , e eles precisam ser armazenados em algum lugar. Você pode usar qualquer armazenamento que o cluster KFP possa acessar e, para este exemplo, usaremos o Google Cloud Storage (GCS). Um intervalo GCS padrão deve ter sido criado automaticamente. Seu nome será <your-project-id>-kubeflowpipelines-default .

Vamos fazer upload de nossos dados de amostra para o intervalo GCS para que possamos usá-los em nosso pipeline mais tarde.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Vamos criar um gasoduto TFX usando o tfx pipeline create comando.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

Ao criar um gasoduto, Dockerfile serão gerados para construir uma imagem Docker. Não se esqueça de adicioná-lo ao sistema de controle de origem (por exemplo, git) junto com outros arquivos de origem.

Agora inicie uma execução correr com o gasoduto recém-criada usando o tfx run create comando.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Ou você também pode executar o pipeline no Painel KFP. A nova execução será listada em Experimentos no Painel KFP. Clicar no experimento permitirá que você monitore o progresso e visualize os artefatos criados durante a execução.

No entanto, recomendamos visitar o Painel KFP. Você pode acessar o KFP Dashboard no menu Cloud AI Platform Pipelines no Google Cloud Console. Depois de visitar o painel, você poderá encontrar o pipeline e acessar uma grande variedade de informações sobre ele. Por exemplo, você pode encontrar suas corridas no menu experimentos, e quando você abrir o seu prazo de execução sob Experimentos você pode encontrar todos os seus artefatos a partir do gasoduto sob o menu Artifacts.

Uma das principais fontes de falha são os problemas relacionados à permissão. Certifique-se de que seu cluster KFP tenha permissões para acessar APIs do Google Cloud. Isso pode ser configurado quando você criar um cluster KFP em GCP , ou ver documento Solução de problemas no GCP .

Etapa 5. Adicionar componentes para validação de dados.

Nesta etapa, você irá adicionar componentes para validação de dados, incluindo StatisticsGen , SchemaGen e ExampleValidator . Se você está interessado em validação de dados, consulte Comece com Tensorflow validação de dados .

Clique duas vezes em que altere o diretório para pipeline e clique duas vezes novamente para abrir pipeline.py . Localizar e tire as 3 linhas que agregam StatisticsGen , SchemaGen e ExampleValidator ao gasoduto. (Dica: procure por comentários que contenham TODO(step 5): ). Certifique-se de salvar pipeline.py após editá-lo.

Agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. Use a tfx pipeline update comando para atualizar seu pipeline, seguido pelo tfx run create comando para criar um novo prazo de execução do seu pipeline atualizado.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Verifique as saídas do pipeline

Visite o painel KFP para encontrar saídas de pipeline na página para sua execução de pipeline. Clique na guia Experimentos na esquerda, e Todos é executado na página Experimentos. Você deve conseguir encontrar a última execução com o nome de seu pipeline.

Etapa 6. Adicionar componentes para treinamento.

Nesta etapa, você irá adicionar componentes para a formação e validação do modelo, incluindo Transform , Trainer , Resolver , Evaluator , e Pusher .

Duplo-Clique para abrir pipeline.py . Localizar e tire as 5 linhas que agregam Transform , Trainer , Resolver , Evaluator e Pusher ao gasoduto. (Dica: procure por TODO(step 6): )

Como você fez antes, agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. As instruções são as mesmas Passo 5. Atualize o gasoduto usando tfx pipeline update , e criar uma corrida de execução usando tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Quando a execução dessa execução for concluída com êxito, você terá criado e executado seu primeiro pipeline TFX no AI Platform Pipelines!

Passo 7. (Opcional) Tente BigQueryExampleGen

BigQuery é um serverless, altamente escalável, e armazém de dados em nuvem de baixo custo. O BigQuery pode ser usado como fonte para exemplos de treinamento em TFX. Neste passo, vamos adicionar BigQueryExampleGen ao gasoduto.

Duplo-Clique para abrir pipeline.py . Comentário fora CsvExampleGen e descomente a linha que cria uma instância de BigQueryExampleGen . Você também precisa remover o comentário da query argumento do create_pipeline função.

Precisamos especificar qual projeto GCP usar para BigQuery, e isso é feito através da criação --project em beam_pipeline_args ao criar um pipeline.

Duplo-Clique para abrir configs.py . Uncomment a definição de GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS e BIG_QUERY_QUERY . Você deve substituir o valor da região neste arquivo pelos valores corretos para seu projeto do GCP.

Mude o diretório um nível acima. Clique no nome do diretório acima da lista de arquivos. O nome do diretório é o nome do gasoduto que é my_pipeline se você não mudar.

Duplo-Clique para abrir kubeflow_runner.py . Descomente dois argumentos, query e beam_pipeline_args , para a create_pipeline função.

Agora o pipeline está pronto para usar o BigQuery como fonte de exemplo. Atualize o pipeline como antes e crie uma nova execução de execução como fizemos nas etapas 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Passo 8. (Opcional) Tentar fluxo de dados com KFP

Vários componentes TFX usa Apache feixe de implementar pipelines de dados paralelos, e isso significa que você pode distribuir dados cargas de trabalho de processamento usando o Google Cloud Dataflow . Nesta etapa, definiremos o orquestrador Kubeflow para usar o fluxo de dados como back-end de processamento de dados para o Apache Beam.

Clique duas vezes em pipeline para o diretório mudança, e dê um duplo clique para abrir configs.py . Uncomment a definição de GOOGLE_CLOUD_REGION e DATAFLOW_BEAM_PIPELINE_ARGS .

Mude o diretório um nível acima. Clique no nome do diretório acima da lista de arquivos. O nome do diretório é o nome do gasoduto que é my_pipeline se você não mudar.

Duplo-Clique para abrir kubeflow_runner.py . Descomente beam_pipeline_args . (Também certifique-se de comentar atuais beam_pipeline_args que você adicionou na etapa 7.)

Agora o pipeline está pronto para usar o Dataflow. Atualize o pipeline e crie uma execução de execução como fizemos nas etapas 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Você pode encontrar seus empregos Dataflow em Dataflow em Cloud Console .

Passo 9. (Opcional) Tente Training Platform AI Nuvem e Previsão com KFP

Interopera TFX com vários serviços GCP gerenciados, como Platform AI Nuvem de Formação e Previsão . Você pode definir o seu Trainer componente para usar Nuvem AI Plataforma de Formação, um serviço gerenciado para treinar modelos ML. Além disso, quando o modelo é construído e pronto para ser servido, você pode empurrar o seu modelo para Cloud Platform AI Previsão para servir. Neste passo, vamos definir o nosso Trainer e Pusher componente para usar serviços de nuvem AI plataforma.

Antes de editar arquivos, você pode primeiro tem que permitir AI Platform Training & Prediction API.

Clique duas vezes em pipeline para o diretório mudança, e dê um duplo clique para abrir configs.py . Uncomment a definição de GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS e GCP_AI_PLATFORM_SERVING_ARGS . Usaremos nossa imagem recipiente personalizado construído para treinar um modelo no AI Plataforma de Formação Nuvem, por isso, devemos definir masterConfig.imageUri em GCP_AI_PLATFORM_TRAINING_ARGS para o mesmo valor que CUSTOM_TFX_IMAGE acima.

Altere o diretório um nível acima, e dê um duplo clique para abrir kubeflow_runner.py . Descomente ai_platform_training_args e ai_platform_serving_args .

Atualize o pipeline e crie uma execução de execução como fizemos nas etapas 5 e 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Você pode encontrar seus trabalhos de formação em Nuvem AI Platform Jobs . Se seu pipeline concluída com êxito, você pode encontrar o seu modelo em Nuvem AI Plataforma Models .

Etapa 10. Ingerir SEUS dados para o pipeline

Fizemos um pipeline para um modelo usando o conjunto de dados Chicago Taxi. Agora é hora de colocar seus dados no pipeline.

Seus dados podem ser armazenados em qualquer lugar que seu pipeline possa acessar, incluindo GCS ou BigQuery. Você precisará modificar a definição do pipeline para acessar seus dados.

  1. Se seus dados são armazenados em arquivos, modificar o DATA_PATH em kubeflow_runner.py ou local_runner.py e configurá-lo para a localização de seus arquivos. Se seus dados são armazenados no BigQuery, modificar BIG_QUERY_QUERY no pipeline/configs.py para corretamente consulta para seus dados.
  2. Adicione recursos em models/features.py .
  3. Modificar models/preprocessing.py para transformar dados de entrada para o treinamento .
  4. Modificar models/keras/model.py e models/keras/constants.py para descrever o modelo ML .
    • Você também pode usar um modelo baseado em estimador. Mudança RUN_FN constante para models.estimator.model.run_fn no pipeline/configs.py .

Por favor, veja o guia componente instrutor para a introdução mais.

Limpando

Para limpar todos os recursos do Google Cloud utilizados neste projeto, você pode excluir o projeto do Google Cloud você usou para o tutorial.

Como alternativa, você pode limpar recursos individuais visitando cada consoles: