Membangun Saluran Pipa TFX Secara Lokal

TFX mempermudah pengaturan alur kerja machine learning (ML) Anda sebagai pipeline, untuk:

  • Otomatiskan proses ML Anda, yang memungkinkan Anda melatih ulang, mengevaluasi, dan menerapkan model Anda secara rutin.
  • Buat pipeline ML yang mencakup analisis mendalam terhadap performa model dan validasi model yang baru dilatih untuk memastikan performa dan keandalan.
  • Pantau data pelatihan untuk menemukan anomali dan hilangkan distorsi penyajian pelatihan
  • Tingkatkan kecepatan eksperimen dengan menjalankan pipeline dengan kumpulan hyperparameter yang berbeda.

Proses pengembangan pipeline umumnya dimulai pada mesin lokal, dengan analisis data dan penyiapan komponen, sebelum diterapkan ke produksi. Panduan ini menjelaskan dua cara untuk membangun saluran pipa secara lokal.

  • Sesuaikan templat alur TFX agar sesuai dengan kebutuhan alur kerja ML Anda. Templat alur TFX adalah alur kerja bawaan yang menunjukkan praktik terbaik menggunakan komponen standar TFX.
  • Bangun saluran pipa menggunakan TFX. Dalam kasus penggunaan ini, Anda menentukan alur tanpa memulai dari templat.

Saat Anda mengembangkan saluran pipa, Anda dapat menjalankannya dengan LocalDagRunner . Kemudian, setelah komponen pipeline ditentukan dan diuji dengan baik, Anda akan menggunakan orkestrator tingkat produksi seperti Kubeflow atau Airflow.

Sebelum Anda mulai

TFX adalah paket Python, jadi Anda perlu menyiapkan lingkungan pengembangan Python, seperti lingkungan virtual atau container Docker. Kemudian:

pip install tfx

Jika Anda baru mengenal pipeline TFX, pelajari lebih lanjut konsep inti pipeline TFX sebelum melanjutkan.

Bangun alur menggunakan templat

Templat Alur TFX mempermudah memulai pengembangan alur dengan menyediakan serangkaian definisi alur bawaan yang dapat Anda sesuaikan untuk kasus penggunaan Anda.

Bagian berikut menjelaskan cara membuat salinan templat dan mengkustomisasinya untuk memenuhi kebutuhan Anda.

Buat salinan templat alur

  1. Lihat daftar templat saluran TFX yang tersedia:

    tfx template list
    
  2. Pilih templat dari daftar

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Ganti yang berikut ini:

    • template : Nama template yang ingin Anda salin.
    • pipeline-name : Nama pipeline yang akan dibuat.
    • destination-path : Jalur untuk menyalin templat.

    Pelajari lebih lanjut tentang perintah tfx template copy .

  3. Salinan templat alur telah dibuat di jalur yang Anda tentukan.

Jelajahi templat alur

Bagian ini memberikan gambaran umum tentang scaffolding yang dibuat oleh template.

  1. Jelajahi direktori dan file yang disalin ke direktori akar saluran Anda

    • Direktori saluran pipa dengan
      • pipeline.py - mendefinisikan pipeline, dan mencantumkan komponen mana yang sedang digunakan
      • configs.py - menyimpan detail konfigurasi seperti dari mana data berasal atau orkestrator mana yang digunakan
    • Direktori data
      • Biasanya ini berisi file data.csv , yang merupakan sumber default untuk ExampleGen . Anda dapat mengubah sumber data di configs.py .
    • Direktori model dengan kode prapemrosesan dan implementasi model

    • Templat ini menyalin runner DAG untuk lingkungan lokal dan Kubeflow.

    • Beberapa templat juga menyertakan Notebook Python sehingga Anda dapat menjelajahi data dan artefak dengan MetaData Machine Learning.

  2. Jalankan perintah berikut di direktori alur Anda:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    Perintah ini membuat alur berjalan menggunakan LocalDagRunner , yang menambahkan direktori berikut ke alur Anda:

    • Direktori tfx_metadata yang berisi penyimpanan Metadata ML yang digunakan secara lokal.
    • Direktori tfx_pipeline_output yang berisi output file pipeline.
  3. Buka file pipeline/configs.py saluran Anda dan tinjau isinya. Skrip ini mendefinisikan opsi konfigurasi yang digunakan oleh alur dan fungsi komponen. Di sinilah Anda akan menentukan hal-hal seperti lokasi sumber data atau jumlah langkah pelatihan yang dijalankan.

  4. Buka file pipeline pipeline/pipeline.py saluran Anda dan tinjau isinya. Skrip ini membuat alur TFX. Awalnya, alur hanya berisi komponen ExampleGen .

    • Ikuti petunjuk di komentar TODO di pipeline.py untuk menambahkan langkah-langkah lainnya ke pipeline.
  5. Buka file local_runner.py dan tinjau isinya. Skrip ini membuat proses alur dan menentukan parameter proses, seperti data_path dan preprocessing_fn .

  6. Anda telah meninjau scaffolding yang dibuat oleh templat dan membuat alur yang dijalankan menggunakan LocalDagRunner . Selanjutnya, sesuaikan template agar sesuai dengan kebutuhan Anda.

Sesuaikan saluran pipa Anda

Bagian ini memberikan ikhtisar tentang cara mulai menyesuaikan template Anda.

  1. Rancang saluran pipa Anda. Perancah yang disediakan templat membantu Anda mengimplementasikan alur untuk data tabular menggunakan komponen standar TFX. Jika Anda memindahkan alur kerja ML yang ada ke dalam pipeline, Anda mungkin perlu merevisi kode Anda untuk memanfaatkan sepenuhnya komponen standar TFX . Anda mungkin juga perlu membuat komponen khusus yang mengimplementasikan fitur unik untuk alur kerja Anda atau yang belum didukung oleh komponen standar TFX.

  2. Setelah Anda mendesain alur, sesuaikan alur secara berulang menggunakan proses berikut. Mulai dari komponen yang menyerap data ke dalam saluran Anda, yang biasanya merupakan komponen ExampleGen .

    1. Sesuaikan alur atau komponen agar sesuai dengan kasus penggunaan Anda. Penyesuaian ini mungkin mencakup perubahan seperti:

      • Mengubah parameter pipa.
      • Menambahkan komponen ke pipeline atau menghapusnya.
      • Mengganti sumber input data. Sumber data ini dapat berupa file atau kueri ke dalam layanan seperti BigQuery.
      • Mengubah konfigurasi komponen di pipeline.
      • Mengubah fungsi penyesuaian komponen.
    2. Jalankan komponen secara lokal menggunakan skrip local_runner.py , atau runner DAG lain yang sesuai jika Anda menggunakan orkestrator lain. Jika skrip gagal, debug kegagalan tersebut dan coba jalankan skrip lagi.

    3. Setelah penyesuaian ini berfungsi, lanjutkan ke penyesuaian berikutnya.

  3. Bekerja secara berulang, Anda dapat menyesuaikan setiap langkah dalam alur kerja templat untuk memenuhi kebutuhan Anda.

Bangun saluran khusus

Gunakan petunjuk berikut untuk mempelajari selengkapnya tentang membuat alur kustom tanpa menggunakan templat.

  1. Rancang saluran pipa Anda. Komponen standar TFX menyediakan fungsionalitas yang telah terbukti untuk membantu Anda menerapkan alur kerja ML yang lengkap. Jika Anda memindahkan alur kerja ML yang ada ke dalam pipeline, Anda mungkin perlu merevisi kode Anda untuk memanfaatkan sepenuhnya komponen standar TFX. Anda mungkin juga perlu membuat komponen khusus yang mengimplementasikan fitur seperti augmentasi data.

  2. Buat file skrip untuk menentukan alur Anda menggunakan contoh berikut. Panduan ini merujuk ke file ini sebagai my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()

    Pada langkah selanjutnya, Anda menentukan alur Anda di create_pipeline dan menjalankan alur Anda secara lokal menggunakan runner lokal.

    Bangun alur Anda secara berulang menggunakan proses berikut.

    1. Sesuaikan alur atau komponen agar sesuai dengan kasus penggunaan Anda. Penyesuaian ini mungkin mencakup perubahan seperti:

      • Mengubah parameter pipa.
      • Menambahkan komponen ke pipeline atau menghapusnya.
      • Mengganti file masukan data.
      • Mengubah konfigurasi komponen di pipeline.
      • Mengubah fungsi penyesuaian komponen.
    2. Jalankan komponen secara lokal menggunakan runner lokal atau dengan menjalankan skrip secara langsung. Jika skrip gagal, debug kegagalan tersebut dan coba jalankan skrip lagi.

    3. Setelah penyesuaian ini berfungsi, lanjutkan ke penyesuaian berikutnya.

    Mulai dari simpul pertama dalam alur kerja alur Anda, biasanya simpul pertama menyerap data ke dalam alur Anda.

  3. Tambahkan simpul pertama dalam alur kerja Anda ke alur Anda. Dalam contoh ini, alur menggunakan komponen standar ExampleGen untuk memuat CSV dari direktori di ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, <!-- needed? -->
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)

    CsvExampleGen membuat rekaman contoh berseri menggunakan data di CSV pada jalur data yang ditentukan. Dengan menyetel parameter input_base komponen CsvExampleGen dengan root data.

  4. Buat direktori data di direktori yang sama dengan my_pipeline.py . Tambahkan file CSV kecil ke direktori data .

  5. Gunakan perintah berikut untuk menjalankan skrip my_pipeline.py Anda.

    python my_pipeline.py
    

    Hasilnya akan seperti berikut:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Lanjutkan menambahkan komponen ke alur Anda secara berulang.