Utwórz potok TFX za pomocą szablonów

Wstęp

Dokument ten będzie zawierać instrukcje, aby utworzyć TensorFlow Extended (TFX) rurociąg przy użyciu szablonów, które są dostarczane z pakietem TFX Pythona. Wiele instrukcji to polecenia powłoki systemu Linux, które będą działać w instancji AI Platform Notebooks. Odpowiednie komórki Jupyter kod Notebook, które korzystają te polecenia przy użyciu ! są zapewnione.

Będziesz budować rurociągu przy użyciu Taxi Trips zbiór danych wydany przez miasto Chicago. Zdecydowanie zachęcamy do spróbowania zbudowania własnego potoku przy użyciu zestawu danych przy użyciu tego potoku jako punktu odniesienia.

Krok 1. Skonfiguruj swoje środowisko.

AI Platform Pipelines przygotuje środowisko programistyczne do budowy potoku oraz klaster Kubeflow Pipeline do obsługi nowo wybudowanego potoku.

Zainstalować tfx pakiet Pythona w kfp dodatkowym wymogiem.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Sprawdźmy wersje TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

W AI Platforma rurociągi, TFX działa w środowisku hostowane Kubernetes wykorzystaniem Kubeflow Rurociągi .

Ustawmy kilka zmiennych środowiskowych do korzystania z Kubeflow Pipelines.

Najpierw zdobądź identyfikator projektu GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

Potrzebujemy również dostępu do Twojego klastra KFP. Możesz uzyskać do niego dostęp w konsoli Google Cloud w menu „AI Platform > Pipeline”. „Punkt końcowy” klastra KFP można znaleźć w adresie URL pulpitu nawigacyjnego Pipelines lub można go uzyskać z adresu URL strony Wprowadzenie, na której uruchomiono ten notatnik. Stwórzmy ENDPOINT zmiennej środowiskowej i ustawić go do punktu końcowego klastra KFP. ENDPOINT powinien zawierać tylko część adresu URL dotyczącą nazwy hosta. Na przykład, jeśli adres URL deski rozdzielczej KFP jest <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> wartość ENDPOINT staje 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Ustaw nazwę obrazu jako tfx-pipeline pod aktualnym projekcie GCP.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

I gotowe. Jesteśmy gotowi do stworzenia potoku.

Krok 2. Skopiuj predefiniowany szablon do katalogu projektu.

W tym kroku utworzymy działający katalog i pliki projektu potoku, kopiując dodatkowe pliki z predefiniowanego szablonu.

Możesz dać rurociągu inną nazwę, zmieniając PIPELINE_NAME poniżej. Będzie to również nazwa katalogu projektu, w którym zostaną umieszczone twoje pliki.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX zawiera taxi szablon z TFX pakietu python. Jeśli planujesz rozwiązać problem z przewidywaniem punktowym, w tym klasyfikację i regresję, ten szablon może być użyty jako punkt wyjścia.

Do tfx template copy CLI polecenie kopiuje pliki predefiniowane szablony do katalogu projektu.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Zmień kontekst katalogu roboczego w tym notatniku na katalog projektu.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Krok 3. Przeglądaj skopiowane pliki źródłowe

Szablon TFX zawiera podstawowe pliki szkieletowe do tworzenia potoku, w tym kod źródłowy Pythona, przykładowe dane i notatniki Jupyter do analizy danych wyjściowych potoku. taxi szablon korzysta z tego samego zestawu danych Chicago Taxi i model ML jako Airflow Tutorial .

Oto krótkie wprowadzenie do każdego z plików Pythona.

  • pipeline - Katalog ten zawiera definicję gazociągu
    • configs.py - określa wspólne stałe dla biegaczy rurociągowych
    • pipeline.py - określa elementy TFX i rurociągów
  • models - Ten katalog zawiera definicje ML.
    • features.py , features_test.py - określa funkcje dla modelu
    • preprocessing.py , preprocessing_test.py - określa przerób zadania wykorzystujące tf::Transform
    • estimator - Katalog ten zawiera model oparty prognozy.
      • constants.py - definiuje stałe modelu
      • model.py , model_test.py - określa wzór DNN pomocą estymatora TF
    • keras - Ten katalog zawiera model oparty Keras.
      • constants.py - definiuje stałe modelu
      • model.py , model_test.py - określa modelu DNN korzystając Keras
  • local_runner.py , kubeflow_runner.py - zdefiniować dla każdego silnika biegaczy orkiestracji

Można zauważyć, że istnieją jakieś pliki z _test.py w ich imieniu. Są to testy jednostkowe potoku i zaleca się dodanie większej liczby testów jednostkowych podczas implementowania własnych potoków. Można uruchomić testy jednostkowe poprzez dostarczanie nazwę modułu plików testowych z -m flagi. Zazwyczaj można uzyskać nazwę modułu usuwając .py rozszerzenie i zastąpienie / z . . Na przykład:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Krok 4. Uruchom swój pierwszy potok TFX

Komponenty w rurociągu TFX wygeneruje wyjścia dla każdego biegu jako artefakty ML metadanych , i muszą być przechowywane gdzieś. Możesz użyć dowolnego magazynu, do którego klaster KFP ma dostęp, a w tym przykładzie użyjemy Google Cloud Storage (GCS). Domyślny zasobnik GCS powinien zostać utworzony automatycznie. Jego nazwa będzie <your-project-id>-kubeflowpipelines-default .

Prześlijmy nasze przykładowe dane do zasobnika GCS, abyśmy mogli później użyć ich w naszym potoku.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Stwórzmy rurociągu TFX używając tfx pipeline create komendę.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

Podczas tworzenia rurociągu, Dockerfile zostanie wygenerowany zbudować Docker obraz. Nie zapomnij dodać go do systemu kontroli źródła (na przykład git) wraz z innymi plikami źródłowymi.

Teraz rozpocząć wykonywanie uruchomić z nowo utworzonego rurociągu Używanie tfx run create komendę.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Możesz też uruchomić potok na pulpicie nawigacyjnym KFP. Nowy przebieg wykonania zostanie wyświetlony w obszarze Eksperymenty na pulpicie nawigacyjnym KFP. Kliknięcie w eksperyment pozwoli Ci monitorować postęp i wizualizować artefakty utworzone podczas wykonywania.

Zalecamy jednak odwiedzenie pulpitu nawigacyjnego KFP. Dostęp do pulpitu nawigacyjnego KFP możesz uzyskać z menu Cloud AI Platform Pipelines w Google Cloud Console. Gdy odwiedzisz pulpit nawigacyjny, będziesz mógł znaleźć potok i uzyskać dostęp do wielu informacji o potoku. Na przykład, można znaleźć swoje biegi w menu eksperymenty, a kiedy otworzy swój bieg wykonanie pod eksperymentów można znaleźć wszystkie artefakty z rurociągu pod menu artefaktów.

Jednym z głównych źródeł niepowodzeń są problemy związane z uprawnieniami. Upewnij się, że Twój klaster KFP ma uprawnienia dostępu do interfejsów API Google Cloud. To może być skonfigurowany podczas tworzenia klastra KFP w GCP lub zobacz dokument usterek w GCP .

Krok 5. Dodaj komponenty do walidacji danych.

W tym kroku należy dodać elementy do sprawdzania poprawności danych w tym StatisticsGen , SchemaGen i ExampleValidator . Jeśli jesteś zainteresowany w walidacji danych, proszę zobaczyć Zacznij korzystać z Tensorflow sprawdzania poprawności danych .

Kliknij dwukrotnie, aby zmienić katalog na pipeline i dwukrotnie kliknij ponownie, aby otworzyć pipeline.py . Znajdź i usuń te 3 linie, które dodają StatisticsGen , SchemaGen i ExampleValidator do rurociągu. (Wskazówka: wyszukiwanie komentarzy zawierających TODO(step 5): ). Upewnij się, aby zapisać pipeline.py po edycji.

Teraz musisz zaktualizować istniejący potok o zmodyfikowaną definicję potoku. Użyj tfx pipeline update polecenie, aby zaktualizować rurociągu, a następnie przez tfx run create polecenie, aby utworzyć nowy przebieg realizacji swojego zaktualizowanego rurociągu.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Sprawdź wyjścia rurociągu

Odwiedź pulpit nawigacyjny KFP, aby znaleźć dane wyjściowe potoku na stronie dla uruchomienia potoku. Kliknij kartę eksperymenty na lewo, a wszystko działa w stronę eksperymentów. Powinieneś być w stanie znaleźć najnowsze uruchomienie pod nazwą twojego potoku.

Krok 6. Dodaj komponenty do treningu.

W tym kroku należy dodać elementy do szkolenia i walidacji modelu tym Transform , Trainer , Resolver , Evaluator i Pusher .

Dwukrotne kliknięcie otwiera pipeline.py . Znajdź i usuń z 5 linii, które dodają Transform , Trainer , Resolver , Evaluator i Pusher do rurociągu. (Wskazówka: wyszukiwanie TODO(step 6): )

Tak jak poprzednio, musisz teraz zaktualizować istniejący potok przy użyciu zmodyfikowanej definicji potoku. Instrukcje są takie same jak Krok 5. Aktualizacja rurociąg za pomocą tfx pipeline update i utworzyć przebieg wykonania przy użyciu tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Po pomyślnym zakończeniu tego uruchomienia, utworzyłeś i uruchomiłeś swój pierwszy potok TFX w AI Platform Pipelines!

Krok 7. (Opcjonalnie) Spróbuj BigQueryExampleGen

BigQuery to Serverless, wysoce skalowalne i opłacalne magazyn chmura danych. BigQuery może służyć jako źródło przykładów szkoleniowych w TFX. W tym kroku dodamy BigQueryExampleGen do rurociągu.

Dwukrotne kliknięcie otwiera pipeline.py . Wykomentuj CsvExampleGen odkomentowane linia, która tworzy instancję BigQueryExampleGen . Należy również odkomentuj query argument create_pipeline funkcji.

Musimy określić, które GCP projekt używać do BigQuery, a odbywa się to poprzez ustawienie --project w beam_pipeline_args podczas tworzenia rurociągu.

Dwukrotne kliknięcie otwiera configs.py . Odkomentuj definicja GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS i BIG_QUERY_QUERY . Zastąp wartość regionu w tym pliku poprawnymi wartościami dla projektu GCP.

Zmień katalog o jeden poziom w górę. Kliknij nazwę katalogu nad listą plików. Nazwa katalogu jest nazwą rurociągu, który jest my_pipeline jeśli nie zmieni.

Dwukrotne kliknięcie otwiera kubeflow_runner.py . Odkomentuj dwa argumenty, query i beam_pipeline_args , dla create_pipeline funkcji.

Teraz potok jest gotowy do użycia BigQuery jako przykładowego źródła. Zaktualizuj potok jak poprzednio i utwórz nowe uruchomienie, tak jak zrobiliśmy to w krokach 5 i 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Krok 8. (Opcjonalnie) Spróbuj przepływ danych z KFP

Kilka Komponenty TFX wykorzystuje Apache Beam wdrożyć rurociągi danych równoległe, a to oznacza, że można rozpowszechniać przetwarzania danych obciążeń przy użyciu Google Cloud przepływu danych . W tym kroku ustawimy koordynatora Kubeflow tak, aby używał przepływu danych jako zaplecza przetwarzania danych dla Apache Beam.

Podwójne kliknięcie pipeline do katalogu zmian i kliknij dwukrotnie, aby otworzyć configs.py . Odkomentuj definicja GOOGLE_CLOUD_REGION i DATAFLOW_BEAM_PIPELINE_ARGS .

Zmień katalog o jeden poziom w górę. Kliknij nazwę katalogu nad listą plików. Nazwa katalogu jest nazwą rurociągu, który jest my_pipeline jeśli nie zmieni.

Dwukrotne kliknięcie otwiera kubeflow_runner.py . Odkomentuj beam_pipeline_args . (Również upewnić się, aby skomentować bieżące beam_pipeline_args że dodany w kroku 7.)

Teraz potok jest gotowy do użycia Dataflow. Zaktualizuj potok i utwórz uruchomienie wykonania, tak jak to zrobiliśmy w krokach 5 i 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Można znaleźć pracę przepływu danych w przepływ danych w chmurze konsoli .

Krok 9. (Opcjonalnie) Spróbuj Chmura AI platformy szkoleniowej i przewidywania z KFP

TFX współpracuje z kilkoma usług zarządzanych GCP, takie jak cloud AI Platforma Szkoleniowo Prediction . Można ustawić Trainer komponent do korzystania z Cloud AI platformy szkoleniowej, zarządzanej usługi szkolenia dla modeli ML. Ponadto, gdy dany model jest zbudowany i gotowy, aby Mu służono, można przesunąć swój model do Prediction Chmura AI Platforma do serwowania. W tym kroku ustawiamy nasz Trainer i Pusher komponent do korzystania z usług cloud AI platformy.

Przed edycją plików, może najpierw trzeba włączyć AI platformy szkoleniowej & Prediction API.

Podwójne kliknięcie pipeline do katalogu zmian i kliknij dwukrotnie, aby otworzyć configs.py . Odkomentuj definicja GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS i GCP_AI_PLATFORM_SERVING_ARGS . Będziemy wykorzystywać nasze niestandardowe wbudowany pojemnik na zdjęcie, aby trenować w modelu cloud AI platformy szkoleniowej, więc powinniśmy ustawić masterConfig.imageUri w GCP_AI_PLATFORM_TRAINING_ARGS na taką samą wartość jak CUSTOM_TFX_IMAGE powyżej.

Zmiana katalogu o jeden poziom w górę, i kliknij dwukrotnie, aby otworzyć kubeflow_runner.py . Odkomentuj ai_platform_training_args i ai_platform_serving_args .

Zaktualizuj potok i utwórz uruchomienie wykonania, tak jak to zrobiliśmy w krokach 5 i 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Można znaleźć oferty szkoleniowe w chmurze AI Platforma Jobs . Jeśli rurociąg zakończone powodzeniem można znaleźć w modelu Models Chmura AI Platformy .

Krok 10. Pozyskaj SWOJE dane do potoku

Stworzyliśmy potok dla modelu przy użyciu zestawu danych Chicago Taxi. Teraz nadszedł czas, aby umieścić swoje dane w potoku.

Twoje dane mogą być przechowywane w dowolnym miejscu, do którego masz dostęp, w tym w GCS lub BigQuery. Będziesz musiał zmodyfikować definicję potoku, aby uzyskać dostęp do danych.

  1. Jeśli dane są przechowywane w plikach, modyfikować DATA_PATH w kubeflow_runner.py lub local_runner.py i ustawić go do lokalizacji plików. Jeśli dane są przechowywane w BigQuery, modyfikować BIG_QUERY_QUERY w pipeline/configs.py poprawnie zapytania dla danych.
  2. Dodaj funkcje w models/features.py .
  3. Modyfikować models/preprocessing.py do przekształcania danych wejściowych do treningu .
  4. Modyfikować models/keras/model.py i models/keras/constants.py do opisania modelu ML .
    • Możesz również użyć modelu opartego na estymatorze. Zmień RUN_FN stała się models.estimator.model.run_fn w pipeline/configs.py .

Proszę zobaczyć komponentu przewodnik Trainer więcej wstępie.

Sprzątanie

Aby oczyścić wszystkich zasobów Google Cloud zastosowane w tym projekcie, można usunąć projekt Google Cloud został użyty do samouczka.

Alternatywnie możesz wyczyścić poszczególne zasoby, odwiedzając każdą konsolę: