Cree una canalización TFX utilizando plantillas con Beam Orchestrator

Introducción

Este documento proporciona instrucciones para crear un TensorFlow Plus (TFX) de tuberías utilizando plantillas que se proporcionan con el paquete TFX Python. La mayor parte de las instrucciones son comandos shell de Linux, y las células de código Notebook Jupyter que invocan estos comandos usando correspondiente ! están provistos.

Usted va a construir una tubería usando Taxi Viajes conjunto de datos dado a conocer por la ciudad de Chicago. Le recomendamos encarecidamente que intente crear su propia canalización utilizando su conjunto de datos utilizando esta canalización como base.

Vamos a construir una tubería usando Apache haz Orchestrator . Si está interesado en utilizar Kubeflow orquestador en la nube de Google, consulte TFX en la nube AI Plataforma Tuberías tutorial .

Prerrequisitos

  • Linux / MacOS
  • Python> = 3.5.3

Se puede llegar fácilmente todos los requisitos previos mediante la ejecución de este bloc de notas en Google Colab .

Paso 1. Configure su entorno.

A lo largo de este documento, presentaremos comandos dos veces. Una vez como un comando de shell listo para copiar y pegar, una vez como una celda de cuaderno de jupyter. Si está utilizando Colab, simplemente omita el bloque de script de shell y ejecute las celdas del cuaderno.

Debe preparar un entorno de desarrollo para construir una canalización.

Instalar tfx paquete python. Recomendamos el uso de virtualenv en el entorno local. Puede utilizar el siguiente fragmento de script de shell para configurar su entorno.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Si está utilizando colab:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ERROR: some-package 0.some_version.1 tiene el requisito other-package! = 2.0., <3,> = 1.15, pero tendrá otro paquete 2.0.0 que es incompatible.

Por favor ignore estos errores en este momento.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Comprobemos la versión de TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

Y está hecho. Estamos listos para crear una tubería.

Paso 2. Copie la plantilla predefinida en el directorio de su proyecto.

En este paso, crearemos un directorio y archivos de proyecto de canalización de trabajo copiando archivos adicionales de una plantilla predefinida.

Usted puede dar su canal de un nombre diferente cambiando la PIPELINE_NAME a continuación. Este también se convertirá en el nombre del directorio del proyecto donde se colocarán sus archivos.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX incluye el taxi plantilla con el paquete TFX pitón. Si planea resolver un problema de predicción puntual, incluida la clasificación y la regresión, esta plantilla podría usarse como punto de partida.

Las tfx template copy de la CLI comando copia los archivos de plantillas predefinidas en el directorio del proyecto.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Cambie el contexto del directorio de trabajo en este cuaderno al directorio del proyecto.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Paso 3. Examine los archivos de origen copiados.

La plantilla TFX proporciona archivos de andamio básicos para construir una canalización, incluido el código fuente de Python, datos de muestra y Jupyter Notebooks para analizar la salida de la canalización. El taxi plantilla utiliza el mismo conjunto de datos de Chicago taxi y el modelo ML como el Tutorial de flujo de aire .

En Google Colab, puede buscar archivos haciendo clic en el icono de una carpeta a la izquierda. Los archivos deben ser copiados en el marco del proyecto directoy, cuyo nombre es my_pipeline en este caso. Puede hacer clic en los nombres de los directorios para ver el contenido del directorio y hacer doble clic en los nombres de los archivos para abrirlos.

Aquí hay una breve introducción a cada uno de los archivos de Python.

  • pipeline - Este directorio contiene la definición de la tubería
    • configs.py - define constantes comunes para los corredores de tuberías
    • pipeline.py - define los componentes TFX y una tubería
  • models - Este directorio contiene definiciones del modelo de ML.
    • features.py , features_test.py - define características para el modelo
    • preprocessing.py , preprocessing_test.py - preprocesamiento define los trabajos que utilizan tf::Transform
    • estimator - Este directorio contiene un modelo basado Estimador.
      • constants.py - define constantes del modelo
      • model.py , model_test.py - define modelo DNN usando estimador TF
    • keras - Este directorio contiene un modelo basado Keras.
      • constants.py - define constantes del modelo
      • model.py , model_test.py - define modelo DNN usando Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - definir los corredores para cada motor de orquestación

Usted puede notar que hay algunos archivos con _test.py en su nombre. Estas son pruebas unitarias de la canalización y se recomienda agregar más pruebas unitarias a medida que implementa sus propias canalizaciones. Puede ejecutar pruebas unitarias suministrando el nombre del módulo de archivos de prueba con -m bandera. Generalmente, usted puede obtener un nombre de módulo mediante la supresión de .py extensión y la sustitución de / con . . Por ejemplo:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Paso 4. Ejecute su primera canalización TFX

Puede crear una tubería utilizando pipeline create comandos.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

A continuación, puede ejecutar la tubería creado usando run create comandos.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Si tiene éxito, verá Component CsvExampleGen is finished. Cuando copia la plantilla, solo se incluye un componente, CsvExampleGen, en la canalización.

Paso 5. Agregue componentes para la validación de datos.

En este paso, va a agregar componentes para la validación de datos, incluyendo StatisticsGen , SchemaGen y ExampleValidator . Si usted está interesado en la validación de datos, consulte Comience con Tensorflow validación de datos .

Vamos a modificar la definición de tuberías copiado de pipeline/pipeline.py . Si está trabajando en su entorno local, use su editor favorito para editar el archivo. Si está trabajando en Google Colab,

Haga clic en el icono de carpeta de la izquierda para abrir Files vista.

Haga clic my_pipeline para abrir el directorio y haga clic en pipeline directorio para abrir y haga doble clic pipeline.py para abrir el archivo.

Encuentra y elimine las 3 líneas que se suman StatisticsGen , SchemaGen y ExampleValidator a la tubería. (Consejo: Buscar comentarios que contengan TODO(step 5): ).

Su cambio se guardará automáticamente en unos segundos. Asegúrese de que la * marca delante de la pipeline.py desapareció en el título de la pestaña. No hay un botón de guardar ni un atajo para el editor de archivos en Colab. Archivos de Python en editor de archivos se pueden guardar en el entorno de ejecución, incluso en playground modo.

Ahora necesita actualizar la canalización existente con la definición de canalización modificada. Utilice la tfx pipeline update comando para actualizar su tubería, seguido por la tfx run create comando para crear una nueva ejecución de ejecución de la canalización actualizada.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Debería poder ver el registro de salida de los componentes agregados. Nuestra tubería de salida crea artefactos en tfx_pipeline_output/my_pipeline directorio.

Paso 6. Agregue componentes para la capacitación.

En este paso, va a agregar componentes para la formación y la validación de modelos incluyendo Transform , Trainer , ResolverNode , Evaluator y Pusher .

Abrir pipeline/pipeline.py . Encuentra y elimine 5 líneas que se suman Transform , Trainer , ResolverNode , Evaluator y Pusher de la tubería. (Consejo: Buscar TODO(step 6): )

Como hizo antes, ahora necesita actualizar la canalización existente con la definición de canalización modificada. Las instrucciones son las mismas que en el Paso 5. Actualización de la tubería mediante tfx pipeline update , y crean un plazo de ejecución utilizando tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Cuando esta ejecución finalice correctamente, habrá creado y ejecutado su primera canalización TFX con Beam Orchestrator.

Paso 7. (Opcional) Pruebe BigQueryExampleGen.

[BigQuery] es un almacén de datos en la nube rentable, altamente escalable y sin servidor. BigQuery se puede usar como fuente para ejemplos de entrenamiento en TFX. En este paso, vamos a añadir BigQueryExampleGen a la tubería.

Se necesita un Google Cloud Platform cuenta para usar BigQuery. Prepara un proyecto de GCP.

Ingrese a su proyecto utilizando la biblioteca de autenticación colab o gcloud utilidad.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Debes especificar el nombre de tu proyecto de GCP para acceder a los recursos de BigQuery mediante TFX. Conjunto GOOGLE_CLOUD_PROJECT variable de entorno al nombre del proyecto.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Abrir pipeline/pipeline.py . Comentar CsvExampleGen y elimine la línea que crea una instancia de BigQueryExampleGen . También es necesario uncomment query argumento de la create_pipeline función.

Tenemos que especificar qué proyecto GCP a utilizar para BigQuery de nuevo, y esto se hace mediante el establecimiento de --project en beam_pipeline_args al crear una tubería.

Abrir pipeline/configs.py . Descomentar la definición de BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS y BIG_QUERY_QUERY . Debes reemplazar el ID del proyecto y el valor de la región en este archivo con los valores correctos para tu proyecto de GCP.

Abrir beam_dag_runner.py . Descomente dos argumentos, query y beam_pipeline_args , método create_pipeline () para.

Ahora la canalización está lista para usar BigQuery como fuente de ejemplo. Actualice la canalización y cree una ejecución como hicimos en los pasos 5 y 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Lo que sigue: transfiera SUS datos a la canalización.

Hicimos una canalización para un modelo utilizando el conjunto de datos de Chicago Taxi. Ahora es el momento de poner sus datos en proceso.

Sus datos se pueden almacenar en cualquier lugar al que pueda acceder su canalización, incluidos GCS o BigQuery. Deberá modificar la definición de la canalización para acceder a sus datos.

  1. Si los datos se almacenan en archivos, modificar el DATA_PATH en kubeflow_dag_runner.py o beam_dag_runner.py y ponerlo a la ubicación de los archivos. Si los datos se almacenan en BigQuery, modificar BIG_QUERY_QUERY en pipeline/configs.py a consulta correctamente para sus datos.
  2. Añadir características de models/features.py .
  3. Modificar models/preprocessing.py para transformar los datos de entrada para el entrenamiento .
  4. Modificar models/keras/model.py y models/keras/constants.py que describen el modelo ML .
    • También puede utilizar un modelo basado en estimadores. Cambio RUN_FN constante para models.estimator.model.run_fn en pipeline/configs.py .

Por favor, véase el entrenador guía de componentes para más introducción.