Source code for hi_lasso.hi_lasso

# Author: Jongkwon Jo <jongkwon.jo@gmail.com>
# License: MIT
# Date: 10 Aug 2021

import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

from . import util, glmnet_model
from concurrent.futures import ProcessPoolExecutor
from scipy.stats import binom
from tqdm import tqdm
import numpy as np
import math

class HiLasso:
    """
    Hi-LASSO (High-Dimensional LASSO) improves the LASSO solution for
    extremely high-dimensional data.

    The main contributions of Hi-LASSO are as follows:

    • Rectifying the systematic bias introduced by bootstrapping.
    • Refining the computation of importance scores.
    • Providing a statistical strategy to determine the number of bootstrap samples.
    • Taking advantage of the global oracle property.
    • Allowing tests of significance for feature selection with an appropriate distribution.

    Parameters
    ----------
    q1: 'auto' or int, optional [default='auto']
        The number of predictors to randomly select in Procedure 1.
        When set to 'auto', q1 is the number of samples.
    q2: 'auto' or int, optional [default='auto']
        The number of predictors to randomly select in Procedure 2.
        When set to 'auto', q2 is the number of samples.
    L: int [default=30]
        The expected number of times each predictor is selected across
        the bootstrap samples.
    alpha: float [default=0.05]
        Significance level used for the feature-selection significance test.
    logistic: Boolean [default=False]
        Whether to apply the logistic regression model. For classification
        problems, Hi-LASSO can apply the logistic regression model.
    random_state : int or None, optional [default=None]
        If int, random_state is the seed used by the random number generator;
        if None, a generator from np.random.default_rng is used.
    n_jobs: None or int, optional [default=1]
        The number of jobs to run in parallel. If n_jobs is None or 0, the
        number of CPU cores returned by multiprocessing.cpu_count() is used,
        parallelizing across all available cores.

    Attributes
    ----------
    n : int
        Number of samples.
    p : int
        Number of predictors.

    Examples
    --------
    >>> from hi_lasso import HiLasso
    >>> model = HiLasso(q1='auto', q2='auto', L=30, alpha=0.05,
    ...                 logistic=False, random_state=None, n_jobs=1)
    >>> model.fit(X, y, sample_weight=None)
    >>> model.coef_
    >>> model.intercept_
    >>> model.p_values_
    """

    def __init__(self, q1='auto', q2='auto', L=30, alpha=0.05, logistic=False,
                 random_state=None, n_jobs=1):
        self.q1 = q1
        self.q2 = q2
        self.L = L
        self.alpha = alpha
        self.logistic = logistic
        self.random_state = random_state
        self.n_jobs = n_jobs
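    # A minimal numeric sketch (illustrative, not part of the library) of how
    # L determines the number of bootstrap samples: with p = 1000 predictors,
    # q = 100 predictors drawn per bootstrap, and L = 30, _bootstrapping
    # computes B = floor(L * p / q) = floor(30 * 1000 / 100) = 300, so each
    # predictor is drawn B * q / p = 30 times on average.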
    def fit(self, X, y, sample_weight=None):
        """Fit the model with Procedure 1 and Procedure 2.

        Procedure 1: Compute importance scores for predictors.
        Procedure 2: Compute coefficients and select variables.

        Parameters
        ----------
        X: array-like of shape (n_samples, n_predictors)
            Predictor variables.
        y: array-like of shape (n_samples,)
            Response variables.
        sample_weight : array-like of shape (n_samples,), default=None
            Optional weight vector for observations. If None, samples are
            equally weighted.

        Attributes
        ----------
        coef_ : array
            Coefficients of Hi-LASSO.
        p_values_ : array
            P-value of each coefficient.
        intercept_: float
            Intercept of Hi-LASSO.

        Returns
        -------
        self : object
        """
        self.X = np.array(X)
        self.y = np.array(y).ravel()
        self.n, self.p = self.X.shape
        self.q1 = self.n if self.q1 == 'auto' else self.q1
        self.q2 = self.n if self.q2 == 'auto' else self.q2
        self.sample_weight = np.ones(
            self.n) if sample_weight is None else np.asarray(sample_weight)
        self.select_prob = None

        print('Procedure 1')
        b1 = self._bootstrapping(mode='procedure1')
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            b1_mean = np.nanmean(np.abs(b1), axis=1)
        importance_score = np.where(b1_mean == 0, 1e-10, b1_mean)
        # Importance scores rescaled to sum to 1 serve as selection
        # probabilities; their (scaled) reciprocals become the adaptive
        # penalty weights for Procedure 2.
        self.select_prob = importance_score / importance_score.sum()
        self.penalty_weights = 1 / (self.select_prob * 100)

        print('Procedure 2')
        b2 = self._bootstrapping(mode='procedure2')
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=RuntimeWarning)
            b2_mean = np.nanmean(b2, axis=1)
        self.p_values_ = self._compute_p_values(b2)
        self.coef_ = np.where(self.p_values_ < self.alpha, b2_mean, 0)
        self.intercept_ = np.average(self.y) - np.average(self.X, axis=0) @ self.coef_
        return self
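    # A minimal numeric sketch (illustrative, not part of the library) of the
    # Procedure 1 -> Procedure 2 hand-off above, assuming three predictors
    # with mean absolute bootstrap coefficients [0.5, 0.3, 0.2]:
    #
    #     importance_score = np.array([0.5, 0.3, 0.2])
    #     select_prob = importance_score / importance_score.sum()
    #     # -> [0.5, 0.3, 0.2]
    #     penalty_weights = 1 / (select_prob * 100)
    #     # -> [0.02, 0.0333..., 0.05]
    #
    # Predictors with larger importance scores are drawn more often in
    # Procedure 2 and receive smaller adaptive-LASSO penalties.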
    def _bootstrapping(self, mode):
        """
        Apply the estimation method and q corresponding to the 'mode' parameter.
        """
        if mode == 'procedure1':
            self.q = self.q1
            self.method = 'ElasticNet'
        else:
            self.q = self.q2
            self.method = 'AdaptiveLASSO'
        # Number of bootstrap samples so that each predictor is selected
        # L times on average.
        self.B = math.floor(self.L * self.p / self.q)
        if self.n_jobs != 1:
            # n_jobs=None (or 0) falls back to all available CPU cores.
            with ProcessPoolExecutor(max_workers=self.n_jobs or None) as executor:
                results = tqdm(executor.map(self._estimate_coef,
                                            np.arange(self.B)), total=self.B)
                betas = np.array(list(results)).T
        else:
            betas = np.zeros((self.p, self.B))
            for bootstrap_number in tqdm(np.arange(self.B)):
                betas[:, bootstrap_number] = self._estimate_coef(
                    bootstrap_number)
        return betas

    def _estimate_coef(self, bootstrap_number):
        """
        Estimate coefficients for each bootstrap sample.
        """
        # Initialize beta as a p-vector of NaNs, so that predictors not drawn
        # in this bootstrap sample remain NaN.
        beta = np.empty(self.p)
        beta[:] = np.nan

        # Seed the generator with the bootstrap number for reproducibility.
        rs = (np.random.RandomState(bootstrap_number + self.random_state)
              if self.random_state is not None else np.random.default_rng())

        # Generate bootstrap indices of samples and predictors.
        bst_sample_idx = rs.choice(np.arange(self.n), size=self.n,
                                   replace=True, p=None)
        bst_predictor_idx = rs.choice(np.arange(self.p), size=self.q,
                                      replace=False, p=self.select_prob)

        # Standardization.
        X_sc, y_sc, x_std = util.standardization(
            self.X[bst_sample_idx, :][:, bst_predictor_idx],
            self.y[bst_sample_idx])

        # Estimate coefficients.
        if self.method == 'ElasticNet':
            coef = glmnet_model.ElasticNet(
                X_sc, y_sc, logistic=self.logistic,
                sample_weight=self.sample_weight[bst_sample_idx],
                random_state=rs)
        else:
            coef = glmnet_model.AdaptiveLasso(
                X_sc, y_sc, logistic=self.logistic,
                sample_weight=self.sample_weight[bst_sample_idx],
                random_state=rs,
                adaptive_weights=self.penalty_weights[bst_predictor_idx])
        # Undo the standardization to put coefficients back on the original scale.
        beta[bst_predictor_idx] = coef / x_std
        return beta

    def _compute_p_values(self, betas):
        """
        Compute the p-value of each predictor for the statistical test of
        variable selection.
        """
        not_null = ~np.isnan(betas)
        # d_j: the number of bootstrap samples in which the j-th beta is
        # selected and non-zero.
        d_j = np.logical_and(not_null, betas != 0).sum(axis=1)
        # pi: the average selection ratio over all predictor variables in the
        # B bootstrap samples.
        pi = d_j.sum() / not_null.sum()
        # P(X >= d_j) for X ~ Binomial(B, pi).
        return binom.sf(d_j - 1, n=self.B, p=pi)
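# A minimal, self-contained sketch (illustrative, not part of the library) of
# the significance test in _compute_p_values: selection counts d_j are tested
# against a Binomial(B, pi) null. The counts below are assumed values for
# three predictors with q = p, so every predictor is selected in all B
# bootstrap samples.
if __name__ == '__main__':
    demo_B = 150                            # number of bootstrap samples
    demo_d_j = np.array([120, 40, 40])      # non-zero counts per predictor
    demo_pi = demo_d_j.sum() / (3 * demo_B) # average selection ratio (~0.44)
    # P(X >= d_j) under the null; a small value flags a predictor that is
    # selected far more often than chance (here, only the first predictor).
    print(binom.sf(demo_d_j - 1, n=demo_B, p=demo_pi))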