ایجاد مجموعه داده های بزرگ با پرتو Apache

برخی از مجموعه‌های داده آنقدر بزرگ هستند که نمی‌توانند روی یک ماشین پردازش شوند. tfds از تولید داده در بسیاری از ماشین ها با استفاده از پرتو آپاچی پشتیبانی می کند.

این سند دارای دو بخش است:

  • برای کاربرانی که می خواهند یک مجموعه داده Beam موجود ایجاد کنند
  • برای توسعه دهندگانی که می خواهند مجموعه داده Beam جدیدی ایجاد کنند

ایجاد مجموعه داده Beam

در زیر نمونه‌های مختلفی از تولید مجموعه داده‌های Beam، چه در فضای ابری و چه به صورت محلی، آورده شده است.

در Google Cloud Dataflow

برای اجرای خط لوله با استفاده از Google Cloud Dataflow و استفاده از محاسبات توزیع شده، ابتدا دستورالعمل‌های شروع سریع را دنبال کنید.

هنگامی که محیط شما راه اندازی شد، می توانید tfds build CLI را با استفاده از دایرکتوری داده در GCS و مشخص کردن گزینه های مورد نیاز برای پرچم --beam_pipeline_options اجرا کنید.

برای آسان‌تر کردن راه‌اندازی اسکریپت، تعریف متغیرهای زیر با استفاده از مقادیر واقعی برای تنظیم GCP/GCS و مجموعه داده‌ای که می‌خواهید ایجاد کنید مفید است:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

سپس باید یک فایل ایجاد کنید تا به Dataflow بگویید tfds را روی کارگران نصب کند:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

اگر از tfds-nightly استفاده می‌کنید، مطمئن شوید که از tfds-nightly بازتاب داده می‌شود در صورتی که مجموعه داده از آخرین نسخه به‌روزرسانی شده باشد.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

اگر از وابستگی‌های اضافی استفاده می‌کنید که در کتابخانه TFDS گنجانده نشده است ، دستورالعمل‌های مدیریت وابستگی‌های خط لوله پایتون را دنبال کنید.

در نهایت، می توانید کار را با استفاده از دستور زیر اجرا کنید:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

به صورت محلی

برای اجرای اسکریپت خود به صورت محلی با استفاده از اجراکننده پیش‌فرض پرتو Apache (باید همه داده‌ها را در حافظه جا دهد)، دستور مانند سایر مجموعه‌های داده است:

tfds build my_dataset

برای اجرای خط لوله با استفاده از Apache Flink می توانید اسناد رسمی را بخوانید. مطمئن شوید که Beam شما با سازگاری نسخه Flink سازگار است

برای آسان‌تر کردن راه‌اندازی اسکریپت، تعریف متغیرهای زیر با استفاده از مقادیر واقعی تنظیمات Flink و مجموعه داده‌ای که می‌خواهید ایجاد کنید مفید است:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

برای اجرا بر روی یک خوشه Flink تعبیه شده، می توانید کار را با استفاده از دستور زیر اجرا کنید:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

با اسکریپت سفارشی

برای تولید مجموعه داده در Beam، API مانند سایر مجموعه‌های داده است. شما می توانید beam.Pipeline با استفاده از آرگومان های beam_optionsbeam_runner ) در DownloadConfig سفارشی کنید.

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

پیاده سازی مجموعه داده Beam

پیش نیازها

برای نوشتن مجموعه داده های Apache Beam باید با مفاهیم زیر آشنا باشید:

دستورالعمل ها

اگر با راهنمای ایجاد مجموعه داده آشنا هستید، برای افزودن مجموعه داده Beam فقط باید تابع _generate_examples را تغییر دهید. تابع باید به جای یک مولد، یک شی پرتو را برگرداند:

مجموعه داده غیر پرتو:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

مجموعه داده پرتو:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

همه بقیه می توانند 100٪ یکسان باشند، از جمله آزمایشات.

برخی ملاحظات اضافی:

  • برای وارد کردن Apache Beam از tfds.core.lazy_imports استفاده کنید. با استفاده از وابستگی تنبل، کاربران همچنان می توانند پس از تولید مجموعه داده بدون نیاز به نصب Beam، آن را بخوانند.
  • مراقب بسته شدن پایتون باشید. هنگام اجرای خط لوله، توابع beam.Map و beam.DoFn با استفاده از pickle سریال می شوند و برای همه کارگران ارسال می شوند. از اشیای قابل تغییر در داخل beam.PTransform استفاده نکنید. اگر حالت باید بین کارگران به اشتراک گذاشته شود، تغییر شکل دهید.
  • با توجه به نحوه سریال سازی tfds.core.DatasetBuilder با pickle، جهش tfds.core.DatasetBuilder در حین ایجاد داده بر روی کارگران نادیده گرفته می شود (به عنوان مثال تنظیم self.info.metadata['offset'] = 123 در _split_generators ممکن نیست. و از کارگران مانند beam.Map(lambda x: x + self.info.metadata['offset']) به آن دسترسی پیدا کنید.
  • اگر نیاز دارید که برخی از مراحل خط لوله را بین تقسیم‌ها به اشتراک بگذارید، می‌توانید یک pipeline: beam.Pipeline kwarg به _split_generator و کنترل خط لوله تولید کامل. به مستندات _generate_examples tfds.core.GeneratorBasedBuilder مراجعه کنید.

مثال

در اینجا نمونه ای از مجموعه داده های Beam آورده شده است.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

اجرای خط لوله شما

برای اجرای خط لوله، به بخش بالا نگاه کنید.

tfds build my_dataset --register_checksums

خط لوله با استفاده از TFDS به عنوان ورودی

اگر می خواهید خط لوله پرتویی ایجاد کنید که یک مجموعه داده TFDS را به عنوان منبع دریافت کند، می توانید از tfds.beam.ReadFromTFDS استفاده کنید:

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

هر قطعه از مجموعه داده را به صورت موازی پردازش می کند.