Image for post Tu Primer Pipeline con Scikit-learn: Encadenando Preprocesamiento y Modelos de ML

Tu Primer Pipeline con Scikit-learn: Encadenando Preprocesamiento y Modelos de ML


Contexto del Problema: ¿Por Qué Necesitamos Pipelines en Machine Learning?

Como desarrolladores que se inician en el Machine Learning, es común que nuestro primer código de preprocesamiento y entrenamiento de modelos se vea un poco desordenado. Primero escalamos los datos, luego imputamos valores faltantes, después aplicamos PCA, y finalmente entrenamos nuestro modelo. Cada paso se hace de forma manual, a menudo con código separado.

Este enfoque, aunque funcional para ejemplos pequeños, presenta varios problemas:

  • Código desordenado y difícil de mantener: A medida que tu flujo de trabajo crece, el código se vuelve una maraña de transformaciones y llamadas a modelos.
  • Riesgo de fuga de datos (Data Leakage): Este es el más crítico. Si aplicas transformaciones como el escalado (StandardScaler) o la imputación (SimpleImputer) a todo tu conjunto de datos (entrenamiento y prueba) antes de dividirlo, tu modelo "ve" información del conjunto de prueba durante el entrenamiento. Esto infla artificialmente las métricas de rendimiento y tu modelo no generalizará bien a datos nuevos y no vistos.
  • Dificultad para reproducir y desplegar: Para desplegar tu modelo en producción, necesitas aplicar exactamente las mismas transformaciones a los nuevos datos de entrada que usaste durante el entrenamiento. Replicar esto manualmente es propenso a errores.

Aquí es donde entran los Pipelines de Scikit-learn. Son una herramienta poderosa que te permite encadenar múltiples pasos de preprocesamiento y un estimador final en un solo objeto coherente. Resuelven estos problemas, haciendo tu código más limpio, seguro y reproducible.

Conceptos Clave: Los Componentes de un Pipeline

Para entender los pipelines, primero debemos familiarizarnos con dos tipos de objetos fundamentales en Scikit-learn:

Glosario Rápido

  • Transformer: Un objeto que aprende de los datos (fit) y luego los modifica (transform). Ejemplos: StandardScaler, SimpleImputer, PCA.
  • Estimator: Un objeto que aprende de los datos (fit) y luego hace predicciones (predict). Ejemplos: LogisticRegression, RandomForestClassifier.
  • Pipeline: Una secuencia de transformadores seguida por un estimador final.

La Analogía de la Cadena de Montaje

Imagina un pipeline como una cadena de montaje en una fábrica. Cada estación de la cadena realiza una tarea específica en el producto antes de pasarlo a la siguiente estación. Al final de la cadena, el producto está listo.

  • Cada Transformer es una estación de trabajo: recibe el producto (datos), realiza una operación (escalado, imputación, reducción de dimensionalidad) y pasa el producto modificado a la siguiente estación.
  • El Estimator es la estación final: toma el producto ya procesado y realiza la tarea final (clasificación, regresión).

Lo crucial es que, cuando entrenas el pipeline (.fit()), cada estación aprende solo de los datos de entrenamiento que le corresponden. Cuando haces predicciones (.predict()), los nuevos datos fluyen por la misma cadena de montaje, asegurando que se apliquen las mismas transformaciones aprendidas durante el entrenamiento.

Implementación Paso a Paso: Construyendo Tu Primer Pipeline

Vamos a construir un pipeline sencillo que escale nuestros datos y luego entrene un modelo de regresión logística. Usaremos un conjunto de datos sintético para mantener el enfoque en el pipeline.

1. Instalación de Dependencias

Asegúrate de tener las librerías necesarias instaladas. Si no las tienes, puedes instalarlas con pip:

pip install scikit-learn numpy pandas

2. Creando un Conjunto de Datos Sintético

Para nuestro primer ejemplo, generaremos datos aleatorios. Esto nos permite ver cómo el pipeline maneja el preprocesamiento sin distracciones de un dataset complejo.

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

# Versiones de librerías (para reproducibilidad)
print(f"Scikit-learn version: {sklearn.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")

# Generar datos sintéticos
np.random.seed(42) # Para resultados reproducibles
X_synth = np.random.rand(100, 5) * 100 # 100 muestras, 5 características
y_synth = (X_synth[:, 0] + X_synth[:, 1] > 100).astype(int) # Una regla simple para la clase

# Dividir los datos en conjuntos de entrenamiento y prueba
X_train_synth, X_test_synth, y_train_synth, y_test_synth = train_test_split(X_synth, y_synth, test_size=0.2, random_state=42)

print(f"Dimensiones de X_train: {X_train_synth.shape}")
print(f"Dimensiones de X_test: {X_test_synth.shape}")

En este código, creamos un conjunto de datos X_synth con 100 filas y 5 columnas, y una variable objetivo y_synth binaria. Luego, lo dividimos en conjuntos de entrenamiento y prueba.

3. Definiendo los Pasos del Pipeline

Un pipeline se define como una lista de tuplas, donde cada tupla contiene un nombre para el paso (una cadena de texto) y el objeto transformador o estimador. El último elemento de la lista debe ser siempre el estimador final.

# Definir los pasos del pipeline
steps = [
    ('scaler', StandardScaler()), # Primer paso: escalar los datos
    ('logreg', LogisticRegression()) # Segundo paso: modelo de regresión logística
]

# Crear el objeto Pipeline
pipeline_synth = Pipeline(steps)

print("Pipeline creado con los siguientes pasos:")
for name, step in pipeline_synth.steps:
    print(f"- {name}: {step.__class__.__name__}")

Aquí, 'scaler' es el nombre que le damos a nuestro StandardScaler, y 'logreg' es el nombre para LogisticRegression. Estos nombres son útiles para acceder a los pasos individuales o para la optimización de hiperparámetros.

4. Entrenando y Evaluando el Pipeline

Una vez que el pipeline está definido, lo entrenamos (.fit()) y hacemos predicciones (.predict()) exactamente como lo haríamos con un modelo individual. La magia es que el pipeline se encarga de aplicar fit_transform a los pasos intermedios y fit al estimador final, todo en el orden correcto y solo con los datos de entrenamiento.

# Entrenar el pipeline con los datos de entrenamiento
# El pipeline se encarga de: 
# 1. Ajustar el scaler a X_train_synth y transformar X_train_synth
# 2. Entrenar el modelo de regresión logística con los datos escalados
pipeline_synth.fit(X_train_synth, y_train_synth)

# Hacer predicciones en el conjunto de prueba
# El pipeline se encarga de: 
# 1. Transformar X_test_synth usando el scaler ajustado en el entrenamiento
# 2. Hacer predicciones con el modelo entrenado
y_pred_synth = pipeline_synth.predict(X_test_synth)

# Evaluar la precisión del modelo
accuracy_synth = accuracy_score(y_test_synth, y_pred_synth)
print(f"Precisión del modelo (Datos Sintéticos): {accuracy_synth:.2f}")

# Prueba mínima: Input -> Output esperado
# Un ejemplo de cómo se vería una predicción para un nuevo dato
new_data = np.array([[50, 60, 10, 5, 20]]) # Suma de los dos primeros > 100
predicted_class = pipeline_synth.predict(new_data)
print(f"Predicción para nuevo dato {new_data}: Clase {predicted_class[0]}")

new_data_2 = np.array([[10, 20, 5, 1, 3]]) # Suma de los dos primeros <= 100
predicted_class_2 = pipeline_synth.predict(new_data_2)
print(f"Predicción para nuevo dato {new_data_2}: Clase {predicted_class_2[0]}")

Observa cómo la llamada a pipeline_synth.fit(X_train_synth, y_train_synth) se encarga de todo el flujo: el escalador se ajusta solo con los datos de entrenamiento y luego transforma esos datos, y el modelo de regresión logística se entrena con los datos ya escalados. Lo mismo ocurre con .predict() para los datos de prueba, evitando la fuga de datos.

Mini Proyecto / Aplicación Sencilla: Un Pipeline Más Complejo con Datos Reales

Ahora, apliquemos un pipeline más robusto a un conjunto de datos real y un poco más complejo: el famoso dataset Iris. Introduciremos algunos valores faltantes para demostrar el uso de un imputador.

1. Cargar y Preparar el Dataset Iris

El dataset Iris es un clásico en Machine Learning, ideal para problemas de clasificación. Introduciremos algunos NaN (Not a Number) para simular datos faltantes y demostrar cómo SimpleImputer los maneja dentro del pipeline.

from sklearn.datasets import load_iris
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA

# Cargar el dataset Iris
iris = load_iris()
X_iris, y_iris = iris.data, iris.target

# Convertir a DataFrame para facilitar la introducción de NaNs
X_iris_df = pd.DataFrame(X_iris, columns=iris.feature_names)

# Introducir algunos valores NaN artificialmente para demostrar SimpleImputer
X_iris_df.iloc[::10, 0] = np.nan # Cada 10ª fila en la primera columna
X_iris_df.iloc[::15, 2] = np.nan # Cada 15ª fila en la tercera columna

# Dividir los datos con los NaNs
X_train_iris, X_test_iris, y_train_iris, y_test_iris = train_test_split(X_iris_df, y_iris, test_size=0.2, random_state=42)

print(f"Número de NaNs en X_train_iris antes de imputación: {X_train_iris.isnull().sum().sum()}")

2. Construyendo el Pipeline Complejo

Este pipeline incluirá:

  • SimpleImputer: Para rellenar los valores faltantes (usaremos la media).
  • StandardScaler: Para escalar las características.
  • PCA (Análisis de Componentes Principales): Para reducir la dimensionalidad a 2 componentes.
  • LogisticRegression: Nuestro clasificador final.
# Definir los pasos del pipeline complejo
complex_steps = [
    ('imputer', SimpleImputer(strategy='mean')), # Rellenar NaNs con la media
    ('scaler', StandardScaler()),               # Escalar las características
    ('pca', PCA(n_components=2)),               # Reducir a 2 componentes principales
    ('classifier', LogisticRegression(max_iter=200)) # Clasificador (aumentamos max_iter por si acaso)
]

# Crear el pipeline
pipeline_iris = Pipeline(complex_steps)

print("Pipeline complejo creado con los siguientes pasos:")
for name, step in pipeline_iris.steps:
    print(f"- {name}: {step.__class__.__name__}")

3. Entrenando y Evaluando con Validación Cruzada

Para una evaluación más robusta, usaremos validación cruzada (cross_val_score). Esto entrena y evalúa el pipeline múltiples veces en diferentes subconjuntos de los datos, dándonos una estimación más fiable del rendimiento.

from sklearn.model_selection import cross_val_score

# Entrenar y evaluar el pipeline usando validación cruzada
# cross_val_score se encarga de dividir los datos, entrenar y evaluar el pipeline en cada fold
scores = cross_val_score(pipeline_iris, X_train_iris, y_train_iris, cv=5) # 5-fold cross-validation

print(f"Precisión de validación cruzada (Iris Data): {scores.mean():.2f} (+/- {scores.std() * 2:.2f})")

# Opcional: Entrenar el pipeline en todo el conjunto de entrenamiento y evaluar en el conjunto de prueba
pipeline_iris.fit(X_train_iris, y_train_iris)
y_pred_iris = pipeline_iris.predict(X_test_iris)
print(f"Precisión en el conjunto de prueba (Iris Data): {accuracy_score(y_test_iris, y_pred_iris):.2f}")

# Prueba mínima: Input -> Output esperado
# Un ejemplo de cómo se vería una predicción para un nuevo dato de Iris
# Usamos un dato de ejemplo del dataset original para simular un nuevo input
example_new_iris_data = np.array([[5.1, 3.5, 1.4, 0.2]]) # Setosa
predicted_class_iris = pipeline_iris.predict(example_new_iris_data)
print(f"Predicción para nuevo dato Iris {example_new_iris_data}: Clase {predicted_class_iris[0]} (0=Setosa, 1=Versicolor, 2=Virginica)")

Este ejemplo demuestra cómo un pipeline puede manejar múltiples pasos de preprocesamiento, incluyendo la imputación de datos faltantes y la reducción de dimensionalidad, antes de pasar los datos al modelo final. La validación cruzada con pipelines es la forma correcta de evaluar el rendimiento general de tu flujo de trabajo de ML.

Errores Comunes y Depuración

Aunque los pipelines simplifican mucho el flujo de trabajo, hay algunos errores comunes que los desarrolladores junior-mid suelen encontrar:

  • Orden incorrecto de los pasos:

    Asegúrate de que el orden de tus transformadores tenga sentido lógico. Por ejemplo, no puedes aplicar PCA (que espera datos numéricos) antes de imputar valores faltantes o manejar categorías. Scikit-learn te dará un error si un transformador recibe un tipo de dato inesperado.

    # ERROR COMÚN: PCA antes de imputar NaNs (si los hubiera)
    # from sklearn.pipeline import Pipeline
    # from sklearn.impute import SimpleImputer
    # from sklearn.decomposition import PCA
    # from sklearn.linear_model import LogisticRegression
    
    # bad_pipeline_steps = [
    #     ('pca', PCA(n_components=2)), # Esto fallaría si hay NaNs
    #     ('imputer', SimpleImputer(strategy='mean')),
    #     ('classifier', LogisticRegression())
    # ]
    # bad_pipeline = Pipeline(bad_pipeline_steps)
    # try:
    #     bad_pipeline.fit(X_train_iris, y_train_iris)
    # except Exception as e:
    #     print(f"Error esperado: {e}")
    
  • No usar fit_transform implícitamente:

    Cuando construyes un pipeline, Scikit-learn se encarga automáticamente de llamar fit_transform() en todos los pasos intermedios y fit() en el estimador final. Si intentas depurar un paso individual fuera del pipeline y olvidas fit_transform, podrías obtener errores o resultados incorrectos.

    # Cómo acceder a un paso individual para depuración (después de fit)
    # Acceder al scaler ajustado
    scaler_fitted = pipeline_iris.named_steps['scaler']
    # Transformar datos de prueba usando el scaler ya ajustado
    X_test_scaled = scaler_fitted.transform(pipeline_iris.named_steps['imputer'].transform(X_test_iris))
    print(f"Primeras 5 filas de X_test_scaled:\n{X_test_scaled[:5]}")
    

    Usa pipeline.named_steps['nombre_del_paso'] para acceder a los objetos ajustados dentro del pipeline.

  • Problemas con tipos de datos o columnas:

    Si tienes columnas categóricas y numéricas, un StandardScaler aplicado a todas las columnas fallará. Para esto, necesitarás ColumnTransformer (ver sección de aprendizaje futuro).

Aprendizaje Futuro: Llevando tus Pipelines al Siguiente Nivel

Los pipelines son solo el comienzo. Aquí hay algunos temas avanzados que te recomendamos explorar para dominar el flujo de trabajo de ML:

  • ColumnTransformer para diferentes tipos de columnas:

    En datasets reales, a menudo tienes columnas numéricas y categóricas que requieren diferentes preprocesamientos. ColumnTransformer te permite aplicar transformaciones específicas a subconjuntos de columnas dentro de un pipeline. Es esencial para pipelines robustos.

    # Ejemplo conceptual de ColumnTransformer
    # from sklearn.compose import ColumnTransformer
    # from sklearn.preprocessing import OneHotEncoder
    
    # preprocessor = ColumnTransformer(
    #     transformers=[
    #         ('num', StandardScaler(), ['feature_num_1', 'feature_num_2']),
    #         ('cat', OneHotEncoder(handle_unknown='ignore'), ['feature_cat_1'])
    #     ])
    
    # complex_pipeline = Pipeline(steps=[
    #     ('preprocessor', preprocessor),
    #     ('classifier', LogisticRegression())
    # ])
    
  • Optimización de hiperparámetros con Pipelines:

    Puedes usar GridSearchCV o RandomizedSearchCV directamente con un pipeline para optimizar los hiperparámetros de cualquier paso (transformadores o el estimador final). Esto asegura que la validación cruzada se realice correctamente en cada iteración de búsqueda.

    # Ejemplo conceptual de GridSearchCV con Pipeline
    # from sklearn.model_selection import GridSearchCV
    
    # param_grid = {
    #     'pca__n_components': [1, 2, 3],
    #     'classifier__C': [0.1, 1.0, 10.0]
    # }
    
    # grid_search = GridSearchCV(pipeline_iris, param_grid, cv=5)
    # grid_search.fit(X_train_iris, y_train_iris)
    # print(f"Mejores parámetros: {grid_search.best_params_}")
    
  • Serialización de Pipelines para Producción:

    Una vez que tu pipeline está entrenado, puedes guardarlo en un archivo usando joblib o pickle. Esto te permite cargar el pipeline entrenado en tu aplicación de producción y usarlo para hacer predicciones en nuevos datos sin tener que reentrenarlo.

    import joblib
    
    # Guardar el pipeline entrenado
    joblib.dump(pipeline_iris, 'iris_pipeline.pkl')
    print("Pipeline guardado como 'iris_pipeline.pkl'")
    
    # Cargar el pipeline
    loaded_pipeline = joblib.load('iris_pipeline.pkl')
    print("Pipeline cargado exitosamente.")
    
    # Usar el pipeline cargado para hacer predicciones
    loaded_predictions = loaded_pipeline.predict(X_test_iris)
    print(f"Precisión del pipeline cargado: {accuracy_score(y_test_iris, loaded_predictions):.2f}")
    

    Advertencia de Seguridad/Privacidad: Al serializar modelos, asegúrate de que no contengan información sensible o credenciales. Además, ten en cuenta la compatibilidad de versiones de librerías; un pipeline guardado con una versión de Scikit-learn podría no ser compatible con una versión muy diferente.

Recursos Recomendados: