Beamオーケストレーターでテンプレートを使用してTFXパイプラインを作成します

序章

この文書では、TFX Pythonパッケージで提供されたテンプレートを使用して拡張TensorFlow(TFX)パイプラインを作成するための指示を提供します。命令のほとんどは、Linuxのシェルコマンドで、使用してこれらのコマンドを呼び出すJupyterノートコードセルを対応します!提供されています。

あなたは使用してパイプラインを構築しますタクシーをセットTripsのシカゴ市が発表しました。このパイプラインをベースラインとして利用して、データセットを使用して独自のパイプラインを構築することを強くお勧めします。

私たちは、使用してパイプラインを構築しますApacheのビームOrchestratorのを。あなたはKubeflowオーケストGoogleクラウド上での使用に興味がある場合は、以下を参照してくださいクラウドAIプラットフォームパイプラインのチュートリアルでTFXを

前提条件

  • Linux / MacOS
  • Python> = 3.5.3

あなたは簡単で、すべての前提条件を得ることができますGoogleのコラボで、このノートブックを実行しています

ステップ1.環境をセットアップします。

このドキュメント全体を通して、コマンドを2回示します。コピーアンドペースト対応のシェルコマンドとして1回、jupyterノートブックセルとして1回。 Colabを使用している場合は、シェルスクリプトブロックをスキップしてノートブックセルを実行するだけです。

パイプラインを構築するための開発環境を準備する必要があります。

インストールtfxのpythonパッケージを。私たちは、の使用をお勧めしますvirtualenvローカル環境で。次のシェルスクリプトスニペットを使用して、環境を設定できます。

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

colabを使用している場合:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

エラー:some-package 0.some_version.1にはother-package!= 2.0。、<3、> = 1.15の要件がありますが、互換性のないother-package2.0.0があります。

現時点では、これらのエラーは無視してください。

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

TFXのバージョンを確認してみましょう。

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

そして、それは完了です。パイプラインを作成する準備が整いました。

ステップ2.事前定義されたテンプレートをプロジェクトディレクトリにコピーします。

このステップでは、事前定義されたテンプレートから追加のファイルをコピーして、作業パイプラインプロジェクトディレクトリとファイルを作成します。

あなたは、変更することによって、あなたのパイプラインの別の名前を付けることがPIPELINE_NAME下回ります。これは、ファイルが配置されるプロジェクトディレクトリの名前にもなります。

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFXは、 taxi TFXのPythonパッケージでテンプレートを。分類や回帰など、ポイントごとの予測問題を解決することを計画している場合は、このテンプレートを開始点として使用できます。

tfx template copy CLIコマンドのコピーがプロジェクトディレクトリにテンプレートファイルをあらかじめ定義します。

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

このノートブックの作業ディレクトリコンテキストをプロジェクトディレクトリに変更します。

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

ステップ3.コピーしたソースファイルを参照します。

TFXテンプレートは、Pythonソースコード、サンプルデータ、パイプラインの出力を分析するJupyter Notebookなど、パイプラインを構築するための基本的なスキャフォールドファイルを提供します。 taxiテンプレートは、同じシカゴのタクシーデータセットおよびMLモデルを使用してエアフローチュートリアル

Google Colabでは、左側のフォルダアイコンをクリックしてファイルを参照できます。ファイルは、その名前であるプロジェクトdirectoy、下にコピーする必要がありますmy_pipelineこのケースでは。ディレクトリ名をクリックするとディレクトリの内容が表示され、ファイル名をダブルクリックして開くことができます。

ここでは、各Pythonファイルの簡単な紹介をします。

  • pipeline -このディレクトリには、パイプラインの定義が含まれています
    • configs.py -パイプラインランナーのための共通の定数を定義しています
    • pipeline.py -定義TFXコンポーネントおよびパイプライン
  • models -このディレクトリには、MLモデルの定義が含まれています。
    • features.pyfeatures_test.py -モデルの定義機能
    • preprocessing.pypreprocessing_test.py -使用してジョブを前処理定義tf::Transform
    • estimator -このディレクトリには、見積もりベースのモデルが含まれています。
      • constants.py -モデルの定義定数
      • model.pymodel_test.py - TF推定器を用いてDNNモデルを定義
    • keras -このディレクトリにはKerasベースのモデルが含まれています。
      • constants.py -モデルの定義定数
      • model.pymodel_test.py - Kerasを使用してDNNモデルを定義します
  • beam_dag_runner.pykubeflow_dag_runner.py -各オーケストレーションエンジンのランナーを定義

あなたは、といくつかのファイルがあることに気づくかもしれません_test.py自分の名前には。これらはパイプラインの単体テストであり、独自のパイプラインを実装するときに単体テストを追加することをお勧めします。あなたはとテストファイルのモジュール名を供給することにより、ユニットテストを実行することができ-mフラグ。あなたは通常、削除することにより、モジュール名を取得することができます.py拡張子を交換し、 /. 。例えば:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

ステップ4.最初のTFXパイプラインを実行します

あなたは使用してパイプラインを作成することができますpipeline createコマンドを使用します。

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

次に、あなたが使用して作成したパイプラインを実行することができますrun create 、コマンドを。

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

成功した場合は、表示されますComponent CsvExampleGen is finished.テンプレートをコピーすると、パイプラインに含まれるコンポーネントはCsvExampleGenの1つだけです。

ステップ5.データ検証用のコンポーネントを追加します。

このステップでは、などのデータ検証のためのコンポーネントを追加しますStatisticsGenSchemaGen 、およびExampleValidator 。あなたはデータの検証に興味がある場合は、以下を参照してくださいTensorflowデータの検証を使ってみましょう

私たちは、中コピーパイプライン定義修正するpipeline/pipeline.py 。ローカル環境で作業している場合は、お気に入りのエディターを使用してファイルを編集します。 Google Colabで作業している場合は、

オープンに左にフォルダアイコンをクリックしてFiles表示

クリックmy_pipelineディレクトリを開き、クリックするpipelineオープンとダブルクリックにディレクトリをpipeline.pyファイルを開きます

検索および追加3行コメントを解除StatisticsGenSchemaGen 、およびExampleValidatorパイプラインにします。 (ヒント:含むコメントを見つけるTODO(step 5):

変更は数秒で自動的に保存されます。ていることを確認してください*の前のマークpipeline.pyタブのタイトルで姿を消しました。 Colabにはファイルエディタの保存ボタンやショートカットはありません。ファイルエディタでPythonのファイルはさえにランタイム環境に保存することができplaygroundモード。

次に、パイプライン定義を変更して既存のパイプラインを更新する必要があります。使用tfx pipeline updateに続いて、あなたのパイプライン、更新するためのコマンドをtfx run create更新し、パイプラインの新しい実行実行を作成するコマンドを。

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

追加されたコンポーネントからの出力ログを確認できるはずです。当社のパイプラインは、出力成果物を作成しtfx_pipeline_output/my_pipelineディレクトリ。

ステップ6.トレーニング用のコンポーネントを追加します。

このステップでは、などのトレーニングおよびモデル検証のためにコンポーネントを追加しますTransformTrainerResolverNodeEvaluator 、およびPusher

オープンpipeline/pipeline.py 。検索と追加のコメントを解除5行TransformTrainerResolverNodeEvaluator及びPusherパイプラインに。 (ヒント:検索のTODO(step 6):

以前と同様に、変更されたパイプライン定義で既存のパイプラインを更新する必要があります。命令が使用してパイプラインステップ5アップデートと同じですtfx pipeline updateし、使用して実行し、実行作成tfx run create

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

この実行の実行が正常に終了すると、Beamオーケストレーターを使用して最初のTFXパイプラインを作成して実行できます。

ステップ7(オプション)BigQueryExampleGenを試してみてください。

[BigQuery]は、サーバーレスで、拡張性が高く、費用対効果の高いクラウドデータウェアハウスです。 BigQueryは、TFXのトレーニング例のソースとして使用できます。この手順では、追加されますBigQueryExampleGenパイプラインに。

あなたは必要とするGoogleのクラウドプラットフォームのBigQueryを使用するアカウントを。 GCPプロジェクトを準備してください。

コラボ認証ライブラリまたは使用してプロジェクトへのログインgcloudユーティリティを。

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

TFXを使用してBigQueryリソースにアクセスするには、GCPプロジェクト名を指定する必要があります。セットGOOGLE_CLOUD_PROJECTプロジェクト名に環境変数。

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

オープンpipeline/pipeline.py 。コメントアウトCsvExampleGenのインスタンスを作成し、コメントを解除行BigQueryExampleGen 。また、コメントを解除する必要がありqueryの引数create_pipeline機能。

私たちは再びBigQueryのに使用するGCPプロジェクトを指定する必要があり、これは設定することによって行われる--projectbeam_pipeline_argsパイプラインを作成するとき。

オープンpipeline/configs.py 。コメントを解除の定義BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGSBIG_QUERY_QUERY 。このファイルのプロジェクトIDとリージョンの値をGCPプロジェクトの正しい値に置き換える必要があります。

オープンbeam_dag_runner.py 。コメントを外して二つの引数、 queryおよびbeam_pipeline_args create_pipeline()メソッドのために、。

これで、パイプラインはサンプルソースとしてBigQueryを使用する準備が整いました。手順5と6で行ったように、パイプラインを更新して実行を作成します。

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

次のステップ:データをパイプラインに取り込みます。

ChicagoTaxiデータセットを使用してモデルのパイプラインを作成しました。次に、データをパイプラインに入れます。

データは、GCSやBigQueryなど、パイプラインがアクセスできる場所であればどこにでも保存できます。データにアクセスするには、パイプライン定義を変更する必要があります。

  1. あなたのデータがファイルに保存されている場合は、変更DATA_PATHkubeflow_dag_runner.pybeam_dag_runner.pyと、ファイルの場所に設定します。あなたのデータがBigQueryの中に格納されている場合、修正BIG_QUERY_QUERY中でpipeline/configs.py正しくクエリデータのために。
  2. 機能の追加models/features.py
  3. 修正models/preprocessing.pyするための訓練のための入力データを変換します
  4. 修正models/keras/model.pymodels/keras/constants.pyするあなたのMLのモデルを記述
    • 推定量ベースのモデルを使用することもできます。変更RUN_FNに一定のmodels.estimator.model.run_fnpipeline/configs.py

参照してくださいトレーナーコンポーネントガイドより多くの導入のために。