Membangun Komponen yang Sepenuhnya Kustom

Panduan ini menjelaskan cara menggunakan TFX API untuk membangun komponen kustom sepenuhnya. Komponen yang sepenuhnya disesuaikan memungkinkan Anda membuat komponen dengan menentukan spesifikasi komponen, eksekutor, dan kelas antarmuka komponen. Pendekatan ini memungkinkan Anda menggunakan kembali dan memperluas komponen standar agar sesuai dengan kebutuhan Anda.

Jika Anda baru mengenal saluran pipa TFX, pelajari lebih lanjut tentang konsep inti saluran pipa TFX .

Pelaksana khusus atau komponen khusus

Jika hanya logika pemrosesan kustom yang diperlukan sementara properti input, output, dan eksekusi komponen sama dengan komponen yang sudah ada, maka eksekutor kustom sudah cukup. Komponen yang sepenuhnya disesuaikan diperlukan bila salah satu input, output, atau properti eksekusi berbeda dari komponen TFX yang ada.

Bagaimana cara membuat komponen khusus?

Mengembangkan komponen yang sepenuhnya khusus memerlukan:

  • Serangkaian spesifikasi artefak masukan dan keluaran yang ditentukan untuk komponen baru. Khususnya, jenis artefak masukan harus konsisten dengan jenis artefak keluaran dari komponen yang menghasilkan artefak dan jenis artefak keluaran harus konsisten dengan jenis artefak masukan dari komponen yang menggunakan artefak tersebut, jika ada.
  • Parameter eksekusi non-artefak yang diperlukan untuk komponen baru.

Spesifikasi Komponen

Kelas ComponentSpec mendefinisikan kontrak komponen dengan mendefinisikan artefak input dan output ke komponen serta parameter yang digunakan untuk eksekusi komponen. Ini memiliki tiga bagian:

  • INPUT : Kamus parameter yang diketik untuk artefak masukan yang diteruskan ke pelaksana komponen. Biasanya artefak masukan adalah keluaran dari komponen hulu dan dengan demikian memiliki tipe yang sama.
  • OUTPUT : Kamus parameter yang diketik untuk artefak keluaran yang dihasilkan komponen.
  • PARAMETER : Kamus item ExecutionParameter tambahan yang akan diteruskan ke pelaksana komponen. Ini adalah parameter non-artefak yang ingin kami definisikan secara fleksibel di pipeline DSL dan diteruskan ke eksekusi.

Berikut ini contoh ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Pelaksana

Selanjutnya, tulis kode eksekutor untuk komponen baru. Pada dasarnya, subkelas baru dari base_executor.BaseExecutor perlu dibuat dengan mengganti fungsi Do nya. Dalam fungsi Do , argumen input_dict , output_dict dan exec_properties yang diteruskan dalam peta ke INPUTS , OUTPUTS dan PARAMETERS yang masing-masing ditentukan dalam ComponentSpec. Untuk exec_properties , nilainya dapat diambil langsung melalui pencarian kamus. Untuk artefak di input_dict dan output_dict , ada fungsi praktis yang tersedia di kelas artefak_utils yang dapat digunakan untuk mengambil instance artefak atau uri artefak.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Unit menguji pelaksana khusus

Tes unit untuk pelaksana khusus dapat dibuat mirip dengan ini .

Antarmuka komponen

Sekarang setelah bagian paling rumit selesai, langkah selanjutnya adalah merakit bagian-bagian ini menjadi antarmuka komponen, untuk memungkinkan komponen digunakan dalam pipeline. Ada beberapa langkah:

  • Jadikan antarmuka komponen sebagai subkelas dari base_component.BaseComponent
  • Tetapkan variabel kelas SPEC_CLASS dengan kelas ComponentSpec yang telah ditentukan sebelumnya
  • Tetapkan variabel kelas EXECUTOR_SPEC dengan kelas Executor yang telah ditentukan sebelumnya
  • Definisikan fungsi konstruktor __init__() dengan menggunakan argumen pada fungsi tersebut untuk membuat instance kelas ComponentSpec dan memanggil fungsi super dengan nilai tersebut, bersama dengan nama opsional

Ketika sebuah instance komponen dibuat, logika pemeriksaan tipe di kelas base_component.BaseComponent akan dipanggil untuk memastikan bahwa argumen yang diteruskan kompatibel dengan tipe info yang ditentukan di kelas ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Rakit menjadi pipa TFX

Langkah terakhir adalah menyambungkan komponen kustom baru ke dalam pipeline TFX. Selain menambahkan instance komponen baru, hal berikut juga diperlukan:

  • Hubungkan komponen upstream dan downstream dari komponen baru ke sana dengan benar. Hal ini dilakukan dengan mereferensikan keluaran komponen hulu pada komponen baru dan mereferensikan keluaran komponen baru pada komponen hilir.
  • Tambahkan instans komponen baru ke daftar komponen saat membuat alur.

Contoh di bawah menyoroti perubahan yang disebutkan di atas. Contoh lengkap dapat ditemukan di repo TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Terapkan komponen yang sepenuhnya khusus

Selain perubahan kode, semua bagian yang baru ditambahkan ( ComponentSpec , Executor , antarmuka komponen) harus dapat diakses di lingkungan yang menjalankan pipeline agar dapat menjalankan pipeline dengan benar.