Regression kink design with pymc
models
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import causalpy as cp
WARNING (pytensor.tensor.blas): Using NumPy C-API based implementation for BLAS functions.
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = 'retina'
seed = 42
rng = np.random.default_rng(seed)
The Regression kink design should be analysed by a piecewise continuous function. That is:
We want a function which can capture the data to the left and to the right of the kink point.
We want a piecewise function with one breakpoint or kink point
The function should be continuous at the kink point
An example of such a function would be a piecewise continuous polynomial:
Where:
\(\beta\)’s are the unknown parameters,
\(x\) is the running variable,
\(t\) is an indicator variable which is \(1\) if \(x \geq k\), and \(0\) otherwise,
\(k\) is the value of \(x\) at the kink point.
We can visualise what these functions look like by plotting a number of them with randomly chosen \(\beta\) coefficients, but all with a kink point at \(0.5\).
def f(x, beta, kink):
return (
beta[0]
+ beta[1] * x
+ beta[2] * x**2
+ beta[3] * (x - kink) * (x >= kink)
+ beta[4] * (x - kink) ** 2 * (x >= kink)
)
def gradient_change(beta, kink, epsilon=0.01):
gradient_left = (f(kink, beta, kink) - f(kink - epsilon, beta, kink)) / epsilon
gradient_right = (f(kink + epsilon, beta, kink) - f(kink, beta, kink)) / epsilon
gradient_change = gradient_right - gradient_left
return gradient_change
x = np.linspace(-1, 1, 1000)
kink = 0.5
cols = 5
fig, ax = plt.subplots(2, cols, sharex=True, sharey=True, figsize=(15, 5))
for col in range(cols):
beta = rng.random(5)
ax[0, col].plot(x, f(x, beta, kink), lw=3)
ax[1, col].plot(x, np.gradient(f(x, beta, kink), x), lw=3)
ax[0, col].set(title=f"Random {col+1}")
ax[1, col].set(xlabel="x")
ax[0, 0].set(ylabel="$y = f(x)$")
ax[1, 0].set(ylabel=r"$\frac{dy}{dx}$");
The idea of regression kink analysis is to fit a suitable function to data and to estimate whether there is a change in the gradient of the function at the kink point.
Below we will generate a number of datasets and run through how to conduct the regression kink analysis. We will use a function to generate simulated datasets with the properties we want.
def generate_data(beta, kink, sigma=0.05, N=50):
if beta is None:
beta = rng.random(5)
x = rng.uniform(-1, 1, N)
y = f(x, beta, kink) + rng.normal(0, sigma, N)
df = pd.DataFrame({"x": x, "y": y, "treated": x >= kink})
return df
Example 1 - continuous piecewise linear function
In this example we’ll stick to a simple continuous piecewise function.
kink = 0.5
# linear function with gradient change of 2 at kink point
beta = [0, -1, 0, 2, 0]
sigma = 0.05
df = generate_data(beta, kink, sigma=sigma)
fig, ax = plt.subplots()
ax.scatter(df["x"], df["y"], alpha=0.5)
ax.axvline(kink, color="red", linestyle="--")
ax.set(title=f"Change in gradient at kink point: {gradient_change(beta, kink):.2f}");
We can use the regular cp.pymc_models.LinearRegression
model and enforce the continuous piecewise nature by cleverly specifying a design matrix via the formula
input.
In this example, setting the formula to "y ~ 1 + x + I((x-0.5)*treated)"
(where the 0.5 is the kink point) equates to the following model:
Note
The change in gradient either side of the kink point is evaluated numerically. The epsilon
parameter determines the distance either side of the kink point that is used in this change in gradient calculation.
result1 = cp.pymc_experiments.RegressionKink(
df,
formula=f"y ~ 1 + x + I((x-{kink})*treated)",
model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed}),
kink_point=kink,
epsilon=0.1,
)
fig, ax = result1.plot()
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.
Sampling: [beta, sigma, y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
If you want to plot the posterior distribution of the inferred gradient change, you can do it as follows.
ax = az.plot_posterior(result1.gradient_change, ref_val=gradient_change(beta, kink))
ax.set(title="Gradient change");
We know that the correct gradient change is 2, and that we have correctly recovered it as the posterior distribution is centred around 2.
We can also ask for summary information:
result1.summary()
================================Regression Kink=================================
Formula: y ~ 1 + x + I((x-0.5)*treated)
Running variable: x
Kink point on running variable: 0.5
Results:
Change in slope at kink point = 1.93
Model coefficients:
Intercept -0.01, 94% HDI [-0.02, 0.01]
x -0.99, 94% HDI [-1.01, -0.96]
I((x - 0.5) * treated) 1.93, 94% HDI [1.77, 2.10]
sigma 0.04, 94% HDI [0.03, 0.05]
Example 2 - continuous piecewise polynomial function
Now we’ll introduce some nonlinearity into the mix.
In this example, we’re going to have a 2nd order polynomial on either side of the kink point. So the model can be defined as:
While it’s a bit verbose, we can implement this in a patsy formula as so:
y ~ 1 + x + np.power(x, 2) + I((x-0.5)*treated) + I(np.power((x-0.5), 2)*treated)
where the 0.5 is the kink point.
kink = 0.5
# quadratic function going from 1*x^2 on the left of the kink point, to 1*x^2 -
# 1*(x-kink) - 5*(x-kink)^2 to the right of the kink point
beta = [0, 0, 1, -1, -5]
df = generate_data(beta, kink, N=200, sigma=sigma)
fig, ax = plt.subplots()
ax.scatter(df["x"], df["y"], alpha=0.5)
ax.axvline(kink, color="red", linestyle="--")
ax.set(title=f"Change in gradient at kink point: {gradient_change(beta, kink):.2f}");
formula = f"y ~ 1 + x + np.power(x, 2) + I((x-{kink})*treated) + I(np.power(x-{kink}, 2)*treated)"
result2 = cp.pymc_experiments.RegressionKink(
df,
formula=formula,
model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed}),
kink_point=kink,
epsilon=0.01,
)
fig, ax = result2.plot()
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 3 seconds.
Sampling: [beta, sigma, y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
We can also evaluate the posterior distribution of the parameters and see how they match up with the true values.
Finally, we can see that we have also done a good job at recovering the true gradient change.
Example 3 - basis spline model
As a final example to demonstrate that we need not be constrained to polynomial functions, we can use a basis spline model. This takes advantage of the capability of patsy
to generate design matricies with basis splines. Note that we will use the same simulated dataset as the previous example.
result3 = cp.pymc_experiments.RegressionKink(
df,
formula=f"y ~ 1 + bs(x, df=3) + bs(I(x-{kink})*treated, df=3)",
model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed}),
kink_point=kink,
)
result3.plot();
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 4 seconds.
Sampling: [beta, sigma, y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Using a bandwidth
If we don’t want to fit on all the data available, we can use the bandwidth
kwarg. This will only fit the model to data within a certain bandwidth of the kink point. If \(x\) is the running variable, then the model will only be fitted to data where \(threshold - bandwidth \le x \le threshold + bandwidth\).
formula = f"y ~ 1 + x + np.power(x, 2) + I((x-{kink})*treated) + I(np.power(x-{kink}, 2)*treated)"
result4 = cp.pymc_experiments.RegressionKink(
df,
formula=formula,
model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed}),
kink_point=kink,
epsilon=0.01,
bandwidth=0.4,
)
fig, ax = result4.plot()
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, sigma]
Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 12 seconds.
Sampling: [beta, sigma, y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Sampling: [y_hat]
Notice now that the data outside of the bandwidth is ignored in the fit.