이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

Apache Beam으로 빅 데이터 생성

일부 데이터 세트가 너무 커서 단일 시스템에서 처리 할 수 ​​없습니다. tfdsApache Beam 을 사용하여 많은 머신에서 데이터 생성을 지원합니다.

이 문서에는 두 개의 섹션이 있습니다.

  • 기존 Beam 데이터 셋을 생성하려는 사용자
  • 새로운 Beam 데이터 셋을 생성하려는 개발자

내용의 테이블:

빔 데이터 세트 생성

다음은 클라우드 또는 로컬에서 Beam 데이터 세트를 생성하는 다른 예입니다.

Google Cloud Dataflow에서

Google Cloud Dataflow를 사용하여 파이프 라인을 실행하고 분산 계산을 이용하려면 먼저 빠른 시작 지침을 따르십시오.

환경이 설정되면 GCS 의 데이터 디렉토리를 사용하고 --beam_pipeline_options 플래그에 필요한 옵션 을 지정하여 download_and_prepare 스크립트를 실행할 수 있습니다.

스크립트를보다 쉽게 ​​시작하려면 GCP / GCS 설정의 실제 값과 생성하려는 데이터 세트를 사용하여 다음 변수를 정의하면 도움이됩니다.

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

그런 다음 작업자에게 tfds 를 설치 tfds Dataflow에 지시하는 파일을 작성해야합니다.

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

당신이 사용하는 경우 tfds-nightly 에서 에코 할 수 있는지 확인 tfds-nightly 경우 데이터 세트가 마지막 릴리스 이후 업데이트되었습니다.

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

마지막으로 아래 명령을 사용하여 작업을 시작할 수 있습니다.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

토지 상에서

기본 Apache Beam 러너를 사용하여 로컬로 스크립트를 실행하려면 명령이 다른 데이터 세트와 동일합니다.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

커스텀 스크립트

Beam에서 데이터 세트를 생성하려면 API는 다른 데이터 세트와 동일하지만 Beam 옵션 또는 러너를 DownloadConfig 로 전달해야합니다.

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

Beam 데이터 셋 구현

전제 조건

Apache Beam 데이터 세트를 작성하려면 다음 개념에 익숙해야합니다.

명령

데이터 세트 생성 안내서에 익숙한 경우 Beam 데이터 세트를 추가하면 약간만 수정하면됩니다.

  • DatasetBuildertfds.core.BeamBasedBuilder 대신 tfds.core.GeneratorBasedBuilder 에서 상속됩니다.
  • 빔 데이터 세트는 추상 메소드의 구현해야합니다 _build_pcollection(self, **kwargs) 대신 메소드 _generate_examples(self, **kwargs) . _build_pcollection 반환해야 beam.PCollection 분할과 관련된 예제와 함께합니다.
  • Beam 데이터 세트에 대한 단위 테스트 작성은 다른 데이터 세트와 동일합니다.

몇 가지 추가 고려 사항 :

  • tfds.core.lazy_imports 를 사용하여 Apache Beam을 가져 오십시오. 지연 종속성을 사용하면 사용자는 Beam을 설치하지 않고도 생성 된 데이터 세트를 계속 읽을 수 있습니다.
  • 파이썬 클로저에주의하십시오. 파이프 라인을 실행할 때 beam.Mapbeam.DoFn 함수는 pickle 사용하여 직렬화되고 모든 작업자에게 전송됩니다. 버그가 생길 수 있습니다. 예를 들어 함수 외부에서 선언 된 함수에서 가변 객체를 사용하는 경우 pickle 오류 또는 예기치 않은 동작이 발생할 수 있습니다. 이 수정은 일반적으로 닫힌 개체의 변형을 피하기위한 것입니다.
  • Beam 파이프 라인의 DatasetBuilder 에서 메소드를 사용하는 것이 좋습니다. 그러나 피클 중 클래스가 직렬화되는 방식은 작성 중 기능에 대한 변경 사항이 무시됩니다.

다음은 Beam 데이터 세트의 예입니다. 보다 복잡한 실제 예를 보려면 Wikipedia 데이터 세트를 살펴보십시오.

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

파이프 라인 실행

파이프 라인을 실행하려면 위 섹션을 살펴보십시오.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset