Crie um pipeline TFX para seus dados com o modelo Penguin


Este documento fornecerá instruções para criar um pipeline TensorFlow Extended (TFX) para seu próprio conjunto de dados usando o modelo de pinguim fornecido com o pacote TFX Python. O pipeline criado usará inicialmente o conjunto de dados do Palmer Penguins , mas transformaremos o pipeline do seu conjunto de dados.


  • Linux/Mac OS
  • Python 3.6-3.8
  • Notebook Jupyter

Etapa 1. Copie o modelo predefinido para o diretório do projeto.

Nesta etapa, criaremos um diretório e arquivos de projeto de pipeline de trabalho copiando arquivos do modelo de pinguim no TFX. Você pode pensar nisso como um andaime para seu projeto de pipeline TFX.

Atualizar Pip

Se estivermos executando no Colab, devemos nos certificar de que temos a versão mais recente do Pip. É claro que os sistemas locais podem ser atualizados separadamente.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Instale o pacote necessário

Primeiro, instale o TFX e o TensorFlow Model Analysis (TFMA).

pip install -U tfx tensorflow-model-analysis

Vamos verificar as versões do TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

Estamos prontos para criar um pipeline.

Defina PROJECT_DIR para o destino apropriado para seu ambiente. O valor padrão é ~/imported/${PIPELINE_NAME} , que é apropriado para o ambiente do Google Cloud AI Platform Notebook .

Você pode dar um nome diferente ao seu pipeline alterando o PIPELINE_NAME abaixo. Este também se tornará o nome do diretório do projeto onde seus arquivos serão colocados.

import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Copie os arquivos de modelo.

O TFX inclui o modelo penguin com o pacote TFX python. O modelo de penguin contém muitas instruções para trazer seu conjunto de dados para o pipeline, que é o objetivo deste tutorial.

O comando da CLI tfx template copy copia os arquivos de modelo predefinidos para o diretório do projeto.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
Copying penguin pipeline template -> /home/kbuilder/imported/my_pipeline/ -> /home/kbuilder/imported/my_pipeline/pipeline/ -> /home/kbuilder/imported/my_pipeline/pipeline/ -> /home/kbuilder/imported/my_pipeline/pipeline/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/models/ -> /home/kbuilder/imported/my_pipeline/ -> /home/kbuilder/imported/my_pipeline/

Altere o contexto do diretório de trabalho neste notebook para o diretório do projeto.


Procure seus arquivos de origem copiados

O modelo TFX fornece arquivos scaffold básicos para criar um pipeline, incluindo código-fonte Python e dados de exemplo. O modelo de penguin usa o mesmo conjunto de dados e modelo de ML do Palmer Penguins do exemplo do Pinguim .

Aqui está uma breve introdução a cada um dos arquivos Python.

  • pipeline - Este diretório contém a definição do pipeline
    • — define constantes comuns para executores de pipeline
    • — define componentes TFX e um pipeline
  • models - Este diretório contém definições de modelos de ML
    • , — define recursos para o modelo
    • , — define rotinas de pré-processamento para dados
    • — define as constantes do modelo
    • , — define o modelo de ML usando estruturas de ML como o TensorFlow
  • — define um executor para o ambiente local que usa o mecanismo de orquestração local
  • — define um executor para o mecanismo de orquestração do Kubeflow Pipelines

Por padrão, o modelo inclui apenas componentes TFX padrão. Se você precisar de algumas ações personalizadas, poderá criar componentes personalizados para seu pipeline. Consulte o guia de componentes personalizados do TFX para obter detalhes.

Arquivos de teste de unidade.

Você pode notar que existem alguns arquivos com em seu nome. Esses são testes de unidade do pipeline e é recomendável adicionar mais testes de unidade à medida que você implementa seus próprios pipelines. Você pode executar testes de unidade fornecendo o nome do módulo dos arquivos de teste com o sinalizador -m . Geralmente, você pode obter um nome de módulo excluindo a extensão .py e substituindo / por . . Por exemplo:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
Ran 2 tests in 0.001s

OK (skipped=1)

Crie um pipeline TFX no ambiente local.

O TFX oferece suporte a vários mecanismos de orquestração para executar pipelines. Usaremos o mecanismo de orquestração local. O mecanismo de orquestração local é executado sem outras dependências e é adequado para desenvolvimento e depuração porque é executado em ambiente local em vez de depender de clusters de computação remotos.

Usaremos para executar seu pipeline usando o orquestrador local. Você precisa criar um pipeline antes de executá-lo. Você pode criar um pipeline com o comando pipeline create .

tfx pipeline create --engine=local
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

O comando pipeline create registra seu pipeline definido em sem realmente executá-lo.

Você executará o pipeline criado com o comando run create nas etapas a seguir.

Etapa 2. Faça a ingestão de SEUS dados para o pipeline.

O pipeline inicial ingere o conjunto de dados do pinguim que está incluído no modelo. Você precisa colocar seus dados no pipeline, e a maioria dos pipelines do TFX começa com o componente ExampleGen.

Escolha um ExemploGen

Seus dados podem ser armazenados em qualquer lugar que seu pipeline possa acessar, em um sistema de arquivos local ou distribuído ou em um sistema que possa ser consultado. O TFX fornece vários componentes ExampleGen para trazer seus dados para um pipeline do TFX. Você pode escolher um dos seguintes componentes de geração de exemplo.

Você também pode criar seu próprio ExampleGen, por exemplo, tfx inclui um ExecampleGen personalizado que usa Presto como fonte de dados. Consulte o guia para obter mais informações sobre como usar e desenvolver executores personalizados.

Depois de decidir qual ExampleGen usar, você precisará modificar a definição do pipeline para usar seus dados.

  1. Modifique o DATA_PATH em e configure-o para o local de seus arquivos.

    • Se você tiver arquivos em ambiente local, especifique o caminho. Esta é a melhor opção para desenvolver ou depurar um pipeline.
    • Se os arquivos estiverem armazenados no GCS, você poderá usar um caminho começando com gs://{bucket_name}/... . Verifique se você pode acessar o GCS de seu terminal, por exemplo, usando gsutil . Siga o guia de autorização no Google Cloud, se necessário.
    • Se você quiser usar um ExampleGen baseado em consulta, como BigQueryExampleGen, precisará de uma instrução de consulta para selecionar dados da fonte de dados. Há mais algumas coisas que você precisa definir para usar o Google Cloud BigQuery como fonte de dados.
    • Em pipeline/ :
      • Altere GOOGLE_CLOUD_PROJECT e GCS_BUCKET_NAME para seu projeto do GCP e nome do bucket. O bucket deve existir antes de executarmos o pipeline.
      • Remova o comentário e defina a variável BIG_QUERY_QUERY para sua instrução de consulta .
    • Em :
      • Comente o argumento data_path e descomente o argumento da query em pipeline.create_pipeline() .
    • Em pipeline/ :
      • Comente o argumento data_path e descomente o argumento da query em create_pipeline() .
      • Use BigQueryExampleGen em vez de CsvExampleGen.
  2. Substitua CsvExampleGen existente para sua classe ExampleGen em pipeline/ . Cada classe ExampleGen tem uma assinatura diferente. Consulte o guia do componente ExampleGen para obter mais detalhes. Não se esqueça de importar os módulos necessários com instruções de import em pipeline/ .

O pipeline inicial consiste em quatro componentes, ExampleGen , StatisticsGen , SchemaGen e ExampleValidator . Não precisamos alterar nada para StatisticsGen , SchemaGen e ExampleValidator . Vamos executar o pipeline pela primeira vez.

# Update and run the pipeline.
!tfx pipeline update --engine=local \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Quando essa execução for executada com êxito, você terá criado e executado seu primeiro pipeline TFX para seu modelo. Parabéns!

Seu novo modelo estará localizado em algum lugar no diretório de saída, mas seria melhor ter um modelo em local fixo ou serviço fora do pipeline do TFX que contenha muitos resultados provisórios. Melhor ainda com a avaliação contínua do modelo construído que é fundamental nos sistemas de produção de ML. Veremos como a avaliação contínua e as implantações funcionam no TFX na próxima etapa.

Etapa 5. (Opcional) Avalie o modelo com o Evaluator e publique com o pusher.

O componente Evaluator avalia continuamente cada modelo criado do Trainer e o Pusher copia o modelo para um local predefinido no sistema de arquivos ou até mesmo para os modelos do Google Cloud AI Platform .

Adiciona o componente Avaliador ao pipeline.

No arquivo pipeline/ :

  1. Remova o comentário de # components.append(model_resolver) para adicionar o resolvedor de modelo mais recente ao pipeline. O avaliador pode ser usado para comparar um modelo com o modelo de linha de base antigo que passou pelo avaliador na última execução do pipeline. LatestBlessedModelResolver encontra o modelo mais recente que passou no Evaluator.
  2. Defina o tfma.MetricsSpec adequado para o seu modelo. A avaliação pode ser diferente para cada modelo de ML. No modelo de pinguim, SparseCategoricalAccuracy foi usado porque estamos resolvendo um problema de classificação de várias categorias. Você também precisa especificar tfma.SliceSpec para analisar seu modelo para fatias específicas. Para obter mais detalhes, consulte o guia do componente do avaliador .
  3. Remova o comentário # components.append(evaluator) para adicionar o componente ao pipeline.

Examinar saída do avaliador

Esta etapa requer a extensão de notebook Jupyter do TensorFlow Model Analysis (TFMA). Observe que a versão da extensão do notebook TFMA deve ser idêntica à versão do pacote TFMA python.

O comando a seguir instalará a extensão do notebook TFMA do registro NPM. Pode levar vários minutos para ser concluído.

Se a instalação for concluída, recarregue seu navegador para que a extensão tenha efeito.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all aritfacts from the previous pipeline run.
  artifacts = get_latest_artifacts(, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(, artifacts,
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)

Adiciona o componente Pusher ao pipeline.

Se o modelo parece promissor, precisamos publicá-lo. O componente pusher pode publicar o modelo em um local no sistema de arquivos ou nos modelos do GCP AI Platform usando um executor personalizado .

O componente Evaluator avalia continuamente cada modelo criado do Trainer e o Pusher copia o modelo para um local predefinido no sistema de arquivos ou até mesmo para os modelos do Google Cloud AI Platform .

  1. Em , defina SERVING_MODEL_DIR para um diretório a ser publicado.
  2. No arquivo pipeline/ , remova o comentário # components.append(pusher) para adicionar o Pusher ao pipeline.

Você pode atualizar o pipeline e executá-lo novamente.

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 15
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886653128
last_update_time_since_epoch: 1643886653128
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 15
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Você deve encontrar seu novo modelo em SERVING_MODEL_DIR .

Etapa 6. (Opcional) Implante seu pipeline no Kubeflow Pipelines no GCP.

Como mencionado anteriormente, é bom para fins de depuração ou desenvolvimento, mas não é a melhor solução para cargas de trabalho de produção. Nesta etapa, implantaremos o pipeline no Kubeflow Pipelines no Google Cloud.


Precisamos do pacote kfp python e do programa skaffold para implantar um pipeline em um cluster Kubeflow Pipelines.

Você também precisa de um cluster Kubeflow Pipelines para executar o pipeline. Siga as etapas 1 e 2 no tutorial TFX on Cloud AI Platform Pipelines .

Quando o cluster estiver pronto, abra o painel do pipeline clicando em Open Pipelines Dashboard na página Pipelines do console do Google Cloud . A URL desta página é ENDPOINT para solicitar uma execução de pipeline. O valor do endpoint é tudo no URL após https://, até e incluindo Coloque seu endpoint no seguinte bloco de código.

Para executar nosso código em um cluster do Kubeflow Pipelines, precisamos empacotar nosso código em uma imagem de contêiner. A imagem será criada automaticamente durante a implantação de nosso pipeline, e você só precisa definir um nome e um registro de contêiner para sua imagem. Em nosso exemplo, usaremos o registro de contêiner do Google e o tfx-pipeline .

Defina a localização dos dados.

Seus dados devem estar acessíveis no cluster Kubeflow Pipelines. Se você usou dados em seu ambiente local, talvez seja necessário fazer upload deles para um armazenamento remoto, como o Google Cloud Storage. Por exemplo, podemos fazer upload de dados de pinguim para um bucket padrão que é criado automaticamente quando um cluster Kubeflow Pipelines é implantado como a seguir.

Atualize o local de dados armazenado em DATA_PATH em .

Se você estiver usando BigQueryExampleGen, não há necessidade de fazer upload do arquivo de dados, mas certifique-se de que use a mesma query e argumento beam_pipeline_args para a função pipeline.create_pipeline() .

Implante o pipeline.

Se tudo estiver pronto, você poderá criar um pipeline usando o comando tfx pipeline create .

Agora inicie uma execução com o pipeline recém-criado usando o comando tfx run create .

Ou você também pode executar o pipeline no painel Kubeflow Pipelines. A nova execução será listada em Experiments no painel do Kubeflow Pipelines. Clicar no experimento permitirá monitorar o progresso e visualizar os artefatos criados durante a execução.

Se você estiver interessado em executar seu pipeline no Kubeflow Pipelines, encontre mais instruções no tutorial TFX on Cloud AI Platform Pipelines .


Para limpar todos os recursos do Google Cloud usados ​​nesta etapa, você pode excluir o projeto do Google Cloud usado no tutorial.

Como alternativa, você pode limpar recursos individuais visitando cada console: