قم بإنشاء خط أنابيب TFX لبياناتك باستخدام نموذج Penguin


سيوفر هذا المستند إرشادات لإنشاء خط أنابيب TensorFlow Extended (TFX) لمجموعة البيانات الخاصة بك باستخدام نموذج penguin الذي يتم توفيره مع حزمة TFX Python. سيستخدم خط الأنابيب الذي تم إنشاؤه مجموعة بيانات Palmer Penguins في البداية ، لكننا سنقوم بتحويل خط الأنابيب لمجموعة البيانات الخاصة بك.

المتطلبات الأساسية

  • لينكس / ماك
  • بايثون 3.6-3.8
  • دفتر Jupyter

الخطوة 1. انسخ النموذج المحدد مسبقًا إلى دليل المشروع.

في هذه الخطوة ، سننشئ دليل وملفات مشروع خط أنابيب عمل عن طريق نسخ الملفات من قالب البطريق في TFX. يمكنك التفكير في هذا على أنه سقالة لمشروع خط أنابيب TFX الخاص بك.

تحديث النقطة

إذا كنا نعمل في Colab ، فعلينا التأكد من أن لدينا أحدث إصدار من Pip. يمكن بالطبع تحديث الأنظمة المحلية بشكل منفصل.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

قم بتثبيت الحزمة المطلوبة

أولاً ، قم بتثبيت تحليل نموذج TFX و TensorFlow (TFMA).

pip install -U tfx tensorflow-model-analysis

دعنا نتحقق من إصدارات TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

نحن على استعداد لإنشاء خط أنابيب.

اضبط PROJECT_DIR على الوجهة المناسبة لبيئتك. القيمة الافتراضية هي ~/imported/${PIPELINE_NAME} وهي مناسبة لبيئة Google Cloud AI Platform Notebook .

يمكنك تسمية خط الأنابيب الخاص بك باسم مختلف عن طريق تغيير PIPELINE_NAME أدناه. سيصبح هذا أيضًا اسم دليل المشروع حيث سيتم وضع ملفاتك.

import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

نسخ ملفات القوالب.

يتضمن TFX قالب penguin مع حزمة TFX python. يحتوي نموذج penguin على العديد من الإرشادات لإدخال مجموعة البيانات الخاصة بك في خط الأنابيب وهو الغرض من هذا البرنامج التعليمي.

ينسخ الأمر tfx template copy CLI ملفات القوالب المحددة مسبقًا إلى دليل المشروع الخاص بك.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py

قم بتغيير سياق دليل العمل في دفتر الملاحظات هذا إلى دليل المشروع.


تصفح ملفات المصدر المنسوخة

يوفر قالب TFX ملفات سقالة أساسية لبناء خط أنابيب ، بما في ذلك كود مصدر Python وبيانات العينة. يستخدم نموذج penguin نفس مجموعة بيانات Palmer Penguins ونموذج ML مثل مثال Penguin .

فيما يلي مقدمة موجزة عن كل ملف من ملفات Python.

  • pipeline - يحتوي هذا الدليل على تعريف خط الأنابيب
    • configs.py - يعرّف الثوابت المشتركة لعدائي خطوط الأنابيب
    • pipeline.py - يحدد مكونات TFX وخط أنابيب
  • models - يحتوي هذا الدليل على تعريفات نموذج ML
    • features.py ، features_test.py - يحدد ميزات النموذج
    • preprocessing.py ، preprocessing_test.py - يحدد إجراءات المعالجة المسبقة للبيانات
    • constants.py py - تحدد ثوابت النموذج
    • model.py ، model_test.py - يحدد نموذج ML باستخدام أطر ML مثل TensorFlow
  • local_runner.py - تحديد عداء للبيئة المحلية التي تستخدم محرك تزامن محلي
  • kubeflow_runner.py - تحديد عداء لمحرك تزامن خطوط الأنابيب Kubeflow

بشكل افتراضي ، يشتمل القالب فقط على مكونات TFX القياسية. إذا كنت بحاجة إلى بعض الإجراءات المخصصة ، فيمكنك إنشاء مكونات مخصصة لخط الأنابيب الخاص بك. يرجى مراجعة دليل المكون المخصص لـ TFX للحصول على التفاصيل.

ملفات اختبار الوحدة.

قد تلاحظ أن هناك بعض الملفات التي تحتوي على _test.py في أسمائها. هذه اختبارات وحدة لخط الأنابيب ويوصى بإضافة المزيد من اختبارات الوحدة أثناء تنفيذ خطوط الأنابيب الخاصة بك. يمكنك إجراء اختبارات الوحدة من خلال تزويد اسم الوحدة النمطية لملفات الاختبار بعلامة -m . يمكنك عادةً الحصول على اسم وحدة عن طريق حذف امتداد .py واستبدال / بـ . . فمثلا:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
Ran 2 tests in 0.001s

OK (skipped=1)

قم بإنشاء خط أنابيب TFX في البيئة المحلية.

تدعم TFX العديد من محركات التزامن لتشغيل خطوط الأنابيب. سوف نستخدم محرك تزامن محلي. يعمل محرك التزامن المحلي بدون أي تبعيات أخرى ، وهو مناسب للتطوير وتصحيح الأخطاء لأنه يعمل على البيئة المحلية بدلاً من الاعتماد على مجموعات الحوسبة عن بُعد.

سنستخدم local_runner.py لتشغيل خط الأنابيب الخاص بك باستخدام منسق محلي. يجب عليك إنشاء خط أنابيب قبل تشغيله. يمكنك إنشاء خط أنابيب باستخدام أمر pipeline create .

tfx pipeline create --engine=local --pipeline_path=local_runner.py
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

يسجل الأمر pipeline create command خط الأنابيب المحدد في local_runner.py دون تشغيله بالفعل.

ستقوم بتشغيل خط الأنابيب الذي تم إنشاؤه باستخدام الأمر run create في الخطوات التالية.

الخطوة 2. استيعاب البيانات الخاصة بك في خط الأنابيب.

يستوعب خط الأنابيب الأولي مجموعة بيانات البطريق المضمنة في النموذج. تحتاج إلى وضع بياناتك في خط الأنابيب ، وتبدأ معظم خطوط أنابيب TFX بمكون ExampleGen.

اختر ExampleGen

يمكن تخزين بياناتك في أي مكان يمكن لخط الأنابيب الوصول إليه ، إما على نظام ملفات محلي أو موزع ، أو نظام قادر على الاستعلام. يوفر TFX العديد من مكونات ExampleGen لجلب بياناتك إلى خط أنابيب TFX. يمكنك اختيار واحد من الأمثلة التالية لتوليد المكونات.

يمكنك أيضًا إنشاء ExampleGen الخاصة بك ، على سبيل المثال ، يتضمن tfx ExecampleGen مخصصًا يستخدم Presto كمصدر بيانات. انظر الدليل لمزيد من المعلومات حول كيفية استخدام وتطوير المنفذين المخصصين.

بمجرد أن تقرر استخدام ExampleGen ، سوف تحتاج إلى تعديل تعريف خط الأنابيب لاستخدام بياناتك.

  1. قم بتعديل DATA_PATH في local_runner.py على موقع ملفاتك.

    • إذا كانت لديك ملفات في البيئة المحلية ، فحدد المسار. هذا هو أفضل خيار لتطوير أو تصحيح خط أنابيب.
    • إذا تم تخزين الملفات في GCS ، يمكنك استخدام مسار يبدأ بـ gs://{bucket_name}/... يرجى التأكد من أنه يمكنك الوصول إلى GCS من جهازك الطرفي ، على سبيل المثال ، باستخدام gsutil . يرجى اتباع دليل الترخيص في Google Cloud إذا لزم الأمر.
    • إذا كنت تريد استخدام ExampleGen القائمة على الاستعلام مثل BigQueryExampleGen ، فإنك تحتاج إلى عبارة Query لتحديد البيانات من مصدر البيانات. هناك بعض الأشياء الأخرى التي تحتاج إلى تعيينها لاستخدام Google Cloud BigQuery كمصدر بيانات.
    • في pipeline/configs.py :
      • غيّر GOOGLE_CLOUD_PROJECT و GCS_BUCKET_NAME إلى مشروع GCP واسم المجموعة. يجب أن يكون الدلو موجودًا قبل تشغيل خط الأنابيب.
      • قم بإلغاء التعليق وتعيين متغير BIG_QUERY_QUERY على جملة الاستعلام الخاصة بك .
    • في local_runner.py :
      • قم بالتعليق على وسيطة data_path و uncomment query الوسيطة بدلاً من ذلك في pipeline.create_pipeline() .
    • في pipeline/pipeline.py .
      • قم بالتعليق على وسيطة data_path وسيطة query uncomment في create_pipeline() .
      • استخدم BigQueryExampleGen بدلاً من CsvExampleGen.
  2. استبدل CsvExampleGen الحالي بفئة ExampleGen الخاصة بك في pipeline/pipeline.py . كل فئة ExampleGen لها توقيع مختلف. يرجى الاطلاع على دليل مكونات ExampleGen لمزيد من التفاصيل. لا تنسَ استيراد الوحدات المطلوبة مع عبارات import في pipeline/pipeline.py .

يتكون خط الأنابيب الأولي من أربعة مكونات ، ExampleGen ، و StatisticsGen ، و SchemaGen ، و ExampleValidator . لا نحتاج إلى تغيير أي شيء لـ StatisticsGen و SchemaGen و ExampleValidator . لنقم بتشغيل خط الأنابيب لأول مرة.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:12.848598153    5127 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886553302
last_update_time_since_epoch: 1643886553302
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886556588
last_update_time_since_epoch: 1643886556588
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

يجب أن تشاهد "انتهى المكون ExampleValidator." إذا تم تشغيل خط الأنابيب بنجاح.

افحص ناتج خط الأنابيب.

ينتج خط أنابيب TFX نوعين من المخرجات ، المصنوعات اليدوية وبيانات التعريف DB (MLMD) التي تحتوي على بيانات وصفية للقطع الأثرية وعمليات تنفيذ خطوط الأنابيب. يتم تحديد موقع الإخراج في local_runner.py . بشكل افتراضي ، يتم تخزين العناصر الأثرية في دليل tfx_pipeline_output ويتم تخزين البيانات الوصفية كقاعدة بيانات sqlite ضمن دليل tfx_metadata .

يمكنك استخدام واجهات برمجة تطبيقات MLMD لفحص هذه النواتج. أولاً ، سنحدد بعض وظائف الأداة المساعدة للبحث في نتائج المخرجات التي تم إنتاجها للتو.

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [aritfact for aritfact in artifacts
                        if aritfact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
      for artifact in filtered_artifacts]

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
    if visualization:

from tfx.orchestration.experimental.interactive import standard_visualizations

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(

الآن يمكننا قراءة البيانات الوصفية للنواتج الأثرية من MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,

الآن يمكننا فحص مخرجات كل مكون. يتم استخدام Tensorflow Data Validation (TFDV) في StatisticsGen و SchemaGen و ExampleValidator ، ويمكن استخدام TFDV لتصور المخرجات من هذه المكونات.

في هذا البرنامج التعليمي ، سوف نستخدم طرق مساعدة المرئيات في TFX والتي تستخدم TFDV داخليًا لإظهار التصور. يرجى الاطلاع على البرنامج التعليمي لمكونات TFX لمعرفة المزيد حول كل مكون.

افحص نموذج الإخراج ExampleGen

دعونا نفحص الإخراج من ExampleGen. ألق نظرة على المثالين الأولين لكل تقسيم:


بشكل افتراضي ، يقسم TFX ExampleGen الأمثلة إلى قسمين ، تدريب وتقييم ، ولكن يمكنك ضبط تكوين الانقسام .

افحص الإخراج من StatisticsGen


يتم توفير هذه الإحصائيات إلى SchemaGen لإنشاء مخطط للبيانات تلقائيًا.

افحص الإخراج من SchemaGen


يتم استنتاج هذا المخطط تلقائيًا من إخراج StatisticsGen. سنستخدم هذا المخطط الذي تم إنشاؤه في هذا البرنامج التعليمي ، ولكن يمكنك أيضًا تعديل المخطط وتخصيصه .

افحص الإخراج من ExampleValidator


إذا تم العثور على أي شذوذ ، فيمكنك مراجعة بياناتك بحيث تتبع جميع الأمثلة افتراضاتك. قد تكون المخرجات من المكونات الأخرى مثل StatistcsGen مفيدة. الحالات الشاذة التي تم العثور عليها لا تمنع تنفيذ خط الأنابيب.

يمكنك رؤية الميزات المتاحة من مخرجات SchemaGen . إذا كان من الممكن استخدام ميزاتك لإنشاء نموذج ML في Trainer مباشرةً ، فيمكنك تخطي الخطوة التالية والانتقال إلى الخطوة 4. وإلا يمكنك القيام ببعض الأعمال الهندسية المميزة في الخطوة التالية. عنصر Transform ضروري عندما تكون عمليات التمرير الكامل مثل حساب المتوسطات مطلوبة ، خاصة عندما تحتاج إلى القياس.

الخطوة 3. (اختياري) هندسة الميزات مع مكون التحويل.

في هذه الخطوة ، ستحدد وظائف هندسية متنوعة للميزات والتي سيتم استخدامها بواسطة مكون Transform في خط الأنابيب. راجع دليل مكونات التحويل لمزيد من المعلومات.

يعد هذا ضروريًا فقط إذا تطلب رمز التدريب ميزة (ميزات) إضافية غير متوفرة في إخراج ExampleGen. بخلاف ذلك ، لا تتردد في التقدم سريعًا إلى الخطوة التالية لاستخدام المدرب.

تحديد ميزات النموذج

تحتوي models/features.py على ثوابت لتحديد ميزات النموذج بما في ذلك أسماء الميزات وحجم المفردات وما إلى ذلك. بشكل افتراضي ، يحتوي نموذج penguin على اثنين من التكاليف ، FEATURE_KEYS و LABEL_KEY ، لأن نموذج penguin لدينا يحل مشكلة التصنيف باستخدام التعلم الخاضع للإشراف وجميع الميزات هي ميزات رقمية مستمرة. شاهد تعريفات الميزات من مثال تاكسي شيكاغو للحصول على مثال آخر.

تنفيذ المعالجة المسبقة للتدريب / الخدمة في preprocessing_fn ().

تحدث هندسة الميزات الفعلية في وظيفة preprocessing_fn() في models/preprocessing.py .

في preprocessing_fn ، يمكنك تحديد سلسلة من الوظائف التي تتعامل مع إملاء إدخال الموترات لإنتاج إملاء الإخراج الخاص بالموترات. توجد وظائف مساعدة مثل scale_to_0_1 و compute_and_apply_vocabulary في TensorFlow Transform API أو يمكنك ببساطة استخدام وظائف TensorFlow العادية. بشكل افتراضي ، يتضمن نموذج penguin أمثلة لاستخدامات دالة tft.scale_to_z_score لتطبيع قيم الميزة.

راجع دليل Tensflow Transform لمزيد من المعلومات حول تأليف preprocessing_fn .

أضف مكون التحويل إلى خط الأنابيب.

إذا كان لديك preprocessing_fn جاهزًا ، فأضف مكون Transform إلى خط الأنابيب.

  1. في ملف pipeline/pipeline.py ، قم بإلغاء التعليق # components.append(transform) لإضافة المكون إلى خط الأنابيب.

يمكنك تحديث خط الأنابيب وتشغيله مرة أخرى.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:37.596944686    5287 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886578210
last_update_time_since_epoch: 1643886578210
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886581527
last_update_time_since_epoch: 1643886581527
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

إذا تم تشغيل خط الأنابيب بنجاح ، يجب أن ترى "تم الانتهاء من تحويل المكون." في مكان ما في السجل. نظرًا لأن مكون Transform ومكون ExampleValidator لا يعتمدان على بعضهما البعض ، لم يتم إصلاح ترتيب عمليات التنفيذ. ومع ذلك ، يمكن أن يكون أي من Transform و ExampleValidator هو المكون الأخير في تنفيذ خط الأنابيب.

افحص الإخراج من التحويل

ينشئ مكون التحويل نوعين من المخرجات ، رسم بياني Tensorflow وأمثلة محولة. الأمثلة المحولة هي أمثلة لنوع الأداة التي يتم إنتاجها أيضًا بواسطة ExampleGen ، ولكن هذا المثال يحتوي على قيم معلم محولة بدلاً من ذلك.

يمكنك فحصها كما فعلنا في الخطوة السابقة.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous run of Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,

الخطوة 4. تدريب النموذج الخاص بك باستخدام مكون "المدرب".

سنقوم ببناء نموذج ML باستخدام مكون Trainer . راجع دليل مكون المدرب لمزيد من المعلومات. تحتاج إلى تقديم رمز النموذج الخاص بك إلى مكون "المدرب".

حدد نموذجك.

في قالب البطريق ، يتم استخدام models.model.run_fn كوسيطة run_fn لمكون Trainer . وهذا يعني أنه سيتم استدعاء وظيفة run_fn() في models/model.py عند تشغيل مكون Trainer . يمكنك رؤية الكود لإنشاء نموذج DNN بسيط باستخدام keras API في كود معين. راجع TensorFlow 2.x في دليل TFX لمزيد من المعلومات حول استخدام keras API في TFX.

في هذا run_fn ، يجب عليك بناء نموذج وحفظه في دليل مشار إليه بواسطة fn_args.serving_model_dir والذي تم تحديده بواسطة المكون. يمكنك استخدام وسيطات أخرى في fn_args والتي يتم تمريرها إلى run_fn . راجع الرموز ذات الصلة للحصول على القائمة الكاملة للوسيطات في fn_args .

حدد ميزاتك في models/features.py واستخدمها حسب الحاجة. إذا قمت بتحويل ميزاتك في الخطوة 3 ، فيجب عليك استخدام الميزات المحولة كمدخلات لنموذجك.

أضف مكون المدرب إلى خط الأنابيب.

إذا كان run_fn جاهزًا ، أضف مكون Trainer إلى خط الأنابيب.

  1. في ملف pipeline/pipeline.py ، قم بإلغاء التعليق # components.append(trainer) لإضافة المكون إلى خط الأنابيب.

قد تعتمد وسيطات مكون المدرب على ما إذا كنت تستخدم مكون التحويل أم لا.

  • إذا كنت لا تستخدم مكون Transform ، فلن تحتاج إلى تغيير الوسائط.
  • إذا كنت تستخدم مكون Transform ، فستحتاج إلى تغيير الوسيطات عند إنشاء مثيل مكون Trainer .

    • تغيير وسيطة examples إلى examples=transform.outputs['transformed_examples'], . نحن بحاجة إلى استخدام الأمثلة المحولة للتدريب.
    • أضف وسيطة transform_graph مثل transform_graph=transform.outputs['transform_graph'], . يحتوي هذا الرسم البياني على رسم بياني TensorFlow لعمليات التحويل.
    • بعد التغييرات أعلاه ، سيبدو رمز إنشاء مكون المدرب كما يلي.
    # If you use a Transform component.
    trainer = Trainer(

يمكنك تحديث خط الأنابيب وتشغيله مرة أخرى.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:01.173700221    5436 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 8
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886601629
last_update_time_since_epoch: 1643886601629
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 9
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886605023
last_update_time_since_epoch: 1643886605023
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

عند تشغيل هذا التنفيذ بنجاح ، تكون قد قمت الآن بإنشاء وتشغيل أول خط أنابيب TFX للطراز الخاص بك. تهانينا!

سيكون نموذجك الجديد موجودًا في مكان ما ضمن دليل الإخراج ، ولكن سيكون من الأفضل أن يكون لديك نموذج في موقع ثابت أو خدمة خارج خط أنابيب TFX الذي يحمل العديد من النتائج المؤقتة. أفضل حتى مع التقييم المستمر للنموذج المبني وهو أمر بالغ الأهمية في أنظمة إنتاج ML. سنرى كيف يعمل التقييم المستمر والنشر في TFX في الخطوة التالية.

الخطوة الخامسة (اختيارية) قم بتقييم النموذج باستخدام أداة التقييم وانشرها باستخدام أداة الدفع.

يقوم مكون Evaluator باستمرار بتقييم كل نموذج مبني من Trainer ، ويقوم Pusher بنسخ النموذج إلى موقع محدد مسبقًا في نظام الملفات أو حتى إلى نماذج Google Cloud AI Platform .

يضيف مكون مُقيِّم إلى خط الأنابيب.

في ملف pipeline/pipeline.py :

  1. # components.append(model_resolver) لإضافة أحدث محلل نموذج إلى خط الأنابيب. يمكن استخدام المقيِّم لمقارنة نموذج بنموذج الأساس القديم الذي اجتاز "المُقيِّم" في آخر تشغيل لخط الأنابيب. LatestBlessedModelResolver يعثر على أحدث طراز اجتاز المقيِّم.
  2. قم بتعيين tfma.MetricsSpec المناسب لطرازك. قد يختلف التقييم لكل نموذج ML. في نموذج penguin ، تم استخدام SparseCategoricalAccuracy لأننا نقوم بحل مشكلة تصنيف متعددة الفئات. تحتاج أيضًا إلى تحديد tfma.SliceSpec لتحليل نموذجك لشرائح معينة. لمزيد من التفاصيل ، راجع دليل مكون المقيم .
  3. Uncomment # components.append(evaluator) لإضافة المكون إلى خط الأنابيب.

يمكنك تحديث خط الأنابيب وتشغيله مرة أخرى.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 10
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:24.894390124    5584 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 10 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 10
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 11
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886625515
last_update_time_since_epoch: 1643886625515
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 11 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 11
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 12
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886628941
last_update_time_since_epoch: 1643886628941
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 12 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 12
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

افحص ناتج المقيم

تتطلب هذه الخطوة تمديد دفتر Jupyter لتحليل نموذج TensorFlow (TFMA). لاحظ أن إصدار ملحق الكمبيوتر المحمول TFMA يجب أن يكون مطابقًا لإصدار حزمة TFMA python.

سيقوم الأمر التالي بتثبيت ملحق دفتر الملاحظات TFMA من سجل NPM. قد يستغرق الأمر عدة دقائق حتى يكتمل.

# Install TFMA notebook extension.
jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir]
               [--paths] [--json] [--debug]

Jupyter: Interactive Computing

positional arguments:
  subcommand     the subcommand to launch

optional arguments:
  -h, --help     show this help message and exit
  --version      show the versions of core jupyter packages and exit
  --config-dir   show Jupyter config dir
  --data-dir     show Jupyter data dir
  --runtime-dir  show Jupyter runtime dir
  --paths        show all Jupyter paths. Add --json for machine-readable
  --json         output paths as machine-readable json
  --debug        output debug information about paths

Available subcommands: bundlerextension console dejavu execute kernel
kernelspec migrate nbconvert nbextension notebook qtconsole run
serverextension troubleshoot trust

Jupyter command `jupyter-labextension` not found.

في حالة اكتمال التثبيت ، يرجى إعادة تحميل المتصفح الخاص بك لجعل الامتداد ساري المفعول.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all aritfacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)

يضيف مكون دافع إلى خط الأنابيب.

إذا كان النموذج يبدو واعدًا ، فنحن بحاجة إلى نشره. يمكن لمكون Pusher نشر النموذج إلى موقع في نظام الملفات أو إلى نماذج النظام الأساسي لـ GCP AI باستخدام منفذ مخصص .

يقوم مكون Evaluator باستمرار بتقييم كل نموذج مبني من Trainer ، ويقوم Pusher بنسخ النموذج إلى موقع محدد مسبقًا في نظام الملفات أو حتى إلى نماذج Google Cloud AI Platform .

  1. في local_runner.py ، اضبط SERVING_MODEL_DIR على دليل للنشر.
  2. في ملف pipeline/pipeline.py ، قم بإلغاء التعليق # components.append(pusher) لإضافة دافع إلى خط الأنابيب.

يمكنك تحديث خط الأنابيب وتشغيله مرة أخرى.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 13
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:49.163841363    5734 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 13 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 13
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 14
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886649739
last_update_time_since_epoch: 1643886649739
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 14 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 14
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 15
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886653128
last_update_time_since_epoch: 1643886653128
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 15
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

من المفترض أن تتمكن من العثور على نموذجك الجديد في SERVING_MODEL_DIR .

الخطوة السادسة (اختيارية) انشر خط الأنابيب الخاص بك إلى خطوط أنابيب Kubeflow على GCP.

كما ذكرنا سابقًا ، يعد local_runner.py لأغراض التصحيح أو التطوير ولكنه ليس أفضل حل لأعباء العمل الإنتاجية. في هذه الخطوة ، سننشر خط الأنابيب إلى Kubeflow Pipelines على Google Cloud.


نحتاج إلى حزمة kfp python وبرنامج skaffold لنشر خط أنابيب إلى مجموعة خطوط أنابيب Kubeflow.

pip install --upgrade -q kfp

# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold

تحتاج إلى نقل ثنائي skaffold إلى المكان الذي يمكن أن تجده فيه صدفتك. أو يمكنك تحديد المسار إلى skaffold عند تشغيل tfx binary --skaffold-cmd .

# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory

تحتاج أيضًا إلى مجموعة خطوط أنابيب Kubeflow لتشغيل خط الأنابيب. يرجى اتباع الخطوتين 1 و 2 في TFX في البرنامج التعليمي لخطوط أنابيب Cloud AI Platform .

عندما تكون مجموعتك جاهزة ، افتح لوحة معلومات خط الأنابيب بالنقر فوق فتح لوحة معلومات خطوط الأنابيب في Pipelines بوحدة التحكم السحابية من Google . عنوان URL لهذه الصفحة هو ENDPOINT لطلب تشغيل خط أنابيب. قيمة نقطة النهاية هي كل شيء في عنوان URL بعد https: // وحتى ، ويتضمن ، googleusercontent.com. ضع نقطة النهاية الخاصة بك على كتلة التعليمات البرمجية التالية.

ENDPOINT='' # Enter your ENDPOINT here.

لتشغيل الكود الخاص بنا في مجموعة خطوط أنابيب Kubeflow ، نحتاج إلى حزم الكود الخاص بنا في صورة حاوية. سيتم إنشاء الصورة تلقائيًا أثناء نشر خط الأنابيب الخاص بنا ، وستحتاج فقط إلى تعيين اسم وسجل حاوية لصورتك. في مثالنا ، سوف نستخدم Google Container Registry ، ونطلق عليه اسم tfx-pipeline .

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

تعيين موقع البيانات.

يجب أن تكون بياناتك قابلة للوصول من مجموعة خطوط أنابيب Kubeflow. إذا كنت قد استخدمت البيانات في بيئتك المحلية ، فقد تحتاج إلى تحميلها على التخزين البعيد مثل Google Cloud Storage. على سبيل المثال ، يمكننا تحميل بيانات البطريق إلى حاوية افتراضية يتم إنشاؤها تلقائيًا عند نشر مجموعة خطوط أنابيب Kubeflow كما يلي.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]...
NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted

قم بتحديث موقع البيانات المخزن في DATA_PATH في kubeflow_runner.py .

إذا كنت تستخدم BigQueryExampleGen ، فلا داعي لتحميل ملف البيانات ، ولكن يُرجى التأكد من أن kubeflow_runner.py يستخدم نفس query beam_pipeline_args pipeline.create_pipeline() .

انشر خط الأنابيب.

إذا كان كل شيء جاهزًا ، يمكنك إنشاء خط أنابيب باستخدام الأمر tfx pipeline create .

!tfx pipeline create  \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
[Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.

ابدأ الآن تشغيل تنفيذ بخط الأنابيب الذي تم إنشاؤه حديثًا باستخدام الأمر tfx run create .

tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.7/http/client.py", line 1256, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output
  File "/usr/lib/python3.7/http/client.py", line 970, in send
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run
    handler = handler_factory.create_handler(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler
    return kubeflow_handler.KubeflowHandler(flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__
    if not self._context_setting['namespace'] and self.get_kfp_healthz(
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz
    response = self._healthz_api.get_healthz()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz
    return self.get_healthz_with_http_info(**kwargs)  # noqa: E501
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))

أو يمكنك أيضًا تشغيل خط الأنابيب في لوحة معلومات Kubeflow Pipelines. سيتم إدراج التشغيل الجديد ضمن Experiments في لوحة معلومات Kubeflow Pipelines. سيسمح لك النقر فوق التجربة بمراقبة التقدم وتصور القطع الأثرية التي تم إنشاؤها أثناء تشغيل التنفيذ.

إذا كنت مهتمًا بتشغيل خط الأنابيب الخاص بك على Kubeflow Pipelines ، فابحث عن المزيد من الإرشادات في TFX في البرنامج التعليمي Cloud AI Platform Pipelines .


لتنظيف جميع موارد Google Cloud المستخدمة في هذه الخطوة ، يمكنك حذف مشروع Google Cloud الذي استخدمته في البرنامج التعليمي.

بدلاً من ذلك ، يمكنك تنظيف الموارد الفردية من خلال زيارة كل وحدة تحكم: