Crie um pipeline TFX usando modelos com orquestrador local

Introdução

Este documento fornecerá instruções para criar um TensorFlow Extensão (TFX) gasoduto usando modelos que são fornecidos com o pacote de TFX Python. A maioria das instruções são comandos Linux shell, e as células de código Notebook Jupyter que invocam os comandos usando correspondente ! são fornecidos.

Você vai construir um gasoduto usando Taxi Viagens conjunto de dados liberado pela cidade de Chicago. Nós o encorajamos fortemente a tentar construir seu próprio pipeline usando seu conjunto de dados, utilizando este pipeline como uma linha de base.

Vamos construir um pipeline que funciona no ambiente local. Se você está interessado em usar Kubeflow orquestrador no Google Cloud, consulte TFX em Nuvem AI Platform Pipelines tutorial .

Pré-requisitos

  • Linux / MacOS
  • Python> = 3.5.3

Você pode obter todos os pré-requisitos facilmente executar este notebook no Google Colab .

Etapa 1. Configure seu ambiente.

Ao longo deste documento, apresentaremos os comandos duas vezes. Uma vez como um comando de shell para copiar e colar, uma vez como uma célula de notebook jupyter. Se você estiver usando o Colab, basta pular o bloco de script de shell e executar as células do notebook.

Você deve preparar um ambiente de desenvolvimento para construir um pipeline.

Instale tfx pacote python. Recomendamos o uso de virtualenv no ambiente local. Você pode usar o seguinte snippet de script de shell para configurar seu ambiente.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install --upgrade "tfx<2"

Se você estiver usando o colab:

import sys
!{sys.executable} -m pip install --upgrade "tfx<2"

ERROR: some-package 0.some_version.1 tem o requisito other-package! = 2.0., <3,> = 1.15, mas você terá other-package 2.0.0 que é incompatível.

Por favor, ignore esses erros neste momento.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Vamos verificar a versão do TFX.

python -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 1.4.0

E está feito. Estamos prontos para criar um pipeline.

Etapa 2. Copie o modelo predefinido para o diretório do projeto.

Nesta etapa, criaremos um diretório e arquivos de projeto de pipeline de trabalho, copiando arquivos adicionais de um modelo predefinido.

Você pode dar o seu pipeline de um nome diferente, alterando a PIPELINE_NAME abaixo. Este também se tornará o nome do diretório do projeto onde seus arquivos serão colocados.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX inclui o taxi modelo com o pacote de TFX python. Se você está planejando resolver um problema de previsão baseada em pontos, incluindo classificação e regressão, este modelo pode ser usado como ponto de partida.

Os tfx template copy CLI comando copia predefinidos arquivos de modelo para o diretório do projeto.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 66, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 171, in copy_template
    replace_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 110, in _copy_and_replace_placeholder_dir
    fileio.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 78, in makedirs
    _get_filesystem(path).makedirs(path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 71, in makedirs
    tf.io.gfile.makedirs(path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 514, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.path_to_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Altere o contexto do diretório de trabalho neste bloco de notas para o diretório do projeto.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Etapa 3. Navegue pelos arquivos de origem copiados.

O modelo TFX fornece arquivos scaffold básicos para construir um pipeline, incluindo código-fonte Python, dados de amostra e Notebooks Jupyter para analisar a saída do pipeline. O taxi modelo usa o mesmo conjunto de dados Chicago Taxi e modelo ML como o Tutorial do fluxo de ar .

No Google Colab, você pode navegar pelos arquivos clicando em um ícone de pasta à esquerda. Os arquivos devem ser copiados sob a directoy projeto, cujo nome é my_pipeline neste caso. Você pode clicar nos nomes dos diretórios para ver o conteúdo do diretório e clicar duas vezes nos nomes dos arquivos para abri-los.

Aqui está uma breve introdução a cada um dos arquivos Python.

  • pipeline - Este diretório contém a definição do gasoduto
    • configs.py - define constantes comuns para os corredores de oleodutos
    • pipeline.py - define componentes TFX e um oleoduto
  • models - Esta pasta contém definições de modelo ML.
    • features.py , features_test.py - define recursos para o modelo
    • preprocessing.py , preprocessing_test.py - define pré-processamento trabalhos usando tf::Transform
    • estimator - Este diretório contém um modelo baseado Estimador.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando TF estimador
    • keras - Esta pasta contém um modelo baseado Keras.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando Keras
  • local_runner.py , kubeflow_runner.py - definir os corredores para cada mecanismo de orquestração

Você pode notar que existem alguns arquivos com _test.py em seu nome. Esses são testes de unidade do pipeline e é recomendável adicionar mais testes de unidade à medida que você implementa seus próprios pipelines. Você pode executar testes de unidade, fornecendo o nome do módulo de arquivos de teste com -m bandeira. Você normalmente pode obter um nome de módulo, eliminando .py extensão e substituindo / com . . Por exemplo:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Etapa 4. Execute seu primeiro pipeline TFX

Você pode criar um canal usando pipeline create comando.

tfx pipeline create --engine=local --pipeline_path=local_runner.py
tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
Invalid pipeline path: local_runner.py

Em seguida, você pode executar o pipeline criado usando run create comando.

tfx run create --engine=local --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Creating a run for pipeline: my_pipeline
/tmpfs/src/tf_docs_env/bin/python: can't open file 'local_runner.py': [Errno 2] No such file or directory
Error while running "/tmpfs/src/tf_docs_env/bin/python local_runner.py"

Se for bem sucedido, você verá Component CsvExampleGen is finished. Quando você copia o modelo, apenas um componente, CsvExampleGen, é incluído no pipeline.

Etapa 5. Adicionar componentes para validação de dados.

Nesta etapa, você irá adicionar componentes para validação de dados, incluindo StatisticsGen , SchemaGen e ExampleValidator . Se você está interessado em validação de dados, consulte Comece com Tensorflow validação de dados .

Vamos modificar a definição gasoduto copiado no pipeline/pipeline.py . Se você estiver trabalhando em seu ambiente local, use seu editor favorito para editar o arquivo. Se você está trabalhando no Google Colab,

Clique no ícone de pasta à esquerda para abrir Files view.

Clique my_pipeline para abrir o diretório e clique pipeline diretório para duplo clique aberto e pipeline.py para abrir o arquivo.

Localizar e tire as 3 linhas que agregam StatisticsGen , SchemaGen e ExampleValidator ao gasoduto. (Dica: encontrar comentários contendo TODO(step 5): ).

Sua alteração será salva automaticamente em alguns segundos. Certifique-se de que o * marca na frente do pipeline.py desapareceu no título da página. Não há botão de salvar ou atalho para o editor de arquivos no Colab. Arquivos Python em editor de arquivo pode ser salvo para o ambiente de tempo de execução, mesmo em playground de modo.

Agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. Use a tfx pipeline update comando para atualizar seu pipeline, seguido pelo tfx run create comando para criar um novo prazo de execução do seu pipeline atualizado.

# Update the pipeline
tfx pipeline update --engine=local --pipeline_path=local_runner.py
# You can run the pipeline the same way.
tfx run create --engine local --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=local --pipeline_path=local_runner.py
# You can run the pipeline the same way.
tfx run create --engine local --pipeline_name {PIPELINE_NAME}
CLI
Updating pipeline
Invalid pipeline path: local_runner.py
CLI
Creating a run for pipeline: my_pipeline
/tmpfs/src/tf_docs_env/bin/python: can't open file 'local_runner.py': [Errno 2] No such file or directory
Error while running "/tmpfs/src/tf_docs_env/bin/python local_runner.py"

Você deve ser capaz de ver o log de saída dos componentes adicionados. Nosso pipeline cria artefatos de saída em tfx_pipeline_output/my_pipeline diretório.

Etapa 6. Adicionar componentes para treinamento.

Nesta etapa, você irá adicionar componentes para a formação e validação do modelo, incluindo Transform , Trainer , Resolver , Evaluator , e Pusher .

Abrir pipeline/pipeline.py . Localizar e descomente 5 linhas que agregam Transform , Trainer , Resolver , Evaluator e Pusher ao gasoduto. (Dica: Pesquisar TODO(step 6): )

Como você fez antes, agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. As instruções são as mesmas Passo 5. Atualize o gasoduto usando tfx pipeline update , e criar uma corrida de execução usando tfx run create .

tfx pipeline update --engine=local --pipeline_path=local_runner.py
tfx run create --engine local --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=local --pipeline_path=local_runner.py
tfx run create --engine local --pipeline_name {PIPELINE_NAME}
CLI
Updating pipeline
Invalid pipeline path: local_runner.py
CLI
Creating a run for pipeline: my_pipeline
/tmpfs/src/tf_docs_env/bin/python: can't open file 'local_runner.py': [Errno 2] No such file or directory
Error while running "/tmpfs/src/tf_docs_env/bin/python local_runner.py"

Quando a execução dessa execução terminar com êxito, você terá criado e executado seu primeiro pipeline TFX usando o orquestrador local!

Passo 7. (Opcional) Tente BigQueryExampleGen.

[BigQuery] é um data warehouse em nuvem sem servidor, altamente escalonável e econômico. O BigQuery pode ser usado como fonte para exemplos de treinamento em TFX. Neste passo, vamos adicionar BigQueryExampleGen ao gasoduto.

Você precisa de uma Cloud Platform Google conta para usar o BigQuery. Prepare um projeto do GCP.

Faça o login em seu projeto usando a biblioteca auth colab ou gcloud utilidade.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Você deve especificar o nome do projeto do GCP para acessar os recursos do BigQuery usando TFX. Set GOOGLE_CLOUD_PROJECT ambiente variável para o nome do projecto.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Abrir pipeline/pipeline.py . Comentário fora CsvExampleGen e descomente a linha que criar uma instância de BigQueryExampleGen . Você também precisa descomente query argumento do create_pipeline função.

Precisamos especificar qual projeto GCP usar para BigQuery novamente, e isso é feito através da criação --project em beam_pipeline_args ao criar um pipeline.

Abrir pipeline/configs.py . Uncomment a definição de BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS e BIG_QUERY_QUERY . Você deve substituir o ID do projeto e o valor da região neste arquivo pelos valores corretos para seu projeto do GCP.

Abrir local_runner.py . Descomente dois argumentos, query e beam_pipeline_args , por método create_pipeline ().

Agora o pipeline está pronto para usar o BigQuery como fonte de exemplo. Atualize o pipeline e crie uma execução como fizemos nas etapas 5 e 6.

tfx pipeline update --engine=local --pipeline_path=local_runner.py
tfx run create --engine local --pipeline_name {PIPELINE_NAME}
CLI
Updating pipeline
Invalid pipeline path: local_runner.py
CLI
Creating a run for pipeline: my_pipeline
/tmpfs/src/tf_docs_env/bin/python: can't open file 'local_runner.py': [Errno 2] No such file or directory
Error while running "/tmpfs/src/tf_docs_env/bin/python local_runner.py"

O que vem a seguir: Ingerir SEUS dados no pipeline.

Fizemos um pipeline para um modelo usando o conjunto de dados Chicago Taxi. Agora é hora de colocar seus dados no pipeline.

Seus dados podem ser armazenados em qualquer lugar que seu pipeline possa acessar, incluindo GCS ou BigQuery. Você precisará modificar a definição do pipeline para acessar seus dados.

  1. Se seus dados são armazenados em arquivos, modificar o DATA_PATH em kubeflow_runner.py ou local_runner.py e configurá-lo para a localização de seus arquivos. Se seus dados são armazenados no BigQuery, modificar BIG_QUERY_QUERY no pipeline/configs.py para corretamente consulta para seus dados.
  2. Adicione recursos em models/features.py .
  3. Modificar models/preprocessing.py para transformar dados de entrada para o treinamento .
  4. Modificar models/keras/model.py e models/keras/constants.py para descrever o modelo ML .
    • Você também pode usar um modelo baseado em estimador. Mudança RUN_FN constante para models.estimator.model.run_fn no pipeline/configs.py .

Por favor, veja o guia componente instrutor para a introdução mais.