feat: initial commit - Phase 1 & 2 core features

This commit is contained in:
hiderfong
2026-04-22 17:07:33 +08:00
commit 1773bda06b
25005 changed files with 6252106 additions and 0 deletions
@@ -0,0 +1,983 @@
import itertools
import warnings
from functools import partial
import numpy as np
import pytest
import sklearn
from sklearn.base import clone
from sklearn.decomposition import (
DictionaryLearning,
MiniBatchDictionaryLearning,
SparseCoder,
dict_learning,
dict_learning_online,
sparse_encode,
)
from sklearn.decomposition._dict_learning import _update_dict
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils import check_array
from sklearn.utils._testing import (
TempMemmap,
assert_allclose,
assert_array_almost_equal,
assert_array_equal,
ignore_warnings,
)
from sklearn.utils.estimator_checks import (
check_transformer_data_not_an_array,
check_transformer_general,
check_transformers_unfitted,
)
from sklearn.utils.parallel import Parallel
rng_global = np.random.RandomState(0)
n_samples, n_features = 10, 8
X = rng_global.randn(n_samples, n_features)
def test_sparse_encode_shapes_omp():
rng = np.random.RandomState(0)
algorithms = ["omp", "lasso_lars", "lasso_cd", "lars", "threshold"]
for n_components, n_samples in itertools.product([1, 5], [1, 9]):
X_ = rng.randn(n_samples, n_features)
dictionary = rng.randn(n_components, n_features)
for algorithm, n_jobs in itertools.product(algorithms, [1, 2]):
code = sparse_encode(X_, dictionary, algorithm=algorithm, n_jobs=n_jobs)
assert code.shape == (n_samples, n_components)
def test_dict_learning_shapes():
n_components = 5
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
n_components = 1
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
assert dico.transform(X).shape == (X.shape[0], n_components)
def test_dict_learning_overcomplete():
n_components = 12
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_max_iter():
def ricker_function(resolution, center, width):
"""Discrete sub-sampled Ricker (Mexican hat) wavelet"""
x = np.linspace(0, resolution - 1, resolution)
x = (
(2 / (np.sqrt(3 * width) * np.pi**0.25))
* (1 - (x - center) ** 2 / width**2)
* np.exp(-((x - center) ** 2) / (2 * width**2))
)
return x
def ricker_matrix(width, resolution, n_components):
"""Dictionary of Ricker (Mexican hat) wavelets"""
centers = np.linspace(0, resolution - 1, n_components)
D = np.empty((n_components, resolution))
for i, center in enumerate(centers):
D[i] = ricker_function(resolution, center, width)
D /= np.sqrt(np.sum(D**2, axis=1))[:, np.newaxis]
return D
transform_algorithm = "lasso_cd"
resolution = 1024
subsampling = 3 # subsampling factor
n_components = resolution // subsampling
# Compute a wavelet dictionary
D_multi = np.r_[
tuple(
ricker_matrix(
width=w, resolution=resolution, n_components=n_components // 5
)
for w in (10, 50, 100, 500, 1000)
)
]
X = np.linspace(0, resolution - 1, resolution)
first_quarter = X < resolution / 4
X[first_quarter] = 3.0
X[np.logical_not(first_quarter)] = -1.0
X = X.reshape(1, -1)
# check that the underlying model fails to converge
with pytest.warns(ConvergenceWarning):
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=1
)
model.fit_transform(X)
# check that the underlying model converges w/o warnings
with warnings.catch_warnings():
warnings.simplefilter("error", ConvergenceWarning)
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=2000
)
model.fit_transform(X)
def test_dict_learning_lars_positive_parameter():
n_components = 5
alpha = 1
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning(X, n_components, alpha=alpha, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_positivity(transform_algorithm, positive_code, positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_lars_dict_positivity(positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
def test_dict_learning_lars_code_positivity():
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_code=True,
fit_algorithm="cd",
).fit(X)
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format("lars")
with pytest.raises(ValueError, match=err_msg):
dico.transform(X)
def test_dict_learning_reconstruction():
n_components = 12
dico = DictionaryLearning(
n_components, transform_algorithm="omp", transform_alpha=0.001, random_state=0
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
# used to test lars here too, but there's no guarantee the number of
# nonzero atoms is right.
def test_dict_learning_reconstruction_parallel():
# regression test that parallel reconstruction works with n_jobs>1
n_components = 12
dico = DictionaryLearning(
n_components,
transform_algorithm="omp",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
def test_dict_learning_lassocd_readonly_data():
n_components = 12
with TempMemmap(X) as X_read_only:
dico = DictionaryLearning(
n_components,
transform_algorithm="lasso_cd",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
with ignore_warnings(category=ConvergenceWarning):
code = dico.fit(X_read_only).transform(X_read_only)
assert_array_almost_equal(
np.dot(code, dico.components_), X_read_only, decimal=2
)
def test_dict_learning_nonzero_coefs():
n_components = 4
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
transform_n_nonzero_coefs=3,
random_state=0,
)
code = dico.fit(X).transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
dico.set_params(transform_algorithm="omp")
code = dico.transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
def test_dict_learning_split():
n_components = 5
dico = DictionaryLearning(
n_components, transform_algorithm="threshold", random_state=0
)
code = dico.fit(X).transform(X)
dico.split_sign = True
split_code = dico.transform(X)
assert_array_almost_equal(
split_code[:, :n_components] - split_code[:, n_components:], code
)
def test_dict_learning_online_shapes():
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=True,
)
assert code.shape == (n_samples, n_components)
assert dictionary.shape == (n_components, n_features)
assert np.dot(code, dictionary).shape == X.shape
dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=False,
)
assert dictionary.shape == (n_components, n_features)
def test_dict_learning_online_lars_positive_parameter():
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning_online(X, batch_size=4, max_iter=10, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_positivity(
transform_algorithm, positive_code, positive_dict
):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_lars(positive_dict):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_online_positivity(positive_code, positive_dict):
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
method="cd",
alpha=1,
random_state=rng,
positive_dict=positive_dict,
positive_code=positive_code,
)
if positive_dict:
assert (dictionary >= 0).all()
else:
assert (dictionary < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
def test_dict_learning_online_verbosity():
# test verbosity for better coverage
n_components = 5
import sys
from io import StringIO
old_stdout = sys.stdout
try:
sys.stdout = StringIO()
# convergence monitoring verbosity
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=1, tol=0.1, random_state=0
)
dico.fit(X)
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=5,
verbose=1,
max_no_improvement=2,
random_state=0,
)
dico.fit(X)
# higher verbosity level
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=2, random_state=0
)
dico.fit(X)
# function API verbosity
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=1,
random_state=0,
)
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=2,
random_state=0,
)
finally:
sys.stdout = old_stdout
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_estimator_shapes():
n_components = 5
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
)
dico.fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_overcomplete():
n_components = 12
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=0, dict_init=V, random_state=0
).fit(X)
assert_array_equal(dico.components_, V)
def test_dict_learning_online_readonly_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
V.setflags(write=False)
MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=1,
dict_init=V,
random_state=0,
shuffle=False,
).fit(X)
def test_dict_learning_online_partial_fit():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
dict1 = MiniBatchDictionaryLearning(
n_components,
max_iter=10,
batch_size=1,
alpha=1,
shuffle=False,
dict_init=V,
max_no_improvement=None,
tol=0.0,
random_state=0,
).fit(X)
dict2 = MiniBatchDictionaryLearning(
n_components, alpha=1, dict_init=V, random_state=0
)
for i in range(10):
for sample in X:
dict2.partial_fit(sample[np.newaxis, :])
assert not np.all(sparse_encode(X, dict1.components_, alpha=1) == 0)
assert_array_almost_equal(dict1.components_, dict2.components_, decimal=2)
# partial_fit should ignore max_iter (#17433)
assert dict1.n_steps_ == dict2.n_steps_ == 100
def test_sparse_encode_shapes():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
code = sparse_encode(X, V, algorithm=algo)
assert code.shape == (n_samples, n_components)
@pytest.mark.parametrize("algo", ["lasso_lars", "lasso_cd", "threshold"])
@pytest.mark.parametrize("positive", [False, True])
def test_sparse_encode_positivity(algo, positive):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, algorithm=algo, positive=positive)
if positive:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("algo", ["lars", "omp"])
def test_sparse_encode_unavailable_positivity(algo):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format(algo)
with pytest.raises(ValueError, match=err_msg):
sparse_encode(X, V, algorithm=algo, positive=True)
def test_sparse_encode_input():
n_components = 100
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
Xf = check_array(X, order="F")
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
a = sparse_encode(X, V, algorithm=algo)
b = sparse_encode(Xf, V, algorithm=algo)
assert_array_almost_equal(a, b)
def test_sparse_encode_error():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, alpha=0.001)
assert not np.all(code == 0)
assert np.sqrt(np.sum((np.dot(code, V) - X) ** 2)) < 0.1
def test_sparse_encode_error_default_sparsity():
rng = np.random.RandomState(0)
X = rng.randn(100, 64)
D = rng.randn(2, 64)
code = ignore_warnings(sparse_encode)(X, D, algorithm="omp", n_nonzero_coefs=None)
assert code.shape == (100, 2)
def test_sparse_coder_estimator():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
).transform(X)
assert not np.all(coder == 0)
assert np.sqrt(np.sum((np.dot(coder, V) - X) ** 2)) < 0.1
def test_sparse_coder_estimator_clone():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
)
cloned = clone(coder)
assert id(cloned) != id(coder)
np.testing.assert_allclose(cloned.dictionary, coder.dictionary)
assert id(cloned.dictionary) != id(coder.dictionary)
assert cloned.n_components_ == coder.n_components_
assert cloned.n_features_in_ == coder.n_features_in_
data = np.random.rand(n_samples, n_features).astype(np.float32)
np.testing.assert_allclose(cloned.transform(data), coder.transform(data))
def test_sparse_coder_parallel_mmap():
# Non-regression test for:
# https://github.com/scikit-learn/scikit-learn/issues/5956
# Test that SparseCoder does not error by passing reading only
# arrays to child processes
rng = np.random.RandomState(777)
n_components, n_features = 40, 64
init_dict = rng.rand(n_components, n_features)
# Ensure that `data` is >2M. Joblib memory maps arrays
# if they are larger than 1MB. The 4 accounts for float32
# data type
n_samples = int(2e6) // (4 * n_features)
data = np.random.rand(n_samples, n_features).astype(np.float32)
sc = SparseCoder(init_dict, transform_algorithm="omp", n_jobs=2)
sc.fit_transform(data)
def test_sparse_coder_common_transformer():
rng = np.random.RandomState(777)
n_components, n_features = 40, 3
init_dict = rng.rand(n_components, n_features)
sc = SparseCoder(init_dict)
check_transformer_data_not_an_array(sc.__class__.__name__, sc)
check_transformer_general(sc.__class__.__name__, sc)
check_transformer_general_memmap = partial(
check_transformer_general, readonly_memmap=True
)
check_transformer_general_memmap(sc.__class__.__name__, sc)
check_transformers_unfitted(sc.__class__.__name__, sc)
def test_sparse_coder_n_features_in():
d = np.array([[1, 2, 3], [1, 2, 3]])
sc = SparseCoder(d)
assert sc.n_features_in_ == d.shape[1]
def test_update_dict():
# Check the dict update in batch mode vs online mode
# Non-regression test for #4866
rng = np.random.RandomState(0)
code = np.array([[0.5, -0.5], [0.1, 0.9]])
dictionary = np.array([[1.0, 0.0], [0.6, 0.8]])
X = np.dot(code, dictionary) + rng.randn(2, 2)
# full batch update
newd_batch = dictionary.copy()
_update_dict(newd_batch, X, code)
# online update
A = np.dot(code.T, code)
B = np.dot(X.T, code)
newd_online = dictionary.copy()
_update_dict(newd_online, X, code, A, B)
assert_allclose(newd_batch, newd_online)
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_encode_dtype_match(data_type, algorithm):
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code = sparse_encode(
X.astype(data_type), dictionary.astype(data_type), algorithm=algorithm
)
assert code.dtype == data_type
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
def test_sparse_encode_numerical_consistency(algorithm):
# verify numerical consistency among np.float32 and np.float64
rtol = 1e-4
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code_32 = sparse_encode(
X.astype(np.float32), dictionary.astype(np.float32), algorithm=algorithm
)
code_64 = sparse_encode(
X.astype(np.float64), dictionary.astype(np.float64), algorithm=algorithm
)
assert_allclose(code_32, code_64, rtol=rtol)
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_coder_dtype_match(data_type, transform_algorithm):
# Verify preserving dtype for transform in sparse coder
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
coder = SparseCoder(
dictionary.astype(data_type), transform_algorithm=transform_algorithm
)
code = coder.transform(X.astype(data_type))
assert code.dtype == data_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in dictionary learning class
dict_learner = DictionaryLearning(
n_components=8,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_minibatch_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in minibatch dictionary learning
dict_learner = MiniBatchDictionaryLearning(
n_components=8,
batch_size=10,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
max_iter=100,
tol=1e-1,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
assert dict_learner._A.dtype == expected_type
assert dict_learner._B.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary, _ = dict_learning(
X.astype(data_type),
n_components=n_components,
alpha=1,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-6
n_components = 4
alpha = 2
U_64, V_64, _ = dict_learning(
X.astype(np.float64),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
U_32, V_32, _ = dict_learning(
X.astype(np.float32),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2^2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_online_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X.astype(data_type),
n_components=n_components,
alpha=1,
batch_size=10,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_online_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-4
n_components = 4
alpha = 1
U_64, V_64 = dict_learning_online(
X.astype(np.float64),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
U_32, V_32 = dict_learning_online(
X.astype(np.float32),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize(
"estimator",
[
SparseCoder(X.T),
DictionaryLearning(),
MiniBatchDictionaryLearning(batch_size=4, max_iter=10),
],
ids=lambda x: x.__class__.__name__,
)
def test_get_feature_names_out(estimator):
"""Check feature names for dict learning estimators."""
estimator.fit(X)
n_components = X.shape[1]
feature_names_out = estimator.get_feature_names_out()
estimator_name = estimator.__class__.__name__.lower()
assert_array_equal(
feature_names_out,
[f"{estimator_name}{i}" for i in range(n_components)],
)
def test_cd_work_on_joblib_memmapped_data(monkeypatch):
monkeypatch.setattr(
sklearn.decomposition._dict_learning,
"Parallel",
partial(Parallel, max_nbytes=100),
)
rng = np.random.RandomState(0)
X_train = rng.randn(10, 10)
dict_learner = DictionaryLearning(
n_components=5,
random_state=0,
n_jobs=2,
fit_algorithm="cd",
max_iter=50,
verbose=True,
)
# This must run and complete without error.
dict_learner.fit(X_train)
# TODO(1.6): remove in 1.6
def test_xxx():
warn_msg = "`max_iter=None` is deprecated in version 1.4 and will be removed"
with pytest.warns(FutureWarning, match=warn_msg):
MiniBatchDictionaryLearning(max_iter=None, random_state=0).fit(X)
with pytest.warns(FutureWarning, match=warn_msg):
dict_learning_online(X, max_iter=None, random_state=0)
@@ -0,0 +1,116 @@
# Author: Christian Osendorfer <osendorf@gmail.com>
# Alexandre Gramfort <alexandre.gramfort@inria.fr>
# License: BSD3
from itertools import combinations
import numpy as np
import pytest
from sklearn.decomposition import FactorAnalysis
from sklearn.decomposition._factor_analysis import _ortho_rotation
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils._testing import (
assert_almost_equal,
assert_array_almost_equal,
ignore_warnings,
)
# Ignore warnings from switching to more power iterations in randomized_svd
@ignore_warnings
def test_factor_analysis():
# Test FactorAnalysis ability to recover the data covariance structure
rng = np.random.RandomState(0)
n_samples, n_features, n_components = 20, 5, 3
# Some random settings for the generative model
W = rng.randn(n_components, n_features)
# latent variable of dim 3, 20 of it
h = rng.randn(n_samples, n_components)
# using gamma to model different noise variance
# per component
noise = rng.gamma(1, size=n_features) * rng.randn(n_samples, n_features)
# generate observations
# wlog, mean is 0
X = np.dot(h, W) + noise
fas = []
for method in ["randomized", "lapack"]:
fa = FactorAnalysis(n_components=n_components, svd_method=method)
fa.fit(X)
fas.append(fa)
X_t = fa.transform(X)
assert X_t.shape == (n_samples, n_components)
assert_almost_equal(fa.loglike_[-1], fa.score_samples(X).sum())
assert_almost_equal(fa.score_samples(X).mean(), fa.score(X))
diff = np.all(np.diff(fa.loglike_))
assert diff > 0.0, "Log likelihood dif not increase"
# Sample Covariance
scov = np.cov(X, rowvar=0.0, bias=1.0)
# Model Covariance
mcov = fa.get_covariance()
diff = np.sum(np.abs(scov - mcov)) / W.size
assert diff < 0.1, "Mean absolute difference is %f" % diff
fa = FactorAnalysis(
n_components=n_components, noise_variance_init=np.ones(n_features)
)
with pytest.raises(ValueError):
fa.fit(X[:, :2])
def f(x, y):
return np.abs(getattr(x, y)) # sign will not be equal
fa1, fa2 = fas
for attr in ["loglike_", "components_", "noise_variance_"]:
assert_almost_equal(f(fa1, attr), f(fa2, attr))
fa1.max_iter = 1
fa1.verbose = True
with pytest.warns(ConvergenceWarning):
fa1.fit(X)
# Test get_covariance and get_precision with n_components == n_features
# with n_components < n_features and with n_components == 0
for n_components in [0, 2, X.shape[1]]:
fa.n_components = n_components
fa.fit(X)
cov = fa.get_covariance()
precision = fa.get_precision()
assert_array_almost_equal(np.dot(cov, precision), np.eye(X.shape[1]), 12)
# test rotation
n_components = 2
results, projections = {}, {}
for method in (None, "varimax", "quartimax"):
fa_var = FactorAnalysis(n_components=n_components, rotation=method)
results[method] = fa_var.fit_transform(X)
projections[method] = fa_var.get_covariance()
for rot1, rot2 in combinations([None, "varimax", "quartimax"], 2):
assert not np.allclose(results[rot1], results[rot2])
assert np.allclose(projections[rot1], projections[rot2], atol=3)
# test against R's psych::principal with rotate="varimax"
# (i.e., the values below stem from rotating the components in R)
# R's factor analysis returns quite different values; therefore, we only
# test the rotation itself
factors = np.array(
[
[0.89421016, -0.35854928, -0.27770122, 0.03773647],
[-0.45081822, -0.89132754, 0.0932195, -0.01787973],
[0.99500666, -0.02031465, 0.05426497, -0.11539407],
[0.96822861, -0.06299656, 0.24411001, 0.07540887],
]
)
r_solution = np.array(
[[0.962, 0.052], [-0.141, 0.989], [0.949, -0.300], [0.937, -0.251]]
)
rotated = _ortho_rotation(factors[:, :n_components], method="varimax").T
assert_array_almost_equal(np.abs(rotated), np.abs(r_solution), decimal=3)
@@ -0,0 +1,452 @@
"""
Test the fastica algorithm.
"""
import itertools
import os
import warnings
import numpy as np
import pytest
from scipy import stats
from sklearn.decomposition import PCA, FastICA, fastica
from sklearn.decomposition._fastica import _gs_decorrelation
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils._testing import assert_allclose
def center_and_norm(x, axis=-1):
"""Centers and norms x **in place**
Parameters
-----------
x: ndarray
Array with an axis of observations (statistical units) measured on
random variables.
axis: int, optional
Axis along which the mean and variance are calculated.
"""
x = np.rollaxis(x, axis)
x -= x.mean(axis=0)
x /= x.std(axis=0)
def test_gs():
# Test gram schmidt orthonormalization
# generate a random orthogonal matrix
rng = np.random.RandomState(0)
W, _, _ = np.linalg.svd(rng.randn(10, 10))
w = rng.randn(10)
_gs_decorrelation(w, W, 10)
assert (w**2).sum() < 1.0e-10
w = rng.randn(10)
u = _gs_decorrelation(w, W, 5)
tmp = np.dot(u, W.T)
assert (tmp[:5] ** 2).sum() < 1.0e-10
def test_fastica_attributes_dtypes(global_dtype):
rng = np.random.RandomState(0)
X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
fica = FastICA(
n_components=5, max_iter=1000, whiten="unit-variance", random_state=0
).fit(X)
assert fica.components_.dtype == global_dtype
assert fica.mixing_.dtype == global_dtype
assert fica.mean_.dtype == global_dtype
assert fica.whitening_.dtype == global_dtype
def test_fastica_return_dtypes(global_dtype):
rng = np.random.RandomState(0)
X = rng.random_sample((100, 10)).astype(global_dtype, copy=False)
k_, mixing_, s_ = fastica(
X, max_iter=1000, whiten="unit-variance", random_state=rng
)
assert k_.dtype == global_dtype
assert mixing_.dtype == global_dtype
assert s_.dtype == global_dtype
@pytest.mark.parametrize("add_noise", [True, False])
def test_fastica_simple(add_noise, global_random_seed, global_dtype):
if (
global_random_seed == 20
and global_dtype == np.float32
and not add_noise
and os.getenv("DISTRIB") == "ubuntu"
):
pytest.xfail(
"FastICA instability with Ubuntu Atlas build with float32 "
"global_dtype. For more details, see "
"https://github.com/scikit-learn/scikit-learn/issues/24131#issuecomment-1208091119" # noqa
)
# Test the FastICA algorithm on very simple data.
rng = np.random.RandomState(global_random_seed)
n_samples = 1000
# Generate two sources:
s1 = (2 * np.sin(np.linspace(0, 100, n_samples)) > 0) - 1
s2 = stats.t.rvs(1, size=n_samples, random_state=global_random_seed)
s = np.c_[s1, s2].T
center_and_norm(s)
s = s.astype(global_dtype)
s1, s2 = s
# Mixing angle
phi = 0.6
mixing = np.array([[np.cos(phi), np.sin(phi)], [np.sin(phi), -np.cos(phi)]])
mixing = mixing.astype(global_dtype)
m = np.dot(mixing, s)
if add_noise:
m += 0.1 * rng.randn(2, 1000)
center_and_norm(m)
# function as fun arg
def g_test(x):
return x**3, (3 * x**2).mean(axis=-1)
algos = ["parallel", "deflation"]
nls = ["logcosh", "exp", "cube", g_test]
whitening = ["arbitrary-variance", "unit-variance", False]
for algo, nl, whiten in itertools.product(algos, nls, whitening):
if whiten:
k_, mixing_, s_ = fastica(
m.T, fun=nl, whiten=whiten, algorithm=algo, random_state=rng
)
with pytest.raises(ValueError):
fastica(m.T, fun=np.tanh, whiten=whiten, algorithm=algo)
else:
pca = PCA(n_components=2, whiten=True, random_state=rng)
X = pca.fit_transform(m.T)
k_, mixing_, s_ = fastica(
X, fun=nl, algorithm=algo, whiten=False, random_state=rng
)
with pytest.raises(ValueError):
fastica(X, fun=np.tanh, algorithm=algo)
s_ = s_.T
# Check that the mixing model described in the docstring holds:
if whiten:
# XXX: exact reconstruction to standard relative tolerance is not
# possible. This is probably expected when add_noise is True but we
# also need a non-trivial atol in float32 when add_noise is False.
#
# Note that the 2 sources are non-Gaussian in this test.
atol = 1e-5 if global_dtype == np.float32 else 0
assert_allclose(np.dot(np.dot(mixing_, k_), m), s_, atol=atol)
center_and_norm(s_)
s1_, s2_ = s_
# Check to see if the sources have been estimated
# in the wrong order
if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
s2_, s1_ = s_
s1_ *= np.sign(np.dot(s1_, s1))
s2_ *= np.sign(np.dot(s2_, s2))
# Check that we have estimated the original sources
if not add_noise:
assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-2)
assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-2)
else:
assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-1)
assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-1)
# Test FastICA class
_, _, sources_fun = fastica(
m.T, fun=nl, algorithm=algo, random_state=global_random_seed
)
ica = FastICA(fun=nl, algorithm=algo, random_state=global_random_seed)
sources = ica.fit_transform(m.T)
assert ica.components_.shape == (2, 2)
assert sources.shape == (1000, 2)
assert_allclose(sources_fun, sources)
# Set atol to account for the different magnitudes of the elements in sources
# (from 1e-4 to 1e1).
atol = np.max(np.abs(sources)) * (1e-5 if global_dtype == np.float32 else 1e-7)
assert_allclose(sources, ica.transform(m.T), atol=atol)
assert ica.mixing_.shape == (2, 2)
ica = FastICA(fun=np.tanh, algorithm=algo)
with pytest.raises(ValueError):
ica.fit(m.T)
def test_fastica_nowhiten():
m = [[0, 1], [1, 0]]
# test for issue #697
ica = FastICA(n_components=1, whiten=False, random_state=0)
warn_msg = "Ignoring n_components with whiten=False."
with pytest.warns(UserWarning, match=warn_msg):
ica.fit(m)
assert hasattr(ica, "mixing_")
def test_fastica_convergence_fail():
# Test the FastICA algorithm on very simple data
# (see test_non_square_fastica).
# Ensure a ConvergenceWarning raised if the tolerance is sufficiently low.
rng = np.random.RandomState(0)
n_samples = 1000
# Generate two sources:
t = np.linspace(0, 100, n_samples)
s1 = np.sin(t)
s2 = np.ceil(np.sin(np.pi * t))
s = np.c_[s1, s2].T
center_and_norm(s)
# Mixing matrix
mixing = rng.randn(6, 2)
m = np.dot(mixing, s)
# Do fastICA with tolerance 0. to ensure failing convergence
warn_msg = (
"FastICA did not converge. Consider increasing tolerance "
"or the maximum number of iterations."
)
with pytest.warns(ConvergenceWarning, match=warn_msg):
ica = FastICA(
algorithm="parallel", n_components=2, random_state=rng, max_iter=2, tol=0.0
)
ica.fit(m.T)
@pytest.mark.parametrize("add_noise", [True, False])
def test_non_square_fastica(add_noise):
# Test the FastICA algorithm on very simple data.
rng = np.random.RandomState(0)
n_samples = 1000
# Generate two sources:
t = np.linspace(0, 100, n_samples)
s1 = np.sin(t)
s2 = np.ceil(np.sin(np.pi * t))
s = np.c_[s1, s2].T
center_and_norm(s)
s1, s2 = s
# Mixing matrix
mixing = rng.randn(6, 2)
m = np.dot(mixing, s)
if add_noise:
m += 0.1 * rng.randn(6, n_samples)
center_and_norm(m)
k_, mixing_, s_ = fastica(
m.T, n_components=2, whiten="unit-variance", random_state=rng
)
s_ = s_.T
# Check that the mixing model described in the docstring holds:
assert_allclose(s_, np.dot(np.dot(mixing_, k_), m))
center_and_norm(s_)
s1_, s2_ = s_
# Check to see if the sources have been estimated
# in the wrong order
if abs(np.dot(s1_, s2)) > abs(np.dot(s1_, s1)):
s2_, s1_ = s_
s1_ *= np.sign(np.dot(s1_, s1))
s2_ *= np.sign(np.dot(s2_, s2))
# Check that we have estimated the original sources
if not add_noise:
assert_allclose(np.dot(s1_, s1) / n_samples, 1, atol=1e-3)
assert_allclose(np.dot(s2_, s2) / n_samples, 1, atol=1e-3)
def test_fit_transform(global_random_seed, global_dtype):
"""Test unit variance of transformed data using FastICA algorithm.
Check that `fit_transform` gives the same result as applying
`fit` and then `transform`.
Bug #13056
"""
# multivariate uniform data in [0, 1]
rng = np.random.RandomState(global_random_seed)
X = rng.random_sample((100, 10)).astype(global_dtype)
max_iter = 300
for whiten, n_components in [["unit-variance", 5], [False, None]]:
n_components_ = n_components if n_components is not None else X.shape[1]
ica = FastICA(
n_components=n_components, max_iter=max_iter, whiten=whiten, random_state=0
)
with warnings.catch_warnings():
# make sure that numerical errors do not cause sqrt of negative
# values
warnings.simplefilter("error", RuntimeWarning)
# XXX: for some seeds, the model does not converge.
# However this is not what we test here.
warnings.simplefilter("ignore", ConvergenceWarning)
Xt = ica.fit_transform(X)
assert ica.components_.shape == (n_components_, 10)
assert Xt.shape == (X.shape[0], n_components_)
ica2 = FastICA(
n_components=n_components, max_iter=max_iter, whiten=whiten, random_state=0
)
with warnings.catch_warnings():
# make sure that numerical errors do not cause sqrt of negative
# values
warnings.simplefilter("error", RuntimeWarning)
warnings.simplefilter("ignore", ConvergenceWarning)
ica2.fit(X)
assert ica2.components_.shape == (n_components_, 10)
Xt2 = ica2.transform(X)
# XXX: we have to set atol for this test to pass for all seeds when
# fitting with float32 data. Is this revealing a bug?
if global_dtype:
atol = np.abs(Xt2).mean() / 1e6
else:
atol = 0.0 # the default rtol is enough for float64 data
assert_allclose(Xt, Xt2, atol=atol)
@pytest.mark.filterwarnings("ignore:Ignoring n_components with whiten=False.")
@pytest.mark.parametrize(
"whiten, n_components, expected_mixing_shape",
[
("arbitrary-variance", 5, (10, 5)),
("arbitrary-variance", 10, (10, 10)),
("unit-variance", 5, (10, 5)),
("unit-variance", 10, (10, 10)),
(False, 5, (10, 10)),
(False, 10, (10, 10)),
],
)
def test_inverse_transform(
whiten, n_components, expected_mixing_shape, global_random_seed, global_dtype
):
# Test FastICA.inverse_transform
n_samples = 100
rng = np.random.RandomState(global_random_seed)
X = rng.random_sample((n_samples, 10)).astype(global_dtype)
ica = FastICA(n_components=n_components, random_state=rng, whiten=whiten)
with warnings.catch_warnings():
# For some dataset (depending on the value of global_dtype) the model
# can fail to converge but this should not impact the definition of
# a valid inverse transform.
warnings.simplefilter("ignore", ConvergenceWarning)
Xt = ica.fit_transform(X)
assert ica.mixing_.shape == expected_mixing_shape
X2 = ica.inverse_transform(Xt)
assert X.shape == X2.shape
# reversibility test in non-reduction case
if n_components == X.shape[1]:
# XXX: we have to set atol for this test to pass for all seeds when
# fitting with float32 data. Is this revealing a bug?
if global_dtype:
# XXX: dividing by a smaller number makes
# tests fail for some seeds.
atol = np.abs(X2).mean() / 1e5
else:
atol = 0.0 # the default rtol is enough for float64 data
assert_allclose(X, X2, atol=atol)
def test_fastica_errors():
n_features = 3
n_samples = 10
rng = np.random.RandomState(0)
X = rng.random_sample((n_samples, n_features))
w_init = rng.randn(n_features + 1, n_features + 1)
with pytest.raises(ValueError, match=r"alpha must be in \[1,2\]"):
fastica(X, fun_args={"alpha": 0})
with pytest.raises(
ValueError, match="w_init has invalid shape.+" r"should be \(3L?, 3L?\)"
):
fastica(X, w_init=w_init)
def test_fastica_whiten_unit_variance():
"""Test unit variance of transformed data using FastICA algorithm.
Bug #13056
"""
rng = np.random.RandomState(0)
X = rng.random_sample((100, 10))
n_components = X.shape[1]
ica = FastICA(n_components=n_components, whiten="unit-variance", random_state=0)
Xt = ica.fit_transform(X)
assert np.var(Xt) == pytest.approx(1.0)
@pytest.mark.parametrize("whiten", ["arbitrary-variance", "unit-variance", False])
@pytest.mark.parametrize("return_X_mean", [True, False])
@pytest.mark.parametrize("return_n_iter", [True, False])
def test_fastica_output_shape(whiten, return_X_mean, return_n_iter):
n_features = 3
n_samples = 10
rng = np.random.RandomState(0)
X = rng.random_sample((n_samples, n_features))
expected_len = 3 + return_X_mean + return_n_iter
out = fastica(
X, whiten=whiten, return_n_iter=return_n_iter, return_X_mean=return_X_mean
)
assert len(out) == expected_len
if not whiten:
assert out[0] is None
@pytest.mark.parametrize("add_noise", [True, False])
def test_fastica_simple_different_solvers(add_noise, global_random_seed):
"""Test FastICA is consistent between whiten_solvers."""
rng = np.random.RandomState(global_random_seed)
n_samples = 1000
# Generate two sources:
s1 = (2 * np.sin(np.linspace(0, 100, n_samples)) > 0) - 1
s2 = stats.t.rvs(1, size=n_samples, random_state=rng)
s = np.c_[s1, s2].T
center_and_norm(s)
s1, s2 = s
# Mixing angle
phi = rng.rand() * 2 * np.pi
mixing = np.array([[np.cos(phi), np.sin(phi)], [np.sin(phi), -np.cos(phi)]])
m = np.dot(mixing, s)
if add_noise:
m += 0.1 * rng.randn(2, 1000)
center_and_norm(m)
outs = {}
for solver in ("svd", "eigh"):
ica = FastICA(random_state=0, whiten="unit-variance", whiten_solver=solver)
sources = ica.fit_transform(m.T)
outs[solver] = sources
assert ica.components_.shape == (2, 2)
assert sources.shape == (1000, 2)
# compared numbers are not all on the same magnitude. Using a small atol to
# make the test less brittle
assert_allclose(outs["eigh"], outs["svd"], atol=1e-12)
def test_fastica_eigh_low_rank_warning(global_random_seed):
"""Test FastICA eigh solver raises warning for low-rank data."""
rng = np.random.RandomState(global_random_seed)
A = rng.randn(10, 2)
X = A @ A.T
ica = FastICA(random_state=0, whiten="unit-variance", whiten_solver="eigh")
msg = "There are some small singular values"
with pytest.warns(UserWarning, match=msg):
ica.fit(X)
@@ -0,0 +1,466 @@
"""Tests for Incremental PCA."""
import warnings
import numpy as np
import pytest
from numpy.testing import assert_allclose, assert_array_equal
from sklearn import datasets
from sklearn.decomposition import PCA, IncrementalPCA
from sklearn.utils._testing import (
assert_allclose_dense_sparse,
assert_almost_equal,
assert_array_almost_equal,
)
from sklearn.utils.fixes import CSC_CONTAINERS, CSR_CONTAINERS, LIL_CONTAINERS
iris = datasets.load_iris()
def test_incremental_pca():
# Incremental PCA on dense arrays.
X = iris.data
batch_size = X.shape[0] // 3
ipca = IncrementalPCA(n_components=2, batch_size=batch_size)
pca = PCA(n_components=2)
pca.fit_transform(X)
X_transformed = ipca.fit_transform(X)
assert X_transformed.shape == (X.shape[0], 2)
np.testing.assert_allclose(
ipca.explained_variance_ratio_.sum(),
pca.explained_variance_ratio_.sum(),
rtol=1e-3,
)
for n_components in [1, 2, X.shape[1]]:
ipca = IncrementalPCA(n_components, batch_size=batch_size)
ipca.fit(X)
cov = ipca.get_covariance()
precision = ipca.get_precision()
np.testing.assert_allclose(
np.dot(cov, precision), np.eye(X.shape[1]), atol=1e-13
)
@pytest.mark.parametrize(
"sparse_container", CSC_CONTAINERS + CSR_CONTAINERS + LIL_CONTAINERS
)
def test_incremental_pca_sparse(sparse_container):
# Incremental PCA on sparse arrays.
X = iris.data
pca = PCA(n_components=2)
pca.fit_transform(X)
X_sparse = sparse_container(X)
batch_size = X_sparse.shape[0] // 3
ipca = IncrementalPCA(n_components=2, batch_size=batch_size)
X_transformed = ipca.fit_transform(X_sparse)
assert X_transformed.shape == (X_sparse.shape[0], 2)
np.testing.assert_allclose(
ipca.explained_variance_ratio_.sum(),
pca.explained_variance_ratio_.sum(),
rtol=1e-3,
)
for n_components in [1, 2, X.shape[1]]:
ipca = IncrementalPCA(n_components, batch_size=batch_size)
ipca.fit(X_sparse)
cov = ipca.get_covariance()
precision = ipca.get_precision()
np.testing.assert_allclose(
np.dot(cov, precision), np.eye(X_sparse.shape[1]), atol=1e-13
)
with pytest.raises(
TypeError,
match=(
"IncrementalPCA.partial_fit does not support "
"sparse input. Either convert data to dense "
"or use IncrementalPCA.fit to do so in batches."
),
):
ipca.partial_fit(X_sparse)
def test_incremental_pca_check_projection():
# Test that the projection of data is correct.
rng = np.random.RandomState(1999)
n, p = 100, 3
X = rng.randn(n, p) * 0.1
X[:10] += np.array([3, 4, 5])
Xt = 0.1 * rng.randn(1, p) + np.array([3, 4, 5])
# Get the reconstruction of the generated data X
# Note that Xt has the same "components" as X, just separated
# This is what we want to ensure is recreated correctly
Yt = IncrementalPCA(n_components=2).fit(X).transform(Xt)
# Normalize
Yt /= np.sqrt((Yt**2).sum())
# Make sure that the first element of Yt is ~1, this means
# the reconstruction worked as expected
assert_almost_equal(np.abs(Yt[0][0]), 1.0, 1)
def test_incremental_pca_inverse():
# Test that the projection of data can be inverted.
rng = np.random.RandomState(1999)
n, p = 50, 3
X = rng.randn(n, p) # spherical data
X[:, 1] *= 0.00001 # make middle component relatively small
X += [5, 4, 3] # make a large mean
# same check that we can find the original data from the transformed
# signal (since the data is almost of rank n_components)
ipca = IncrementalPCA(n_components=2, batch_size=10).fit(X)
Y = ipca.transform(X)
Y_inverse = ipca.inverse_transform(Y)
assert_almost_equal(X, Y_inverse, decimal=3)
def test_incremental_pca_validation():
# Test that n_components is <= n_features.
X = np.array([[0, 1, 0], [1, 0, 0]])
n_samples, n_features = X.shape
n_components = 4
with pytest.raises(
ValueError,
match=(
"n_components={} invalid"
" for n_features={}, need more rows than"
" columns for IncrementalPCA"
" processing".format(n_components, n_features)
),
):
IncrementalPCA(n_components, batch_size=10).fit(X)
# Tests that n_components is also <= n_samples.
n_components = 3
with pytest.raises(
ValueError,
match=(
"n_components={} must be"
" less or equal to the batch number of"
" samples {}".format(n_components, n_samples)
),
):
IncrementalPCA(n_components=n_components).partial_fit(X)
def test_n_samples_equal_n_components():
# Ensures no warning is raised when n_samples==n_components
# Non-regression test for gh-19050
ipca = IncrementalPCA(n_components=5)
with warnings.catch_warnings():
warnings.simplefilter("error", RuntimeWarning)
ipca.partial_fit(np.random.randn(5, 7))
with warnings.catch_warnings():
warnings.simplefilter("error", RuntimeWarning)
ipca.fit(np.random.randn(5, 7))
def test_n_components_none():
# Ensures that n_components == None is handled correctly
rng = np.random.RandomState(1999)
for n_samples, n_features in [(50, 10), (10, 50)]:
X = rng.rand(n_samples, n_features)
ipca = IncrementalPCA(n_components=None)
# First partial_fit call, ipca.n_components_ is inferred from
# min(X.shape)
ipca.partial_fit(X)
assert ipca.n_components_ == min(X.shape)
# Second partial_fit call, ipca.n_components_ is inferred from
# ipca.components_ computed from the first partial_fit call
ipca.partial_fit(X)
assert ipca.n_components_ == ipca.components_.shape[0]
def test_incremental_pca_set_params():
# Test that components_ sign is stable over batch sizes.
rng = np.random.RandomState(1999)
n_samples = 100
n_features = 20
X = rng.randn(n_samples, n_features)
X2 = rng.randn(n_samples, n_features)
X3 = rng.randn(n_samples, n_features)
ipca = IncrementalPCA(n_components=20)
ipca.fit(X)
# Decreasing number of components
ipca.set_params(n_components=10)
with pytest.raises(ValueError):
ipca.partial_fit(X2)
# Increasing number of components
ipca.set_params(n_components=15)
with pytest.raises(ValueError):
ipca.partial_fit(X3)
# Returning to original setting
ipca.set_params(n_components=20)
ipca.partial_fit(X)
def test_incremental_pca_num_features_change():
# Test that changing n_components will raise an error.
rng = np.random.RandomState(1999)
n_samples = 100
X = rng.randn(n_samples, 20)
X2 = rng.randn(n_samples, 50)
ipca = IncrementalPCA(n_components=None)
ipca.fit(X)
with pytest.raises(ValueError):
ipca.partial_fit(X2)
def test_incremental_pca_batch_signs():
# Test that components_ sign is stable over batch sizes.
rng = np.random.RandomState(1999)
n_samples = 100
n_features = 3
X = rng.randn(n_samples, n_features)
all_components = []
batch_sizes = np.arange(10, 20)
for batch_size in batch_sizes:
ipca = IncrementalPCA(n_components=None, batch_size=batch_size).fit(X)
all_components.append(ipca.components_)
for i, j in zip(all_components[:-1], all_components[1:]):
assert_almost_equal(np.sign(i), np.sign(j), decimal=6)
def test_incremental_pca_batch_values():
# Test that components_ values are stable over batch sizes.
rng = np.random.RandomState(1999)
n_samples = 100
n_features = 3
X = rng.randn(n_samples, n_features)
all_components = []
batch_sizes = np.arange(20, 40, 3)
for batch_size in batch_sizes:
ipca = IncrementalPCA(n_components=None, batch_size=batch_size).fit(X)
all_components.append(ipca.components_)
for i, j in zip(all_components[:-1], all_components[1:]):
assert_almost_equal(i, j, decimal=1)
def test_incremental_pca_batch_rank():
# Test sample size in each batch is always larger or equal to n_components
rng = np.random.RandomState(1999)
n_samples = 100
n_features = 20
X = rng.randn(n_samples, n_features)
all_components = []
batch_sizes = np.arange(20, 90, 3)
for batch_size in batch_sizes:
ipca = IncrementalPCA(n_components=20, batch_size=batch_size).fit(X)
all_components.append(ipca.components_)
for components_i, components_j in zip(all_components[:-1], all_components[1:]):
assert_allclose_dense_sparse(components_i, components_j)
def test_incremental_pca_partial_fit():
# Test that fit and partial_fit get equivalent results.
rng = np.random.RandomState(1999)
n, p = 50, 3
X = rng.randn(n, p) # spherical data
X[:, 1] *= 0.00001 # make middle component relatively small
X += [5, 4, 3] # make a large mean
# same check that we can find the original data from the transformed
# signal (since the data is almost of rank n_components)
batch_size = 10
ipca = IncrementalPCA(n_components=2, batch_size=batch_size).fit(X)
pipca = IncrementalPCA(n_components=2, batch_size=batch_size)
# Add one to make sure endpoint is included
batch_itr = np.arange(0, n + 1, batch_size)
for i, j in zip(batch_itr[:-1], batch_itr[1:]):
pipca.partial_fit(X[i:j, :])
assert_almost_equal(ipca.components_, pipca.components_, decimal=3)
def test_incremental_pca_against_pca_iris():
# Test that IncrementalPCA and PCA are approximate (to a sign flip).
X = iris.data
Y_pca = PCA(n_components=2).fit_transform(X)
Y_ipca = IncrementalPCA(n_components=2, batch_size=25).fit_transform(X)
assert_almost_equal(np.abs(Y_pca), np.abs(Y_ipca), 1)
def test_incremental_pca_against_pca_random_data():
# Test that IncrementalPCA and PCA are approximate (to a sign flip).
rng = np.random.RandomState(1999)
n_samples = 100
n_features = 3
X = rng.randn(n_samples, n_features) + 5 * rng.rand(1, n_features)
Y_pca = PCA(n_components=3).fit_transform(X)
Y_ipca = IncrementalPCA(n_components=3, batch_size=25).fit_transform(X)
assert_almost_equal(np.abs(Y_pca), np.abs(Y_ipca), 1)
def test_explained_variances():
# Test that PCA and IncrementalPCA calculations match
X = datasets.make_low_rank_matrix(
1000, 100, tail_strength=0.0, effective_rank=10, random_state=1999
)
prec = 3
n_samples, n_features = X.shape
for nc in [None, 99]:
pca = PCA(n_components=nc).fit(X)
ipca = IncrementalPCA(n_components=nc, batch_size=100).fit(X)
assert_almost_equal(
pca.explained_variance_, ipca.explained_variance_, decimal=prec
)
assert_almost_equal(
pca.explained_variance_ratio_, ipca.explained_variance_ratio_, decimal=prec
)
assert_almost_equal(pca.noise_variance_, ipca.noise_variance_, decimal=prec)
def test_singular_values():
# Check that the IncrementalPCA output has the correct singular values
rng = np.random.RandomState(0)
n_samples = 1000
n_features = 100
X = datasets.make_low_rank_matrix(
n_samples, n_features, tail_strength=0.0, effective_rank=10, random_state=rng
)
pca = PCA(n_components=10, svd_solver="full", random_state=rng).fit(X)
ipca = IncrementalPCA(n_components=10, batch_size=100).fit(X)
assert_array_almost_equal(pca.singular_values_, ipca.singular_values_, 2)
# Compare to the Frobenius norm
X_pca = pca.transform(X)
X_ipca = ipca.transform(X)
assert_array_almost_equal(
np.sum(pca.singular_values_**2.0), np.linalg.norm(X_pca, "fro") ** 2.0, 12
)
assert_array_almost_equal(
np.sum(ipca.singular_values_**2.0), np.linalg.norm(X_ipca, "fro") ** 2.0, 2
)
# Compare to the 2-norms of the score vectors
assert_array_almost_equal(
pca.singular_values_, np.sqrt(np.sum(X_pca**2.0, axis=0)), 12
)
assert_array_almost_equal(
ipca.singular_values_, np.sqrt(np.sum(X_ipca**2.0, axis=0)), 2
)
# Set the singular values and see what we get back
rng = np.random.RandomState(0)
n_samples = 100
n_features = 110
X = datasets.make_low_rank_matrix(
n_samples, n_features, tail_strength=0.0, effective_rank=3, random_state=rng
)
pca = PCA(n_components=3, svd_solver="full", random_state=rng)
ipca = IncrementalPCA(n_components=3, batch_size=100)
X_pca = pca.fit_transform(X)
X_pca /= np.sqrt(np.sum(X_pca**2.0, axis=0))
X_pca[:, 0] *= 3.142
X_pca[:, 1] *= 2.718
X_hat = np.dot(X_pca, pca.components_)
pca.fit(X_hat)
ipca.fit(X_hat)
assert_array_almost_equal(pca.singular_values_, [3.142, 2.718, 1.0], 14)
assert_array_almost_equal(ipca.singular_values_, [3.142, 2.718, 1.0], 14)
def test_whitening(global_random_seed):
# Test that PCA and IncrementalPCA transforms match to sign flip.
X = datasets.make_low_rank_matrix(
1000, 10, tail_strength=0.0, effective_rank=2, random_state=global_random_seed
)
atol = 1e-3
for nc in [None, 9]:
pca = PCA(whiten=True, n_components=nc).fit(X)
ipca = IncrementalPCA(whiten=True, n_components=nc, batch_size=250).fit(X)
# Since the data is rank deficient, some components are pure noise. We
# should not expect those dimensions to carry any signal and their
# values might be arbitrarily changed by implementation details of the
# internal SVD solver. We therefore filter them out before comparison.
stable_mask = pca.explained_variance_ratio_ > 1e-12
Xt_pca = pca.transform(X)
Xt_ipca = ipca.transform(X)
assert_allclose(
np.abs(Xt_pca)[:, stable_mask],
np.abs(Xt_ipca)[:, stable_mask],
atol=atol,
)
# The noisy dimensions are in the null space of the inverse transform,
# so they are not influencing the reconstruction. We therefore don't
# need to apply the mask here.
Xinv_ipca = ipca.inverse_transform(Xt_ipca)
Xinv_pca = pca.inverse_transform(Xt_pca)
assert_allclose(X, Xinv_ipca, atol=atol)
assert_allclose(X, Xinv_pca, atol=atol)
assert_allclose(Xinv_pca, Xinv_ipca, atol=atol)
def test_incremental_pca_partial_fit_float_division():
# Test to ensure float division is used in all versions of Python
# (non-regression test for issue #9489)
rng = np.random.RandomState(0)
A = rng.randn(5, 3) + 2
B = rng.randn(7, 3) + 5
pca = IncrementalPCA(n_components=2)
pca.partial_fit(A)
# Set n_samples_seen_ to be a floating point number instead of an int
pca.n_samples_seen_ = float(pca.n_samples_seen_)
pca.partial_fit(B)
singular_vals_float_samples_seen = pca.singular_values_
pca2 = IncrementalPCA(n_components=2)
pca2.partial_fit(A)
pca2.partial_fit(B)
singular_vals_int_samples_seen = pca2.singular_values_
np.testing.assert_allclose(
singular_vals_float_samples_seen, singular_vals_int_samples_seen
)
def test_incremental_pca_fit_overflow_error():
# Test for overflow error on Windows OS
# (non-regression test for issue #17693)
rng = np.random.RandomState(0)
A = rng.rand(500000, 2)
ipca = IncrementalPCA(n_components=2, batch_size=10000)
ipca.fit(A)
pca = PCA(n_components=2)
pca.fit(A)
np.testing.assert_allclose(ipca.singular_values_, pca.singular_values_)
def test_incremental_pca_feature_names_out():
"""Check feature names out for IncrementalPCA."""
ipca = IncrementalPCA(n_components=2).fit(iris.data)
names = ipca.get_feature_names_out()
assert_array_equal([f"incrementalpca{i}" for i in range(2)], names)
@@ -0,0 +1,566 @@
import warnings
import numpy as np
import pytest
import sklearn
from sklearn.datasets import load_iris, make_blobs, make_circles
from sklearn.decomposition import PCA, KernelPCA
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import Perceptron
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils._testing import (
assert_allclose,
assert_array_almost_equal,
assert_array_equal,
)
from sklearn.utils.fixes import CSR_CONTAINERS
from sklearn.utils.validation import _check_psd_eigenvalues
def test_kernel_pca():
"""Nominal test for all solvers and all known kernels + a custom one
It tests
- that fit_transform is equivalent to fit+transform
- that the shapes of transforms and inverse transforms are correct
"""
rng = np.random.RandomState(0)
X_fit = rng.random_sample((5, 4))
X_pred = rng.random_sample((2, 4))
def histogram(x, y, **kwargs):
# Histogram kernel implemented as a callable.
assert kwargs == {} # no kernel_params that we didn't ask for
return np.minimum(x, y).sum()
for eigen_solver in ("auto", "dense", "arpack", "randomized"):
for kernel in ("linear", "rbf", "poly", histogram):
# histogram kernel produces singular matrix inside linalg.solve
# XXX use a least-squares approximation?
inv = not callable(kernel)
# transform fit data
kpca = KernelPCA(
4, kernel=kernel, eigen_solver=eigen_solver, fit_inverse_transform=inv
)
X_fit_transformed = kpca.fit_transform(X_fit)
X_fit_transformed2 = kpca.fit(X_fit).transform(X_fit)
assert_array_almost_equal(
np.abs(X_fit_transformed), np.abs(X_fit_transformed2)
)
# non-regression test: previously, gamma would be 0 by default,
# forcing all eigenvalues to 0 under the poly kernel
assert X_fit_transformed.size != 0
# transform new data
X_pred_transformed = kpca.transform(X_pred)
assert X_pred_transformed.shape[1] == X_fit_transformed.shape[1]
# inverse transform
if inv:
X_pred2 = kpca.inverse_transform(X_pred_transformed)
assert X_pred2.shape == X_pred.shape
def test_kernel_pca_invalid_parameters():
"""Check that kPCA raises an error if the parameters are invalid
Tests fitting inverse transform with a precomputed kernel raises a
ValueError.
"""
estimator = KernelPCA(
n_components=10, fit_inverse_transform=True, kernel="precomputed"
)
err_ms = "Cannot fit_inverse_transform with a precomputed kernel"
with pytest.raises(ValueError, match=err_ms):
estimator.fit(np.random.randn(10, 10))
def test_kernel_pca_consistent_transform():
"""Check robustness to mutations in the original training array
Test that after fitting a kPCA model, it stays independent of any
mutation of the values of the original data object by relying on an
internal copy.
"""
# X_fit_ needs to retain the old, unmodified copy of X
state = np.random.RandomState(0)
X = state.rand(10, 10)
kpca = KernelPCA(random_state=state).fit(X)
transformed1 = kpca.transform(X)
X_copy = X.copy()
X[:, 0] = 666
transformed2 = kpca.transform(X_copy)
assert_array_almost_equal(transformed1, transformed2)
def test_kernel_pca_deterministic_output():
"""Test that Kernel PCA produces deterministic output
Tests that the same inputs and random state produce the same output.
"""
rng = np.random.RandomState(0)
X = rng.rand(10, 10)
eigen_solver = ("arpack", "dense")
for solver in eigen_solver:
transformed_X = np.zeros((20, 2))
for i in range(20):
kpca = KernelPCA(n_components=2, eigen_solver=solver, random_state=rng)
transformed_X[i, :] = kpca.fit_transform(X)[0]
assert_allclose(transformed_X, np.tile(transformed_X[0, :], 20).reshape(20, 2))
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_kernel_pca_sparse(csr_container):
"""Test that kPCA works on a sparse data input.
Same test as ``test_kernel_pca except inverse_transform`` since it's not
implemented for sparse matrices.
"""
rng = np.random.RandomState(0)
X_fit = csr_container(rng.random_sample((5, 4)))
X_pred = csr_container(rng.random_sample((2, 4)))
for eigen_solver in ("auto", "arpack", "randomized"):
for kernel in ("linear", "rbf", "poly"):
# transform fit data
kpca = KernelPCA(
4,
kernel=kernel,
eigen_solver=eigen_solver,
fit_inverse_transform=False,
random_state=0,
)
X_fit_transformed = kpca.fit_transform(X_fit)
X_fit_transformed2 = kpca.fit(X_fit).transform(X_fit)
assert_array_almost_equal(
np.abs(X_fit_transformed), np.abs(X_fit_transformed2)
)
# transform new data
X_pred_transformed = kpca.transform(X_pred)
assert X_pred_transformed.shape[1] == X_fit_transformed.shape[1]
# inverse transform: not available for sparse matrices
# XXX: should we raise another exception type here? For instance:
# NotImplementedError.
with pytest.raises(NotFittedError):
kpca.inverse_transform(X_pred_transformed)
@pytest.mark.parametrize("solver", ["auto", "dense", "arpack", "randomized"])
@pytest.mark.parametrize("n_features", [4, 10])
def test_kernel_pca_linear_kernel(solver, n_features):
"""Test that kPCA with linear kernel is equivalent to PCA for all solvers.
KernelPCA with linear kernel should produce the same output as PCA.
"""
rng = np.random.RandomState(0)
X_fit = rng.random_sample((5, n_features))
X_pred = rng.random_sample((2, n_features))
# for a linear kernel, kernel PCA should find the same projection as PCA
# modulo the sign (direction)
# fit only the first four components: fifth is near zero eigenvalue, so
# can be trimmed due to roundoff error
n_comps = 3 if solver == "arpack" else 4
assert_array_almost_equal(
np.abs(KernelPCA(n_comps, eigen_solver=solver).fit(X_fit).transform(X_pred)),
np.abs(
PCA(n_comps, svd_solver=solver if solver != "dense" else "full")
.fit(X_fit)
.transform(X_pred)
),
)
def test_kernel_pca_n_components():
"""Test that `n_components` is correctly taken into account for projections
For all solvers this tests that the output has the correct shape depending
on the selected number of components.
"""
rng = np.random.RandomState(0)
X_fit = rng.random_sample((5, 4))
X_pred = rng.random_sample((2, 4))
for eigen_solver in ("dense", "arpack", "randomized"):
for c in [1, 2, 4]:
kpca = KernelPCA(n_components=c, eigen_solver=eigen_solver)
shape = kpca.fit(X_fit).transform(X_pred).shape
assert shape == (2, c)
def test_remove_zero_eig():
"""Check that the ``remove_zero_eig`` parameter works correctly.
Tests that the null-space (Zero) eigenvalues are removed when
remove_zero_eig=True, whereas they are not by default.
"""
X = np.array([[1 - 1e-30, 1], [1, 1], [1, 1 - 1e-20]])
# n_components=None (default) => remove_zero_eig is True
kpca = KernelPCA()
Xt = kpca.fit_transform(X)
assert Xt.shape == (3, 0)
kpca = KernelPCA(n_components=2)
Xt = kpca.fit_transform(X)
assert Xt.shape == (3, 2)
kpca = KernelPCA(n_components=2, remove_zero_eig=True)
Xt = kpca.fit_transform(X)
assert Xt.shape == (3, 0)
def test_leave_zero_eig():
"""Non-regression test for issue #12141 (PR #12143)
This test checks that fit().transform() returns the same result as
fit_transform() in case of non-removed zero eigenvalue.
"""
X_fit = np.array([[1, 1], [0, 0]])
# Assert that even with all np warnings on, there is no div by zero warning
with warnings.catch_warnings():
# There might be warnings about the kernel being badly conditioned,
# but there should not be warnings about division by zero.
# (Numpy division by zero warning can have many message variants, but
# at least we know that it is a RuntimeWarning so lets check only this)
warnings.simplefilter("error", RuntimeWarning)
with np.errstate(all="warn"):
k = KernelPCA(n_components=2, remove_zero_eig=False, eigen_solver="dense")
# Fit, then transform
A = k.fit(X_fit).transform(X_fit)
# Do both at once
B = k.fit_transform(X_fit)
# Compare
assert_array_almost_equal(np.abs(A), np.abs(B))
def test_kernel_pca_precomputed():
"""Test that kPCA works with a precomputed kernel, for all solvers"""
rng = np.random.RandomState(0)
X_fit = rng.random_sample((5, 4))
X_pred = rng.random_sample((2, 4))
for eigen_solver in ("dense", "arpack", "randomized"):
X_kpca = (
KernelPCA(4, eigen_solver=eigen_solver, random_state=0)
.fit(X_fit)
.transform(X_pred)
)
X_kpca2 = (
KernelPCA(
4, eigen_solver=eigen_solver, kernel="precomputed", random_state=0
)
.fit(np.dot(X_fit, X_fit.T))
.transform(np.dot(X_pred, X_fit.T))
)
X_kpca_train = KernelPCA(
4, eigen_solver=eigen_solver, kernel="precomputed", random_state=0
).fit_transform(np.dot(X_fit, X_fit.T))
X_kpca_train2 = (
KernelPCA(
4, eigen_solver=eigen_solver, kernel="precomputed", random_state=0
)
.fit(np.dot(X_fit, X_fit.T))
.transform(np.dot(X_fit, X_fit.T))
)
assert_array_almost_equal(np.abs(X_kpca), np.abs(X_kpca2))
assert_array_almost_equal(np.abs(X_kpca_train), np.abs(X_kpca_train2))
@pytest.mark.parametrize("solver", ["auto", "dense", "arpack", "randomized"])
def test_kernel_pca_precomputed_non_symmetric(solver):
"""Check that the kernel centerer works.
Tests that a non symmetric precomputed kernel is actually accepted
because the kernel centerer does its job correctly.
"""
# a non symmetric gram matrix
K = [[1, 2], [3, 40]]
kpca = KernelPCA(
kernel="precomputed", eigen_solver=solver, n_components=1, random_state=0
)
kpca.fit(K) # no error
# same test with centered kernel
Kc = [[9, -9], [-9, 9]]
kpca_c = KernelPCA(
kernel="precomputed", eigen_solver=solver, n_components=1, random_state=0
)
kpca_c.fit(Kc)
# comparison between the non-centered and centered versions
assert_array_equal(kpca.eigenvectors_, kpca_c.eigenvectors_)
assert_array_equal(kpca.eigenvalues_, kpca_c.eigenvalues_)
def test_gridsearch_pipeline():
"""Check that kPCA works as expected in a grid search pipeline
Test if we can do a grid-search to find parameters to separate
circles with a perceptron model.
"""
X, y = make_circles(n_samples=400, factor=0.3, noise=0.05, random_state=0)
kpca = KernelPCA(kernel="rbf", n_components=2)
pipeline = Pipeline([("kernel_pca", kpca), ("Perceptron", Perceptron(max_iter=5))])
param_grid = dict(kernel_pca__gamma=2.0 ** np.arange(-2, 2))
grid_search = GridSearchCV(pipeline, cv=3, param_grid=param_grid)
grid_search.fit(X, y)
assert grid_search.best_score_ == 1
def test_gridsearch_pipeline_precomputed():
"""Check that kPCA works as expected in a grid search pipeline (2)
Test if we can do a grid-search to find parameters to separate
circles with a perceptron model. This test uses a precomputed kernel.
"""
X, y = make_circles(n_samples=400, factor=0.3, noise=0.05, random_state=0)
kpca = KernelPCA(kernel="precomputed", n_components=2)
pipeline = Pipeline([("kernel_pca", kpca), ("Perceptron", Perceptron(max_iter=5))])
param_grid = dict(Perceptron__max_iter=np.arange(1, 5))
grid_search = GridSearchCV(pipeline, cv=3, param_grid=param_grid)
X_kernel = rbf_kernel(X, gamma=2.0)
grid_search.fit(X_kernel, y)
assert grid_search.best_score_ == 1
def test_nested_circles():
"""Check that kPCA projects in a space where nested circles are separable
Tests that 2D nested circles become separable with a perceptron when
projected in the first 2 kPCA using an RBF kernel, while raw samples
are not directly separable in the original space.
"""
X, y = make_circles(n_samples=400, factor=0.3, noise=0.05, random_state=0)
# 2D nested circles are not linearly separable
train_score = Perceptron(max_iter=5).fit(X, y).score(X, y)
assert train_score < 0.8
# Project the circles data into the first 2 components of a RBF Kernel
# PCA model.
# Note that the gamma value is data dependent. If this test breaks
# and the gamma value has to be updated, the Kernel PCA example will
# have to be updated too.
kpca = KernelPCA(
kernel="rbf", n_components=2, fit_inverse_transform=True, gamma=2.0
)
X_kpca = kpca.fit_transform(X)
# The data is perfectly linearly separable in that space
train_score = Perceptron(max_iter=5).fit(X_kpca, y).score(X_kpca, y)
assert train_score == 1.0
def test_kernel_conditioning():
"""Check that ``_check_psd_eigenvalues`` is correctly called in kPCA
Non-regression test for issue #12140 (PR #12145).
"""
# create a pathological X leading to small non-zero eigenvalue
X = [[5, 1], [5 + 1e-8, 1e-8], [5 + 1e-8, 0]]
kpca = KernelPCA(kernel="linear", n_components=2, fit_inverse_transform=True)
kpca.fit(X)
# check that the small non-zero eigenvalue was correctly set to zero
assert kpca.eigenvalues_.min() == 0
assert np.all(kpca.eigenvalues_ == _check_psd_eigenvalues(kpca.eigenvalues_))
@pytest.mark.parametrize("solver", ["auto", "dense", "arpack", "randomized"])
def test_precomputed_kernel_not_psd(solver):
"""Check how KernelPCA works with non-PSD kernels depending on n_components
Tests for all methods what happens with a non PSD gram matrix (this
can happen in an isomap scenario, or with custom kernel functions, or
maybe with ill-posed datasets).
When ``n_component`` is large enough to capture a negative eigenvalue, an
error should be raised. Otherwise, KernelPCA should run without error
since the negative eigenvalues are not selected.
"""
# a non PSD kernel with large eigenvalues, already centered
# it was captured from an isomap call and multiplied by 100 for compacity
K = [
[4.48, -1.0, 8.07, 2.33, 2.33, 2.33, -5.76, -12.78],
[-1.0, -6.48, 4.5, -1.24, -1.24, -1.24, -0.81, 7.49],
[8.07, 4.5, 15.48, 2.09, 2.09, 2.09, -11.1, -23.23],
[2.33, -1.24, 2.09, 4.0, -3.65, -3.65, 1.02, -0.9],
[2.33, -1.24, 2.09, -3.65, 4.0, -3.65, 1.02, -0.9],
[2.33, -1.24, 2.09, -3.65, -3.65, 4.0, 1.02, -0.9],
[-5.76, -0.81, -11.1, 1.02, 1.02, 1.02, 4.86, 9.75],
[-12.78, 7.49, -23.23, -0.9, -0.9, -0.9, 9.75, 21.46],
]
# this gram matrix has 5 positive eigenvalues and 3 negative ones
# [ 52.72, 7.65, 7.65, 5.02, 0. , -0. , -6.13, -15.11]
# 1. ask for enough components to get a significant negative one
kpca = KernelPCA(kernel="precomputed", eigen_solver=solver, n_components=7)
# make sure that the appropriate error is raised
with pytest.raises(ValueError, match="There are significant negative eigenvalues"):
kpca.fit(K)
# 2. ask for a small enough n_components to get only positive ones
kpca = KernelPCA(kernel="precomputed", eigen_solver=solver, n_components=2)
if solver == "randomized":
# the randomized method is still inconsistent with the others on this
# since it selects the eigenvalues based on the largest 2 modules, not
# on the largest 2 values.
#
# At least we can ensure that we return an error instead of returning
# the wrong eigenvalues
with pytest.raises(
ValueError, match="There are significant negative eigenvalues"
):
kpca.fit(K)
else:
# general case: make sure that it works
kpca.fit(K)
@pytest.mark.parametrize("n_components", [4, 10, 20])
def test_kernel_pca_solvers_equivalence(n_components):
"""Check that 'dense' 'arpack' & 'randomized' solvers give similar results"""
# Generate random data
n_train, n_test = 1_000, 100
X, _ = make_circles(
n_samples=(n_train + n_test), factor=0.3, noise=0.05, random_state=0
)
X_fit, X_pred = X[:n_train, :], X[n_train:, :]
# reference (full)
ref_pred = (
KernelPCA(n_components, eigen_solver="dense", random_state=0)
.fit(X_fit)
.transform(X_pred)
)
# arpack
a_pred = (
KernelPCA(n_components, eigen_solver="arpack", random_state=0)
.fit(X_fit)
.transform(X_pred)
)
# check that the result is still correct despite the approx
assert_array_almost_equal(np.abs(a_pred), np.abs(ref_pred))
# randomized
r_pred = (
KernelPCA(n_components, eigen_solver="randomized", random_state=0)
.fit(X_fit)
.transform(X_pred)
)
# check that the result is still correct despite the approximation
assert_array_almost_equal(np.abs(r_pred), np.abs(ref_pred))
def test_kernel_pca_inverse_transform_reconstruction():
"""Test if the reconstruction is a good approximation.
Note that in general it is not possible to get an arbitrarily good
reconstruction because of kernel centering that does not
preserve all the information of the original data.
"""
X, *_ = make_blobs(n_samples=100, n_features=4, random_state=0)
kpca = KernelPCA(
n_components=20, kernel="rbf", fit_inverse_transform=True, alpha=1e-3
)
X_trans = kpca.fit_transform(X)
X_reconst = kpca.inverse_transform(X_trans)
assert np.linalg.norm(X - X_reconst) / np.linalg.norm(X) < 1e-1
def test_kernel_pca_raise_not_fitted_error():
X = np.random.randn(15).reshape(5, 3)
kpca = KernelPCA()
kpca.fit(X)
with pytest.raises(NotFittedError):
kpca.inverse_transform(X)
def test_32_64_decomposition_shape():
"""Test that the decomposition is similar for 32 and 64 bits data
Non regression test for
https://github.com/scikit-learn/scikit-learn/issues/18146
"""
X, y = make_blobs(
n_samples=30, centers=[[0, 0, 0], [1, 1, 1]], random_state=0, cluster_std=0.1
)
X = StandardScaler().fit_transform(X)
X -= X.min()
# Compare the shapes (corresponds to the number of non-zero eigenvalues)
kpca = KernelPCA()
assert kpca.fit_transform(X).shape == kpca.fit_transform(X.astype(np.float32)).shape
def test_kernel_pca_feature_names_out():
"""Check feature names out for KernelPCA."""
X, *_ = make_blobs(n_samples=100, n_features=4, random_state=0)
kpca = KernelPCA(n_components=2).fit(X)
names = kpca.get_feature_names_out()
assert_array_equal([f"kernelpca{i}" for i in range(2)], names)
def test_kernel_pca_inverse_correct_gamma():
"""Check that gamma is set correctly when not provided.
Non-regression test for #26280
"""
rng = np.random.RandomState(0)
X = rng.random_sample((5, 4))
kwargs = {
"n_components": 2,
"random_state": rng,
"fit_inverse_transform": True,
"kernel": "rbf",
}
expected_gamma = 1 / X.shape[1]
kpca1 = KernelPCA(gamma=None, **kwargs).fit(X)
kpca2 = KernelPCA(gamma=expected_gamma, **kwargs).fit(X)
assert kpca1.gamma_ == expected_gamma
assert kpca2.gamma_ == expected_gamma
X1_recon = kpca1.inverse_transform(kpca1.transform(X))
X2_recon = kpca2.inverse_transform(kpca1.transform(X))
assert_allclose(X1_recon, X2_recon)
def test_kernel_pca_pandas_output():
"""Check that KernelPCA works with pandas output when the solver is arpack.
Non-regression test for:
https://github.com/scikit-learn/scikit-learn/issues/27579
"""
pytest.importorskip("pandas")
X, _ = load_iris(as_frame=True, return_X_y=True)
with sklearn.config_context(transform_output="pandas"):
KernelPCA(n_components=2, eigen_solver="arpack").fit_transform(X)
@@ -0,0 +1,477 @@
import sys
from io import StringIO
import numpy as np
import pytest
from numpy.testing import assert_array_equal
from scipy.linalg import block_diag
from scipy.special import psi
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.decomposition._online_lda_fast import (
_dirichlet_expectation_1d,
_dirichlet_expectation_2d,
)
from sklearn.exceptions import NotFittedError
from sklearn.utils._testing import (
assert_allclose,
assert_almost_equal,
assert_array_almost_equal,
if_safe_multiprocessing_with_blas,
)
from sklearn.utils.fixes import CSR_CONTAINERS
def _build_sparse_array(csr_container):
# Create 3 topics and each topic has 3 distinct words.
# (Each word only belongs to a single topic.)
n_components = 3
block = np.full((3, 3), n_components, dtype=int)
blocks = [block] * n_components
X = block_diag(*blocks)
X = csr_container(X)
return (n_components, X)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_default_prior_params(csr_container):
# default prior parameter should be `1 / topics`
# and verbose params should not affect result
n_components, X = _build_sparse_array(csr_container)
prior = 1.0 / n_components
lda_1 = LatentDirichletAllocation(
n_components=n_components,
doc_topic_prior=prior,
topic_word_prior=prior,
random_state=0,
)
lda_2 = LatentDirichletAllocation(n_components=n_components, random_state=0)
topic_distr_1 = lda_1.fit_transform(X)
topic_distr_2 = lda_2.fit_transform(X)
assert_almost_equal(topic_distr_1, topic_distr_2)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_fit_batch(csr_container):
# Test LDA batch learning_offset (`fit` method with 'batch' learning)
rng = np.random.RandomState(0)
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
evaluate_every=1,
learning_method="batch",
random_state=rng,
)
lda.fit(X)
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for component in lda.components_:
# Find top 3 words in each LDA component
top_idx = set(component.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_fit_online(csr_container):
# Test LDA online learning (`fit` method with 'online' learning)
rng = np.random.RandomState(0)
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
learning_offset=10.0,
evaluate_every=1,
learning_method="online",
random_state=rng,
)
lda.fit(X)
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for component in lda.components_:
# Find top 3 words in each LDA component
top_idx = set(component.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_partial_fit(csr_container):
# Test LDA online learning (`partial_fit` method)
# (same as test_lda_batch)
rng = np.random.RandomState(0)
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
learning_offset=10.0,
total_samples=100,
random_state=rng,
)
for i in range(3):
lda.partial_fit(X)
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for c in lda.components_:
top_idx = set(c.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_dense_input(csr_container):
# Test LDA with dense input.
rng = np.random.RandomState(0)
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components, learning_method="batch", random_state=rng
)
lda.fit(X.toarray())
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for component in lda.components_:
# Find top 3 words in each LDA component
top_idx = set(component.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
def test_lda_transform():
# Test LDA transform.
# Transform result cannot be negative and should be normalized
rng = np.random.RandomState(0)
X = rng.randint(5, size=(20, 10))
n_components = 3
lda = LatentDirichletAllocation(n_components=n_components, random_state=rng)
X_trans = lda.fit_transform(X)
assert (X_trans > 0.0).any()
assert_array_almost_equal(np.sum(X_trans, axis=1), np.ones(X_trans.shape[0]))
@pytest.mark.parametrize("method", ("online", "batch"))
def test_lda_fit_transform(method):
# Test LDA fit_transform & transform
# fit_transform and transform result should be the same
rng = np.random.RandomState(0)
X = rng.randint(10, size=(50, 20))
lda = LatentDirichletAllocation(
n_components=5, learning_method=method, random_state=rng
)
X_fit = lda.fit_transform(X)
X_trans = lda.transform(X)
assert_array_almost_equal(X_fit, X_trans, 4)
def test_lda_negative_input():
# test pass dense matrix with sparse negative input.
X = np.full((5, 10), -1.0)
lda = LatentDirichletAllocation()
regex = r"^Negative values in data passed"
with pytest.raises(ValueError, match=regex):
lda.fit(X)
def test_lda_no_component_error():
# test `perplexity` before `fit`
rng = np.random.RandomState(0)
X = rng.randint(4, size=(20, 10))
lda = LatentDirichletAllocation()
regex = (
"This LatentDirichletAllocation instance is not fitted yet. "
"Call 'fit' with appropriate arguments before using this "
"estimator."
)
with pytest.raises(NotFittedError, match=regex):
lda.perplexity(X)
@if_safe_multiprocessing_with_blas
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
@pytest.mark.parametrize("method", ("online", "batch"))
def test_lda_multi_jobs(method, csr_container):
n_components, X = _build_sparse_array(csr_container)
# Test LDA batch training with multi CPU
rng = np.random.RandomState(0)
lda = LatentDirichletAllocation(
n_components=n_components,
n_jobs=2,
learning_method=method,
evaluate_every=1,
random_state=rng,
)
lda.fit(X)
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for c in lda.components_:
top_idx = set(c.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
@if_safe_multiprocessing_with_blas
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_partial_fit_multi_jobs(csr_container):
# Test LDA online training with multi CPU
rng = np.random.RandomState(0)
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
n_jobs=2,
learning_offset=5.0,
total_samples=30,
random_state=rng,
)
for i in range(2):
lda.partial_fit(X)
correct_idx_grps = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
for c in lda.components_:
top_idx = set(c.argsort()[-3:][::-1])
assert tuple(sorted(top_idx)) in correct_idx_grps
def test_lda_preplexity_mismatch():
# test dimension mismatch in `perplexity` method
rng = np.random.RandomState(0)
n_components = rng.randint(3, 6)
n_samples = rng.randint(6, 10)
X = np.random.randint(4, size=(n_samples, 10))
lda = LatentDirichletAllocation(
n_components=n_components,
learning_offset=5.0,
total_samples=20,
random_state=rng,
)
lda.fit(X)
# invalid samples
invalid_n_samples = rng.randint(4, size=(n_samples + 1, n_components))
with pytest.raises(ValueError, match=r"Number of samples"):
lda._perplexity_precomp_distr(X, invalid_n_samples)
# invalid topic number
invalid_n_components = rng.randint(4, size=(n_samples, n_components + 1))
with pytest.raises(ValueError, match=r"Number of topics"):
lda._perplexity_precomp_distr(X, invalid_n_components)
@pytest.mark.parametrize("method", ("online", "batch"))
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_perplexity(method, csr_container):
# Test LDA perplexity for batch training
# perplexity should be lower after each iteration
n_components, X = _build_sparse_array(csr_container)
lda_1 = LatentDirichletAllocation(
n_components=n_components,
max_iter=1,
learning_method=method,
total_samples=100,
random_state=0,
)
lda_2 = LatentDirichletAllocation(
n_components=n_components,
max_iter=10,
learning_method=method,
total_samples=100,
random_state=0,
)
lda_1.fit(X)
perp_1 = lda_1.perplexity(X, sub_sampling=False)
lda_2.fit(X)
perp_2 = lda_2.perplexity(X, sub_sampling=False)
assert perp_1 >= perp_2
perp_1_subsampling = lda_1.perplexity(X, sub_sampling=True)
perp_2_subsampling = lda_2.perplexity(X, sub_sampling=True)
assert perp_1_subsampling >= perp_2_subsampling
@pytest.mark.parametrize("method", ("online", "batch"))
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_score(method, csr_container):
# Test LDA score for batch training
# score should be higher after each iteration
n_components, X = _build_sparse_array(csr_container)
lda_1 = LatentDirichletAllocation(
n_components=n_components,
max_iter=1,
learning_method=method,
total_samples=100,
random_state=0,
)
lda_2 = LatentDirichletAllocation(
n_components=n_components,
max_iter=10,
learning_method=method,
total_samples=100,
random_state=0,
)
lda_1.fit_transform(X)
score_1 = lda_1.score(X)
lda_2.fit_transform(X)
score_2 = lda_2.score(X)
assert score_2 >= score_1
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_perplexity_input_format(csr_container):
# Test LDA perplexity for sparse and dense input
# score should be the same for both dense and sparse input
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
max_iter=1,
learning_method="batch",
total_samples=100,
random_state=0,
)
lda.fit(X)
perp_1 = lda.perplexity(X)
perp_2 = lda.perplexity(X.toarray())
assert_almost_equal(perp_1, perp_2)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_score_perplexity(csr_container):
# Test the relationship between LDA score and perplexity
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components, max_iter=10, random_state=0
)
lda.fit(X)
perplexity_1 = lda.perplexity(X, sub_sampling=False)
score = lda.score(X)
perplexity_2 = np.exp(-1.0 * (score / np.sum(X.data)))
assert_almost_equal(perplexity_1, perplexity_2)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_fit_perplexity(csr_container):
# Test that the perplexity computed during fit is consistent with what is
# returned by the perplexity method
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
max_iter=1,
learning_method="batch",
random_state=0,
evaluate_every=1,
)
lda.fit(X)
# Perplexity computed at end of fit method
perplexity1 = lda.bound_
# Result of perplexity method on the train set
perplexity2 = lda.perplexity(X)
assert_almost_equal(perplexity1, perplexity2)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_empty_docs(csr_container):
"""Test LDA on empty document (all-zero rows)."""
Z = np.zeros((5, 4))
for X in [Z, csr_container(Z)]:
lda = LatentDirichletAllocation(max_iter=750).fit(X)
assert_almost_equal(
lda.components_.sum(axis=0), np.ones(lda.components_.shape[1])
)
def test_dirichlet_expectation():
"""Test Cython version of Dirichlet expectation calculation."""
x = np.logspace(-100, 10, 10000)
expectation = np.empty_like(x)
_dirichlet_expectation_1d(x, 0, expectation)
assert_allclose(expectation, np.exp(psi(x) - psi(np.sum(x))), atol=1e-19)
x = x.reshape(100, 100)
assert_allclose(
_dirichlet_expectation_2d(x),
psi(x) - psi(np.sum(x, axis=1)[:, np.newaxis]),
rtol=1e-11,
atol=3e-9,
)
def check_verbosity(
verbose, evaluate_every, expected_lines, expected_perplexities, csr_container
):
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(
n_components=n_components,
max_iter=3,
learning_method="batch",
verbose=verbose,
evaluate_every=evaluate_every,
random_state=0,
)
out = StringIO()
old_out, sys.stdout = sys.stdout, out
try:
lda.fit(X)
finally:
sys.stdout = old_out
n_lines = out.getvalue().count("\n")
n_perplexity = out.getvalue().count("perplexity")
assert expected_lines == n_lines
assert expected_perplexities == n_perplexity
@pytest.mark.parametrize(
"verbose,evaluate_every,expected_lines,expected_perplexities",
[
(False, 1, 0, 0),
(False, 0, 0, 0),
(True, 0, 3, 0),
(True, 1, 3, 3),
(True, 2, 3, 1),
],
)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_verbosity(
verbose, evaluate_every, expected_lines, expected_perplexities, csr_container
):
check_verbosity(
verbose, evaluate_every, expected_lines, expected_perplexities, csr_container
)
@pytest.mark.parametrize("csr_container", CSR_CONTAINERS)
def test_lda_feature_names_out(csr_container):
"""Check feature names out for LatentDirichletAllocation."""
n_components, X = _build_sparse_array(csr_container)
lda = LatentDirichletAllocation(n_components=n_components).fit(X)
names = lda.get_feature_names_out()
assert_array_equal(
[f"latentdirichletallocation{i}" for i in range(n_components)], names
)
@pytest.mark.parametrize("learning_method", ("batch", "online"))
def test_lda_dtype_match(learning_method, global_dtype):
"""Check data type preservation of fitted attributes."""
rng = np.random.RandomState(0)
X = rng.uniform(size=(20, 10)).astype(global_dtype, copy=False)
lda = LatentDirichletAllocation(
n_components=5, random_state=0, learning_method=learning_method
)
lda.fit(X)
assert lda.components_.dtype == global_dtype
assert lda.exp_dirichlet_component_.dtype == global_dtype
@pytest.mark.parametrize("learning_method", ("batch", "online"))
def test_lda_numerical_consistency(learning_method, global_random_seed):
"""Check numerical consistency between np.float32 and np.float64."""
rng = np.random.RandomState(global_random_seed)
X64 = rng.uniform(size=(20, 10))
X32 = X64.astype(np.float32)
lda_64 = LatentDirichletAllocation(
n_components=5, random_state=global_random_seed, learning_method=learning_method
).fit(X64)
lda_32 = LatentDirichletAllocation(
n_components=5, random_state=global_random_seed, learning_method=learning_method
).fit(X32)
assert_allclose(lda_32.components_, lda_64.components_)
assert_allclose(lda_32.transform(X32), lda_64.transform(X64))
@@ -0,0 +1,371 @@
# Author: Vlad Niculae
# License: BSD 3 clause
import sys
import numpy as np
import pytest
from numpy.testing import assert_array_equal
from sklearn.decomposition import PCA, MiniBatchSparsePCA, SparsePCA
from sklearn.utils import check_random_state
from sklearn.utils._testing import (
assert_allclose,
assert_array_almost_equal,
if_safe_multiprocessing_with_blas,
)
from sklearn.utils.extmath import svd_flip
def generate_toy_data(n_components, n_samples, image_size, random_state=None):
n_features = image_size[0] * image_size[1]
rng = check_random_state(random_state)
U = rng.randn(n_samples, n_components)
V = rng.randn(n_components, n_features)
centers = [(3, 3), (6, 7), (8, 1)]
sz = [1, 2, 1]
for k in range(n_components):
img = np.zeros(image_size)
xmin, xmax = centers[k][0] - sz[k], centers[k][0] + sz[k]
ymin, ymax = centers[k][1] - sz[k], centers[k][1] + sz[k]
img[xmin:xmax][:, ymin:ymax] = 1.0
V[k, :] = img.ravel()
# Y is defined by : Y = UV + noise
Y = np.dot(U, V)
Y += 0.1 * rng.randn(Y.shape[0], Y.shape[1]) # Add noise
return Y, U, V
# SparsePCA can be a bit slow. To avoid having test times go up, we
# test different aspects of the code in the same test
def test_correct_shapes():
rng = np.random.RandomState(0)
X = rng.randn(12, 10)
spca = SparsePCA(n_components=8, random_state=rng)
U = spca.fit_transform(X)
assert spca.components_.shape == (8, 10)
assert U.shape == (12, 8)
# test overcomplete decomposition
spca = SparsePCA(n_components=13, random_state=rng)
U = spca.fit_transform(X)
assert spca.components_.shape == (13, 10)
assert U.shape == (12, 13)
def test_fit_transform():
alpha = 1
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng) # wide array
spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=0)
spca_lars.fit(Y)
# Test that CD gives similar results
spca_lasso = SparsePCA(n_components=3, method="cd", random_state=0, alpha=alpha)
spca_lasso.fit(Y)
assert_array_almost_equal(spca_lasso.components_, spca_lars.components_)
@if_safe_multiprocessing_with_blas
def test_fit_transform_parallel():
alpha = 1
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng) # wide array
spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=0)
spca_lars.fit(Y)
U1 = spca_lars.transform(Y)
# Test multiple CPUs
spca = SparsePCA(
n_components=3, n_jobs=2, method="lars", alpha=alpha, random_state=0
).fit(Y)
U2 = spca.transform(Y)
assert not np.all(spca_lars.components_ == 0)
assert_array_almost_equal(U1, U2)
def test_transform_nan():
# Test that SparsePCA won't return NaN when there is 0 feature in all
# samples.
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng) # wide array
Y[:, 0] = 0
estimator = SparsePCA(n_components=8)
assert not np.any(np.isnan(estimator.fit_transform(Y)))
def test_fit_transform_tall():
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 65, (8, 8), random_state=rng) # tall array
spca_lars = SparsePCA(n_components=3, method="lars", random_state=rng)
U1 = spca_lars.fit_transform(Y)
spca_lasso = SparsePCA(n_components=3, method="cd", random_state=rng)
U2 = spca_lasso.fit(Y).transform(Y)
assert_array_almost_equal(U1, U2)
def test_initialization():
rng = np.random.RandomState(0)
U_init = rng.randn(5, 3)
V_init = rng.randn(3, 4)
model = SparsePCA(
n_components=3, U_init=U_init, V_init=V_init, max_iter=0, random_state=rng
)
model.fit(rng.randn(5, 4))
expected_components = V_init / np.linalg.norm(V_init, axis=1, keepdims=True)
expected_components = svd_flip(u=expected_components.T, v=None)[0].T
assert_allclose(model.components_, expected_components)
def test_mini_batch_correct_shapes():
rng = np.random.RandomState(0)
X = rng.randn(12, 10)
pca = MiniBatchSparsePCA(n_components=8, max_iter=1, random_state=rng)
U = pca.fit_transform(X)
assert pca.components_.shape == (8, 10)
assert U.shape == (12, 8)
# test overcomplete decomposition
pca = MiniBatchSparsePCA(n_components=13, max_iter=1, random_state=rng)
U = pca.fit_transform(X)
assert pca.components_.shape == (13, 10)
assert U.shape == (12, 13)
# XXX: test always skipped
@pytest.mark.skipif(True, reason="skipping mini_batch_fit_transform.")
def test_mini_batch_fit_transform():
alpha = 1
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng) # wide array
spca_lars = MiniBatchSparsePCA(n_components=3, random_state=0, alpha=alpha).fit(Y)
U1 = spca_lars.transform(Y)
# Test multiple CPUs
if sys.platform == "win32": # fake parallelism for win32
import joblib
_mp = joblib.parallel.multiprocessing
joblib.parallel.multiprocessing = None
try:
spca = MiniBatchSparsePCA(
n_components=3, n_jobs=2, alpha=alpha, random_state=0
)
U2 = spca.fit(Y).transform(Y)
finally:
joblib.parallel.multiprocessing = _mp
else: # we can efficiently use parallelism
spca = MiniBatchSparsePCA(n_components=3, n_jobs=2, alpha=alpha, random_state=0)
U2 = spca.fit(Y).transform(Y)
assert not np.all(spca_lars.components_ == 0)
assert_array_almost_equal(U1, U2)
# Test that CD gives similar results
spca_lasso = MiniBatchSparsePCA(
n_components=3, method="cd", alpha=alpha, random_state=0
).fit(Y)
assert_array_almost_equal(spca_lasso.components_, spca_lars.components_)
def test_scaling_fit_transform():
alpha = 1
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 1000, (8, 8), random_state=rng)
spca_lars = SparsePCA(n_components=3, method="lars", alpha=alpha, random_state=rng)
results_train = spca_lars.fit_transform(Y)
results_test = spca_lars.transform(Y[:10])
assert_allclose(results_train[0], results_test[0])
def test_pca_vs_spca():
rng = np.random.RandomState(0)
Y, _, _ = generate_toy_data(3, 1000, (8, 8), random_state=rng)
Z, _, _ = generate_toy_data(3, 10, (8, 8), random_state=rng)
spca = SparsePCA(alpha=0, ridge_alpha=0, n_components=2)
pca = PCA(n_components=2)
pca.fit(Y)
spca.fit(Y)
results_test_pca = pca.transform(Z)
results_test_spca = spca.transform(Z)
assert_allclose(
np.abs(spca.components_.dot(pca.components_.T)), np.eye(2), atol=1e-5
)
results_test_pca *= np.sign(results_test_pca[0, :])
results_test_spca *= np.sign(results_test_spca[0, :])
assert_allclose(results_test_pca, results_test_spca)
@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
@pytest.mark.parametrize("n_components", [None, 3])
def test_spca_n_components_(SPCA, n_components):
rng = np.random.RandomState(0)
n_samples, n_features = 12, 10
X = rng.randn(n_samples, n_features)
model = SPCA(n_components=n_components).fit(X)
if n_components is not None:
assert model.n_components_ == n_components
else:
assert model.n_components_ == n_features
@pytest.mark.parametrize("SPCA", (SparsePCA, MiniBatchSparsePCA))
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_sparse_pca_dtype_match(SPCA, method, data_type, expected_type):
# Verify output matrix dtype
n_samples, n_features, n_components = 12, 10, 3
rng = np.random.RandomState(0)
input_array = rng.randn(n_samples, n_features).astype(data_type)
model = SPCA(n_components=n_components, method=method)
transformed = model.fit_transform(input_array)
assert transformed.dtype == expected_type
assert model.components_.dtype == expected_type
@pytest.mark.parametrize("SPCA", (SparsePCA, MiniBatchSparsePCA))
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_sparse_pca_numerical_consistency(SPCA, method):
# Verify numericall consistentency among np.float32 and np.float64
rtol = 1e-3
alpha = 2
n_samples, n_features, n_components = 12, 10, 3
rng = np.random.RandomState(0)
input_array = rng.randn(n_samples, n_features)
model_32 = SPCA(
n_components=n_components, alpha=alpha, method=method, random_state=0
)
transformed_32 = model_32.fit_transform(input_array.astype(np.float32))
model_64 = SPCA(
n_components=n_components, alpha=alpha, method=method, random_state=0
)
transformed_64 = model_64.fit_transform(input_array.astype(np.float64))
assert_allclose(transformed_64, transformed_32, rtol=rtol)
assert_allclose(model_64.components_, model_32.components_, rtol=rtol)
@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
def test_spca_feature_names_out(SPCA):
"""Check feature names out for *SparsePCA."""
rng = np.random.RandomState(0)
n_samples, n_features = 12, 10
X = rng.randn(n_samples, n_features)
model = SPCA(n_components=4).fit(X)
names = model.get_feature_names_out()
estimator_name = SPCA.__name__.lower()
assert_array_equal([f"{estimator_name}{i}" for i in range(4)], names)
# TODO(1.6): remove in 1.6
def test_spca_max_iter_None_deprecation():
"""Check that we raise a warning for the deprecation of `max_iter=None`."""
rng = np.random.RandomState(0)
n_samples, n_features = 12, 10
X = rng.randn(n_samples, n_features)
warn_msg = "`max_iter=None` is deprecated in version 1.4 and will be removed"
with pytest.warns(FutureWarning, match=warn_msg):
MiniBatchSparsePCA(max_iter=None).fit(X)
def test_spca_early_stopping(global_random_seed):
"""Check that `tol` and `max_no_improvement` act as early stopping."""
rng = np.random.RandomState(global_random_seed)
n_samples, n_features = 50, 10
X = rng.randn(n_samples, n_features)
# vary the tolerance to force the early stopping of one of the model
model_early_stopped = MiniBatchSparsePCA(
max_iter=100, tol=0.5, random_state=global_random_seed
).fit(X)
model_not_early_stopped = MiniBatchSparsePCA(
max_iter=100, tol=1e-3, random_state=global_random_seed
).fit(X)
assert model_early_stopped.n_iter_ < model_not_early_stopped.n_iter_
# force the max number of no improvement to a large value to check that
# it does help to early stop
model_early_stopped = MiniBatchSparsePCA(
max_iter=100, tol=1e-6, max_no_improvement=2, random_state=global_random_seed
).fit(X)
model_not_early_stopped = MiniBatchSparsePCA(
max_iter=100, tol=1e-6, max_no_improvement=100, random_state=global_random_seed
).fit(X)
assert model_early_stopped.n_iter_ < model_not_early_stopped.n_iter_
def test_equivalence_components_pca_spca(global_random_seed):
"""Check the equivalence of the components found by PCA and SparsePCA.
Non-regression test for:
https://github.com/scikit-learn/scikit-learn/issues/23932
"""
rng = np.random.RandomState(global_random_seed)
X = rng.randn(50, 4)
n_components = 2
pca = PCA(
n_components=n_components,
svd_solver="randomized",
random_state=0,
).fit(X)
spca = SparsePCA(
n_components=n_components,
method="lars",
ridge_alpha=0,
alpha=0,
random_state=0,
).fit(X)
assert_allclose(pca.components_, spca.components_)
def test_sparse_pca_inverse_transform():
"""Check that `inverse_transform` in `SparsePCA` and `PCA` are similar."""
rng = np.random.RandomState(0)
n_samples, n_features = 10, 5
X = rng.randn(n_samples, n_features)
n_components = 2
spca = SparsePCA(
n_components=n_components, alpha=1e-12, ridge_alpha=1e-12, random_state=0
)
pca = PCA(n_components=n_components, random_state=0)
X_trans_spca = spca.fit_transform(X)
X_trans_pca = pca.fit_transform(X)
assert_allclose(
spca.inverse_transform(X_trans_spca), pca.inverse_transform(X_trans_pca)
)
@pytest.mark.parametrize("SPCA", [SparsePCA, MiniBatchSparsePCA])
def test_transform_inverse_transform_round_trip(SPCA):
"""Check the `transform` and `inverse_transform` round trip with no loss of
information.
"""
rng = np.random.RandomState(0)
n_samples, n_features = 10, 5
X = rng.randn(n_samples, n_features)
n_components = n_features
spca = SPCA(
n_components=n_components, alpha=1e-12, ridge_alpha=1e-12, random_state=0
)
X_trans_spca = spca.fit_transform(X)
assert_allclose(spca.inverse_transform(X_trans_spca), X)
@@ -0,0 +1,212 @@
"""Test truncated SVD transformer."""
import numpy as np
import pytest
import scipy.sparse as sp
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.utils import check_random_state
from sklearn.utils._testing import assert_allclose, assert_array_less
SVD_SOLVERS = ["arpack", "randomized"]
@pytest.fixture(scope="module")
def X_sparse():
# Make an X that looks somewhat like a small tf-idf matrix.
rng = check_random_state(42)
X = sp.random(60, 55, density=0.2, format="csr", random_state=rng)
X.data[:] = 1 + np.log(X.data)
return X
@pytest.mark.parametrize("solver", ["randomized"])
@pytest.mark.parametrize("kind", ("dense", "sparse"))
def test_solvers(X_sparse, solver, kind):
X = X_sparse if kind == "sparse" else X_sparse.toarray()
svd_a = TruncatedSVD(30, algorithm="arpack")
svd = TruncatedSVD(30, algorithm=solver, random_state=42, n_oversamples=100)
Xa = svd_a.fit_transform(X)[:, :6]
Xr = svd.fit_transform(X)[:, :6]
assert_allclose(Xa, Xr, rtol=2e-3)
comp_a = np.abs(svd_a.components_)
comp = np.abs(svd.components_)
# All elements are equal, but some elements are more equal than others.
assert_allclose(comp_a[:9], comp[:9], rtol=1e-3)
assert_allclose(comp_a[9:], comp[9:], atol=1e-2)
@pytest.mark.parametrize("n_components", (10, 25, 41, 55))
def test_attributes(n_components, X_sparse):
n_features = X_sparse.shape[1]
tsvd = TruncatedSVD(n_components).fit(X_sparse)
assert tsvd.n_components == n_components
assert tsvd.components_.shape == (n_components, n_features)
@pytest.mark.parametrize(
"algorithm, n_components",
[
("arpack", 55),
("arpack", 56),
("randomized", 56),
],
)
def test_too_many_components(X_sparse, algorithm, n_components):
tsvd = TruncatedSVD(n_components=n_components, algorithm=algorithm)
with pytest.raises(ValueError):
tsvd.fit(X_sparse)
@pytest.mark.parametrize("fmt", ("array", "csr", "csc", "coo", "lil"))
def test_sparse_formats(fmt, X_sparse):
n_samples = X_sparse.shape[0]
Xfmt = X_sparse.toarray() if fmt == "dense" else getattr(X_sparse, "to" + fmt)()
tsvd = TruncatedSVD(n_components=11)
Xtrans = tsvd.fit_transform(Xfmt)
assert Xtrans.shape == (n_samples, 11)
Xtrans = tsvd.transform(Xfmt)
assert Xtrans.shape == (n_samples, 11)
@pytest.mark.parametrize("algo", SVD_SOLVERS)
def test_inverse_transform(algo, X_sparse):
# We need a lot of components for the reconstruction to be "almost
# equal" in all positions. XXX Test means or sums instead?
tsvd = TruncatedSVD(n_components=52, random_state=42, algorithm=algo)
Xt = tsvd.fit_transform(X_sparse)
Xinv = tsvd.inverse_transform(Xt)
assert_allclose(Xinv, X_sparse.toarray(), rtol=1e-1, atol=2e-1)
def test_integers(X_sparse):
n_samples = X_sparse.shape[0]
Xint = X_sparse.astype(np.int64)
tsvd = TruncatedSVD(n_components=6)
Xtrans = tsvd.fit_transform(Xint)
assert Xtrans.shape == (n_samples, tsvd.n_components)
@pytest.mark.parametrize("kind", ("dense", "sparse"))
@pytest.mark.parametrize("n_components", [10, 20])
@pytest.mark.parametrize("solver", SVD_SOLVERS)
def test_explained_variance(X_sparse, kind, n_components, solver):
X = X_sparse if kind == "sparse" else X_sparse.toarray()
svd = TruncatedSVD(n_components, algorithm=solver)
X_tr = svd.fit_transform(X)
# Assert that all the values are greater than 0
assert_array_less(0.0, svd.explained_variance_ratio_)
# Assert that total explained variance is less than 1
assert_array_less(svd.explained_variance_ratio_.sum(), 1.0)
# Test that explained_variance is correct
total_variance = np.var(X_sparse.toarray(), axis=0).sum()
variances = np.var(X_tr, axis=0)
true_explained_variance_ratio = variances / total_variance
assert_allclose(
svd.explained_variance_ratio_,
true_explained_variance_ratio,
)
@pytest.mark.parametrize("kind", ("dense", "sparse"))
@pytest.mark.parametrize("solver", SVD_SOLVERS)
def test_explained_variance_components_10_20(X_sparse, kind, solver):
X = X_sparse if kind == "sparse" else X_sparse.toarray()
svd_10 = TruncatedSVD(10, algorithm=solver, n_iter=10).fit(X)
svd_20 = TruncatedSVD(20, algorithm=solver, n_iter=10).fit(X)
# Assert the 1st component is equal
assert_allclose(
svd_10.explained_variance_ratio_,
svd_20.explained_variance_ratio_[:10],
rtol=5e-3,
)
# Assert that 20 components has higher explained variance than 10
assert (
svd_20.explained_variance_ratio_.sum() > svd_10.explained_variance_ratio_.sum()
)
@pytest.mark.parametrize("solver", SVD_SOLVERS)
def test_singular_values_consistency(solver):
# Check that the TruncatedSVD output has the correct singular values
rng = np.random.RandomState(0)
n_samples, n_features = 100, 80
X = rng.randn(n_samples, n_features)
pca = TruncatedSVD(n_components=2, algorithm=solver, random_state=rng).fit(X)
# Compare to the Frobenius norm
X_pca = pca.transform(X)
assert_allclose(
np.sum(pca.singular_values_**2.0),
np.linalg.norm(X_pca, "fro") ** 2.0,
rtol=1e-2,
)
# Compare to the 2-norms of the score vectors
assert_allclose(
pca.singular_values_, np.sqrt(np.sum(X_pca**2.0, axis=0)), rtol=1e-2
)
@pytest.mark.parametrize("solver", SVD_SOLVERS)
def test_singular_values_expected(solver):
# Set the singular values and see what we get back
rng = np.random.RandomState(0)
n_samples = 100
n_features = 110
X = rng.randn(n_samples, n_features)
pca = TruncatedSVD(n_components=3, algorithm=solver, random_state=rng)
X_pca = pca.fit_transform(X)
X_pca /= np.sqrt(np.sum(X_pca**2.0, axis=0))
X_pca[:, 0] *= 3.142
X_pca[:, 1] *= 2.718
X_hat_pca = np.dot(X_pca, pca.components_)
pca.fit(X_hat_pca)
assert_allclose(pca.singular_values_, [3.142, 2.718, 1.0], rtol=1e-14)
def test_truncated_svd_eq_pca(X_sparse):
# TruncatedSVD should be equal to PCA on centered data
X_dense = X_sparse.toarray()
X_c = X_dense - X_dense.mean(axis=0)
params = dict(n_components=10, random_state=42)
svd = TruncatedSVD(algorithm="arpack", **params)
pca = PCA(svd_solver="arpack", **params)
Xt_svd = svd.fit_transform(X_c)
Xt_pca = pca.fit_transform(X_c)
assert_allclose(Xt_svd, Xt_pca, rtol=1e-9)
assert_allclose(pca.mean_, 0, atol=1e-9)
assert_allclose(svd.components_, pca.components_)
@pytest.mark.parametrize(
"algorithm, tol", [("randomized", 0.0), ("arpack", 1e-6), ("arpack", 0.0)]
)
@pytest.mark.parametrize("kind", ("dense", "sparse"))
def test_fit_transform(X_sparse, algorithm, tol, kind):
# fit_transform(X) should equal fit(X).transform(X)
X = X_sparse if kind == "sparse" else X_sparse.toarray()
svd = TruncatedSVD(
n_components=5, n_iter=7, random_state=42, algorithm=algorithm, tol=tol
)
X_transformed_1 = svd.fit_transform(X)
X_transformed_2 = svd.fit(X).transform(X)
assert_allclose(X_transformed_1, X_transformed_2)