Utwórz potok TFX za pomocą szablonów za pomocą programu Beam Orchestrator

Wstęp

Dokument ten będzie zawierać instrukcje, aby utworzyć TensorFlow Extended (TFX) rurociąg przy użyciu szablonów, które są dostarczane z pakietem TFX Pythona. Większość instrukcji są poleceń powłoki systemu Linux, a odpowiadające komórki kodu Notebook Jupyter które wywołują te polecenia przy użyciu ! są zapewnione.

Będziesz budować rurociągu przy użyciu Taxi Trips zbiór danych wydany przez miasto Chicago. Zdecydowanie zachęcamy do próby zbudowania własnego potoku przy użyciu zestawu danych, używając tego potoku jako linii bazowej.

Będziemy budować rurociągu przy użyciu Apache Beam Orchestrator . Jeśli jesteś zainteresowany wykorzystaniem Kubeflow Orchestrator na Google Cloud, proszę zobaczyć TFX na tutorialu Chmura AI Platforma rurociągów .

Warunki wstępne

  • Linux / MacOS
  • Python >= 3.5.3

Możesz pobrać wszystkie przesłanki łatwo uruchomieniem tego notebooka na Google Colab .

Krok 1. Skonfiguruj swoje środowisko.

W tym dokumencie dwukrotnie przedstawimy polecenia. Raz jako polecenie powłoki gotowe do kopiowania i wklejania, raz jako komórka notatnika jupyter. Jeśli używasz Colab, po prostu pomiń blok skryptu powłoki i uruchom komórki notatnika.

Powinieneś przygotować środowisko programistyczne do zbudowania potoku.

Zainstalować tfx pakiet Pythona. Zalecamy stosowanie virtualenv w środowisku lokalnym. Do skonfigurowania środowiska można użyć następującego fragmentu skryptu powłoki.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Jeśli korzystasz z colabu:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

BŁĄD: some-package 0.some_version.1 wymaga innego pakietu!=2.0.,<3,>=1.15, ale będziesz mieć inny pakiet 2.0.0, który jest niekompatybilny.

W tej chwili zignoruj ​​te błędy.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Sprawdźmy wersję TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

I gotowe. Jesteśmy gotowi do stworzenia potoku.

Krok 2. Skopiuj predefiniowany szablon do katalogu projektu.

W tym kroku utworzymy działający katalog i pliki projektu potoku, kopiując dodatkowe pliki z predefiniowanego szablonu.

Możesz dać rurociągu inną nazwę, zmieniając PIPELINE_NAME poniżej. Będzie to również nazwa katalogu projektu, w którym zostaną umieszczone twoje pliki.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX zawiera taxi szablon z TFX pakietu python. Jeśli planujesz rozwiązać problem z przewidywaniem punktowym, w tym klasyfikację i regresję, ten szablon może być użyty jako punkt wyjścia.

Do tfx template copy CLI polecenie kopiuje pliki predefiniowane szablony do katalogu projektu.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Zmień kontekst katalogu roboczego w tym notatniku na katalog projektu.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Krok 3. Przeglądaj skopiowane pliki źródłowe.

Szablon TFX zawiera podstawowe pliki szkieletowe do tworzenia potoku, w tym kod źródłowy Pythona, przykładowe dane i notatniki Jupyter do analizy danych wyjściowych potoku. taxi szablon korzysta z tego samego zestawu danych Chicago Taxi i model ML jako Airflow Tutorial .

W Google Colab możesz przeglądać pliki, klikając ikonę folderu po lewej stronie. Pliki powinny być kopiowane pod directoy projektu, którego nazwa jest my_pipeline w tym przypadku. Możesz kliknąć nazwy katalogów, aby zobaczyć zawartość katalogu, i dwukrotnie kliknąć nazwy plików, aby je otworzyć.

Oto krótkie wprowadzenie do każdego z plików Pythona.

  • pipeline - Katalog ten zawiera definicję gazociągu
    • configs.py - określa wspólne stałe dla biegaczy rurociągowych
    • pipeline.py - określa elementy TFX i rurociągów
  • models - Ten katalog zawiera definicje ML.
    • features.py , features_test.py - określa funkcje dla modelu
    • preprocessing.py , preprocessing_test.py - określa przerób zadania wykorzystujące tf::Transform
    • estimator - Katalog ten zawiera model oparty prognozy.
      • constants.py - definiuje stałe modelu
      • model.py , model_test.py - określa wzór DNN pomocą estymatora TF
    • keras - Ten katalog zawiera model oparty Keras.
      • constants.py - definiuje stałe modelu
      • model.py , model_test.py - określa modelu DNN korzystając Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - zdefiniować dla każdego silnika biegaczy orkiestracji

Można zauważyć, że istnieją jakieś pliki z _test.py w ich imieniu. Są to testy jednostkowe potoku i zaleca się dodanie większej liczby testów jednostkowych podczas implementowania własnych potoków. Można uruchomić testy jednostkowe poprzez dostarczanie nazwę modułu plików testowych z -m flagi. Zazwyczaj można uzyskać nazwę modułu usuwając .py rozszerzenie i zastąpienie / z . . Na przykład:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Krok 4. Uruchom swój pierwszy potok TFX

Można utworzyć za pomocą rurociągu pipeline create komendę.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

Następnie można uruchomić utworzoną rurociągu za pomocą run create komendę.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Jeśli się powiedzie, zobaczysz Component CsvExampleGen is finished. Podczas kopiowania szablonu tylko jeden składnik, CsvExampleGen, jest dołączany do potoku.

Krok 5. Dodaj komponenty do walidacji danych.

W tym kroku należy dodać elementy do sprawdzania poprawności danych w tym StatisticsGen , SchemaGen i ExampleValidator . Jeśli jesteś zainteresowany w walidacji danych, proszę zobaczyć Zacznij korzystać z Tensorflow sprawdzania poprawności danych .

Będziemy modyfikować skopiowany definicję gazociągu w pipeline/pipeline.py . Jeśli pracujesz w swoim środowisku lokalnym, użyj swojego ulubionego edytora do edycji pliku. Jeśli pracujesz w Google Colab,

Kliknij ikonę folderu po lewej stronie, aby otworzyć Files widzenia.

Kliknij my_pipeline aby otworzyć katalog i kliknij pipeline katalogu, aby otworzyć i kliknij dwukrotnie pipeline.py aby otworzyć plik.

Znajdź i usuń te 3 linie, które dodają StatisticsGen , SchemaGen i ExampleValidator do rurociągu. (Wskazówka: znaleźć komentarzy zawierających TODO(step 5): ).

Twoja zmiana zostanie automatycznie zapisana za kilka sekund. Upewnij się, że * znak przed pipeline.py zniknął w tytule zakładki. W Colab nie ma przycisku zapisu ani skrótu do edytora plików. Pliki Python w edytorze plik może być zapisany w środowisku wykonawczym nawet na playground trybie.

Teraz musisz zaktualizować istniejący potok o zmodyfikowaną definicję potoku. Użyj tfx pipeline update polecenie, aby zaktualizować rurociągu, a następnie przez tfx run create polecenie, aby utworzyć nowy przebieg realizacji swojego zaktualizowanego rurociągu.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Powinieneś być w stanie zobaczyć dziennik wyjściowy z dodanych komponentów. Nasz rurociąg tworzy artefakty wyjściowe w tfx_pipeline_output/my_pipeline katalogu.

Krok 6. Dodaj komponenty do treningu.

W tym kroku należy dodać elementy do szkolenia i walidacji modelu tym Transform , Trainer , ResolverNode , Evaluator i Pusher .

Otwarty pipeline/pipeline.py . Znajdź i usuń 5 linii, które dodają Transform , Trainer , ResolverNode , Evaluator i Pusher do rurociągu. (Wskazówka: Find TODO(step 6): )

Tak jak poprzednio, musisz teraz zaktualizować istniejący potok przy użyciu zmodyfikowanej definicji potoku. Instrukcje są takie same jak Krok 5. Aktualizacja rurociąg za pomocą tfx pipeline update i utworzyć przebieg wykonania przy użyciu tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Po pomyślnym zakończeniu tego uruchomienia, utworzyłeś i uruchomiłeś swój pierwszy potok TFX za pomocą programu Beam Orchestrator!

Krok 7. (Opcjonalnie) Spróbuj BigQueryExampleGen.

[BigQuery] to bezserwerowa, wysoce skalowalna i ekonomiczna hurtownia danych w chmurze. BigQuery może służyć jako źródło przykładów szkoleniowych w TFX. W tym kroku dodamy BigQueryExampleGen do rurociągu.

Musisz Google Cloud Platform konto, aby korzystać z tego narzędzia. Przygotuj projekt GCP.

Zaloguj się do swojego projektu przy użyciu colab bibliotekę auth lub gcloud użyteczność.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Aby uzyskać dostęp do zasobów BigQuery przy użyciu TFX, określ nazwę swojego projektu GCP. Zestaw GOOGLE_CLOUD_PROJECT zmienną środowiskową do nazwy projektu.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Otwarty pipeline/pipeline.py . Wykomentuj CsvExampleGen i odkomentować linię, która tworzy instancję BigQueryExampleGen . Należy również Odkomentuj query argument create_pipeline funkcji.

Musimy określić, które GCP projekt używać do BigQuery ponownie, a odbywa się to poprzez ustawienie --project w beam_pipeline_args podczas tworzenia rurociągu.

Otwarty pipeline/configs.py . Odkomentuj definicja BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS i BIG_QUERY_QUERY . Zastąp identyfikator projektu i wartość regionu w tym pliku prawidłowymi wartościami dla projektu GCP.

Otwarte beam_dag_runner.py . Odkomentuj dwa argumenty, query i beam_pipeline_args , dla metody create_pipeline ().

Teraz potok jest gotowy do użycia BigQuery jako przykładowego źródła. Zaktualizuj potok i utwórz przebieg, tak jak w kroku 5 i 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Co dalej: pozyskaj SWOJE dane do potoku.

Stworzyliśmy potok dla modelu przy użyciu zestawu danych Chicago Taxi. Teraz nadszedł czas, aby umieścić swoje dane w potoku.

Twoje dane mogą być przechowywane w dowolnym miejscu, do którego masz dostęp, w tym w GCS lub BigQuery. Będziesz musiał zmodyfikować definicję potoku, aby uzyskać dostęp do danych.

  1. Jeśli dane są przechowywane w plikach, modyfikować DATA_PATH w kubeflow_dag_runner.py lub beam_dag_runner.py i ustawić go do lokalizacji plików. Jeśli dane są przechowywane w BigQuery, modyfikować BIG_QUERY_QUERY w pipeline/configs.py poprawnie zapytania dla danych.
  2. Dodaj funkcje w models/features.py .
  3. Modyfikować models/preprocessing.py do przekształcania danych wejściowych do treningu .
  4. Modyfikować models/keras/model.py i models/keras/constants.py do opisania modelu ML .
    • Możesz również użyć modelu opartego na estymatorze. Zmień RUN_FN stała się models.estimator.model.run_fn w pipeline/configs.py .

Proszę zobaczyć komponentu przewodnik Trainer więcej wstępie.