Beam Orchestrator로 템플릿을 사용하여 TFX 파이프라인 생성

소개

이 문서는 (TFX) 파이프 라인 TFX 파이썬 패키지와 함께 제공되는 템플릿을 사용하여 확장 TensorFlow을 만들 수있는 지침을 제공합니다. 지침의 대부분은 리눅스 쉘 명령이며, 사용하는 명령을 호출 Jupyter 노트북 코드 대응하는 셀 ! 제공된다.

다음을 사용하여 파이프 라인을 구축 할 것입니다 택시는 데이터 세트 여행 시카고시에서 발표합니다. 이 파이프라인을 기준으로 활용하여 데이터 세트를 사용하여 고유한 파이프라인을 구축할 것을 강력히 권장합니다.

우리는 사용하여 파이프 라인을 구축 할 예정 아파치 빔 오케 스트레이터를 . 당신이 Kubeflow 지휘자 Google 클라우드에를 사용에 관심이 있다면, 참조하시기 바랍니다 클라우드 AI 플랫폼 파이프 라인 튜토리얼에 TFX을 .

전제 조건

  • 리눅스 / 맥OS
  • 파이썬 >= 3.5.3

당신은 쉽게 모든 전제 조건을 얻을 수있는 구글 Colab에이 노트북을 실행 .

1단계. 환경을 설정합니다.

이 문서 전체에서 명령을 두 번 제시합니다. 한 번은 복사하여 붙여넣기 가능한 셸 명령으로, 한 번은 jupyter 노트북 셀로. Colab을 사용하는 경우 셸 스크립트 블록을 건너뛰고 노트북 셀을 실행하면 됩니다.

파이프라인을 구축하려면 개발 환경을 준비해야 합니다.

설치 tfx 파이썬 패키지를. 우리는 사용하는 것이 좋습니다 virtualenv 로컬 환경에서. 다음 쉘 스크립트 스니펫을 사용하여 환경을 설정할 수 있습니다.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

colab을 사용하는 경우:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

오류: 일부 패키지 0.some_version.1에는 other-package!=2.0.,<3,>=1.15 요구 사항이 있지만 호환되지 않는 other-package 2.0.0이 있습니다.

지금은 이러한 오류를 무시하십시오.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

TFX의 버전을 확인해보자.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

그리고 완료되었습니다. 파이프라인을 만들 준비가 되었습니다.

2단계. 미리 정의된 템플릿을 프로젝트 디렉토리에 복사합니다.

이 단계에서는 미리 정의된 템플릿에서 추가 파일을 복사하여 작업 파이프라인 프로젝트 디렉터리와 파일을 만듭니다.

당신은 변경하여 파이프 라인에 다른 이름을 제공 할 수 있습니다 PIPELINE_NAME 아래를. 이것은 또한 파일이 저장될 프로젝트 디렉토리의 이름이 됩니다.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX는 포함 taxi TFX 파이썬 패키지 템플릿을. 분류 및 회귀를 포함하여 점별 예측 문제를 해결하려는 경우 이 템플릿을 시작점으로 사용할 수 있습니다.

tfx template copy CLI 명령 사본은 프로젝트 디렉토리에 템플릿 파일을 미리 정의.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

이 노트북의 작업 디렉토리 컨텍스트를 프로젝트 디렉토리로 변경하십시오.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

3단계. 복사한 소스 파일을 찾습니다.

TFX 템플릿은 Python 소스 코드, 샘플 데이터 및 파이프라인의 출력을 분석하기 위한 Jupyter 노트북을 포함하여 파이프라인을 빌드하기 위한 기본 스캐폴드 파일을 제공합니다. taxi 템플릿은 같은 시카고 택시 데이터 세트 및 ML 모델을 사용하여 공기 흐름 자습서 .

Google Colab에서 왼쪽의 폴더 아이콘을 클릭하여 파일을 탐색할 수 있습니다. 파일은 그 이름이 프로젝트 directoy, 아래에 복사해야 my_pipeline 이 경우이다. 디렉토리 이름을 클릭하여 디렉토리 내용을 보고 파일 이름을 두 번 클릭하여 열 수 있습니다.

다음은 각 Python 파일에 대한 간략한 소개입니다.

  • pipeline -이 디렉토리는 파이프 라인의 정의를 포함
    • configs.py - 파이프 라인 주자에 대한 공통의 상수를 정의
    • pipeline.py - 정의 TFX 구성 요소 및 파이프 라인
  • models -이 디렉토리는 ML 모델 정의가 포함되어 있습니다.
    • features.py , features_test.py - 정의 모델에 대한 기능
    • preprocessing.py , preprocessing_test.py - 사용하는 작업 전처리 정의하는 tf::Transform
    • estimator -이 디렉토리는 견적 기반 모델이 포함되어 있습니다.
      • constants.py - 모델을 정의 상수
      • model.py , model_test.py - TF 추정을 사용하여 DNN 모델을 정의
    • keras -이 디렉토리는 Keras 기반 모델이 포함되어 있습니다.
      • constants.py - 모델을 정의 상수
      • model.py , model_test.py - Keras를 사용하여 DNN 모델을 정의
  • beam_dag_runner.py , kubeflow_dag_runner.py - 각 오케스트레이션 엔진 주자를 정의

당신은 몇 가지 파일이 있다는 것을 알 수 있습니다 _test.py 이름이다. 이는 파이프라인의 단위 테스트이며 고유한 파이프라인을 구현할 때 더 많은 단위 테스트를 추가하는 것이 좋습니다. 당신은 함께 테스트 파일의 모듈 이름을 제공하여 단위 테스트를 실행할 수 있습니다 -m 플래그. 당신은 일반적으로 삭제하여 모듈 이름을 얻을 수 .py 확장 및 교체 /. . 예를 들어:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

4단계. 첫 번째 TFX 파이프라인 실행

다음을 사용하여 파이프 라인을 만들 수 있습니다 pipeline create 명령을 사용합니다.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

그런 다음 사용하여 생성 된 파이프 라인을 실행할 수 있습니다 run create 명령을 사용합니다.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

성공하면, 당신은 볼 수 있습니다 Component CsvExampleGen is finished. 템플릿을 복사하면 CsvExampleGen이라는 하나의 구성 요소만 파이프라인에 포함됩니다.

5단계. 데이터 유효성 검사를 위한 구성 요소를 추가합니다.

이 단계에서는 포함하여 데이터 유효성 검사를위한 구성 요소를 추가합니다 StatisticsGen , SchemaGenExampleValidator . 당신이 데이터 유효성 검사에 관심이 있다면, 참조하시기 바랍니다 Tensorflow 데이터 유효성 검사 시작하기 .

우리는에서 복사 파이프 라인의 정의를 수정합니다 pipeline/pipeline.py . 로컬 환경에서 작업하는 경우 선호하는 편집기를 사용하여 파일을 편집하십시오. Google Colab에서 작업하는 경우

오픈 왼쪽에 폴더 아이콘을 클릭하여 Files 보기.

클릭 my_pipeline 디렉토리를 열고 클릭 pipeline 열고 두 번 클릭에 디렉토리를 pipeline.py 파일을 열 수 있습니다.

찾기 및 추가 3 줄의 주석 StatisticsGen , SchemaGenExampleValidator 파이프 라인을. (팁 : 포함 코멘트를 찾을 TODO(step 5): ).

변경 사항은 몇 초 후에 자동으로 저장됩니다. 확실한 확인 * 의 앞에 표시 pipeline.py 탭 제목에 사라졌다. Colab에는 파일 편집기에 대한 저장 버튼이나 단축키가 없습니다. 파일 편집기에서 파이썬 파일도에서 런타임 환경에 저장할 수 있습니다 playground 모드.

이제 수정된 파이프라인 정의로 기존 파이프라인을 업데이트해야 합니다. 사용 tfx pipeline update 에 의해 다음 파이프 라인 업데이트 명령을 tfx run create 업데이트 된 파이프 라인의 새로운 실행 실행을 만들 명령을 사용합니다.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

추가된 구성 요소의 출력 로그를 볼 수 있어야 합니다. 우리의 파이프 라인은 출력 아티팩트 생성 tfx_pipeline_output/my_pipeline 디렉토리.

6단계. 교육을 위한 구성 요소를 추가합니다.

이 단계에서는 훈련 및 모델 검증을 포함하여 위해 요소를 추가합니다 Transform , Trainer , ResolverNode , EvaluatorPusher .

열기 pipeline/pipeline.py . 찾기 및 추가 주석 5 개 라인 Transform , Trainer , ResolverNode , EvaluatorPusher 파이프 라인에. (팁 : 찾기 TODO(step 6): )

이전과 마찬가지로 이제 수정된 파이프라인 정의로 기존 파이프라인을 업데이트해야 합니다. 지침은 5 단계 업데이트를 사용하여 파이프 라인과 동일 tfx pipeline update 하고 사용하여 실행 실행 만들 tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

이 실행 실행이 성공적으로 완료되면 이제 Beam Orchestrator를 사용하여 첫 번째 TFX 파이프라인을 만들고 실행한 것입니다!

7 단계 (선택 사항) BigQueryExampleGen을보십시오.

[BigQuery]는 확장성이 뛰어나고 비용 효율적인 서버리스 클라우드 데이터 웨어하우스입니다. BigQuery는 TFX에서 학습 예제의 소스로 사용할 수 있습니다. 이 단계에서, 우리는 추가합니다 BigQueryExampleGen 파이프 라인에.

당신은 필요 Google 클라우드 플랫폼 의 BigQuery를 사용하는 계정. GCP 프로젝트를 준비해주세요.

colab 인증 라이브러리 또는 사용하여 프로젝트에 로그인 gcloud 유틸리티를.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

TFX를 사용하여 BigQuery 리소스에 액세스하려면 GCP 프로젝트 이름을 지정해야 합니다. 설정 GOOGLE_CLOUD_PROJECT 프로젝트 이름에 환경 변수.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

열기 pipeline/pipeline.py . 주석 CsvExampleGen 의 인스턴스를 생성하고 주석 라인 BigQueryExampleGen . 또한 주석에 필요한 query 의 인수 create_pipeline 기능.

우리는 다시 BigQuery에 사용되는 GCP 프로젝트를 지정해야하고, 이것은 설정하면됩니다 --projectbeam_pipeline_args 파이프 라인을 만들 때.

열기 pipeline/configs.py . 의 주석 정의 BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGSBIG_QUERY_QUERY . 이 파일의 프로젝트 ID와 지역 값을 GCP 프로젝트의 올바른 값으로 바꿔야 합니다.

열기 beam_dag_runner.py . 의 주석을 두 개의 인수, querybeam_pipeline_args create_pipeline () 메소드.

이제 파이프라인에서 BigQuery를 예시 소스로 사용할 준비가 되었습니다. 파이프라인을 업데이트하고 5단계와 6단계에서 했던 것처럼 실행을 생성합니다.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

다음 단계: 데이터를 파이프라인으로 수집합니다.

Chicago Taxi 데이터 세트를 사용하여 모델에 대한 파이프라인을 만들었습니다. 이제 데이터를 파이프라인에 넣을 차례입니다.

데이터는 GCS 또는 BigQuery를 포함하여 파이프라인이 액세스할 수 있는 모든 곳에 저장할 수 있습니다. 데이터에 액세스하려면 파이프라인 정의를 수정해야 합니다.

  1. 데이터가 파일에 저장되는 경우, 수정 DATA_PATHkubeflow_dag_runner.py 또는 beam_dag_runner.py 및 파일의 위치로 설정합니다. 데이터가 BigQuery에서 저장되어있는 경우, 수정 BIG_QUERY_QUERYpipeline/configs.py 제대로 쿼리 데이터에 대한에.
  2. 에 기능 추가 models/features.py .
  3. 수정 models/preprocessing.py 위해 훈련을위한 입력 데이터를 변환 .
  4. 수정 models/keras/model.pymodels/keras/constants.py 제품에 대해 설명 당신의 ML 모델 .
    • 추정기 기반 모델을 사용할 수도 있습니다. 변경 RUN_FN 상수하기 models.estimator.model.run_fnpipeline/configs.py .

참조하십시오 트레이너 요소 가이드를 더 도입.