Google I/O is a wrap! Catch up on TensorFlow sessions View sessions

High-performance Simulation with Kubernetes

This tutorial will describe how to set up high-performance simulation using a TFF runtime running on Kubernetes. The model is the same as in the previous tutorial, High-performance simulations with TFF. The only difference is that here we use a worker pool instead of a local executor.

This tutorial refers to Google Cloud's GKE to create the Kubernetes cluster, but all the steps after the cluster is created can be used with any Kubernetes installation.

View on TensorFlow.org Run in Google Colab View source on GitHub

在 GKE 上启动 TFF 工作进程

注:本教程假定用户目前拥有 GCP 项目。

创建一个 Kubernetes 集群

以下步骤只需执行一次。可以将该集群重用于将来的工作负载。

按照 GKE 说明来创建容器集群。本教程的其余部分假定集群的名称为 tff-cluster,但实际名称并不重要。当您到达“第 5 步:部署应用”时,请停止按照说明操作。

部署 TFF 工作进程应用

与 GCP 交互的命令可以在本地运行,也可以在 Google Cloud Shell 中运行。我们建议使用 Google Cloud Shell,因为它不需要其他设置。

  1. 运行以下命令来启动 Kubernetes 应用。
$ kubectl create deployment tff-workers --image=gcr.io/tensorflow-federated/remote-executor-service:{ {version} }
  1. 为应用添加一个负载均衡器。
$ kubectl expose deployment tff-workers --type=LoadBalancer --port 80 --target-port 8000

注:这会将您的部署公开到互联网,并且仅用于演示目的。对于生产用途,强烈建议使用防火墙和身份验证。

在 Google Cloud Console 上查找负载均衡器的 IP 地址。您稍后会需要它来将训练循环连接到工作进程应用。

(或者)在本地启动 Docker 容器

$ docker run --rm -p 8000:8000 gcr.io/tensorflow-federated/remote_executor_service:{ {version} }

设置 TFF 环境

!pip install --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

定义要训练的模型

import collections
import time

import tensorflow as tf
import tensorflow_federated as tff

source, _ = tff.simulation.datasets.emnist.load_data()


def map_fn(example):
  return collections.OrderedDict(
      x=tf.reshape(example['pixels'], [-1, 784]), y=example['label'])


def client_data(n):
  ds = source.create_tf_dataset_for_client(source.client_ids[n])
  return ds.repeat(10).batch(20).map(map_fn)


train_data = [client_data(n) for n in range(10)]
input_spec = train_data[0].element_spec


def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(units=10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])
  return tff.learning.from_keras_model(
      model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])


trainer = tff.learning.build_federated_averaging_process(
    model_fn, client_optimizer_fn=lambda: tf.keras.optimizers.SGD(0.02))


def evaluate(num_rounds=10):
  state = trainer.initialize()
  for round in range(num_rounds):
    t1 = time.time()
    state, metrics = trainer.next(state, train_data)
    t2 = time.time()
    print('Round {}: loss {}, round time {}'.format(round, metrics.loss, t2 - t1))

设置远程执行器

默认情况下,TFF 在本地执行所有计算。在此步骤中,我们指示 TFF 连接到我们在上面设置的 Kubernetes 服务。确保在此处复制服务的 IP 地址。

import grpc

ip_address = '0.0.0.0' 
port = 80 

channels = [grpc.insecure_channel(f'{ip_address}:{port}') for _ in range(10)]

tff.backends.native.set_remote_execution_context(channels, rpc_mode='STREAMING')

运行训练

evaluate()
Round 0: loss 4.370407581329346, round time 4.201097726821899
Round 1: loss 4.1407670974731445, round time 3.3283166885375977
Round 2: loss 3.865147590637207, round time 3.098310947418213
Round 3: loss 3.534019708633423, round time 3.1565616130828857
Round 4: loss 3.272688388824463, round time 3.175067663192749
Round 5: loss 2.935391664505005, round time 3.008434534072876
Round 6: loss 2.7399251461029053, round time 3.31435227394104
Round 7: loss 2.5054931640625, round time 3.4411356449127197
Round 8: loss 2.290508985519409, round time 3.158798933029175
Round 9: loss 2.1194536685943604, round time 3.1348156929016113