Construindo componentes totalmente personalizados

Este guia descreve como usar a API do TFX para criar um componente totalmente personalizado. Componentes totalmente personalizados permitem que você crie componentes definindo a especificação do componente, o executor e as classes de interface do componente. Essa abordagem permite reutilizar e estender um componente padrão para atender às suas necessidades.

Se você é novo nos pipelines do TFX, saiba mais sobre os principais conceitos dos pipelines do TFX .

Executor personalizado ou componente personalizado

Se apenas a lógica de processamento personalizada for necessária enquanto as entradas, saídas e propriedades de execução do componente forem as mesmas de um componente existente, um executor personalizado será suficiente. Um componente totalmente personalizado é necessário quando qualquer uma das entradas, saídas ou propriedades de execução são diferentes de qualquer componente TFX existente.

Como criar um componente personalizado?

Desenvolver um componente totalmente personalizado requer:

  • Um conjunto definido de especificações de artefatos de entrada e saída para o novo componente. Especialmente, os tipos de artefatos de entrada devem ser consistentes com os tipos de artefatos de saída dos componentes que produzem os artefatos e os tipos de artefatos de saída devem ser consistentes com os tipos de artefatos de entrada dos componentes que consomem os artefatos, se houver.
  • Os parâmetros de execução não artefato que são necessários para o novo componente.

ComponentSpec

A classe ComponentSpec define o contrato do componente definindo os artefatos de entrada e saída para um componente, bem como os parâmetros que são usados ​​para a execução do componente. Tem três partes:

  • INPUTS : Um dicionário de parâmetros tipados para os artefatos de entrada que são passados ​​para o executor do componente. Normalmente, os artefatos de entrada são as saídas dos componentes upstream e, portanto, compartilham o mesmo tipo.
  • OUTPUTS : Um dicionário de parâmetros tipados para os artefatos de saída que o componente produz.
  • PARAMETERS : Um dicionário de itens ExecutionParameter adicionais que serão passados ​​para o executor do componente. Esses são parâmetros não artefatos que queremos definir de forma flexível no pipeline DSL e passar para execução.

Aqui está um exemplo do ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Executor

Em seguida, escreva o código executor para o novo componente. Basicamente, uma nova subclasse de base_executor.BaseExecutor precisa ser criada com sua função Do substituída. Na função Do , os argumentos input_dict , output_dict e exec_properties que são passados ​​no map para INPUTS , OUTPUTS e PARAMETERS que são definidos em ComponentSpec respectivamente. Para exec_properties , o valor pode ser obtido diretamente por meio de uma pesquisa de dicionário. Para artefatos em input_dict e output_dict , existem funções convenientes disponíveis na classe artisan_utils que podem ser usadas para buscar instância de artefato ou uri de artefato.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Teste de unidade de um executor personalizado

Testes de unidade para o executor personalizado podem ser criados semelhantes a este .

Interface do componente

Agora que a parte mais complexa está concluída, a próxima etapa é montar essas peças em uma interface de componente, para permitir que o componente seja usado em um pipeline. Existem vários passos:

  • Torne a interface do componente uma subclasse de base_component.BaseComponent
  • Atribua uma variável de classe SPEC_CLASS com a classe ComponentSpec que foi definida anteriormente
  • Atribua uma variável de classe EXECUTOR_SPEC com a classe Executor que foi definida anteriormente
  • Defina a função construtora __init__() usando os argumentos da função para construir uma instância da classe ComponentSpec e invocar a superfunção com esse valor, juntamente com um nome opcional

Quando uma instância do componente é criada, a lógica de verificação de tipo na classe base_component.BaseComponent será invocada para garantir que os argumentos que foram passados ​​sejam compatíveis com as informações de tipo definidas na classe ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Montar em um pipeline TFX

A última etapa é conectar o novo componente personalizado a um pipeline do TFX. Além de adicionar uma instância do novo componente, o seguinte também é necessário:

  • Conecte corretamente os componentes upstream e downstream do novo componente a ele. Isso é feito referenciando as saídas do componente upstream no novo componente e referenciando as saídas do novo componente nos componentes downstream
  • Adicione a nova instância do componente à lista de componentes ao construir o pipeline.

O exemplo abaixo destaca as alterações mencionadas. O exemplo completo pode ser encontrado no repositório TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Implante um componente totalmente personalizado

Além das alterações de código, todas as partes recém-adicionadas ( ComponentSpec , Executor , interface do componente ) precisam estar acessíveis no ambiente de execução do pipeline para executar o pipeline corretamente.