import os

if "KERAS_BACKEND" not in os.environ:
    # set this to "torch", "tensorflow", or "jax"
    os.environ["KERAS_BACKEND"] = "jax"

import matplotlib.pyplot as plt
import numpy as np

import bayesflow as bf
import keras
\[\begin{equation} \begin{aligned} c & \sim \text{Uniform}(0, 100) \\ s & \sim \text{Uniform}(0, 100) \\ t & \sim \text{Uniform}(0, 1) \\ \eta_{ij} & \leftarrow \exp\left(-c \lvert \log T_{i} - \log T_{j} \rvert \right) \\ d_{ij} & \leftarrow \frac{\eta_{ij}}{\sum_k \eta_{ik}} \\ r_{ij} & \leftarrow \frac{1}{1 + \exp\left(-s (d_{ij} - t)\right)} \\ \theta_i & \leftarrow \begin{cases} \min\left(1, \sum_j r_{ij}\right) & \text{if version = 0} \\ 1-\prod_j (1-r_{ij}) & \text{if version = 1} \end{cases}\\ y_i & \sim \text{Binomial}(\theta_i, n) \end{aligned} \end{equation}\]
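Before turning the model into simulator functions, here is a minimal numeric sketch of these equations for a hypothetical three-word list; the parameter values and retention intervals below are purely illustrative, not draws from the priors.

import numpy as np

c, s, t = 10.0, 12.0, 0.5                    # illustrative values
T = np.array([19.0, 17.0, 15.0])             # retention intervals, first-studied word first

eta = np.exp(-c * np.abs(np.log(T)[:, None] - np.log(T)[None, :]))
d = eta / eta.sum(axis=1, keepdims=True)     # row-normalized distinctiveness
r = 1 / (1 + np.exp(-s * (d - t)))           # pairwise retrieval probabilities
theta_v0 = np.minimum(1, r.sum(axis=1))      # version 0: capped sum
theta_v1 = 1 - np.prod(1 - r, axis=1)        # version 1: at least one successful retrieval
print(np.round(theta_v0, 2), np.round(theta_v1, 2))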
max_n_words = 40

def context(version=None, max_n_words=max_n_words):
    if version is None:
        version = np.random.randint(0, 2)
    return dict(
        n_obs=np.random.randint(1200, 1521),
        n_words=np.random.randint(10, max_n_words + 1),
        offset=np.random.randint(10, 26),
        lag=np.random.randint(1, 3),
        version=version
    )
def prior():
    return dict(
        c=np.random.gamma(shape=5, scale=2.5),
        s=np.random.gamma(shape=5, scale=2.5),
        t=np.random.beta(a=5, b=3)
    )
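As a quick, purely illustrative sanity check, we can look at the empirical quantiles of a few thousand prior draws (columns ordered as c, s, t):

draws = np.array([[p["c"], p["s"], p["t"]] for p in (prior() for _ in range(10_000))])
print(np.round(np.quantile(draws, [0.025, 0.5, 0.975], axis=0), 2))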
def calculate_log_T(offset, lag, n_words):
    x = np.arange(n_words, 0, -1) - 1
    return np.log(offset + lag * x)
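A small illustrative check: with offset=15, lag=2 and a five-word list, the first-presented word has the largest retention interval and the most recent word the smallest.

print(np.round(np.exp(calculate_log_T(offset=15, lag=2, n_words=5))))
# approximately [23. 21. 19. 17. 15.]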
def prob_correct(c, s, t, n_words, offset, lag, version):
    log_T = calculate_log_T(offset, lag, n_words)

    eta = np.zeros((n_words, n_words))
    d = np.zeros((n_words, n_words))

    # pairwise similarity and row-normalized distinctiveness
    for i in range(n_words):
        for j in range(n_words):
            abs_diff = np.abs(log_T[i] - log_T[j])
            eta[i, j] = np.exp(-c * abs_diff)
        d[i, :] = eta[i, :] / np.sum(eta[i, :])

    r = 1 / (1 + np.exp(-s * (d - t)))

    # pad to max_n_words; positions beyond n_words keep theta = 1
    theta = np.ones(max_n_words)

    for i in range(n_words):
        if version < 0.5:
            theta[i] = np.min([1, np.sum(r[i, :])])
        else:
            theta[i] = 1 - np.prod(1 - r[i, :])

    return theta
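To see what the model implies, we can compute the predicted serial-position curve for a single prior draw under a hypothetical context (20 words, offset=15, lag=2) and compare both model versions:

params = prior()
theta_v0 = prob_correct(**params, n_words=20, offset=15, lag=2, version=0)
theta_v1 = prob_correct(**params, n_words=20, offset=15, lag=2, version=1)
print(np.round(theta_v0[:20], 2))
print(np.round(theta_v1[:20], 2))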
def likelihood(c, s, t, n_obs, n_words, offset, lag, version, max_n_words=max_n_words):
    theta = prob_correct(c, s, t, n_words, offset, lag, version)

    # fixed-length outputs padded with zeros; `observed` marks the real entries
    y = np.zeros(max_n_words)
    observed = np.zeros(max_n_words)
    trial = np.arange(max_n_words)

    y[range(n_words)] = np.random.binomial(n=n_obs, p=theta[range(n_words)], size=n_words)
    observed[range(n_words)] = 1

    return dict(y=y, observed=observed, trial=trial)
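Before wiring everything into a BayesFlow simulator, one hand-rolled simulation pass shows the padding scheme: entries beyond n_words remain zero and are flagged by observed == 0.

ctx = context()
params = prior()
sim = likelihood(**params, **ctx)
print(ctx["n_words"], sim["y"][:12], sim["observed"][:12])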
simulator = bf.make_simulator([context, prior, likelihood])
df = simulator.sample(100)

for key, value in df.items():
    print(key, "shape: ", value.shape)
n_obs shape: (100, 1)
n_words shape: (100, 1)
offset shape: (100, 1)
lag shape: (100, 1)
version shape: (100, 1)
c shape: (100, 1)
s shape: (100, 1)
t shape: (100, 1)
y shape: (100, 40)
observed shape: (100, 40)
trial shape: (100, 40)
We will pass the observed success counts, together with the observation indicator and trial index, through a set-based summary network, while the number of observations, number of words, offset, lag, and model version are fed directly into the inference network as conditions. The inference network will learn to approximate the posterior distribution of the parameters \(c\), \(s\), and \(t\). Using the adapter, we also make sure that the natural constraints of the parameters are respected.
adapter = (
    bf.Adapter()
    .constrain(["c", "s"], lower=0)
    .constrain("t", lower=0, upper=1)
    .standardize(include=["c", "s", "t"])
    .as_set(["y", "observed", "trial"])
    .concatenate(["y", "observed", "trial"], into="summary_variables")
    .concatenate(["n_obs", "n_words", "offset", "lag", "version"], into="inference_conditions")
    .concatenate(["c", "s", "t"], into="inference_variables")
)
df = adapter(simulator.sample(100), stage="inference")

for key, value in df.items():
    print(key, "shape: ", value.shape)
summary_variables shape: (100, 40, 3)
inference_conditions shape: (100, 5)
inference_variables shape: (100, 3)
workflow = bf.BasicWorkflow(
    simulator=simulator,
    adapter=adapter,
    inference_network=bf.networks.CouplingFlow(),
    summary_network=bf.networks.DeepSet(),
    initial_learning_rate=1e-3
)
Since the simulation is relatively involved, we will presimulate data and fit the networks in an offline regime to speed up training.
train_data = simulator.sample(20_000)
val_data = simulator.sample(5_000)

history = workflow.fit_offline(
    data=train_data,
    validation_data=val_data,
    epochs=200,
    batch_size=500
)
test_data = simulator.sample(1000, version=0)
figs = workflow.plot_default_diagnostics(test_data=test_data, num_samples=500)
y = np.array([
    [994,806,691,634,634,835,965,1008,1181,1382,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    [794,640,576,512,486,486,512,512,538,640,742,794,1024,1126,1254,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    [836,623,562,517,441,471,441,395,350,441,456,486,456,593,608,684,897,1110,1353,1459,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    [699,410,304,243,258,243,228,213,304,350,395,319,365,410,456,608,958,1170,1277,1474,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    [576,360,240,228,240,240,228,216,240,240,240,240,228,240,216,204,240,228,240,240,264,252,288,324,444,480,660,912,1080,1188,0,0,0,0,0,0,0,0,0,0],
    [384,256,154,154,141,128,154,154,154,128,179,141,128,141,154,179,154,141,179,128,154,128,154,192,179,166,154,218,218,218,256,256,230,307,384,512,691,922,1114,1254]
])[..., np.newaxis]
observed = np.zeros_like(y)
observed[np.where(y != 0)] = 1
trial = np.arange(max_n_words)[np.newaxis]
trial = np.tile(trial, [6, 1])
trial.shape
(6, 40)
inference_data = dict(
    y=y,
    observed=observed,
    trial=trial,
    n_obs=np.array([1440, 1280, 1520, 1520, 1200, 1280])[..., np.newaxis],
    n_words=np.array([10, 15, 20, 20, 30, 40])[..., np.newaxis],
    offset=np.array([15, 20, 25, 10, 15, 20])[..., np.newaxis],
    lag=np.array([2, 2, 2, 1, 1, 1])[..., np.newaxis],
    version=np.zeros(6)[..., np.newaxis]
)
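As a quick sanity check, every entry should have a leading dimension of 6, one per empirical dataset:

for key, value in inference_data.items():
    print(key, "shape: ", value.shape)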
posterior_samples = workflow.sample(num_samples=2000, conditions=inference_data)
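The returned draws form a dictionary with one array per parameter, shaped (datasets, samples, 1), which is what the plotting helper below assumes. A quick look at the per-dataset posterior medians:

for name, draws in posterior_samples.items():
    print(name, draws.shape, np.round(np.median(draws, axis=1).squeeze(), 2))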
def plot_data(c, s, t, data, max_n_words=max_n_words):
    n_datasets, n_samples, _ = c.shape
    y_pred = np.zeros((n_datasets, n_samples, max_n_words))

    # posterior predictive draws: one simulated dataset per posterior sample
    for d in range(n_datasets):
        for i in range(n_samples):
            n_obs = int(data['n_obs'][d, 0])
            n_words = int(data['n_words'][d, 0])
            offset = int(data['offset'][d, 0])
            lag = int(data['lag'][d, 0])
            version = int(data['version'][d, 0])
            theta = prob_correct(c=c[d, i, 0], s=s[d, i, 0], t=t[d, i, 0],
                                 n_words=n_words, offset=offset,
                                 lag=lag, version=version)
            y_pred[d, i, range(n_words)] = np.random.binomial(n=n_obs, p=theta[range(n_words)], size=n_words)

    # convert counts to proportions correct
    y = np.squeeze(data['y']) / data['n_obs']
    y_hat = np.mean(y_pred, axis=1) / data['n_obs']
    lower = np.quantile(y_pred, q=0.025, axis=1) / data['n_obs']
    upper = np.quantile(y_pred, q=0.975, axis=1) / data['n_obs']

    return y, y_hat, lower, upper
y, y_hat, lower, upper = plot_data(**posterior_samples, data=inference_data)
fig, axes = plt.subplots(2, 3, figsize=(12, 10))
axes = axes.flatten()

for d, ax in enumerate(axes):
    ax.set_ylim([0, 1])
    ax.set_xlim([0, max_n_words])
    ax.set_xlabel("Serial position")
    ax.set_ylabel("Probability correct")

    n_obs = int(inference_data['n_obs'][d, 0])
    n_words = int(inference_data['n_words'][d, 0])
    observed = range(n_words)
    x = range(1, n_words + 1)

    ax.fill_between(x, lower[d, observed], upper[d, observed], alpha=0.3)
    ax.plot(x, y_hat[d, observed])
    ax.scatter(x, y[d, observed])

fig.tight_layout()