Ajuda a proteger a Grande Barreira de Corais com TensorFlow em Kaggle Junte Desafio

Construindo Componentes Totalmente Customizados

Este guia descreve como usar a API TFX para construir um componente totalmente personalizado. Componentes totalmente personalizados permitem que você crie componentes definindo a especificação do componente, o executor e as classes de interface do componente. Essa abordagem permite reutilizar e estender um componente padrão para atender às suas necessidades.

Se você é novo para pipelines TFX, aprender mais sobre os conceitos fundamentais de dutos TFX .

Executor personalizado ou componente personalizado

Se apenas a lógica de processamento customizada for necessária enquanto as entradas, saídas e propriedades de execução do componente são as mesmas de um componente existente, um executor customizado é suficiente. Um componente totalmente customizado é necessário quando qualquer uma das entradas, saídas ou propriedades de execução são diferentes de quaisquer componentes TFX existentes.

Como criar um componente customizado?

O desenvolvimento de um componente totalmente personalizado requer:

  • Um conjunto definido de especificações de artefato de entrada e saída para o novo componente. Especialmente, os tipos para os artefatos de entrada devem ser consistentes com os tipos de artefatos de saída dos componentes que produzem os artefatos e os tipos para os artefatos de saída devem ser consistentes com os tipos de artefatos de entrada dos componentes que consomem os artefatos, se houver.
  • Os parâmetros de execução de não artefato necessários para o novo componente.

ComponentSpec

O ComponentSpec classe define o contrato componente, definindo os artefactos de entrada e de saída para um componente, bem como os parâmetros que são utilizados para a execução do componente. Existem três partes nele:

  • ENTRADAS: Um dicionário de parâmetros digitados para os artefatos de entrada que são para o executor componente. Normalmente, os artefatos de entrada são as saídas dos componentes upstream e, portanto, compartilham o mesmo tipo.
  • SAÍDAS: um dicionário de parâmetros digitados para os artefatos de saída que o componente produz.
  • PARÂMETROS: Um dicionário de adicionais ExecutionParameter itens que serão passados para o executor componente. Esses são parâmetros não artefatos que queremos definir de maneira flexível na DSL do pipeline e passar para a execução.

Aqui está um exemplo de ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Executor

Em seguida, escreva o código do executor para o novo componente. Basicamente, uma nova subclasse de base_executor.BaseExecutor precisa ser criado com a sua Do função anulado. No Do função, os argumentos input_dict , output_dict e exec_properties que são passados no mapa para INPUTS , OUTPUTS e PARAMETERS que são definidos em ComponentSpec respectivamente. Para exec_properties , o valor pode ser obtida diretamente através de uma pesquisa no dicionário. Para artefatos em input_dict e output_dict , existem funções convenientes disponíveis em artifact_utils classe que pode ser usado para buscar artefato instância ou artefato uri.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

Teste de unidade de um executor personalizado

Testes de unidade para o executor personalizado pode ser criado semelhante a este .

Interface de componente

Agora que a parte mais complexa está concluída, a próxima etapa é montar essas peças em uma interface de componente, para permitir que o componente seja usado em um pipeline. Existem várias etapas:

  • Faça o componente de interface de uma subclasse de base_component.BaseComponent
  • Atribuir uma variável de classe SPEC_CLASS com o ComponentSpec classe que foi definido anteriormente
  • Atribuir uma variável de classe EXECUTOR_SPEC com a classe Executor que foi definido anteriormente
  • Definir a __init__() função construtora, utilizando os argumentos para a função para a construção de uma instância da classe ComponentSpec e invocar a função super com esse valor, juntamente com um nome opcional

Quando uma instância do componente é criado, verificação de tipo de lógica no base_component.BaseComponent classe será chamado para garantir que os argumentos que foram passados em são compatíveis com a informação tipo definido no ComponentSpec classe.

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

Monte em um pipeline TFX

A última etapa é conectar o novo componente personalizado em um pipeline TFX. Além de adicionar uma instância do novo componente, o seguinte também é necessário:

  • Conecte corretamente os componentes upstream e downstream do novo componente a ele. Isso é feito referenciando as saídas do componente upstream no novo componente e referenciando as saídas do novo componente nos componentes downstream
  • Adicione a nova instância do componente à lista de componentes ao construir o pipeline.

O exemplo abaixo destaca as alterações mencionadas. Exemplo completo pode ser encontrado no repo TFX GitHub .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

Implante um componente totalmente personalizado

Ao lado de alterações de código, todas as peças recém-adicionadas ( ComponentSpec , Executor , interface do componente) necessidade de ser acessível em ambiente correndo gasoduto, a fim de executar o pipeline corretamente.