Trang này được dịch bởi Cloud Translation API.
Switch to English

Ví dụ từ đầu đến cuối cho trình đọc BigQuery TensorFlow

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải vở

Tổng quat

Hướng dẫn này chỉ ra cách sử dụng trình đọc BigQuery TensorFlow để đào tạo mạng thần kinh bằng cách sử dụng API tuần tự Keras.

Bộ dữ liệu

Hướng dẫn này sử dụng Bộ dữ liệu thu nhập điều tra dân số Hoa Kỳ được cung cấp bởi Kho lưu trữ máy học UC Irvine . Bộ dữ liệu này chứa thông tin về những người từ cơ sở dữ liệu Điều tra dân số năm 1994, bao gồm tuổi tác, học vấn, tình trạng hôn nhân, nghề nghiệp và liệu họ có kiếm được hơn 50.000 đô la mỗi năm hay không.

Thiết lập

Thiết lập dự án GCP của bạn

Các bước sau đây là bắt buộc, bất kể môi trường máy tính xách tay của bạn.

  1. Chọn hoặc tạo một dự án GCP.
  2. Hãy chắc chắn rằng thanh toán được kích hoạt cho dự án của bạn.
  3. Kích hoạt API lưu trữ BigQuery
  4. Nhập ID dự án của bạn vào ô bên dưới. Sau đó chạy ô để đảm bảo Cloud SDK sử dụng đúng dự án cho tất cả các lệnh trong sổ ghi chép này.

Cài đặt các gói cần thiết và khởi động lại thời gian chạy

 try:
  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x 
except:
  pass
 
pip install fastavro
pip install tensorflow-io==0.9.0
pip install google-cloud-bigquery-storage

Xác thực

 from google.colab import auth
auth.authenticate_user()
print('Authenticated')
 

Đặt ID dự án của bạn

 PROJECT_ID = "<YOUR PROJECT>" 
! gcloud config set project $PROJECT_ID
%env GCLOUD_PROJECT=$PROJECT_ID
 

Nhập thư viện Python, xác định hằng

 from __future__ import absolute_import, division, print_function, unicode_literals

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

LOCATION = 'us'

# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)

DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

CSV_SCHEMA = [
      bigquery.SchemaField("age", "FLOAT64"),
      bigquery.SchemaField("workclass", "STRING"),
      bigquery.SchemaField("fnlwgt", "FLOAT64"),
      bigquery.SchemaField("education", "STRING"),
      bigquery.SchemaField("education_num", "FLOAT64"),
      bigquery.SchemaField("marital_status", "STRING"),
      bigquery.SchemaField("occupation", "STRING"),
      bigquery.SchemaField("relationship", "STRING"),
      bigquery.SchemaField("race", "STRING"),
      bigquery.SchemaField("gender", "STRING"),
      bigquery.SchemaField("capital_gain", "FLOAT64"),
      bigquery.SchemaField("capital_loss", "FLOAT64"),
      bigquery.SchemaField("hours_per_week", "FLOAT64"),
      bigquery.SchemaField("native_country", "STRING"),
      bigquery.SchemaField("income_bracket", "STRING"),
  ]

UNUSED_COLUMNS = ["fnlwgt", "education_num"]
 

Nhập dữ liệu điều tra dân số vào BigQuery

Xác định các phương thức của trình trợ giúp để tải dữ liệu vào BigQuery

 def create_bigquery_dataset_if_necessary(dataset_id):
  # Construct a full Dataset object to send to the API.
  client = bigquery.Client(project=PROJECT_ID)
  dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
  dataset.location = LOCATION

  try:
    dataset = client.create_dataset(dataset)  # API request
    return True
  except GoogleAPIError as err:
    if err.code != 409: # http_client.CONFLICT
      raise
  return False

 
 def load_data_into_bigquery(url, table_id):
  create_bigquery_dataset_if_necessary(DATASET_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  table_ref = dataset_ref.table(table_id)
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
  job_config.source_format = bigquery.SourceFormat.CSV
  job_config.schema = CSV_SCHEMA

  load_job = client.load_table_from_uri(
      url, table_ref, job_config=job_config
  )
  print("Starting job {}".format(load_job.job_id))

  load_job.result()  # Waits for table load to complete.
  print("Job finished.")

  destination_table = client.get_table(table_ref)
  print("Loaded {} rows.".format(destination_table.num_rows))
 

Tải dữ liệu điều tra dân số trong BigQuery.

 load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
 
Starting job 2ceffef8-e6e4-44bb-9e86-3d97b0501187
Job finished.
Loaded 32561 rows.
Starting job bf66f1b3-2506-408b-9009-c19f4ae9f58a
Job finished.
Loaded 16278 rows.

Xác nhận rằng dữ liệu đã được nhập

TODO: thay thế <DỰ ÁN CỦA BẠN> bằng DỰ ÁN_ID của bạn

 %%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
 

Tải dữ liệu điều tra dân số trong Bộ dữ liệu TensorFlow bằng trình đọc BigQuery

Đọc và chuyển đổi dữ liệu Caesnus từ BigQuery thành Tập dữ liệu TensorFlow

 from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
  
def transofrom_row(row_dict):
  # Trim all string tensors
  trimmed_dict = { column:
                  (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) 
                  for (column,tensor) in row_dict.items()
                  }
  # Extract feature column
  income_bracket = trimmed_dict.pop('income_bracket')
  # Convert feature column to 0.0/1.0
  income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), 
                 lambda: tf.constant(1.0), 
                 lambda: tf.constant(0.0))
  return (trimmed_dict, income_bracket_float)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, DATASET_ID,
      list(field.name for field in CSV_SCHEMA 
           if not field.name in UNUSED_COLUMNS),
      list(dtypes.double if field.field_type == 'FLOAT64' 
           else dtypes.string for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),
      requested_streams=2)
  
  dataset = read_session.parallel_read_rows()
  transformed_ds = dataset.map (transofrom_row)
  return transformed_ds

 
 BATCH_SIZE = 32

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
 

Xác định các cột tính năng

 def get_categorical_feature_values(column):
  query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  job_config = bigquery.QueryJobConfig()
  query_job = client.query(query, job_config=job_config)
  result = query_job.to_dataframe()
  return result.values[:,0]
 
 from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:
  feature_columns.append(feature_column.numeric_column(header))

# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
  categorical_feature = feature_column.categorical_column_with_vocabulary_list(
        header, get_categorical_feature_values(header))
  categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
  feature_columns.append(categorical_feature_one_hot)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
 

Xây dựng và đào tạo mô hình

Xây dựng mô hình

 Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
  [
    feature_layer,
      Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
      Dense(75, activation=tf.nn.relu),
      Dense(50, activation=tf.nn.relu),
      Dense(25, activation=tf.nn.relu),
      Dense(1, activation=tf.nn.sigmoid)
  ])

# Compile Keras model
model.compile(
    loss='binary_crossentropy', 
    metrics=['accuracy'])
 

Mô hình tàu hỏa

 model.fit(training_ds, epochs=5)
 
WARNING:tensorflow:Layer sequential is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Warning:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4276: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
WARNING:tensorflow:From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/feature_column/feature_column_v2.py:4331: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
1018/1018 [==============================] - 17s 17ms/step - loss: 0.5985 - accuracy: 0.8105
Epoch 2/5
1018/1018 [==============================] - 10s 10ms/step - loss: 0.3670 - accuracy: 0.8324
Epoch 3/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3487 - accuracy: 0.8393
Epoch 4/5
1018/1018 [==============================] - 11s 10ms/step - loss: 0.3398 - accuracy: 0.8435
Epoch 5/5
1018/1018 [==============================] - 11s 11ms/step - loss: 0.3377 - accuracy: 0.8455

<tensorflow.python.keras.callbacks.History at 0x7f978f5b91d0>

Đánh giá mô hình

Đánh giá mô hình

 loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
 
509/509 [==============================] - 8s 15ms/step - loss: 0.3338 - accuracy: 0.8398
Accuracy 0.8398452

Đánh giá một vài mẫu ngẫu nhiên

 sample_x = {
    'age' : np.array([56, 36]), 
    'workclass': np.array(['Local-gov', 'Private']), 
    'education': np.array(['Bachelors', 'Bachelors']), 
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']), 
    'occupation': np.array(['Tech-support', 'Other-service']), 
    'relationship': np.array(['Husband', 'Husband']), 
    'race': np.array(['White', 'Black']), 
    'gender': np.array(['Male', 'Male']), 
    'capital_gain': np.array([0, 7298]), 
    'capital_loss': np.array([0, 0]), 
    'hours_per_week': np.array([40, 36]), 
    'native_country': np.array(['United-States', 'United-States'])
  }

model.predict(sample_x)
 
array([[0.5541261],
       [0.6209938]], dtype=float32)

Tài nguyên