로컬에서 TFX 파이프라인 구축

TFX를 사용하면 다음과 같은 목적으로 머신러닝(ML) 워크플로를 파이프라인으로 더 쉽게 조정할 수 있습니다.

  • 모델을 정기적으로 재교육, 평가, 배포할 수 있는 ML 프로세스를 자동화하세요.
  • 모델 성능에 대한 심층 분석과 새로 훈련된 모델의 검증을 포함하는 ML 파이프라인을 생성하여 성능과 안정성을 보장하세요.
  • 훈련 데이터에서 이상 징후를 모니터링하고 훈련 제공 편향을 제거합니다.
  • 다양한 하이퍼파라미터 세트로 파이프라인을 실행하여 실험 속도를 높입니다.

일반적인 파이프라인 개발 프로세스는 프로덕션에 배포되기 전에 데이터 분석 및 구성 요소 설정을 통해 로컬 시스템에서 시작됩니다. 이 가이드에서는 파이프라인을 로컬로 구축하는 두 가지 방법을 설명합니다.

  • ML 워크플로의 요구 사항에 맞게 TFX 파이프라인 템플릿을 맞춤설정하세요. TFX 파이프라인 템플릿은 TFX 표준 구성요소를 사용하여 권장사항을 보여주는 사전 구축된 워크플로입니다.
  • TFX를 사용하여 파이프라인을 빌드합니다. 이 사용 사례에서는 템플릿에서 시작하지 않고 파이프라인을 정의합니다.

파이프라인을 개발하는 동안 LocalDagRunner 사용하여 실행할 수 있습니다. 그런 다음 파이프라인 구성 요소가 잘 정의되고 테스트되면 Kubeflow 또는 Airflow와 같은 프로덕션 등급 오케스트레이터를 사용합니다.

시작하기 전에

TFX는 Python 패키지이므로 가상 환경이나 Docker 컨테이너와 같은 Python 개발 환경을 설정해야 합니다. 그 다음에:

pip install tfx

TFX 파이프라인을 처음 사용하는 경우 계속하기 전에 TFX 파이프라인의 핵심 개념을 자세히 알아보세요 .

템플릿을 사용하여 파이프라인 구축

TFX 파이프라인 템플릿을 사용하면 사용 사례에 맞게 맞춤설정할 수 있는 사전 빌드된 파이프라인 정의 세트를 제공하여 파이프라인 개발을 더 쉽게 시작할 수 있습니다.

다음 섹션에서는 템플릿 사본을 생성하고 필요에 맞게 사용자 정의하는 방법을 설명합니다.

파이프라인 템플릿의 복사본을 생성합니다.

  1. 사용 가능한 TFX 파이프라인 템플릿 목록을 참조하세요.

    tfx template list
    
  2. 목록에서 템플릿을 선택하세요.

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    다음을 교체하십시오.

    • template : 복사하려는 템플릿의 이름입니다.
    • pipeline-name : 생성할 파이프라인의 이름입니다.
    • destination-path : 템플릿을 복사할 경로입니다.

    tfx template copy 명령 에 대해 자세히 알아보세요.

  3. 파이프라인 템플릿의 복사본이 지정한 경로에 생성되었습니다.

파이프라인 템플릿 살펴보기

이 섹션에서는 템플릿으로 생성된 스캐폴딩에 대한 개요를 제공합니다.

  1. 파이프라인의 루트 디렉터리에 복사된 디렉터리 및 파일 탐색

    • 다음을 포함하는 파이프라인 디렉터리
      • pipeline.py - 파이프라인을 정의하고 사용 중인 구성 요소를 나열합니다.
      • configs.py - 데이터가 어디서 왔는지, 어떤 오케스트레이터가 사용되는지 등의 구성 세부 정보를 보관합니다.
    • 데이터 디렉토리
      • 여기에는 일반적으로 ExampleGen 의 기본 소스인 data.csv 파일이 포함됩니다. configs.py 에서 데이터 소스를 변경할 수 있습니다.
    • 전처리 코드 및 모델 구현이 포함된 모델 디렉터리

    • 템플릿은 로컬 환경 및 Kubeflow용 DAG 실행기를 복사합니다.

    • 일부 템플릿에는 Python Notebook도 포함되어 있어 Machine Learning MetaData를 통해 데이터와 아티팩트를 탐색할 수 있습니다.

  2. 파이프라인 디렉터리에서 다음 명령을 실행합니다.

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    이 명령은 파이프라인에 다음 디렉터리를 추가하는 LocalDagRunner 사용하여 파이프라인 실행을 만듭니다.

    • 로컬에서 사용되는 ML 메타데이터 저장소가 포함된 tfx_metadata 디렉터리입니다.
    • 파이프라인의 파일 출력을 포함하는 tfx_pipeline_output 디렉터리.
  3. 파이프라인의 pipeline/configs.py 파일을 열고 내용을 검토하세요. 이 스크립트는 파이프라인 및 구성 요소 기능에서 사용되는 구성 옵션을 정의합니다. 여기서는 데이터 소스의 위치나 실행 시 학습 단계 수 등을 지정합니다.

  4. 파이프라인의 pipeline/pipeline.py 파일을 열고 내용을 검토하세요. 이 스크립트는 TFX 파이프라인을 생성합니다. 처음에 파이프라인에는 ExampleGen 구성 요소만 포함되어 있습니다.

    • 파이프라인에 더 많은 단계를 추가하려면 pipeline.pyTODO 주석에 있는 지침을 따르세요.
  5. local_runner.py 파일을 열고 내용을 검토합니다. 이 스크립트는 파이프라인 실행을 생성하고 data_pathpreprocessing_fn 과 같은 실행 매개변수를 지정합니다.

  6. 템플릿으로 생성된 스캐폴딩을 검토하고 LocalDagRunner 사용하여 파이프라인 실행을 생성했습니다. 그런 다음 요구 사항에 맞게 템플릿을 사용자 정의합니다.

파이프라인 맞춤설정

이 섹션에서는 템플릿 사용자 정의를 시작하는 방법에 대한 개요를 제공합니다.

  1. 파이프라인을 설계하세요. 템플릿이 제공하는 스캐폴딩은 TFX 표준 구성요소를 사용하여 표 형식 데이터에 대한 파이프라인을 구현하는 데 도움이 됩니다. 기존 ML 워크플로를 파이프라인으로 이동하는 경우 TFX 표준 구성요소를 최대한 활용하려면 코드를 수정해야 할 수도 있습니다. 워크플로에 고유하거나 TFX 표준 구성요소에서 아직 지원되지 않는 기능을 구현하는 맞춤 구성요소를 만들어야 할 수도 있습니다.

  2. 파이프라인을 설계한 후에는 다음 프로세스를 사용하여 파이프라인을 반복적으로 사용자 지정합니다. 파이프라인에 데이터를 수집하는 구성 요소(일반적으로 ExampleGen 구성 요소)부터 시작합니다.

    1. 사용 사례에 맞게 파이프라인이나 구성요소를 맞춤설정하세요. 이러한 사용자 정의에는 다음과 같은 변경 사항이 포함될 수 있습니다.

      • 파이프라인 매개변수 변경.
      • 파이프라인에 구성요소를 추가하거나 제거합니다.
      • 데이터 입력 소스를 교체합니다. 이 데이터 소스는 파일일 수도 있고 BigQuery와 같은 서비스에 대한 쿼리일 수도 있습니다.
      • 파이프라인에서 구성요소의 구성을 변경합니다.
      • 구성 요소의 사용자 정의 기능을 변경합니다.
    2. local_runner.py 스크립트를 사용하거나 다른 조정자를 사용하는 경우 다른 적절한 DAG 실행기를 사용하여 구성요소를 로컬로 실행합니다. 스크립트가 실패하면 실패를 디버그하고 스크립트 실행을 다시 시도하세요.

    3. 이 사용자 정의가 작동하면 다음 사용자 정의로 넘어갑니다.

  3. 반복적으로 작업하면서 필요에 맞게 템플릿 워크플로의 각 단계를 사용자 정의할 수 있습니다.

커스텀 파이프라인 구축

템플릿을 사용하지 않고 커스텀 파이프라인을 구축하는 방법에 대해 자세히 알아보려면 다음 지침을 따르세요.

  1. 파이프라인을 설계하세요. TFX 표준 구성요소는 완전한 ML 워크플로를 구현하는 데 도움이 되는 검증된 기능을 제공합니다. 기존 ML 워크플로를 파이프라인으로 이동하는 경우 TFX 표준 구성요소를 최대한 활용하려면 코드를 수정해야 할 수도 있습니다. 데이터 증대와 같은 기능을 구현하는 사용자 정의 구성 요소를 생성해야 할 수도 있습니다.

  2. 다음 예제를 사용하여 파이프라인을 정의하는 스크립트 파일을 생성합니다. 이 가이드에서는 이 파일을 my_pipeline.py 라고 합니다.

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    다음 단계에서는 create_pipeline 에서 파이프라인을 정의하고 로컬 실행기를 사용하여 파이프라인을 로컬로 실행합니다.

    다음 프로세스를 사용하여 파이프라인을 반복적으로 구축합니다.

    1. 사용 사례에 맞게 파이프라인이나 구성요소를 맞춤설정하세요. 이러한 사용자 정의에는 다음과 같은 변경 사항이 포함될 수 있습니다.

      • 파이프라인 매개변수 변경.
      • 파이프라인에 구성요소를 추가하거나 제거합니다.
      • 데이터 입력 파일을 교체합니다.
      • 파이프라인에서 구성요소의 구성을 변경합니다.
      • 구성 요소의 사용자 정의 기능을 변경합니다.
    2. 로컬 실행기를 사용하거나 스크립트를 직접 실행하여 구성 요소를 로컬로 실행합니다. 스크립트가 실패하면 실패를 디버그하고 스크립트 실행을 다시 시도하세요.

    3. 이 사용자 정의가 작동하면 다음 사용자 정의로 넘어갑니다.

    파이프라인 워크플로의 첫 번째 노드에서 시작합니다. 일반적으로 첫 번째 노드는 데이터를 파이프라인으로 수집합니다.

  3. 워크플로의 첫 번째 노드를 파이프라인에 추가합니다. 이 예에서 파이프라인은 ExampleGen 표준 구성 요소를 사용하여 ./data 디렉터리에서 CSV를 로드합니다.

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen 지정된 데이터 경로에 있는 CSV의 데이터를 사용하여 직렬화된 예제 레코드를 생성합니다. 데이터 루트로 CsvExampleGen 구성 요소의 input_base 매개 변수를 설정합니다.

  4. my_pipeline.py 와 동일한 디렉터리에 data 디렉터리를 만듭니다. data 디렉터리에 작은 CSV 파일을 추가합니다.

  5. 다음 명령을 사용하여 my_pipeline.py 스크립트를 실행합니다.

    python my_pipeline.py
    

    결과는 다음과 같아야 합니다.

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. 계속해서 파이프라인에 구성 요소를 반복적으로 추가합니다.