Potoki ML w scikit-learn — praktyczny przewodnik od podstaw do produkcji

Dowiedz się, jak budować profesjonalne potoki uczenia maszynowego w scikit-learn. Pipeline, ColumnTransformer, FeatureUnion, optymalizacja hiperparametrów, wdrażanie z Flask i integracja z MLflow — wszystko z praktycznymi przykładami kodu.

Wprowadzenie — dlaczego potoki ML naprawdę mają znaczenie?

Jeśli kiedykolwiek pracowałeś z uczeniem maszynowym w Pythonie, pewnie wiesz, jak łatwo jest utknąć w chaosie ręcznych kroków przetwarzania danych. Skalowanie tutaj, imputacja tam, kodowanie kategorii gdzieś indziej... i nagle okazuje się, że Twój notebook ma 200 komórek, a odtworzenie wyników jest praktycznie niemożliwe.

No właśnie — jednym z największych wyzwań w data science nie jest samo zbudowanie modelu. To stworzenie powtarzalnego procesu, który przetwarza surowe dane od początku do końca bez błędów i wycieków danych (data leakage).

Rozwiązaniem są potoki uczenia maszynowego (ang. machine learning pipelines). Scikit-learn oferuje naprawdę eleganckie narzędzia do ich budowania: Pipeline, ColumnTransformer i FeatureUnion. W tym przewodniku pokażę Ci krok po kroku, jak budować profesjonalne potoki ML — od prostych łańcuchów transformacji po zaawansowane kompozycje gotowe do wdrożenia produkcyjnego.

Artykuł jest przeznaczony zarówno dla osób zaczynających przygodę z ML w Pythonie, jak i dla doświadczonych praktyków szukających sprawdzonych wzorców. Omówimy najlepsze praktyki na rok 2025/2026, włącznie z integracją z narzędziami MLOps.

Czym jest Pipeline w scikit-learn?

Klasa sklearn.pipeline.Pipeline pozwala na sekwencyjne łączenie wielu kroków przetwarzania danych w jeden obiekt. Każdy krok (oprócz ostatniego) musi implementować metody fit() i transform() — czyli być transformatorem. Ostatni krok może być dowolnym estymatorem, np. klasyfikatorem lub regresorem.

Główne zalety korzystania z Pipeline:

  • Eliminacja wycieku danych — transformacje są dopasowywane wyłącznie na danych treningowych, a potem automatycznie stosowane na danych testowych. To jest chyba najważniejsza zaleta.
  • Czystszy kod — cały przepływ pracy jest zdefiniowany w jednym obiekcie, co znacznie ułatwia czytanie i utrzymywanie kodu.
  • Łatwe przeszukiwanie hiperparametrów — potok można bezpośrednio przekazać do GridSearchCV lub RandomizedSearchCV.
  • Prostsze wdrażanie — jeden obiekt do serializacji i deserializacji (np. za pomocą joblib).
  • Powtarzalność — ten sam potok gwarantuje identyczne przetwarzanie na każdym etapie.

Prosty przykład Pipeline

Zacznijmy od najprostszego możliwego potoku — skalowanie cech, a następnie klasyfikacja regresją logistyczną:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Ładowanie danych
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Definicja potoku
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=200))
])

# Trenowanie i ewaluacja
pipe.fit(X_train, y_train)
score = pipe.score(X_test, y_test)
print(f"Dokładność: {score:.4f}")

W powyższym przykładzie StandardScaler jest dopasowywany i transformuje dane treningowe, a następnie te same parametry skalowania (średnia i odchylenie standardowe) są automatycznie stosowane do danych testowych. Żadnego ręcznego pilnowania — potok robi to za nas.

Skrót make_pipeline

Jeśli nie potrzebujesz nadawać nazw poszczególnym krokom, jest jeszcze krótsza opcja — make_pipeline automatycznie generuje nazwy na podstawie klas:

from sklearn.pipeline import make_pipeline

pipe = make_pipeline(
    StandardScaler(),
    LogisticRegression(max_iter=200)
)
# Nazwy kroków: 'standardscaler', 'logisticregression'
print(pipe.named_steps)

ColumnTransformer — klucz do danych heterogenicznych

W rzeczywistych projektach rzadko mamy do czynienia z jednorodnymi danymi. Typowy zbiór danych zawiera kolumny numeryczne, kategoryczne, tekstowe, a czasem daty czy dane binarne. I tutaj wchodzi ColumnTransformer z modułu sklearn.compose, który pozwala na zastosowanie różnych transformacji do różnych podzbiorów kolumn.

Szczerze mówiąc, to jest jeden z tych elementów scikit-learn, który zmienił moje podejście do pracy z danymi. Zanim go poznałem, pisałem naprawdę nieelegancki kod z mnóstwem ręcznych selekcji kolumn.

Podstawowe użycie ColumnTransformer

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier

# Przykładowe dane
data = pd.DataFrame({
    'wiek': [25, 30, np.nan, 45, 35],
    'wynagrodzenie': [3000, 5000, 4000, np.nan, 6000],
    'miasto': ['Warszawa', 'Kraków', 'Warszawa', 'Gdańsk', 'Kraków'],
    'wyksztalcenie': ['mgr', 'inż', 'mgr', 'dr', 'inż'],
    'kupil': [1, 0, 1, 1, 0]
})

X = data.drop('kupil', axis=1)
y = data['kupil']

# Definicja kolumn
kolumny_numeryczne = ['wiek', 'wynagrodzenie']
kolumny_kategoryczne = ['miasto', 'wyksztalcenie']

# Transformatory dla poszczególnych typów kolumn
transformator_numeryczny = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

transformator_kategoryczny = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# Złożenie ColumnTransformer
preprocesor = ColumnTransformer(
    transformers=[
        ('num', transformator_numeryczny, kolumny_numeryczne),
        ('cat', transformator_kategoryczny, kolumny_kategoryczne)
    ],
    remainder='drop'  # domyślne — pomiń niespecyfikowane kolumny
)

# Kompletny potok z modelem
potok = Pipeline([
    ('preprocesor', preprocesor),
    ('klasyfikator', RandomForestClassifier(n_estimators=100, random_state=42))
])

potok.fit(X, y)
print("Potok gotowy do predykcji!")

Zwróć uwagę na kilka ważnych rzeczy w tym kodzie:

  • Każdy typ kolumn ma własny mini-potok z odpowiednimi krokami transformacji — to naprawdę czyste rozwiązanie.
  • Parametr remainder='drop' oznacza, że kolumny nieuwzględnione w żadnym transformatorze zostaną pominięte. Można ustawić remainder='passthrough', aby je przepuścić bez zmian.
  • Parametr handle_unknown='ignore' w OneHotEncoder zapewnia, że nowe kategorie w danych testowych nie spowodują błędu (a uwierz mi, bez tego w produkcji będą problemy).

Dynamiczny wybór kolumn za pomocą selektorów

Zamiast ręcznie podawać nazwy kolumn, możesz użyć selektorów, które automatycznie identyfikują kolumny na podstawie ich typu:

from sklearn.compose import make_column_selector

preprocesor = ColumnTransformer(
    transformers=[
        ('num', transformator_numeryczny,
         make_column_selector(dtype_include=np.number)),
        ('cat', transformator_kategoryczny,
         make_column_selector(dtype_include=object))
    ]
)

To podejście jest szczególnie przydatne przy dynamicznie zmieniających się zbiorach danych lub gdy chcesz napisać bardziej uniwersalny kod.

Zaawansowane techniki budowania potoków

Zagnieżdżone potoki

Potoki w scikit-learn można dowolnie zagnieżdżać. Jest to przydatne, gdy chcesz grupować powiązane kroki przetwarzania:

from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif

potok_zaawansowany = Pipeline([
    ('preprocesor', preprocesor),  # ColumnTransformer zdefiniowany wcześniej
    ('selekcja_cech', SelectKBest(f_classif, k=5)),
    ('redukcja_wymiarow', PCA(n_components=3)),
    ('klasyfikator', RandomForestClassifier(n_estimators=200, random_state=42))
])

FeatureUnion — łączenie cech równolegle

Klasa FeatureUnion pozwala na równoległe zastosowanie wielu transformatorów i połączenie wynikowych cech w jedną macierz. Przydaje się to, gdy chcesz wyciągać różne rodzaje informacji z tych samych danych jednocześnie:

from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA, TruncatedSVD

polaczone_cechy = FeatureUnion([
    ('pca', PCA(n_components=3)),
    ('svd', TruncatedSVD(n_components=2))
])

potok_z_union = Pipeline([
    ('preprocesor', preprocesor),
    ('polaczone_cechy', polaczone_cechy),
    ('klasyfikator', LogisticRegression(max_iter=500))
])

Tworzenie własnych transformatorów

Często standardowe transformatory nie wystarczają i potrzebujemy własnej logiki. Na szczęście scikit-learn udostępnia bazowe klasy BaseEstimator i TransformerMixin, które bardzo ułatwiają tworzenie kompatybilnych transformatorów:

from sklearn.base import BaseEstimator, TransformerMixin

class UsunOutliery(BaseEstimator, TransformerMixin):
    """Transformator usuwający wartości odstające metodą IQR."""

    def __init__(self, factor=1.5):
        self.factor = factor

    def fit(self, X, y=None):
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        self.iqr_ = Q3 - Q1
        self.lower_ = Q1 - self.factor * self.iqr_
        self.upper_ = Q3 + self.factor * self.iqr_
        return self

    def transform(self, X):
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped


class DodajCechyInterakcji(BaseEstimator, TransformerMixin):
    """Transformator dodający iloczyny par cech."""

    def __init__(self, include_original=True):
        self.include_original = include_original

    def fit(self, X, y=None):
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        X = np.array(X)
        interakcje = []
        for i in range(X.shape[1]):
            for j in range(i + 1, X.shape[1]):
                interakcje.append(X[:, i] * X[:, j])

        X_inter = np.column_stack(interakcje)
        if self.include_original:
            return np.hstack([X, X_inter])
        return X_inter


# Użycie w potoku
potok_wlasny = Pipeline([
    ('preprocesor', preprocesor),
    ('usun_outliery', UsunOutliery(factor=2.0)),
    ('interakcje', DodajCechyInterakcji()),
    ('klasyfikator', RandomForestClassifier(random_state=42))
])

Kilka kluczowych zasad przy tworzeniu własnych transformatorów (warto je zapamiętać):

  • Dziedzicz po BaseEstimator (automatyczne get_params/set_params) i TransformerMixin (automatyczne fit_transform).
  • Metoda fit() musi zwracać self.
  • Wszystkie parametry konstruktora powinny mieć wartości domyślne i być przechowywane jako atrybuty o tych samych nazwach.
  • Atrybuty wyuczone podczas dopasowania powinny mieć nazwy kończące się na _ — to konwencja scikit-learn, której warto się trzymać.

FunctionTransformer — szybka alternatywa

Jeśli potrzebujesz zastosować prostą funkcję bez stanu, FunctionTransformer jest Twoim przyjacielem:

from sklearn.preprocessing import FunctionTransformer

# Logarytmowanie cech
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1
)

potok_log = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler()),
    ('model', LogisticRegression())
])

Optymalizacja hiperparametrów w potokach

Jedną z największych zalet potoków jest możliwość jednoczesnej optymalizacji hiperparametrów na wszystkich etapach. Scikit-learn wykorzystuje konwencję podwójnego podkreślnika (__) do nawigowania po strukturze potoku — i kiedy to „kliknie", cały system zaczyna mieć sens.

GridSearchCV z Pipeline

from sklearn.model_selection import GridSearchCV

# Definiujemy przestrzeń przeszukiwania
param_grid = {
    # Parametry preprocesora
    'preprocesor__num__imputer__strategy': ['mean', 'median'],
    # Parametry klasyfikatora
    'klasyfikator__n_estimators': [50, 100, 200],
    'klasyfikator__max_depth': [5, 10, None],
    'klasyfikator__min_samples_split': [2, 5, 10]
}

grid_search = GridSearchCV(
    potok,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

grid_search.fit(X, y)

print(f"Najlepsze parametry: {grid_search.best_params_}")
print(f"Najlepszy wynik CV: {grid_search.best_score_:.4f}")

RandomizedSearchCV dla dużych przestrzeni

Gdy przestrzeń hiperparametrów jest zbyt duża dla pełnego przeszukiwania (a bywa tak zaskakująco często), RandomizedSearchCV losuje kombinacje parametrów:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

param_distributions = {
    'klasyfikator__n_estimators': randint(50, 500),
    'klasyfikator__max_depth': randint(3, 20),
    'klasyfikator__min_samples_leaf': randint(1, 10),
    'klasyfikator__max_features': uniform(0.1, 0.9)
}

random_search = RandomizedSearchCV(
    potok,
    param_distributions,
    n_iter=100,  # liczba losowych kombinacji
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)

random_search.fit(X, y)
print(f"Najlepsze parametry: {random_search.best_params_}")

Porównywanie wielu modeli w potoku

Możemy też dynamicznie zamieniać ostatni krok potoku, żeby porównać różne algorytmy. To podejście oszczędza sporo czasu:

from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

modele = {
    'Random Forest': RandomForestClassifier(random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42),
    'KNN': KNeighborsClassifier(),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42)
}

for nazwa, model in modele.items():
    potok_test = Pipeline([
        ('preprocesor', preprocesor),
        ('klasyfikator', model)
    ])
    scores = cross_val_score(potok_test, X, y, cv=5, scoring='accuracy')
    print(f"{nazwa}: {scores.mean():.4f} (+/- {scores.std():.4f})")

Kompletny przykład — prognozowanie rezygnacji klientów

Dobra, czas na pełny, realistyczny przykład. Zbudujemy potok do prognozowania rezygnacji klientów (churn prediction) z heterogenicznym zbiorem danych. To jeden z tych scenariuszy, które spotyka się w praktyce naprawdę często:

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, OrdinalEncoder
)
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectPercentile, mutual_info_classif
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, StratifiedKFold
)
from sklearn.metrics import (
    classification_report, roc_auc_score, confusion_matrix
)

# --- 1. Przygotowanie danych ---
np.random.seed(42)
n = 1000

dane = pd.DataFrame({
    'wiek': np.random.randint(18, 70, n),
    'staż_miesiace': np.random.randint(1, 60, n),
    'miesieczna_oplata': np.random.uniform(30, 120, n).round(2),
    'liczba_reklamacji': np.random.poisson(1.5, n),
    'typ_umowy': np.random.choice(
        ['miesięczna', 'roczna', 'dwuletnia'], n,
        p=[0.5, 0.3, 0.2]
    ),
    'metoda_platnosci': np.random.choice(
        ['karta', 'przelew', 'gotówka'], n
    ),
    'region': np.random.choice(
        ['północ', 'południe', 'wschód', 'zachód'], n
    ),
    'czy_senior': np.random.choice([0, 1], n, p=[0.85, 0.15]),
})

# Generowanie zmiennej docelowej (rezygnacja)
prob_churn = (
    0.3
    + 0.02 * (dane['liczba_reklamacji'] > 2)
    + 0.15 * (dane['typ_umowy'] == 'miesięczna')
    - 0.1 * (dane['staż_miesiace'] > 24)
    + 0.05 * (dane['miesieczna_oplata'] > 80)
)
dane['rezygnacja'] = (np.random.random(n) < prob_churn).astype(int)

# Dodanie brakujących wartości (symulacja rzeczywistych danych)
maska = np.random.random(n) < 0.05
dane.loc[maska, 'wiek'] = np.nan
maska2 = np.random.random(n) < 0.03
dane.loc[maska2, 'miesieczna_oplata'] = np.nan

X = dane.drop('rezygnacja', axis=1)
y = dane['rezygnacja']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# --- 2. Definicja kolumn i transformatorów ---
kolumny_num = ['wiek', 'staż_miesiace', 'miesieczna_oplata',
               'liczba_reklamacji']
kolumny_cat = ['typ_umowy', 'metoda_platnosci', 'region']
kolumny_bin = ['czy_senior']

transformator_num = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

transformator_cat = Pipeline([
    ('imputer', SimpleImputer(strategy='constant',
                               fill_value='nieznany')),
    ('encoder', OneHotEncoder(handle_unknown='ignore',
                               sparse_output=False))
])

transformator_bin = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent'))
])

preprocesor = ColumnTransformer([
    ('num', transformator_num, kolumny_num),
    ('cat', transformator_cat, kolumny_cat),
    ('bin', transformator_bin, kolumny_bin)
])

# --- 3. Kompletny potok ---
potok_churn = Pipeline([
    ('preprocesor', preprocesor),
    ('selekcja', SelectPercentile(
        mutual_info_classif, percentile=80
    )),
    ('model', GradientBoostingClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.1,
        random_state=42
    ))
])

# --- 4. Walidacja krzyżowa ---
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(
    potok_churn, X_train, y_train,
    cv=cv, scoring='roc_auc'
)
print(f"ROC AUC (CV): {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# --- 5. Trenowanie i ewaluacja ---
potok_churn.fit(X_train, y_train)
y_pred = potok_churn.predict(X_test)
y_proba = potok_churn.predict_proba(X_test)[:, 1]

print("\nRaport klasyfikacji:")
print(classification_report(y_test, y_pred,
                             target_names=['Pozostaje', 'Rezygnuje']))
print(f"ROC AUC (test): {roc_auc_score(y_test, y_proba):.4f}")

Serializacja i wdrażanie potoków

Gotowy potok można łatwo zapisać i wczytać za pomocą joblib. To kluczowy krok do wdrożenia modelu w produkcji — bez tego cały nasz piękny potok pozostanie jedynie w notebooku.

Zapisywanie i wczytywanie potoku

import joblib

# Zapisanie wytrenowanego potoku
joblib.dump(potok_churn, 'potok_churn_v1.joblib')

# Wczytanie w innym środowisku
potok_wczytany = joblib.load('potok_churn_v1.joblib')

# Predykcja na nowych danych
nowy_klient = pd.DataFrame({
    'wiek': [35],
    'staż_miesiace': [12],
    'miesieczna_oplata': [89.99],
    'liczba_reklamacji': [3],
    'typ_umowy': ['miesięczna'],
    'metoda_platnosci': ['karta'],
    'region': ['północ'],
    'czy_senior': [0]
})

prawdopodobienstwo = potok_wczytany.predict_proba(nowy_klient)[:, 1]
print(f"Prawdopodobieństwo rezygnacji: {prawdopodobienstwo[0]:.2%}")

Integracja z API za pomocą Flask

Jednym z najpopularniejszych sposobów udostępniania modelu ML jest REST API. Poniżej minimalny przykład z Flaskiem — jest zaskakująco prosty:

from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)
model = joblib.load('potok_churn_v1.joblib')

@app.route('/predict', methods=['POST'])
def predict():
    dane = request.get_json()
    df = pd.DataFrame([dane])
    proba = model.predict_proba(df)[:, 1][0]
    predykcja = model.predict(df)[0]

    return jsonify({
        'predykcja': int(predykcja),
        'prawdopodobienstwo_rezygnacji': round(float(proba), 4)
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Zapytanie do API:

curl -X POST http://localhost:5000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "wiek": 42,
    "staż_miesiace": 6,
    "miesieczna_oplata": 95.50,
    "liczba_reklamacji": 4,
    "typ_umowy": "miesięczna",
    "metoda_platnosci": "przelew",
    "region": "południe",
    "czy_senior": 0
  }'

Potoki w ekosystemie MLOps

Wdrożenie modelu to tak naprawdę dopiero początek. Potrzebujemy narzędzi do śledzenia eksperymentów, wersjonowania modeli i monitorowania ich wydajności w produkcji. Zobaczmy, jak potoki scikit-learn współgrają z popularnymi narzędziami MLOps.

Śledzenie eksperymentów z MLflow

MLflow to jedno z najpopularniejszych narzędzi MLOps. Pozwala rejestrować parametry, metryki i artefakty eksperymentów — a integracja z potokami scikit-learn jest bezbolesna:

import mlflow
import mlflow.sklearn
from sklearn.model_selection import cross_val_score

mlflow.set_experiment("churn-prediction")

with mlflow.start_run(run_name="gradient_boosting_v1"):
    # Trenowanie potoku
    potok_churn.fit(X_train, y_train)

    # Obliczanie metryk
    y_pred = potok_churn.predict(X_test)
    y_proba = potok_churn.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_proba)

    cv_scores = cross_val_score(
        potok_churn, X_train, y_train,
        cv=5, scoring='roc_auc'
    )

    # Logowanie parametrów
    mlflow.log_param("n_estimators", 200)
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("learning_rate", 0.1)
    mlflow.log_param("selekcja_percentile", 80)

    # Logowanie metryk
    mlflow.log_metric("test_roc_auc", auc)
    mlflow.log_metric("cv_roc_auc_mean", cv_scores.mean())
    mlflow.log_metric("cv_roc_auc_std", cv_scores.std())

    # Zapisanie modelu jako artefaktu
    mlflow.sklearn.log_model(potok_churn, "model")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Test ROC AUC: {auc:.4f}")

Wersjonowanie danych z DVC

DVC (Data Version Control) pozwala na wersjonowanie danych i modeli w repozytorium Git. Oto typowy przepływ pracy:

# Inicjalizacja DVC w repozytorium
# (wykonywane w terminalu)
# dvc init
# dvc add data/dane_klientow.csv
# git add data/dane_klientow.csv.dvc .gitignore
# git commit -m "Dodanie danych treningowych v1"

# W Pythonie — definiowanie potoku DVC programowo
# plik: train.py
import sys
import json
import joblib
import pandas as pd
from sklearn.metrics import roc_auc_score

def train_model(data_path, model_path, metrics_path):
    dane = pd.read_csv(data_path)
    X = dane.drop('rezygnacja', axis=1)
    y = dane['rezygnacja']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=42
    )

    potok_churn.fit(X_train, y_train)
    joblib.dump(potok_churn, model_path)

    y_proba = potok_churn.predict_proba(X_test)[:, 1]
    metryki = {"roc_auc": roc_auc_score(y_test, y_proba)}

    with open(metrics_path, 'w') as f:
        json.dump(metryki, f)

if __name__ == '__main__':
    train_model(sys.argv[1], sys.argv[2], sys.argv[3])

Najczęstsze błędy i pułapki

Pracując z potokami ML, łatwo wpaść w pewne typowe pułapki. Widziałem te błędy wielokrotnie — zarówno u juniorów, jak i u bardziej doświadczonych programistów. Oto najważniejsze z nich.

1. Wyciek danych (Data Leakage)

To zdecydowanie najczęstszy i najbardziej podstępny błąd. Polega na dopasowywaniu transformatorów na całym zbiorze danych przed podziałem:

# ŹLE — wyciek danych!
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # fit na CAŁYM X
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# DOBRZE — użycie Pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('model', LogisticRegression())
])
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipe.fit(X_train, y_train)  # scaler.fit() tylko na X_train

2. Nieprawidłowa obsługa brakujących wartości

Imputacja musi być częścią potoku, nie może być wykonywana przed podziałem danych. To kolejna forma wycieku:

# ŹLE — imputacja przed podziałem
X.fillna(X.mean(), inplace=True)

# DOBRZE — imputacja w potoku
Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('model', LogisticRegression())
])

3. Zapominanie o obsłudze nieznanych kategorii

Ten błąd ujawnia się dopiero w produkcji, co czyni go szczególnie irytującym:

# ŹLE — spowoduje błąd przy nowych kategoriach
OneHotEncoder()

# DOBRZE — obsługa nieznanych kategorii
OneHotEncoder(handle_unknown='ignore')

4. Ignorowanie kolejności kroków

Kolejność transformacji ma znaczenie. Na przykład selekcja cech powinna następować po zakodowaniu kategorii, nie przed:

# Prawidłowa kolejność
Pipeline([
    ('preprocesor', preprocesor),     # 1. Kodowanie + skalowanie
    ('selekcja', SelectKBest(k=10)),  # 2. Selekcja najlepszych cech
    ('model', RandomForestClassifier()) # 3. Model
])

5. Niespójność wersji bibliotek

Serializowany potok może nie działać, jeśli wersja scikit-learn w środowisku produkcyjnym różni się od tej użytej podczas trenowania. To potrafi być frustrujące, więc zawsze zapisuj wymagania:

# Zapisz wymagania
# pip freeze > requirements.txt

# Sprawdź wersję scikit-learn
import sklearn
print(f"scikit-learn: {sklearn.__version__}")

# Użyj sklearnex dla wydajności (Intel)
# pip install scikit-learn-intelex
# from sklearnex import patch_sklearn
# patch_sklearn()

Testowanie potoków ML

Profesjonalny kod ML powinien być testowany — i nie, „działa w notebooku" to nie jest test. Oto przykład testów jednostkowych z pytest:

import pytest
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline

class TestPotokChurn:
    """Testy dla potoku prognozowania rezygnacji."""

    @pytest.fixture
    def przykladowe_dane(self):
        """Generuje przykładowe dane testowe."""
        np.random.seed(0)
        n = 100
        X = pd.DataFrame({
            'wiek': np.random.randint(18, 70, n),
            'staż_miesiace': np.random.randint(1, 60, n),
            'miesieczna_oplata': np.random.uniform(30, 120, n),
            'liczba_reklamacji': np.random.poisson(1.5, n),
            'typ_umowy': np.random.choice(
                ['miesięczna', 'roczna', 'dwuletnia'], n
            ),
            'metoda_platnosci': np.random.choice(
                ['karta', 'przelew', 'gotówka'], n
            ),
            'region': np.random.choice(
                ['północ', 'południe', 'wschód', 'zachód'], n
            ),
            'czy_senior': np.random.choice([0, 1], n),
        })
        y = np.random.choice([0, 1], n)
        return X, y

    def test_potok_fit_predict(self, przykladowe_dane):
        """Potok powinien poprawnie trenować i predykować."""
        X, y = przykladowe_dane
        potok_churn.fit(X, y)
        predykcje = potok_churn.predict(X)
        assert len(predykcje) == len(y)
        assert set(predykcje).issubset({0, 1})

    def test_potok_prawdopodobienstwa(self, przykladowe_dane):
        """Prawdopodobieństwa powinny być w zakresie [0, 1]."""
        X, y = przykladowe_dane
        potok_churn.fit(X, y)
        proba = potok_churn.predict_proba(X)
        assert proba.min() >= 0.0
        assert proba.max() <= 1.0
        assert proba.shape[1] == 2

    def test_potok_nowe_kategorie(self, przykladowe_dane):
        """Potok powinien obsłużyć nieznane kategorie."""
        X, y = przykladowe_dane
        potok_churn.fit(X, y)

        X_nowy = X.iloc[:1].copy()
        X_nowy['region'] = 'nieznany_region'
        predykcja = potok_churn.predict(X_nowy)
        assert len(predykcja) == 1

    def test_potok_brakujace_wartosci(self, przykladowe_dane):
        """Potok powinien obsłużyć brakujące wartości."""
        X, y = przykladowe_dane
        X.iloc[0, 0] = np.nan  # brak wieku
        potok_churn.fit(X, y)
        predykcja = potok_churn.predict(X.iloc[:5])
        assert len(predykcja) == 5

Wizualizacja potoku

Scikit-learn oferuje wbudowane narzędzia do wizualizacji struktury potoku, co jest niezwykle pomocne przy debugowaniu i dokumentacji:

from sklearn import set_config

# Włączenie wizualizacji HTML
set_config(display='diagram')

# Wyświetlenie w Jupyter Notebook — wystarczy wpisać nazwę potoku
potok_churn

Można również wygenerować HTML z diagramem potoku i zapisać go do pliku:

from sklearn.utils import estimator_html_repr

# Generowanie HTML do zapisania
html = estimator_html_repr(potok_churn)
with open('potok_diagram.html', 'w') as f:
    f.write(html)
print("Diagram potoku zapisany do pliku HTML.")

Wydajność i optymalizacja

Przy dużych zbiorach danych wydajność potoków staje się krytyczna. Oto kilka sprawdzonych strategii optymalizacji.

Buforowanie wyników pośrednich

Parametr memory w Pipeline pozwala zapisywać wyniki transformacji w cache. Dzięki temu nie trzeba ponownie obliczać wcześniejszych kroków przy zmianie późniejszych etapów — a to potrafi zaoszczędzić naprawdę sporo czasu:

from tempfile import mkdtemp

# Tworzenie katalogu tymczasowego dla cache
katalog_cache = mkdtemp()

potok_z_cache = Pipeline([
    ('preprocesor', preprocesor),
    ('selekcja', SelectKBest(k=10)),
    ('model', RandomForestClassifier(random_state=42))
], memory=katalog_cache)

# Teraz zmiana parametrów modelu nie wymaga
# ponownego obliczania preprocesingu
potok_z_cache.fit(X_train, y_train)

Wykorzystanie Intel Extension for Scikit-learn

Jeśli pracujesz na procesorze Intel, rozszerzenie scikit-learn-intelex może dać Ci znaczące przyspieszenie praktycznie za darmo:

# Instalacja: pip install scikit-learn-intelex

from sklearnex import patch_sklearn
patch_sklearn()  # Przyspiesza algorytmy scikit-learn

# Od tego momentu standardowe importy scikit-learn
# automatycznie korzystają z optymalizacji Intel
from sklearn.ensemble import RandomForestClassifier
# Ten RandomForestClassifier jest teraz przyspieszony!

Podsumowanie i najlepsze praktyki

Potoki uczenia maszynowego w scikit-learn to fundamentalne narzędzie każdego inżyniera ML. Po przejściu przez ten przewodnik mam nadzieję, że widzisz, jak wiele problemów potrafią rozwiązać. Oto najważniejsze zasady:

  1. Zawsze używaj Pipeline — nawet dla prostych przepływów. Chroni to przed wyciekiem danych i ułatwia reprodukowalność.
  2. Stosuj ColumnTransformer dla heterogenicznych danych — osobne przetwarzanie kolumn numerycznych, kategorycznych i binarnych.
  3. Twórz własne transformatory z użyciem BaseEstimator i TransformerMixin dla niestandardowej logiki.
  4. Optymalizuj hiperparametry na poziomie całego potoku za pomocą GridSearchCV lub RandomizedSearchCV.
  5. Testuj swoje potoki — pisz testy sprawdzające obsługę brakujących wartości, nowych kategorii i poprawność predykcji.
  6. Serializuj cały potok za pomocą joblib — jeden plik zawiera pełną logikę przetwarzania i model.
  7. Śledź eksperymenty z MLflow lub podobnych narzędzi — loguj parametry, metryki i artefakty.
  8. Wersjonuj dane z DVC — zapewnij powtarzalność eksperymentów.
  9. Buforuj wyniki pośrednie za pomocą parametru memory przy dużych zbiorach danych.
  10. Wizualizuj strukturę potoku — korzystaj z wbudowanych narzędzi scikit-learn w Jupyter Notebook.

Budowanie potoków ML to inwestycja, która zwraca się wielokrotnie. Niezależnie od tego, czy pracujesz nad prostym modelem klasyfikacji czy nad złożonym systemem rekomendacji — zasady pozostają te same: automatyzacja, powtarzalność i testowanie.

Powodzenia z Twoimi potokami! A jeśli masz pytania, zachęcam do eksperymentowania z kodem z tego artykułu — najlepiej uczymy się przez praktykę.