このページは Cloud Translation API によって翻訳されました。
Switch to English

TensorFlow確率のケーススタディ:共分散推定

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロード

このノートブックは、TensorFlowの確率を学ぶためのケーススタディとして作成しました。私が解決することを選択した問題は、2次元平均0ガウス確率変数のサンプルの共分散行列を推定することです。この問題には、いくつかの優れた機能があります。

  • 共分散に逆ウィシャート事前分布を使用する場合(一般的なアプローチ)、問題には分析解があるため、結果を確認できます。
  • この問題には、制約されたパラメーターのサンプリングが含まれ、興味深い複雑さが加わります。
  • 最も簡単なソリューションは最速のソリューションではないため、実行する最適化作業がいくつかあります。

私は自分の経験を書き留めることにしました。 TFPの細かい部分に頭を巻くのに少し時間がかかったので、このノートブックはかなり単純に起動し、徐々により複雑なTFP機能に移行します。途中で多くの問題に遭遇し、それらを特定するのに役立つプロセスと、最終的に見つけた回避策の両方をキャプチャしようとしました。私は多くの詳細を含めようとしました(個々のステップが正しいことを確認するための多くのテストを含みます)。

なぜTensorFlow確率を学ぶのですか?

TensorFlow Probabilityは、いくつかの理由で私のプロジェクトにとって魅力的であることがわかりました。

  • TensorFlow確率を使用すると、ノートブックでインタラクティブに複雑なモデルをプロトタイプ開発できます。コードを小さな断片に分割して、インタラクティブに単体テストでテストすることができます。
  • スケールアップの準備ができたら、TensorFlowを複数のマシン上の複数の最適化されたプロセッサで実行するために用意されているすべてのインフラストラクチャを利用できます。
  • 最後に、私はスタンが本当に好きですが、デバッグするのは非常に難しいと思います。すべてのモデリングコードは、コードを調べたり、中間状態を調べたりするためのツールがほとんどないスタンドアロン言語で作成する必要があります。

欠点は、TensorFlow ProbabilityがStanやPyMC3よりもはるかに新しいため、ドキュメントが進行中であり、まだ構築されていない機能がたくさんあることです。幸いなことに、TFPの基盤はしっかりしていて、モジュール方式で設計されているため、機能をかなり簡単に拡張できます。このノートブックでは、ケーススタディを解決することに加えて、TFPを拡張する方法をいくつか紹介します。

これは誰のためですか

私は、読者がいくつかの重要な前提条件を持ってこのノートブックに来ていると思います。あなたがすべき:

最初の試み

これが私の最初の問題の試みです。ネタバレ:私の解決策は機能しません、そしてそれは物事を正しくするためにいくつかの試みが必要になるでしょう!このプロセスにはしばらく時間がかかりますが、以下の各試行はTFPの新しい部分を学習するのに役立ちました。

注:TFPは現在、逆ウィシャート分布を実装していません(最後に、独自の逆ウィシャートをロールする方法を説明します)。その代わりに、問題をウィシャート事前分布を使用して精度行列を推定する問題に変更します。

import collections
import math
import os
import time

import numpy as np
import pandas as pd
import scipy
import scipy.stats
import matplotlib.pyplot as plt

import tensorflow.compat.v2 as tf
tf.enable_v2_behavior()
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors

ステップ1:観察結果をまとめる

ここでの私のデータはすべて合成であるため、これは実際の例よりも少し整理されているように見えます。ただし、独自の合成データを生成できない理由はありません。

ヒント:モデルの形式を決定したら、いくつかのパラメーター値を選択し、選択したモデルを使用していくつかの合成データを生成できます。実装のサニティチェックとして、選択したパラメーターの真の値が推定に含まれていることを確認できます。デバッグ/テストサイクルを高速化するために、モデルの簡略化されたバージョンを検討する場合があります(たとえば、使用するディメンションやサンプルの数を減らします)。

ヒント:観測値をNumPy配列として操作するのが最も簡単です。注意すべき重要な点の1つは、NumPyはデフォルトでfloat64を使用し、TensorFlowはデフォルトでfloat32を使用することです。

一般に、TensorFlow操作ではすべての引数に同じ型が必要であり、型を変更するには明示的なデータキャストを行う必要があります。 float64オブザベーションを使用する場合は、多くのキャスト操作を追加する必要があります。対照的に、NumPyは自動的にキャストを処理します。したがって、 TensorFlowにfloat64を使用させるよりも、Numpyデータをfloat32に変換する方がはるかに簡単です。

いくつかのパラメータ値を選択してください

# We're assuming 2-D data with a known true mean of (0, 0)
true_mean = np.zeros([2], dtype=np.float32)
# We'll make the 2 coordinates correlated
true_cor = np.array([[1.0, 0.9], [0.9, 1.0]], dtype=np.float32)
# And we'll give the 2 coordinates different variances
true_var = np.array([4.0, 1.0], dtype=np.float32)
# Combine the variances and correlations into a covariance matrix
true_cov = np.expand_dims(np.sqrt(true_var), axis=1).dot(
    np.expand_dims(np.sqrt(true_var), axis=1).T) * true_cor
# We'll be working with precision matrices, so we'll go ahead and compute the
# true precision matrix here
true_precision = np.linalg.inv(true_cov)
# Here's our resulting covariance matrix
print(true_cov)
# Verify that it's positive definite, since np.random.multivariate_normal
# complains about it not being positive definite for some reason.
# (Note that I'll be including a lot of sanity checking code in this notebook -
# it's a *huge* help for debugging)
print('eigenvalues: ', np.linalg.eigvals(true_cov))
[[4.  1.8]
 [1.8 1. ]]
eigenvalues:  [4.843075   0.15692513]

いくつかの合成観測を生成する

TensorFlow Probabilityは、データの初期ディメンションがサンプルインデックスを表し、データの最終ディメンションがサンプルのディメンションを表すという規則を使用していることに注意してください。

ここでは、それぞれが長さ2のベクトルである100個のサンプルが必要です。形状( my_dataの配列my_dataを生成します。 my_data[i, :]は$ i $番目のサンプルであり、長さ2のベクトルです。

my_data型をfloat32にすることを忘れないでください!)

# Set the seed so the results are reproducible.
np.random.seed(123)

# Now generate some observations of our random variable.
# (Note that I'm suppressing a bunch of spurious about the covariance matrix
# not being positive semidefinite via check_valid='ignore' because it really is
# positive definite!)
my_data = np.random.multivariate_normal(
    mean=true_mean, cov=true_cov, size=100,
    check_valid='ignore').astype(np.float32)
my_data.shape
(100, 2)

観察結果をサニティチェックする

バグの潜在的な原因の1つは、合成データを台無しにすることです。簡単なチェックをしてみましょう。

# Do a scatter plot of the observations to make sure they look like what we
# expect (higher variance on the x-axis, y values strongly correlated with x)
plt.scatter(my_data[:, 0], my_data[:, 1], alpha=0.75)
plt.show()

png

print('mean of observations:', np.mean(my_data, axis=0))
print('true mean:', true_mean)
mean of observations: [-0.24009615 -0.16638893]
true mean: [0. 0.]

print('covariance of observations:\n', np.cov(my_data, rowvar=False))
print('true covariance:\n', true_cov)
covariance of observations:
 [[3.95307734 1.68718486]
 [1.68718486 0.94910269]]
true covariance:
 [[4.  1.8]
 [1.8 1. ]]

わかりました、私たちのサンプルは合理的に見えます。次の一歩。

ステップ2:NumPyで尤度関数を実装する

TF確率でMCMCサンプリングを実行するために記述する必要がある主なものは、対数尤度関数です。一般に、NumPyよりもTFを書くのは少し難しいので、NumPyで最初の実装を行うと便利だと思います。尤度関数を、$ P(data | parameters)$に対応するデータ尤度関数と$ P(parameters)$に対応する事前尤度関数の2つに分割します。

目標はテスト用の値を生成することだけなので、これらのNumPy関数を超最適化/ベクトル化する必要はないことに注意してください。正確さが重要な考慮事項です!

最初に、データ対数尤度部分を実装します。それは非常に簡単です。覚えておくべきことの1つは、精度の高い行列を使用するため、それに応じてパラメーター化することです。

def log_lik_data_numpy(precision, data):
  # np.linalg.inv is a really inefficient way to get the covariance matrix, but
  # remember we don't care about speed here
  cov = np.linalg.inv(precision)
  rv = scipy.stats.multivariate_normal(true_mean, cov)
  return np.sum(rv.logpdf(data))

# test case: compute the log likelihood of the data given the true parameters
log_lik_data_numpy(true_precision, my_data)
-280.81822950593767

事後分布の分析ソリューションがあるため、精度行列にはウィシャート事前分布を使用します(ウィキペディアの共役事前分布の便利な表を参照)。

ウィッシャート分布には2つのパラメーターがあります。

  • 自由度の数(ウィキペディアでは$ \ nu $とラベル付けされています)
  • スケールマトリックス(ウィキペディアでは$ V $とラベル付けされています)

パラメータ$ \ nu、V $を持つウィシャート分布の平均は$ E [W] = \ nu V $であり、分散は$ \ text {Var}(W_ {ij})= \ nu(v_ {ij}です。 ^ 2 + v_ {ii} v_ {jj})$

いくつかの有用な直感:平均0と共分散$ V $の多変量正規確率変数から$ \ nu $独立描画$ x_1 \ ldots x _ {\ nu} $を生成し、合計$ W =を形成することにより、Wishartサンプルを生成できます。 \ sum_ {i = 1} ^ {\ nu} x_i x_i ^ T $。

Wishartサンプルを$ \ nu $で除算して再スケーリングすると、$ x_i $のサンプル共分散行列が得られます。このサンプル共分散行列は、$ \ nu $が増加するにつれて$ V $に向かう傾向があります。 $ \ nu $が小さい場合、サンプルの共分散行列には多くの変動があるため、$ \ nu $の小さい値は弱い事前分布に対応し、$ \ nu $の大きい値は強い事前分布に対応します。 $ \ nu $は、少なくともサンプリングする空間の次元と同じ大きさである必要があることに注意してください。そうしないと、特異行列が生成されます。

$ \ nu = 3 $を使用するので、事前確率は弱く、$ V = \ frac {1} {\ nu} I $を使用して、共分散推定値を同一性に引き寄せます(平均値を思い出してください)。 $ \ nu V $)です。

PRIOR_DF = 3
PRIOR_SCALE = np.eye(2, dtype=np.float32) / PRIOR_DF

def log_lik_prior_numpy(precision):
  rv = scipy.stats.wishart(df=PRIOR_DF, scale=PRIOR_SCALE)
  return rv.logpdf(precision)

# test case: compute the prior for the true parameters
log_lik_prior_numpy(true_precision)
-9.103606346649766

ウィッシャート分布は、既知の平均$ \ mu $の多変量正規分布の精度行列を推定するための共役事前分布です。

以前のWishartパラメーターが$ \ nu、V $であり、多変量正規分布$ x_1、\ ldots、x_n $の$ n $個の観測値があるとします。事後パラメータは$ n + \ nu、\ left(V ^ {-1} + \ sum_ {i = 1} ^ n(x_i- \ mu)(x_i- \ mu)^ T \ right)^ {-1 } $。

n = my_data.shape[0]
nu_prior = PRIOR_DF
v_prior = PRIOR_SCALE
nu_posterior = nu_prior + n
v_posterior = np.linalg.inv(np.linalg.inv(v_prior) + my_data.T.dot(my_data))
posterior_mean = nu_posterior * v_posterior
v_post_diag = np.expand_dims(np.diag(v_posterior), axis=1)
posterior_sd = np.sqrt(nu_posterior *
                       (v_posterior ** 2.0 + v_post_diag.dot(v_post_diag.T)))

事後確率と真の値の簡単なプロット。事後確率はサンプルの事後確率に近いですが、アイデンティティに向かって少し縮小されていることに注意してください。また、真の値は後部の最頻値からかなり離れていることにも注意してください。おそらくこれは、前部がデータとあまりよく一致していないためです。実際の問題では、共分散の前にスケーリングされた逆ウィシャートのようなものを使用したほうがよいでしょう(たとえば、この主題に関するAndrew Gelmanの解説を参照)が、その後の分析はうまくいきません。

sample_precision = np.linalg.inv(np.cov(my_data, rowvar=False, bias=False))
fig, axes = plt.subplots(2, 2)
fig.set_size_inches(10, 10)
for i in range(2):
  for j in range(2):
    ax = axes[i, j]
    loc = posterior_mean[i, j]
    scale = posterior_sd[i, j]
    xmin = loc - 3.0 * scale
    xmax = loc + 3.0 * scale
    x = np.linspace(xmin, xmax, 1000)
    y = scipy.stats.norm.pdf(x, loc=loc, scale=scale)
    ax.plot(x, y)
    ax.axvline(true_precision[i, j], color='red', label='True precision')
    ax.axvline(sample_precision[i, j], color='red', linestyle=':', label='Sample precision')
    ax.set_title('precision[%d, %d]' % (i, j))
plt.legend()
plt.show()

png

ステップ3:TensorFlowに尤度関数を実装する

ネタバレ:私たちの最初の試みはうまくいきません。その理由については、以下で説明します。

ヒント:尤度関数を開発するときは、TensorFlowイーガーモードを使用してください。熱心なモードでは、TFはNumPyのように動作します。すべてがすぐに実行されるため、 Session.run()を使用せずにインタラクティブにデバッグできSession.run() 。こちらのメモをご覧ください

予備:配布クラス

TFPには、ログ確率を生成するために使用する配布クラスのコレクションがあります。注意すべき点の1つは、これらのクラスは単一のサンプルではなくテンソルのサンプルで機能することです。これにより、ベクトル化と関連する高速化が可能になります。

分布は、2つの異なる方法でサンプルのテンソルを処理できます。単一のスカラーパラメーターを持つ分布を含む具体的な例を使用して、これら2つの方法を説明するのが最も簡単です。 rateパラメータを持つポアソン分布を使用します。

  • rateパラメータに単一の値を使用してPoissonを作成する場合、そのsample()メソッドを呼び出すと単一の値が返されます。この値はeventと呼ばれ、この場合、イベントはすべてスカラーです。
  • rateパラメータの値のテンソルを使用してポアソンを作成する場合、そのsample()メソッドを呼び出すと、レートテンソルの値ごとに1つずつ、複数の値が返されるようになりました。オブジェクトは、それぞれが独自のレートを持つ独立したPoissonsのコレクションとして機能し、 sample()呼び出しによって返される各値は、これらのPoissonsの1つに対応します。この独立しているが同一に分散されていないイベントのコレクションは、 batchと呼ばれます。
  • sample()メソッドは、デフォルトで空のタプルになるsample_shapeパラメーターをsample_shapeます。 sample_shape空でない値をsample_shape 、サンプルは複数のバッチを返します。このバッチのコレクションはsampleと呼ばれsample

分布のlog_prob()メソッドは、どのように類似なるようにデータを消費sample() 、それを生成します。 log_prob()は、サンプル、つまりイベントの複数の独立したバッチの確率を返します。

  • スカラーrateで作成されたポアソンオブジェクトがある場合、各バッチはスカラーであり、サンプルのテンソルを渡すと、同じサイズのログ確率のテンソルが取得されます。
  • rate値の形状Tのテンソルで作成されたポアソンオブジェクトがある場合、各バッチは形状Tテンソルです。形状D、Tのサンプルのテンソルを渡すと、形状D、Tの対数確率のテンソルが得られます。

以下は、これらのケースを説明するいくつかの例です。イベント、バッチ、およびシェイプの詳細なチュートリアルについては、 このノートブックを参照してください。

# case 1: get log probabilities for a vector of iid draws from a single
# normal distribution
norm1 = tfd.Normal(loc=0., scale=1.)
probs1 = norm1.log_prob(tf.constant([1., 0.5, 0.]))

# case 2: get log probabilities for a vector of independent draws from
# multiple normal distributions with different parameters.  Note the vector
# values for loc and scale in the Normal constructor.
norm2 = tfd.Normal(loc=[0., 2., 4.], scale=[1., 1., 1.])
probs2 = norm2.log_prob(tf.constant([1., 0.5, 0.]))

print('iid draws from a single normal:', probs1.numpy())
print('draws from a batch of normals:', probs2.numpy())
iid draws from a single normal: [-1.4189385 -1.0439385 -0.9189385]
draws from a batch of normals: [-1.4189385 -2.0439386 -8.918939 ]

データログの可能性

まず、データ対数尤度関数を実装します。

VALIDATE_ARGS = True
ALLOW_NAN_STATS = False

NumPyの場合との重要な違いの1つは、TensorFlow尤度関数が、単一の行列だけでなく、精度の高い行列のベクトルを処理する必要があることです。複数のチェーンからサンプリングする場合は、パラメーターのベクトルが使用されます。

精度行列のバッチ(つまり、チェーンごとに1つの行列)で機能する分布オブジェクトを作成します。

データの対数確率を計算するときは、バッチ変数ごとに1つのコピーが存在するように、パラメーターと同じ方法でデータを複製する必要があります。複製されたデータの形状は次のとおりである必要があります。

[sample shape, batch shape, event shape]

この場合、イベントの形状は2です(2次元ガウス分布を使用しているため)。 100個のサンプルがあるため、サンプルの形状は100です。バッチ形状は、使用している精度マトリックスの数になります。尤度関数を呼び出すたびにデータを複製するのは無駄なので、事前にデータを複製して、複製されたバージョンを渡します。

これは非効率的な実装であることに注意してくださいMultivariateNormalFullCovarianceは、最後の最適化セクションで説明するいくつかの代替案に比べてコストがかかります。

def log_lik_data(precisions, replicated_data):
  n = tf.shape(precisions)[0]  # number of precision matrices
  # We're estimating a precision matrix; we have to invert to get log
  # probabilities.  Cholesky inversion should be relatively efficient,
  # but as we'll see later, it's even better if we can avoid doing the Cholesky
  # decomposition altogether.
  precisions_cholesky = tf.linalg.cholesky(precisions)
  covariances = tf.linalg.cholesky_solve(
      precisions_cholesky, tf.linalg.eye(2, batch_shape=[n]))
  rv_data = tfd.MultivariateNormalFullCovariance(
      loc=tf.zeros([n, 2]),
      covariance_matrix=covariances,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)

  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# For our test, we'll use a tensor of 2 precision matrices.
# We'll need to replicate our data for the likelihood function.
# Remember, TFP wants the data to be structured so that the sample dimensions
# are first (100 here), then the batch dimensions (2 here because we have 2
# precision matrices), then the event dimensions (2 because we have 2-D
# Gaussian data).  We'll need to add a middle dimension for the batch using
# expand_dims, and then we'll need to create 2 replicates in this new dimension
# using tile.
n = 2
replicated_data = np.tile(np.expand_dims(my_data, axis=1), reps=[1, 2, 1])
print(replicated_data.shape)
(100, 2, 2)

ヒント:非常に役立つことがわかったのは、TensorFlow関数のサニティチェックを少し書くことです。 TFのベクトル化を台無しにするのは本当に簡単なので、より単純なNumPy関数を使用することは、TF出力を検証するための優れた方法です。これらは小さな単体テストと考えてください。

# check against the numpy implementation
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
n = precisions.shape[0]
lik_tf = log_lik_data(precisions, replicated_data=replicated_data).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.8182

以前の対数尤度

データの複製について心配する必要がないため、以前の方が簡単です。

@tf.function(autograph=False)
def log_lik_prior(precisions):
  rv_precision = tfd.WishartTriL(
      df=PRIOR_DF,
      scale_tril=tf.linalg.cholesky(PRIOR_SCALE),
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)
  return rv_precision.log_prob(precisions)
# check against the numpy implementation
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
n = precisions.shape[0]
lik_tf = log_lik_prior(precisions).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]))
  print('tensorflow:', lik_tf[i])
0
numpy: -2.2351873809649625
tensorflow: -2.2351875
1
numpy: -9.103606346649766
tensorflow: -9.103608

共同対数尤度関数を作成します

上記のデータ対数尤度関数は観測値に依存しますが、サンプラーにはそれらがありません。 [closure](https://en.wikipedia.org/wiki/Closure_(computer_programming)を使用すると、グローバル変数を使用せずに依存関係を取り除くことができます。クロージャには、必要な変数を含む環境を構築する外部関数が含まれます。内部機能。

def get_log_lik(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik(precision):
    return log_lik_data(precision, replicated_data) + log_lik_prior(precision)

  return _log_lik

ステップ4:サンプル

では、試してみましょう。簡単にするために、1つのチェーンを使用し、開始点として単位行列を使用します。後でもっと慎重に行います。

繰り返しますが、これは機能しません。例外が発生します。

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  init_precision = tf.expand_dims(tf.eye(2), axis=0)

  # Use expand_dims because we want to pass in a tensor of starting values
  log_lik_fn = get_log_lik(my_data, n_chains=1)

  # we'll just do a few steps here
  num_results = 10
  num_burnin_steps = 10
  states = tfp.mcmc.sample_chain(
     num_results=num_results,
     num_burnin_steps=num_burnin_steps,
     current_state=[
         init_precision,
     ],
     kernel=tfp.mcmc.HamiltonianMonteCarlo(
         target_log_prob_fn=log_lik_fn,
         step_size=0.1,
         num_leapfrog_steps=3),
     trace_fn=None,
     seed=123)
  return states

try:
  states = sample()
except Exception as e:
  # shorten the giant stack trace
  lines = str(e).split('\n')
  print('\n'.join(lines[:5]+['...']+lines[-3:]))
 Cholesky decomposition was not successful. The input might not be valid.
     [[{ {node mcmc_sample_chain/trace_scan/while/body/_79/smart_for_loop/while/body/_371/mh_one_step/hmc_kernel_one_step/leapfrog_integrate/while/body/_537/leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/StatefulPartitionedCall/Cholesky} }]] [Op:__inference_sample_2849]

Function call stack:
sample
...
Function call stack:
sample


問題の特定

InvalidArgumentError (see above for traceback): Cholesky decomposition was not successful. The input might not be valid.それはあまり役に立ちません。何が起こったのかについてもっと知ることができるかどうか見てみましょう。

  • 各ステップのパラメーターを出力して、失敗した値を確認できるようにします
  • 特定の問題を防ぐために、いくつかのアサーションを追加します。

アサーションはTensorFlow操作であるため注意が必要です。また、アサーションが実行され、グラフから最適化されないように注意する必要があります。 TFアサーションに精通していない場合は、TensorFlowデバッグのこの概要を読む価値があります。 tf.control_dependenciesを使用して、アサーションを明示的に強制的に実行できtf.control_dependencies (以下のコードのコメントを参照)。

TensorFlowのネイティブのPrint関数は、アサーションと同じ動作をします。これは操作であり、確実に実行されるように注意する必要があります。ノートブックで作業している場合、 Printによってさらに問題が発生します。その出力はstderrに送信され、 stderrはセルに表示されません。ここではトリックを使います:代わりに使用するのではtf.Print 、私たちは私たち自身を経由してTensorFlowの印刷操作を作成しますtf.pyfunc 。アサーションと同様に、メソッドが実行されることを確認する必要があります。

def get_log_lik_verbose(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  def _log_lik(precisions):
    # An internal method we'll make into a TensorFlow operation via tf.py_func
    def _print_precisions(precisions):
      print('precisions:\n', precisions)
      return False  # operations must return something!
    # Turn our method into a TensorFlow operation
    print_op = tf.compat.v1.py_func(_print_precisions, [precisions], tf.bool)

    # Assertions are also operations, and some care needs to be taken to ensure
    # that they're executed
    assert_op = tf.assert_equal(
        precisions, tf.linalg.matrix_transpose(precisions),
        message='not symmetrical', summarize=4, name='symmetry_check')

    # The control_dependencies statement forces its arguments to be executed
    # before subsequent operations
    with tf.control_dependencies([print_op, assert_op]):
      return (log_lik_data(precisions, replicated_data) +
              log_lik_prior(precisions))

  return _log_lik
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  init_precision = tf.eye(2)[tf.newaxis, ...]
  log_lik_fn = get_log_lik_verbose(my_data)
  # we'll just do a few steps here
  num_results = 10
  num_burnin_steps = 10
  states = tfp.mcmc.sample_chain(
      num_results=num_results,
      num_burnin_steps=num_burnin_steps,
      current_state=[
          init_precision,
      ],
      kernel=tfp.mcmc.HamiltonianMonteCarlo(
          target_log_prob_fn=log_lik_fn,
          step_size=0.1,
          num_leapfrog_steps=3),
      trace_fn=None,
      seed=123)

try:
  states = sample()
except Exception as e:
  # shorten the giant stack trace
  lines = str(e).split('\n')
  print('\n'.join(lines[:5]+['...']+lines[-3:]))
precisions:
 [[[1. 0.]
  [0. 1.]]]
precisions:
 [[[1. 0.]
  [0. 1.]]]
precisions:
 [[[ 0.24315196 -0.2761638 ]
  [-0.33882257  0.8622    ]]]
 assertion failed: [not symmetrical] [Condition x == y did not hold element-wise:] [x (leapfrog_integrate_one_step/add:0) = ] [[[0.243151963 -0.276163787][-0.338822573 0.8622]]] [y (leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/matrix_transpose/transpose:0) = ] [[[0.243151963 -0.338822573][-0.276163787 0.8622]]]
     [[{ {node mcmc_sample_chain/trace_scan/while/body/_96/smart_for_loop/while/body/_381/mh_one_step/hmc_kernel_one_step/leapfrog_integrate/while/body/_503/leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/symmetry_check_1/Assert/AssertGuard/else/_577/Assert} }]] [Op:__inference_sample_4837]

Function call stack:
sample
...
Function call stack:
sample


これが失敗する理由

サンプラーが試行する最初の新しいパラメーター値は、非対称マトリックスです。コレスキー分解は対称(および正定値)行列に対してのみ定義されているため、コレスキー分解は失敗します。

ここでの問題は、対象のパラメータが精度行列であり、精度行列は実数、対称、正定値でなければならないことです。サンプラーはこの制約について何も知りません(おそらく勾配を介する場合を除く)。したがって、特にステップサイズが大きい場合、サンプラーが無効な値を提案し、例外が発生する可能性があります。

ハミルトニアンモンテカルロサンプラーを使用すると、非常に小さいステップサイズを使用して問題を回避できる場合があります。これは、勾配によってパラメーターが無効な領域から遠ざかるはずですが、ステップサイズが小さいと収束が遅くなるためです。勾配について何も知らないメトロポリス・ヘイスティングスのサンプラーで、私たちは運命にあります。

バージョン2:制約のないパラメーターへの再パラメーター化

上記の問題には簡単な解決策があります。新しいパラメーターにこれらの制約がなくなるように、モデルのパラメーターを再設定できます。 TFPは、まさにそれを行うための便利なツールセット(バイジェクター)を提供します。

バイジェクターによるパラメーター化

精度行列は実数で対称でなければなりません。これらの制約がない代替のパラメーター化が必要です。出発点は、精度行列のコレスキー分解です。コレスキー因子は依然として制約されています-それらは下三角行列であり、それらの対角要素は正でなければなりません。ただし、コレスキー因子の対角線の対数をとると、対数は正に制約されなくなり、下三角部分を1次元ベクトルに平坦化すると、下三角制約がなくなります。 。この場合の結果は、制約のない長さ3のベクトルになります。

Stanマニュアルには、変換を使用してパラメーターのさまざまなタイプの制約を削除するための優れた章があります。)

この再パラメータ化は、データログ尤度関数にほとんど影響を与えません-変換を反転して精度マトリックスを取得する必要があります-しかし、事前の影響はより複雑です。与えられた精度行列の確率はウィッシャート分布によって与えられることを指定しました。変換行列の確率はどれくらいですか?

単調関数$ g $を1次元確率変数$ X $、$ Y = g(X)$に適用すると、$ Y $の密度は次の式で与えられることを思い出してください。

$$ f_Y(y) = | \frac{d}{dy}(g^{-1}(y)) | f_X(g^{-1}(y)) $$

$ g ^ {-1} $項の導関数は、$ g $がローカルボリュームを変更する方法を説明します。高次元の確率変数の場合、補正係数は$ g ^ {-1} $のヤコビアンの行列式の絶対値です( ここを参照)。

逆変換のヤコビアンを対数事前尤度関数に追加する必要があります。幸い、TFPのBijectorクラスがこれを処理してくれます。

Bijectorクラスは、確率密度関数の変数を変更するために使用される可逆で滑らかな関数を表すために使用されます。バイジェクターにはすべて、変換を実行するforward()メソッド、それを反転するinverse()メソッド、およびinverse_log_det_jacobian()するときに必要なヤコビアン補正を提供するforward_log_det_jacobian() inverse_log_det_jacobian()メソッドとinverse_log_det_jacobian()メソッドがあります。

TFPは、 Chain演算子を介した合成を通じて組み合わせて、非常に複雑な変換を形成できる便利なバイジェクターのコレクションを提供します。この例では、次の3つのバイジェクターを作成します(チェーン内の操作は右から左に実行されます)。

  1. 変換の最初のステップは、精度行列に対してコレスキー分解を実行することです。そのためのBijectorクラスはありません。ただし、CholeskyOuterProductバイジェクターは、2つのコレスキー因子の積を取ります。 Invert演算子を使用して、その操作の逆を使用できます。
  2. 次のステップは、コレスキー因子の対角要素の対数を取ることです。これは、 TransformDiagonalバイジェクターとExpバイジェクターの逆を介して実現します。
  3. 最後に、 FillTriangularバイジェクターの逆行列を使用して、行列の下三角部分をベクトルに平坦化します。
# Our transform has 3 stages that we chain together via composition:
precision_to_unconstrained = tfb.Chain([
    # step 3: flatten the lower triangular portion of the matrix
    tfb.Invert(tfb.FillTriangular(validate_args=VALIDATE_ARGS)),
    # step 2: take the log of the diagonals    
    tfb.TransformDiagonal(tfb.Invert(tfb.Exp(validate_args=VALIDATE_ARGS))),
    # step 1: decompose the precision matrix into its Cholesky factors
    tfb.Invert(tfb.CholeskyOuterProduct(validate_args=VALIDATE_ARGS)),
])
# sanity checks
m = tf.constant([[1., 2.], [2., 8.]])
m_fwd = precision_to_unconstrained.forward(m)
m_inv = precision_to_unconstrained.inverse(m_fwd)

# bijectors handle tensors of values, too!
m2 = tf.stack([m, tf.eye(2)])
m2_fwd = precision_to_unconstrained.forward(m2)
m2_inv = precision_to_unconstrained.inverse(m2_fwd)

print('single input:')
print('m:\n', m.numpy())
print('precision_to_unconstrained(m):\n', m_fwd.numpy())
print('inverse(precision_to_unconstrained(m)):\n', m_inv.numpy())
print()

print('tensor of inputs:')
print('m2:\n', m2.numpy())
print('precision_to_unconstrained(m2):\n', m2_fwd.numpy())
print('inverse(precision_to_unconstrained(m2)):\n', m2_inv.numpy())
single input:
m:
 [[1. 2.]
 [2. 8.]]
precision_to_unconstrained(m):
 [0.6931472 2.        0.       ]
inverse(precision_to_unconstrained(m)):
 [[1. 2.]
 [2. 8.]]

tensor of inputs:
m2:
 [[[1. 2.]
  [2. 8.]]

 [[1. 0.]
  [0. 1.]]]
precision_to_unconstrained(m2):
 [[0.6931472 2.        0.       ]
 [0.        0.        0.       ]]
inverse(precision_to_unconstrained(m2)):
 [[[1. 2.]
  [2. 8.]]

 [[1. 0.]
  [0. 1.]]]

TransformedDistributionクラスは、分布にバイジェクターを適用し、 log_prob()必要なヤコビアン補正を行うプロセスを自動化します。私たちの新しい事前情報は次のようになります。

def log_lik_prior_transformed(transformed_precisions):
  rv_precision = tfd.TransformedDistribution(
      tfd.WishartTriL(
          df=PRIOR_DF,
          scale_tril=tf.linalg.cholesky(PRIOR_SCALE),
          validate_args=VALIDATE_ARGS,
          allow_nan_stats=ALLOW_NAN_STATS),
      bijector=precision_to_unconstrained,
      validate_args=VALIDATE_ARGS)
  return rv_precision.log_prob(transformed_precisions)
# Check against the numpy implementation.  Note that when comparing, we need
# to add in the Jacobian correction.
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
transformed_precisions = precision_to_unconstrained.forward(precisions)
lik_tf = log_lik_prior_transformed(transformed_precisions).numpy()
corrections = precision_to_unconstrained.inverse_log_det_jacobian(
    transformed_precisions, event_ndims=1).numpy()
n = precisions.shape[0]

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]) + corrections[i])
  print('tensorflow:', lik_tf[i])
0
numpy: -0.8488930160357633
tensorflow: -0.84889317
1
numpy: -7.305657151741624
tensorflow: -7.305659

データ対数尤度の変換を反転する必要があります。

precision = precision_to_unconstrained.inverse(transformed_precision)

実際には、精度行列のコレスキー分解が必要なので、ここでは部分的な逆行列を実行する方が効率的です。ただし、最適化は後で使用し、部分的な逆行列は読者の演習として残しておきます。

def log_lik_data_transformed(transformed_precisions, replicated_data):
  # We recover the precision matrix by inverting our bijector.  This is
  # inefficient since we really want the Cholesky decomposition of the
  # precision matrix, and the bijector has that in hand during the inversion,
  # but we'll worry about efficiency later.
  n = tf.shape(transformed_precisions)[0]
  precisions = precision_to_unconstrained.inverse(transformed_precisions)
  precisions_cholesky = tf.linalg.cholesky(precisions)
  covariances = tf.linalg.cholesky_solve(
      precisions_cholesky, tf.linalg.eye(2, batch_shape=[n]))
  rv_data = tfd.MultivariateNormalFullCovariance(
      loc=tf.zeros([n, 2]),
      covariance_matrix=covariances,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)

  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# sanity check
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
transformed_precisions = precision_to_unconstrained.forward(precisions)
lik_tf = log_lik_data_transformed(
    transformed_precisions, replicated_data).numpy()

for i in range(precisions.shape[0]):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.8182

ここでも、新しい関数をクロージャでラップします。

def get_log_lik_transformed(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik_transformed(transformed_precisions):
    return (log_lik_data_transformed(transformed_precisions, replicated_data) +
            log_lik_prior_transformed(transformed_precisions))

  return _log_lik_transformed
# make sure everything runs
log_lik_fn = get_log_lik_transformed(my_data)
m = tf.eye(2)[tf.newaxis, ...]
lik = log_lik_fn(precision_to_unconstrained.forward(m)).numpy()
print(lik)
[-431.5611]

サンプリング

無効なパラメーター値が原因でサンプラーが爆発することを心配する必要がなくなったので、実際のサンプルをいくつか生成してみましょう。

サンプラーはパラメーターの制約のないバージョンで動作するため、初期値を制約のないバージョンに変換する必要があります。生成するサンプルもすべて制約のない形式になるため、元に戻す必要があります。バイジェクターはベクトル化されているため、簡単にベクトル化できます。

# We'll choose a proper random initial value this time
np.random.seed(123)
initial_value_cholesky = np.array(
    [[0.5 + np.random.uniform(), 0.0],
     [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
    dtype=np.float32)
initial_value =  initial_value_cholesky.dot(
  initial_value_cholesky.T)[np.newaxis, ...]

# The sampler works with unconstrained values, so we'll transform our initial
# value
initial_value_transformed = precision_to_unconstrained.forward(
  initial_value).numpy()
# Sample!
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_transformed(my_data, n_chains=1)

  num_results = 1000
  num_burnin_steps = 1000

  states, is_accepted = tfp.mcmc.sample_chain(
      num_results=num_results,
      num_burnin_steps=num_burnin_steps,
      current_state=[
          initial_value_transformed,
      ],
      kernel=tfp.mcmc.HamiltonianMonteCarlo(
          target_log_prob_fn=log_lik_fn,
          step_size=0.1,
          num_leapfrog_steps=3),
      trace_fn=lambda _, pkr: pkr.is_accepted,
      seed=123)
  # transform samples back to their constrained form
  precision_samples = [precision_to_unconstrained.inverse(s) for s in states]
  return states, precision_samples, is_accepted

states, precision_samples, is_accepted = sample()

サンプラーの出力の平均を分析的な事後平均と比較してみましょう!

print('True posterior mean:\n', posterior_mean)
print('Sample mean:\n', np.mean(np.reshape(precision_samples, [-1, 2, 2]), axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Sample mean:
 [[ 1.4315274  -0.25587553]
 [-0.25587553  0.5740424 ]]

遠いです!その理由を理解しましょう。まず、サンプルを見てみましょう。

np.reshape(precision_samples, [-1, 2, 2])
array([[[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       ...,

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]]], dtype=float32)

ええと、それらはすべて同じ値を持っているように見えます。その理由を理解しましょう。

kernel_results_変数は、各状態のサンプラーに関する情報を提供する名前付きタプルです。ここでは、 is_acceptedフィールドが重要です。

# Look at the acceptance for the last 100 samples
print(np.squeeze(is_accepted)[-100:])
print('Fraction of samples accepted:', np.mean(np.squeeze(is_accepted)))
[False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False]
Fraction of samples accepted: 0.0

すべてのサンプルが拒否されました!おそらく、ステップサイズが大きすぎました。 stepsize=0.1純粋に任意に選択しました。

バージョン3:適応ステップサイズでのサンプリング

ステップサイズの任意の選択によるサンプリングが失敗したため、いくつかの議題項目があります。

  1. 適応ステップサイズを実装し、
  2. いくつかの収束チェックを実行します。

tensorflow_probability/python/mcmc/hmc.pyには、適応ステップサイズを実装するための優れたサンプルコードがいくつかあります。私はそれを以下に適応させました。

ステップごとに個別のsess.run()ステートメントがあることに注意してください。これは、必要に応じてステップごとの診断を簡単に追加できるため、デバッグに非常に役立ちます。たとえば、増分の進行状況、各ステップの時間などを表示できます。

ヒント:サンプリングを台無しにする一般的な方法の1つは、グラフをループ内で成長させることです。 (セッションの実行前にグラフをファイナライズする理由は、このような問題を防ぐためです。)ただし、finalize()を使用していない場合は、コードのクロールが遅くなるかどうかのデバッグチェックとして、グラフを印刷するのが便利です。 len(mygraph.get_operations())を介した各ステップでのサイズ-長さが長くなる場合は、おそらく何か悪いことをしています。

ここでは、3つの独立したチェーンを実行します。チェーン間でいくつかの比較を行うと、収束を確認するのに役立ちます。

# The number of chains is determined by the shape of the initial values.
# Here we'll generate 3 chains, so we'll need a tensor of 3 initial values.
N_CHAINS = 3

np.random.seed(123)

initial_values = []
for i in range(N_CHAINS):
  initial_value_cholesky = np.array(
      [[0.5 + np.random.uniform(), 0.0],
       [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
      dtype=np.float32)
  initial_values.append(initial_value_cholesky.dot(initial_value_cholesky.T))
initial_values = np.stack(initial_values)

initial_values_transformed = precision_to_unconstrained.forward(
  initial_values).numpy()
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_transformed(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=hmc,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states, is_accepted = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values_transformed,
      kernel=adapted_kernel,
      trace_fn=lambda _, pkr: pkr.inner_results.is_accepted,
      parallel_iterations=1)
  # transform samples back to their constrained form
  precision_samples = precision_to_unconstrained.inverse(states)
  return states, precision_samples, is_accepted

states, precision_samples, is_accepted = sample()

簡単なチェック:サンプリング中の受け入れ率は、目標の0.651に近づいています。

print(np.mean(is_accepted))
0.6190666666666667

さらに良いことに、サンプルの平均と標準偏差は、分析ソリューションから期待されるものに近いです。

precision_samples_reshaped = np.reshape(precision_samples, [-1, 2, 2])
print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.96426415 -1.6519215 ]
 [-1.6519215   3.8614824 ]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13622096 0.25235635]
 [0.25235635 0.5394968 ]]

収束の確認

一般に、チェックする分析ソリューションがないため、サンプラーが収束していることを確認する必要があります。標準チェックの1つは、Gelman-Rubin $ \ hat {R} $統計であり、複数のサンプリングチェーンが必要です。 $ \ hat {R} $は、チェーン間の(平均の)分散が、チェーンが同じように分布している場合に予想されるものを超える程度を測定します。 1に近い$ \ hat {R} $の値は、おおよその収束を示すために使用されます。詳細については、ソースを参照してください。

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0038308 1.0005717]
 [1.0005717 1.0006068]]

モデル批評

分析的な解決策がなかったとしたら、これは実際のモデルの批判をするときです。

これは、グラウンドトゥルース(赤)に関連するサンプルコンポーネントの簡単なヒストグラムです。サンプルは、サンプルの精度マトリックス値から単位行列に向かって以前に縮小されていることに注意してください。

fig, axes = plt.subplots(2, 2, sharey=True)
fig.set_size_inches(8, 8)
for i in range(2):
  for j in range(2):
    ax = axes[i, j]
    ax.hist(precision_samples_reshaped[:, i, j])
    ax.axvline(true_precision[i, j], color='red',
               label='True precision')
    ax.axvline(sample_precision[i, j], color='red', linestyle=':',
               label='Sample precision')
    ax.set_title('precision[%d, %d]' % (i, j))
plt.tight_layout()
plt.legend()
plt.show()

png

精度コンポーネントのペアのいくつかの散布図は、事後の相関構造のために、真の事後値が上記の周辺分布から表示されるほど可能性が低いことを示しています。

fig, axes = plt.subplots(4, 4)
fig.set_size_inches(12, 12)
for i1 in range(2):
  for j1 in range(2):
    index1 = 2 * i1 + j1
    for i2 in range(2):
      for j2 in range(2):
        index2 = 2 * i2 + j2
        ax = axes[index1, index2]
        ax.scatter(precision_samples_reshaped[:, i1, j1],
                   precision_samples_reshaped[:, i2, j2], alpha=0.1)
        ax.axvline(true_precision[i1, j1], color='red')
        ax.axhline(true_precision[i2, j2], color='red')
        ax.axvline(sample_precision[i1, j1], color='red', linestyle=':')
        ax.axhline(sample_precision[i2, j2], color='red', linestyle=':')
        ax.set_title('(%d, %d) vs (%d, %d)' % (i1, j1, i2, j2))
plt.tight_layout()
plt.show()

png

バージョン4:制約されたパラメーターのより簡単なサンプリング

Bijectorsは、精度行列のサンプリングを簡単にしましたが、制約のない表現との間でかなりの量の手動変換が行われました。もっと簡単な方法があります!

TransformedTransitionKernel

TransformedTransitionKernelこのプロセスを簡素化します。サンプラーをラップし、すべての変換を処理します。引数として、制約のないパラメーター値を制約のあるパラメーター値にマップするバイジェクターのリストを取ります。したがって、ここでは、上記で使用したprecision_to_unconstrainedバイジェクターの逆が必要です。 tfb.Invert(precision_to_unconstrained)使用することもできますが、逆数の逆数を取る必要があります(TensorFlowはtf.Invert(tf.Invert())tf.Identity())に単純化するほど賢くないため、代わりに新しいバイジェクターを作成します。

拘束バイジェクター

# The bijector we need for the TransformedTransitionKernel is the inverse of
# the one we used above
unconstrained_to_precision = tfb.Chain([
    # step 3: take the product of Cholesky factors
    tfb.CholeskyOuterProduct(validate_args=VALIDATE_ARGS),
    # step 2: exponentiate the diagonals    
    tfb.TransformDiagonal(tfb.Exp(validate_args=VALIDATE_ARGS)),
    # step 1: map a vector to a lower triangular matrix
    tfb.FillTriangular(validate_args=VALIDATE_ARGS),
])
# quick sanity check
m = [[1., 2.], [2., 8.]]
m_inv = unconstrained_to_precision.inverse(m).numpy()
m_fwd = unconstrained_to_precision.forward(m_inv).numpy()

print('m:\n', m)
print('unconstrained_to_precision.inverse(m):\n', m_inv)
print('forward(unconstrained_to_precision.inverse(m)):\n', m_fwd)
m:
 [[1.0, 2.0], [2.0, 8.0]]
unconstrained_to_precision.inverse(m):
 [0.6931472 2.        0.       ]
forward(unconstrained_to_precision.inverse(m)):
 [[1. 2.]
 [2. 8.]]

TransformedTransitionKernelを使用したサンプリング

TransformedTransitionKernelを使用すると、パラメーターを手動で変換する必要がなくなります。初期値とサンプルはすべて精密行列です。制約のないバイジェクターをカーネルに渡すだけで、すべての変換が処理されます。

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  ttk = tfp.mcmc.TransformedTransitionKernel(
      inner_kernel=hmc, bijector=unconstrained_to_precision)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=ttk,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values,
      kernel=adapted_kernel,
      trace_fn=None,
      parallel_iterations=1)
  # transform samples back to their constrained form
  return states

precision_samples  = sample()

収束の確認

$ \ hat {R} $収束チェックは良さそうです!

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0013582 1.0019467]
 [1.0019467 1.0011805]]

分析後部との比較

もう一度、分析後部に対して確認しましょう。

# The output samples have shape [n_steps, n_chains, 2, 2]
# Flatten them to [n_steps * n_chains, 2, 2] via reshape:
precision_samples_reshaped = np.reshape(precision_samples, [-1, 2, 2])
print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.96687526 -1.6552585 ]
 [-1.6552585   3.867676  ]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13329624 0.24913791]
 [0.24913791 0.53983927]]

最適化

エンドツーエンドで実行できるようになったので、より最適化されたバージョンを実行してみましょう。この例では速度はそれほど重要ではありませんが、行列が大きくなると、いくつかの最適化によって大きな違いが生じます。

私たちができる大きな速度の改善の1つは、コレスキー分解の観点からパラメーターを再設定することです。その理由は、データ尤度関数には共分散と精度行列の両方が必要なためです。行列の反転はコストがかかり($ n \ times n $行列の場合は$ O(n ^ 3)$)、共分散または精度行列のいずれかでパラメーター化する場合、もう一方を取得するために反転を行う必要があります。

注意として、実際の正定値の対称行列$ M $は、$ M = LL ^ T $の形式の積に分解できます。ここで、行列$ L $は下三角行列であり、正の対角線を持ちます。 $ M $のコレスキー分解を考えると、$ M $(下三角行列と上三角行列の積)と$ M ^ {-1} $(逆置換による)の両方をより効率的に取得できます。コレスキー分解自体は計算するのに安価ではありませんが、コレスキー因子の観点からパラメーター化する場合、初期パラメーター値のコレスキー分解を計算するだけで済みます。

共分散行列のコレスキー分解を使用する

TFPには、共分散行列のコレスキー因子の観点からパラメーター化された多変量正規分布のバージョンであるMultivariateNormalTriLがあります。したがって、共分散行列のコレスキー因子の観点からパラメーター化する場合、データ対数尤度を効率的に計算できます。課題は、同様の効率で以前の対数尤度を計算することです。

サンプルのコレスキー因子で機能する逆ウィシャート分布のバージョンがあれば、すべて設定されます。残念ながら、私たちはしません。 (ただし、チームはコードの送信を歓迎します!)代わりに、サンプルのコレスキー因子と一連の約数を処理するバージョンのウィッシャート分布を使用できます。

現時点では、物事を本当に効率的にするためにいくつかのストックバイジェクターが不足していますが、このプロセスを演習として示し、TFPのバイジェクターの力を示す有用な例を示したいと思います。

コレスキー因子で動作するウィッシャート分布

Wishart分布には、入力行列と出力行列をコレスキー因子にすることを指定する便利なフラグinput_output_choleskyがあります。完全な行列よりもコレスキー因子を使用する方が効率的で数値的に有利であるため、これが望ましい理由です。フラグのセマンティクスに関する重要なポイント:これは、分布への入力と出力の表現が変更される必要があることを示すだけです。これは、 log_prob()に対するヤコビ行列の修正を伴う分布の完全な再パラメーター化を示すものではありません。関数。実際にこの完全な再パラメーター化を実行したいので、独自のディストリビューションを構築します。

# An optimized Wishart distribution that has been transformed to operate on
# Cholesky factors instead of full matrices.  Note that we gain a modest
# additional speedup by specifying the Cholesky factor of the scale matrix
# (i.e. by passing in the scale_tril parameter instead of scale).

class CholeskyWishart(tfd.TransformedDistribution):
  """Wishart distribution reparameterized to use Cholesky factors."""
  def __init__(self,
      df,
      scale_tril,
      validate_args=False,
      allow_nan_stats=True,
      name='CholeskyWishart'):
    # Wishart has a bunch of methods that we want to support but not
    # implement.  We'll subclass TransformedDistribution here to take care of
    # those.  We'll override the few for which speed is critical and implement
    # them with a separate Wishart for which input_output_cholesky=True
    super(CholeskyWishart, self).__init__(
        distribution=tfd.WishartTriL(
            df=df,
            scale_tril=scale_tril,
            input_output_cholesky=False,
            validate_args=validate_args,
            allow_nan_stats=allow_nan_stats),
        bijector=tfb.Invert(tfb.CholeskyOuterProduct()),
        validate_args=validate_args,
        name=name
    )
    # Here's the Cholesky distribution we'll use for log_prob() and sample()
    self.cholesky = tfd.WishartTriL(
        df=df,
        scale_tril=scale_tril,
        input_output_cholesky=True,
        validate_args=validate_args,
        allow_nan_stats=allow_nan_stats)

  def _log_prob(self, x):
    return (self.cholesky.log_prob(x) +
            self.bijector.inverse_log_det_jacobian(x, event_ndims=2))

  def _sample_n(self, n, seed=None):
    return self.cholesky._sample_n(n, seed)
# some checks
PRIOR_SCALE_CHOLESKY = np.linalg.cholesky(PRIOR_SCALE)

@tf.function(autograph=False)
def compute_log_prob(m):
  w_transformed = tfd.TransformedDistribution(
      tfd.WishartTriL(df=PRIOR_DF, scale_tril=PRIOR_SCALE_CHOLESKY),
      bijector=tfb.Invert(tfb.CholeskyOuterProduct()))
  w_optimized = CholeskyWishart(
      df=PRIOR_DF, scale_tril=PRIOR_SCALE_CHOLESKY)
  log_prob_transformed = w_transformed.log_prob(m)
  log_prob_optimized = w_optimized.log_prob(m)
  return log_prob_transformed, log_prob_optimized

for matrix in [np.eye(2, dtype=np.float32),
               np.array([[1., 0.], [2., 8.]], dtype=np.float32)]:
  log_prob_transformed, log_prob_optimized = [
      t.numpy() for t in compute_log_prob(matrix)]
  print('Transformed Wishart:', log_prob_transformed)
  print('Optimized Wishart', log_prob_optimized)
Transformed Wishart: -0.84889317
Optimized Wishart -0.84889317
Transformed Wishart: -99.269455
Optimized Wishart -99.269455

逆ウィシャート分布の構築

共分散行列$ C $を$ C = LL ^ T $に分解します。ここで、$ L $は下三角行列であり、正の対角線を持ちます。 $ C \ sim W ^ {-1}(\ nu、V)$が与えられた場合、$ L $の確率を知りたいと思います。ここで$ W ^ {-1} $は逆ウィシャート分布です。

逆ウィシャート分布には、$ C \ sim W ^ {-1}(\ nu、V)$の場合、精度行列$ C ^ {-1} \ sim W(\ nu、V ^ {-1 })$。だから我々は経由$ L $の確率を得ることができますTransformedDistributionパラメータとしてウィッシャート分布とその逆のコレスキーファクタに精度行列のコレスキーファクタをマップbijectorかかります。

$ C ^ {-1} $のコレスキー因子から$ L $に到達する簡単な(ただし超効率的ではない)方法は、逆解することによってコレスキー因子を反転し、次にこれらの反転された因子から共分散行列を形成することです。次に、コレスキー分解を行います。

$ C ^ {-1} = MM ^ T $のコレスキー分解とします。 $ M $は下三角MatrixInverseTriLので、 MatrixInverseTriLバイジェクターを使用して反転できます。

$ M ^ {-1} $から$ C $を形成するのは少し注意が必要です:$ C =(MM ^ T)^ {-1} = M ^ {-T} M ^ {-1} = M ^ {-T }(M ^ {-T})^ T $。 $ M $は下三角であるため、$ M ^ {-1} $も下三角になり、$ M ^ {-T} $は上三角になります。 CholeskyOuterProduct()バイジェクターは下三角行列でのみ機能するため、これを使用して$ M ^ {-T} $から$ C $を形成することはできません。私たちの回避策は、行列の行と列を並べ替える一連のバイジェクターです。

幸い、このロジックはCholeskyToInvCholeskyバイジェクターにカプセル化されています。

すべてのピースを組み合わせる

# verify that the bijector works
m = np.array([[1., 0.], [2., 8.]], dtype=np.float32)
c_inv = m.dot(m.T)
c = np.linalg.inv(c_inv)
c_chol = np.linalg.cholesky(c)
wishart_cholesky_to_iw_cholesky = tfb.CholeskyToInvCholesky()
w_fwd = wishart_cholesky_to_iw_cholesky.forward(m).numpy()

print('numpy =\n', c_chol)
print('bijector =\n', w_fwd)
numpy =
 [[ 1.0307764   0.        ]
 [-0.03031695  0.12126781]]
bijector =
 [[ 1.0307764   0.        ]
 [-0.03031695  0.12126781]]

最終的な配布

コレスキー因子で動作する逆ウィシャートは次のとおりです。

inverse_wishart_cholesky = tfd.TransformedDistribution(
    distribution=CholeskyWishart(
        df=PRIOR_DF,
        scale_tril=np.linalg.cholesky(np.linalg.inv(PRIOR_SCALE))),
    bijector=tfb.CholeskyToInvCholesky())

逆ウィシャートがありますが、バイジェクターでコレスキー分解を行う必要があるため、少し遅いです。精度行列のパラメーター化に戻り、最適化のためにそこで何ができるかを見てみましょう。

Final(!)バージョン:精度行列のコレスキー分解を使用

別のアプローチは、精度行列のコレスキー因子を使用することです。ここでは、事前尤度関数の計算は簡単ですが、TFPには精度によってパラメーター化された多変量正規のバージョンがないため、データ対数尤度関数はより多くの作業を必要とします。

最適化された事前対数尤度

上で作成したCholeskyWishart分布を使用して、事前分布を作成します。

# Our new prior.
PRIOR_SCALE_CHOLESKY = np.linalg.cholesky(PRIOR_SCALE)

def log_lik_prior_cholesky(precisions_cholesky):
  rv_precision = CholeskyWishart(
      df=PRIOR_DF,
      scale_tril=PRIOR_SCALE_CHOLESKY,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)
  return rv_precision.log_prob(precisions_cholesky)
# Check against the slower TF implementation and the NumPy implementation.
# Note that when comparing to NumPy, we need to add in the Jacobian correction.
precisions = [np.eye(2, dtype=np.float32),
              true_precision]
precisions_cholesky = np.stack([np.linalg.cholesky(m) for m in precisions])
precisions = np.stack(precisions)
lik_tf = log_lik_prior_cholesky(precisions_cholesky).numpy()
lik_tf_slow = tfd.TransformedDistribution(
    distribution=tfd.WishartTriL(
        df=PRIOR_DF, scale_tril=tf.linalg.cholesky(PRIOR_SCALE)),
    bijector=tfb.Invert(tfb.CholeskyOuterProduct())).log_prob(
    precisions_cholesky).numpy()
corrections = tfb.Invert(tfb.CholeskyOuterProduct()).inverse_log_det_jacobian(
    precisions_cholesky, event_ndims=2).numpy()
n = precisions.shape[0]

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]) + corrections[i])
  print('tensorflow slow:', lik_tf_slow[i])
  print('tensorflow fast:', lik_tf[i])
0
numpy: -0.8488930160357633
tensorflow slow: -0.84889317
tensorflow fast: -0.84889317
1
numpy: -7.442875031036973
tensorflow slow: -7.442877
tensorflow fast: -7.442876

最適化されたデータログの可能性

TFPのバイジェクターを使用して、多変量正規の独自のバージョンを作成できます。重要なアイデアは次のとおりです。

要素が$ N(0、1)$のiidサンプルである列ベクトル$ X $があるとします。 $ \ text {mean}(X)= 0 $と$ \ text {cov}(X)= I $があります

ここで、$ Y = AX + b $とします。 $ \ text {mean}(Y)= b $と$ \ text {cov}(Y)= AA ^ T $があります

したがって、アフィン変換$ Ax + b $を使用して、平均$ b $と共分散$ C $のベクトルを、$ AA ^ T = C $で提供されるiid標準正規サンプルのベクトルに作成できます。 $ C $のコレスキー分解には目的の特性があります。ただし、他の解決策もあります。

$ P = C ^ {-1} $とし、$ P $のコレスキー分解を$ B $、つまり$ BB ^ T = P $とします。今

$ P ^ {-1} =(BB ^ T)^ {-1} = B ^ {-T} B ^ {-1} = B ^ {-T}(B ^ {-T})^ T $

したがって、目的の平均と共分散を取得する別の方法は、アフィン変換$ Y = B ^ {-T} X + b $を使用することです。

私たちのアプローチ( このノートブックの礼儀):

  1. tfd.Independent()を使用して、1-D Normal確率変数のバッチを単一の多次元確率変数に結合します。 The reinterpreted_batch_ndims parameter for Independent() specifies the number of batch dimensions that should be reinterpreted as event dimensions. In our case we create a 1-D batch of length 2 that we transform into a 1-D event of length 2, so reinterpreted_batch_ndims=1 .
  2. Apply a bijector to add the desired covariance: tfb.Invert(tfb.Affine(scale_tril=precision_cholesky, adjoint=True)) . Note that above we're multiplying our iid normal random variables by the transpose of the inverse of the Cholesky factor of the precision matrix $(B^{-T}X)$. The tfb.Invert takes care of inverting $B$, and the adjoint=True flag performs the transpose.
  3. Apply a bijector to add the desired offset: tfb.Affine(shift=shift) Note that we have to do the shift as a separate step from the initial inverted affine transform because otherwise the inverted scale is applied to the shift (since the inverse of $y=Ax+b$ is $x=A^{-1}y - A^{-1}b$).
class MVNPrecisionCholesky(tfd.TransformedDistribution):
  """Multivariate normal parameterized by loc and Cholesky precision matrix."""

  def __init__(self, loc, precision_cholesky, name=None):
    super(MVNPrecisionCholesky, self).__init__(
        distribution=tfd.Independent(
            tfd.Normal(loc=tf.zeros_like(loc),
                       scale=tf.ones_like(loc)),
            reinterpreted_batch_ndims=1),
        bijector=tfb.Chain([
            tfb.Affine(shift=loc),
            tfb.Invert(tfb.Affine(scale_tril=precision_cholesky,
                                  adjoint=True)),
        ]),
        name=name)
@tf.function(autograph=False)
def log_lik_data_cholesky(precisions_cholesky, replicated_data):
  n = tf.shape(precisions_cholesky)[0]  # number of precision matrices
  rv_data = MVNPrecisionCholesky(
      loc=tf.zeros([n, 2]),
      precision_cholesky=precisions_cholesky)
  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# check against the numpy implementation
true_precision_cholesky = np.linalg.cholesky(true_precision)
precisions = [np.eye(2, dtype=np.float32), true_precision]
precisions_cholesky = np.stack([np.linalg.cholesky(m) for m in precisions])
precisions = np.stack(precisions)
n = precisions_cholesky.shape[0]
replicated_data = np.tile(np.expand_dims(my_data, axis=1), reps=[1, 2, 1])
lik_tf = log_lik_data_cholesky(precisions_cholesky, replicated_data).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.81824

Combined log likelihood function

Now we combine our prior and data log likelihood functions in a closure.

def get_log_lik_cholesky(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik_cholesky(precisions_cholesky):
    return (log_lik_data_cholesky(precisions_cholesky, replicated_data) +
            log_lik_prior_cholesky(precisions_cholesky))

  return _log_lik_cholesky

Constraining bijector

Our samples are constrained to be valid Cholesky factors, which means they must be lower triangular matrices with positive diagonals. The TransformedTransitionKernel needs a bijector that maps unconstrained tensors to/from tensors with our desired constraints. We've removed the Cholesky decomposition from the bijector's inverse, which speeds things up.

unconstrained_to_precision_cholesky = tfb.Chain([
    # step 2: exponentiate the diagonals    
    tfb.TransformDiagonal(tfb.Exp(validate_args=VALIDATE_ARGS)),
    # step 1: expand the vector to a lower triangular matrix
    tfb.FillTriangular(validate_args=VALIDATE_ARGS),
])
# some checks
inv = unconstrained_to_precision_cholesky.inverse(precisions_cholesky).numpy()
fwd = unconstrained_to_precision_cholesky.forward(inv).numpy()
print('precisions_cholesky:\n', precisions_cholesky)
print('\ninv:\n', inv)
print('\nfwd(inv):\n', fwd)
precisions_cholesky:
 [[[ 1.         0.       ]
  [ 0.         1.       ]]

 [[ 1.1470785  0.       ]
  [-2.0647411  1.0000004]]]

inv:
 [[ 0.0000000e+00  0.0000000e+00  0.0000000e+00]
 [ 3.5762781e-07 -2.0647411e+00  1.3721828e-01]]

fwd(inv):
 [[[ 1.         0.       ]
  [ 0.         1.       ]]

 [[ 1.1470785  0.       ]
  [-2.0647411  1.0000004]]]

Initial values

We generate a tensor of initial values. We're working with Cholesky factors, so we generate some Cholesky factor initial values.

# The number of chains is determined by the shape of the initial values.
# Here we'll generate 3 chains, so we'll need a tensor of 3 initial values.
N_CHAINS = 3

np.random.seed(123)

initial_values_cholesky = []
for i in range(N_CHAINS):
  initial_values_cholesky.append(np.array(
      [[0.5 + np.random.uniform(), 0.0],
       [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
      dtype=np.float32))
initial_values_cholesky = np.stack(initial_values_cholesky)

Sampling

We sample N_CHAINS chains using the TransformedTransitionKernel .

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_cholesky(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  ttk = tfp.mcmc.TransformedTransitionKernel(
      inner_kernel=hmc, bijector=unconstrained_to_precision_cholesky)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=ttk,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values,
      kernel=adapted_kernel,
      trace_fn=None,
      parallel_iterations=1)
  # transform samples back to their constrained form
  samples = tf.linalg.matmul(states, states, transpose_b=True)
  return samples

precision_samples = sample()

Convergence check

A quick convergence check looks good:

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0013583 1.0019467]
 [1.0019467 1.0011804]]

Comparing results to the analytic posterior

# The output samples have shape [n_steps, n_chains, 2, 2]
# Flatten them to [n_steps * n_chains, 2, 2] via reshape:
precision_samples_reshaped = np.reshape(precision_samples, newshape=[-1, 2, 2])

And again, the sample means and standard deviations match those of the analytic posterior.

print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.9668749 -1.6552604]
 [-1.6552604  3.8676758]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13329637 0.24913797]
 [0.24913797 0.53983945]]

Ok, all done! We've got our optimized sampler working.