Tutorial do componente de função TFX Python

Este bloco de notas contém exemplos de como criar e executar componentes de função Python dentro do TFX InteractiveContext e em um pipeline TFX orquestrado localmente.

Para mais contexto e informações, consulte a componentes função personalizada Python página no site de documentação TFX.

Configurar

Primeiro instalaremos o TFX e importaremos os módulos necessários. TFX requer Python 3.

Verifique a versão do sistema Python

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

Atualizar Pip

Para evitar a atualização do Pip em um sistema quando executado localmente, verifique se estamos executando no Colab. É claro que os sistemas locais podem ser atualizados separadamente.

try:
  import colab
  !pip install --upgrade pip
except:
  pass

Instale TFX

pip install -U tfx

Você reiniciou o tempo de execução?

Se você estiver usando o Google Colab, na primeira vez que executar a célula acima, será necessário reiniciar o tempo de execução (Tempo de execução> Reiniciar tempo de execução ...). Isso ocorre devido à maneira como o Colab carrega os pacotes.

Pacotes de importação

Importamos TFX e verificamos sua versão.

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.4.0'

Componentes de função Python personalizados

Nesta seção, criaremos componentes a partir de funções Python. Não estaremos resolvendo nenhum problema real de ML - essas funções simples são usadas apenas para ilustrar o processo de desenvolvimento do componente de função do Python.

Veja função Python guia baseado em componentes para mais documentação.

Crie componentes personalizados do Python

Começamos escrevendo uma função que gera alguns dados fictícios. Isso é escrito em seu próprio arquivo de módulo Python.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Em seguida, escrevemos um segundo componente que usa os dados fictícios produzidos. Vamos apenas calcular o hash dos dados e retorná-lo.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Execute no notebook com o InteractiveContext

Agora, vamos demonstrar o uso de nossos novos componentes no TFX InteractiveContext.

Para mais informações sobre o que você pode fazer com o notebook TFX InteractiveContext, ver a in-notebook TFX Keras Tutorial Component .

from my_generator import MyGenerator
from my_consumer import MyConsumer

Construir o InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/metadata.sqlite.

Executar o seu componente interativa com context.run()

Em seguida, corremos nossos componentes de forma interativa dentro do notebook com context.run() . Nosso componente consumidor usa as saídas do componente gerador.

generator = MyGenerator()
context.run(generator)
WARNING: Logging before InitGoogleLogging() is written to STDERR
I1205 10:37:04.765872 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
I1205 10:37:04.808555 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Após a execução, podemos inspecionar o conteúdo do artefato de saída "hash" do componente consumidor no disco.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

É isso, e agora você escreveu e executou seus próprios componentes personalizados!

Escreva uma definição de pipeline

A seguir, criaremos um pipeline usando esses mesmos componentes. Enquanto estiver usando o InteractiveContext dentro de um notebook funciona bem para a experimentação, a definição de um gasoduto permite implantar seu pipeline em corredores locais ou remotos para uso de produção.

Aqui, vamos demonstrar o uso do LocalDagRunner rodando localmente em sua máquina. Para execução de produção, os runners Airflow ou Kubeflow podem ser mais adequados.

Construir um pipeline

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Execute o seu pipeline com o LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
I1205 10:37:04.983860 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.990442 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.996665 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.003470 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.013659 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.031374 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.048280 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.067972 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Podemos inspecionar os artefatos de saída gerados por esta execução de pipeline.

find {PIPELINE_ROOT}
/tmp/tmpydmun02b
/tmp/tmpydmun02b/metadata.sqlite
/tmp/tmpydmun02b/MyConsumer
/tmp/tmpydmun02b/MyConsumer/.system
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution/2
/tmp/tmpydmun02b/MyConsumer/hash
/tmp/tmpydmun02b/MyConsumer/hash/2
/tmp/tmpydmun02b/MyConsumer/hash/2/value
/tmp/tmpydmun02b/MyGenerator
/tmp/tmpydmun02b/MyGenerator/data
/tmp/tmpydmun02b/MyGenerator/data/1
/tmp/tmpydmun02b/MyGenerator/data/1/data_file.txt
/tmp/tmpydmun02b/MyGenerator/.system
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution/1

Agora você escreveu seus próprios componentes personalizados e orquestrou sua execução no LocalDagRunner! Para as próximas etapas, veja tutoriais e guias adicionais no site TFX .