Crie um pipeline TFX usando modelos com orquestrador de feixe

Introdução

Este documento fornecerá instruções para criar um TensorFlow Extensão (TFX) gasoduto usando modelos que são fornecidos com o pacote de TFX Python. A maioria das instruções são comandos Linux shell, e as células de código Notebook Jupyter que invocam os comandos usando correspondente ! são fornecidos.

Você vai construir um gasoduto usando Taxi Viagens conjunto de dados liberado pela cidade de Chicago. Recomendamos que você tente criar seu próprio pipeline usando seu conjunto de dados usando esse pipeline como linha de base.

Nós vamos construir um gasoduto usando Apache feixe Orchestrator . Se você está interessado em usar Kubeflow orquestrador no Google Cloud, consulte TFX em Nuvem AI Platform Pipelines tutorial .

Pré-requisitos

  • Linux/Mac OS
  • Python >= 3.5.3

Você pode obter todos os pré-requisitos facilmente executar este notebook no Google Colab .

Etapa 1. Configure seu ambiente.

Ao longo deste documento, apresentaremos comandos duas vezes. Uma vez como um comando shell pronto para copiar e colar, uma vez como uma célula de notebook jupyter. Se você estiver usando o Colab, basta pular o bloco de script de shell e executar as células do notebook.

Você deve preparar um ambiente de desenvolvimento para construir um pipeline.

Instale tfx pacote python. Recomendamos o uso de virtualenv no ambiente local. Você pode usar o seguinte trecho de script de shell para configurar seu ambiente.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Se você estiver usando colab:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ERRO: some-package 0.some_version.1 tem o requisito other-package!=2.0.,<3,>=1.15, mas você terá other-package 2.0.0 que é incompatível.

Por favor, ignore estes erros neste momento.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Vamos verificar a versão do TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

E, está feito. Estamos prontos para criar um pipeline.

Etapa 2. Copie o modelo predefinido para o diretório do projeto.

Nesta etapa, criaremos um diretório e arquivos de projeto de pipeline de trabalho copiando arquivos adicionais de um modelo predefinido.

Você pode dar o seu pipeline de um nome diferente, alterando a PIPELINE_NAME abaixo. Este também se tornará o nome do diretório do projeto onde seus arquivos serão colocados.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX inclui o taxi modelo com o pacote de TFX python. Se você planeja resolver um problema de previsão pontual, incluindo classificação e regressão, esse modelo pode ser usado como ponto de partida.

Os tfx template copy CLI comando copia predefinidos arquivos de modelo para o diretório do projeto.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Altere o contexto do diretório de trabalho neste notebook para o diretório do projeto.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Etapa 3. Navegue pelos arquivos de origem copiados.

O modelo TFX fornece arquivos scaffold básicos para criar um pipeline, incluindo código-fonte Python, dados de exemplo e Jupyter Notebooks para analisar a saída do pipeline. O taxi modelo usa o mesmo conjunto de dados Chicago Taxi e modelo ML como o Tutorial do fluxo de ar .

No Google Colab, você pode navegar pelos arquivos clicando em um ícone de pasta à esquerda. Os arquivos devem ser copiados sob a directoy projeto, cujo nome é my_pipeline neste caso. Você pode clicar nos nomes dos diretórios para ver o conteúdo do diretório e clicar duas vezes nos nomes dos arquivos para abri-los.

Aqui está uma breve introdução a cada um dos arquivos Python.

  • pipeline - Este diretório contém a definição do gasoduto
    • configs.py - define constantes comuns para os corredores de oleodutos
    • pipeline.py - define componentes TFX e um oleoduto
  • models - Este diretório contém ML definições de modelo.
    • features.py , features_test.py - define recursos para o modelo
    • preprocessing.py , preprocessing_test.py - define pré-processamento trabalhos usando tf::Transform
    • estimator - Este diretório contém um modelo baseado Estimador.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando TF estimador
    • keras - Esta pasta contém um modelo baseado Keras.
      • constants.py - define constantes do modelo
      • model.py , model_test.py - define modelo DNN usando Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - definir os corredores para cada mecanismo de orquestração

Você pode notar que existem alguns arquivos com _test.py em seu nome. Esses são testes de unidade do pipeline e é recomendável adicionar mais testes de unidade à medida que você implementa seus próprios pipelines. Você pode executar testes de unidade, fornecendo o nome do módulo de arquivos de teste com -m bandeira. Você normalmente pode obter um nome de módulo, eliminando .py extensão e substituindo / com . . Por exemplo:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Etapa 4. Execute seu primeiro pipeline do TFX

Você pode criar um canal usando pipeline create comando.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

Em seguida, você pode executar o pipeline criado usando run create comando.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Se for bem sucedido, você verá Component CsvExampleGen is finished. Quando você copia o modelo, apenas um componente, CsvExampleGen, é incluído no pipeline.

Etapa 5. Adicione componentes para validação de dados.

Nesta etapa, você irá adicionar componentes para validação de dados, incluindo StatisticsGen , SchemaGen e ExampleValidator . Se você está interessado em validação de dados, consulte Comece com Tensorflow validação de dados .

Vamos modificar a definição gasoduto copiado no pipeline/pipeline.py . Se você estiver trabalhando em seu ambiente local, use seu editor favorito para editar o arquivo. Se você estiver trabalhando no Google Colab,

Clique no ícone de pasta à esquerda para abrir Files view.

Clique my_pipeline para abrir o diretório e clique pipeline diretório para duplo clique aberto e pipeline.py para abrir o arquivo.

Localizar e tire as 3 linhas que agregam StatisticsGen , SchemaGen e ExampleValidator ao gasoduto. (Dica: encontrar comentários contendo TODO(step 5): ).

Sua alteração será salva automaticamente em alguns segundos. Certifique-se de que o * marca na frente do pipeline.py desapareceu no título da página. Não há botão salvar ou atalho para o editor de arquivos no Colab. Arquivos Python em editor de arquivo pode ser salvo para o ambiente de tempo de execução, mesmo em playground de modo.

Agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. Use a tfx pipeline update comando para atualizar seu pipeline, seguido pelo tfx run create comando para criar um novo prazo de execução do seu pipeline atualizado.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Você deve conseguir ver o log de saída dos componentes adicionados. Nosso pipeline cria artefatos de saída em tfx_pipeline_output/my_pipeline diretório.

Etapa 6. Adicione componentes para treinamento.

Nesta etapa, você irá adicionar componentes para a formação e validação do modelo, incluindo Transform , Trainer , ResolverNode , Evaluator , e Pusher .

Abrir pipeline/pipeline.py . Localizar e descomente 5 linhas que agregam Transform , Trainer , ResolverNode , Evaluator e Pusher ao gasoduto. (Dica: Pesquisar TODO(step 6): )

Como você fez antes, agora você precisa atualizar o pipeline existente com a definição de pipeline modificada. As instruções são as mesmas Passo 5. Atualize o gasoduto usando tfx pipeline update , e criar uma corrida de execução usando tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Quando esta execução terminar com sucesso, você criou e executou seu primeiro pipeline TFX usando o orquestrador Beam!

Passo 7. (Opcional) Tente BigQueryExampleGen.

[BigQuery] é um data warehouse em nuvem sem servidor, altamente escalável e econômico. O BigQuery pode ser usado como fonte para exemplos de treinamento em TFX. Neste passo, vamos adicionar BigQueryExampleGen ao gasoduto.

Você precisa de uma Cloud Platform Google conta para usar o BigQuery. Prepare um projeto do GCP.

Faça o login em seu projeto usando a biblioteca auth colab ou gcloud utilidade.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Você deve especificar o nome do projeto do GCP para acessar os recursos do BigQuery usando o TFX. Set GOOGLE_CLOUD_PROJECT ambiente variável para o nome do projecto.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Abrir pipeline/pipeline.py . Comentário fora CsvExampleGen e descomente a linha que criar uma instância de BigQueryExampleGen . Você também precisa descomente query argumento do create_pipeline função.

Precisamos especificar qual projeto GCP usar para BigQuery novamente, e isso é feito através da criação --project em beam_pipeline_args ao criar um pipeline.

Abrir pipeline/configs.py . Uncomment a definição de BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS e BIG_QUERY_QUERY . Você deve substituir o ID do projeto e o valor da região neste arquivo pelos valores corretos para seu projeto do GCP.

Abrir beam_dag_runner.py . Descomente dois argumentos, query e beam_pipeline_args , por método create_pipeline ().

Agora o pipeline está pronto para usar o BigQuery como fonte de exemplo. Atualize o pipeline e crie uma execução como fizemos nas etapas 5 e 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

O que vem a seguir: ingerir SEUS dados para o pipeline.

Criamos um pipeline para um modelo usando o conjunto de dados Chicago Taxi. Agora é hora de colocar seus dados no pipeline.

Seus dados podem ser armazenados em qualquer lugar que seu pipeline possa acessar, incluindo GCS ou BigQuery. Você precisará modificar a definição do pipeline para acessar seus dados.

  1. Se seus dados são armazenados em arquivos, modificar o DATA_PATH em kubeflow_dag_runner.py ou beam_dag_runner.py e configurá-lo para a localização de seus arquivos. Se seus dados são armazenados no BigQuery, modificar BIG_QUERY_QUERY no pipeline/configs.py para corretamente consulta para seus dados.
  2. Adicione recursos em models/features.py .
  3. Modificar models/preprocessing.py para transformar dados de entrada para o treinamento .
  4. Modificar models/keras/model.py e models/keras/constants.py para descrever seus ML modelar .
    • Você também pode usar um modelo baseado em estimador. Mudança RUN_FN constante para models.estimator.model.run_fn no pipeline/configs.py .

Por favor, veja o guia componente instrutor para a introdução mais.