דף זה תורגם על ידי Cloud Translation API.
Switch to English

הפקת מערכי נתונים גדולים עם Apache Beam

כמה מערכי נתונים גדולים מכדי לעבד אותם במכונה אחת. tfds תומך בהפקת נתונים במחשבים רבים באמצעות Apache Beam .

למסמך זה יש שני חלקים:

  • למשתמש שרוצים ליצור מערך נתונים קיים של Beam
  • למפתחים שרוצים ליצור מערך נתונים חדש של Beam

תוכן ענינים:

יצירת מערך נתונים של Beam

להלן דוגמאות שונות לייצור מערך נתונים של Beam, הן בענן והן באופן מקומי.

ב- Google Cloud Dataflow

כדי להפעיל את הצינור באמצעות Google Cloud Dataflow ולנצל את החישוב המופץ, עקוב תחילה אחר הוראות Quickstart .

לאחר הגדרת הסביבה שלך, אתה יכול להריץ את הסקריפט download_and_prepare באמצעות ספריית נתונים ב- GCS ולציין את האפשרויות הנדרשות עבור הדגל - --beam_pipeline_options .

כדי להקל על הפעלת הסקריפט, כדאי להגדיר את המשתנים הבאים באמצעות הערכים בפועל עבור הגדרת GCP / GCS שלך ואת מערך הנתונים שתרצה ליצור:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

לאחר מכן תצטרך ליצור קובץ כדי לומר ל- tfds להתקין tfds על העובדים:

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

אם אתה משתמש ב- tfds-nightly , הקפד להדהד מ- tfds-nightly למקרה tfds-nightly הנתונים עודכנה מאז המהדורה האחרונה.

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

לבסוף, תוכלו להפעיל את העבודה באמצעות הפקודה שלהלן:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

מקומית

כדי להריץ את הסקריפט באופן מקומי באמצעות רץ Apache Beam המוגדר כברירת מחדל, הפקודה זהה לזו של מערכי נתונים אחרים:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

עם תסריט מותאם אישית

כדי ליצור את מערך הנתונים ב- Beam, ה- API זהה למערכות נתונים אחרות, אך עליך להעביר את אפשרויות Beam או הרץ ל- DownloadConfig .

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

הטמעת מערך נתונים של Beam

תנאים מוקדמים

על מנת לכתוב מערכי נתונים של Apache Beam, עליך להכיר את המושגים הבאים:

הוראות

אם אתה מכיר את המדריך ליצירת מערכי נתונים , הוספת מערך נתונים של Beam דורשת רק כמה שינויים:

  • DatasetBuilder שלך יירש מ- tfds.core.BeamBasedBuilder במקום tfds.core.GeneratorBasedBuilder .
  • מערכי נתונים של קרן צריכים ליישם את השיטה המופשטת _build_pcollection(self, **kwargs) במקום את השיטה _generate_examples(self, **kwargs) . _build_pcollection צריך להחזיר קרן. beam.PCollection עם הדוגמאות הקשורות לפיצול.
  • כתיבת מבחן יחידה עבור מערך הנתונים של Beam זהה למערכות נתונים אחרות.

כמה שיקולים נוספים:

  • השתמש ב tfds.core.lazy_imports כדי לייבא את Apache Beam. על ידי שימוש בתלות עצלה, משתמשים יכולים עדיין לקרוא את מערך הנתונים לאחר שנוצר מבלי שהם צריכים להתקין את Beam.
  • היזהר עם סגירת פייתון. בעת הפעלת הצינור, את beam.Map ו beam.DoFn הפונקציות בהמשכים באמצעות pickle ונשלחו לכל העובדים. זה יכול ליצור באגים; לדוגמה, אם אתה משתמש באובייקט הניתן לשינוי בפונקציות שלך שהוכרז מחוץ לפונקציה, אתה עלול להיתקל בשגיאות pickle או בהתנהגות בלתי צפויה. התיקון הוא בדרך כלל כדי להימנע מהשתקה של עצמים סגורים.
  • השימוש בשיטות ב- DatasetBuilder בצינור Beam הוא בסדר. עם זאת, בדרך בה סדרת המחלקה במהלך הכבישה, השינויים שנעשו בתכונות במהלך היצירה יתעלמו במקרה הטוב.

דוגמא

הנה דוגמה למערך נתונים של Beam. לדוגמה אמיתית מורכבת יותר, עיין במערך הנתונים של Wikipedia .

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

מפעיל את הצינור שלך

כדי להפעיל את הצינור, התבונן בקטע שלמעלה.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset