Package mvpa :: Package tests :: Module test_rfe
[hide private]
[frames] | [no frames]

Source Code for Module mvpa.tests.test_rfe

  1  # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- 
  2  # vi: set ft=python sts=4 ts=4 sw=4 et: 
  3  ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## 
  4  # 
  5  #   See COPYING file distributed along with the PyMVPA package for the 
  6  #   copyright and license terms. 
  7  # 
  8  ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## 
  9  """Unit tests for PyMVPA recursive feature elimination""" 
 10   
 11  from sets import Set 
 12   
 13  from mvpa.datasets.splitters import NFoldSplitter 
 14  from mvpa.algorithms.cvtranserror import CrossValidatedTransferError 
 15  from mvpa.datasets.masked import MaskedDataset 
 16  from mvpa.measures.base import FeaturewiseDatasetMeasure 
 17  from mvpa.featsel.rfe import RFE 
 18  from mvpa.featsel.base import \ 
 19       SensitivityBasedFeatureSelection, \ 
 20       FeatureSelectionPipeline 
 21  from mvpa.featsel.helpers import \ 
 22       NBackHistoryStopCrit, FractionTailSelector, FixedErrorThresholdStopCrit, \ 
 23       MultiStopCrit, NStepsStopCrit, \ 
 24       FixedNElementTailSelector, BestDetector, RangeElementSelector 
 25   
 26  from mvpa.clfs.meta import FeatureSelectionClassifier, SplitClassifier 
 27  from mvpa.clfs.transerror import TransferError 
 28  from mvpa.misc.transformers import Absolute 
 29   
 30  from mvpa.misc.state import UnknownStateError 
 31   
 32  from tests_warehouse import * 
 33  from tests_warehouse_clfs import * 
 34   
class SillySensitivityAnalyzer(FeaturewiseDatasetMeasure):
    """Trivial deterministic sensitivity measure for testing.

    For a dataset with N features it returns
    ``mult * (arange(N) - int(N/2))`` -- a linear ramp of per-feature
    'sensitivities' centered on the middle feature.  No classifier is
    involved.
    """

    def __init__(self, mult=1, **kwargs):
        """Initialize the measure.

        :Parameters:
          mult : scalar
            Factor the ramp of sensitivities is scaled by.
        """
        FeaturewiseDatasetMeasure.__init__(self, **kwargs)
        self.__mult = mult

    def __call__(self, dataset):
        """Return the scaled ramp of sensitivities for `dataset`.

        The result depends only on ``dataset.nfeatures`` and the
        configured multiplier -- nothing is trained here.  (The
        previous docstring claiming an SVM was trained was a
        copy-paste error.)
        """
        return self.__mult * (N.arange(dataset.nfeatures)
                              - int(dataset.nfeatures / 2))
48 49
class RFETests(unittest.TestCase):

    def getData(self):
        """Provide the 2-class medium-sized training dataset used below."""
        key = 'uni2medium_train'
        return datasets[key]
55 - def getDataT(self):
56 return datasets['uni2medium_test']
57 58
59 - def testBestDetector(self):
60 bd = BestDetector() 61 62 # for empty history -- no best 63 self.failUnless(bd([]) == False) 64 # we got the best if we have just 1 65 self.failUnless(bd([1]) == True) 66 # we got the best if we have the last minimal 67 self.failUnless(bd([1, 0.9, 0.8]) == True) 68 69 # test for alternative func 70 bd = BestDetector(func=max) 71 self.failUnless(bd([0.8, 0.9, 1.0]) == True) 72 self.failUnless(bd([0.8, 0.9, 1.0]+[0.9]*9) == False) 73 self.failUnless(bd([0.8, 0.9, 1.0]+[0.9]*10) == False) 74 75 # test to detect earliest and latest minimum 76 bd = BestDetector(lastminimum=True) 77 self.failUnless(bd([3, 2, 1, 1, 1, 2, 1]) == True) 78 bd = BestDetector() 79 self.failUnless(bd([3, 2, 1, 1, 1, 2, 1]) == False)
80 81
82 - def testNBackHistoryStopCrit(self):
83 """Test stopping criterion""" 84 stopcrit = NBackHistoryStopCrit() 85 # for empty history -- no best but just go 86 self.failUnless(stopcrit([]) == False) 87 # should not stop if we got 10 more after minimal 88 self.failUnless(stopcrit( 89 [1, 0.9, 0.8]+[0.9]*(stopcrit.steps-1)) == False) 90 # should stop if we got 10 more after minimal 91 self.failUnless(stopcrit( 92 [1, 0.9, 0.8]+[0.9]*stopcrit.steps) == True) 93 94 # test for alternative func 95 stopcrit = NBackHistoryStopCrit(BestDetector(func=max)) 96 self.failUnless(stopcrit([0.8, 0.9, 1.0]+[0.9]*9) == False) 97 self.failUnless(stopcrit([0.8, 0.9, 1.0]+[0.9]*10) == True) 98 99 # test to detect earliest and latest minimum 100 stopcrit = NBackHistoryStopCrit(BestDetector(lastminimum=True)) 101 self.failUnless(stopcrit([3, 2, 1, 1, 1, 2, 1]) == False) 102 stopcrit = NBackHistoryStopCrit(steps=4) 103 self.failUnless(stopcrit([3, 2, 1, 1, 1, 2, 1]) == True)
104 105
107 """Test stopping criterion""" 108 stopcrit = FixedErrorThresholdStopCrit(0.5) 109 110 self.failUnless(stopcrit([]) == False) 111 self.failUnless(stopcrit([0.8, 0.9, 0.5]) == False) 112 self.failUnless(stopcrit([0.8, 0.9, 0.4]) == True) 113 # only last error has to be below to stop 114 self.failUnless(stopcrit([0.8, 0.4, 0.6]) == False)
115 116
117 - def testNStepsStopCrit(self):
118 """Test stopping criterion""" 119 stopcrit = NStepsStopCrit(2) 120 121 self.failUnless(stopcrit([]) == False) 122 self.failUnless(stopcrit([0.8, 0.9]) == True) 123 self.failUnless(stopcrit([0.8]) == False)
124 125
126 - def testMultiStopCrit(self):
127 """Test multiple stop criteria""" 128 stopcrit = MultiStopCrit([FixedErrorThresholdStopCrit(0.5), 129 NBackHistoryStopCrit(steps=4)]) 130 131 # default 'or' mode 132 # nback triggers 133 self.failUnless(stopcrit([1, 0.9, 0.8]+[0.9]*4) == True) 134 # threshold triggers 135 self.failUnless(stopcrit([1, 0.9, 0.2]) == True) 136 137 # alternative 'and' mode 138 stopcrit = MultiStopCrit([FixedErrorThresholdStopCrit(0.5), 139 NBackHistoryStopCrit(steps=4)], 140 mode = 'and') 141 # nback triggers not 142 self.failUnless(stopcrit([1, 0.9, 0.8]+[0.9]*4) == False) 143 # threshold triggers not 144 self.failUnless(stopcrit([1, 0.9, 0.2]) == False) 145 # only both satisfy 146 self.failUnless(stopcrit([1, 0.9, 0.4]+[0.4]*4) == True)
147 148
    def testFeatureSelector(self):
        """Test tail- and range-based element selectors on a fixed vector."""
        # remove the 10% weakest elements
        selector = FractionTailSelector(0.1)
        data = N.array([3.5, 10, 7, 5, -0.4, 0, 0, 2, 10, 9])
        # == rank [4, 5, 6, 7, 0, 3, 2, 9, 1, 8]
        # expected survivors after discarding 10% / 30% of the tail
        target10 = N.array([0, 1, 2, 3, 5, 6, 7, 8, 9])
        target30 = N.array([0, 1, 2, 3, 7, 8, 9])

        # 'ndiscarded' is a conditional state -- unset before first call
        self.failUnlessRaises(UnknownStateError,
                              selector.__getattribute__, 'ndiscarded')
        self.failUnless((selector(data) == target10).all())
        selector.felements = 0.30 # discard 30%
        self.failUnless(selector.felements == 0.3)
        self.failUnless((selector(data) == target30).all())
        self.failUnless(selector.ndiscarded == 3) # check that 3 were discarded

        # same targets via an absolute (not fractional) tail selector
        selector = FixedNElementTailSelector(1)
        #                0   1   2  3    4    5  6  7   8  9
        data = N.array([3.5, 10, 7, 5, -0.4, 0, 0, 2, 10, 9])
        self.failUnless((selector(data) == target10).all())

        selector.nelements = 3
        self.failUnless(selector.nelements == 3)
        self.failUnless((selector(data) == target30).all())
        self.failUnless(selector.ndiscarded == 3)

        # test range selector
        # simple range 'above': strictly positive elements
        self.failUnless((RangeElementSelector(lower=0)(data) == \
                         N.array([0,1,2,3,7,8,9])).all())

        # inclusive=True additionally keeps the elements equal to the bound
        self.failUnless((RangeElementSelector(lower=0,
                                              inclusive=True)(data) == \
                         N.array([0,1,2,3,5,6,7,8,9])).all())

        # mode='discard' returns the complement set of indices
        self.failUnless((RangeElementSelector(lower=0, mode='discard',
                                              inclusive=True)(data) == \
                         N.array([4])).all())

        # simple range 'below'
        self.failUnless((RangeElementSelector(upper=2)(data) == \
                         N.array([4,5,6])).all())

        self.failUnless((RangeElementSelector(upper=2,
                                              inclusive=True)(data) == \
                         N.array([4,5,6,7])).all())

        self.failUnless((RangeElementSelector(upper=2, mode='discard',
                                              inclusive=True)(data) == \
                         N.array([0,1,2,3,8,9])).all())


        # two-sided ranges
        self.failUnless((RangeElementSelector(lower=2, upper=9)(data) == \
                         N.array([0,2,3])).all())

        self.failUnless((RangeElementSelector(lower=2, upper=9,
                                              inclusive=True)(data) == \
                         N.array([0,2,3,7,9])).all())

        # swapped bounds + discard must equal the straight exclusive select
        self.failUnless((RangeElementSelector(upper=2, lower=9, mode='discard',
                                              inclusive=True)(data) ==
                         RangeElementSelector(lower=2, upper=9,
                                              inclusive=False)(data)).all())

        # non-0 elements -- should be equivalent to N.nonzero()[0]
        self.failUnless((RangeElementSelector()(data) == \
                         N.nonzero(data)[0]).all())
    @sweepargs(clf=clfswh['has_sensitivity', '!meta'])
    def testSensitivityBasedFeatureSelection(self, clf):
        """Select features from the tail of a sensitivity map and verify
        bookkeeping (original data untouched, states populated).
        """
        # sensitivity analyser and transfer error quantifier use the SAME clf!
        sens_ana = clf.getSensitivityAnalyzer()

        # of features to remove
        Nremove = 2

        # because the clf is already trained when computing the sensitivity
        # map, prevent retraining for transfer error calculation
        # Use absolute of the svm weights as sensitivity
        fe = SensitivityBasedFeatureSelection(sens_ana,
                feature_selector=FixedNElementTailSelector(2),
                enable_states=["sensitivity", "selected_ids"])

        wdata = self.getData()
        wdata_nfeatures = wdata.nfeatures
        tdata = self.getDataT()
        tdata_nfeatures = tdata.nfeatures

        # selection is applied to working and testing data alike
        sdata, stdata = fe(wdata, tdata)

        # fail if orig datasets are changed
        self.failUnless(wdata.nfeatures == wdata_nfeatures)
        self.failUnless(tdata.nfeatures == tdata_nfeatures)

        # silly check if nfeatures got a single one removed
        self.failUnlessEqual(wdata.nfeatures, sdata.nfeatures+Nremove,
            msg="We had to remove just a single feature")

        self.failUnlessEqual(tdata.nfeatures, stdata.nfeatures+Nremove,
            msg="We had to remove just a single feature in testing as well")

        # the stored sensitivity covers the ORIGINAL feature space
        self.failUnlessEqual(len(fe.sensitivity), wdata_nfeatures,
            msg="Sensitivity have to have # of features equal to original")

        self.failUnlessEqual(len(fe.selected_ids), sdata.nfeatures,
            msg="# of selected features must be equal the one in the result dataset")
262 sens_ana = SillySensitivityAnalyzer() 263 264 wdata = self.getData() 265 wdata_nfeatures = wdata.nfeatures 266 tdata = self.getDataT() 267 tdata_nfeatures = tdata.nfeatures 268 269 # test silly one first ;-) 270 self.failUnlessEqual(sens_ana(wdata)[0], -int(wdata_nfeatures/2)) 271 272 # OLD: first remove 25% == 6, and then 4, total removing 10 273 # NOW: test should be independent of the numerical number of features 274 feature_selections = [SensitivityBasedFeatureSelection( 275 sens_ana, 276 FractionTailSelector(0.25)), 277 SensitivityBasedFeatureSelection( 278 sens_ana, 279 FixedNElementTailSelector(4)) 280 ] 281 282 # create a FeatureSelection pipeline 283 feat_sel_pipeline = FeatureSelectionPipeline( 284 feature_selections=feature_selections, 285 enable_states=['nfeatures', 'selected_ids']) 286 287 sdata, stdata = feat_sel_pipeline(wdata, tdata) 288 289 self.failUnlessEqual(len(feat_sel_pipeline.feature_selections), 290 len(feature_selections), 291 msg="Test the property feature_selections") 292 293 desired_nfeatures = int(N.ceil(wdata_nfeatures*0.75)) 294 self.failUnlessEqual(feat_sel_pipeline.nfeatures, 295 [wdata_nfeatures, desired_nfeatures], 296 msg="Test if nfeatures get assigned properly." 297 " Got %s!=%s" % (feat_sel_pipeline.nfeatures, 298 [wdata_nfeatures, desired_nfeatures])) 299 300 self.failUnlessEqual(list(feat_sel_pipeline.selected_ids), 301 range(int(wdata_nfeatures*0.25)+4, wdata_nfeatures))
    # TODO: should later on work for any clfs_with_sens
    @sweepargs(clf=clfswh['has_sensitivity', '!meta'][:1])
    def testRFE(self, clf):
        """Run RFE removing one feature per step and verify its
        error/nfeatures/history bookkeeping.
        """
        # sensitivity analyser and transfer error quantifier use the SAME clf!
        sens_ana = clf.getSensitivityAnalyzer()
        trans_error = TransferError(clf)
        # because the clf is already trained when computing the sensitivity
        # map, prevent retraining for transfer error calculation
        # Use absolute of the svm weights as sensitivity
        rfe = RFE(sens_ana,
                  trans_error,
                  feature_selector=FixedNElementTailSelector(1),
                  train_clf=False)

        wdata = self.getData()
        wdata_nfeatures = wdata.nfeatures
        tdata = self.getDataT()
        tdata_nfeatures = tdata.nfeatures

        sdata, stdata = rfe(wdata, tdata)

        # fail if orig datasets are changed
        self.failUnless(wdata.nfeatures == wdata_nfeatures)
        self.failUnless(tdata.nfeatures == tdata_nfeatures)

        # check that the features set with the least error is selected
        if len(rfe.errors):
            e = N.array(rfe.errors)
            # one feature was removed per step, so the step index of the
            # minimum error determines how many features were dropped
            self.failUnless(sdata.nfeatures == wdata_nfeatures - e.argmin())
        else:
            self.failUnless(sdata.nfeatures == wdata_nfeatures)

        # silly check if nfeatures is in decreasing order
        nfeatures = N.array(rfe.nfeatures).copy()
        nfeatures.sort()
        self.failUnless( (nfeatures[::-1] == rfe.nfeatures).all() )

        # check if history has elements for every step
        self.failUnless(Set(rfe.history)
                        == Set(range(len(N.array(rfe.errors)))))

        # Last (the largest number) can be present multiple times even
        # if we remove 1 feature at a time -- just need to stop well
        # in advance when we have more than 1 feature left ;)
        self.failUnless(rfe.nfeatures[-1]
                        == len(N.where(rfe.history
                                       ==max(rfe.history))[0]))

    # XXX add a test where sensitivity analyser and transfer error do not
    # use the same classifier
357 - def testJamesProblem(self):
358 percent = 80 359 dataset = datasets['uni2small'] 360 rfesvm_split = LinearCSVMC() 361 FeatureSelection = \ 362 RFE(sensitivity_analyzer=rfesvm_split.getSensitivityAnalyzer(), 363 transfer_error=TransferError(rfesvm_split), 364 feature_selector=FractionTailSelector( 365 percent / 100.0, 366 mode='select', tail='upper'), update_sensitivity=True) 367 368 clf = FeatureSelectionClassifier( 369 clf = LinearCSVMC(), 370 # on features selected via RFE 371 feature_selection = FeatureSelection) 372 # update sensitivity at each step (since we're not using the 373 # same CLF as sensitivity analyzer) 374 clf.states.enable('feature_ids') 375 376 cv = CrossValidatedTransferError( 377 TransferError(clf), 378 NFoldSplitter(cvtype=1), 379 enable_states=['confusion'], 380 expose_testdataset=True) 381 #cv = SplitClassifier(clf) 382 try: 383 error = cv(dataset) 384 self.failUnless(error < 0.2) 385 except: 386 self.fail('CrossValidation cannot handle classifier with RFE ' 387 'feature selection')
388 389 390 391
def suite():
    """Return the unittest suite covering all RFE-related tests."""
    return unittest.makeSuite(RFETests)


if __name__ == '__main__':
    # `runner` discovers and executes the suite (PyMVPA test convention)
    import runner