Odczytywanie danych z BigQuery za pomocą TFX i Vertex Pipelines

Ten samouczek oparty na notatnikach użyje Google Cloud BigQuery jako źródła danych do trenowania modelu ML. Potok ML zostanie zbudowany przy użyciu TFX i będzie działał na Google Cloud Vertex Pipelines.

Ten notatnik jest oparty na potoku TFX, który zbudowaliśmy w samouczku Simple TFX Pipeline dla Vertex Pipelines . Jeśli jeszcze nie przeczytałeś tego samouczka, powinieneś go przeczytać przed kontynuowaniem tego notatnika.

BigQuery to bezserwerowa, wysoce skalowalna i ekonomiczna wielochmurowa hurtownia danych zaprojektowana z myślą o elastyczności biznesowej. TFX może służyć do odczytywania danych treningowych z BigQuery i publikowania trenowanego modelu w BigQuery.

W tym samouczku użyjemy komponentu BigQueryExampleGen , który odczytuje dane z BigQuery do potoków TFX.

Ten notatnik jest przeznaczony do uruchamiania w Google Colab lub w notatnikach AI Platform . Jeśli nie korzystasz z żadnego z nich, możesz po prostu kliknąć przycisk „Uruchom w Google Colab” powyżej.

Ustawiać

Jeśli ukończyłeś samouczek Simple TFX Pipeline for Vertex Pipelines , będziesz mieć działający projekt GCP i zasobnik GCS i to wszystko, czego potrzebujemy do tego samouczka. Przeczytaj najpierw wstępny samouczek, jeśli go przegapiłeś.

Zainstaluj pakiety Pythona

Zainstalujemy wymagane pakiety Pythona, w tym TFX i KFP, do tworzenia potoków ML i przesyłania zadań do Vertex Pipelines.

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

Czy uruchomiłeś ponownie środowisko wykonawcze?

Jeśli korzystasz z Google Colab, przy pierwszym uruchomieniu powyższej komórki musisz ponownie uruchomić środowisko wykonawcze, klikając powyżej przycisk „RESTART RUNTIME” lub korzystając z menu „Runtime > Restart runtime...”. Wynika to ze sposobu, w jaki Colab ładuje paczki.

Jeśli nie korzystasz z Colab, możesz ponownie uruchomić środowisko uruchomieniowe za pomocą następującej komórki.

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
 # Automatically restart kernel after installs
 import IPython
 app = IPython.Application.instance()
 app.kernel.do_shutdown(True)

Zaloguj się do Google dla tego notatnika

Jeśli używasz tego notatnika w Colab, uwierzytelnij się za pomocą swojego konta użytkownika:

import sys
if 'google.colab' in sys.modules:
 from google.colab import auth
 auth.authenticate_user()

Jeśli korzystasz z Notatników AI Platform , uwierzytelnij się w Google Cloud przed uruchomieniem następnej sekcji, uruchamiając

gcloud auth login

w oknie Terminala (które można otworzyć w menu Plik > Nowy ). Wystarczy to zrobić tylko raz na instancję notatnika.

Sprawdź wersje pakietów.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
TensorFlow version: 2.7.1
TFX version: 1.6.0
KFP version: 1.8.11

Ustaw zmienne

Poniżej skonfigurujemy kilka zmiennych używanych do dostosowania potoków. Wymagane są następujące informacje:

Wprowadź wymagane wartości w komórce poniżej przed uruchomieniem .

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = '' # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''     # <--- ENTER THIS
GCS_BUCKET_NAME = ''       # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
  from absl import logging
  logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

Ustaw gcloud , aby używał Twojego projektu.

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
 optional flags may be --help | --installation

For detailed information on this command and its flags, run:
 gcloud config set --help
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
  GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
  GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
  GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery

Domyślnie Vertex Pipelines używa domyślnego konta usługi maszyny wirtualnej GCE w formacie [project-number]-compute@developer.gserviceaccount.com . Musimy zezwolić na używanie BigQuery na tym koncie, aby uzyskać dostęp do BigQuery w potoku. Dodamy do konta rolę „Użytkownik BigQuery”.

!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
 --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
 --role=roles/bigquery.user
ERROR: (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
 optional flags may be --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
 gcloud projects add-iam-policy-binding --help

Zapoznaj się z dokumentacją Vertex, aby dowiedzieć się więcej o kontach usług i konfiguracji uprawnień.

Utwórz potok

Potoki TFX są definiowane przy użyciu interfejsów API Pythona, tak jak to zrobiliśmy w samouczku Simple TFX Pipeline for Vertex Pipelines . Wcześniej używaliśmy CsvExampleGen , który odczytuje dane z pliku CSV. W tym samouczku użyjemy komponentu BigQueryExampleGen , który odczytuje dane z BigQuery.

Przygotuj zapytanie BigQuery

Użyjemy tego samego zbioru danych Palmer Penguins . Jednak odczytamy go z tabeli BigQuery tfx-oss-public.palmer_penguins.palmer_penguins , która jest wypełniona tym samym plikiem CSV.

Jeśli korzystasz z Google Colab, możesz bezpośrednio sprawdzić zawartość tabeli BigQuery.

# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

Wszystkie cechy zostały już znormalizowane do 0~1 z wyjątkiem species , którym jest etykieta. Zbudujemy model klasyfikacyjny, który przewiduje species pingwinów.

BigQueryExampleGen wymaga zapytania, aby określić, które dane mają zostać pobrane. Ponieważ użyjemy wszystkich pól ze wszystkich wierszy w tabeli, zapytanie jest dość proste. W razie potrzeby możesz też określić nazwy pól i dodać warunki WHERE zgodnie ze składnią BigQuery Standard SQL .

QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

Napisz kod modelu.

Użyjemy tego samego kodu modelu, co w samouczku Simple TFX Pipeline .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
  'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
  **{
    feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
      for feature in _FEATURE_KEYS
    },
  _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
       data_accessor: tfx.components.DataAccessor,
       schema: schema_pb2.Schema,
       batch_size: int) -> tf.data.Dataset:
 """Generates features and label for training.

 Args:
  file_pattern: List of paths or patterns of input tfrecord files.
  data_accessor: DataAccessor for converting input to RecordBatch.
  schema: schema of the input data.
  batch_size: representing the number of consecutive elements of returned
   dataset to combine in a single batch

 Returns:
  A dataset that contains (features, indices) tuple where features is a
   dictionary of Tensors, and indices is a single Tensor of label indices.
 """
 return data_accessor.tf_dataset_factory(
   file_pattern,
   tfxio.TensorFlowDatasetOptions(
     batch_size=batch_size, label_key=_LABEL_KEY),
   schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
 """Creates a DNN Keras model for classifying penguin data.

 Returns:
  A Keras Model.
 """
 # The model below is built with Functional API, please refer to
 # https://www.tensorflow.org/guide/keras/overview for all API options.
 inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
 d = keras.layers.concatenate(inputs)
 for _ in range(2):
  d = keras.layers.Dense(8, activation='relu')(d)
 outputs = keras.layers.Dense(3)(d)

 model = keras.Model(inputs=inputs, outputs=outputs)
 model.compile(
   optimizer=keras.optimizers.Adam(1e-2),
   loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
   metrics=[keras.metrics.SparseCategoricalAccuracy()])

 model.summary(print_fn=logging.info)
 return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
 """Train the model based on given args.

 Args:
  fn_args: Holds args used to train the model as name/value pairs.
 """

 # This schema is usually either an output of SchemaGen or a manually-curated
 # version provided by pipeline author. A schema can also derived from TFT
 # graph if a Transform component is used. In the case when either is missing,
 # `schema_from_feature_spec` could be used to generate schema from very simple
 # feature_spec, but the schema returned would be very primitive.
 schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

 train_dataset = _input_fn(
   fn_args.train_files,
   fn_args.data_accessor,
   schema,
   batch_size=_TRAIN_BATCH_SIZE)
 eval_dataset = _input_fn(
   fn_args.eval_files,
   fn_args.data_accessor,
   schema,
   batch_size=_EVAL_BATCH_SIZE)

 model = _make_keras_model()
 model.fit(
   train_dataset,
   steps_per_epoch=fn_args.train_steps,
   validation_data=eval_dataset,
   validation_steps=fn_args.eval_steps)

 # The result of the training should be saved in `fn_args.serving_model_dir`
 # directory.
 model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Skopiuj plik modułu do GCS, do którego można uzyskać dostęp z komponentów potoku. Ponieważ trenowanie modelu odbywa się w GCP, musimy przesłać tę definicję modelu.

W przeciwnym razie możesz chcieć skompilować obraz kontenera zawierający plik modułu i użyć obrazu do uruchomienia potoku.

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".

Napisz definicję potoku

Zdefiniujemy funkcję do tworzenia potoku TFX. Musimy użyć BigQueryExampleGen , który przyjmuje query jako argument. Jeszcze jedna zmiana w stosunku do poprzedniego samouczka polega na tym, że musimy przekazać beam_pipeline_args , które są przekazywane do komponentów podczas ich wykonywania. Użyjemy beam_pipeline_args , aby przekazać dodatkowe parametry do BigQuery.

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
           module_file: str, serving_model_dir: str,
           beam_pipeline_args: Optional[List[str]],
           ) -> tfx.dsl.Pipeline:
 """Creates a TFX pipeline using BigQuery."""

 # NEW: Query data in BigQuery as a data source.
 example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
   query=query)

 # Uses user-provided Python function that trains a model.
 trainer = tfx.components.Trainer(
   module_file=module_file,
   examples=example_gen.outputs['examples'],
   train_args=tfx.proto.TrainArgs(num_steps=100),
   eval_args=tfx.proto.EvalArgs(num_steps=5))

 # Pushes the model to a file destination.
 pusher = tfx.components.Pusher(
   model=trainer.outputs['model'],
   push_destination=tfx.proto.PushDestination(
     filesystem=tfx.proto.PushDestination.Filesystem(
       base_directory=serving_model_dir)))

 components = [
   example_gen,
   trainer,
   pusher,
 ]

 return tfx.dsl.Pipeline(
   pipeline_name=pipeline_name,
   pipeline_root=pipeline_root,
   components=components,
   # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
   beam_pipeline_args=beam_pipeline_args)

Uruchom potok na Vertex Pipelines.

Użyjemy Vertex Pipelines do uruchomienia potoku, tak jak to zrobiliśmy w samouczku Simple TFX Pipeline dla Vertex Pipelines .

Musimy również przekazać beam_pipeline_args dla BigQueryExampleGen. Zawiera konfiguracje, takie jak nazwa projektu GCP i tymczasowe miejsce na wykonanie BigQuery.

import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
  '--project=' + GOOGLE_CLOUD_PROJECT,
  '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
  ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
  config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
  output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
  _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    query=QUERY,
    module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
    serving_model_dir=SERVING_MODEL_DIR,
    beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

Wygenerowany plik definicji można przesłać za pomocą klienta kfp.

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                display_name=PIPELINE_NAME)
job.run(sync=False)

Teraz możesz odwiedzić „Vertex AI > Pipelines” w Google Cloud Console , aby zobaczyć postęp.