import cpm
from packaging import version
## cpm checks
print(cpm.__version__)
if version.parse(cpm.__version__) < version.parse("0.22"):
raise ImportError("cpm version must be >= 0.22. Please install the latest version using: pip install --upgrade cpm")
Parameter Recovery (cpm.generators, cpm.models, cpm.optimisation)¶
We will now evaluate how well the model recovers its original parameters. Briefly, you will sample a random set of model parameters, generate new data with them, fit the model to those data, and then compare the estimated parameters to the original ones. Specifically, instead of using the participant's choice to select which value to update, you will take the model's probability of choosing each option and use it to sample a choice. The technique we use below involves two new components: the cpm.optimisation.minimise.LogLikelihood.bernoulli loss function and a Bernoulli sampling step to generate data. So, what will we do here?
- Sample a random set of "true" parameters for each participant
- Generate data from the model with those parameters
- Fit the model to the generated data
- Compare the estimated parameters to the true parameters (a minimal toy illustration of this loop follows below)
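Before working with the full model, the snippet below is a minimal, self-contained toy illustration of that loop. It uses a plain Bernoulli "model" with a single free parameter p, so that fitting reduces to taking the sample mean. The variable names and the choice of a Bernoulli toy model are illustrative only and are not part of the cpm toolbox; the rest of this notebook performs the same four steps with the learning model and cpm.
import numpy
rng = numpy.random.default_rng(2024)  # arbitrary seed for reproducibility
true_p = rng.uniform(0.1, 0.9, size=10)  # 1. sample "true" parameters
toy_data = [rng.binomial(1, p, size=100) for p in true_p]  # 2. generate data from the model
recovered_p = numpy.array([y.mean() for y in toy_data])  # 3. fit: the maximum-likelihood estimate of a Bernoulli p is the sample mean
print(numpy.corrcoef(true_p, recovered_p)[0, 1])  # 4. compare true and recovered parameters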
Loss (Negative Log Likelihood) and data-generating function¶
Here, we define the loss function based on negative log likelihood.
\begin{align*} -\log L(\theta \mid Y, M) = -\sum_{i=1}^{N} \log \bigg[ p(y_i \mid \theta) \bigg] \end{align*}
where $\theta$ are the model parameters, $Y$ is the data with $N$ data points, and $M$ is the model. The term $p(y_i~|~\theta)$ gives the probability of observing data point $y_i$ given a certain parameter set. It follows a Bernoulli distribution and is defined as:
\begin{align*} p(y_t~|~\theta) = \left\{ \begin{array}{ll} P(a_t) & \text{if } y_t = 1, \text{ and} \\ 1 - P(a_t) & \text{if } y_t = 0, \end{array} \right. \end{align*}
where $P(a_t)$ is the model's probability (the SoftMax output) of choosing the option coded as $1$ on trial $t$.
Note that the loss function is not part of our model specification, but is implemented as cpm.optimisation.minimise.LogLikelihood.bernoulli. If participants can choose between more than two options on a given trial, we can exchange it for the loss function LogLikelihood.categorical without the need to modify the implementation of the model.
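To make the loss concrete, here is a minimal hand-written sketch of the Bernoulli negative log likelihood. The function name and toy inputs are illustrative only; for the actual fitting below we pass the toolbox's own implementation, cpm.optimisation.minimise.LogLikelihood.bernoulli, to the optimiser.
import numpy

def bernoulli_negative_log_likelihood(predicted, observed):
    # predicted: the model's trial-by-trial probability of the response coded as 1 (the dependent variable)
    # observed: the binary responses y_t (0 or 1)
    predicted = numpy.clip(predicted, 1e-10, 1 - 1e-10)  # guard against log(0)
    log_p = observed * numpy.log(predicted) + (1 - observed) * numpy.log(1 - predicted)
    return -numpy.sum(log_p)

bernoulli_negative_log_likelihood(
    predicted=numpy.array([0.8, 0.3, 0.6]),
    observed=numpy.array([1, 0, 1]),
)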
Specify a generative model¶
Models that generate realistic data are called generative models.
To generate data with the model, we will use the same Bernoulli distribution, with its mean given by the model's probability of choosing each option. The Softmax function we defined previously returns a probability for each action, and we will sample from this distribution to generate a choice. This is done using the cpm.models.decision.Softmax.choice() method.
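Conceptually, sampling a choice amounts to drawing one action index with probabilities equal to the policy. The hand-rolled sketch below (illustrative only; in the model we call the toolbox's Softmax.choice() method instead) shows the idea:
import numpy
rng = numpy.random.default_rng()
policy = numpy.array([0.3, 0.7])  # the model's probability of choosing each option
sampled_choice = rng.choice(len(policy), p=policy)  # draw 0 or 1 with those probabilities
print(sampled_choice)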
Modify your model, generate data with random parameters¶
import numpy
import pandas as pd
import cpm
import cpm.datasets as datasets
from cpm.generators import Parameters, Value
import ipyparallel as ipp # for parallel computing with IPython (specific to Jupyter notebooks)
data = datasets.load_bandit_data()
data.head()
parameters = Parameters(
# free parameters are indicated by specifying priors
alpha=Value(
value=0.5,
lower=1e-10,
upper=1,
prior="truncated_normal",
args={"mean": 0.5, "sd": 0.25},
),
temperature=Value(
value=1,
lower=0,
upper=10,
prior="truncated_normal",
args={"mean": 5, "sd": 2.5},
),
# everything without a prior is part of the initial state of the
# model or constructs fixed throughout the simulation
# (e.g. exemplars in general-context models of categorizations)
# initial q-values starting from a non-zero value
# these are equal for all 4 stimuli (1 / 4 each)
values=numpy.array([0.25, 0.25, 0.25, 0.25]),
)
@ipp.require("numpy")
def model(parameters, trial):
# pull out the parameters
alpha = parameters.alpha
temperature = parameters.temperature
values = numpy.asarray(parameters.values)
# pull out the trial information
stimulus = numpy.array([trial.arm_left, trial.arm_right]).astype(int)
feedback = numpy.array([trial.reward_left, trial.reward_right])
# Equation 1. - get the value of each available action
# Note that because python counts from 0, we need to shift
# the stimulus identifiers by -1
expected_rewards = values[stimulus - 1]
# convert columns to rows
expected_rewards = expected_rewards.reshape(2, 1)
# calculate a policy based on the activations
# Equation 2.
choice_rule = cpm.models.decision.Softmax(
activations=expected_rewards,
temperature=temperature
)
choice_rule.compute() # compute the policy
# if the policy is NaN for an action, then we need to set it to 1
# this corrects some numerical issues with python and infinities
if numpy.isnan(choice_rule.policies).any():
choice_rule.policies[numpy.isnan(choice_rule.policies)] = 1
model_choices = choice_rule.choice() # get the model's choice
# get the received reward for the choice
reward = feedback[model_choices]
teacher = numpy.array([reward])
# we now create a vector that tells our learning rule which...
# ... stimulus to update according to the model's choice
what_to_update = numpy.zeros(4)
chosen_stimulus = stimulus[model_choices] - 1
what_to_update[chosen_stimulus] = 1
# Equation 4.
update = cpm.models.learning.SeparableRule(
weights=values,
feedback=teacher,
input=what_to_update,
alpha=alpha
)
update.compute()
# Equation 5.
values += update.weights.flatten()
# compile output
output = {
"trial" : trial.trial.astype(int), # trial numbers
"activation" : expected_rewards.flatten(), # expected reward of arms
"policy" : choice_rule.policies, # policies
"reward" : reward, # received reward
"error" : update.weights, # prediction error
"values" : values, # updated values
"response" : model_choices, # model's choice
# dependent variable
"dependent" : numpy.array([choice_rule.policies[1]]),
}
return output
Generate data with random parameters (cpm.generators.Simulator)¶
Here, you need to generate a random set of parameters, equal in number to the number of participants in your data set. Then, you will use the cpm.generators.Simulator class to generate data for the whole sample.
generative_model = cpm.generators.Wrapper(
model=model,
parameters=parameters,
data=data[data.ppt == 1],
)
In the code below, we sample one parameter set per participant from the parameter space. The cpm.generators.Parameters class can sample parameters from the priors via its sample() method.
number_of_participants = data.ppt.nunique()
parameters_for_the_whole_sample = parameters.sample(size=number_of_participants)
original_parameters = pd.DataFrame(parameters_for_the_whole_sample) ## save the original parameters
original_parameters["ppt"] = data.ppt.unique()
# create a simulator that will run the model for each participant
simulator = cpm.generators.Simulator(
wrapper=generative_model,
data=data.groupby("ppt"),
parameters=parameters_for_the_whole_sample,
)
simulator.run()
# collect the results
results = simulator.export()
results.head()
All that is left after the simulations is to take the model output and create our dependent variable for the fitting. The toolbox requires the data to contain a single column with the dependent variable, which we estimate model performance against. In this case, the dependent variable is the choice made by the model, a binary variable (0 or 1) indicating whether the model chose option 1 or option 2. We take that from the Simulator output and create a new column in the data frame.
recovery_data = data.copy() ## copy the original data
recovery_data["observed"] = results["response_0"] ## select the variable of interest
recovery_data["response"] = results["response_0"] ## select the model's response
recovery_data.head() ## print the first few rows of the data to check
| | ppt | trial | arm_left | arm_right | reward_left | reward_right | response | feedback | observed |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 2 | 4 | 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 2 | 2 | 4 | 1 | 1 | 0 | 1 | 0 |
| 2 | 1 | 3 | 2 | 1 | 0 | 0 | 0 | 0 | 0 |
| 3 | 1 | 4 | 2 | 1 | 0 | 0 | 0 | 0 | 0 |
| 4 | 1 | 5 | 1 | 4 | 0 | 0 | 1 | 0 | 1 |
Revert the model to the original version and refit the data¶
So, we need to revert the model to the original version, which means removing the call to the choice() method from the code we wrote before and reading the choices from the observed column instead. This allows us to fit the model to the generated data rather than generating new data.
@ipp.require("numpy")
def model_fitting(parameters, trial):
# pull out the parameters
alpha = parameters.alpha
temperature = parameters.temperature
values = numpy.array(parameters.values)
# pull out the trial information
stimulus = numpy.array([trial.arm_left, trial.arm_right]).astype(int)
feedback = numpy.array([trial.reward_left, trial.reward_right])
human_choice = trial.observed.astype(int)
# Equation 1. - get the value of each available action
# Note that because python counts from 0, we need to shift
# the stimulus identifiers by -1
expected_rewards = values[stimulus - 1]
# convert columns to rows
expected_rewards = expected_rewards.reshape(2, 1)
# calculate a policy based on the activations
# Equation 2.
choice_rule = cpm.models.decision.Softmax(
activations=expected_rewards,
temperature=temperature
)
choice_rule.compute() # compute the policy
# if the policy is NaN for an action, then we need to set it to 1
# this corrects some numerical issues with python and infinities
if numpy.isnan(choice_rule.policies).any():
choice_rule.policies[numpy.isnan(choice_rule.policies)] = 1
# get the received reward for the choice
reward = feedback[human_choice]
teacher = numpy.array([reward])
# we now create a vector that tells our learning rule what...
# ... stimulus to update according to the participant's choice
what_to_update = numpy.zeros(4)
chosen_stimulus = stimulus[human_choice] - 1
what_to_update[chosen_stimulus] = 1
# Equation 4.
update = cpm.models.learning.SeparableRule(
weights=values,
feedback=teacher,
input=what_to_update,
alpha=alpha
)
update.compute()
# Equation 5.
values += update.weights.flatten()
# compile output
output = {
"trial" : trial.trial.astype(int), # trial numbers
"activation" : expected_rewards.flatten(), # expected reward of arms
"policy" : choice_rule.policies, # policies
"reward" : reward, # received reward
"error" : update.weights, # prediction error
"values" : values, # updated values
# dependent variable
"dependent" : numpy.array([choice_rule.policies[1]]),
}
return output
base_model = cpm.generators.Wrapper(
model=model_fitting,
parameters=parameters,
data=recovery_data[recovery_data.ppt == 1],
)
Now that we have everything ready, we can fit the model to the data. We will use the cpm.optimisation.FminBound class to do that. The fitting process will return a set of estimated parameters for each participant, which we can then compare to the original parameters.
from cpm.optimisation import minimise, FminBound
# Set up the fitting procedure
fit = FminBound(
model=base_model, # Wrapper class with the model we specified from before
data=recovery_data.groupby('ppt'), # the data grouped by participant
minimisation=minimise.LogLikelihood.bernoulli,
parallel=True,
libraries=["numpy", "cpm", "pandas"],
prior=False,
ppt_identifier="ppt",
display=False,
number_of_starts=5,
# everything below is optional and passed directly to the scipy implementation of the optimiser
approx_grad=True
)
fit.optimise()
Starting optimization 1/5 from [0.85482528 3.04383525]
Starting optimization 2/5 from [0.23496486 4.11982498]
Starting optimization 3/5 from [0.82464871 6.3520411 ]
Starting optimization 4/5 from [0.77991782 4.96094758]
Starting optimization 5/5 from [0.71609669 7.102829 ]
Compare the estimated parameters with the original ones¶
recovered_parameters = fit.export()
recovered_parameters.rename(
columns={
"x_0": "alpha",
"x_1": "temperature",
},
inplace=True
)
recovered_parameters.head()
| | alpha | temperature | grad_0 | grad_1 | task | funcalls | nit | warnflag | hessian_0 | hessian_1 | hessian_2 | hessian_3 | ppt | fun |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.524996 | 3.845955 | 0.000001 | 0.000000 | CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL | 33 | 8 | 0 | 83.027474 | 2.107967 | 2.107967 | 1.642208 | 1 | 42.071934 |
| 1 | 0.825243 | 10.000000 | 0.000001 | -0.060952 | CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL | 51 | 15 | 0 | 123.930813 | -2.119611 | -2.119611 | 0.069328 | 2 | 30.886401 |
| 2 | 0.314060 | 6.225487 | -0.000011 | 0.000000 | CONVERGENCE: REL_REDUCTION_OF_F_<=_FACTR*EPSMCH | 30 | 8 | 0 | 216.366675 | 2.143558 | 2.143558 | 0.609612 | 3 | 38.966781 |
| 3 | 1.000000 | 1.318152 | -0.058925 | 0.000001 | CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL | 33 | 8 | 0 | 18.479315 | 6.917438 | 6.917438 | 8.249126 | 4 | 57.768197 |
| 4 | 0.716764 | 4.661707 | -0.000003 | 0.000004 | CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL | 33 | 7 | 0 | 78.660714 | -0.186774 | -0.186774 | 0.862853 | 5 | 40.526508 |
import matplotlib.pyplot as plt
alpha_original = original_parameters["alpha"].values
alpha_recovered = recovered_parameters["alpha"].values
beta_original = original_parameters["temperature"].values
beta_recovered = recovered_parameters["temperature"].values
# Plotting the results
fig, axes = plt.subplots(1, 2, figsize=(12, 6), sharey=False)
## scatterplots
axes[0].scatter(alpha_original, alpha_recovered, alpha=0.5)
## add correlation coefficient
corr_alpha = numpy.corrcoef(alpha_original, alpha_recovered)[0, 1]
axes[0].text(0.5, 0.9, f'Correlation: {corr_alpha:.2f}', transform=axes[0].transAxes, fontsize=12, ha='center', va='center')
axes[0].set_xlabel(r'Original $\alpha$')
axes[0].set_ylabel(r'Recovered $\alpha$')
axes[0].set_title('Learning rate recovery\n')
axes[0].plot([0, 1], [0, 1], 'r--') # Diagonal line for reference
axes[1].scatter(beta_original, beta_recovered, alpha=0.5)
## add correlation coefficient
corr_beta = numpy.corrcoef(beta_original, beta_recovered)[0, 1]
axes[1].text(0.5, 0.9, f'Correlation: {corr_beta:.2f}', transform=axes[1].transAxes, fontsize=12, ha='center', va='center')
axes[1].set_xlabel(r'Original $\beta$')
axes[1].set_ylabel(r'Recovered $\beta$')
axes[1].set_title('Choice stochasticity (inverse temperature) recovery\n')
axes[1].plot([0, 10], [0, 10], 'r--') # Diagonal line for reference
# Adjust layout and show the plot
plt.tight_layout()
Questions¶
- What do you think about the results of the parameter recovery? Are the parameters identifiable?
- Are there any parameters that are not identifiable? If so, why do you think that is the case?