import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def _uses_explicit_columns(columns):
    """Return True when *columns* names at least one column.

    ``len`` is used instead of truthiness because a pandas Index or a NumPy
    array raises ``ValueError`` when evaluated in a boolean context.
    """
    return columns is not None and len(columns) > 0


def _make_dense_one_hot_encoder():
    """Build a dense-output ``OneHotEncoder`` that ignores unknown categories.

    scikit-learn 1.2 renamed the ``sparse`` keyword to ``sparse_output`` and
    later removed the old spelling, so the new name is tried first and the
    old one is only used when the installed release rejects it.
    """
    try:
        return OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    except TypeError:
        return OneHotEncoder(sparse=False, handle_unknown='ignore')


class FeatureExtractor:
    """Registry of named extractors that are all applied to the same data.

    An extractor is either an object exposing ``fit_transform`` or a plain
    callable that accepts the data.
    """

    def __init__(self):
        # Maps extractor name -> extractor object or callable.
        self.extractors = {}

    def add_extractor(self, name, extractor):
        """Register *extractor* under *name*, replacing any earlier entry."""
        self.extractors[name] = extractor

    def extract(self, data):
        """Apply every registered extractor to *data*.

        Objects with a ``fit_transform`` method are fitted on *data* on every
        call; anything else is called directly with *data*.

        Returns:
            dict mapping each registered name to that extractor's output.
        """
        features = {}
        for name, extractor in self.extractors.items():
            if hasattr(extractor, 'fit_transform'):
                features[name] = extractor.fit_transform(data)
            else:
                features[name] = extractor(data)
        return features


class NumericalFeatureExtractor:
    """Standard-scale numeric columns of a DataFrame.

    Args:
        columns: Columns to scale. When ``None`` or empty, every column with
            a NumPy numeric dtype is used.
    """

    def __init__(self, columns=None):
        self.columns = columns
        self.scaler = StandardScaler()

    def _select(self, df):
        """Return the part of *df* this extractor operates on."""
        if _uses_explicit_columns(self.columns):
            return df[self.columns]
        return df.select_dtypes(include=[np.number])

    def fit_transform(self, df):
        """Fit the scaler on the selected columns and return the scaled values."""
        return self.scaler.fit_transform(self._select(df))

    def transform(self, df):
        """Scale the selected columns with the already-fitted scaler."""
        return self.scaler.transform(self._select(df))


class CategoricalFeatureExtractor:
    """One-hot encode categorical columns of a DataFrame.

    Args:
        columns: Columns to encode. When ``None`` or empty, every
            ``object``-dtype column is used.
    """

    def __init__(self, columns=None):
        self.columns = columns
        self.encoder = _make_dense_one_hot_encoder()

    def _select(self, df):
        """Return the part of *df* this extractor operates on."""
        if _uses_explicit_columns(self.columns):
            return df[self.columns]
        return df.select_dtypes(include=['object'])

    def fit_transform(self, df):
        """Fit the encoder on the selected columns and return the encoding."""
        return self.encoder.fit_transform(self._select(df))

    def transform(self, df):
        """Encode the selected columns with the already-fitted encoder."""
        return self.encoder.transform(self._select(df))


class TextFeatureExtractor:
    """TF-IDF vectorise a single text column of a DataFrame.

    Args:
        column: Name of the column holding the documents.
        max_features: Passed to ``TfidfVectorizer`` as ``max_features``.
    """

    def __init__(self, column, max_features=5000):
        self.column = column
        self.vectorizer = TfidfVectorizer(max_features=max_features)

    def fit_transform(self, df):
        """Fit the vectoriser on the column and return a dense array."""
        return self.vectorizer.fit_transform(df[self.column]).toarray()

    def transform(self, df):
        """Vectorise the column with the fitted vocabulary as a dense array."""
        return self.vectorizer.transform(df[self.column]).toarray()
2.2 特征选择
from sklearn.feature_selection import SelectKBest, mutual_info_classif, RFE
from sklearn.ensemble import RandomForestClassifier


class FeatureSelector:
    """Select ``k`` features with a filter or a wrapper strategy.

    Args:
        method: ``'filter'`` (``SelectKBest`` scored by
            ``mutual_info_classif``) or ``'rfe'`` (``RFE`` around a
            ``RandomForestClassifier``).
        k: Number of features to keep.
    """

    def __init__(self, method='filter', k=10):
        self.method = method
        self.k = k
        self.selector = None

    def fit(self, X, y):
        """Build the selector for ``self.method`` and fit it on ``X``, ``y``.

        Raises:
            ValueError: If ``self.method`` is not ``'filter'`` or ``'rfe'``.
        """
        if self.method == 'filter':
            self.selector = SelectKBest(score_func=mutual_info_classif, k=self.k)
        elif self.method == 'rfe':
            estimator = RandomForestClassifier()
            self.selector = RFE(estimator, n_features_to_select=self.k)
        else:
            # Without this branch the selector stays None and the call below
            # fails with an AttributeError that hides the real cause.
            raise ValueError(
                f"Unknown feature selection method {self.method!r}; "
                "expected 'filter' or 'rfe'"
            )
        self.selector.fit(X, y)

    def transform(self, X):
        """Reduce ``X`` to the selected features using the fitted selector."""
        return self.selector.transform(X)

    def get_selected_features(self):
        """Return the indices of the selected features.

        Falls back to the selector's ``ranking_`` attribute when the selector
        has no ``get_support`` method.
        """
        if hasattr(self.selector, 'get_support'):
            return self.selector.get_support(indices=True)
        return self.selector.ranking_


class FeatureImportanceAnalyzer:
    """Rank features by the importance a fitted model assigns to them."""

    def __init__(self, model):
        self.model = model

    def analyze(self, X, y, feature_names):
        """Fit the model and return features ordered by decreasing importance.

        ``feature_importances_`` is preferred; otherwise absolute ``coef_``
        values are used. For a 2-D ``coef_`` only the first row is ranked; a
        1-D ``coef_`` is used as is.

        Returns:
            A list of ``(feature_name, importance)`` tuples, most important
            first, or ``None`` when the model exposes neither attribute.
        """
        self.model.fit(X, y)
        if hasattr(self.model, 'feature_importances_'):
            importances = self.model.feature_importances_
        elif hasattr(self.model, 'coef_'):
            coefficients = np.abs(np.asarray(self.model.coef_))
            # Indexing [0] on a 1-D coef_ would yield a scalar and break the
            # ranking below, so only 2-D coefficients are reduced to a row.
            importances = coefficients[0] if coefficients.ndim > 1 else coefficients
        else:
            return None
        indices = np.argsort(importances)[::-1]
        return [(feature_names[i], importances[i]) for i in indices]


class DimensionalityReducer:
    """Thin wrapper selecting a PCA, t-SNE or UMAP reducer by name.

    Args:
        method: ``'pca'``, ``'tsne'`` or ``'umap'``.
        n_components: Target dimensionality handed to the reducer.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """

    def __init__(self, method='pca', n_components=2):
        self.method = method
        self.n_components = n_components
        # Imports are local so only the backend actually requested is loaded.
        if method == 'pca':
            from sklearn.decomposition import PCA
            self.reducer = PCA(n_components=n_components)
        elif method == 'tsne':
            from sklearn.manifold import TSNE
            self.reducer = TSNE(n_components=n_components)
        elif method == 'umap':
            import umap
            self.reducer = umap.UMAP(n_components=n_components)
        else:
            # Previously self.reducer was simply never set for unknown names.
            raise ValueError(
                f"Unknown dimensionality reduction method {method!r}; "
                "expected 'pca', 'tsne' or 'umap'"
            )

    def fit_transform(self, X):
        """Fit the reducer on ``X`` and return the embedded data."""
        return self.reducer.fit_transform(X)

    def transform(self, X):
        """Project ``X`` with the already-fitted reducer.

        Raises:
            NotImplementedError: If the underlying reducer has no
                ``transform`` method and therefore cannot embed unseen data.
        """
        if not hasattr(self.reducer, 'transform'):
            raise NotImplementedError(
                f"The {self.method!r} reducer cannot transform new data; "
                "use fit_transform instead"
            )
        return self.reducer.transform(X)
2.3 特征验证
class FeatureValidator:
    """Stateless sanity checks for a feature DataFrame."""

    def __init__(self):
        pass

    def check_missing_values(self, df):
        """Return the per-column null counts, restricted to columns with nulls."""
        missing = df.isnull().sum()
        return missing[missing > 0]

    def check_cardinality(self, df, threshold=100):
        """Return the columns with more than *threshold* distinct values."""
        return [col for col in df.columns if df[col].nunique() > threshold]

    def check_feature_correlation(self, df, threshold=0.8):
        """Return column pairs whose absolute correlation exceeds *threshold*.

        Only numeric and boolean columns take part; other columns are left
        out so that ``corr`` does not fail on them.

        Returns:
            A list of ``(column_i, column_j, abs_correlation)`` tuples, one
            per pair from the lower triangle of the correlation matrix.
        """
        numeric = df.select_dtypes(include=['number', 'bool'])
        corr_matrix = numeric.corr().abs()
        columns = corr_matrix.columns
        high_corr = []
        for i in range(len(columns)):
            # j < i walks the lower triangle: each pair once, no diagonal.
            for j in range(i):
                value = corr_matrix.iloc[i, j]
                if value > threshold:
                    high_corr.append((columns[i], columns[j], value))
        return high_corr


class FeatureDriftDetector:
    """Detect drift by comparing column means of two DataFrames."""

    def __init__(self):
        pass

    def detect_drift(self, reference_data, current_data, threshold=0.05):
        """Flag numeric columns whose mean moved by more than *threshold*.

        The score is ``|ref_mean - curr_mean| / |ref_mean|``. When the
        reference mean is zero the relative change is undefined: any non-zero
        shift is reported as ``inf`` and no shift counts as no drift.

        Args:
            reference_data: Baseline DataFrame; its numeric columns are checked.
            current_data: DataFrame compared against the baseline. It must
                contain every numeric column of *reference_data*.
            threshold: Minimum relative change that counts as drift.

        Returns:
            A list of ``(column, score)`` tuples for the drifting columns.
        """
        drift_scores = []
        # 'number' covers every integer and float width, not just the 64-bit ones.
        numeric_columns = reference_data.select_dtypes(include='number').columns
        for col in numeric_columns:
            ref_mean = reference_data[col].mean()
            curr_mean = current_data[col].mean()
            shift = abs(ref_mean - curr_mean)
            if ref_mean == 0:
                # NaN shifts fall through unchanged and never exceed the threshold.
                diff = float('inf') if shift > 0 else shift
            else:
                # Divide by the magnitude: a negative mean must not turn the
                # score negative and hide the drift.
                diff = shift / abs(ref_mean)
            if diff > threshold:
                drift_scores.append((col, diff))
        return drift_scores


class FeatureStore:
    """In-memory name -> feature mapping with pickle persistence."""

    def __init__(self):
        self.features = {}

    def add_feature(self, name, feature):
        """Store *feature* under *name*, replacing any earlier entry."""
        self.features[name] = feature

    def get_feature(self, name):
        """Return the feature stored under *name*, or ``None`` if absent."""
        return self.features.get(name)

    def save(self, path):
        """Pickle the stored features to *path*."""
        import pickle
        with open(path, 'wb') as f:
            pickle.dump(self.features, f)

    @classmethod
    def load(cls, path):
        """Create a store from a file previously written by ``save``.

        SECURITY: unpickling can execute arbitrary code. Only load files that
        come from a trusted source.
        """
        import pickle
        with open(path, 'rb') as f:
            features = pickle.load(f)
        store = cls()
        store.features = features
        return store
3. 性能对比
3.1 特征选择方法对比
| 方法 | 计算速度 | 效果 | 适用场景 |
| --- | --- | --- | --- |
| Filter | 快 | 中 | 高维数据 |
| RFE | 慢 | 高 | 中等维度 |
| Embedded | 中 | 高 | 通用 |
3.2 降维方法对比
| 方法 | 保留信息 | 计算速度 | 可视化效果 |
| --- | --- | --- | --- |
| PCA | 高 | 快 | 中 |
| t-SNE | 中 | 慢 | 高 |
| UMAP | 高 | 中 | 高 |
3.3 特征编码方法对比
| 方法 | 维度扩展 | 处理速度 | 适用场景 |
| --- | --- | --- | --- |
| One-Hot | 高 | 快 | 低基数 |
| Label | 无 | 快 | 有序类别 |
| Embedding | 可控 | 中 | 高基数 |
4. 最佳实践
4.1 特征工程流程
def build_feature_pipeline(config):
    """Build the list of extractors described by *config*.

    Recognised keys:
        numerical: add a ``NumericalFeatureExtractor`` (default ``True``).
        categorical: add a ``CategoricalFeatureExtractor`` (default ``True``).
        text: add a ``TextFeatureExtractor`` (default ``False``).
        text_column: column used by the text extractor (default ``'text'``).

    Returns:
        The extractors in the order numerical, categorical, text.
    """
    extractors = []
    if config.get('numerical', True):
        extractors.append(NumericalFeatureExtractor())
    if config.get('categorical', True):
        extractors.append(CategoricalFeatureExtractor())
    if config.get('text', False):
        extractors.append(TextFeatureExtractor(config.get('text_column', 'text')))
    return extractors


class FeatureEngineeringPipeline:
    """Chain feature extraction with optional selection and reduction.

    Args:
        extractors: Objects exposing ``fit_transform(data)`` and
            ``transform(data)`` that return 2-D arrays with one row per sample.
        selector: Optional object with ``fit(X, y)`` and ``transform(X)``.
        reducer: Optional object with ``fit_transform(X)`` and ``transform(X)``.
        target_column: Name of the label column read when a selector is used.
    """

    def __init__(self, extractors, selector=None, reducer=None, target_column='target'):
        self.extractors = extractors
        self.selector = selector
        self.reducer = reducer
        self.target_column = target_column

    def _feature_frame(self, data):
        """Return *data* without the label column when a selector is in use.

        With a selector configured, ``target_column`` is the supervision
        signal. Leaving it in the frame would let extractors that pick columns
        automatically leak the label into the features, and would change the
        feature width at ``transform`` time when the label is absent.
        """
        columns = getattr(data, 'columns', None)
        if self.selector is None or columns is None or self.target_column not in columns:
            return data
        return data.drop(columns=[self.target_column])

    def _stack(self, blocks):
        """Concatenate the per-extractor outputs column-wise."""
        if not blocks:
            raise ValueError("FeatureEngineeringPipeline needs at least one extractor")
        return np.hstack(blocks)

    def fit_transform(self, data):
        """Fit every stage on *data* and return the resulting feature matrix.

        Raises:
            KeyError: If a selector is configured and *data* lacks
                ``target_column``.
            ValueError: If no extractors are configured.
        """
        # Read the label first so a missing column fails before any fitting.
        target = data[self.target_column] if self.selector is not None else None
        frame = self._feature_frame(data)
        X = self._stack([extractor.fit_transform(frame) for extractor in self.extractors])
        if self.selector is not None:
            self.selector.fit(X, target)
            X = self.selector.transform(X)
        if self.reducer is not None:
            X = self.reducer.fit_transform(X)
        return X

    def transform(self, data):
        """Apply the already-fitted stages to *data*.

        The label column is not required here; when present it is excluded
        the same way as in ``fit_transform``.

        Raises:
            ValueError: If no extractors are configured.
        """
        frame = self._feature_frame(data)
        X = self._stack([extractor.transform(frame) for extractor in self.extractors])
        if self.selector is not None:
            X = self.selector.transform(X)
        if self.reducer is not None:
            X = self.reducer.transform(X)
        return X