"""
This module performs the simulations of the MSPE and its decomposition into
squared-bias, variance and noise, for the Bagging Algorithm as described in the
paper "Analyzing Bagging" (Bühlmann and Yu, 2002).
In all simulations we use the following procedure:
1. Generate a test sample, without error term, according to the data
generating processes of interest. This will be constant for the whole
simulation study. All predictions will be made on this sample.
2. For each simulation iteration we follow this procedure
a. Draw new error terms for the test sample.
b. Draw a new training sample with regressor and error terms.
c. Fit the predictor (Tree, Bagging, Subagging) to the generated training
data.
d. Using this new predictor make a prediction into the fixed test sample
and save the predicted values.
3. We compute the MSPE, squared bias and variance for the given predictor at
the input point x_matrix = x0 with x0 being the test sample generated in (i).
Within one class instance we define all parameters that are relevant to the
data generating process (DGP) and the simulation set-up. Parameters that are
specific to the Bagging Algorithm are defined in the functions. The idea is to
define one class instance and then loop over different bagging parameters for
the same instance, keeping the DGP and the simulation set-up constant.
The function calc_mse() computes the MSPE and its decomposition for one
specification and the calc_mse_all_ratios() for a range of subsampling ratios.
"""
import warnings
import numpy as np
from src.model_code.baggingtree import BaggingTree
from src.model_code.datasimulation import DataSimulation
class MonteCarloSimulation:
    """
    A class that implements a Monte Carlo Simulation for the Bagging Algorithm.

    Parameters
    ----------
    random_seeds: tuple or list of size 4 consisting of int or None, optional (Default: (None, None, None, None))
        Specify the random seeds that will be used for the simulation study.
        We have to use different random seeds, as we define different
        RandomState instances for each part of the simulation.

        random_seeds[0]: Defines the RandomState for the noise term draw
        random_seeds[1]: Defines the RandomState for the BaggingTree class
        random_seeds[2]: Defines the RandomState for the training sample draws
        random_seeds[3]: Defines the RandomState for the test sample draw

        All random seeds need to be different from one another and specified
        as a usual random seed as it is deployed to numpy.random.
        One random_seed is used to specify the RandomState for numpy.random.
        It is shared across all functions of the class.
        IMPORTANT: This random seed is fixed for a specific instance, as it
        specifies a new RandomState for all numpy functions used in this
        class. As a result this random_seed is *not* overwritten by numpy
        random seeds that are defined outside of the specific class instance.
        The reason for this is that it makes reproducibility easier across
        different simulations and modules. Note however that the downside is
        that we have to specify a different random seed for each class (each
        instance) and it is not possible to specify one random seed at the
        beginning of the whole simulation, as this will define the RandomState
        within each class.
        For further information on this see in :ref:`design_choices`.

    noise: int, float, optional (Default=1.0)
        The standard deviation of the error term that is used for the data
        generating processes.
        The default of *noise* = 1.0 indicates that we draw an error term
        that is standard normally distributed.
        Needs to be greater than zero.
        Note that we cannot draw *noise=0* as this would not be inline with
        the simulation setup that we have chosen.

    n_test_train: tuple or list of size 2 with int, optional (Default=(500, 500))
        Specify the sample size of the test sample and the training samples.

        n_test_train[0]: Defines the size for the test sample
        n_test_train[1]: Defines the size for the training samples

        Both need to be greater than zero.

    data_process: string, optional (Default='friedman')
        Defines which data generating process we use.
        The options are 'friedman', 'linear' and 'indicator'.

    n_repeat: int, optional (Default=100)
        The number of Monte Carlo repetitions to use for the simulation.
        Needs to be greater than zero.
    """

    def __init__(self,
                 n_repeat=100,
                 noise=1.0,
                 data_process='friedman',
                 n_test_train=(500, 500),
                 random_seeds=(None, None, None, None)):
        # The order matters in the following: _set_data_process() relies on
        # self.noise, self.n_test and self.random_seed_test, which are
        # defined by the setters called before it.
        self._set_noise(noise)
        self._set_n_repeat(n_repeat)
        self._set_random_seeds(random_seeds)
        self._set_n_train_test(n_test_train)
        self._set_data_process(data_process)

    def _set_n_repeat(self, n_repeat):
        """ A function to check if *n_repeat* is specified correctly."""
        # Note: the error message previously referred to *b_iterations*
        # (and misspelled it), which is a different parameter entirely.
        assert np.issubdtype(type(n_repeat), np.integer) and n_repeat > 0, \
            ('*n_repeat* needs to be an integer greater than zero.'
             ' You provided n_repeat={}, which is of type {}.'
             ''.format(n_repeat, type(n_repeat)))
        self.n_repeat = n_repeat

    def _set_noise(self, noise):
        """ A function to check if *noise* is specified correctly."""
        # Accept integers as well as floats, in line with the class
        # docstring (the previous check rejected e.g. noise=1). The check
        # is strictly greater than zero, as noise=0 is not a valid set-up.
        assert (np.issubdtype(type(noise), np.floating) or
                np.issubdtype(type(noise), np.integer)) and noise > 0, \
            ('*noise* needs to be of type integer or float and greater'
             ' than zero. You provided noise={}, which is of type {}.'
             ''.format(noise, type(noise)))
        self.noise = noise

    def _set_random_seeds(self, random_seeds):
        """ A function to check if the tuple/list *random_seeds* is specified
        correctly and to save the four seeds as class attributes.
        """
        assert isinstance(random_seeds, (list, tuple)), \
            (' *random_seeds* is not specified correctly. It must be of type'
             ' *list* or *tuple*. The object you have provided is of type'
             ' {}'.format(type(random_seeds)))
        assert all((isinstance(seed, int) or seed is None) for seed in random_seeds), \
            ('All seeds specified in *random_seeds* must be either None or a'
             ' positive integer.')
        assert len(random_seeds) == 4, \
            ('*random_seeds* must be of length 4 for the different'
             ' random seeds. The list/tuple provided'
             ' is of length {}'.format(len(random_seeds)))
        # We want the random seeds to be different unless they are None.
        # set() collapses all Nones into one element, so the integer seeds
        # are pairwise distinct iff count(None) + len(set) == 5.
        if len(set(random_seeds)) != 4:
            if (random_seeds.count(None) + len(set(random_seeds))) != 5:
                raise AssertionError('Some random seeds are the same. '
                                     'Make sure to choose 4 random '
                                     'seeds that are different from '
                                     'one another.')
        if random_seeds.count(None) != 0:
            warnings.warn('At least one of your random seeds is defined as '
                          '*None*. Note that the results will not be '
                          'deterministic as usually desired.')
        # We define the random states. For further details on why we do this,
        # see in the documentation.
        self.random_seed_noise = random_seeds[0]
        self.random_seed_fit = random_seeds[1]
        self.random_seed_train = random_seeds[2]
        self.random_seed_test = random_seeds[3]

    def _set_n_train_test(self, n_test_train):
        """ A function to check if the tuple *n_test_train* is specified
        correctly and to save the test/training sample sizes.
        """
        assert isinstance(n_test_train, (list, tuple)), \
            (' *n_test_train* is not specified correctly. It must be of type'
             ' *list* or *tuple*. The object you have provided is of type'
             ' {}'.format(type(n_test_train)))
        assert len(n_test_train) == 2, (
            '*n_test_train* must be of length 2 with the first(second) element'
            ' being the test(training) sample size. The list/tuple provided'
            ' is of length {}'.format(len(n_test_train)))
        assert (np.issubdtype(type(n_test_train[0]), np.integer) and
                np.issubdtype(type(n_test_train[1]), np.integer)), \
            ('The size of the training and the test sample '
             '*n_test_train* has to be a positive integer.')
        assert n_test_train[0] > 0 and n_test_train[1] > 0, \
            ('The size of the training and the test sample '
             '*n_test_train* has to be a positive integer.')
        self.n_test = n_test_train[0]
        self.n_train = n_test_train[1]

    def _set_data_process(self, data_process):
        """ A function to check if *data_process* is specified correctly.
        Furthermore it creates the test sample consisting of *x_matrix_test*
        and *f_vector_test* (dependent variable without error) according to
        the given data generating process.
        """
        # Check if the data_process is correct.
        assert data_process in ('friedman', 'linear', 'indicator'), \
            ('You didnt use a supported data generating process. '
             'Choose among "friedman", "linear" and "indicator". '
             'You provided "{}"'.format(data_process))
        # Set the data process as a class variable.
        self.data_process = data_process
        # Create one x_matrix_test and f_vector_test that will be the same
        # for all following simulation steps. This is important as we want
        # to make a prediction on the same sample for each step.
        self.test_simulation = DataSimulation(
            n_size=self.n_test,
            noise=self.noise,
            without_error=True,
            random_seed=self.random_seed_test)
        # Create the test sample according to the given data generating
        # process.
        if data_process == 'friedman':
            self.x_matrix_test, self.f_vector_test = self.test_simulation.friedman_1_model()
        elif data_process == 'indicator':
            self.x_matrix_test, self.f_vector_test = self.test_simulation.indicator_model()
        elif data_process == 'linear':
            self.x_matrix_test, self.f_vector_test = self.test_simulation.linear_model()

    def _set_train_function(self):
        """ A function to define a new *DataSimulation* instance and to define
        the function *draw_train*, which will be used to draw the training
        samples in each iteration.
        Note that we do that here and not when initializing the class as we
        want to draw the same sequence of training samples for all subagging
        iterations.
        Hence it would **not** be feasible to define *draw_train* as a class
        function.
        """
        # We define the basis for drawing the training samples.
        train_simulation = DataSimulation(n_size=self.n_train,
                                          noise=self.noise,
                                          without_error=False,
                                          random_seed=self.random_seed_train)
        # Assign to the variable *draw_train* the according function of the
        # DataSimulation class/train_simulation instance.
        # This is mainly for ease of execution and notation.
        if self.data_process == 'friedman':
            draw_train = train_simulation.friedman_1_model
        elif self.data_process == 'indicator':
            draw_train = train_simulation.indicator_model
        elif self.data_process == 'linear':
            draw_train = train_simulation.linear_model
        return draw_train

    def calc_mse(
            self,
            ratio=1.0,
            bootstrap=True,
            min_split_tree=2,
            b_iterations=50):
        """
        A function to simulate the MSPE decomposition for one specific
        specification of the Bagging Algorithm applied to Regression Trees.
        The simulation set up and the data generating process is given by the
        respective class instance. We want to compare the output of this
        function with respect to variations in the Bagging parameters.

        Returns a numpy array of size 4 with the MSPE decomposition:

        - array[0]: Simulated MSPE
        - array[1]: Simulated squared bias
        - array[2]: Simulated variance
        - array[3]: Simulated noise

        Parameters
        ----------
        ratio: float, optional (Default=1.0)
            The sample size used for the simulation procedure. Each sample we
            draw for the Bagging Algorithm will be of size
            math.ceil(n_observations * self.ratio).

        min_split_tree: int, optional (Default=2)
            The minimal number of observations within a terminal node of the
            Regression Trees to be considered for a split that are used in the
            simulation. Use this to control for the complexity of the
            Regression Tree.
            Must be at least 2.

        b_iterations: int, optional (Default=50)
            The number of bootstrap iterations used to construct the
            bagging/subagging predictor in the simulation.

        bootstrap: bool, optional (Default=True)
            Specify if you use the standard bootstrap (Bagging) or
            m out of n bootstrap (Subagging).
            Default=True implies that we use Bagging.
        """
        # Create the instance of the bagging algorithm class, with the given
        # parameters, that will be used for the rest of the simulation run.
        bagging_instance = BaggingTree(random_seed=self.random_seed_fit,
                                       ratio=ratio, bootstrap=bootstrap,
                                       b_iterations=b_iterations,
                                       min_split_tree=min_split_tree)
        # To make results comparable and to get a smooth plot (we have
        # to limit *n_repeat* due to computation reasons), we create a
        # RandomState container for numpy to draw the noise terms. For further
        # information on why we do this, see in the documentation.
        random_state_noise = np.random.RandomState(self.random_seed_noise)
        # Define the function that draws the training samples. Read in the
        # documentation why we do this here and not on a class level.
        draw_train = self._set_train_function()
        # Create arrays to save prediction results, squared-errors and
        # simulated y_vector_test. Note that we also save test samples as we
        # want to compute the noise. NaN initialization makes it obvious if
        # an iteration failed to fill its slot.
        predictions = np.ones((self.n_test, self.n_repeat)) * np.nan
        simulated_y_test_all = np.ones((self.n_test, self.n_repeat)) * np.nan
        y_se_all = np.ones((self.n_repeat, self.n_test)) * np.nan
        # Perform the main simulation. Further explanation on this can be
        # found in the paper.
        for i in range(self.n_repeat):
            # Draw a new error term for the given *f_vector_test*.
            y_vector_test = (
                self.f_vector_test + random_state_noise.normal(0, self.noise, self.n_test)
            )
            # Draw a new training set.
            x_matrix_train, y_vector_train = draw_train()
            # Save y_vector_test for the simulation run.
            simulated_y_test_all[:, i] = y_vector_test
            # Use the *bagging_instance* to estimate bagging given the new
            # training sample.
            fitted_bagging = bagging_instance.fit(x_matrix_train, y_vector_train)
            # Make a prediction on the test sample and save the squared-error.
            predictions[:, i] = fitted_bagging.predict(self.x_matrix_test)
            y_se_all[i, :] = (y_vector_test - predictions[:, i]) ** 2
        # Compute the simulated expected squared-error, squared-bias, variance
        # and noise for each observation.
        y_mse = y_se_all.sum(axis=0) / self.n_repeat
        y_bias = (self.f_vector_test - predictions.mean(axis=1)) ** 2
        y_var = np.var(predictions, axis=1)
        y_noise = np.var(simulated_y_test_all, axis=1)
        # Average over all test observations and save the results to a numpy
        # array.
        output = np.array([np.mean(y_mse),
                           np.mean(y_bias),
                           np.mean(y_var),
                           np.mean(y_noise)])
        return output

    def calc_mse_all_ratios(
            self,
            n_ratios=10,
            min_ratio=0.1,
            max_ratio=1.0,
            min_split_tree=2,
            b_iterations=50):
        """
        A function to simulate the MSPE decomposition for a range of
        subsampling fractions for one specification of the Subagging Algorithm
        applied to Regression Trees. The simulation set up and the data
        generating process is given by the respective class instance. We want
        to compare the output of this function with respect to variations in
        the Bagging parameters and the variation between the subsampling
        fractions. Note that by default we use subsampling instead of the
        standard bootstrap.
        The range of subsampling ratios is created by
        np.linspace(*min_ratio*, *max_ratio*, *n_ratios*).

        Returns a numpy array of shape = [n_ratios, 4] with the MSPE
        decomposition for the *n_ratios* different subsampling fractions:

        - array[:,0]: Simulated MSPE for all subsampling ratios
        - array[:,1]: Simulated squared bias for all subsampling ratios
        - array[:,2]: Simulated variance for all subsampling ratios
        - array[:,3]: Simulated noise for all subsampling ratios

        Parameters
        ----------
        n_ratios: int, optional (Default=10)
            The number of subsampling fractions we want to consider for the
            simulation.
            Needs to be greater than 1.

        min_ratio: float, optional (Default=0.1)
            The minimal subsampling fraction to be considered.
            Needs to be between zero and one and smaller than *max_ratio*.

        max_ratio: float, optional (Default=1.0)
            The maximal subsampling fraction to be considered.
            Needs to be between zero and one and larger than *min_ratio*.

        min_split_tree: int, optional (Default=2)
            The minimal number of observations within a terminal node of the
            Regression Trees to be considered for a split that are used in the
            simulation.
            Use this to control for the complexity of the Regression Tree.
            Must be at least 2.

        b_iterations: int, optional (Default=50)
            The number of bootstrap iterations used to construct the subagging
            predictor in the simulation.
            Must be greater than zero.
        """
        # First check the inputs that are not checked by the used methods
        # anyway.
        self._check_calc_mse_all_ratios(n_ratios, min_ratio, max_ratio)
        # The array must be of length four: MSPE, Bias, Variance, Noise.
        array_length = 4
        # Create a range of subsampling ratios.
        ratio_range = np.linspace(min_ratio, max_ratio, n_ratios)
        # Create an array to save the simulation results for each ratio.
        output_array_subagging = np.ones((n_ratios, array_length)) * np.nan
        # We loop over all ratios and save the results to an array.
        for index, ratio in enumerate(ratio_range):
            output_array_subagging[index, :] = (
                self.calc_mse(
                    ratio=ratio,
                    bootstrap=False,
                    min_split_tree=min_split_tree,
                    b_iterations=b_iterations
                )
            )
        return output_array_subagging

    @staticmethod
    def _check_calc_mse_all_ratios(n_ratios, min_ratio, max_ratio):
        """ A static function to check if the inputs of
        *calc_mse_all_ratios()* are specified correctly.
        As it is static, we used the ``staticmethod`` decorator and
        dropped *self* from the attributes.
        Note that it does not check values that are checked by used methods
        anyway.
        """
        assert np.issubdtype(type(n_ratios), np.integer) and n_ratios > 1, \
            ('*n_ratios* needs to be an integer greater than one. '
             'You provided n_ratios={}, which is of type'
             ' {}'.format(n_ratios, type(n_ratios)))
        # The messages below now match the actual checks (>= 0 and <= 1).
        assert np.issubdtype(type(min_ratio), np.floating) and min_ratio >= 0, \
            ('*min_ratio* needs to be a float greater than or equal to zero. '
             'You provided min_ratio={}, which is of type'
             ' {}'.format(min_ratio, type(min_ratio)))
        assert np.issubdtype(type(max_ratio), np.floating) and max_ratio <= 1, \
            ('*max_ratio* needs to be a float smaller than or equal to one. '
             'You provided max_ratio={}, which is of type'
             ' {}'.format(max_ratio, type(max_ratio)))
        assert min_ratio < max_ratio, \
            '*min_ratio* needs to be smaller than *max_ratio*.'