अपाचे बीम के साथ बड़े डेटासेट तैयार करना

कुछ डेटासेट एक ही मशीन पर संसाधित होने के लिए बहुत बड़े हैं। tfds अपाचे बीम का उपयोग करके कई मशीनों में डेटा उत्पन्न करने का समर्थन करता है।

इस दस्तावेज़ में दो खंड हैं:

  • उन उपयोगकर्ताओं के लिए जो मौजूदा बीम डेटासेट तैयार करना चाहते हैं
  • उन डेवलपर्स के लिए जो एक नया बीम डेटासेट बनाना चाहते हैं

बीम डेटासेट बनाना

क्लाउड पर या स्थानीय स्तर पर, बीम डेटासेट तैयार करने के विभिन्न उदाहरण नीचे दिए गए हैं।

Google क्लाउड डेटाफ़्लो पर

Google क्लाउड डेटाफ़्लो का उपयोग करके पाइपलाइन चलाने और वितरित गणना का लाभ उठाने के लिए, पहले क्विकस्टार्ट निर्देशों का पालन करें।

एक बार जब आपका वातावरण स्थापित हो जाता है, तो आप जीसीएस पर डेटा निर्देशिका का उपयोग करके और --beam_pipeline_options ध्वज के लिए आवश्यक विकल्प निर्दिष्ट करके tfds build CLI चला सकते हैं।

स्क्रिप्ट को लॉन्च करना आसान बनाने के लिए, आपके GCP/GCS सेटअप के वास्तविक मानों और जिस डेटासेट को आप जनरेट करना चाहते हैं, उसका उपयोग करके निम्नलिखित वेरिएबल्स को परिभाषित करना सहायक होता है:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

फिर आपको कर्मचारियों पर tfds स्थापित करने के लिए डेटाफ़्लो को बताने के लिए एक फ़ाइल बनाने की आवश्यकता होगी:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

यदि आप tfds-nightly का उपयोग कर रहे हैं, तो सुनिश्चित करें कि अंतिम रिलीज़ के बाद से डेटासेट अपडेट होने की स्थिति में tfds-nightly से इको हो।

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

यदि आप टीएफडीएस लाइब्रेरी में शामिल नहीं की गई अतिरिक्त निर्भरता का उपयोग कर रहे हैं , तो पायथन पाइपलाइन निर्भरता के प्रबंधन के लिए निर्देशों का पालन करें।

अंत में, आप नीचे दिए गए आदेश का उपयोग करके कार्य लॉन्च कर सकते हैं:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

स्थानीय स्तर पर

डिफ़ॉल्ट अपाचे बीम रनर (इसे मेमोरी में सभी डेटा फिट होना चाहिए) का उपयोग करके अपनी स्क्रिप्ट को स्थानीय रूप से चलाने के लिए, कमांड अन्य डेटासेट के समान है:

tfds build my_dataset

Apache Flink का उपयोग करके पाइपलाइन चलाने के लिए आप आधिकारिक दस्तावेज़ पढ़ सकते हैं। सुनिश्चित करें कि आपका बीम फ़्लिंक संस्करण संगतता के अनुरूप है

स्क्रिप्ट को लॉन्च करना आसान बनाने के लिए, आपके फ़्लिंक सेटअप के वास्तविक मानों और आपके द्वारा जेनरेट किए जाने वाले डेटासेट का उपयोग करके निम्नलिखित वेरिएबल्स को परिभाषित करना सहायक होता है:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

एम्बेडेड फ़्लिंक क्लस्टर पर चलने के लिए, आप नीचे दिए गए कमांड का उपयोग करके कार्य लॉन्च कर सकते हैं:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

एक कस्टम स्क्रिप्ट के साथ

बीम पर डेटासेट उत्पन्न करने के लिए, एपीआई अन्य डेटासेट के समान ही है। आप DownloadConfig के beam_options (और beam_runner ) तर्कों का उपयोग करके beam.Pipeline अनुकूलित कर सकते हैं।

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

बीम डेटासेट लागू करना

आवश्यक शर्तें

अपाचे बीम डेटासेट लिखने के लिए, आपको निम्नलिखित अवधारणाओं से परिचित होना चाहिए:

निर्देश

यदि आप डेटासेट निर्माण मार्गदर्शिका से परिचित हैं, तो बीम डेटासेट जोड़ने के लिए केवल _generate_examples फ़ंक्शन को संशोधित करने की आवश्यकता होती है। फ़ंक्शन को जनरेटर के बजाय बीम ऑब्जेक्ट लौटाना चाहिए:

गैर-बीम डेटासेट:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

बीम डेटासेट:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

परीक्षण सहित बाकी सभी 100% समान हो सकते हैं।

कुछ अतिरिक्त विचार:

  • अपाचे बीम आयात करने के लिए tfds.core.lazy_imports उपयोग करें। आलसी निर्भरता का उपयोग करके, उपयोगकर्ता बीम को स्थापित किए बिना उत्पन्न होने के बाद भी डेटासेट को पढ़ सकते हैं।
  • पायथन क्लोजर से सावधान रहें। पाइपलाइन चलाते समय, beam.Map और beam.DoFn फ़ंक्शन को pickle उपयोग करके क्रमबद्ध किया जाता है और सभी श्रमिकों को भेजा जाता है। beam.PTransform के अंदर परिवर्तनशील वस्तुओं का उपयोग न करें। यदि राज्य को श्रमिकों के बीच साझा करना है तो ट्रांसफॉर्म करें।
  • जिस तरह से tfds.core.DatasetBuilder अचार के साथ क्रमबद्ध किया गया है, डेटा निर्माण के दौरान tfds.core.DatasetBuilder बदलने पर श्रमिकों पर ध्यान नहीं दिया जाएगा (उदाहरण के लिए _split_generators में self.info.metadata['offset'] = 123 सेट करना संभव नहीं है। और इसे beam.Map(lambda x: x + self.info.metadata['offset']) जैसे कार्यकर्ताओं से एक्सेस करें)
  • यदि आपको स्प्लिट्स के बीच कुछ पाइपलाइन चरणों को साझा करने की आवश्यकता है, तो आप एक अतिरिक्त pipeline: beam.Pipeline kwarg से _split_generator और पूर्ण पीढ़ी पाइपलाइन को नियंत्रित करें। tfds.core.GeneratorBasedBuilder का _generate_examples दस्तावेज़ देखें।

उदाहरण

यहां बीम डेटासेट का एक उदाहरण दिया गया है।

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

आपकी पाइपलाइन चल रही है

पाइपलाइन चलाने के लिए, उपरोक्त अनुभाग पर एक नज़र डालें।

tfds build my_dataset --register_checksums

इनपुट के रूप में टीएफडीएस का उपयोग करने वाली पाइपलाइन

यदि आप एक बीम पाइपलाइन बनाना चाहते हैं जो स्रोत के रूप में TFDS डेटासेट लेती है, तो आप tfds.beam.ReadFromTFDS का उपयोग कर सकते हैं:

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

यह डेटासेट के प्रत्येक टुकड़े को समानांतर में संसाधित करेगा।