이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

TensorFlow Probability 사례 연구 : 공분산 추정

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

TensorFlow Probability를 배우기 위해이 노트북을 사례 연구로 작성했습니다. 내가 해결하기로 선택한 문제는 2 차원 평균 0 가우스 랜덤 변수의 샘플에 대한 공분산 행렬을 추정하는 것입니다. 이 문제에는 몇 가지 좋은 기능이 있습니다.

  • 공분산 (일반적인 접근 방식)에 대해 역 Wishart 사전을 사용하면 문제에 분석 솔루션이 있으므로 결과를 확인할 수 있습니다.
  • 문제는 제한된 매개 변수를 샘플링하는 것과 관련되어 흥미로운 복잡성을 추가합니다.
  • 가장 간단한 솔루션은 가장 빠른 솔루션이 아니므로 수행 할 몇 가지 최적화 작업이 있습니다.

나는 내 경험을 기록하기로 결정했다. TFP의 미세한 부분에 대해 머리를 감싸는 데 시간이 좀 걸렸기 때문에이 노트북은 상당히 간단하게 시작한 다음 점차 더 복잡한 TFP 기능까지 작동합니다. 그 과정에서 많은 문제가 발생했고, 문제를 식별하는 데 도움이 된 프로세스와 결국 발견 한 해결 방법을 모두 포착하려고 노력했습니다. 나는 많은 세부 사항을 포함하려고 노력했습니다 (개별 단계가 올바른지 확인하기위한 많은 테스트 포함).

TensorFlow Probability를 배우는 이유는 무엇입니까?

TensorFlow Probability가 몇 가지 이유로 내 프로젝트에 매력적이라는 것을 알았습니다.

  • TensorFlow 확률을 사용하면 노트북에서 대화식으로 복잡한 모델을 개발하는 프로토 타입을 만들 수 있습니다. 대화 형 및 단위 테스트로 테스트 할 수있는 작은 조각으로 코드를 나눌 수 있습니다.
  • 확장 할 준비가되면 TensorFlow를 여러 머신의 최적화 된 여러 프로세서에서 실행하기 위해 우리가 마련한 모든 인프라를 활용할 수 있습니다.
  • 마지막으로 Stan을 정말 좋아하지만 디버그하기가 매우 어렵습니다. 코드를 살펴보고 중간 상태를 검사하는 등의 작업을 수행 할 수있는 도구가 거의없는 독립형 언어로 모든 모델링 코드를 작성해야합니다.

단점은 TensorFlow Probability가 Stan 및 PyMC3보다 훨씬 더 새롭기 때문에 문서가 진행 중이며 아직 빌드되지 않은 많은 기능이 있다는 것입니다. 다행스럽게도 TFP의 기반이 견고하고 기능을 상당히 간단하게 확장 할 수있는 모듈 방식으로 설계되었습니다. 이 노트북에서는 사례 연구를 해결하는 것 외에도 TFP를 확장하는 몇 가지 방법을 보여 드리겠습니다.

누구를위한 것인가

독자들이 몇 가지 중요한 전제 조건을 가지고이 노트북을 방문한다고 가정하고 있습니다. 다음을 수행해야합니다.

첫번째 시도

문제에 대한 첫 번째 시도입니다. 스포일러 : 내 솔루션이 작동하지 않고 제대로 작동하려면 몇 번의 시도가 필요합니다! 과정은 시간이 걸리지 만 아래의 각 시도는 TFP의 새로운 부분을 배우는 데 유용했습니다.

한 가지 참고 : TFP는 현재 역 Wishart 분포를 구현하지 않습니다 (마지막에서 역 Wishart를 롤링하는 방법을 살펴 보겠습니다). 대신 Wishart 사전을 사용하여 정밀도 행렬을 추정하는 문제로 변경하겠습니다.

import collections
import math
import os
import time

import numpy as np
import pandas as pd
import scipy
import scipy.stats
import matplotlib.pyplot as plt

import tensorflow.compat.v2 as tf
tf.enable_v2_behavior()
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors

1 단계 : 관찰 결과 수집

여기 내 데이터는 모두 합성되어 있으므로 실제 사례보다 약간 깔끔해 보일 것입니다. 그러나 자신의 합성 데이터를 생성 할 수없는 이유는 없습니다.

: 모델의 형식을 결정한 후에는 매개 변수 값을 선택하고 선택한 모델을 사용하여 합성 데이터를 생성 할 수 있습니다. 구현 상태를 점검하기 위해 선택한 매개 변수의 실제 값이 예상치에 포함되어 있는지 확인할 수 있습니다. 디버깅 / 테스트주기를 더 빠르게하려면 모델의 단순화 된 버전을 고려할 수 있습니다 (예 : 더 적은 차원 또는 더 적은 샘플 사용).

팁 : 관찰을 NumPy 배열로 사용하는 것이 가장 쉽습니다. 한 가지 중요한 점은 NumPy가 기본적으로 float64를 사용하는 반면 TensorFlow는 기본적으로 float32를 사용한다는 것입니다.

일반적으로 TensorFlow 작업은 모든 인수가 동일한 유형을 갖기를 원하며 유형을 변경하려면 명시 적 데이터 캐스팅을 수행해야합니다. float64 관찰을 사용하는 경우 많은 캐스트 작업을 추가해야합니다. 반대로 NumPy는 자동으로 캐스팅을 처리합니다. 따라서 TensorFlow가 float64를 사용하도록하는 것보다 Numpy 데이터를 float32로 변환하는 것이 훨씬 쉽습니다.

일부 매개 변수 값 선택

# We're assuming 2-D data with a known true mean of (0, 0)
true_mean = np.zeros([2], dtype=np.float32)
# We'll make the 2 coordinates correlated
true_cor = np.array([[1.0, 0.9], [0.9, 1.0]], dtype=np.float32)
# And we'll give the 2 coordinates different variances
true_var = np.array([4.0, 1.0], dtype=np.float32)
# Combine the variances and correlations into a covariance matrix
true_cov = np.expand_dims(np.sqrt(true_var), axis=1).dot(
    np.expand_dims(np.sqrt(true_var), axis=1).T) * true_cor
# We'll be working with precision matrices, so we'll go ahead and compute the
# true precision matrix here
true_precision = np.linalg.inv(true_cov)
# Here's our resulting covariance matrix
print(true_cov)
# Verify that it's positive definite, since np.random.multivariate_normal
# complains about it not being positive definite for some reason.
# (Note that I'll be including a lot of sanity checking code in this notebook -
# it's a *huge* help for debugging)
print('eigenvalues: ', np.linalg.eigvals(true_cov))
[[4.  1.8]
 [1.8 1. ]]
eigenvalues:  [4.843075   0.15692513]

합성 관찰 생성

TensorFlow Probability는 데이터의 초기 차원이 샘플 인덱스를 나타내고 데이터의 최종 차원이 샘플의 차원을 나타내는 규칙을 사용합니다.

여기에서는 각각 길이가 2 인 벡터 인 100 개의 샘플을 원합니다. 모양이 (100, 2) 인 배열 my_data 를 생성합니다. my_data[i, :] 는 $ i $ 번째 샘플이고 길이가 2 인 벡터입니다.

( my_data float32 유형을 지정하는 것을 잊지 마십시오!)

# Set the seed so the results are reproducible.
np.random.seed(123)

# Now generate some observations of our random variable.
# (Note that I'm suppressing a bunch of spurious about the covariance matrix
# not being positive semidefinite via check_valid='ignore' because it really is
# positive definite!)
my_data = np.random.multivariate_normal(
    mean=true_mean, cov=true_cov, size=100,
    check_valid='ignore').astype(np.float32)
my_data.shape
(100, 2)

온전한 관찰 관찰

버그의 잠재적 원인 중 하나는 합성 데이터를 엉망으로 만드는 것입니다! 몇 가지 간단한 확인을 해보겠습니다.

# Do a scatter plot of the observations to make sure they look like what we
# expect (higher variance on the x-axis, y values strongly correlated with x)
plt.scatter(my_data[:, 0], my_data[:, 1], alpha=0.75)
plt.show()

png

print('mean of observations:', np.mean(my_data, axis=0))
print('true mean:', true_mean)
mean of observations: [-0.24009615 -0.16638893]
true mean: [0. 0.]

print('covariance of observations:\n', np.cov(my_data, rowvar=False))
print('true covariance:\n', true_cov)
covariance of observations:
 [[3.95307734 1.68718486]
 [1.68718486 0.94910269]]
true covariance:
 [[4.  1.8]
 [1.8 1. ]]

좋습니다. 샘플이 합리적으로 보입니다. 다음 단계.

2 단계 : NumPy에서 우도 함수 구현

TF Probability에서 MCMC 샘플링을 수행하기 위해 작성해야하는 주요 내용은 로그 가능성 함수입니다. 일반적으로 NumPy보다 TF를 작성하는 것이 약간 까다롭기 때문에 NumPy에서 초기 구현을 수행하는 것이 도움이됩니다. 우도 함수를 $ P (data | parameters) $에 해당하는 데이터 우도 함수와 $ P (parameters) $에 해당하는 이전 우도 함수 두 부분으로 나눌 것입니다.

이러한 NumPy 함수는 테스트를 위해 일부 값을 생성하는 것이 목표이기 때문에 최적화 / 벡터화 할 필요가 없습니다. 정확성이 핵심 고려 사항입니다!

먼저 데이터 로그 가능성 부분을 구현합니다. 꽤 간단합니다. 기억해야 할 한 가지는 정밀 행렬로 작업 할 것이므로 그에 따라 매개 변수화 할 것입니다.

def log_lik_data_numpy(precision, data):
  # np.linalg.inv is a really inefficient way to get the covariance matrix, but
  # remember we don't care about speed here
  cov = np.linalg.inv(precision)
  rv = scipy.stats.multivariate_normal(true_mean, cov)
  return np.sum(rv.logpdf(data))

# test case: compute the log likelihood of the data given the true parameters
log_lik_data_numpy(true_precision, my_data)
-280.81822950593767

사후에 대한 분석 솔루션이 있기 때문에 정밀도 행렬에 Wishart 사전을 사용할 것입니다 ( Wikipedia의 편리한 공액 사전 표 참조).

Wishart 분포 에는 2 개의 모수가 있습니다.

  • 자유도 (Wikipedia에서 $ \ nu $로 표시됨)
  • 스케일 행렬 (Wikipedia에서 $ V $로 표시됨)

모수가 $ \ nu, V $ 인 Wishart 분포의 평균은 $ E [W] = \ nu V $이고 분산은 $ \ text {Var} (W_ {ij}) = \ nu (v_ {ij}입니다. ^ 2 + v_ {ii} v_ {jj}) $

몇 가지 유용한 직관 : 평균이 0이고 공분산이 $ V $ 인 다변량 정규 랜덤 변수에서 $ \ nu $ 독립 그리기 $ x_1 \ ldots x _ {\ nu} $를 생성 한 다음 합계 $ W =를 형성하여 Wishart 샘플을 생성 할 수 있습니다. \ sum_ {i = 1} ^ {\ nu} x_i x_i ^ T $.

Wishart 샘플을 $ \ nu $로 나누어 스케일을 조정하면 $ x_i $의 샘플 공분산 행렬을 얻습니다. 이 샘플 공분산 행렬은 $ \ nu $가 증가함에 따라 $ V $로 향해야합니다. $ \ nu $가 작 으면 표본 공분산 행렬에 많은 변동이 있으므로 $ \ nu $의 작은 값은 더 약한 사전에 해당하고 $ \ nu $의 큰 값은 더 강한 사전에 해당합니다. $ \ nu $는 샘플링하는 공간의 차원 이상이어야합니다. 그렇지 않으면 특이 행렬이 생성됩니다.

우리는 $ \ nu = 3 $를 사용할 것이므로 우리는 약한 사전을 가지고 $ V = \ frac {1} {\ nu} I $를 사용하여 공분산 추정치를 동일성쪽으로 가져올 것입니다 (평균 $ \ nu V $)입니다.

PRIOR_DF = 3
PRIOR_SCALE = np.eye(2, dtype=np.float32) / PRIOR_DF

def log_lik_prior_numpy(precision):
  rv = scipy.stats.wishart(df=PRIOR_DF, scale=PRIOR_SCALE)
  return rv.logpdf(precision)

# test case: compute the prior for the true parameters
log_lik_prior_numpy(true_precision)
-9.103606346649766

Wishart 분포는 평균 $ \ mu $가 알려진 다변량 정규 분포의 정밀도 행렬을 추정하기위한 켤레 사전입니다.

이전 Wishart 매개 변수가 $ \ nu, V $이고 다변량 정규 값 $ x_1, \ ldots, x_n $에 대한 $ n $ 관측 값이 있다고 가정합니다. 사후 매개 변수는 $ n + \ nu, \ left (V ^ {-1} + \ sum_ {i = 1} ^ n (x_i- \ mu) (x_i- \ mu) ^ T \ right) ^ {-1입니다. } $.

n = my_data.shape[0]
nu_prior = PRIOR_DF
v_prior = PRIOR_SCALE
nu_posterior = nu_prior + n
v_posterior = np.linalg.inv(np.linalg.inv(v_prior) + my_data.T.dot(my_data))
posterior_mean = nu_posterior * v_posterior
v_post_diag = np.expand_dims(np.diag(v_posterior), axis=1)
posterior_sd = np.sqrt(nu_posterior *
                       (v_posterior ** 2.0 + v_post_diag.dot(v_post_diag.T)))

사후 및 실제 값의 빠른 플롯. 사후는 샘플 사후에 가깝지만 정체성쪽으로 약간 축소되었습니다. 또한 실제 값은 사후 모드와는 상당히 거리가 멀다는 점에 유의하십시오. 아마도 이는 사전이 우리 데이터와 매우 잘 일치하지 않기 때문일 것입니다. 실제 문제에서 우리는 공분산에 대해 스케일 된 역 Wishart와 같은 작업을 더 잘 수행 할 가능성이 있지만 (예를 들어 주제에 대한 Andrew Gelman의 주석 참조) 좋은 분석 사후가 없습니다.

sample_precision = np.linalg.inv(np.cov(my_data, rowvar=False, bias=False))
fig, axes = plt.subplots(2, 2)
fig.set_size_inches(10, 10)
for i in range(2):
  for j in range(2):
    ax = axes[i, j]
    loc = posterior_mean[i, j]
    scale = posterior_sd[i, j]
    xmin = loc - 3.0 * scale
    xmax = loc + 3.0 * scale
    x = np.linspace(xmin, xmax, 1000)
    y = scipy.stats.norm.pdf(x, loc=loc, scale=scale)
    ax.plot(x, y)
    ax.axvline(true_precision[i, j], color='red', label='True precision')
    ax.axvline(sample_precision[i, j], color='red', linestyle=':', label='Sample precision')
    ax.set_title('precision[%d, %d]' % (i, j))
plt.legend()
plt.show()

png

3 단계 : TensorFlow에서 우도 함수 구현

스포일러 : 우리의 첫 번째 시도는 효과가 없을 것입니다. 아래에서 이유에 대해 설명하겠습니다.

: 가능성 함수를 개발할 때 TensorFlow eager 모드를 사용하세요. Eager 모드는 TF가 NumPy와 비슷하게 작동하도록합니다. 모든 것이 즉시 실행되므로 Session.run() 을 사용하는 대신 대화식으로 디버깅 할 수 있습니다. 여기 에서 참고를 참조 하십시오 .

예비 : 유통 클래스

TFP에는 로그 확률을 생성하는 데 사용할 배포 클래스 모음이 있습니다. 한 가지 주목할 점은 이러한 클래스가 단일 샘플이 아닌 텐서 샘플로 작동한다는 것입니다. 이는 벡터화 및 관련 속도 향상을 허용합니다.

분포는 두 가지 방법으로 샘플 텐서와 함께 작동 할 수 있습니다. 단일 스칼라 매개 변수가있는 분포를 포함하는 구체적인 예를 사용하여이 두 가지 방법을 설명하는 것이 가장 간단합니다. rate 모수가있는 푸 아송 분포를 사용하겠습니다.

  • rate 매개 변수에 대해 단일 값으로 포아송을 생성하면 sample() 메서드를 호출하면 단일 값이 반환됩니다. 이 값을 event 라고하며이 경우 이벤트는 모두 스칼라입니다.
  • rate 매개 변수에 대한 값의 텐서로 푸 아송을 생성하면 sample() 메서드에 대한 호출은 이제 속도 텐서의 각 값에 대해 하나씩 여러 값을 반환합니다. 객체는 각각 고유 한 속도를 가진 독립적 인 포아송 모음 으로 작동하며 sample() 호출에 의해 반환 된 각 값은 이러한 포아송 중 하나에 해당합니다. 이 독립적 이지만 동일하게 분산되지 않은 이벤트 모음을 batch 라고합니다.
  • sample() 메서드는 기본값이 빈 튜플 인 sample_shape 매개 변수를 사용합니다. sample_shape 비어 있지 않은 값을 전달하면 샘플이 여러 배치를 반환합니다. 이 배치 모음을 sample 이라고합니다.

분 j의 log_prob() 메소드는 방법 평행하는 방식으로 데이터를 소모 sample() 을 생성한다. log_prob() 는 샘플, 즉 여러 개의 독립적 인 이벤트 배치에 대한 확률을 반환합니다.

  • 스칼라 rate 로 생성 된 Poisson 객체가있는 경우 각 배치는 스칼라이고 샘플의 텐서를 전달하면 로그 확률 크기가 같은 텐서를 얻을 수 있습니다.
  • rate 값의 모양 T 텐서로 생성 된 푸 아송 객체가있는 경우 각 배치는 모양 T 의 텐서입니다. 모양 D, T의 텐서 샘플을 전달하면 모양 D, T의 텐서 로그 확률을 얻을 수 있습니다.

다음은 이러한 사례를 보여주는 몇 가지 예입니다. 이벤트, 배치 및 모양에 대한 자세한 자습서는 이 노트북 을 참조하십시오.

# case 1: get log probabilities for a vector of iid draws from a single
# normal distribution
norm1 = tfd.Normal(loc=0., scale=1.)
probs1 = norm1.log_prob(tf.constant([1., 0.5, 0.]))

# case 2: get log probabilities for a vector of independent draws from
# multiple normal distributions with different parameters.  Note the vector
# values for loc and scale in the Normal constructor.
norm2 = tfd.Normal(loc=[0., 2., 4.], scale=[1., 1., 1.])
probs2 = norm2.log_prob(tf.constant([1., 0.5, 0.]))

print('iid draws from a single normal:', probs1.numpy())
print('draws from a batch of normals:', probs2.numpy())
iid draws from a single normal: [-1.4189385 -1.0439385 -0.9189385]
draws from a batch of normals: [-1.4189385 -2.0439386 -8.918939 ]

데이터 로그 가능성

먼저 데이터 로그 가능도 함수를 구현합니다.

VALIDATE_ARGS = True
ALLOW_NAN_STATS = False

NumPy 사례와의 한 가지 주요 차이점은 TensorFlow 가능성 함수가 단일 행렬이 아닌 정밀 행렬의 벡터를 처리해야한다는 것입니다. 매개 변수 벡터는 여러 체인에서 샘플링 할 때 사용됩니다.

정밀 행렬의 배치 (예 : 체인 당 하나의 행렬)로 작동하는 분포 객체를 생성합니다.

데이터의 로그 확률을 계산할 때 매개 변수와 동일한 방식으로 데이터를 복제하여 배치 변수 당 하나의 복사본이 있어야합니다. 복제 된 데이터의 모양은 다음과 같아야합니다.

[sample shape, batch shape, event shape]

우리의 경우 이벤트 모양은 2입니다 (2 차원 가우스로 작업하기 때문에). 샘플이 100 개이므로 샘플 모양은 100입니다. 배치 형태는 우리가 작업하는 정밀 행렬의 수일뿐입니다. 우도 함수를 호출 할 때마다 데이터를 복제하는 것은 낭비이므로 데이터를 미리 복제하고 복제 된 버전을 전달합니다.

이는 비효율적 인 구현입니다. MultivariateNormalFullCovariance 는 마지막 최적화 섹션에서 설명 할 일부 대안에 비해 비용이 많이 듭니다.

def log_lik_data(precisions, replicated_data):
  n = tf.shape(precisions)[0]  # number of precision matrices
  # We're estimating a precision matrix; we have to invert to get log
  # probabilities.  Cholesky inversion should be relatively efficient,
  # but as we'll see later, it's even better if we can avoid doing the Cholesky
  # decomposition altogether.
  precisions_cholesky = tf.linalg.cholesky(precisions)
  covariances = tf.linalg.cholesky_solve(
      precisions_cholesky, tf.linalg.eye(2, batch_shape=[n]))
  rv_data = tfd.MultivariateNormalFullCovariance(
      loc=tf.zeros([n, 2]),
      covariance_matrix=covariances,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)

  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# For our test, we'll use a tensor of 2 precision matrices.
# We'll need to replicate our data for the likelihood function.
# Remember, TFP wants the data to be structured so that the sample dimensions
# are first (100 here), then the batch dimensions (2 here because we have 2
# precision matrices), then the event dimensions (2 because we have 2-D
# Gaussian data).  We'll need to add a middle dimension for the batch using
# expand_dims, and then we'll need to create 2 replicates in this new dimension
# using tile.
n = 2
replicated_data = np.tile(np.expand_dims(my_data, axis=1), reps=[1, 2, 1])
print(replicated_data.shape)
(100, 2, 2)

팁 : 제가 매우 도움이되는 것으로 확인 된 한 가지는 TensorFlow 함수의 온 전성 검사를 거의 작성하지 않는 것입니다. TF에서 벡터화를 엉망으로 만드는 것은 정말 쉽기 때문에 더 간단한 NumPy 함수를 사용하는 것이 TF 출력을 확인하는 좋은 방법입니다. 이것을 작은 단위 테스트로 생각하십시오.

# check against the numpy implementation
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
n = precisions.shape[0]
lik_tf = log_lik_data(precisions, replicated_data=replicated_data).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.8182

이전 로그 가능성

데이터 복제에 대해 걱정할 필요가 없기 때문에 이전이 더 쉽습니다.

@tf.function(autograph=False)
def log_lik_prior(precisions):
  rv_precision = tfd.WishartTriL(
      df=PRIOR_DF,
      scale_tril=tf.linalg.cholesky(PRIOR_SCALE),
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)
  return rv_precision.log_prob(precisions)
# check against the numpy implementation
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
n = precisions.shape[0]
lik_tf = log_lik_prior(precisions).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]))
  print('tensorflow:', lik_tf[i])
0
numpy: -2.2351873809649625
tensorflow: -2.2351875
1
numpy: -9.103606346649766
tensorflow: -9.103608

조인트 로그 우도 함수 구축

위의 데이터 로그 우도 함수는 관찰에 따라 다르지만 샘플러에는 해당 기능이 없습니다. [closure] (https://en.wikipedia.org/wiki/Closure_ (computer_programming))를 사용하여 전역 변수를 사용하지 않고도 종속성을 제거 할 수 있습니다. 클로저에는 필요한 변수를 포함하는 환경을 구축하는 외부 함수가 포함됩니다. 내부 기능.

def get_log_lik(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik(precision):
    return log_lik_data(precision, replicated_data) + log_lik_prior(precision)

  return _log_lik

4 단계 : 샘플

좋습니다. 샘플링 할 시간입니다! 간단하게하기 위해 1 개의 체인 만 사용하고 단위 행렬을 시작점으로 사용할 것입니다. 나중에 좀 더 조심스럽게 할 것입니다.

다시 말하지만, 이것은 작동하지 않을 것입니다. 예외가있을 것입니다.

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  init_precision = tf.expand_dims(tf.eye(2), axis=0)

  # Use expand_dims because we want to pass in a tensor of starting values
  log_lik_fn = get_log_lik(my_data, n_chains=1)

  # we'll just do a few steps here
  num_results = 10
  num_burnin_steps = 10
  states = tfp.mcmc.sample_chain(
     num_results=num_results,
     num_burnin_steps=num_burnin_steps,
     current_state=[
         init_precision,
     ],
     kernel=tfp.mcmc.HamiltonianMonteCarlo(
         target_log_prob_fn=log_lik_fn,
         step_size=0.1,
         num_leapfrog_steps=3),
     trace_fn=None,
     seed=123)
  return states

try:
  states = sample()
except Exception as e:
  # shorten the giant stack trace
  lines = str(e).split('\n')
  print('\n'.join(lines[:5]+['...']+lines[-3:]))
 Cholesky decomposition was not successful. The input might not be valid.
     [[{ {node mcmc_sample_chain/trace_scan/while/body/_79/smart_for_loop/while/body/_371/mh_one_step/hmc_kernel_one_step/leapfrog_integrate/while/body/_537/leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/StatefulPartitionedCall/Cholesky} }]] [Op:__inference_sample_2849]

Function call stack:
sample
...
Function call stack:
sample


문제 식별

InvalidArgumentError (see above for traceback): Cholesky decomposition was not successful. The input might not be valid. 그다지 도움이되지 않습니다. 무슨 일이 있었는지 더 자세히 알아볼 수 있는지 봅시다.

  • 실패한 값을 볼 수 있도록 각 단계에 대한 매개 변수를 인쇄합니다.
  • 특정 문제를 방지하기 위해 몇 가지 주장을 추가 할 것입니다.

Assertion은 TensorFlow 작업이기 때문에 까다 롭고, 실행되고 그래프에서 최적화되지 않도록주의해야합니다. TF 어설 션에 익숙하지 않은 경우 TensorFlow 디버깅에 대한이 개요 를 읽어 볼 가치가 있습니다. tf.control_dependencies 사용하여 명시 적으로 어설 션을 강제 실행할 수 있습니다 (아래 코드의 주석 참조).

TensorFlow의 기본 Print 함수는 어설 션과 동일한 동작을 수행합니다. 이는 작업이므로 실행되도록주의해야합니다. Print 출력이 전송됩니다 : 우리가 노트북에서 작업 할 때 추가 두통의 원인 stderrstderr 셀에 표시되지 않습니다. 여기서 트릭을 사용할 것입니다. tf.Print 를 사용하는 대신 tf.Print 를 통해 자체 TensorFlow 인쇄 작업을 tf.pyfunc . 어설 션과 마찬가지로 메서드가 실행되는지 확인해야합니다.

def get_log_lik_verbose(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  def _log_lik(precisions):
    # An internal method we'll make into a TensorFlow operation via tf.py_func
    def _print_precisions(precisions):
      print('precisions:\n', precisions)
      return False  # operations must return something!
    # Turn our method into a TensorFlow operation
    print_op = tf.compat.v1.py_func(_print_precisions, [precisions], tf.bool)

    # Assertions are also operations, and some care needs to be taken to ensure
    # that they're executed
    assert_op = tf.assert_equal(
        precisions, tf.linalg.matrix_transpose(precisions),
        message='not symmetrical', summarize=4, name='symmetry_check')

    # The control_dependencies statement forces its arguments to be executed
    # before subsequent operations
    with tf.control_dependencies([print_op, assert_op]):
      return (log_lik_data(precisions, replicated_data) +
              log_lik_prior(precisions))

  return _log_lik
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  init_precision = tf.eye(2)[tf.newaxis, ...]
  log_lik_fn = get_log_lik_verbose(my_data)
  # we'll just do a few steps here
  num_results = 10
  num_burnin_steps = 10
  states = tfp.mcmc.sample_chain(
      num_results=num_results,
      num_burnin_steps=num_burnin_steps,
      current_state=[
          init_precision,
      ],
      kernel=tfp.mcmc.HamiltonianMonteCarlo(
          target_log_prob_fn=log_lik_fn,
          step_size=0.1,
          num_leapfrog_steps=3),
      trace_fn=None,
      seed=123)

try:
  states = sample()
except Exception as e:
  # shorten the giant stack trace
  lines = str(e).split('\n')
  print('\n'.join(lines[:5]+['...']+lines[-3:]))
precisions:
 [[[1. 0.]
  [0. 1.]]]
precisions:
 [[[1. 0.]
  [0. 1.]]]
precisions:
 [[[ 0.24315196 -0.2761638 ]
  [-0.33882257  0.8622    ]]]
 assertion failed: [not symmetrical] [Condition x == y did not hold element-wise:] [x (leapfrog_integrate_one_step/add:0) = ] [[[0.243151963 -0.276163787][-0.338822573 0.8622]]] [y (leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/matrix_transpose/transpose:0) = ] [[[0.243151963 -0.338822573][-0.276163787 0.8622]]]
     [[{ {node mcmc_sample_chain/trace_scan/while/body/_96/smart_for_loop/while/body/_381/mh_one_step/hmc_kernel_one_step/leapfrog_integrate/while/body/_503/leapfrog_integrate_one_step/maybe_call_fn_and_grads/value_and_gradients/symmetry_check_1/Assert/AssertGuard/else/_577/Assert} }]] [Op:__inference_sample_4837]

Function call stack:
sample
...
Function call stack:
sample


이것이 실패하는 이유

샘플러가 시도하는 첫 번째 새 매개 변수 값은 비대칭 행렬입니다. 이로 인해 촐레 스키 분해가 실패하게됩니다. 이는 대칭 (및 양의 정의) 행렬에 대해서만 정의되기 때문입니다.

여기서 문제는 관심있는 매개 변수가 정밀도 행렬이고 정밀도 행렬은 실수, 대칭 및 양의 정부 호 여야한다는 것입니다. 샘플러는이 제약 조건에 대해 전혀 알지 못합니다 (그라데이션을 통한 경우 제외). 따라서 샘플러가 잘못된 값을 제안하여 특히 단계 크기가 큰 경우 예외가 발생할 수 있습니다.

Hamiltonian Monte Carlo 샘플러를 사용하면 그라디언트가 매개 변수를 유효하지 않은 영역에서 멀리 유지해야하므로 매우 작은 단계 크기를 사용하여 문제를 해결할 수 있지만 작은 단계 크기는 느린 수렴을 의미합니다. 그래디언트에 대해 아무것도 모르는 Metropolis-Hastings 샘플러를 사용하면 운명에 처해 있습니다.

버전 2 : 제한되지 않은 매개 변수로 다시 매개 변수화

위의 문제에 대한 간단한 해결책이 있습니다. 새 매개 변수가 더 이상 이러한 제약을 갖지 않도록 모델을 다시 매개 변수화 할 수 있습니다. TFP는이 작업을 수행하기위한 유용한 도구 세트 인 bijector를 제공합니다.

bijector를 사용한 재 매개 변수화

정밀도 행렬은 실수이고 대칭이어야합니다. 이러한 제약이없는 대체 매개 변수화를 원합니다. 시작점은 정밀도 행렬의 촐레 스키 분해입니다. 촐레 스키 요인은 여전히 ​​제한되어 있습니다. 낮은 삼각형이고 대각선 요소는 양수 여야합니다. 그러나 촐레 스키 인자의 대각선 로그를 취하면 로그가 더 이상 양수로 제한되지 않고 아래쪽 삼각형 부분을 1 차원 벡터로 평면화하면 더 이상 낮은 삼각형 제약 조건이 없습니다. . 우리의 경우 결과는 제약 조건이없는 길이 3 벡터가됩니다.

( Stan 매뉴얼 에는 매개 변수에 대한 다양한 유형의 제약 조건을 제거하기 위해 변환을 사용하는 방법에 대한 훌륭한 장이 있습니다.)

이 재 매개 변수화는 데이터 로그 가능성 함수에 거의 영향을주지 않습니다. 변환을 반전하여 정밀도 행렬을 되 찾아야하지만 이전에 미치는 영향은 더 복잡합니다. 주어진 정밀도 행렬의 확률은 Wishart 분포에 의해 주어집니다. 변환 된 행렬의 확률은 얼마입니까?

단조 함수 $ g $를 1 차원 랜덤 변수 $ X $, $ Y = g (X) $에 적용하면 $ Y $에 대한 밀도는 다음과 같이 주어집니다.

$$ f_Y(y) = | \frac{d}{dy}(g^{-1}(y)) | f_X(g^{-1}(y)) $$

$ g ^ {-1} $ 용어의 파생어는 $ g $가 로컬 볼륨을 변경하는 방식을 설명합니다. 고차원 랜덤 변수의 경우 수정 인자는 $ g ^ {-1} $의 야 코비 행렬 행렬식의 절대 값입니다 ( 여기 참조).

역변환의 야 코비 행렬을 로그 사전 우도 함수에 추가해야합니다. 다행 Bijector TFP의 Bijector 클래스 Bijector 수 있습니다.

Bijector 클래스는 확률 밀도 함수에서 변수를 변경하는 데 사용되는 가역적이고 부드러운 함수를 나타내는 데 사용됩니다. Bijector에는 모두 변환을 수행하는 forward() 메서드,이를 inverse() 하는 inverse() 메서드, pdf를 다시 매개 변수화 할 때 필요한 Jacobian 수정을 제공하는 forward_log_det_jacobian()inverse_log_det_jacobian() 메서드가 있습니다.

TFP는 Chain 연산자를 통해 합성을 통해 결합하여 매우 복잡한 변환을 형성 할 수있는 유용한 bijector 모음을 제공합니다. 우리의 경우 다음 3 개의 bijector를 구성합니다 (체인의 작업은 오른쪽에서 왼쪽으로 수행됨).

  1. 변환의 첫 번째 단계는 정밀도 행렬에서 촐레 스키 분해를 수행하는 것입니다. 이를위한 Bijector 클래스는 없습니다. 그러나CholeskyOuterProduct bijector는 2 개의 Cholesky 요인의 곱을 취합니다. Invert 연산자를 사용하여 해당 작업의 역을 사용할 수 있습니다.
  2. 다음 단계는 촐레 스키 인자의 대각선 요소에 대한 로그를 취하는 것입니다. TransformDiagonal bijector와 Exp bijector의 역을 통해이를 수행합니다.
  3. 마지막으로 FillTriangular bijector의 역을 사용하여 행렬의 아래쪽 삼각형 부분을 벡터로 평평하게 만듭니다.
# Our transform has 3 stages that we chain together via composition:
precision_to_unconstrained = tfb.Chain([
    # step 3: flatten the lower triangular portion of the matrix
    tfb.Invert(tfb.FillTriangular(validate_args=VALIDATE_ARGS)),
    # step 2: take the log of the diagonals    
    tfb.TransformDiagonal(tfb.Invert(tfb.Exp(validate_args=VALIDATE_ARGS))),
    # step 1: decompose the precision matrix into its Cholesky factors
    tfb.Invert(tfb.CholeskyOuterProduct(validate_args=VALIDATE_ARGS)),
])
# sanity checks
m = tf.constant([[1., 2.], [2., 8.]])
m_fwd = precision_to_unconstrained.forward(m)
m_inv = precision_to_unconstrained.inverse(m_fwd)

# bijectors handle tensors of values, too!
m2 = tf.stack([m, tf.eye(2)])
m2_fwd = precision_to_unconstrained.forward(m2)
m2_inv = precision_to_unconstrained.inverse(m2_fwd)

print('single input:')
print('m:\n', m.numpy())
print('precision_to_unconstrained(m):\n', m_fwd.numpy())
print('inverse(precision_to_unconstrained(m)):\n', m_inv.numpy())
print()

print('tensor of inputs:')
print('m2:\n', m2.numpy())
print('precision_to_unconstrained(m2):\n', m2_fwd.numpy())
print('inverse(precision_to_unconstrained(m2)):\n', m2_inv.numpy())
single input:
m:
 [[1. 2.]
 [2. 8.]]
precision_to_unconstrained(m):
 [0.6931472 2.        0.       ]
inverse(precision_to_unconstrained(m)):
 [[1. 2.]
 [2. 8.]]

tensor of inputs:
m2:
 [[[1. 2.]
  [2. 8.]]

 [[1. 0.]
  [0. 1.]]]
precision_to_unconstrained(m2):
 [[0.6931472 2.        0.       ]
 [0.        0.        0.       ]]
inverse(precision_to_unconstrained(m2)):
 [[[1. 2.]
  [2. 8.]]

 [[1. 0.]
  [0. 1.]]]

TransformedDistribution 클래스는 분포에 bijector를 적용하고 log_prob() 필요한 Jacobian 수정을 log_prob() 하는 프로세스를 자동화합니다. 우리의 새로운 사전은 다음과 같습니다.

def log_lik_prior_transformed(transformed_precisions):
  rv_precision = tfd.TransformedDistribution(
      tfd.WishartTriL(
          df=PRIOR_DF,
          scale_tril=tf.linalg.cholesky(PRIOR_SCALE),
          validate_args=VALIDATE_ARGS,
          allow_nan_stats=ALLOW_NAN_STATS),
      bijector=precision_to_unconstrained,
      validate_args=VALIDATE_ARGS)
  return rv_precision.log_prob(transformed_precisions)
# Check against the numpy implementation.  Note that when comparing, we need
# to add in the Jacobian correction.
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
transformed_precisions = precision_to_unconstrained.forward(precisions)
lik_tf = log_lik_prior_transformed(transformed_precisions).numpy()
corrections = precision_to_unconstrained.inverse_log_det_jacobian(
    transformed_precisions, event_ndims=1).numpy()
n = precisions.shape[0]

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]) + corrections[i])
  print('tensorflow:', lik_tf[i])
0
numpy: -0.8488930160357633
tensorflow: -0.84889317
1
numpy: -7.305657151741624
tensorflow: -7.305659

데이터 로그 가능성에 대해 변환을 반전하면됩니다.

precision = precision_to_unconstrained.inverse(transformed_precision)

우리는 실제로 정밀도 행렬의 촐레 스키 분해를 원하기 때문에 여기서 부분 역을 수행하는 것이 더 효율적일 것입니다. 그러나 우리는 나중에 최적화를 남겨두고 독자를위한 연습으로 부분 역수를 남겨 둘 것입니다.

def log_lik_data_transformed(transformed_precisions, replicated_data):
  # We recover the precision matrix by inverting our bijector.  This is
  # inefficient since we really want the Cholesky decomposition of the
  # precision matrix, and the bijector has that in hand during the inversion,
  # but we'll worry about efficiency later.
  n = tf.shape(transformed_precisions)[0]
  precisions = precision_to_unconstrained.inverse(transformed_precisions)
  precisions_cholesky = tf.linalg.cholesky(precisions)
  covariances = tf.linalg.cholesky_solve(
      precisions_cholesky, tf.linalg.eye(2, batch_shape=[n]))
  rv_data = tfd.MultivariateNormalFullCovariance(
      loc=tf.zeros([n, 2]),
      covariance_matrix=covariances,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)

  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# sanity check
precisions = np.stack([np.eye(2, dtype=np.float32), true_precision])
transformed_precisions = precision_to_unconstrained.forward(precisions)
lik_tf = log_lik_data_transformed(
    transformed_precisions, replicated_data).numpy()

for i in range(precisions.shape[0]):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.8182

다시 우리는 새로운 함수를 클로저로 감 쌉니다.

def get_log_lik_transformed(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik_transformed(transformed_precisions):
    return (log_lik_data_transformed(transformed_precisions, replicated_data) +
            log_lik_prior_transformed(transformed_precisions))

  return _log_lik_transformed
# make sure everything runs
log_lik_fn = get_log_lik_transformed(my_data)
m = tf.eye(2)[tf.newaxis, ...]
lik = log_lik_fn(precision_to_unconstrained.forward(m)).numpy()
print(lik)
[-431.5611]

견본 추출

이제 잘못된 매개 변수 값으로 인해 샘플러가 파열되는 것에 대해 걱정할 필요가 없으므로 실제 샘플을 생성 해 보겠습니다.

샘플러는 매개 변수의 제한되지 않은 버전으로 작동하므로 초기 값을 제한되지 않은 버전으로 변환해야합니다. 우리가 생성하는 샘플은 모두 제한되지 않은 형태가 될 것이므로 다시 변환해야합니다. Bijector는 벡터화되므로 쉽게 할 수 있습니다.

# We'll choose a proper random initial value this time
np.random.seed(123)
initial_value_cholesky = np.array(
    [[0.5 + np.random.uniform(), 0.0],
     [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
    dtype=np.float32)
initial_value =  initial_value_cholesky.dot(
  initial_value_cholesky.T)[np.newaxis, ...]

# The sampler works with unconstrained values, so we'll transform our initial
# value
initial_value_transformed = precision_to_unconstrained.forward(
  initial_value).numpy()
# Sample!
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_transformed(my_data, n_chains=1)

  num_results = 1000
  num_burnin_steps = 1000

  states, is_accepted = tfp.mcmc.sample_chain(
      num_results=num_results,
      num_burnin_steps=num_burnin_steps,
      current_state=[
          initial_value_transformed,
      ],
      kernel=tfp.mcmc.HamiltonianMonteCarlo(
          target_log_prob_fn=log_lik_fn,
          step_size=0.1,
          num_leapfrog_steps=3),
      trace_fn=lambda _, pkr: pkr.is_accepted,
      seed=123)
  # transform samples back to their constrained form
  precision_samples = [precision_to_unconstrained.inverse(s) for s in states]
  return states, precision_samples, is_accepted

states, precision_samples, is_accepted = sample()

샘플러의 결과 평균을 분석적 사후 평균과 비교해 봅시다!

print('True posterior mean:\n', posterior_mean)
print('Sample mean:\n', np.mean(np.reshape(precision_samples, [-1, 2, 2]), axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Sample mean:
 [[ 1.4315274  -0.25587553]
 [-0.25587553  0.5740424 ]]

우리는 멀리 떨어져 있습니다! 이유를 알아 봅시다. 먼저 샘플을 살펴 보겠습니다.

np.reshape(precision_samples, [-1, 2, 2])
array([[[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       ...,

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]],

       [[ 1.4315385, -0.2558777],
        [-0.2558777,  0.5740494]]], dtype=float32)

오-모두 같은 가치를 가지고있는 것 같습니다. 이유를 알아 봅시다.

kernel_results_ 변수는 각 상태에서 샘플러에 대한 정보를 제공하는 명명 된 튜플입니다. is_accepted 필드가 여기서 핵심입니다.

# Look at the acceptance for the last 100 samples
print(np.squeeze(is_accepted)[-100:])
print('Fraction of samples accepted:', np.mean(np.squeeze(is_accepted)))
[False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False]
Fraction of samples accepted: 0.0

모든 샘플이 거부되었습니다! 아마도 우리의 스텝 크기가 너무 컸을 것입니다. 순전히 stepsize=0.1 선택했습니다.

버전 3 : 적응 형 단계 크기로 샘플링

임의의 단계 크기 선택을 사용한 샘플링이 실패했기 때문에 몇 가지 의제 항목이 있습니다.

  1. 적응 형 단계 크기를 구현하고
  2. 수렴 검사를 수행합니다.

tensorflow_probability/python/mcmc/hmc.py 에는 적응 형 단계 크기를 구현하기위한 멋진 샘플 코드가 있습니다. 아래에서 수정했습니다.

각 단계마다 별도의 sess.run() 문이 있습니다. 필요한 경우 단계별 진단을 쉽게 추가 할 수 있으므로 디버깅에 정말 유용합니다. 예를 들어 점진적 진행 상황, 각 단계의 시간 등을 표시 할 수 있습니다.

팁 : 샘플링을 엉망으로 만드는 일반적인 방법 중 하나는 그래프를 루프에서 늘리는 것입니다. (세션이 실행되기 전에 그래프를 마무리하는 이유는 이러한 문제를 방지하기위한 것입니다.) 그러나 finalize ()를 사용하지 않은 경우 코드가 크롤링 속도가 느려지는 경우 유용한 디버깅 검사는 그래프를 인쇄하는 것입니다. len(mygraph.get_operations()) 를 통해 각 단계에서 크기-길이가 증가하면 아마도 뭔가 나쁜 일을하고있을 것입니다.

여기서 우리는 3 개의 독립적 인 체인을 운영 할 것입니다. 체인을 비교하면 수렴을 확인하는 데 도움이됩니다.

# The number of chains is determined by the shape of the initial values.
# Here we'll generate 3 chains, so we'll need a tensor of 3 initial values.
N_CHAINS = 3

np.random.seed(123)

initial_values = []
for i in range(N_CHAINS):
  initial_value_cholesky = np.array(
      [[0.5 + np.random.uniform(), 0.0],
       [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
      dtype=np.float32)
  initial_values.append(initial_value_cholesky.dot(initial_value_cholesky.T))
initial_values = np.stack(initial_values)

initial_values_transformed = precision_to_unconstrained.forward(
  initial_values).numpy()
@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_transformed(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=hmc,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states, is_accepted = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values_transformed,
      kernel=adapted_kernel,
      trace_fn=lambda _, pkr: pkr.inner_results.is_accepted,
      parallel_iterations=1)
  # transform samples back to their constrained form
  precision_samples = precision_to_unconstrained.inverse(states)
  return states, precision_samples, is_accepted

states, precision_samples, is_accepted = sample()

빠른 확인 : 샘플링 중 수락 률은 목표 인 0.651에 가깝습니다.

print(np.mean(is_accepted))
0.6190666666666667

더 좋은 점은 표본 평균과 표준 편차가 분석 솔루션에서 기대하는 것과 비슷하다는 것입니다.

precision_samples_reshaped = np.reshape(precision_samples, [-1, 2, 2])
print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.96426415 -1.6519215 ]
 [-1.6519215   3.8614824 ]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13622096 0.25235635]
 [0.25235635 0.5394968 ]]

수렴 확인

일반적으로 확인할 분석 솔루션이 없으므로 샘플러가 수렴되었는지 확인해야합니다. 하나의 표준 검사는 Gelman-Rubin $ \ hat {R} $ 통계로, 여러 샘플링 체인이 필요합니다. $ \ hat {R} $는 체인이 동일하게 분포 된 경우 체인 간의 분산 (평균의)이 예상되는 값을 초과하는 정도를 측정합니다. 1에 가까운 $ \ hat {R} $ 값은 대략적인 수렴을 나타내는 데 사용됩니다. 자세한 내용 은 소스 를 참조하십시오.

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0038308 1.0005717]
 [1.0005717 1.0006068]]

모델 비평

분석적 해결책이 없다면 실제 모델 비평을 할 때가 될 것입니다.

다음은 지상 진실 (빨간색)과 관련된 샘플 구성 요소의 몇 가지 빠른 히스토그램입니다. 샘플은 이전에 단위 행렬을 향해 샘플 정밀도 행렬 값에서 축소되었습니다.

fig, axes = plt.subplots(2, 2, sharey=True)
fig.set_size_inches(8, 8)
for i in range(2):
  for j in range(2):
    ax = axes[i, j]
    ax.hist(precision_samples_reshaped[:, i, j])
    ax.axvline(true_precision[i, j], color='red',
               label='True precision')
    ax.axvline(sample_precision[i, j], color='red', linestyle=':',
               label='Sample precision')
    ax.set_title('precision[%d, %d]' % (i, j))
plt.tight_layout()
plt.legend()
plt.show()

png

정밀도 성분 쌍의 일부 산점도는 사후의 상관 구조로 인해 실제 사후 값이 위의 주변에서 나타나는 것만 큼 가능성이 낮다는 것을 보여줍니다.

fig, axes = plt.subplots(4, 4)
fig.set_size_inches(12, 12)
for i1 in range(2):
  for j1 in range(2):
    index1 = 2 * i1 + j1
    for i2 in range(2):
      for j2 in range(2):
        index2 = 2 * i2 + j2
        ax = axes[index1, index2]
        ax.scatter(precision_samples_reshaped[:, i1, j1],
                   precision_samples_reshaped[:, i2, j2], alpha=0.1)
        ax.axvline(true_precision[i1, j1], color='red')
        ax.axhline(true_precision[i2, j2], color='red')
        ax.axvline(sample_precision[i1, j1], color='red', linestyle=':')
        ax.axhline(sample_precision[i2, j2], color='red', linestyle=':')
        ax.set_title('(%d, %d) vs (%d, %d)' % (i1, j1, i2, j2))
plt.tight_layout()
plt.show()

png

버전 4 : 제한된 매개 변수의 더 간단한 샘플링

Bijectors는 정밀 매트릭스 샘플링을 간단하게 만들었지 만 제한되지 않은 표현으로 /에서 상당한 양의 수동 변환이있었습니다. 더 쉬운 방법이 있습니다!

TransformedTransitionKernel

TransformedTransitionKernel 은이 프로세스를 단순화합니다. 샘플러를 래핑하고 모든 변환을 처리합니다. 제약되지 않은 매개 변수 값을 제약 된 값에 매핑하는 bijector 목록을 인수로 사용합니다. 그래서 여기서 우리는 위에서 사용한 precision_to_unconstrained bijector의 역이 필요합니다. 우리는 tfb.Invert(precision_to_unconstrained) 사용할 수 있지만 역의 역을 취하는 것이 포함됩니다 (TensorFlow는 tf.Invert(tf.Invert())tf.Identity()) 로 단순화 tf.Invert(tf.Invert()) 큼 충분히 똑똑하지 않으므로 대신 우리는 새 bijector를 작성하겠습니다.

비 젝터 제한

# The bijector we need for the TransformedTransitionKernel is the inverse of
# the one we used above
unconstrained_to_precision = tfb.Chain([
    # step 3: take the product of Cholesky factors
    tfb.CholeskyOuterProduct(validate_args=VALIDATE_ARGS),
    # step 2: exponentiate the diagonals    
    tfb.TransformDiagonal(tfb.Exp(validate_args=VALIDATE_ARGS)),
    # step 1: map a vector to a lower triangular matrix
    tfb.FillTriangular(validate_args=VALIDATE_ARGS),
])
# quick sanity check
m = [[1., 2.], [2., 8.]]
m_inv = unconstrained_to_precision.inverse(m).numpy()
m_fwd = unconstrained_to_precision.forward(m_inv).numpy()

print('m:\n', m)
print('unconstrained_to_precision.inverse(m):\n', m_inv)
print('forward(unconstrained_to_precision.inverse(m)):\n', m_fwd)
m:
 [[1.0, 2.0], [2.0, 8.0]]
unconstrained_to_precision.inverse(m):
 [0.6931472 2.        0.       ]
forward(unconstrained_to_precision.inverse(m)):
 [[1. 2.]
 [2. 8.]]

TransformedTransitionKernel을 사용한 샘플링

TransformedTransitionKernel 사용하면 더 이상 매개 변수를 수동으로 변환 할 필요가 없습니다. 초기 값과 샘플은 모두 정밀 행렬입니다. 제한되지 않는 bijector를 커널에 전달하기 만하면 모든 변환이 처리됩니다.

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  ttk = tfp.mcmc.TransformedTransitionKernel(
      inner_kernel=hmc, bijector=unconstrained_to_precision)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=ttk,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values,
      kernel=adapted_kernel,
      trace_fn=None,
      parallel_iterations=1)
  # transform samples back to their constrained form
  return states

precision_samples  = sample()

수렴 확인

$ \ hat {R} $ 수렴 검사가 좋아 보입니다!

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0013582 1.0019467]
 [1.0019467 1.0011805]]

분석적 사후와 비교

다시 분석 사후와 비교해 보겠습니다.

# The output samples have shape [n_steps, n_chains, 2, 2]
# Flatten them to [n_steps * n_chains, 2, 2] via reshape:
precision_samples_reshaped = np.reshape(precision_samples, [-1, 2, 2])
print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.96687526 -1.6552585 ]
 [-1.6552585   3.867676  ]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13329624 0.24913791]
 [0.24913791 0.53983927]]

최적화

이제 종단 간 실행이 완료되었으므로보다 최적화 된 버전을 수행해 보겠습니다. 이 예제에서는 속도가 그다지 중요하지 않지만 일단 행렬이 커지면 몇 가지 최적화가 큰 차이를 만들 것입니다.

우리가 할 수있는 한 가지 큰 속도 향상은 촐레 스키 분해 측면에서 다시 매개 변수화하는 것입니다. 그 이유는 데이터 우도 함수에 공분산과 정밀도 행렬이 모두 필요하기 때문입니다. 행렬 반전은 비용이 많이 들고 ($ n \ x n $ 행렬의 경우 $ O (n ^ 3) $), 공분산 또는 정밀도 행렬 측면에서 매개 변수화하면 다른 것을 얻기 위해 반전을 수행해야합니다.

다시 말해, 실수의 양의 정의 대칭 행렬 $ M $는 $ M = LL ^ T $ 형식의 곱으로 분해 될 수 있습니다. 여기서 행렬 $ L $은 아래쪽 삼각형이고 양의 대각선을가집니다. $ M $의 촐레 스키 분해가 주어지면 $ M $ (하위 삼각 행렬과 상위 삼각 행렬의 곱)과 $ M ^ {-1} $ (역 치환을 통해)를 더 효율적으로 얻을 수 있습니다. 촐레 스키 분해 자체는 계산하기에 저렴하지 않지만 촐레 스키 인자로 매개 변수화하면 초기 매개 변수 값의 Choleksy 분해 만 계산하면됩니다.

공분산 행렬의 촐레 스키 분해 사용

TFP에는 공분산 행렬의 촐레 스키 인자로 모수화 된 다변량 정규 분포 버전 인 MultivariateNormalTriL 이 있습니다. 따라서 공분산 행렬의 촐레 스키 인자를 매개 변수화하면 데이터 로그 우도를 효율적으로 계산할 수 있습니다. 문제는 유사한 효율성으로 이전 로그 가능성을 계산하는 것입니다.

샘플의 촐레 스키 인자와 함께 작동하는 역 Wishart 분포의 버전이 있다면 모두 설정되었을 것입니다. 아아, 그렇지 않습니다. (하지만 팀은 코드 제출을 환영합니다!) 대안으로, Bijector 체인과 함께 샘플의 Cholesky 인수와 함께 작동하는 Wishart 배포 버전을 사용할 수 있습니다.

현재 우리는 작업을 효율적으로 만들기 위해 몇 가지 스톡 바 이젝터를 놓치고 있지만, 저는 그 과정을 연습으로 보여주고 TFP의 바 이젝터의 힘에 대한 유용한 예시를 보여 드리고 싶습니다.

촐레 스키 요인에서 작동하는 Wishart 분포

Wishart 분포에는 입력 및 출력 행렬이 촐레 스키 인자 여야 함을 지정하는 유용한 플래그 input_output_cholesky 가 있습니다. 전체 행렬보다 촐레 스키 인자로 작업하는 것이 더 효율적이고 수치 적으로 유리하므로 이것이 바람직한 이유입니다. 플래그의 의미론에 대한 중요한 점 : 분포에 대한 입력 및 출력의 표현이 변경되어야한다는 표시 일뿐입니다. 이는 분포의 전체 재 매개 변수화를 나타내지 않으며 log_prob() 대한 Jacobian 수정을 포함합니다. 함수. 우리는 실제로이 완전한 재 매개 변수화를 수행하기를 원하므로 자체 배포를 구축 할 것입니다.

# An optimized Wishart distribution that has been transformed to operate on
# Cholesky factors instead of full matrices.  Note that we gain a modest
# additional speedup by specifying the Cholesky factor of the scale matrix
# (i.e. by passing in the scale_tril parameter instead of scale).

class CholeskyWishart(tfd.TransformedDistribution):
  """Wishart distribution reparameterized to use Cholesky factors."""
  def __init__(self,
      df,
      scale_tril,
      validate_args=False,
      allow_nan_stats=True,
      name='CholeskyWishart'):
    # Wishart has a bunch of methods that we want to support but not
    # implement.  We'll subclass TransformedDistribution here to take care of
    # those.  We'll override the few for which speed is critical and implement
    # them with a separate Wishart for which input_output_cholesky=True
    super(CholeskyWishart, self).__init__(
        distribution=tfd.WishartTriL(
            df=df,
            scale_tril=scale_tril,
            input_output_cholesky=False,
            validate_args=validate_args,
            allow_nan_stats=allow_nan_stats),
        bijector=tfb.Invert(tfb.CholeskyOuterProduct()),
        validate_args=validate_args,
        name=name
    )
    # Here's the Cholesky distribution we'll use for log_prob() and sample()
    self.cholesky = tfd.WishartTriL(
        df=df,
        scale_tril=scale_tril,
        input_output_cholesky=True,
        validate_args=validate_args,
        allow_nan_stats=allow_nan_stats)

  def _log_prob(self, x):
    return (self.cholesky.log_prob(x) +
            self.bijector.inverse_log_det_jacobian(x, event_ndims=2))

  def _sample_n(self, n, seed=None):
    return self.cholesky._sample_n(n, seed)
# some checks
PRIOR_SCALE_CHOLESKY = np.linalg.cholesky(PRIOR_SCALE)

@tf.function(autograph=False)
def compute_log_prob(m):
  w_transformed = tfd.TransformedDistribution(
      tfd.WishartTriL(df=PRIOR_DF, scale_tril=PRIOR_SCALE_CHOLESKY),
      bijector=tfb.Invert(tfb.CholeskyOuterProduct()))
  w_optimized = CholeskyWishart(
      df=PRIOR_DF, scale_tril=PRIOR_SCALE_CHOLESKY)
  log_prob_transformed = w_transformed.log_prob(m)
  log_prob_optimized = w_optimized.log_prob(m)
  return log_prob_transformed, log_prob_optimized

for matrix in [np.eye(2, dtype=np.float32),
               np.array([[1., 0.], [2., 8.]], dtype=np.float32)]:
  log_prob_transformed, log_prob_optimized = [
      t.numpy() for t in compute_log_prob(matrix)]
  print('Transformed Wishart:', log_prob_transformed)
  print('Optimized Wishart', log_prob_optimized)
Transformed Wishart: -0.84889317
Optimized Wishart -0.84889317
Transformed Wishart: -99.269455
Optimized Wishart -99.269455

역 Wishart 분포 구축

우리는 공분산 행렬 $ C $를 $ C = LL ^ T $로 분해했습니다. 여기서 $ L $은 더 낮은 삼각형이고 양의 대각선을가집니다. $ C \ sim W ^ {-1} (\ nu, V) $에서 $ L $의 확률을 알고 싶습니다. 여기서 $ W ^ {-1} $는 Wishart 역 분포입니다.

역 Wishart 분포는 $ C \ sim W ^ {-1} (\ nu, V) $이면 정밀도 행렬 $ C ^ {-1} \ sim W (\ nu, V ^ {-1 }) $. 따라서 Wishart 분포를 매개 변수로 취하는 TransformedDistribution 과 정밀도 행렬의 Cholesky 인자를 역의 Cholesky 인자에 매핑하는 bijector를 통해 $ L $의 확률을 얻을 수 있습니다.

$ C ^ {-1} $의 촐레 스키 인자에서 $ L $로 구하는 간단한 (하지만 매우 효율적이지는 않음) 방법은 역해를 통해 촐레 스키 인자를 반전 한 다음 이러한 반전 된 인자로부터 공분산 행렬을 형성하는 것입니다. 그런 다음 촐레 스키 분해를 수행합니다.

$ C ^ {-1} = MM ^ T $의 촐레 스키 분해를합시다. $ M $는 하부 삼각형이므로 MatrixInverseTriL bijector를 사용하여 반전 할 수 있습니다.

$ M ^ {-1} $에서 $ C $를 만드는 것은 약간 까다 롭습니다. $ C = (MM ^ T) ^ {-1} = M ^ {-T} M ^ {-1} = M ^ {-T } (M ^ {-T}) ^ T $. $ M $는 아래쪽 삼각형이므로 $ M ^ {-1} $도 아래쪽 삼각형이고 $ M ^ {-T} $는 위쪽 삼각형입니다. CholeskyOuterProduct() bijector는 더 낮은 삼각형 행렬에서만 작동하므로 $ M ^ {-T} $에서 $ C $를 형성하는 데 사용할 수 없습니다. 해결 방법은 행렬의 행과 열을 순회하는 일련의 bijector입니다.

다행히이 논리는 CholeskyToInvCholesky bijector에 캡슐화되어 있습니다!

모든 조각을 결합

# verify that the bijector works
m = np.array([[1., 0.], [2., 8.]], dtype=np.float32)
c_inv = m.dot(m.T)
c = np.linalg.inv(c_inv)
c_chol = np.linalg.cholesky(c)
wishart_cholesky_to_iw_cholesky = tfb.CholeskyToInvCholesky()
w_fwd = wishart_cholesky_to_iw_cholesky.forward(m).numpy()

print('numpy =\n', c_chol)
print('bijector =\n', w_fwd)
numpy =
 [[ 1.0307764   0.        ]
 [-0.03031695  0.12126781]]
bijector =
 [[ 1.0307764   0.        ]
 [-0.03031695  0.12126781]]

최종 배포

Cholesky 요인에 대해 작동하는 역 Wishart는 다음과 같습니다.

inverse_wishart_cholesky = tfd.TransformedDistribution(
    distribution=CholeskyWishart(
        df=PRIOR_DF,
        scale_tril=np.linalg.cholesky(np.linalg.inv(PRIOR_SCALE))),
    bijector=tfb.CholeskyToInvCholesky())

우리는 역 Wishart를 가지고 있지만, bijector에서 Cholesky 분해를해야하기 때문에 다소 느립니다. 정밀도 매트릭스 매개 변수화로 돌아가서 최적화를 위해 무엇을 할 수 있는지 살펴 보겠습니다.

Final (!) 버전 : 정밀도 행렬의 Cholesky 분해 사용

또 다른 방법은 정밀도 행렬의 촐레 스키 인자를 사용하는 것입니다. 여기서 사전 우도 함수는 계산하기 쉽지만 TFP에는 정밀도에 의해 매개 변수화 된 다변량 정규화 버전이 없기 때문에 데이터 로그 우도 함수는 더 많은 작업이 필요합니다.

최적화 된 이전 로그 가능성

우리는 위에서 구축 한 CholeskyWishart 배포판을 사용하여 사전을 구성합니다.

# Our new prior.
PRIOR_SCALE_CHOLESKY = np.linalg.cholesky(PRIOR_SCALE)

def log_lik_prior_cholesky(precisions_cholesky):
  rv_precision = CholeskyWishart(
      df=PRIOR_DF,
      scale_tril=PRIOR_SCALE_CHOLESKY,
      validate_args=VALIDATE_ARGS,
      allow_nan_stats=ALLOW_NAN_STATS)
  return rv_precision.log_prob(precisions_cholesky)
# Check against the slower TF implementation and the NumPy implementation.
# Note that when comparing to NumPy, we need to add in the Jacobian correction.
precisions = [np.eye(2, dtype=np.float32),
              true_precision]
precisions_cholesky = np.stack([np.linalg.cholesky(m) for m in precisions])
precisions = np.stack(precisions)
lik_tf = log_lik_prior_cholesky(precisions_cholesky).numpy()
lik_tf_slow = tfd.TransformedDistribution(
    distribution=tfd.WishartTriL(
        df=PRIOR_DF, scale_tril=tf.linalg.cholesky(PRIOR_SCALE)),
    bijector=tfb.Invert(tfb.CholeskyOuterProduct())).log_prob(
    precisions_cholesky).numpy()
corrections = tfb.Invert(tfb.CholeskyOuterProduct()).inverse_log_det_jacobian(
    precisions_cholesky, event_ndims=2).numpy()
n = precisions.shape[0]

for i in range(n):
  print(i)
  print('numpy:', log_lik_prior_numpy(precisions[i]) + corrections[i])
  print('tensorflow slow:', lik_tf_slow[i])
  print('tensorflow fast:', lik_tf[i])
0
numpy: -0.8488930160357633
tensorflow slow: -0.84889317
tensorflow fast: -0.84889317
1
numpy: -7.442875031036973
tensorflow slow: -7.442877
tensorflow fast: -7.442876

최적화 된 데이터 로그 가능성

TFP의 bijector를 사용하여 다변량 법선의 자체 버전을 구축 할 수 있습니다. 핵심 아이디어는 다음과 같습니다.

요소가 $ N (0, 1) $의 iid 샘플 인 열 벡터 $ X $가 있다고 가정합니다. $ \ text {mean} (X) = 0 $ 및 $ \ text {cov} (X) = I $

이제 $ Y = AX + b $로합시다. $ \ text {mean} (Y) = b $ 및 $ \ text {cov} (Y) = AA ^ T $가 있습니다.

따라서 우리는 $ AA ^ T = C $를 제공하는 iid 표준 정규 샘플의 벡터로의 아핀 변환 $ Ax + b $를 사용하여 평균 $ b $ 및 공분산 $ C $를 가진 벡터를 만들 수 있습니다. $ C $의 Cholesky 분해에는 원하는 속성이 있습니다. 그러나 다른 해결책이 있습니다.

$ P = C ^ {-1} $로하고 $ P $의 Cholesky 분해를 $ B $, 즉 $ BB ^ T = P $로 둡니다. 지금

$ P ^ {-1} = (BB ^ T) ^ {-1} = B ^ {-T} B ^ {-1} = B ^ {-T} (B ^ {-T}) ^ T $

따라서 원하는 평균과 공분산을 얻는 또 다른 방법은 affine 변환 $ Y = B ^ {-T} X + b $를 사용하는 것입니다.

우리의 접근 방식 ( 이 노트북 제공 ) :

  1. tfd.Independent() 를 사용하여 1 차원 Normal 확률 변수의 배치를 단일 다차원 확률 변수로 결합합니다. Independent()reinterpreted_batch_ndims 매개 변수는 이벤트 차원으로 재 해석되어야하는 배치 차원의 수를 지정합니다. In our case we create a 1-D batch of length 2 that we transform into a 1-D event of length 2, so reinterpreted_batch_ndims=1 .
  2. Apply a bijector to add the desired covariance: tfb.Invert(tfb.ScaleMatvecTriL(scale_tril=precision_cholesky, adjoint=True)) . Note that above we're multiplying our iid normal random variables by the transpose of the inverse of the Cholesky factor of the precision matrix $(B^{-T}X)$. The tfb.Invert takes care of inverting $B$, and the adjoint=True flag performs the transpose.
  3. Apply a bijector to add the desired offset: tfb.Shift(shift=shift) Note that we have to do the shift as a separate step from the initial inverted affine transform because otherwise the inverted scale is applied to the shift (since the inverse of $y=Ax+b$ is $x=A^{-1}y - A^{-1}b$).
class MVNPrecisionCholesky(tfd.TransformedDistribution):
  """Multivariate normal parameterized by loc and Cholesky precision matrix."""

  def __init__(self, loc, precision_cholesky, name=None):
    super(MVNPrecisionCholesky, self).__init__(
        distribution=tfd.Independent(
            tfd.Normal(loc=tf.zeros_like(loc),
                       scale=tf.ones_like(loc)),
            reinterpreted_batch_ndims=1),
        bijector=tfb.Chain([
            tfb.Shift(shift=loc),
            tfb.Invert(tfb.ScaleMatvecTriL(scale_tril=precision_cholesky,
                                  adjoint=True)),
        ]),
        name=name)
@tf.function(autograph=False)
def log_lik_data_cholesky(precisions_cholesky, replicated_data):
  n = tf.shape(precisions_cholesky)[0]  # number of precision matrices
  rv_data = MVNPrecisionCholesky(
      loc=tf.zeros([n, 2]),
      precision_cholesky=precisions_cholesky)
  return tf.reduce_sum(rv_data.log_prob(replicated_data), axis=0)
# check against the numpy implementation
true_precision_cholesky = np.linalg.cholesky(true_precision)
precisions = [np.eye(2, dtype=np.float32), true_precision]
precisions_cholesky = np.stack([np.linalg.cholesky(m) for m in precisions])
precisions = np.stack(precisions)
n = precisions_cholesky.shape[0]
replicated_data = np.tile(np.expand_dims(my_data, axis=1), reps=[1, 2, 1])
lik_tf = log_lik_data_cholesky(precisions_cholesky, replicated_data).numpy()

for i in range(n):
  print(i)
  print('numpy:', log_lik_data_numpy(precisions[i], my_data))
  print('tensorflow:', lik_tf[i])
0
numpy: -430.71218815801365
tensorflow: -430.71207
1
numpy: -280.81822950593767
tensorflow: -280.81824

Combined log likelihood function

Now we combine our prior and data log likelihood functions in a closure.

def get_log_lik_cholesky(data, n_chains=1):
  # The data argument that is passed in will be available to the inner function
  # below so it doesn't have to be passed in as a parameter.
  replicated_data = np.tile(np.expand_dims(data, axis=1), reps=[1, n_chains, 1])

  @tf.function(autograph=False)
  def _log_lik_cholesky(precisions_cholesky):
    return (log_lik_data_cholesky(precisions_cholesky, replicated_data) +
            log_lik_prior_cholesky(precisions_cholesky))

  return _log_lik_cholesky

Constraining bijector

Our samples are constrained to be valid Cholesky factors, which means they must be lower triangular matrices with positive diagonals. The TransformedTransitionKernel needs a bijector that maps unconstrained tensors to/from tensors with our desired constraints. We've removed the Cholesky decomposition from the bijector's inverse, which speeds things up.

unconstrained_to_precision_cholesky = tfb.Chain([
    # step 2: exponentiate the diagonals    
    tfb.TransformDiagonal(tfb.Exp(validate_args=VALIDATE_ARGS)),
    # step 1: expand the vector to a lower triangular matrix
    tfb.FillTriangular(validate_args=VALIDATE_ARGS),
])
# some checks
inv = unconstrained_to_precision_cholesky.inverse(precisions_cholesky).numpy()
fwd = unconstrained_to_precision_cholesky.forward(inv).numpy()
print('precisions_cholesky:\n', precisions_cholesky)
print('\ninv:\n', inv)
print('\nfwd(inv):\n', fwd)
precisions_cholesky:
 [[[ 1.         0.       ]
  [ 0.         1.       ]]

 [[ 1.1470785  0.       ]
  [-2.0647411  1.0000004]]]

inv:
 [[ 0.0000000e+00  0.0000000e+00  0.0000000e+00]
 [ 3.5762781e-07 -2.0647411e+00  1.3721828e-01]]

fwd(inv):
 [[[ 1.         0.       ]
  [ 0.         1.       ]]

 [[ 1.1470785  0.       ]
  [-2.0647411  1.0000004]]]

Initial values

We generate a tensor of initial values. We're working with Cholesky factors, so we generate some Cholesky factor initial values.

# The number of chains is determined by the shape of the initial values.
# Here we'll generate 3 chains, so we'll need a tensor of 3 initial values.
N_CHAINS = 3

np.random.seed(123)

initial_values_cholesky = []
for i in range(N_CHAINS):
  initial_values_cholesky.append(np.array(
      [[0.5 + np.random.uniform(), 0.0],
       [-0.5 + np.random.uniform(), 0.5 + np.random.uniform()]],
      dtype=np.float32))
initial_values_cholesky = np.stack(initial_values_cholesky)

Sampling

We sample N_CHAINS chains using the TransformedTransitionKernel .

@tf.function(autograph=False)
def sample():
  tf.random.set_seed(123)
  log_lik_fn = get_log_lik_cholesky(my_data)

  # Tuning acceptance rates:
  dtype = np.float32
  num_burnin_iter = 3000
  num_warmup_iter = int(0.8 * num_burnin_iter) 
  num_chain_iter = 2500

  # Set the target average acceptance ratio for the HMC as suggested by
  # Beskos et al. (2013):
  # https://projecteuclid.org/download/pdfview_1/euclid.bj/1383661192
  target_accept_rate = 0.651

  # Initialize the HMC sampler.
  hmc = tfp.mcmc.HamiltonianMonteCarlo(
      target_log_prob_fn=log_lik_fn,
      step_size=0.01,
      num_leapfrog_steps=3)

  ttk = tfp.mcmc.TransformedTransitionKernel(
      inner_kernel=hmc, bijector=unconstrained_to_precision_cholesky)

  # Adapt the step size using standard adaptive MCMC procedure. See Section 4.2
  # of Andrieu and Thoms (2008):
  # http://www4.ncsu.edu/~rsmith/MA797V_S12/Andrieu08_AdaptiveMCMC_Tutorial.pdf
  adapted_kernel = tfp.mcmc.SimpleStepSizeAdaptation(
      inner_kernel=ttk,
      num_adaptation_steps=num_warmup_iter,
      target_accept_prob=target_accept_rate)

  states = tfp.mcmc.sample_chain(
      num_results=num_chain_iter,
      num_burnin_steps=num_burnin_iter,
      current_state=initial_values,
      kernel=adapted_kernel,
      trace_fn=None,
      parallel_iterations=1)
  # transform samples back to their constrained form
  samples = tf.linalg.matmul(states, states, transpose_b=True)
  return samples

precision_samples = sample()

Convergence check

A quick convergence check looks good:

r_hat = tfp.mcmc.potential_scale_reduction(precision_samples).numpy()
print(r_hat)
[[1.0013583 1.0019467]
 [1.0019467 1.0011804]]

Comparing results to the analytic posterior

# The output samples have shape [n_steps, n_chains, 2, 2]
# Flatten them to [n_steps * n_chains, 2, 2] via reshape:
precision_samples_reshaped = np.reshape(precision_samples, newshape=[-1, 2, 2])

And again, the sample means and standard deviations match those of the analytic posterior.

print('True posterior mean:\n', posterior_mean)
print('Mean of samples:\n', np.mean(precision_samples_reshaped, axis=0))
True posterior mean:
 [[ 0.9641779 -1.6534661]
 [-1.6534661  3.8683164]]
Mean of samples:
 [[ 0.9668749 -1.6552604]
 [-1.6552604  3.8676758]]

print('True posterior standard deviation:\n', posterior_sd)
print('Standard deviation of samples:\n', np.std(precision_samples_reshaped, axis=0))
True posterior standard deviation:
 [[0.13435492 0.25050813]
 [0.25050813 0.53903675]]
Standard deviation of samples:
 [[0.13329637 0.24913797]
 [0.24913797 0.53983945]]

Ok, all done! We've got our optimized sampler working.