หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

จบจากตัวอย่างสำหรับผู้อ่าน BigQuery TensorFlow

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดสมุดบันทึก

ภาพรวม

บทช่วยสอนนี้แสดงวิธีใช้ โปรแกรมอ่าน BigQuery TensorFlow สำหรับการฝึกอบรมเครือข่ายนิวรัลโดยใช้ Keras Sequential API

ชุด

บทช่วยสอนนี้ใช้ ชุดข้อมูลรายได้ของการสำรวจสำมะโนประชากรของสหรัฐอเมริกาที่ จัดทำโดย UC Irvine Machine Learning Repository ชุดข้อมูลนี้มีข้อมูลเกี่ยวกับผู้คนจากฐานข้อมูลสำมะโนประชากรปี 1994 รวมถึงอายุการศึกษาสถานภาพการสมรสอาชีพและว่าพวกเขาทำเงินได้มากกว่า 50,000 เหรียญต่อปีหรือไม่

ติดตั้ง

ตั้งค่าโครงการ GCP ของคุณ

ขั้นตอนต่อไปนี้เป็นสิ่งจำเป็นโดยไม่คำนึงถึงสภาพแวดล้อมของโน้ตบุ๊คของคุณ

  1. เลือกหรือสร้างโครงการ GCP
  2. ตรวจสอบให้แน่ใจว่าเปิดใช้งานการเรียกเก็บเงินสำหรับโครงการของคุณ
  3. เปิดใช้งาน BigQuery Storage API
  4. ป้อนรหัสโครงการของคุณในเซลล์ด้านล่าง จากนั้นเรียกใช้เซลล์เพื่อให้แน่ใจว่า Cloud SDK ใช้โครงการที่เหมาะสมสำหรับคำสั่งทั้งหมดในสมุดบันทึกนี้

ติดตั้งแพคเกจที่จำเป็นและเริ่มต้นใหม่รันไทม์

 try:
  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x 
except:
  pass
 
pip install fastavro
pip install tensorflow-io==0.9.0
pip install google-cloud-bigquery-storage

รับรองความถูกต้อง

 from google.colab import auth
auth.authenticate_user()
print('Authenticated')
 

ตั้งค่ารหัสโครงการของคุณ

 PROJECT_ID = "<YOUR PROJECT>" 
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID
 

นำเข้าไลบรารี Python กำหนดค่าคงที่

 from __future__ import absolute_import, division, print_function, unicode_literals

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

LOCATION = 'us'

# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)

DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

CSV_SCHEMA = [
      bigquery.SchemaField("age", "FLOAT64"),
      bigquery.SchemaField("workclass", "STRING"),
      bigquery.SchemaField("fnlwgt", "FLOAT64"),
      bigquery.SchemaField("education", "STRING"),
      bigquery.SchemaField("education_num", "FLOAT64"),
      bigquery.SchemaField("marital_status", "STRING"),
      bigquery.SchemaField("occupation", "STRING"),
      bigquery.SchemaField("relationship", "STRING"),
      bigquery.SchemaField("race", "STRING"),
      bigquery.SchemaField("gender", "STRING"),
      bigquery.SchemaField("capital_gain", "FLOAT64"),
      bigquery.SchemaField("capital_loss", "FLOAT64"),
      bigquery.SchemaField("hours_per_week", "FLOAT64"),
      bigquery.SchemaField("native_country", "STRING"),
      bigquery.SchemaField("income_bracket", "STRING"),
  ]

UNUSED_COLUMNS = ["fnlwgt", "education_num"]
 

นำเข้าข้อมูลสำมะโนประชากรไปยัง BigQuery

กำหนดวิธีการช่วยเหลือในการโหลดข้อมูลลงใน BigQuery

 def create_bigquery_dataset_if_necessary(dataset_id):
  # Construct a full Dataset object to send to the API.
  client = bigquery.Client(project=PROJECT_ID)
  dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
  dataset.location = LOCATION

  try:
    dataset = client.create_dataset(dataset)  # API request
    return True
  except GoogleAPIError as err:
    if err.code != 409: # http_client.CONFLICT
      raise
  return False

 
 def load_data_into_bigquery(url, table_id):
  create_bigquery_dataset_if_necessary(DATASET_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  table_ref = dataset_ref.table(table_id)
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
  job_config.source_format = bigquery.SourceFormat.CSV
  job_config.schema = CSV_SCHEMA

  load_job = client.load_table_from_uri(
      url, table_ref, job_config=job_config
  )
  print("Starting job {}".format(load_job.job_id))

  load_job.result()  # Waits for table load to complete.
  print("Job finished.")

  destination_table = client.get_table(table_ref)
  print("Loaded {} rows.".format(destination_table.num_rows))
 

โหลดข้อมูลสำมะโนใน BigQuery

 load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
 
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187
Job finished.
Loaded 32561 rows.
Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a
Job finished.
Loaded 16278 rows.

ยืนยันว่านำเข้าข้อมูลแล้ว

สิ่งที่ต้องทำ: แทนที่ <YOUR PROJECT> ด้วย PROJECT_ID ของคุณ

 %%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
 

โหลดข้อมูลสำมะโนในชุดข้อมูล TensorFlow โดยใช้โปรแกรมอ่าน BigQuery

อ่านและแปลงข้อมูล cesnus จาก BigQuery เป็นชุดข้อมูล TensorFlow

 from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
  
def transofrom_row(row_dict):
  # Trim all string tensors
  trimmed_dict = { column:
                  (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) 
                  for (column,tensor) in row_dict.items()
                  }
  # Extract feature column
  income_bracket = trimmed_dict.pop('income_bracket')
  # Convert feature column to 0.0/1.0
  income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), 
                 lambda: tf.constant(1.0), 
                 lambda: tf.constant(0.0))
  return (trimmed_dict, income_bracket_float)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, DATASET_ID,
      list(field.name for field in CSV_SCHEMA 
           if not field.name in UNUSED_COLUMNS),
      list(dtypes.double if field.field_type == 'FLOAT64' 
           else dtypes.string for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      requested_streams=2)
  
  dataset = read_session.parallel_read_rows()
  transformed_ds = dataset.map (transofrom_row)
  return transformed_ds

 
 BATCH_SIZE = 32

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
 

กำหนดคอลัมน์คุณสมบัติ

 def get_categorical_feature_values(column):
  query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  job_config = bigquery.QueryJobConfig()
  query_job = client.query(query, job_config=job_config)
  result = query_job.to_dataframe()
  return result.values[:,0]
 
 from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
  feature_columns.append(feature_column.numeric_column(header))

# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
  categorical_feature = feature_column.categorical_column_with_vocabulary_list(
        header, get_categorical_feature_values(header))
  categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
  feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
 

สร้างและฝึกฝนโมเดล

สร้างแบบจำลอง

 Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
  [
    feature_layer,
      Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
      Dense(75, activation=tf.nn.relu),
      Dense(50, activation=tf.nn.relu),
      Dense(25, activation=tf.nn.relu),
      Dense(1, activation=tf.nn.sigmoid)
  ])

# Compile Keras model
model.compile(
    loss='binary_crossentropy', 
    metrics=['accuracy'])
 

โมเดลรถไฟ

 model.fit(training_ds, epochs=5)
 
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Warning:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105
Epoch 2/5
1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324
Epoch 3/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393
Epoch 4/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435
Epoch 5/5
1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455

<tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>

ประเมินโมเดล

ประเมินโมเดล

 loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
 
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398
Accuracy 0.8398452

ประเมินตัวอย่างสุ่มสองสามตัวอย่าง

 sample_x = {
    'age' : np.array([56, 36]), 
    'workclass': np.array(['Local-gov', 'Private']), 
    'education': np.array(['Bachelors', 'Bachelors']), 
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']), 
    'occupation': np.array(['Tech-support', 'Other-service']), 
    'relationship': np.array(['Husband', 'Husband']), 
    'race': np.array(['White', 'Black']), 
    'gender': np.array(['Male', 'Male']), 
    'capital_gain': np.array([0, 7298]), 
    'capital_loss': np.array([0, 0]), 
    'hours_per_week': np.array([40, 36]), 
    'native_country': np.array(['United-States', 'United-States'])
  }

model.predict(sample_x)
 
array([[0.5541261],
       [0.6209938]], dtype=float32)

ทรัพยากร