अपाचे बीम के साथ बड़े डेटासेट बनाना

कुछ डेटासेट एक मशीन पर प्रोसेस किए जाने के लिए बहुत बड़े हैं। tfds Apache Beam का उपयोग करके कई मशीनों में डेटा उत्पन्न करने का समर्थन करता है।

इस दस्तावेज़ के दो खंड हैं:

  • उन उपयोगकर्ताओं के लिए जो मौजूदा बीम डेटासेट बनाना चाहते हैं
  • उन डेवलपर्स के लिए जो एक नया बीम डेटासेट बनाना चाहते हैं

बीम डेटासेट बनाना

नीचे क्लाउड या स्थानीय रूप से बीम डेटासेट बनाने के विभिन्न उदाहरण दिए गए हैं।

Google मेघ डेटा प्रवाह पर

Google क्लाउड डेटाफ़्लो का उपयोग करके पाइपलाइन चलाने और वितरित संगणना का लाभ उठाने के लिए, पहले क्विकस्टार्ट निर्देशों का पालन करें।

एक बार आपका वातावरण स्थापित हो जाने के बाद, आप GCS पर डेटा निर्देशिका का उपयोग करके और --beam_pipeline_options फ़्लैग के लिए आवश्यक विकल्प निर्दिष्ट करके tfds build CLI चला सकते हैं।

स्क्रिप्ट को लॉन्च करना आसान बनाने के लिए, आपके जीसीपी/जीसीएस सेटअप और आपके द्वारा जेनरेट किए जाने वाले डेटासेट के वास्तविक मानों का उपयोग करके निम्न चरों को परिभाषित करना सहायक होता है:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

इसके बाद आपको डेटाफ़्लो को कर्मचारियों पर tfds स्थापित करने के लिए कहने के लिए एक फ़ाइल बनाने की आवश्यकता होगी:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

यदि आप tfds-nightly का उपयोग कर रहे हैं, तो अंतिम रिलीज़ के बाद डेटासेट को अपडेट किए जाने की स्थिति में tfds-nightly से प्रतिध्वनि करना सुनिश्चित करें।

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

अंत में, आप नीचे दिए गए आदेश का उपयोग करके कार्य प्रारंभ कर सकते हैं:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

स्थानीय स्तर पर

डिफ़ॉल्ट अपाचे बीम रनर का उपयोग करके स्थानीय रूप से अपनी स्क्रिप्ट चलाने के लिए (इसे स्मृति में सभी डेटा फिट होना चाहिए), आदेश अन्य डेटासेट के समान ही है:

tfds build my_dataset

Apache Flink का उपयोग करके पाइपलाइन चलाने के लिए आप आधिकारिक दस्तावेज पढ़ सकते हैं। सुनिश्चित करें कि आपका बीम फ्लिंक संस्करण संगतता के अनुरूप है

स्क्रिप्ट को लॉन्च करना आसान बनाने के लिए, आपके फ़्लिंक सेटअप और आपके द्वारा उत्पन्न किए जाने वाले डेटासेट के वास्तविक मानों का उपयोग करके निम्नलिखित चर को परिभाषित करना मददगार होता है:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

एम्बेडेड फ़्लिंक क्लस्टर पर चलने के लिए, आप नीचे दी गई कमांड का उपयोग करके कार्य शुरू कर सकते हैं:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

एक कस्टम स्क्रिप्ट के साथ

बीम पर डेटासेट उत्पन्न करने के लिए, एपीआई अन्य डेटासेट के समान ही है। आप DownloadConfig के beam_options (और beam_runner ) तर्कों का उपयोग करके beam.Pipeline अनुकूलित कर सकते हैं।

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

बीम डेटासेट लागू करना

आवश्यक शर्तें

अपाचे बीम डेटासेट लिखने के लिए, आपको निम्नलिखित अवधारणाओं से परिचित होना चाहिए:

निर्देश

यदि आप डेटासेट निर्माण मार्गदर्शिका से परिचित हैं, तो बीम डेटासेट जोड़ने के लिए केवल _generate_examples फ़ंक्शन को संशोधित करने की आवश्यकता होती है। जनरेटर के बजाय फ़ंक्शन को बीम ऑब्जेक्ट वापस करना चाहिए:

गैर-बीम डेटासेट:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

बीम डेटासेट:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

परीक्षण सहित बाकी सभी 100% समान हो सकते हैं।

कुछ अतिरिक्त विचार:

  • अपाचे बीम आयात करने के लिए tfds.core.lazy_imports का प्रयोग करें। एक आलसी निर्भरता का उपयोग करके, उपयोगकर्ता बीम को स्थापित किए बिना उत्पन्न होने के बाद भी डेटासेट पढ़ सकते हैं।
  • पायथन क्लोजर से सावधान रहें। पाइपलाइन चलाते समय, beam.Map और beam.DoFn कार्यों को pickle उपयोग करके क्रमबद्ध किया जाता है और सभी श्रमिकों को भेजा जाता है। एक beam.PTransform के अंदर परिवर्तनशील वस्तुओं का उपयोग न करें। यदि राज्य को श्रमिकों के बीच साझा किया जाना है तो PTransform करें।
  • जिस तरह से tfds.core.DatasetBuilder अचार के साथ क्रमबद्ध किया जाता है, डेटा निर्माण के दौरान tfds.core.DatasetBuilder बदलने से श्रमिकों पर ध्यान नहीं दिया जाएगा (उदाहरण के लिए self.info.metadata['offset'] = 123 _split_generators में सेट करना संभव नहीं है) और इसे beam.Map(lambda x: x + self.info.metadata['offset']) जैसे श्रमिकों से एक्सेस करें)
  • यदि आपको विभाजनों के बीच कुछ पाइपलाइन चरणों को साझा करने की आवश्यकता है, तो आप एक अतिरिक्त pipeline: beam.Pipeline kwarg to _split_generator और पूर्ण जनरेशन पाइपलाइन को नियंत्रित करें। tfds.core.GeneratorBasedBuilder का _generate_examples दस्तावेज़ीकरण देखें।

उदाहरण

यहां बीम डेटासेट का एक उदाहरण दिया गया है।

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

अपनी पाइपलाइन चला रहा है

पाइपलाइन चलाने के लिए, उपरोक्त अनुभाग पर एक नज़र डालें।

tfds build my_dataset --register_checksums

इनपुट के रूप में TFDS का उपयोग कर पाइपलाइन

यदि आप एक बीम पाइपलाइन बनाना चाहते हैं जो एक TFDS डेटासेट को स्रोत के रूप में लेती है, तो आप tfds.beam.ReadFromTFDS का उपयोग कर सकते हैं:

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

यह डेटासेट के प्रत्येक शार्ड को समानांतर में प्रोसेस करेगा।