Dacă ați lucrat vreodată pe un proiect de machine learning ceva mai serios, probabil știți cum e: începi cu entuzism, faci preprocesarea datelor, antrenezi un model, obții niște rezultate... și apoi realizezi că ai un cod spaghetti greu de reprodus și plin de variabile intermediare. Am trecut și eu prin asta. Din fericire, pipeline-urile din Scikit-Learn rezolvă elegant toată această harababură — și sincer, odată ce le descoperi, nu te mai întorci.
În acest ghid, vom parcurge tot ce trebuie să știți despre pipeline-uri în Python: de la bazele clasei Pipeline, la ColumnTransformer, transformatoare personalizate, și până la optimizarea hiperparametrilor cu GridSearchCV. Hai să începem.
Ce sunt Pipeline-urile de Machine Learning și de ce contează
Un pipeline de machine learning e, în esență, o secvență ordonată de pași de procesare, în care ieșirea unui pas devine intrarea următorului. Gândiți-vă la el ca la o linie de asamblare: datele brute intră pe o parte, trec prin diverse transformări (curățare, normalizare, codificare, extragere de caracteristici) și ies pe cealaltă parte ca predicții sau date pregătite pentru antrenament.
Fără pipeline-uri, codul tipic de ML arată cam așa: apeluri separate pentru fiecare transformator, variabile intermediare peste tot, și — poate cea mai insidioasă problemă — riscul de data leakage. Adică aplici transformări pe setul de test folosind statistici calculate pe întregul set de date, fără să-ți dai seama.
Pipeline-urile rezolvă toate aceste probleme.
Avantajele principale ale utilizării pipeline-urilor
- Reproductibilitate — Întregul flux de lucru e encapsulat într-un singur obiect, ușor de salvat, încărcat și reutilizat.
- Prevenirea scurgerii de date — Transformările sunt aplicate corect:
fit_transform()pe antrenament și doartransform()pe test. - Cod mai curat — Dispare nevoia de variabile intermediare și cod duplicat.
- Integrare cu validarea încrucișată — Pipeline-ul funcționează ca un singur estimator, deci merge direct cu
GridSearchCVsaucross_val_score. - Colaborare facilitată — Colegii pot înțelege și reproduce exact fluxul de lucru, fără ghicitori.
Bazele sklearn.pipeline.Pipeline — Crearea Pipeline-urilor Simple
Clasa Pipeline din sklearn.pipeline e piesa centrală. Primește o listă de tupluri — fiecare cu un nume (string) și un obiect transformator sau estimator. Toți pașii, cu excepția ultimului, trebuie să fie transformatoare (adică să implementeze transform()), iar ultimul pas poate fi orice estimator.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
# Crearea unui pipeline simplu cu trei pași
pipeline = Pipeline([
('scalare', StandardScaler()), # Pasul 1: standardizarea datelor
('reducere_dim', PCA(n_components=5)), # Pasul 2: reducerea dimensionalității
('clasificator', LogisticRegression()) # Pasul 3: modelul de clasificare
])
# Antrenarea întregului pipeline
pipeline.fit(X_train, y_train)
# Predicția pe date noi — toate transformările sunt aplicate automat
predicții = pipeline.predict(X_test)
# Evaluarea performanței
scor = pipeline.score(X_test, y_test)
print(f"Acuratețea modelului: {scor:.4f}")
Când apelați pipeline.fit(X_train, y_train), se întâmplă următoarele: pentru fiecare pas intermediar se apelează fit_transform(), iar pentru ultimul pas se apelează fit(). Simplu și eficient — fiecare transformare e învățată exclusiv din datele de antrenament.
Scurtătura make_pipeline()
Dacă nu aveți nevoie de nume personalizate pentru pași, make_pipeline() vă scutește de bătaia de cap. Generează automat numele din clasele folosite.
from sklearn.pipeline import make_pipeline
# Echivalent cu Pipeline-ul de mai sus, dar cu nume generate automat
pipeline_simplu = make_pipeline(
StandardScaler(), # Numele generat: 'standardscaler'
PCA(n_components=5), # Numele generat: 'pca'
LogisticRegression() # Numele generat: 'logisticregression'
)
# Verificarea numelor generate
print(pipeline_simplu.named_steps)
Accesarea pașilor individuali
Puteți accesa orice pas prin named_steps sau prin indexare directă — destul de intuitiv:
# Accesare prin nume
scaler = pipeline.named_steps['scalare']
# Accesare prin index
primul_pas = pipeline[0]
ultimul_pas = pipeline[-1]
# Crearea unui sub-pipeline (slicing)
sub_pipeline = pipeline[:2] # Doar primii doi pași
ColumnTransformer — Gestionarea Datelor Eterogene
Acum intrăm în zona unde lucrurile devin cu adevărat interesante. În practică, seturile de date conțin rareori un singur tip de caracteristici. Aveți coloane numerice care necesită scalare, coloane categorice care necesită codificare, poate și coloane de text. ColumnTransformer vă permite să aplicați transformări diferite pe subseturi diferite de coloane, în paralel.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# Definirea coloanelor după tip
coloane_numerice = ['vârstă', 'venit', 'experiență_ani']
coloane_categorice = ['departament', 'nivel_educație', 'oraș']
# Pipeline pentru coloane numerice
pipeline_numeric = Pipeline([
('imputare', SimpleImputer(strategy='median')),
('scalare', StandardScaler())
])
# Pipeline pentru coloane categorice
pipeline_categoric = Pipeline([
('imputare', SimpleImputer(strategy='most_frequent')),
('codificare', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combinarea cu ColumnTransformer
preprocesor = ColumnTransformer([
('numeric', pipeline_numeric, coloane_numerice),
('categoric', pipeline_categoric, coloane_categorice)
])
# Integrarea într-un pipeline complet
pipeline_complet = Pipeline([
('preprocesare', preprocesor),
('clasificator', LogisticRegression(max_iter=1000))
])
pipeline_complet.fit(X_train, y_train)
Parametrul remainder
Un detaliu important: implicit, coloanele care nu sunt specificate în niciun transformator sunt pur și simplu eliminate. Puteți controla asta cu parametrul remainder:
# Păstrarea coloanelor netransformate
preprocesor = ColumnTransformer(
transformers=[
('numeric', pipeline_numeric, coloane_numerice),
('categoric', pipeline_categoric, coloane_categorice)
],
remainder='passthrough' # Coloanele rămase sunt trecute neschimbate
# remainder='drop' # Implicit: coloanele rămase sunt eliminate
)
Selectarea coloanelor cu make_column_selector
În loc să scrieți manual fiecare nume de coloană (ceea ce devine obositor la seturi mari de date), puteți folosi make_column_selector() pentru selecție automată:
from sklearn.compose import make_column_selector
preprocesor_automat = ColumnTransformer([
('numeric', pipeline_numeric, make_column_selector(dtype_include='number')),
('categoric', pipeline_categoric, make_column_selector(dtype_include='object'))
])
FeatureUnion — Combinarea Metodelor de Extragere a Caracteristicilor
Dacă ColumnTransformer aplică transformări diferite pe coloane diferite, FeatureUnion face ceva complementar: aplică mai multe transformări pe aceleași date și concatenează rezultatele orizontal. E extrem de util când vreți să combinați mai multe strategii de extragere a caracteristicilor.
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.feature_selection import SelectKBest, f_classif
# Combinarea mai multor metode de extragere a caracteristicilor
extractor_caracteristici = FeatureUnion([
('pca', PCA(n_components=10)), # Reducere dimensională cu PCA
('svd', TruncatedSVD(n_components=5)), # Reducere cu SVD
('selectie', SelectKBest(f_classif, k=15)) # Selecția celor mai bune k caracteristici
])
# Rezultatul va avea 10 + 5 + 15 = 30 de coloane
pipeline_cu_union = Pipeline([
('scalare', StandardScaler()),
('caracteristici', extractor_caracteristici),
('clasificator', LogisticRegression())
])
FeatureUnion e deosebit de util în procesarea textului. De exemplu, puteți combina un vectorizator TF-IDF cu un extractor de caracteristici personalizat care calculează lungimea textului, numărul de cuvinte și alte metrici simple:
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
# Combinarea mai multor reprezentări ale textului
extractor_text = FeatureUnion([
('tfidf', TfidfVectorizer(max_features=5000)),
('contor', CountVectorizer(max_features=3000, ngram_range=(2, 3)))
])
Transformatoare Personalizate — Flexibilitate Maximă
Scikit-Learn vine cu o gamă largă de transformatoare predefinite, dar în proiecte reale veți avea nevoie (aproape garantat) de transformări personalizate. Există două abordări: crearea unei clase care moștenește BaseEstimator și TransformerMixin, sau folosirea FunctionTransformer pentru cazuri mai simple.
Clasa de transformator personalizat
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class EliminatorValoriAberante(BaseEstimator, TransformerMixin):
"""Transformator personalizat care elimină valorile aberante folosind metoda IQR."""
def __init__(self, factor=1.5):
# Parametrii trebuie salvați exact cu numele din semnătura __init__
self.factor = factor
def fit(self, X, y=None):
# Calculăm cuartilele pe datele de antrenament
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
self.IQR_ = Q3 - Q1
self.limita_inf_ = Q1 - self.factor * self.IQR_
self.limita_sup_ = Q3 + self.factor * self.IQR_
return self # Întotdeauna returnați self din fit()
def transform(self, X):
# Trunchierea valorilor la limitele calculate
X_trunchiat = np.clip(X, self.limita_inf_, self.limita_sup_)
return X_trunchiat
class ExtractorCaracteristiciTemporale(BaseEstimator, TransformerMixin):
"""Extrage caracteristici din coloane de tip datetime."""
def __init__(self, coloana_data='data'):
self.coloana_data = coloana_data
def fit(self, X, y=None):
return self
def transform(self, X):
X_copie = X.copy()
X_copie['an'] = X_copie[self.coloana_data].dt.year
X_copie['luna'] = X_copie[self.coloana_data].dt.month
X_copie['zi_saptamana'] = X_copie[self.coloana_data].dt.dayofweek
X_copie['este_weekend'] = (X_copie['zi_saptamana'] >= 5).astype(int)
X_copie = X_copie.drop(columns=[self.coloana_data])
return X_copie
Câteva reguli de aur pe care le-am învățat pe propria piele:
- Metoda
fit()trebuie să returneze întotdeaunaself. Uitați asta o dată și veți pierde o oră debuggând. - Parametrii constructorului trebuie salvați ca atribute cu exact același nume — altfel
get_params()șiset_params()nu vor funcționa, iar GridSearchCV va da erori criptice. - Atributele învățate (cele calculate în
fit()) ar trebui să aibă numele terminat cu underscore (ex:limita_inf_), conform convenției Scikit-Learn. - TransformerMixin oferă automat
fit_transform(), deci nu trebuie să o implementați voi.
FunctionTransformer pentru transformări simple
Când transformarea e o funcție simplă, fără stare, FunctionTransformer e de ajuns — și mult mai concis:
from sklearn.preprocessing import FunctionTransformer
import numpy as np
# Transformare logaritmică
transformare_log = FunctionTransformer(
func=np.log1p, # Funcția de transformare directă
inverse_func=np.expm1, # Funcția inversă (opțional)
validate=True # Validarea datelor de intrare
)
# Transformare personalizată cu funcție lambda
adaugare_interactiuni = FunctionTransformer(
func=lambda X: np.column_stack([
X,
X[:, 0] * X[:, 1], # Interacțiune între prima și a doua coloană
X[:, 0] ** 2 # Termenul pătratic al primei coloane
])
)
# Utilizare cu pandas DataFrame
def extrage_lungime_text(df):
"""Extrage lungimea textului din coloana specificată."""
rezultat = df.copy()
rezultat['lungime_text'] = df['descriere'].str.len()
rezultat['numar_cuvinte'] = df['descriere'].str.split().str.len()
return rezultat
transformator_text = FunctionTransformer(func=extrage_lungime_text)
Integrarea cu GridSearchCV pentru Optimizarea Hiperparametrilor
Aici e, probabil, cel mai mare avantaj al pipeline-urilor. Datorită faptului că pipeline-ul se comportă ca un singur estimator, puteți optimiza simultan hiperparametrii oricărui pas — de la preprocesare până la modelul final.
Convenția de denumire a parametrilor
Cheia integrării e convenția cu dublu underscore: nume_pas__nume_parametru. Prin ea puteți accesa orice parametru din orice nivel de adâncime al pipeline-ului.
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
# Pipeline cu pași denumiți
pipeline = Pipeline([
('preprocesare', preprocesor),
('clasificator', RandomForestClassifier())
])
# Grila de hiperparametri — observați convenția dublu underscore
grila_parametri = {
# Parametri pentru preprocesare → sub-pipeline numeric → scalare
'preprocesare__numeric__scalare__with_mean': [True, False],
# Parametri pentru clasificator
'clasificator__n_estimators': [100, 200, 500],
'clasificator__max_depth': [5, 10, 20, None],
'clasificator__min_samples_split': [2, 5, 10]
}
# Căutarea pe grilă
cautare = GridSearchCV(
pipeline,
grila_parametri,
cv=5, # Validare încrucișată cu 5 fold-uri
scoring='f1_weighted', # Metrica de evaluare
n_jobs=-1, # Utilizarea tuturor nucleelor procesorului
verbose=1
)
cautare.fit(X_train, y_train)
# Rezultatele optime
print(f"Cei mai buni parametri: {cautare.best_params_}")
print(f"Cel mai bun scor: {cautare.best_score_:.4f}")
# Cel mai bun pipeline este deja antrenat
cel_mai_bun_pipeline = cautare.best_estimator_
Căutarea pe mai mulți algoritmi simultan
O tehnică pe care o folosesc des (și care economisește mult timp): puteți folosi mai multe grile de parametri pentru a compara algoritmi diferiți în aceeași căutare.
from sklearn.ensemble import GradientBoostingClassifier
# Mai multe grile — fiecare pentru un algoritm diferit
grile_multiple = [
{
'clasificator': [RandomForestClassifier()],
'clasificator__n_estimators': [100, 300],
'clasificator__max_depth': [10, 20]
},
{
'clasificator': [SVC()],
'clasificator__C': [0.1, 1, 10],
'clasificator__kernel': ['rbf', 'linear']
},
{
'clasificator': [GradientBoostingClassifier()],
'clasificator__n_estimators': [100, 200],
'clasificator__learning_rate': [0.01, 0.1, 0.2]
}
]
cautare_multipla = GridSearchCV(
pipeline,
grile_multiple,
cv=5,
scoring='accuracy',
n_jobs=-1
)
cautare_multipla.fit(X_train, y_train)
print(f"Cel mai bun model: {cautare_multipla.best_estimator_.named_steps['clasificator']}")
RandomizedSearchCV pentru spații mari de căutare
Când spațiul hiperparametrilor devine prea mare (și credeți-mă, se întâmplă mai des decât v-ați aștepta), RandomizedSearchCV e alternativa inteligentă:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
# Distribuții continue și discrete pentru eșantionare
distributii_parametri = {
'clasificator__n_estimators': randint(50, 500),
'clasificator__max_depth': randint(3, 30),
'clasificator__min_samples_split': randint(2, 20),
'clasificator__min_samples_leaf': randint(1, 10),
'clasificator__max_features': uniform(0.1, 0.9)
}
cautare_aleatorie = RandomizedSearchCV(
pipeline,
distributii_parametri,
n_iter=100, # Numărul de combinații evaluate
cv=5,
scoring='roc_auc',
n_jobs=-1,
random_state=42
)
cautare_aleatorie.fit(X_train, y_train)
Tehnici Avansate: Cache, Memoria Pipeline-ului și Vizualizare
Parametrul memory pentru cache-ul transformărilor
Când aveți pași de transformare care durează mult (iar în proiecte reale, chiar durează), puteți activa cache-ul cu parametrul memory. Rezultatele se salvează pe disc, evitând recalcularea la antrenări repetate — extrem de util în timpul căutării pe grilă.
from sklearn.pipeline import Pipeline
from tempfile import mkdtemp
import joblib
# Crearea unui director temporar pentru cache
director_cache = mkdtemp()
# Pipeline cu cache activat
pipeline_cu_cache = Pipeline([
('scalare', StandardScaler()),
('pca', PCA(n_components=50)), # Acest pas va fi cached
('clasificator', LogisticRegression())
], memory=director_cache) # Calea către directorul de cache
pipeline_cu_cache.fit(X_train, y_train)
# Alternativ, folosiți un obiect Memory din joblib
from joblib import Memory
memorie = Memory(location='./cache_pipeline', verbose=0)
pipeline_cu_memorie = Pipeline([
('scalare', StandardScaler()),
('pca', PCA(n_components=50)),
('clasificator', LogisticRegression())
], memory=memorie)
# Curățarea cache-ului când nu mai este necesar
memorie.clear(warn=False)
Practic, dacă modificați doar hiperparametrii clasificatorului, pașii de preprocesare nu vor fi recalculați. La proiecte cu seturi mari de date, diferența e enormă.
Vizualizarea pipeline-urilor
Scikit-Learn oferă o reprezentare vizuală interactivă a pipeline-urilor direct în Jupyter Notebook, ceea ce ajută enorm la înțelegerea structurii:
from sklearn import set_config
# Activarea reprezentării HTML
set_config(display='diagram')
# Simpla afișare a pipeline-ului va genera o diagramă interactivă
pipeline_complet # Afișarea în Jupyter Notebook
# Alternativ, puteți exporta reprezentarea HTML
from sklearn.utils import estimator_html_repr
html_repr = estimator_html_repr(pipeline_complet)
with open('vizualizare_pipeline.html', 'w') as f:
f.write(html_repr)
Ieșire în format pandas cu set_output
O funcționalitate pe care am ajuns s-o apreciez enorm: puteți configura transformatoarele să returneze DataFrames pandas în loc de array-uri NumPy. Asta face depanarea mult mai ușoară, pentru că vedeți efectiv ce se întâmplă cu datele la fiecare pas.
# Configurarea globală pentru ieșire pandas
from sklearn import set_config
set_config(transform_output='pandas')
# Sau, la nivel de pipeline individual
pipeline_complet.set_output(transform='pandas')
# Acum transformările păstrează numele coloanelor
date_transformate = pipeline_complet[:-1].transform(X_test)
print(date_transformate.head()) # DataFrame cu nume de coloane descriptive
print(date_transformate.columns.tolist())
Accesarea rezultatelor intermediare
Pentru depanare, puteți rula doar primii N pași, fără să treceți prin tot pipeline-ul:
# Rularea doar a primilor N pași
date_dupa_preprocesare = pipeline_complet[:1].transform(X_test)
# Accesarea unui transformator specific pentru inspecție
scaler_antrenat = pipeline_complet.named_steps['preprocesare']
print(f"Media calculată: {scaler_antrenat.named_transformers_['numeric']['scalare'].mean_}")
Exemplu Complet End-to-End cu Setul de Date Titanic
Hai să punem totul cap la cap. Vom folosi clasicul set de date Titanic pentru a construi un pipeline profesional, de la citirea datelor până la evaluarea finală. E genul de exemplu pe care l-aș fi vrut când am început să lucrez cu pipeline-uri.
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import (
train_test_split, GridSearchCV, cross_val_score, StratifiedKFold
)
from sklearn.metrics import (
classification_report, confusion_matrix, roc_auc_score
)
from sklearn.feature_selection import SelectFromModel
import warnings
warnings.filterwarnings('ignore')
# === Pasul 1: Încărcarea și explorarea datelor ===
url_date = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
df = pd.read_csv(url_date)
print("Dimensiunile setului de date:", df.shape)
print("\nPrimele rânduri:")
print(df.head())
print("\nValori lipsă per coloană:")
print(df.isnull().sum())
print("\nTipurile de date:")
print(df.dtypes)
# === Pasul 2: Pregătirea datelor ===
# Selectarea coloanelor relevante
coloane_de_folosit = ['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
X = df[coloane_de_folosit]
y = df['Survived']
# Împărțirea în seturi de antrenament și test
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"\nDimensiuni antrenament: {X_train.shape}")
print(f"Dimensiuni test: {X_test.shape}")
# === Pasul 3: Definirea transformatoarelor personalizate ===
class ExtractorCaracteristiciTitanic(BaseEstimator, TransformerMixin):
"""Creează caracteristici noi specifice setului de date Titanic."""
def __init__(self, adauga_dimensiune_familie=True, adauga_este_singur=True):
self.adauga_dimensiune_familie = adauga_dimensiune_familie
self.adauga_este_singur = adauga_este_singur
def fit(self, X, y=None):
return self
def transform(self, X):
X_copie = X.copy()
if self.adauga_dimensiune_familie:
# Dimensiunea familiei = frați/surori + părinți/copii + persoana însăși
X_copie['dimensiune_familie'] = X_copie['SibSp'] + X_copie['Parch'] + 1
if self.adauga_este_singur:
# Indicator dacă persoana călătorește singură
X_copie['este_singur'] = (
(X_copie['SibSp'] == 0) & (X_copie['Parch'] == 0)
).astype(int)
return X_copie
# === Pasul 4: Construirea pipeline-ului ===
# Coloane după tip
coloane_numerice = ['Age', 'Fare', 'SibSp', 'Parch', 'dimensiune_familie']
coloane_categorice = ['Sex', 'Embarked']
coloane_ordinale = ['Pclass']
# Sub-pipeline pentru coloane numerice
pipeline_numeric = Pipeline([
('imputare_numerica', SimpleImputer(strategy='median')),
('scalare', StandardScaler())
])
# Sub-pipeline pentru coloane categorice nominale
pipeline_categoric = Pipeline([
('imputare_categorica', SimpleImputer(strategy='most_frequent')),
('codificare_ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Sub-pipeline pentru coloane ordinale
pipeline_ordinal = Pipeline([
('codificare_ordinala', OrdinalEncoder())
])
# ColumnTransformer care combină toate sub-pipeline-urile
preprocesor = ColumnTransformer(
transformers=[
('numeric', pipeline_numeric, coloane_numerice),
('categoric', pipeline_categoric, coloane_categorice),
('ordinal', pipeline_ordinal, coloane_ordinale)
],
remainder='drop' # Eliminarea coloanelor nespecificate
)
# Pipeline-ul complet
pipeline_final = Pipeline([
('inginerie', ExtractorCaracteristiciTitanic(
adauga_dimensiune_familie=True,
adauga_este_singur=True
)),
('preprocesare', preprocesor),
('clasificator', GradientBoostingClassifier(random_state=42))
])
# === Pasul 5: Validare încrucișată inițială ===
cv_stratificat = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoruri_cv = cross_val_score(
pipeline_final, X_train, y_train,
cv=cv_stratificat, scoring='accuracy'
)
print(f"\nAcuratețe CV (înainte de optimizare): {scoruri_cv.mean():.4f} ± {scoruri_cv.std():.4f}")
# === Pasul 6: Optimizarea hiperparametrilor ===
grila_parametri = {
# Parametri pentru ingineria caracteristicilor
'inginerie__adauga_dimensiune_familie': [True, False],
'inginerie__adauga_este_singur': [True, False],
# Parametri pentru imputarea numerică
'preprocesare__numeric__imputare_numerica__strategy': ['median', 'mean'],
# Parametri pentru clasificator
'clasificator__n_estimators': [100, 200, 300],
'clasificator__max_depth': [3, 5, 7],
'clasificator__learning_rate': [0.05, 0.1, 0.2],
'clasificator__subsample': [0.8, 1.0]
}
cautare_grila = GridSearchCV(
pipeline_final,
grila_parametri,
cv=cv_stratificat,
scoring='accuracy',
n_jobs=-1,
verbose=1,
return_train_score=True
)
cautare_grila.fit(X_train, y_train)
print(f"\nCei mai buni parametri găsiți:")
for parametru, valoare in cautare_grila.best_params_.items():
print(f" {parametru}: {valoare}")
print(f"\nCel mai bun scor CV: {cautare_grila.best_score_:.4f}")
# === Pasul 7: Evaluarea finală pe setul de test ===
pipeline_optim = cautare_grila.best_estimator_
predictii = pipeline_optim.predict(X_test)
probabilitati = pipeline_optim.predict_proba(X_test)[:, 1]
print("\n=== Raport de Clasificare ===")
print(classification_report(y_test, predictii, target_names=['Nu a supraviețuit', 'A supraviețuit']))
print(f"AUC-ROC: {roc_auc_score(y_test, probabilitati):.4f}")
print("\nMatricea de confuzie:")
print(confusion_matrix(y_test, predictii))
# === Pasul 8: Salvarea pipeline-ului pentru producție ===
import joblib
joblib.dump(pipeline_optim, 'pipeline_titanic_optim.pkl')
print("\nPipeline-ul a fost salvat cu succes!")
# Încărcarea și utilizarea pipeline-ului salvat
pipeline_incarcat = joblib.load('pipeline_titanic_optim.pkl')
predictii_noi = pipeline_incarcat.predict(X_test[:5])
print(f"Predicții pe date noi: {predictii_noi}")
Ce face acest exemplu special e modul în care totul se leagă: transformatorul personalizat ExtractorCaracteristiciTitanic creează caracteristici noi înainte de preprocesare, ColumnTransformer aplică transformări specifice pe fiecare tip de coloană, iar GridSearchCV optimizează simultan atât ingineria caracteristicilor, cât și hiperparametrii modelului. Totul într-un singur obiect.
Bune Practici și Capcane Frecvente
După câteva proiecte cu pipeline-uri, am adunat o listă de lecții învățate. Unele pe calea cea bună, altele... nu chiar. Iată ce contează cu adevărat.
Bune practici
-
Numiți pașii descriptiv — Evitați denumiri generice precum
step1saustep2. Numele bune fac codul auto-documentat și simplifică referința parametrilor în GridSearchCV. -
Păstrați pipeline-ul modular — Împărțiți pipeline-ul complex în sub-pipeline-uri reutilizabile. Creați un pipeline de preprocesare separat pe care îl puteți reutiliza cu diferiți clasificatori.
-
Folosiți
set_output(transform='pandas')în dezvoltare — Serios, asta vă schimbă viața la depanare. Vedeți datele intermediare cu nume de coloane descriptive în loc de array-uri anonime. -
Testați fiecare pas individual — Înainte de a combina totul, verificați că fiecare transformator funcționează corect pe datele voastre. E mult mai ușor să găsiți buguri așa.
-
Salvați întregul pipeline cu
joblib.dump()— Nu doar modelul! Asta asigură reproductibilitatea completă în producție. -
Documentați transformatoarele personalizate — Adăugați docstrings care explică ce face fiecare transformator, ce parametri acceptă și ce returnează. Voi din viitor vă veți mulțumi.
-
Activați cache-ul pentru pași costisitori — Parametrul
memoryface minuni când aveți transformări computațional intensive, mai ales în combinație cu căutarea pe grilă.
Capcane frecvente și cum să le evitați
-
Scurgerea de date (data leakage) — Cea mai gravă eroare. Pipeline-urile previn asta automat, dar doar dacă toate transformările sunt incluse în pipeline, nu doar unele.
# GREȘIT: Scurgere de date — scalarea pe toate datele scaler = StandardScaler() X_scalat = scaler.fit_transform(X) # fit pe TOATE datele! X_train, X_test = train_test_split(X_scalat, ...) # CORECT: Scalarea inclusă în pipeline pipeline = Pipeline([ ('scalare', StandardScaler()), ('model', LogisticRegression()) ]) pipeline.fit(X_train, y_train) # fit doar pe datele de antrenament -
Ordinea incorectă a pașilor — Imputarea valorilor lipsă trebuie întotdeauna înaintea scalării sau codificării. Pare evident, dar e o greșeală surprinzător de frecventă.
-
Uitarea lui
handle_unknown='ignore'— Dacă OneHotEncoder întâlnește în test o categorie nouă care n-a apărut în antrenament, aruncă o eroare. Acest parametru previne asta. -
Nealinierea coloanelor în ColumnTransformer — Dacă specificați coloanele prin indici numerici și structura DataFrame-ului se schimbă, pipeline-ul va eșua. Folosiți nume de coloane sau
make_column_selector(). -
Parametri incorect denumiți în GridSearchCV — O eroare de tastare în numele parametrului nu va genera o excepție — pur și simplu nu va avea niciun efect. Verificați mereu cu:
# Verificarea parametrilor disponibili print(pipeline.get_params().keys()) -
Nerespectarea API-ului în transformatoarele personalizate — Asigurați-vă că
__init__()salvează parametrii cu numele din semnătură, căfit()returneazăself, și mai ales că nu modificați datele de intrare in-place.# GREȘIT: Modificarea datelor de intrare def transform(self, X): X['coloana_noua'] = X['a'] + X['b'] # Modifică originalul! return X # CORECT: Lucrul pe o copie def transform(self, X): X_copie = X.copy() X_copie['coloana_noua'] = X_copie['a'] + X_copie['b'] return X_copie -
Ignorarea dimensiunii spațiului de căutare — O grilă cu prea mulți parametri poate necesita ore de calcul. Calculați întotdeauna numărul total de combinații înainte de a lansa căutarea, și folosiți
RandomizedSearchCVpentru spații mari.
Sfaturi pentru producție
Când treceți de la experimentare la producție, câteva lucruri devin critice:
- Versionarea pipeline-ului — Salvați-l cu un nume care include versiunea și data (ex:
pipeline_v2.3_20260205.pkl). Vă va scuti de multe dureri de cap. - Validarea datelor de intrare — Adăugați un pas de validare la început, care verifică tipurile de date, intervalele valorilor și prezența coloanelor așteptate.
- Monitorizarea performanței — Implementați mecanisme de monitorizare pentru a detecta degradarea în timp (concept drift). Modelele nu rămân bune pentru totdeauna.
- Compatibilitatea versiunilor — Versiunea Scikit-Learn din antrenare trebuie să fie aceeași cu cea din producție. Pipeline-urile serializate cu o versiune pot să nu funcționeze cu alta.
import sklearn
import joblib
# Salvarea pipeline-ului împreună cu metadata
metadata = {
'versiune_sklearn': sklearn.__version__,
'versiune_pipeline': '2.3',
'data_antrenare': '2026-02-05',
'scor_validare': cautare_grila.best_score_,
'coloane_necesare': coloane_de_folosit
}
artefact = {
'pipeline': pipeline_optim,
'metadata': metadata
}
joblib.dump(artefact, 'pipeline_productie_v2.3.pkl')
# Încărcarea și validarea în producție
artefact_incarcat = joblib.load('pipeline_productie_v2.3.pkl')
assert artefact_incarcat['metadata']['versiune_sklearn'] == sklearn.__version__, \
"Versiunea Scikit-Learn nu corespunde!"
pipeline_productie = artefact_incarcat['pipeline']
Funcționalități Recente din Scikit-Learn
Versiunile recente ale Scikit-Learn au adus câteva îmbunătățiri care merită menționate.
Configurarea globală cu set_config
set_config() vă permite să controlați comportamentul transformatoarelor la nivel global, ceea ce e foarte convenabil:
from sklearn import set_config
# Activarea ieșirii pandas la nivel global
set_config(transform_output='pandas')
# Activarea diagramelor HTML pentru vizualizarea pipeline-urilor
set_config(display='diagram')
# Activarea rutării metadatelor (metadata routing)
set_config(enable_metadata_routing=True)
Rutarea metadatelor (Metadata Routing)
Un sistem mai nou care permite transmiterea de informații suplimentare (cum ar fi ponderile eșantioanelor) către pași specifici din pipeline:
from sklearn import set_config
set_config(enable_metadata_routing=True)
# Configurarea unui clasificator care acceptă ponderi
from sklearn.linear_model import LogisticRegression
pipeline_cu_ponderi = Pipeline([
('scalare', StandardScaler()),
('clasificator', LogisticRegression()
.set_fit_request(sample_weight=True)) # Solicitarea ponderilor
])
# Antrenarea cu ponderi ale eșantioanelor
pipeline_cu_ponderi.fit(
X_train, y_train,
clasificator__sample_weight=ponderi_antrenament
)
Vizualizarea îmbunătățită
Diagramele HTML interactive au fost rafinate considerabil, oferind o vizualizare mai clară a structurii, inclusiv parametrii fiecărui pas:
from sklearn import set_config
set_config(display='diagram')
# Afișarea automată a diagramei în Jupyter
pipeline_final
Pentru pipeline-uri complexe cu mai multe niveluri de imbricare, această vizualizare chiar face diferența.
Concluzii
Pipeline-urile din Scikit-Learn sunt, sincer, unul dintre cele mai utile instrumente pe care le aveți la dispoziție ca practician de data science. Transformă codul haotic într-un flux de lucru curat, reproductibil și gata de producție.
Am acoperit mult teren în acest ghid: de la pipeline-uri simple, la ColumnTransformer, FeatureUnion, transformatoare personalizate, cache cu memory, ieșire pandas cu set_output și optimizare cu GridSearchCV. E mult, dar fiecare piesă are rolul ei.
Ce merită reținut mai ales:
- Includeți TOATE transformările în pipeline — E regula numărul unu pentru prevenirea scurgerii de date.
- Folosiți
ColumnTransformerpentru date eterogene cu coloane de tipuri diferite. - Creați transformatoare personalizate pentru logica de business specifică, respectând API-ul Scikit-Learn.
- Optimizați hiperparametrii la nivel de pipeline, nu la nivel de componente individuale.
- Salvați pipeline-ul complet pentru reproducerea rezultatelor în producție.
- Testați, documentați și versionați pipeline-urile ca pe orice cod de producție serios.
Sfatul meu? Începeți cu pipeline-uri simple și adăugați complexitate treptat. Nu trebuie să implementați tot din prima. Odată ce vă obișnuiți cu acest mod de lucru, veți observa că și codul devine mai clar, și rezultatele mai fiabile. Iar asta, la sfârșitul zilei, e ceea ce contează.