TFXパイプラインをローカルで構築する

TFXを使用すると、次の目的で、機械学習(ML)ワークフローをパイプラインとして簡単に調整できます。

  • MLプロセスを自動化します。これにより、モデルを定期的に再トレーニング、評価、およびデプロイできます。
  • モデルのパフォーマンスの詳細な分析と新しくトレーニングされたモデルの検証を含むMLパイプラインを作成して、パフォーマンスと信頼性を確保します。
  • トレーニングデータの異常を監視し、トレーニングに役立つスキューを排除します
  • さまざまなハイパーパラメータのセットを使用してパイプラインを実行することにより、実験の速度を上げます。

典型的なパイプライン開発プロセスは、データ分析とコンポーネントのセットアップを伴うローカルマシンで開始されてから、本番環境に展開されます。このガイドでは、パイプラインをローカルで構築する2つの方法について説明します。

  • MLワークフローのニーズに合わせてTFXパイプラインテンプレートをカスタマイズします。 TFXパイプラインテンプレートは、TFX標準コンポーネントを使用したベストプラクティスを示す事前に構築されたワークフローです。
  • TFXを使用してパイプラインを構築します。このユースケースでは、テンプレートから開始せずにパイプラインを定義します。

あなたのパイプラインを開発しているとして、あなたがそれを実行することができますLocalDagRunner 。次に、パイプラインコンポーネントが適切に定義およびテストされたら、KubeflowやAirflowなどの本番環境グレードのオーケストレーターを使用します。

あなたが始める前に

TFXはPythonパッケージであるため、仮想環境やDockerコンテナーなどのPython開発環境をセットアップする必要があります。それで:

pip install tfx

あなたはTFXパイプラインを初めて使用する場合は、より多くのTFXパイプラインのための中心的な概念について学び続ける前に。

テンプレートを使用してパイプラインを構築する

TFXパイプラインテンプレートは、ユースケースに合わせてカスタマイズできるパイプライン定義のビルド済みセットを提供することにより、パイプライン開発の開始を容易にします。

次のセクションでは、テンプレートのコピーを作成し、ニーズに合わせてカスタマイズする方法について説明します。

パイプラインテンプレートのコピーを作成します

  1. 利用可能なTFXパイプラインテンプレートのリストを参照してください。

    tfx template list
    
  2. リストからテンプレートを選択します

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    以下を置き換えます。

    • template :コピーしたいテンプレートの名前。
    • pipeline-name :作成するパイプラインの名前。
    • destination-path :にテンプレートをコピーするためのパス。

    詳細情報tfx template copyコマンドを

  3. 指定したパスにパイプラインテンプレートのコピーが作成されました。

パイプラインテンプレートを調べる

このセクションでは、テンプレートによって作成されたスキャフォールディングの概要を説明します。

  1. パイプラインのルートディレクトリにコピーされたディレクトリとファイルを調べます

    • パイプラインのディレクトリ
      • pipeline.py -パイプラインを定義し、コンポーネントが使用されているリスト
      • configs.py -そのようなデータ又はそこから来ているオーケストレータ使用されている場合などホールド構成の詳細
    • データディレクトリ
      • これは、一般的に含まれていdata.csvのデフォルトのソースであるファイル、 ExampleGen 。あなたは、データソースに変更することができconfigs.py
    • 前処理コードとモデルの実装とモデルディレクトリ

    • テンプレートは、ローカル環境とKubeflowのDAGランナーをコピーします。

    • 一部のテンプレートにはPythonノートブックも含まれているため、Machine LearningMetaDataを使用してデータとアーティファクトを探索できます。

  2. パイプラインディレクトリで次のコマンドを実行します。

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    コマンドは、使用してパイプラインの実行を作成しLocalDagRunnerあなたのパイプラインに次のディレクトリを追加し、:

    • ローカルで使用MLメタデータストアが含まれているtfx_metadataディレクトリ
    • パイプラインのファイル出力が含まれているtfx_pipeline_outputディレクトリ
  3. あなたのパイプラインの開きpipeline/configs.pyファイルを、その内容を確認します。このスクリプトは、パイプラインとコンポーネント関数で使用される構成オプションを定義します。ここで、データソースの場所や実行のトレーニングステップ数などを指定します。

  4. あなたのパイプラインの開きpipeline/pipeline.pyファイルを、その内容を確認します。このスクリプトは、TFXパイプラインを作成します。最初に、パイプラインは含まれていExampleGenコンポーネントを。

    • TODOコメントの指示に従ってくださいpipeline.pyパイプラインに複数のステップを追加します。
  5. オープンlocal_runner.pyファイルとレビュー内容。このスクリプトは、パイプラインの実行を作成し、実行などのパラメータ、指定data_pathpreprocessing_fn

  6. あなたは、テンプレートによって作成された足場を検討し、使用してパイプラインの実行作成したLocalDagRunner 。次に、要件に合わせてテンプレートをカスタマイズします。

パイプラインをカスタマイズする

このセクションでは、テンプレートのカスタマイズを開始する方法の概要を説明します。

  1. パイプラインを設計します。テンプレートが提供するスキャフォールディングは、TFX標準コンポーネントを使用して表形式データのパイプラインを実装するのに役立ちます。あなたがパイプラインに既存のMLワークフローを移動する場合、あなたはをフルに活用するためにあなたのコードを修正する必要があるかもしれませんTFX標準コンポーネントを。また、作成する必要があり、カスタムコンポーネントはまだTFX標準コンポーネントでサポートされていないワークフローまたはに固有の機能を実装しています。

  2. パイプラインを設計したら、次のプロセスを使用してパイプラインを繰り返しカスタマイズします。通常は、あなたのパイプライン、へインジェストデータというコンポーネントから開始ExampleGenコンポーネント。

    1. ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには、次のような変更が含まれる場合があります。

      • パイプラインパラメータの変更。
      • パイプラインへのコンポーネントの追加または削除。
      • データ入力ソースの交換。このデータソースは、ファイルまたはBigQueryなどのサービスへのクエリのいずれかです。
      • パイプラインでのコンポーネントの構成の変更。
      • コンポーネントのカスタマイズ機能の変更。
    2. ローカルで使用してコンポーネントを実行しますlocal_runner.py別のオーケストレーターを使用している場合、スクリプト、または他の適切なDAGランナーを。スクリプトが失敗した場合は、失敗をデバッグして、スクリプトの実行を再試行してください。

    3. このカスタマイズが機能したら、次のカスタマイズに進みます。

  3. 繰り返し作業することで、テンプレートワークフローの各ステップをニーズに合わせてカスタマイズできます。

カスタムパイプラインを構築する

テンプレートを使用せずにカスタムパイプラインを構築する方法の詳細については、次の手順を使用してください。

  1. パイプラインを設計します。 TFX標準コンポーネントは、完全なMLワークフローの実装に役立つ実証済みの機能を提供します。既存のMLワークフローをパイプラインに移動する場合は、TFX標準コンポーネントを最大限に活用するためにコードを修正する必要がある場合があります。また、作成する必要があるかもしれないカスタムコンポーネント、データの増強などの機能を実装しています。

  2. 次の例を使用して、パイプラインを定義するスクリプトファイルを作成します。このガイドでは、このファイルを参照my_pipeline.py

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    今後のステップでは、あなたはあなたの中にパイプラインを定義create_pipeline 、地元のランナーを使用してローカルであなたのパイプラインを実行します。

    次のプロセスを使用して、パイプラインを繰り返し構築します。

    1. ユースケースに合わせてパイプラインまたはコンポーネントをカスタマイズします。これらのカスタマイズには、次のような変更が含まれる場合があります。

      • パイプラインパラメータの変更。
      • パイプラインへのコンポーネントの追加または削除。
      • データ入力ファイルの置き換え。
      • パイプラインでのコンポーネントの構成の変更。
      • コンポーネントのカスタマイズ機能の変更。
    2. ローカルランナーを使用するか、スクリプトを直接実行して、コンポーネントをローカルで実行します。スクリプトが失敗した場合は、失敗をデバッグして、スクリプトの実行を再試行してください。

    3. このカスタマイズが機能したら、次のカスタマイズに進みます。

    パイプラインのワークフローの最初のノードから開始します。通常、最初のノードはデータをパイプラインに取り込みます。

  3. ワークフローの最初のノードをパイプラインに追加します。この例では、パイプラインが使用ExampleGenでディレクトリからCSVファイルをロードするための標準的なコンポーネントを./data

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen指定されたデータパスにCSV内のデータを使用して直列化例レコードを作成します。設定することによりCsvExampleGenコンポーネントのinput_baseデータルートでパラメータを。

  4. 作成しdataと同じディレクトリにディレクトリをmy_pipeline.py 。小さなCSVファイルを追加dataディレクトリ。

  5. あなたの実行するには、次のコマンドを使用しmy_pipeline.pyスクリプトを。

    python my_pipeline.py
    

    結果は次のようになります。

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. パイプラインにコンポーネントを繰り返し追加し続けます。