Ejemplo de extremo a extremo para el lector de BigQuery TensorFlow

Descripción general

Este tutorial muestra cómo utilizar lector de BigQuery TensorFlow para la formación de redes neuronales mediante la API secuencial Keras.

Conjunto de datos

En este tutorial se utiliza el censo de Estados Unidos Ingresos conjunto de datos proporcionado por la Universidad de California en Irvine Machine Learning Repositorio . Este conjunto de datos contiene información sobre personas de una base de datos del censo de 1994, incluida la edad, la educación, el estado civil, la ocupación y si ganan más de $ 50,000 al año.


Configura tu proyecto de GCP

Se requieren los siguientes pasos, independientemente del entorno de su computadora portátil.

  1. Seleccione o cree un proyecto de GCP.
  2. Asegúrese de que la facturación esté habilitada para su proyecto.
  3. Habilita la API de BigQuery Storage
  4. Ingrese su ID de proyecto en la celda a continuación. Luego, ejecute la celda para asegurarse de que el SDK de Cloud use el proyecto correcto para todos los comandos en este cuaderno.

Instale los paquetes necesarios y reinicie el tiempo de ejecución

  # Use the Colab's preinstalled TensorFlow 2.x
  %tensorflow_version 2.x 
pip install fastavro
pip install tensorflow-io==0.9.0
pip install google-cloud-bigquery-storage


from google.colab import auth

Establezca su ID de PROYECTO

! gcloud config set project $PROJECT_ID

Importar bibliotecas de Python, definir constantes

from __future__ import absolute_import, division, print_function, unicode_literals

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError


# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Download options.
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'

DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

      bigquery.SchemaField("age", "FLOAT64"),
      bigquery.SchemaField("workclass", "STRING"),
      bigquery.SchemaField("fnlwgt", "FLOAT64"),
      bigquery.SchemaField("education", "STRING"),
      bigquery.SchemaField("education_num", "FLOAT64"),
      bigquery.SchemaField("marital_status", "STRING"),
      bigquery.SchemaField("occupation", "STRING"),
      bigquery.SchemaField("relationship", "STRING"),
      bigquery.SchemaField("race", "STRING"),
      bigquery.SchemaField("gender", "STRING"),
      bigquery.SchemaField("capital_gain", "FLOAT64"),
      bigquery.SchemaField("capital_loss", "FLOAT64"),
      bigquery.SchemaField("hours_per_week", "FLOAT64"),
      bigquery.SchemaField("native_country", "STRING"),
      bigquery.SchemaField("income_bracket", "STRING"),

UNUSED_COLUMNS = ["fnlwgt", "education_num"]

Importar datos del censo a BigQuery

Definir métodos auxiliares para cargar datos en BigQuery

def create_bigquery_dataset_if_necessary(dataset_id):
  # Construct a full Dataset object to send to the API.
  client = bigquery.Client(project=PROJECT_ID)
  dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
  dataset.location = LOCATION

    dataset = client.create_dataset(dataset)  # API request
    return True
  except GoogleAPIError as err:
    if err.code != 409: # http_client.CONFLICT
  return False
def load_data_into_bigquery(url, table_id):
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  table_ref = dataset_ref.table(table_id)
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
  job_config.source_format = bigquery.SourceFormat.CSV
  job_config.schema = CSV_SCHEMA

  load_job = client.load_table_from_uri(
      url, table_ref, job_config=job_config
  print("Starting job {}".format(load_job.job_id))

  load_job.result()  # Waits for table load to complete.
  print("Job finished.")

  destination_table = client.get_table(table_ref)
  print("Loaded {} rows.".format(destination_table.num_rows))

Cargar datos del censo en BigQuery.

load_data_into_bigquery(TRAINING_URL, TRAINING_TABLE_ID)
load_data_into_bigquery(EVAL_URL, EVAL_TABLE_ID)
Confirme que los datos fueron importados

TODO: reemplace <SU PROYECTO> con su PROJECT_ID

%%bigquery --use_bqstorage_api
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5

Cargar datos del censo en TensorFlow DataSet con el lector de BigQuery

Leer y transformar datos de cesnus de BigQuery en TensorFlow DataSet

from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

def transform_row(row_dict):
  # Trim all string tensors
  trimmed_dict = { column:
                  (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor) 
                  for (column,tensor) in row_dict.items()
  # Extract feature column
  income_bracket = trimmed_dict.pop('income_bracket')
  # Convert feature column to 0.0/1.0
  income_bracket_float = tf.cond(tf.equal(tf.strings.strip(income_bracket), '>50K'), 
                 lambda: tf.constant(1.0), 
                 lambda: tf.constant(0.0))
  return (trimmed_dict, income_bracket_float)

def read_bigquery(table_name):
  tensorflow_io_bigquery_client = BigQueryClient()
  read_session = tensorflow_io_bigquery_client.read_session(
      "projects/" + PROJECT_ID,
      PROJECT_ID, table_name, DATASET_ID,
      list(field.name for field in CSV_SCHEMA 
           if not field.name in UNUSED_COLUMNS),
      list(dtypes.double if field.field_type == 'FLOAT64' 
           else dtypes.string for field in CSV_SCHEMA
           if not field.name in UNUSED_COLUMNS),

  dataset = read_session.parallel_read_rows()
  transformed_ds = dataset.map(transform_row)
  return transformed_ds

training_ds = read_bigquery(TRAINING_TABLE_ID).shuffle(10000).batch(BATCH_SIZE)
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)

Definir columnas de características

def get_categorical_feature_values(column):
  query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, PROJECT_ID, DATASET_ID, TRAINING_TABLE_ID)
  client = bigquery.Client(project=PROJECT_ID)
  dataset_ref = client.dataset(DATASET_ID)
  job_config = bigquery.QueryJobConfig()
  query_job = client.query(query, job_config=job_config)
  result = query_job.to_dataframe()
  return result.values[:,0]
from tensorflow import feature_column

feature_columns = []

# numeric cols
for header in ['capital_gain', 'capital_loss', 'hours_per_week']:

# categorical cols
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
  categorical_feature = feature_column.categorical_column_with_vocabulary_list(
        header, get_categorical_feature_values(header))
  categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)

# bucketized cols
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

Construir y entrenar modelo

Modelo de construcción

Dense = tf.keras.layers.Dense
model = tf.keras.Sequential(
      Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
      Dense(75, activation=tf.nn.relu),
      Dense(50, activation=tf.nn.relu),
      Dense(25, activation=tf.nn.relu),
      Dense(1, activation=tf.nn.sigmoid)

# Compile Keras model

Modelo de tren

model.fit(training_ds, epochs=5)
Evaluar modelo

Evaluar modelo

loss, accuracy = model.evaluate(eval_ds)
print("Accuracy", accuracy)
Accuracy 0.8398452

Evaluar un par de muestras aleatorias

sample_x = {
    'age' : np.array([56, 36]), 
    'workclass': np.array(['Local-gov', 'Private']), 
    'education': np.array(['Bachelors', 'Bachelors']), 
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']), 
    'occupation': np.array(['Tech-support', 'Other-service']), 
    'relationship': np.array(['Husband', 'Husband']), 
    'race': np.array(['White', 'Black']), 
    'gender': np.array(['Male', 'Male']), 
    'capital_gain': np.array([0, 7298]), 
    'capital_loss': np.array([0, 0]), 
    'hours_per_week': np.array([40, 36]), 
    'native_country': np.array(['United-States', 'United-States'])

