Создайте конвейер TFX с помощью шаблонов

Введение

Этот документ будет содержать инструкции по созданию TensorFlow Extended (TFX) трубопровод , используя шаблоны , которые предоставляются с пакетом TFX Python. Многие из инструкций представляют собой команды оболочки Linux, которые будут выполняться в экземпляре AI Platform Notebooks. Соответствующие ячейки кода Notebook Jupyter , которые ссылаются на эти команды с помощью ! предоставлены.

Вы будете строить трубопровод с помощью такси Trips набора данных выпущен городом Чикаго. Мы настоятельно рекомендуем вам попробовать построить свой собственный конвейер, используя свой набор данных, используя этот конвейер в качестве основы.

Шаг 1. Настройте свою среду.

AI Platform Pipelines подготовит среду разработки для создания конвейера и кластер Kubeflow Pipeline для запуска недавно построенного конвейера.

Установите tfx питона пакет с kfp дополнительным требованием.

import sys
# Use the latest version of pip.
!pip install --upgrade pip
# Install tfx and kfp Python packages.
!pip install --upgrade "tfx[kfp]<2"

Проверим версии TFX.

python3 -c "from tfx import version ; print('TFX version: {}'.format(version.__version__))"
TFX version: 0.29.0

В AI Platform Трубопроводы, TFX работает в среде хостинга Kubernetes использованием Kubeflow трубопроводов .

Давайте установим некоторые переменные среды для использования Kubeflow Pipelines.

Сначала получите идентификатор проекта GCP.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)
env: GOOGLE_CLOUD_PROJECT=tf-benchmark-dashboard
GCP project ID:tf-benchmark-dashboard

Нам также нужен доступ к вашему кластеру KFP. Вы можете получить к нему доступ в своей облачной консоли Google в меню «AI Platform> Pipeline». «Конечную точку» кластера KFP можно найти по URL-адресу панели мониторинга Pipelines или по URL-адресу страницы «Приступая к работе», на которой вы запустили эту записную книжку. Давайте создадим ENDPOINT переменную окружения и установите его в кластер конечной KFP. ENDPOINT должен содержать только часть имени хоста URL-адреса. Например, если URL в KFP приборной панели <a href="https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start">https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start</a> , значение ENDPOINT становится 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com .

# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.

Установить имя изображения в качестве tfx-pipeline в рамках текущего проекта GCP.

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

И это сделано. Мы готовы создать конвейер.

Шаг 2. Скопируйте предопределенный шаблон в каталог вашего проекта.

На этом этапе мы создадим каталог и файлы проекта рабочего конвейера, скопировав дополнительные файлы из предопределенного шаблона.

Вы можете дать вашему трубопроводному другое имя, изменив PIPELINE_NAME ниже. Это также станет именем каталога проекта, в который будут помещены ваши файлы.

PIPELINE_NAME="my_pipeline"
import os
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"imported",PIPELINE_NAME)

TFX включает taxi шаблон с пакетом TFX питона. Если вы планируете решить задачу точечного прогнозирования, включая классификацию и регрессию, этот шаблон можно использовать в качестве отправной точки.

В tfx template copy CLI команда копирует файлы шаблонов предопределены в каталог проекта.

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi
CLI
Copying taxi pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
kubeflow_v2_dag_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_v2_dag_runner.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/estimator/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/estimator/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/estimator/__init__.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/keras/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/keras/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/keras/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/keras/__init__.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
model_analysis.ipynb -> /home/kbuilder/imported/my_pipeline/model_analysis.ipynb
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
data_validation.ipynb -> /home/kbuilder/imported/my_pipeline/data_validation.ipynb
.gitignore -> /home/kbuilder/imported/my_pipeline/.gitignore
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/template_handler.py", line 185, in copy_template
    fileio.copy(src_path, dst_path)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 51, in copy
    src_fs.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 48, in copy
    tf.io.gfile.copy(src, dst, overwrite=overwrite)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 516, in copy_v2
    compat.path_to_bytes(src), compat.path_to_bytes(dst), overwrite)
tensorflow.python.framework.errors_impl.AlreadyExistsError: file already exists

Измените контекст рабочего каталога в этой записной книжке на каталог проекта.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Шаг 3. Просмотрите скопированные исходные файлы.

Шаблон TFX предоставляет базовые файлы скаффолда для построения конвейера, включая исходный код Python, образцы данных и блокноты Jupyter для анализа выходных данных конвейера. taxi шаблон использует тот же набор данных Чикаго такси и модель ML как Airflow Учебник .

Вот краткое введение в каждый из файлов Python.

  • pipeline - Этот каталог содержит определение трубопровода
    • configs.py - определяет общие константы для бегунов трубопровода
    • pipeline.py - определяет TFX компоненты и трубопроводов
  • models - Этот каталог содержит определение модели ML.
    • features.py , features_test.py - определяет функции для модели
    • preprocessing.py , preprocessing_test.py - определяет предварительной обработки заданий с помощью tf::Transform
    • estimator - Этот каталог содержит модель , основанный оценщика.
      • constants.py - определяет константы модели
      • model.py , model_test.py - определяет DNN модель с использованием оценщика TF
    • keras - Этот каталог содержит модель , основанную Keras.
      • constants.py - определяет константы модели
      • model.py , model_test.py - определяет модель DNN с помощью Keras
  • local_runner.py , kubeflow_runner.py - определить бегуны для каждого оркестровки двигателя

Вы можете заметить , что есть некоторые файлы с _test.py на их имя. Это модульные тесты конвейера, и рекомендуется добавлять дополнительные модульные тесты по мере реализации ваших собственных конвейеров. Вы можете запустить юнит - тесты, указав имя модуля тестовых файлов с -m флагом. Обычно вы можете получить имя модуля, удалив .py расширения и замену / с . . Например:

{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testNumberOfBucketFeatureBucketCount
INFO:tensorflow:time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
I1204 11:33:54.064224 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testNumberOfBucketFeatureBucketCount): 0.0s
[       OK ] FeaturesTest.testNumberOfBucketFeatureBucketCount
[ RUN      ] FeaturesTest.testTransformedNames
INFO:tensorflow:time(__main__.FeaturesTest.testTransformedNames): 0.0s
I1204 11:33:54.064666 139808961349440 test_util.py:2076] time(__main__.FeaturesTest.testTransformedNames): 0.0s
[       OK ] FeaturesTest.testTransformedNames
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 3 tests in 0.001s

OK (skipped=1)
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] ModelTest.testBuildKerasModel
2021-12-04 11:33:57.507456: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcusolver.so.10'; dlerror: libcusolver.so.10: cannot open shared object file: No such file or directory
2021-12-04 11:33:57.508566: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1757] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
I1204 11:33:57.581331 139740839778112 layer_utils.py:191] Model: "model"
I1204 11:33:57.581501 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.581558 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.581596 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.581741 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.581793 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.581883 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.581926 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582010 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.582052 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582189 139740839778112 layer_utils.py:189] dense_features (DenseFeatures)  (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582241 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582280 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582315 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582462 139740839778112 layer_utils.py:189] dense (Dense)                   (None, 1)            2           dense_features[0][0]             
I1204 11:33:57.582518 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582629 139740839778112 layer_utils.py:189] dense_1 (Dense)                 (None, 1)            2           dense[0][0]                      
I1204 11:33:57.582674 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.582824 139740839778112 layer_utils.py:189] dense_features_1 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.582879 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.582921 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.582957 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583053 139740839778112 layer_utils.py:189] concatenate (Concatenate)       (None, 35)           0           dense_1[0][0]                    
I1204 11:33:57.583099 139740839778112 layer_utils.py:189]                                                                  dense_features_1[0][0]           
I1204 11:33:57.583143 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583260 139740839778112 layer_utils.py:189] dense_2 (Dense)                 (None, 1)            36          concatenate[0][0]                
I1204 11:33:57.583309 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.583389 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze (TFOpLambd (None,)              0           dense_2[0][0]                    
I1204 11:33:57.583432 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.583687 139740839778112 layer_utils.py:267] Total params: 40
I1204 11:33:57.583751 139740839778112 layer_utils.py:268] Trainable params: 40
I1204 11:33:57.583794 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.583832 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
I1204 11:33:57.649701 139740839778112 layer_utils.py:191] Model: "model_1"
I1204 11:33:57.649825 139740839778112 layer_utils.py:192] __________________________________________________________________________________________________
I1204 11:33:57.649878 139740839778112 layer_utils.py:189] Layer (type)                    Output Shape         Param #     Connected to                     
I1204 11:33:57.649932 139740839778112 layer_utils.py:194] ==================================================================================================
I1204 11:33:57.650066 139740839778112 layer_utils.py:189] pickup_latitude_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650120 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650207 139740839778112 layer_utils.py:189] trip_miles_xf (InputLayer)      [(None,)]            0                                            
I1204 11:33:57.650259 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650356 139740839778112 layer_utils.py:189] trip_start_hour_xf (InputLayer) [(None,)]            0                                            
I1204 11:33:57.650398 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650552 139740839778112 layer_utils.py:189] dense_features_2 (DenseFeatures (None, 1)            0           pickup_latitude_xf[0][0]         
I1204 11:33:57.650603 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.650644 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.650682 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.650812 139740839778112 layer_utils.py:189] dense_3 (Dense)                 (None, 1)            2           dense_features_2[0][0]           
I1204 11:33:57.650864 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651007 139740839778112 layer_utils.py:189] dense_features_3 (DenseFeatures (None, 34)           0           pickup_latitude_xf[0][0]         
I1204 11:33:57.651061 139740839778112 layer_utils.py:189]                                                                  trip_miles_xf[0][0]              
I1204 11:33:57.651102 139740839778112 layer_utils.py:189]                                                                  trip_start_hour_xf[0][0]         
I1204 11:33:57.651146 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651229 139740839778112 layer_utils.py:189] concatenate_1 (Concatenate)     (None, 35)           0           dense_3[0][0]                    
I1204 11:33:57.651274 139740839778112 layer_utils.py:189]                                                                  dense_features_3[0][0]           
I1204 11:33:57.651311 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651462 139740839778112 layer_utils.py:189] dense_4 (Dense)                 (None, 1)            36          concatenate_1[0][0]              
I1204 11:33:57.651547 139740839778112 layer_utils.py:258] __________________________________________________________________________________________________
I1204 11:33:57.651632 139740839778112 layer_utils.py:189] tf.compat.v1.squeeze_1 (TFOpLam (None,)              0           dense_4[0][0]                    
I1204 11:33:57.651675 139740839778112 layer_utils.py:256] ==================================================================================================
I1204 11:33:57.651959 139740839778112 layer_utils.py:267] Total params: 38
I1204 11:33:57.652019 139740839778112 layer_utils.py:268] Trainable params: 38
I1204 11:33:57.652061 139740839778112 layer_utils.py:269] Non-trainable params: 0
I1204 11:33:57.652098 139740839778112 layer_utils.py:270] __________________________________________________________________________________________________
INFO:tensorflow:time(__main__.ModelTest.testBuildKerasModel): 0.84s
I1204 11:33:57.652639 139740839778112 test_util.py:2076] time(__main__.ModelTest.testBuildKerasModel): 0.84s
[       OK ] ModelTest.testBuildKerasModel
[ RUN      ] ModelTest.test_session
[  SKIPPED ] ModelTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.836s

OK (skipped=1)

Шаг 4. Запустите свой первый конвейер TFX.

Компоненты в трубопроводе TFX будет генерировать выходы для каждого запуска , как Артефакты ML метаданных , и они должны быть где - то хранить. Вы можете использовать любое хранилище, к которому может получить доступ кластер KFP, и для этого примера мы будем использовать Google Cloud Storage (GCS). Корзина GCS по умолчанию должна была быть создана автоматически. Его имя будет <your-project-id>-kubeflowpipelines-default .

Давайте загрузим наши образцы данных в корзину GCS, чтобы мы могли использовать их в нашем конвейере позже.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/taxi/data.csv
BucketNotFoundException: 404 gs://tf-benchmark-dashboard-kubeflowpipelines-default bucket does not exist.

Давайте создадим TFX трубопровода с помощью tfx pipeline create команду.

!tfx pipeline create  --pipeline-path=kubeflow_runner.py --endpoint={ENDPOINT} \
--build-image
CLI
Usage: tfx pipeline create [OPTIONS]
Try 'tfx pipeline create --help' for help.

Error: no such option: --build-image

При создании трубопровода, Dockerfile будет сгенерирован построить Docker изображение. Не забудьте добавить его в систему управления версиями (например, git) вместе с другими исходными файлами.

Теперь начать выполнение запуска с вновь созданным трубопроводом с помощью tfx run create команду.

tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Или вы также можете запустить конвейер на панели инструментов KFP. Новый запуск будет указан в разделе «Эксперименты» на панели инструментов KFP. Щелчок по эксперименту позволит вам отслеживать прогресс и визуализировать артефакты, созданные во время выполнения.

Однако мы рекомендуем посетить панель управления KFP. Вы можете получить доступ к панели управления KFP из меню Cloud AI Platform Pipelines в Google Cloud Console. Зайдя на панель управления, вы сможете найти конвейер и получить доступ к огромному количеству информации о конвейере. Например, вы можете найти ваш работают в меню Экспериментов, и при открытии выполнения запустить под экспериментами вы можете найти все артефакты из трубопровода под меню Артефактов.

Один из основных источников сбоя - проблемы, связанные с разрешениями. Убедитесь, что у вашего кластера KFP есть разрешения на доступ к API Google Cloud. Это может быть настроено при создании КФПА кластера в опорных точках , или см Troubleshooting документа в GCP .

Шаг 5. Добавьте компоненты для проверки данных.

На этом этапе вы будете добавлять компоненты для проверки достоверности данных , включая StatisticsGen , SchemaGen и ExampleValidator . Если вы заинтересованы в проверке достоверности данных, пожалуйста , см Начало работы с Tensorflow Data Validation .

Двойной щелчок по каталогу изменения к pipeline и дважды щелкните еще раз , чтобы открытый pipeline.py . Найти и раскомментируйте 3 линии , которые добавляют StatisticsGen , SchemaGen и ExampleValidator к трубопроводу. (Подсказка: поиск для комментариев , содержащих TODO(step 5): ). Убедитесь в том , чтобы сохранить pipeline.py после редактирования.

Теперь вам нужно обновить существующий конвейер с измененным определением конвейера. Используйте tfx pipeline update команду , чтобы обновить трубопровод, а затем в tfx run create команду для создания нового исполнения пробега обновленного трубопровода.

# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Проверить выходы трубопровода

Посетите панель управления KFP, чтобы найти выходные данные конвейера на странице для вашего запуска конвейера. Перейдите на вкладку Эксперименты слева, и все пробеги на странице Experiments. Вы должны найти последний запуск по имени вашего конвейера.

Шаг 6. Добавьте компоненты для обучения.

На этом этапе вы будете добавлять компоненты для подготовки и проверки модели , включая Transform , Trainer , Resolver , Evaluator и Pusher .

Дважды щелкните , чтобы открыть pipeline.py . Найти и раскомментируйте 5 линий , которые добавляют Transform , Trainer , Resolver , Evaluator и Pusher к трубопроводу. (Подсказка: поиск TODO(step 6): )

Как и раньше, теперь вам нужно обновить существующий конвейер с помощью измененного определения конвейера. Инструкции такие же , как Шаг 5. Обновление трубопровода с помощью tfx pipeline update , а также создать прогон исполнения , используя tfx run create .

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Когда этот запуск завершается успешно, вы создали и запускаете свой первый конвейер TFX в конвейерах платформы AI!

Шаг 7. (Необязательно) Попробуйте BigQueryExampleGen

BigQuery является бессерверным, масштабируемым и рентабельным складом облака данных. BigQuery можно использовать в качестве источника для обучающих примеров в TFX. На этом шаге мы добавим BigQueryExampleGen к трубопроводу.

Дважды щелкните , чтобы открыть pipeline.py . Закомментируйте CsvExampleGen и раскомментируйте строки , которая создает экземпляр BigQueryExampleGen . Кроме того, необходимо раскомментировать query аргумент create_pipeline функции.

Нам нужно определить , какой GCP проект использовать для BigQuery, и это делается путем установки --project в beam_pipeline_args при создании трубопровода.

Дважды щелкните , чтобы открыть configs.py . Раскоментируйте определение GOOGLE_CLOUD_REGION , BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS и BIG_QUERY_QUERY . Вы должны заменить значение региона в этом файле на правильные значения для вашего проекта GCP.

Сменить каталог на один уровень выше. Щелкните имя каталога над списком файлов. Имя каталога является название трубопровода , который my_pipeline , если вы не изменяли.

Дважды щелкните , чтобы открыть kubeflow_runner.py . Раскомментируйте два аргумента, query и beam_pipeline_args , для create_pipeline функции.

Теперь конвейер готов к использованию BigQuery в качестве примера источника. Обновите конвейер, как раньше, и создайте новый запуск выполнения, как мы делали на шагах 5 и 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Шаг 8. (Необязательно) Попробуйте DataFlow с КДР

Несколько TFX компоненты используют Apache Beam для реализации трубопроводов данных Параллельных, и это означает , что вы можете распределить обработку задач обработки данных с помощью Google Cloud DataFlow . На этом этапе мы настроим оркестратор Kubeflow на использование потока данных в качестве серверной части обработки данных для Apache Beam.

Дважды щелкните pipeline в каталог изменений и дважды щелкните , чтобы открыть configs.py . Раскоментируйте определение GOOGLE_CLOUD_REGION и DATAFLOW_BEAM_PIPELINE_ARGS .

Сменить каталог на один уровень выше. Щелкните имя каталога над списком файлов. Имя каталога является название трубопровода , который my_pipeline , если вы не изменяли.

Дважды щелкните , чтобы открыть kubeflow_runner.py . Раскомментируйте beam_pipeline_args . (Также убедитесь , что закомментировать текущие beam_pipeline_args , что вы добавили в шаге 7.)

Теперь конвейер готов к использованию Dataflow. Обновите конвейер и создайте запуск выполнения, как мы делали на шагах 5 и 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Вы можете найти свои рабочие места DataFlow в DataFlow в облаке консоли .

Шаг 9. (Необязательно) Попробуйте Cloud Platform AI Обучение и прогнозирование с КДР

TFX Взаимодействует с несколькими управляемых GCP услуг, таких как облако AI платформы для подготовки и прогнозирования . Вы можете настроить свой Trainer компонент использовать Cloud Platform AI Обучение, управляемый сервис для подготовки моделей ML. Кроме того, если ваша модель построена и готова к показу, вы можете нажать на модель Cloud Platform AI прогнозирования для обслуживания. На этом шаге мы установим наш Trainer и Pusher компонент использовать услуги Cloud Platform AI.

Перед редактированием файлов, вы можете сначала включить AI Platform Training & Prediction API.

Дважды щелкните pipeline в каталог изменений и дважды щелкните , чтобы открыть configs.py . Раскоментируйте определение GOOGLE_CLOUD_REGION , GCP_AI_PLATFORM_TRAINING_ARGS и GCP_AI_PLATFORM_SERVING_ARGS . Мы будем использовать наш собственный встроенный контейнер изображения для обучения модели в облаке AI Platform обучения, поэтому мы должны установить masterConfig.imageUri в GCP_AI_PLATFORM_TRAINING_ARGS к тому же значению, CUSTOM_TFX_IMAGE выше.

Изменение каталога на один уровень вверх, а затем дважды щелкните , чтобы открыть kubeflow_runner.py . Раскомментируйте ai_platform_training_args и ai_platform_serving_args .

Обновите конвейер и создайте запуск выполнения, как мы делали на шагах 5 и 6.

!tfx pipeline update \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Updating pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
beam runner not found in dsl.
CLI
Creating a run for pipeline: my_pipeline
Detected Beam.
[WARNING] Default engine will be changed to "local" in the near future.
Use --engine flag if you intend to use a different orchestrator.
Pipeline "my_pipeline" does not exist.

Вы можете найти свои учебные работы в облаке AI Platform Работа . Если трубопровод успешно завершена, вы можете найти свою модель в Cloud Platform AI моделей .

Шаг 10. Загрузите ВАШИ данные в конвейер.

Мы создали конвейер для модели, используя набор данных Chicago Taxi. Пришло время поместить ваши данные в конвейер.

Ваши данные могут храниться везде, где есть доступ к вашему конвейеру, включая GCS или BigQuery. Вам нужно будет изменить определение конвейера для доступа к вашим данным.

  1. Если ваши данные хранятся в файлах, изменять DATA_PATH в kubeflow_runner.py или local_runner.py и установить его на место ваших файлов. Если ваши данные хранятся в BigQuery, изменять BIG_QUERY_QUERY в pipeline/configs.py правильно запрос для ваших данных.
  2. Добавить компоненты в models/features.py .
  3. Изменение models/preprocessing.py для преобразования входных данных для обучения .
  4. Изменение models/keras/model.py и models/keras/constants.py , чтобы описать модель ML .
    • Вы также можете использовать модель, основанную на оценке. Изменение RUN_FN константа models.estimator.model.run_fn в pipeline/configs.py .

Пожалуйста , смотрите компонент руководство Trainer для внедрения более.

Убираться

Для того, чтобы очистить все ресурсы Google Cloud , используемые в этом проекте, вы можете удалить проект Google Cloud вы использовали для урока.

Кроме того, вы можете очистить отдельные ресурсы, посетив каждую консоль: