Создание конвейера TFX с использованием шаблонов с помощью Beam orchestrator

Введение

Этот документ будет содержать инструкции по созданию TensorFlow Extended (TFX) трубопровод , используя шаблоны , которые предоставляются с пакетом TFX Python. Большинство команд являются командами Linux оболочки, и соответствующего Jupyter клетки кода Notebook , которые ссылаются на эти команды с помощью ! предоставлены.

Вы будете строить трубопровод с помощью такси Trips набора данных выпущен городом Чикаго. Мы настоятельно рекомендуем вам попытаться создать свой собственный конвейер, используя свой набор данных, используя этот конвейер в качестве основы.

Мы будем строить трубопровод с помощью Apache Beam Orchestrator . Если вы заинтересованы в использовании Kubeflow Orchestrator на Google Cloud, пожалуйста , см TFX на Cloud AI Платформа трубопроводов учебник .

Предпосылки

  • Linux / MacOS
  • Python> = 3.5.3

Вы можете получить все предпосылки легко работает ноутбук на Google Colab .

Шаг 1. Настройте свою среду.

В этом документе мы дважды представим команды. Один раз как команду оболочки, готовую к копированию и вставке, один раз как ячейку записной книжки jupyter. Если вы используете Colab, просто пропустите блок сценария оболочки и выполните ячейки записной книжки.

Вы должны подготовить среду разработки для построения конвейера.

Установить tfx пакет питона. Мы рекомендуем использовать virtualenv в локальной среде. Вы можете использовать следующий фрагмент сценария оболочки для настройки своей среды.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

Если вы используете колаб:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ОШИБКА: some-package 0.some_version.1 требует other-package! = 2.0., <3,> = 1.15, но у вас будет другой пакет 2.0.0, который несовместим.

Пожалуйста, игнорируйте эти ошибки в данный момент.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

Проверим версию TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

И это сделано. Мы готовы создать конвейер.

Шаг 2. Скопируйте предопределенный шаблон в каталог вашего проекта.

На этом этапе мы создадим каталог и файлы проекта рабочего конвейера, скопировав дополнительные файлы из предопределенного шаблона.

Вы можете дать вашему трубопроводному другое имя, изменив PIPELINE_NAME ниже. Это также станет именем каталога проекта, в который будут помещены ваши файлы.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX включает taxi шаблон с пакетом TFX питона. Если вы планируете решить задачу точечного прогнозирования, включая классификацию и регрессию, этот шаблон можно использовать в качестве отправной точки.

В tfx template copy CLI команда копирует файлы шаблонов предопределены в каталог проекта.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

Измените контекст рабочего каталога в этой записной книжке на каталог проекта.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

Шаг 3. Просмотрите скопированные исходные файлы.

Шаблон TFX предоставляет базовые файлы скаффолда для построения конвейера, включая исходный код Python, образцы данных и блокноты Jupyter для анализа выходных данных конвейера. taxi шаблон использует тот же набор данных Чикаго такси и модель ML как Airflow Учебник .

В Google Colab вы можете просматривать файлы, щелкая значок папки слева. Файлы должны быть скопированы под directoy проекта, имя которого my_pipeline в этом случае. Вы можете щелкнуть имена каталогов, чтобы просмотреть содержимое каталога, и дважды щелкнуть имена файлов, чтобы открыть их.

Вот краткое введение в каждый из файлов Python.

  • pipeline - Этот каталог содержит определение трубопровода
    • configs.py - определяет общие константы для бегунов трубопровода
    • pipeline.py - определяет TFX компоненты и трубопроводов
  • models - Этот каталог содержит определение модели ML.
    • features.py , features_test.py - определяет функции для модели
    • preprocessing.py , preprocessing_test.py - определяет предварительной обработки заданий с помощью tf::Transform
    • estimator - Этот каталог содержит модель , основанный оценщика.
      • constants.py - определяет константы модели
      • model.py , model_test.py - определяет DNN модель с использованием оценщика TF
    • keras - Этот каталог содержит модель , основанную Keras.
      • constants.py - определяет константы модели
      • model.py , model_test.py - определяет модель DNN с помощью Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - определить бегуны для каждого оркестровки двигателя

Вы можете заметить , что есть некоторые файлы с _test.py на их имя. Это модульные тесты конвейера, и рекомендуется добавлять дополнительные модульные тесты по мере реализации ваших собственных конвейеров. Вы можете запустить юнит - тесты, указав имя модуля тестовых файлов с -m флагом. Обычно вы можете получить имя модуля, удалив .py расширения и замену / с . . Например:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

Шаг 4. Запустите свой первый конвейер TFX.

Вы можете создать трубопровод с помощью pipeline create команду.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

Затем, вы можете запустить созданный трубопровод , используя run create команды.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

В случае успеха, вы увидите Component CsvExampleGen is finished. Когда вы копируете шаблон, в конвейер включается только один компонент, CsvExampleGen.

Шаг 5. Добавьте компоненты для проверки данных.

На этом этапе вы будете добавлять компоненты для проверки достоверности данных , включая StatisticsGen , SchemaGen и ExampleValidator . Если вы заинтересованы в проверке достоверности данных, пожалуйста , см Начало работы с Tensorflow Data Validation .

Мы доработаем скопированное определение трубопровода в pipeline/pipeline.py . Если вы работаете в своей локальной среде, используйте свой любимый редактор для редактирования файла. Если вы работаете над Google Colab,

Нажмите значок папки слева для открытых Files зрения.

Нажмите my_pipeline , чтобы открыть каталог и нажмите pipeline каталог , чтобы открыть и дважды щелкните pipeline.py , чтобы открыть файл.

Найти и раскомментируйте 3 линии , которые добавляют StatisticsGen , SchemaGen и ExampleValidator к трубопроводу. (Подсказка: найти комментарии , содержащие TODO(step 5): ).

Ваше изменение будет автоматически сохранено через несколько секунд. Убедитесь , что * знак перед pipeline.py исчез в названии вкладки. В Colab нет кнопки сохранения или ярлыка для редактора файлов. Файлы Python в редакторе файлов могут быть сохранены в среде выполнения даже в playground режиме.

Теперь вам нужно обновить существующий конвейер с измененным определением конвейера. Используйте tfx pipeline update команду , чтобы обновить трубопровод, а затем в tfx run create команду для создания нового исполнения пробега обновленного трубопровода.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Вы должны увидеть выходной журнал добавленных компонентов. Наш трубопровод создает выходные артефакты в tfx_pipeline_output/my_pipeline каталоге.

Шаг 6. Добавьте компоненты для обучения.

На этом этапе вы будете добавлять компоненты для подготовки и проверки модели , включая Transform , Trainer , ResolverNode , Evaluator и Pusher .

Открытый pipeline/pipeline.py . Найти и раскомментируйте 5 линий , которые добавляют Transform , Trainer , ResolverNode , Evaluator и Pusher к трубопроводу. (Подсказка: находка TODO(step 6): )

Как и раньше, теперь вам нужно обновить существующий конвейер с помощью измененного определения конвейера. Инструкции такие же , как Шаг 5. Обновление трубопровода с помощью tfx pipeline update , а также создать прогон исполнения , используя tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Когда этот запуск завершается успешно, вы создали и запустили свой первый конвейер TFX с помощью Beam orchestrator!

Шаг 7. (Необязательно) Попробуйте BigQueryExampleGen.

[BigQuery] - это бессерверное, хорошо масштабируемое и экономичное облачное хранилище данных. BigQuery можно использовать в качестве источника для обучающих примеров в TFX. На этом шаге мы добавим BigQueryExampleGen к трубопроводу.

Вам нужен Google Cloud Platform счет использования BigQuery. Пожалуйста, подготовьте проект GCP.

Войти в свой проект с помощью colab библиотеки аутентификации или gcloud утилиты.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

Для доступа к ресурсам BigQuery с помощью TFX необходимо указать имя проекта GCP. Установить GOOGLE_CLOUD_PROJECT переменных сред с названием проекта.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

Открытый pipeline/pipeline.py . Закомментируйте CsvExampleGen и раскомментируйте строку , которые создают экземпляр BigQueryExampleGen . Кроме того, необходимо раскомментировать query аргумент create_pipeline функции.

Нам нужно определить , какой GCP проект использовать для BigQuery снова, и это делается путем установки --project в beam_pipeline_args при создании трубопровода.

Открытый pipeline/configs.py . Раскоментируйте определение BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS и BIG_QUERY_QUERY . Вы должны заменить идентификатор проекта и значение региона в этом файле на правильные значения для вашего проекта GCP.

Открыть beam_dag_runner.py . Раскомментируйте два аргумента, query и beam_pipeline_args , для метода create_pipeline ().

Теперь конвейер готов к использованию BigQuery в качестве примера источника. Обновите конвейер и создайте прогон, как мы делали на шагах 5 и 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

Что дальше: загрузите ВАШИ данные в конвейер.

Мы создали конвейер для модели, используя набор данных Chicago Taxi. Пришло время поместить ваши данные в конвейер.

Ваши данные могут храниться везде, где есть доступ к вашему конвейеру, включая GCS или BigQuery. Вам нужно будет изменить определение конвейера для доступа к вашим данным.

  1. Если ваши данные хранятся в файлах, изменять DATA_PATH в kubeflow_dag_runner.py или beam_dag_runner.py и установить его на место ваших файлов. Если ваши данные хранятся в BigQuery, изменять BIG_QUERY_QUERY в pipeline/configs.py правильно запрос для ваших данных.
  2. Добавить компоненты в models/features.py .
  3. Изменение models/preprocessing.py для преобразования входных данных для обучения .
  4. Изменение models/keras/model.py и models/keras/constants.py , чтобы описать модель ML .
    • Вы также можете использовать модель, основанную на оценке. Изменение RUN_FN константа models.estimator.model.run_fn в pipeline/configs.py .

Пожалуйста , смотрите компонент руководство Trainer для внедрения более.