قم بإنشاء خط أنابيب TFX باستخدام قوالب مع Beam orchestrator

مقدمة

وهذه الوثيقة توفير الإرشادات لإنشاء TensorFlow الموسعة (TFX) خط أنابيب باستخدام القوالب التي يتم توفيرها مع حزمة TFX بيثون. معظم الإرشادات هي أوامر لينكس قذيفة، وJupyter خلايا كود الدفتري التي تحتج تلك الأوامر باستخدام المقابلة ! تم تقديمة.

سوف بناء خط أنابيب باستخدام تاكسي رحلات بيانات الصادرة عن مدينة شيكاغو. نحن نشجعك بشدة على محاولة بناء خط الأنابيب الخاص بك باستخدام مجموعة البيانات الخاصة بك من خلال استخدام خط الأنابيب هذا كخط أساس.

سنبني خط أنابيب باستخدام أباتشي شعاع للإشراف . إذا كنت مهتما في استخدام Kubeflow تشرف على جوجل الغيمة، يرجى الاطلاع TFX على الغيمة AI منصة خطوط الأنابيب تعليمي .

المتطلبات الأساسية

  • لينكس / ماك
  • بايثون> = 3.5.3

يمكنك الحصول على كل الشروط بسهولة عن طريق تشغيل هذا الكمبيوتر الدفتري على جوجل Colab .

الخطوة 1. قم بإعداد بيئتك.

خلال هذا المستند ، سوف نقدم الأوامر مرتين. مرة واحدة كأمر shell جاهز للنسخ واللصق ، مرة واحدة كخلية دفتر jupyter. إذا كنت تستخدم Colab ، فما عليك سوى تخطي كتلة البرنامج النصي shell وتنفيذ خلايا دفتر الملاحظات.

يجب عليك إعداد بيئة تطوير لبناء خط أنابيب.

تثبيت tfx حزمة بايثون. نحن ننصح باستخدام virtualenv في البيئة المحلية. يمكنك استخدام مقتطف البرنامج النصي الشيل التالي لإعداد بيئتك.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
python -m pip install -q --user --upgrade tfx==0.23.0

إذا كنت تستخدم كولاب:

import sys
!{sys.executable} -m pip install -q --user --upgrade -q tfx==0.23.0

خطأ: بعض الحزم 0.some_version.1 بها متطلبات حزمة أخرى! = 2.0. ، <3 ،> = 1.15 ، لكن سيكون لديك حزمة أخرى 2.0.0 غير متوافقة.

الرجاء تجاهل هذه الأخطاء في هذه اللحظة.

# Set `PATH` to include user python binary directory.
HOME=%env HOME
PATH=%env PATH
%env PATH={PATH}:{HOME}/.local/bin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/kbuilder/.local/bin

دعنا نتحقق من إصدار TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
TFX version: 0.23.0

وقد انتهى الأمر. نحن على استعداد لإنشاء خط أنابيب.

الخطوة 2. انسخ النموذج المحدد مسبقًا إلى دليل المشروع.

في هذه الخطوة ، سننشئ دليل وملفات مشروع خط أنابيب عمل عن طريق نسخ ملفات إضافية من قالب محدد مسبقًا.

يمكنك إعطاء خط أنابيب اسما مختلفا عن طريق تغيير PIPELINE_NAME أدناه. سيصبح هذا أيضًا اسم دليل المشروع حيث سيتم وضع ملفاتك.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/tfx/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Create a project directory under Colab content directory.
PROJECT_DIR=os.path.join(os.sep,"content",PIPELINE_NAME)

يشمل TFX على taxi قالب مع حزمة TFX الثعبان. إذا كنت تخطط لحل مشكلة تنبؤ نقطي ، بما في ذلك التصنيف والانحدار ، فيمكن استخدام هذا النموذج كنقطة بداية.

في tfx template copy CLI الأمر بنسخ محددة مسبقا ملفات القالب إلى دليل المشروع الخاص بك.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi
2020-09-07 09:09:40.131982: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Copying taxi pipeline template
Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/commands/template.py", line 73, in copy
    template_handler.copy_template(ctx.flags_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 168, in copy_template
    replace_dict)
  File "/home/kbuilder/.local/lib/python3.6/site-packages/tfx/tools/cli/handler/template_handler.py", line 107, in _copy_and_replace_placeholder_dir
    tf.io.gfile.makedirs(dst)
  File "/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 480, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.as_bytes(path))
tensorflow.python.framework.errors_impl.PermissionDeniedError: /content; Permission denied

قم بتغيير سياق دليل العمل في دفتر الملاحظات هذا إلى دليل المشروع.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}
[Errno 2] No such file or directory: '/content/my_pipeline'
/tmpfs/src/temp/docs/tutorials/tfx

الخطوة 3. تصفح ملفات المصدر المنسوخة.

يوفر قالب TFX ملفات سقالة أساسية لإنشاء خط أنابيب ، بما في ذلك كود مصدر Python وبيانات العينة ودفاتر Jupyter لتحليل إخراج خط الأنابيب. في taxi يستخدم قالب نفس مجموعة البيانات شيكاغو تاكسي ونموذج ML مثل تدفق الهواء التعليمي .

في Google Colab ، يمكنك تصفح الملفات بالنقر فوق رمز المجلد الموجود على اليسار. يجب نسخ الملفات ضمن directoy المشروع، اسمه my_pipeline في هذه الحالة. يمكنك النقر فوق أسماء الدلائل لمشاهدة محتوى الدليل والنقر نقرًا مزدوجًا فوق أسماء الملفات لفتحها.

فيما يلي مقدمة موجزة عن كل ملف من ملفات Python.

  • pipeline - يحتوي هذا الدليل على تعريف خط أنابيب
    • configs.py - يحدد الثوابت المشتركة للعدائين خط أنابيب
    • pipeline.py - يعرف مكونات TFX وخط أنابيب
  • models - يحتوي هذا الدليل على تعريفات نموذج ML.
    • features.py ، features_test.py - يعرف ملامح لنموذج
    • preprocessing.py ، preprocessing_test.py - يعرف تجهيزها وظائف باستخدام tf::Transform
    • estimator - يحتوي هذا الدليل على نموذج يستند مقدر.
      • constants.py - يعرف الثوابت من طراز
      • model.py ، model_test.py - يحدد نموذج DNN باستخدام TF مقدر
    • keras - يحتوي هذا الدليل نموذج يستند Keras.
      • constants.py - يعرف الثوابت من طراز
      • model.py ، model_test.py - يحدد نموذج DNN باستخدام Keras
  • beam_dag_runner.py ، kubeflow_dag_runner.py - تعريف المتسابقين لكل محرك تزامن

كنت قد لاحظت أن هناك بعض الملفات مع _test.py باسمهم. هذه اختبارات وحدة لخط الأنابيب ويوصى بإضافة المزيد من اختبارات الوحدة أثناء تنفيذ خطوط الأنابيب الخاصة بك. يمكنك تشغيل وحدة الاختبارات من خلال تقديم اسم وحدة من الملفات اختبار مع -m العلم. يمكنك عادة الحصول على اسم وحدة عن طريق حذف .py تمديد واستبدال / مع . . فمثلا:

python -m models.features_test
{sys.executable} -m models.features_test
{sys.executable} -m models.keras.model_test
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.features_test' (ModuleNotFoundError: No module named 'models')
/tmpfs/src/tf_docs_env/bin/python: Error while finding module specification for 'models.keras.model_test' (ModuleNotFoundError: No module named 'models')

الخطوة 4. قم بتشغيل خط أنابيب TFX الأول الخاص بك

يمكنك إنشاء خط أنابيب باستخدام pipeline create الأوامر.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py
2020-09-07 09:09:45.612839: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating pipeline
Invalid pipeline path: beam_dag_runner.py

بعد ذلك، يمكنك تشغيل خط أنابيب تم إنشاؤها باستخدام run create الأوامر.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"
tfx run create --engine=beam --pipeline_name={PIPELINE_NAME}
2020-09-07 09:09:50.725339: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

في حال نجاحها، سترى Component CsvExampleGen is finished. عند نسخ القالب ، يتم تضمين مكون واحد فقط ، CsvExampleGen ، في خط الأنابيب.

الخطوة 5. إضافة مكونات للتحقق من صحة البيانات.

في هذه الخطوة، سوف إضافة مكونات التحقق من صحة البيانات بما في ذلك StatisticsGen ، SchemaGen ، و ExampleValidator . إذا كنت مهتما في التحقق من صحة البيانات، الرجاء مراجعة تبدأ مع Tensorflow التحقق من صحة البيانات .

وسوف نقوم بتعديل تعريف خط أنابيب نسخها في pipeline/pipeline.py . إذا كنت تعمل على بيئتك المحلية ، فاستخدم المحرر المفضل لديك لتحرير الملف. إذا كنت تعمل على Google Colab ،

انقر أيقونة المجلد على اليسار لفتح Files الرأي.

انقر my_pipeline لفتح الدليل وانقر pipeline الدليل إلى فتح وانقر نقرا مزدوجا فوق pipeline.py لفتح الملف.

البحث و uncomment 3 خطوط التي تضيف StatisticsGen ، SchemaGen ، و ExampleValidator إلى خط أنابيب. (نصيحة: العثور على تعليقات تحتوي على TODO(step 5): ).

سيتم حفظ التغيير الخاص بك تلقائيًا في بضع ثوانٍ. تأكد من أن * علامة أمام pipeline.py اختفت في العنوان التبويب. لا يوجد زر حفظ أو اختصار لمحرر الملفات في Colab. يمكن حفظ الملفات بيثون في محرر الملف إلى بيئة التشغيل حتى في playground واسطة.

أنت الآن بحاجة إلى تحديث خط الأنابيب الحالي بتعريف خط الأنابيب المعدل. استخدام tfx pipeline update الأوامر لتحديث خط أنابيب الخاص بك، تليها tfx run create الأوامر لإنشاء المدى إعدام جديدة من خطوط الأنابيب التي تم تحديثها.

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:09:55.915484: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:01.148250: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

يجب أن تكون قادرًا على رؤية سجل الإخراج من المكونات المضافة. لدينا خط أنابيب يخلق التحف الانتاج في tfx_pipeline_output/my_pipeline الدليل.

الخطوة 6. أضف مكونات للتدريب.

في هذه الخطوة، سوف إضافة مكونات لتدريب والتحقق من صحة النموذج بما في ذلك Transform ، Trainer ، ResolverNode ، Evaluator ، و Pusher .

فتح pipeline/pipeline.py . البحث و uncomment 5 خطوط التي تضيف Transform ، Trainer ، ResolverNode ، Evaluator و Pusher إلى خط أنابيب. (نصيحة: العثور TODO(step 6): )

كما فعلت من قبل ، تحتاج الآن إلى تحديث خط الأنابيب الحالي بتعريف خط الأنابيب المعدل. التعليمات هي نفس الخطوة 5. تحديث خط الأنابيب باستخدام tfx pipeline update ، وخلق شوط التنفيذ باستخدام tfx run create .

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:06.281753: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:11.333668: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

عند انتهاء تشغيل هذا التنفيذ بنجاح ، تكون قد أنشأت الآن وتشغيل أول خط أنابيب TFX باستخدام Beam orchestrator!

الخطوة 7. (اختياري) حاول BigQueryExampleGen.

[BigQuery] عبارة عن مستودع بيانات سحابي بدون خادم وقابل للتطوير بدرجة عالية وفعال من حيث التكلفة. يمكن استخدام BigQuery كمصدر لتدريب الأمثلة في TFX. في هذه الخطوة، وسوف نضيف BigQueryExampleGen إلى خط أنابيب.

كنت في حاجة الى جوجل سحابة منصة حساب لاستخدام الاستعلام الشامل. يرجى إعداد مشروع GCP.

تسجيل الدخول إلى المشروع باستخدام مكتبة المصادقة colab أو gcloud فائدة.

# You need `gcloud` tool to login in local shell environment.
gcloud auth login
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()
  print('Authenticated')

يجب تحديد اسم مشروع GCP للوصول إلى موارد BigQuery باستخدام TFX. مجموعة GOOGLE_CLOUD_PROJECT متغير بيئة إلى اسم المشروع الخاص بك.

export GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
# Set your project name below.
# WARNING! ENTER your project name before running this cell.
%env GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE
env: GOOGLE_CLOUD_PROJECT=YOUR_PROJECT_NAME_HERE

فتح pipeline/pipeline.py . التعليق الخروج CsvExampleGen وحرر السطر الذي خلق مثيل BigQueryExampleGen . تحتاج أيضا إلى غير تعليق query حجة create_pipeline وظيفة.

نحن بحاجة إلى تحديد أي مشروع GCP لاستخدامها في الاستعلام الشامل مرة أخرى، ويتم ذلك عن طريق وضع --project في beam_pipeline_args عند إنشاء خط أنابيب.

فتح pipeline/configs.py . غير تعليق تعريف BIG_QUERY__WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS و BIG_QUERY_QUERY . يجب استبدال معرّف المشروع وقيمة المنطقة في هذا الملف بالقيم الصحيحة لمشروع Google Cloud Platform الخاص بك.

مفتوحة beam_dag_runner.py . حجتين غير تعليق، query و beam_pipeline_args ، لطريقة create_pipeline ().

الآن أصبح خط الأنابيب جاهزًا لاستخدام BigQuery كمصدر نموذجي. قم بتحديث خط الأنابيب وإنشاء تشغيل كما فعلنا في الخطوتين 5 و 6.

tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
tfx run create --engine beam --pipeline_name {PIPELINE_NAME}
2020-09-07 09:10:16.406635: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Updating pipeline
Invalid pipeline path: beam_dag_runner.py
2020-09-07 09:10:21.439101: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
CLI
Creating a run for pipeline: my_pipeline
Pipeline "my_pipeline" does not exist.

ما التالي: استيعاب البيانات الخاصة بك في خط الأنابيب.

لقد صنعنا خط أنابيب لنموذج باستخدام مجموعة بيانات Chicago Taxi. حان الوقت الآن لوضع بياناتك في خط الأنابيب.

يمكن تخزين بياناتك في أي مكان يمكن لخط الأنابيب الوصول إليه ، بما في ذلك GCS أو BigQuery. ستحتاج إلى تعديل تعريف خط الأنابيب للوصول إلى بياناتك.

  1. إذا تم تخزين البيانات في الملفات، تعديل DATA_PATH في kubeflow_dag_runner.py أو beam_dag_runner.py وتعيينه إلى موقع الملفات الخاصة بك. إذا تم تخزين البيانات في الاستعلام الشامل، تعديل BIG_QUERY_QUERY في pipeline/configs.py إلى صحيح الاستعلام عن البيانات الخاصة بك.
  2. إضافة ميزات في models/features.py .
  3. تعديل models/preprocessing.py ل تحويل البيانات المدخلة للتدريب .
  4. تعديل models/keras/model.py و models/keras/constants.py ل وصف نموذج ML الخاص بك .
    • يمكنك استخدام نموذج قائم على المقدر أيضًا. تغيير RUN_FN ثابت إلى models.estimator.model.run_fn في pipeline/configs.py .

يرجى الاطلاع على دليل المكون المدرب لمزيد من المقدمة.