สร้างไปป์ไลน์ TFX โดยใช้เทมเพลตด้วย Beam orchestrator

บทนำ

เอกสารนี้จะให้คำแนะนำในการสร้าง TensorFlow ขยาย (TFX) ท่อโดยใช้แม่แบบที่มีให้กับแพคเกจ TFX หลาม ที่สุดของคำแนะนำคำสั่งเปลือก Linux และสอดคล้อง Jupyter เซลล์รหัสโน๊ตบุ๊คที่เรียกคำสั่งที่ใช้ ! ถูกจัดหา.

คุณจะสร้างท่อส่งโดยใช้ รถแท็กซี่การเดินทางชุดข้อมูลที่ ปล่อยออกมาจากเมืองชิคาโก เราขอแนะนำให้คุณพยายามสร้างไปป์ไลน์ของคุณเองโดยใช้ชุดข้อมูลของคุณโดยใช้ไปป์ไลน์นี้เป็นพื้นฐาน

เราจะสร้างท่อโดยใช้ Apache Beam Orchestrator หากคุณกำลังสนใจในการใช้ Kubeflow ประพันธ์บน Google Cloud โปรดดู TFX เกี่ยวกับ Cloud AI แพลตฟอร์มท่อกวดวิชา

ข้อกำหนดเบื้องต้น

  • Linux / MacOS
  • หลาม >= 3.5.3

คุณจะได้รับสิ่งที่จำเป็นทั้งหมดได้อย่างง่ายดายโดย การเรียกใช้โน๊ตบุ๊คนี้บน Google Colab

ขั้นตอนที่ 1. ตั้งค่าสภาพแวดล้อมของคุณ

ตลอดทั้งเอกสารนี้ เราจะนำเสนอคำสั่งสองครั้ง ครั้งหนึ่งเป็นคำสั่งเชลล์พร้อมคัดลอกและวาง ครั้งเดียวเป็นเซลล์โน้ตบุ๊ก jupyter หากคุณกำลังใช้ Colab เพียงข้ามบล็อกเชลล์สคริปต์และเรียกใช้เซลล์โน้ตบุ๊ก

คุณควรเตรียมสภาพแวดล้อมการพัฒนาเพื่อสร้างไปป์ไลน์

ติดตั้ง tfx แพคเกจหลาม เราแนะนำให้ใช้ virtualenv ในสภาพแวดล้อมในท้องถิ่น คุณสามารถใช้ข้อมูลโค้ดเชลล์ต่อไปนี้เพื่อตั้งค่าสภาพแวดล้อมของคุณ

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

หากคุณกำลังใช้ colab:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

ข้อผิดพลาด: บางแพ็คเกจ 0.some_version.1 มีความต้องการแพ็คเกจอื่น!=2.0.,<3,>=1.15 แต่คุณจะมีแพ็คเกจอื่น 2.0.0 ซึ่งเข้ากันไม่ได้

โปรดละเว้นข้อผิดพลาดเหล่านี้ในขณะนี้

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

มาดูเวอร์ชันของ TFX กัน

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

และเสร็จแล้ว เราพร้อมที่จะสร้างไปป์ไลน์

ขั้นตอนที่ 2 คัดลอกเทมเพลตที่กำหนดไว้ล่วงหน้าไปยังไดเร็กทอรีโครงการของคุณ

ในขั้นตอนนี้ เราจะสร้างไดเร็กทอรีและไฟล์โปรเจ็กต์ไปป์ไลน์ที่ใช้งานได้ โดยการคัดลอกไฟล์เพิ่มเติมจากเทมเพลตที่กำหนดไว้ล่วงหน้า

คุณอาจจะทำให้ท่อส่งชื่อที่แตกต่างกันโดยการเปลี่ยน PIPELINE_NAME ด้านล่าง นี่จะกลายเป็นชื่อของไดเร็กทอรีโครงการที่จะใส่ไฟล์ของคุณ

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

TFX รวมถึง taxi แม่แบบกับแพคเกจหลาม TFX หากคุณกำลังวางแผนที่จะแก้ปัญหาการคาดคะเนแบบ point-wise รวมทั้งการจำแนกและการถดถอย คุณสามารถใช้เทมเพลตนี้เป็นจุดเริ่มต้นได้

tfx template copy CLI สำเนาคำสั่งที่กำหนดไว้ล่วงหน้าแฟ้มแม่แบบลงในไดเรกทอรีโครงการของคุณ

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

เปลี่ยนบริบทไดเรกทอรีทำงานในสมุดบันทึกนี้เป็นไดเรกทอรีโครงการ

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

ขั้นตอนที่ 3 เรียกดูไฟล์ต้นฉบับที่คัดลอกของคุณ

เทมเพลต TFX มีไฟล์นั่งร้านพื้นฐานสำหรับสร้างไปป์ไลน์ ซึ่งรวมถึงซอร์สโค้ด Python ข้อมูลตัวอย่าง และ Jupyter Notebooks เพื่อวิเคราะห์เอาต์พุตของไปป์ไลน์ taxi แม่แบบใช้ชุดเดียวกันชิคาโกแท็กซี่และรุ่น ML เป็น Airflow กวดวิชา

ใน Google Colab คุณสามารถเรียกดูไฟล์ได้โดยคลิกที่ไอคอนโฟลเดอร์ทางด้านซ้าย ไฟล์ที่ควรจะคัดลอกภายใต้ directoy โครงการที่มีชื่อเป็น my_pipeline ในกรณีนี้ คุณสามารถคลิกชื่อไดเร็กทอรีเพื่อดูเนื้อหาของไดเร็กทอรี และดับเบิลคลิกชื่อไฟล์เพื่อเปิด

ต่อไปนี้คือข้อมูลเบื้องต้นเกี่ยวกับไฟล์ Python แต่ละไฟล์

  • pipeline - ไดเรกทอรีนี้มีความหมายของท่อ
    • configs.py - กำหนดค่าคงที่พบบ่อยสำหรับนักวิ่งท่อ
    • pipeline.py - กำหนดส่วนประกอบ TFX และท่อ
  • models - ไดเรกทอรีนี้ประกอบด้วย ML นิยามรูปแบบ
    • features.py , features_test.py - กำหนดให้บริการสำหรับรูปแบบ
    • preprocessing.py , preprocessing_test.py - กำหนด preprocessing งานโดยใช้ tf::Transform
    • estimator - ไดเรกทอรีนี้มีรูปแบบการประมาณการตาม
      • constants.py - กำหนดค่าคงที่ของรูปแบบ
      • model.py , model_test.py - กำหนดรูปแบบ DNN ใช้ประมาณการ TF
    • keras - ไดเรกทอรีนี้มีรูปแบบการ Keras ตาม
      • constants.py - กำหนดค่าคงที่ของรูปแบบ
      • model.py , model_test.py - กำหนดรูปแบบการใช้ DNN Keras
  • beam_dag_runner.py , kubeflow_dag_runner.py - กำหนดวิ่งสำหรับเครื่องยนต์ประสานแต่ละ

คุณอาจพบว่ามีไฟล์บางอย่างกับ _test.py ในชื่อของพวกเขา นี่คือการทดสอบหน่วยของไปป์ไลน์ และขอแนะนำให้เพิ่มการทดสอบหน่วยเพิ่มเติมเมื่อคุณใช้ไพพ์ไลน์ของคุณเอง คุณสามารถเรียกใช้การทดสอบหน่วยโดยการจัดหาชื่อโมดูลของไฟล์ที่ทดสอบด้วย -m ธง คุณมักจะได้รับชื่อโมดูลโดยการลบ .py ขยายและการเปลี่ยน / . . ตัวอย่างเช่น:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

ขั้นตอนที่ 4 เรียกใช้ไปป์ไลน์ TFX แรกของคุณ

คุณสามารถสร้างท่อโดยใช้ pipeline create คำสั่ง

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

จากนั้นคุณสามารถเรียกใช้ท่อสร้างขึ้นโดยใช้ run create คำสั่ง

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

หากประสบความสำเร็จคุณจะเห็น Component CsvExampleGen is finished. เมื่อคุณคัดลอกเทมเพลต CsvExampleGen จะรวมองค์ประกอบเดียวเท่านั้นในไปป์ไลน์

ขั้นตอนที่ 5. เพิ่มส่วนประกอบสำหรับการตรวจสอบข้อมูล

ในขั้นตอนนี้คุณจะเพิ่มชิ้นส่วนสำหรับการตรวจสอบข้อมูลรวมทั้ง StatisticsGen , SchemaGen และ ExampleValidator หากคุณมีความสนใจในการตรวจสอบข้อมูลโปรดดูที่ เริ่มต้นด้วยการตรวจสอบข้อมูล Tensorflow

เราจะปรับเปลี่ยนคำนิยามท่อคัดลอกใน pipeline/pipeline.py หากคุณกำลังทำงานกับสภาพแวดล้อมภายในของคุณ ให้ใช้โปรแกรมแก้ไขที่คุณโปรดปรานเพื่อแก้ไขไฟล์ หากคุณกำลังทำงานกับ Google Colab

คลิกที่โฟลเดอร์ไอคอนบนซ้ายเพื่อเปิด Files มุมมอง

คลิก my_pipeline เพื่อเปิดไดเรกทอรีและคลิก pipeline ไดเรกทอรีที่จะเปิดและดับเบิลคลิก pipeline.py เพื่อเปิดไฟล์

ค้นหาและ uncomment 3 สายที่เพิ่ม StatisticsGen , SchemaGen และ ExampleValidator การท่อ (เคล็ดลับ: แสดงความคิดเห็นพบว่ามี TODO(step 5): )

การเปลี่ยนแปลงของคุณจะถูกบันทึกโดยอัตโนมัติในไม่กี่วินาที ตรวจสอบให้แน่ใจว่า * เครื่องหมายในด้านหน้าของ pipeline.py หายไปในชื่อแท็บ ไม่มีปุ่มบันทึกหรือทางลัดสำหรับตัวแก้ไขไฟล์ใน Colab ไฟล์หลามในการแก้ไขไฟล์สามารถบันทึกไว้ในสภาพแวดล้อมรันไทม์แม้ใน playground โหมด

ตอนนี้คุณต้องอัปเดตไปป์ไลน์ที่มีอยู่ด้วยข้อกำหนดไปป์ไลน์ที่แก้ไข ใช้ tfx pipeline update คำสั่งในการปรับปรุงท่อของคุณตามด้วย tfx run create คำสั่งในการสร้างการดำเนินการการทำงานใหม่ของท่อที่อัปเดตของคุณ

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

คุณควรจะสามารถเห็นบันทึกการส่งออกจากส่วนประกอบที่เพิ่มเข้ามา ท่อของเราจะสร้างสิ่งประดิษฐ์ที่ส่งออกใน tfx_pipeline_output/my_pipeline ไดเรกทอรี

ขั้นตอนที่ 6 เพิ่มส่วนประกอบสำหรับการฝึกอบรม

ในขั้นตอนนี้คุณจะเพิ่มส่วนประกอบสำหรับการฝึกอบรมและการตรวจสอบรูปแบบรวมทั้ง Transform , Trainer , ResolverNode , Evaluator และ Pusher

เปิด pipeline/pipeline.py ค้นหาและ uncomment 5 สายที่เพิ่ม Transform , Trainer , ResolverNode , Evaluator และ Pusher การท่อ (เคล็ดลับ: ค้นหา TODO(step 6): )

อย่างที่คุณทำก่อนหน้านี้ ตอนนี้คุณต้องอัปเดตไปป์ไลน์ที่มีอยู่ด้วยข้อกำหนดไปป์ไลน์ที่แก้ไข คำแนะนำที่เป็นเช่นเดียวกับขั้นตอนที่ 5. การปรับปรุงท่อโดยใช้ tfx pipeline update และสร้างการดำเนินการการดำเนินการโดยใช้ tfx run create

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

เมื่อการดำเนินการนี้เสร็จสิ้น คุณได้สร้างและรันไปป์ไลน์ TFX แรกของคุณโดยใช้ Beam orchestrator!

ขั้นตอนที่ 7 (อุปกรณ์เสริม) ลอง BigQueryExampleGen

[BigQuery] เป็นคลังข้อมูลบนระบบคลาวด์แบบไร้เซิร์ฟเวอร์ ปรับขนาดได้สูง และคุ้มค่าใช้จ่าย สามารถใช้ BigQuery เป็นแหล่งสำหรับตัวอย่างการฝึกอบรมใน TFX ในขั้นตอนนี้เราจะเพิ่ม BigQueryExampleGen การท่อ

คุณจำเป็นต้องมี แพลตฟอร์ม Google Cloud บัญชีที่จะใช้ BigQuery โปรดเตรียมโครงการ GCP

เข้าสู่ระบบของคุณโดยใช้โครงการห้องสมุดรับรองความถูกต้องหรือ Colab gcloud ยูทิลิตี้

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

คุณควรระบุชื่อโปรเจ็กต์ GCP เพื่อเข้าถึงทรัพยากร BigQuery โดยใช้ TFX ชุด GOOGLE_CLOUD_PROJECT ตัวแปรสภาพแวดล้อมกับชื่อโครงการของคุณ

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

เปิด pipeline/pipeline.py แสดงความคิดเห็นออก CsvExampleGen และ uncomment บรรทัดซึ่งสร้างตัวอย่างของ BigQueryExampleGen นอกจากนี้คุณยังต้อง uncomment query อาร์กิวเมนต์ของ create_pipeline ฟังก์ชั่น

เราจำเป็นต้องระบุว่าโครงการ GCP ที่จะใช้สำหรับ BigQuery อีกครั้งและนี้จะทำโดยการตั้งค่า --project ใน beam_pipeline_args เมื่อมีการสร้างท่อ

เปิด pipeline/configs.py uncomment นิยามของ BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS และ BIG_QUERY_QUERY คุณควรแทนที่รหัสโปรเจ็กต์และค่าภูมิภาคในไฟล์นี้ด้วยค่าที่ถูกต้องสำหรับโปรเจ็กต์ GCP

เปิด beam_dag_runner.py uncomment สองข้อโต้แย้ง query และ beam_pipeline_args สำหรับ create_pipeline () วิธีการ

ตอนนี้ไปป์ไลน์พร้อมที่จะใช้ BigQuery เป็นแหล่งตัวอย่างแล้ว อัปเดตไปป์ไลน์และสร้างการรันตามที่เราทำในขั้นตอนที่ 5 และ 6

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

อะไรต่อไป: นำเข้าข้อมูลของคุณไปยังไปป์ไลน์

เราทำไปป์ไลน์สำหรับโมเดลโดยใช้ชุดข้อมูล Chicago Taxi ถึงเวลาใส่ข้อมูลของคุณลงในไปป์ไลน์แล้ว

ข้อมูลของคุณจัดเก็บได้ทุกที่ที่ไปป์ไลน์เข้าถึงได้ รวมถึง GCS หรือ BigQuery คุณจะต้องแก้ไขข้อกำหนดไปป์ไลน์เพื่อเข้าถึงข้อมูลของคุณ

  1. ถ้าข้อมูลของคุณจะถูกเก็บไว้ในไฟล์, การปรับเปลี่ยน DATA_PATH ใน kubeflow_dag_runner.py หรือ beam_dag_runner.py และการตั้งค่าไปยังตำแหน่งของไฟล์ของคุณ ถ้าข้อมูลของคุณถูกเก็บไว้ใน BigQuery แก้ไข BIG_QUERY_QUERY ใน pipeline/configs.py อย่างถูกต้องแบบสอบถามสำหรับข้อมูลของคุณ
  2. เพิ่มคุณสมบัติใน models/features.py
  3. ปรับเปลี่ยน models/preprocessing.py ที่จะ เปลี่ยนการป้อนข้อมูลสำหรับการฝึกอบรม
  4. ปรับเปลี่ยน models/keras/model.py และ models/keras/constants.py เพื่อ อธิบายรุ่น ML ของคุณ
    • คุณสามารถใช้แบบจำลองตามตัวประมาณการได้เช่นกัน เปลี่ยน RUN_FN อย่างต่อเนื่องเพื่อ models.estimator.model.run_fn ใน pipeline/configs.py

โปรดดู เทรนเนอร์คู่มือส่วนประกอบ สำหรับการแนะนำเพิ่มเติม