Crea una pipeline TFX utilizzando i modelli con Beam orchestrator

introduzione

Questo documento fornisce le istruzioni per creare un tensorflow Extended (TFX) gasdotto utilizzando i modelli che vengono forniti con il pacchetto TFX Python. La maggior parte delle istruzioni sono comandi di shell di Linux, e le corrispondenti cellule di codice Notebook Jupyter che invocano i comandi utilizzando ! sono forniti.

Si costruirà un oleodotto utilizzando taxi Viaggi set di dati rilasciato dal Comune di Chicago. Ti consigliamo vivamente di provare a creare la tua pipeline utilizzando il tuo set di dati utilizzando questa pipeline come linea di base.

Costruiremo una pipeline utilizzando Apache fascio Orchestrator . Se siete interessati a utilizzare Kubeflow orchestratore su Google Cloud, consulta TFX su Cloud Platform AI Condotte esercitazione .

Prerequisiti

  • Linux/MacOS
  • Python >= 3.5.3

È possibile ottenere tutti i prerequisiti facilmente l'esecuzione di questo notebook su Google Colab .

Passaggio 1. Configura il tuo ambiente.

In questo documento presenteremo i comandi due volte. Una volta come comando della shell pronto per copiare e incollare, una volta come cella di un notebook jupyter. Se stai usando Colab, salta semplicemente il blocco dello script della shell ed esegui le celle del notebook.

È necessario preparare un ambiente di sviluppo per creare una pipeline.

Installare tfx pacchetto python. Si consiglia l'uso del virtualenv nell'ambiente locale. Puoi utilizzare il seguente frammento di script della shell per configurare il tuo ambiente.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Se stai usando colab:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ERRORE: some-package 0.some_version.1 ha il requisito other-package!=2.0.,<3,>=1.15, ma avrai other-package 2.0.0 che non è compatibile.

Si prega di ignorare questi errori in questo momento.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Controlliamo la versione di TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

Ed è fatto. Siamo pronti per creare una pipeline.

Passaggio 2. Copia il modello predefinito nella directory del progetto.

In questo passaggio, creeremo una directory di progetto della pipeline funzionante e file copiando file aggiuntivi da un modello predefinito.

Si può dare la pipeline un nome diverso cambiando il PIPELINE_NAME di seguito. Questo diventerà anche il nome della directory del progetto in cui verranno inseriti i tuoi file.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX include il taxi modello con il pacchetto TFX pitone. Se stai pianificando di risolvere un problema di previsione puntuale, inclusa la classificazione e la regressione, questo modello potrebbe essere utilizzato come punto di partenza.

I tfx template copy copie comando CLI predefinite file di template nella directory del progetto.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Modificare il contesto della directory di lavoro in questo notebook nella directory del progetto.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Passaggio 3. Sfoglia i file di origine copiati.

Il modello TFX fornisce file di scaffold di base per creare una pipeline, inclusi codice sorgente Python, dati di esempio e Jupyter Notebook per analizzare l'output della pipeline. Il taxi modello utilizza lo stesso insieme di dati di Chicago di taxi e il modello ML come il flusso d'aria Tutorial .

In Google Colab, puoi sfogliare i file facendo clic sull'icona di una cartella a sinistra. I file devono essere copiati sotto il Directoy del progetto, il cui nome è my_pipeline in questo caso. È possibile fare clic sui nomi delle directory per visualizzare il contenuto della directory e fare doppio clic sui nomi dei file per aprirli.

Ecco una breve introduzione a ciascuno dei file Python.

  • pipeline - Questa directory contiene la definizione del gasdotto
    • configs.py - definisce costanti comuni per i corridori gasdotti
    • pipeline.py - definisce componenti TFX e un gasdotto
  • models - Questa directory contiene le definizioni ML modello.
    • features.py , features_test.py - definisce caratteristiche per il modello
    • preprocessing.py , preprocessing_test.py - definisce pre-elaborazione lavori utilizzando tf::Transform
    • estimator - Questa directory contiene un modello basato estimatore.
      • constants.py - definisce le costanti del modello
      • model.py , model_test.py - definisce modello di DNN utilizzando stimatore TF
    • keras - Questa directory contiene un modello basato Keras.
      • constants.py - definisce le costanti del modello
      • model.py , model_test.py - definisce modello di DNN utilizzando Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - definire supporti per ogni motore di orchestrazione

Si potrebbe notare che ci sono alcuni file con _test.py nel loro nome. Si tratta di unit test della pipeline e si consiglia di aggiungere altri unit test man mano che si implementano le proprie pipeline. È possibile eseguire unit test fornendo il nome del modulo di file di test con -m bandiera. Di solito è possibile ottenere un nome di modulo cancellando .py estensione e la sostituzione / con . . Per esempio:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Passaggio 4. Esegui la tua prima pipeline TFX

È possibile creare una pipeline utilizzando pipeline create comando.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

Quindi, è possibile eseguire la pipeline creata utilizzando run create comando.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

In caso di successo, si vedrà Component CsvExampleGen is finished. Quando copi il modello, nella pipeline viene incluso solo un componente, CsvExampleGen.

Passaggio 5. Aggiungi componenti per la convalida dei dati.

In questa fase, si aggiungerà componenti per la convalida dei dati tra cui StatisticsGen , SchemaGen e ExampleValidator . Se siete interessati a convalida dei dati, si prega di consultare Inizia con tensorflow Data Validation .

Noi modificare definizione gasdotto copiato in pipeline/pipeline.py . Se stai lavorando nel tuo ambiente locale, usa il tuo editor preferito per modificare il file. Se stai lavorando su Google Colab,

Fare clic sull'icona della cartella a sinistra per aprire Files vista.

Clicca my_pipeline per aprire la directory e fare clic su pipeline directory per aprire e fare doppio clic pipeline.py per aprire il file.

Trovare e rimuovere il commento le 3 linee che aggiungono StatisticsGen , SchemaGen , e ExampleValidator alla pipeline. (Suggerimento: i commenti che contengono TODO(step 5): ).

La tua modifica verrà salvata automaticamente in pochi secondi. Assicurarsi che il * marchio di fronte al pipeline.py scomparso nel titolo della scheda. Non esiste alcun pulsante di salvataggio o collegamento per l'editor di file in Colab. File Python in editor di file possono essere salvati per l'ambiente di runtime, anche in playground modalità.

È ora necessario aggiornare la pipeline esistente con la definizione della pipeline modificata. Utilizzare il tfx pipeline update comando per aggiornare il gasdotto, seguita dalla tfx run create comando per creare una nuova corsa esecuzione del gasdotto aggiornato.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Dovresti essere in grado di vedere il registro di output dai componenti aggiunti. La nostra pipeline crea manufatti in uscita tfx_pipeline_output/my_pipeline directory.

Passaggio 6. Aggiungi componenti per la formazione.

In questa fase, si aggiungerà componenti per la formazione e la validazione dei modelli tra cui Transform , Trainer , ResolverNode , Evaluator , e Pusher .

Aprire pipeline/pipeline.py . Trovare e rimuovere il commento 5 linee che aggiungono Transform , Trainer , ResolverNode , Evaluator e Pusher alla pipeline. (Suggerimento: find TODO(step 6): )

Come in precedenza, ora è necessario aggiornare la pipeline esistente con la definizione della pipeline modificata. Le istruzioni sono le stesse di Fase 5. Aggiornare il gasdotto utilizzando tfx pipeline update , e creare una corsa di esecuzione utilizzando tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Quando questa esecuzione viene completata correttamente, ora hai creato ed eseguito la tua prima pipeline TFX utilizzando Beam orchestrator!

Passo 7. (opzionale) Prova BigQueryExampleGen.

[BigQuery] è un data warehouse su cloud serverless, altamente scalabile e conveniente. BigQuery può essere utilizzato come fonte per esempi di formazione in TFX. In questa fase, aggiungeremo BigQueryExampleGen alla pipeline.

Hai bisogno di una piattaforma cloud di Google account da utilizzare BigQuery. Prepara un progetto GCP.

Accesso al progetto utilizzando la libreria di autenticazione CoLab o gcloud utility.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Devi specificare il nome del tuo progetto GCP per accedere alle risorse BigQuery utilizzando TFX. Set GOOGLE_CLOUD_PROJECT variabile di ambiente per il nome del progetto.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Aprire pipeline/pipeline.py . Commentare CsvExampleGen e rimuovere il commento dalla linea che creare un'istanza BigQueryExampleGen . È inoltre necessario rimuovere il commento query argomento della create_pipeline funzioni.

Abbiamo bisogno di specificare quale progetto GCP da utilizzare per BigQuery di nuovo, e questo è fatto impostando --project in beam_pipeline_args durante la creazione di un oleodotto.

Aprire pipeline/configs.py . Rimuovere il commento la definizione di BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS e BIG_QUERY_QUERY . Dovresti sostituire l'ID progetto e il valore della regione in questo file con i valori corretti per il tuo progetto GCP.

Aprire beam_dag_runner.py . Decommentate due argomenti, query e beam_pipeline_args , per il metodo create_pipeline ().

Ora la pipeline è pronta per utilizzare BigQuery come origine di esempio. Aggiorna la pipeline e crea un'esecuzione come abbiamo fatto nei passaggi 5 e 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Cosa c'è dopo: importa i TUOI dati nella pipeline.

Abbiamo creato una pipeline per un modello utilizzando il dataset Chicago Taxi. Ora è il momento di mettere i tuoi dati nella pipeline.

I tuoi dati possono essere archiviati ovunque possa accedere la tua pipeline, inclusi GCS o BigQuery. Sarà necessario modificare la definizione della pipeline per accedere ai dati.

  1. Se i dati sono memorizzati in file, modificare il DATA_PATH in kubeflow_dag_runner.py o beam_dag_runner.py e impostarlo sulla posizione dei file. Se i dati sono memorizzati in BigQuery, modificare BIG_QUERY_QUERY in pipeline/configs.py correttamente query per i dati.
  2. Aggiungi funzionalità in models/features.py .
  3. Modificare models/preprocessing.py per trasformare i dati di input per la formazione .
  4. Modificare models/keras/model.py e models/keras/constants.py per descrivere il vostro modello ML .
    • Puoi anche utilizzare un modello basato su stimatore. Cambiare RUN_FN costante per models.estimator.model.run_fn in pipeline/configs.py .

Si prega di consultare Trainer guida componente per più introduzione.