## Introduction

In real-world e-commerce scenarios, a single data source and a single model are rarely enough to predict user behavior accurately. This project builds an advanced prediction system that fuses multiple data sources with a dynamic model ensemble, tackling the following challenges:

- Multi-source heterogeneous data: user behavior logs, product attributes, historical orders, and temporal features
- Class imbalance: purchases typically account for only 1-5% of all recorded behaviors
- The curse of dimensionality: high-dimensional sparse features must be handled effectively
- Latency requirements: some scenarios call for real-time prediction

## 1. Multi-Source Feature Engineering

```python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class MultiSourceFeatureEngineer:
    def __init__(self):
        self.feature_generators = {
            'temporal': self._extract_temporal_features,
            'sequence': self._extract_sequence_features,
            'embedding': self._extract_embedding_features,  # defined elsewhere in the project
            'cross': self._extract_cross_features           # defined elsewhere in the project
        }

    def _extract_temporal_features(self, user_logs):
        """Extract complex temporal features."""
        features = {}

        # Cyclical features
        user_logs['hour_sin'] = np.sin(2 * np.pi * user_logs['hour'] / 24)
        user_logs['hour_cos'] = np.cos(2 * np.pi * user_logs['hour'] / 24)

        # Time-decay features
        current_time = datetime.now()
        user_logs['time_decay'] = np.exp(
            -(current_time - user_logs['timestamp']).dt.total_seconds() / (24 * 3600 * 7)
        )

        # Session features
        session_features = self._extract_session_features(user_logs)
        features.update(session_features)

        return features

    def _extract_sequence_features(self, user_logs):
        """Extract sequence features with a Transformer."""
        from transformers import BertTokenizer, BertModel
        import torch

        # Encode each user's behavior sequence as a string of actions
        tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        sequences = user_logs.groupby('user_id')['action'].apply(
            lambda x: ' '.join(x.astype(str))
        )

        # Obtain sequence representations from a pretrained BERT
        model = BertModel.from_pretrained('bert-base-uncased')
        encoded = tokenizer(sequences.tolist(), return_tensors='pt',
                            padding=True, truncation=True)
        with torch.no_grad():
            outputs = model(**encoded)
        sequence_embeddings = outputs.last_hidden_state.mean(dim=1)

        return {
            'sequence_embedding': sequence_embeddings.numpy(),
            'sequence_length': sequences.str.len()
        }
```

### Code analysis: the MultiSourceFeatureEngineer class

```python
class MultiSourceFeatureEngineer:
    def __init__(self):
        self.feature_generators = {
            'temporal': self._extract_temporal_features,
            'sequence': self._extract_sequence_features,
            'embedding': self._extract_embedding_features,
            'cross': self._extract_cross_features
        }
```

- Modular design: a dictionary maps each feature type to its extraction method, which keeps the class easy to extend and maintain
- Separation of concerns: each method is responsible for one specific kind of feature engineering
- Strategy pattern: feature-extraction strategies can be selected dynamically

### Temporal feature extraction

```python
def _extract_temporal_features(self, user_logs):
    # Cyclical encoding: turn the hour into a continuous, periodic feature
    user_logs['hour_sin'] = np.sin(2 * np.pi * user_logs['hour'] / 24)
    user_logs['hour_cos'] = np.cos(2 * np.pi * user_logs['hour'] / 24)

    # Time decay: exponential decay models a memory effect
    current_time = datetime.now()
    user_logs['time_decay'] = np.exp(
        -(current_time - user_logs['timestamp']).dt.total_seconds() / (24 * 3600 * 7)
    )
```

Technical advantages:

- Cyclical encoding avoids the boundary discontinuity caused by encoding the hour as a discrete value 0-23 (see the numeric sketch at the end of this section)
- Time decay is a reasonable model of behavioral recency: recent actions get higher weight
- The decay constant (1/7, i.e. one week) is tunable and can be optimized per business scenario

### Sequence feature extraction highlights

```python
def _extract_sequence_features(self, user_logs):
    # Extract sequence features with a Transformer
    from transformers import BertTokenizer, BertModel
    import torch

    # Encode behavior sequences
    sequences = user_logs.groupby('user_id')['action'].apply(
        lambda x: ' '.join(x.astype(str))
    )

    # Obtain sequence representations from a pretrained BERT
    model = BertModel.from_pretrained('bert-base-uncased')
    encoded = tokenizer(sequences.tolist(), return_tensors='pt',
                        padding=True, truncation=True)
```

- Transfer learning from a pretrained model: BERT's representational power is reused to embed behavior sequences
- The attention mechanism learns dependencies between actions automatically, with no hand-crafted rules
- Context awareness: BERT's Transformer architecture captures long-range dependencies
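To make the cyclical-encoding argument concrete, here is a minimal, self-contained sketch (the hours 23 and 0 are illustrative values, not taken from the project data): in raw hour space, 23:00 and 00:00 look 23 units apart, while on the unit circle they are nearly neighbors.

```python
import numpy as np

def encode_hour(hour):
    """Map an hour in [0, 24) onto the unit circle."""
    angle = 2 * np.pi * hour / 24
    return np.array([np.sin(angle), np.cos(angle)])

# Raw encoding: 23:00 and 00:00 look maximally far apart
print(abs(23 - 0))                                     # 23

# Cyclical encoding: the same two hours are almost identical
dist = np.linalg.norm(encode_hour(23) - encode_hour(0))
print(round(dist, 3))                                  # ~0.261, vs. 2.0 for opposite hours
```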
## 2. Multi-Level Dynamic Ensemble Model

```python
import numpy as np
import lightgbm as lgb
import catboost as cb
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.calibration import CalibratedClassifierCV

class DynamicEnsembleModel:
    def __init__(self, n_folds=5):
        self.n_folds = n_folds
        self.base_models = self._initialize_base_models()
        self.meta_models = self._initialize_meta_models()
        self.dynamic_weight_model = self._initialize_weight_model()

    def _initialize_base_models(self):
        """Initialize a diverse set of base models."""
        return {
            'lgb': lgb.LGBMClassifier(
                n_estimators=200, learning_rate=0.05, num_leaves=31,
                subsample=0.8, colsample_bytree=0.8,
                random_state=42, n_jobs=-1
            ),
            'catboost': cb.CatBoostClassifier(
                iterations=300, learning_rate=0.05, depth=6,
                l2_leaf_reg=3, verbose=0, random_state=42
            ),
            'rf': RandomForestClassifier(
                n_estimators=200, max_depth=10, min_samples_split=5,
                random_state=42, n_jobs=-1
            ),
            'neural_net': MLPClassifier(
                hidden_layer_sizes=(128, 64, 32), activation='relu',
                solver='adam', alpha=0.0001, batch_size=256,
                max_iter=200, random_state=42
            )
        }

    def _initialize_meta_models(self):
        """Initialize the meta-model layer."""
        return {
            'logistic': LogisticRegression(
                C=1.0, solver='lbfgs', max_iter=1000, random_state=42
            ),
            'xgboost': xgb.XGBClassifier(
                n_estimators=100, learning_rate=0.1, max_depth=4,
                random_state=42, n_jobs=-1
            ),
            'svm': CalibratedClassifierCV(
                SVC(probability=True, kernel='rbf'), cv=3
            )
        }

    def _initialize_weight_model(self):
        """Initialize the dynamic weight-learning model."""
        from sklearn.linear_model import Ridge
        return Ridge(alpha=1.0, random_state=42)

    def fit_dynamic_weights(self, X_train, y_train, X_val, y_val):
        """Learn dynamic weights: allocate model weights by validation performance."""
        # Stage 1: train the base models
        base_predictions = {}
        for name, model in self.base_models.items():
            model.fit(X_train, y_train)
            pred_proba = model.predict_proba(X_val)[:, 1]
            base_predictions[name] = pred_proba

        # Stage 2: learn the optimal weight combination
        base_pred_matrix = np.column_stack(list(base_predictions.values()))

        # Learn the weights with ridge regression
        self.dynamic_weight_model.fit(base_pred_matrix, y_val)
        weights = self.dynamic_weight_model.coef_

        # Force the weights to be non-negative and to sum to 1
        weights = np.maximum(weights, 0)
        weights = weights / weights.sum()

        # Assemble the dynamically weighted model list
        weighted_models = []
        for (name, model), weight in zip(self.base_models.items(), weights):
            weighted_models.append((name, model, weight))

        return weighted_models
```

### Architecture analysis

```python
class DynamicEnsembleModel:
    def __init__(self, n_folds=5):
        self.base_models = self._initialize_base_models()            # Layer 1: diverse base models
        self.meta_models = self._initialize_meta_models()            # Layer 2: meta-learners
        self.dynamic_weight_model = self._initialize_weight_model()  # Layer 3: weight learning
```

- Diversity guarantee: the first layer mixes model families (tree models, a neural network)
- Meta-learning: the second layer learns how to combine the base models' predictions
- Dynamic adaptivity: the third layer adjusts the weights to the characteristics of the data

### Base model initialization

```python
def _initialize_base_models(self):
    return {
        'lgb': lgb.LGBMClassifier(            # lightweight gradient-boosted trees
            n_estimators=200, learning_rate=0.05,
            num_leaves=31,                    # controls model complexity
            subsample=0.8,                    # row sampling adds diversity
            colsample_bytree=0.8,             # column sampling adds diversity
            random_state=42, n_jobs=-1
        ),
        'catboost': cb.CatBoostClassifier(    # friendly to categorical features
            iterations=300, learning_rate=0.05, depth=6,
            l2_leaf_reg=3,                    # L2 regularization against overfitting
            verbose=0, random_state=42
        ),
        'neural_net': MLPClassifier(          # neural net captures nonlinearity
            hidden_layer_sizes=(128, 64, 32), # pyramid structure
            activation='relu', solver='adam',
            alpha=0.0001,                     # L2 regularization coefficient
            batch_size=256, max_iter=200, random_state=42
        )
    }
```

- Heterogeneous combination: tree models and neural networks complement each other's strengths
- Regularization everywhere: every model carries its own overfitting defenses
- Controlled random seeds keep results reproducible

### Dynamic weight-learning algorithm

```python
def fit_dynamic_weights(self, X_train, y_train, X_val, y_val):
    # Stage 1: train the base models
    base_predictions = {}
    for name, model in self.base_models.items():
        model.fit(X_train, y_train)
        pred_proba = model.predict_proba(X_val)[:, 1]
        base_predictions[name] = pred_proba

    # Stage 2: learn the optimal weight combination
    base_pred_matrix = np.column_stack(list(base_predictions.values()))

    # Learn the weights with ridge regression
    self.dynamic_weight_model.fit(base_pred_matrix, y_val)
    weights = self.dynamic_weight_model.coef_
```

- Data-driven weight allocation: the optimal weights are learned from validation-set performance
- Ridge regression handles the multicollinearity between correlated base predictions and keeps the weights from overfitting
- End-to-end optimization: the ensemble's predictive performance is optimized directly
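To see the weight-fitting step in isolation, here is a minimal sketch on synthetic data (the three "models" are just noisy copies of the target, standing in for real validation predictions):

```python
import numpy as np
from sklearn.linear_model import Ridge

rng = np.random.default_rng(42)
y_val = rng.integers(0, 2, size=500).astype(float)

# Stand-ins for three base models' validation probabilities,
# with different noise levels (model quality varies)
preds = np.column_stack([
    np.clip(y_val + rng.normal(0, s, 500), 0, 1)
    for s in (0.2, 0.4, 0.8)
])

ridge = Ridge(alpha=1.0).fit(preds, y_val)
weights = np.maximum(ridge.coef_, 0)   # clamp negatives, as in fit_dynamic_weights
weights /= weights.sum()               # normalize to a convex combination
print(weights.round(3))                # the least-noisy model should dominate
```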
## 3. Adaptive Ensemble Strategy

```python
import numpy as np

class AdaptiveEnsemble:
    def __init__(self, confidence_threshold=0.8):
        self.confidence_threshold = confidence_threshold
        self.models = []
        self.confidence_estimators = []

    def add_model(self, model, confidence_estimator):
        """Register a model together with its confidence estimator."""
        self.models.append(model)
        self.confidence_estimators.append(confidence_estimator)

    def predict_adaptive(self, X):
        """Adaptive prediction: choose models by confidence."""
        predictions = []
        confidences = []

        for model, conf_estimator in zip(self.models, self.confidence_estimators):
            pred = model.predict_proba(X)
            conf = conf_estimator.predict_proba(X)
            predictions.append(pred)
            confidences.append(conf)

        predictions = np.array(predictions)   # shape: (n_models, n_samples, n_classes)
        confidences = np.array(confidences)   # shape: (n_models, n_samples, n_classes)

        # Adaptive weighting
        final_pred = np.zeros_like(predictions[0])
        for i in range(X.shape[0]):
            # Pick the most confident model for each sample
            model_conf = confidences[:, i, :].max(axis=1)
            best_model_idx = np.argmax(model_conf)

            if model_conf[best_model_idx] >= self.confidence_threshold:
                # High confidence: use the single best model
                final_pred[i] = predictions[best_model_idx, i]
            else:
                # Low confidence: use a weighted combination
                weights = self._calculate_dynamic_weights(confidences[:, i, :])
                final_pred[i] = np.average(predictions[:, i], axis=0, weights=weights)

        return final_pred

    def _calculate_dynamic_weights(self, confidences):
        """Derive dynamic weights from confidences via softmax normalization."""
        weights = np.exp(confidences) / np.exp(confidences).sum(axis=0)
        return weights.mean(axis=1)
```

### Confidence-driven decision analysis

```python
class AdaptiveEnsemble:
    def __init__(self, confidence_threshold=0.8):
        self.confidence_threshold = confidence_threshold
        self.models = []
        self.confidence_estimators = []

    def predict_adaptive(self, X):
        # Pick the most confident model for each sample
        for i in range(X.shape[0]):
            model_conf = confidences[:, i, :].max(axis=1)
            best_model_idx = np.argmax(model_conf)

            if model_conf[best_model_idx] >= self.confidence_threshold:
                # High confidence: single model
                final_pred[i] = predictions[best_model_idx, i]
            else:
                # Low confidence: weighted combination
                weights = self._calculate_dynamic_weights(confidences[:, i, :])
                final_pred[i] = np.average(predictions[:, i], axis=0, weights=weights)
```

- Per-sample adaptivity: each sample independently selects the best model or combination
- Confidence threshold: the strategy switches based on how sure the models are about a given sample
- Hard and soft decisions combined: hard selection (one model) at high confidence, soft combination at low confidence

### Confidence estimation

```python
def _calculate_dynamic_weights(self, confidences):
    """Derive dynamic weights from confidences."""
    # Softmax normalization
    weights = np.exp(confidences) / np.exp(confidences).sum(axis=0)
    return weights.mean(axis=1)
```

Mathematical background: the softmax function, w_i = exp(c_i) / Σ_j exp(c_j).

- Probabilistic interpretation: confidences become a probability distribution, so the weights are non-negative and sum to 1
- A temperature parameter can be introduced to control how concentrated the weight distribution is (see the sketch below)
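The temperature variant mentioned above is not part of the project code; as an illustration, dividing the confidences by a temperature T before the softmax makes the weights sharper (T < 1) or more uniform (T > 1):

```python
import numpy as np

def softmax_with_temperature(conf, T=1.0):
    """Softmax over model confidences; T controls concentration."""
    z = conf / T
    z = z - z.max()          # stabilize against overflow
    e = np.exp(z)
    return e / e.sum()

conf = np.array([0.9, 0.6, 0.5])                          # hypothetical per-model confidences
print(softmax_with_temperature(conf, T=1.0).round(3))     # moderate spread
print(softmax_with_temperature(conf, T=0.1).round(3))     # nearly one-hot on the best model
print(softmax_with_temperature(conf, T=10.0).round(3))    # close to uniform
```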
## 4. Online Learning and Concept Drift Detection

```python
from datetime import datetime
from river import tree, ensemble, drift

class OnlineEnsembleWithDriftDetection:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.models = {
            'arf': ensemble.AdaptiveRandomForestClassifier(
                n_models=10,
                drift_detector=drift.ADWIN(delta=0.002)
            ),
            'sgt': tree.HoeffdingTreeClassifier(
                grace_period=100,
                split_confidence=0.01
            ),
            'oza': ensemble.OzaBaggingClassifier(
                model=tree.HoeffdingTreeClassifier(),
                n_models=10
            )
        }
        self.drift_detectors = {
            'page_hinkley': drift.PageHinkley(min_instances=30, delta=0.005),
            'kswin': drift.KSWIN(window_size=window_size, stat_size=30)
        }
        self.drift_history = []

    def update_and_predict(self, X, y):
        """Update online, predict, and check for concept drift."""
        predictions = {}
        drift_flags = {}

        for model_name, model in self.models.items():
            # Online learning
            for xi, yi in zip(X, y):
                model.learn_one(xi, yi)

            # Prediction
            preds = [model.predict_one(xi) for xi in X]
            predictions[model_name] = preds

            # Concept drift detection
            for drift_name, detector in self.drift_detectors.items():
                for xi, yi in zip(X, y):
                    detector.update(yi)
                    if detector.drift_detected:
                        drift_flags[f"{model_name}_{drift_name}"] = True
                        self._handle_drift(model_name)

        return predictions, drift_flags

    def _handle_drift(self, model_name):
        """Handle concept drift."""
        print(f"Concept drift detected, resetting model: {model_name}")
        self.drift_history.append({
            'model': model_name,
            'timestamp': datetime.now(),
            'window': self.window_size
        })

        # Partially reset the model
        if model_name == 'arf':
            self.models[model_name] = ensemble.AdaptiveRandomForestClassifier(
                n_models=10,
                drift_detector=drift.ADWIN(delta=0.002)
            )
```

### Online learning framework analysis

```python
from river import tree, ensemble, drift

class OnlineEnsembleWithDriftDetection:
    def __init__(self, window_size=1000):
        self.window_size = window_size
        self.models = {
            'arf': ensemble.AdaptiveRandomForestClassifier(
                n_models=10,
                drift_detector=drift.ADWIN(delta=0.002)  # adaptive-window drift detection
            ),
            'sgt': tree.HoeffdingTreeClassifier(
                grace_period=100,       # minimum samples before a node may split
                split_confidence=0.01   # split confidence
            )
        }
```

- Adaptive random forest: every tree carries its own drift detector
- Hoeffding tree: an online decision tree with theoretical guarantees, well suited to data streams
- Multi-detector strategy: ADWIN, Page-Hinkley, and KSWIN detection methods are combined

### Concept drift handling

```python
def update_and_predict(self, X, y):
    """Update online, predict, and check for concept drift."""
    drift_flags = {}
    for drift_name, detector in self.drift_detectors.items():
        for xi, yi in zip(X, y):
            detector.update(yi)
            if detector.drift_detected:
                drift_flags[f"{model_name}_{drift_name}"] = True
                self._handle_drift(model_name)   # trigger drift handling
```

- ADWIN: adaptive windowing, no window-size tuning needed, catches gradual drift
- Page-Hinkley: a cumulative-sum test that catches abrupt drift
- KSWIN: detects distribution change via a Kolmogorov-Smirnov test

### Model reset strategy

```python
def _handle_drift(self, model_name):
    """Handle concept drift."""
    print(f"Concept drift detected, resetting model: {model_name}")
    self.drift_history.append({
        'model': model_name,
        'timestamp': datetime.now(),
        'window': self.window_size
    })
    # Partially reset the model
    if model_name == 'arf':
        self.models[model_name] = ensemble.AdaptiveRandomForestClassifier(...)
```
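As a standalone illustration of the detector interface used above (a sketch against river's drift API; class locations and method names have shifted between river versions, so treat this as an assumption to verify against your installed version), feeding ADWIN a stream whose positive rate jumps halfway through should raise `drift_detected` near the change point:

```python
import numpy as np
from river import drift

rng = np.random.default_rng(42)
# A synthetic binary stream whose positive rate jumps from 20% to 80% at i = 1000
stream = np.concatenate([
    rng.binomial(1, 0.2, 1000),
    rng.binomial(1, 0.8, 1000),
])

adwin = drift.ADWIN(delta=0.002)
for i, v in enumerate(stream):
    adwin.update(v)
    if adwin.drift_detected:
        print(f"drift detected at index {i}")   # expected shortly after 1000
```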
## 5. Model Interpretation and Business Insight

```python
import numpy as np
import shap
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class ModelInterpretationDashboard:
    def __init__(self, model, X, feature_names):
        self.model = model
        self.X = X
        self.feature_names = feature_names
        self.shap_explainer = shap.TreeExplainer(model)

    def create_interactive_dashboard(self):
        """Create an interactive model-interpretation dashboard."""
        shap_values = self.shap_explainer.shap_values(self.X)

        # A multi-panel dashboard
        fig = make_subplots(
            rows=2, cols=3,
            subplot_titles=(
                'Global feature importance', 'Individual prediction explanation',
                'Feature dependence', 'SHAP summary',
                'Decision path analysis', 'Model comparison heatmap'
            )
        )

        # 1. Global feature importance
        global_importance = np.abs(shap_values).mean(0)
        fig.add_trace(
            go.Bar(x=global_importance, y=self.feature_names,
                   orientation='h', name='Global importance'),
            row=1, col=1
        )

        # 2. SHAP summary scatter
        fig.add_trace(
            go.Scatter(
                x=shap_values[:, 0], y=self.X.iloc[:, 0],
                mode='markers',
                marker=dict(size=8, color=shap_values[:, 0],
                            colorscale='RdBu', showscale=True),
                name='SHAP value distribution'
            ),
            row=2, col=1
        )

        # 3. Decision path visualization
        self._plot_decision_path(fig, row=2, col=2)

        fig.update_layout(
            height=900, showlegend=False,
            title_text='Model Interpretation and Business Insight Dashboard'
        )
        return fig

    def _plot_decision_path(self, fig, row, col):
        """Visualize the decision path for a single sample."""
        # Pick one sample to explain
        sample_idx = 0
        sample = self.X.iloc[sample_idx]

        # Fetch the decision path
        if hasattr(self.model, 'decision_path'):
            decision_path = self.model.decision_path(sample.values.reshape(1, -1))

            # Build the decision-path trace
            path_trace = go.Scatter(
                x=list(range(len(decision_path.indices))),
                y=decision_path.data,
                mode='lines+markers',
                name='Decision path',
                line=dict(width=2, color='red')
            )
            fig.add_trace(path_trace, row=row, col=col)
```

### SHAP interpretation framework

```python
import shap

class ModelInterpretationDashboard:
    def __init__(self, model, X, feature_names):
        self.shap_explainer = shap.TreeExplainer(model)  # explainer specialized for tree models

    def create_interactive_dashboard(self):
        shap_values = self.shap_explainer.shap_values(self.X)
        # SHAP values: each feature's contribution to each prediction
        # Formula: f(x) = E[f(z)] + Σ_i φ_i, where the φ_i are the SHAP values
```

SHAP's theoretical strengths:

- Theoretical guarantee: game-theoretic Shapley values are the unique attribution satisfying the four fairness axioms
- Unified global and local views: it explains individual predictions and also provides global importance
- Feature interactions: contributions decompose into single-feature and interaction terms

### Interactive visualization

```python
from plotly.subplots import make_subplots

fig = make_subplots(
    rows=2, cols=3,
    subplot_titles=(
        'Global feature importance', 'Individual prediction explanation',
        'Feature dependence', 'SHAP summary',
        'Decision path analysis', 'Model comparison heatmap'
    )
)

# 1. Global feature importance
global_importance = np.abs(shap_values).mean(0)
fig.add_trace(
    go.Bar(x=global_importance, y=feature_names,
           orientation='h', name='Global importance'),
    row=1, col=1
)

# 2. Individual prediction explanation (waterfall plot)
sample_idx = 0
shap.plots.waterfall(shap_values[sample_idx])
```

- Multi-dimensional overview: six subplots explain the model from different angles
- Interactive exploration: Plotly supports zooming and hover-to-inspect
- Business alignment: the feature-importance ranking helps the business understand the key drivers

### Decision path analysis, deepened

```python
def _plot_decision_path(self, fig, row, col):
    """Visualize the decision path for a single sample."""
    if hasattr(self.model, 'decision_path'):
        decision_path = self.model.decision_path(sample.values.reshape(1, -1))

        # Extract the nodes along the path
        path_nodes = self._extract_decision_nodes(decision_path)

        # Render the decision-tree visualization
        self._visualize_decision_tree(path_nodes, fig, row, col)
```
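Before the full training pipeline below, here is a minimal, self-contained SHAP sketch on synthetic data (a hedged example: the shape of `shap_values` varies between shap versions, so verify against your installation):

```python
import numpy as np
import shap
from sklearn.ensemble import RandomForestClassifier

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 5))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)   # feature 0 matters most by design

model = RandomForestClassifier(n_estimators=50, random_state=0).fit(X, y)
explainer = shap.TreeExplainer(model)
# Classic shap returns a list of per-class arrays; newer versions may return a 3-D array
shap_values = explainer.shap_values(X)

# Mean |SHAP| per feature for the positive class: feature 0 should rank first
print(np.abs(shap_values[1]).mean(axis=0).round(3))
```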
## Project in Practice: The Complete Training Pipeline

```python
"""
Advanced ensemble learning project: a dynamic ensemble system with multi-source data fusion
Author: free-elcmacom
Date: 2025-12-12
"""
import pandas as pd
import numpy as np
import warnings
import matplotlib

matplotlib.rcParams['font.sans-serif'] = ['SimHei']   # render CJK labels correctly
matplotlib.rcParams['axes.unicode_minus'] = False     # render the minus sign correctly
warnings.filterwarnings('ignore')

# Fix the random seed for reproducibility
np.random.seed(42)

# ---------- Part 1: multi-source data generation and feature engineering ----------

class MultiSourceDataGenerator:
    """Generate simulated multi-source data."""

    @staticmethod
    def generate_user_behavior_data(n_users=1000, n_days=30):
        """Generate user behavior data."""
        user_ids = [f"user_{i}" for i in range(n_users)]
        dates = pd.date_range(end=pd.Timestamp.now(), periods=n_days, freq='D')

        data = []
        for user_id in user_ids:
            for date in dates:
                # 1-5 behavior records per day
                n_actions = np.random.randint(1, 6)
                for _ in range(n_actions):
                    hour = np.random.randint(0, 24)
                    minute = np.random.randint(0, 60)
                    timestamp = date + pd.Timedelta(hours=hour, minutes=minute)

                    # Action type: 0 = view, 1 = click, 2 = add to cart, 3 = purchase
                    action = np.random.choice([0, 1, 2, 3], p=[0.5, 0.3, 0.15, 0.05])

                    # Dwell time on page (seconds)
                    stay_time = np.random.exponential(60)

                    data.append({
                        'user_id': user_id,
                        'timestamp': timestamp,
                        'hour': hour,
                        'action': action,
                        'stay_time': min(stay_time, 600)   # cap at 10 minutes
                    })
        return pd.DataFrame(data)

    @staticmethod
    def generate_user_profiles(n_users=1000):
        """Generate user profile data."""
        user_ids = [f"user_{i}" for i in range(n_users)]

        profiles = []
        for user_id in user_ids:
            age = np.random.randint(18, 65)
            gender = np.random.choice(['M', 'F'])
            income_level = np.random.choice(['low', 'medium', 'high'])
            registration_days = np.random.randint(1, 365 * 3)   # registered 0-3 years ago

            # Device preference
            device = np.random.choice(['mobile', 'desktop', 'tablet'], p=[0.6, 0.3, 0.1])

            # Region
            region = np.random.choice(['north', 'south', 'east', 'west'])

            profiles.append({
                'user_id': user_id,
                'age': age,
                'gender': gender,
                'income_level': income_level,
                'registration_days': registration_days,
                'device_preference': device,
                'region': region
            })
        return pd.DataFrame(profiles)

    @staticmethod
    def generate_product_data(n_products=100):
        """Generate product data."""
        product_ids = [f"product_{i}" for i in range(n_products)]

        products = []
        for product_id in product_ids:
            category = np.random.choice(['electronics', 'clothing', 'books', 'home', 'food'])
            price = np.random.uniform(10, 1000)
            rating = np.random.uniform(3.0, 5.0)
            review_count = np.random.randint(0, 1000)

            # Product attributes
            discount_rate = np.random.uniform(0, 0.5)   # 0-50% discount
            stock_level = np.random.choice(['high', 'medium', 'low'])

            products.append({
                'product_id': product_id,
                'category': category,
                'price': price,
                'rating': rating,
                'review_count': review_count,
                'discount_rate': discount_rate,
                'stock_level': stock_level
            })
        return pd.DataFrame(products)

    @staticmethod
    def generate_training_labels(n_users=1000):
        """Generate training labels: whether the user purchases."""
        user_ids = [f"user_{i}" for i in range(n_users)]

        labels = []
        for user_id in user_ids:
            # Base purchase probability: 20%
            base_prob = 0.2

            # Inject some patterns keyed on the user ID
            user_num = int(user_id.split('_')[1])
            if user_num % 5 == 0:
                base_prob += 0.3   # one user segment buys more
            if user_num % 7 == 0:
                base_prob -= 0.1   # another segment buys less

            # Add randomness
            purchase_prob = min(max(base_prob + np.random.uniform(-0.1, 0.1), 0), 1)

            # Draw the label
            purchase = 1 if np.random.random() < purchase_prob else 0

            labels.append({
                'user_id': user_id,
                'purchase': purchase,
                'purchase_prob': purchase_prob
            })
        return pd.DataFrame(labels)


class MultiSourceFeatureEngineer:
    """Multi-source feature engineering."""

    def __init__(self):
        self.feature_names = []

    def extract_features(self, user_behavior, user_profiles, product_data, labels):
        """Extract multi-source features."""
        features = []

        # 1. User behavior features
        user_features = self._extract_user_behavior_features(user_behavior)
        features.append(user_features)

        # 2. User profile features
        profile_features = self._extract_profile_features(user_profiles)
        features.append(profile_features)

        # 3. Aggregated product features
        product_features = self._extract_product_features(product_data, user_behavior)
        features.append(product_features)

        # 4. Temporal features
        temporal_features = self._extract_temporal_features(user_behavior)
        features.append(temporal_features)

        # 5. Cross features
        cross_features = self._extract_cross_features(user_features, profile_features)
        features.append(cross_features)

        # Merge all feature blocks
        all_features = pd.concat(features, axis=1)

        # Fetch the labels
        y = labels.set_index('user_id')['purchase']

        # Align indices
        all_features = all_features.loc[y.index]

        self.feature_names = all_features.columns.tolist()
        return all_features, y

    def _extract_user_behavior_features(self, user_behavior):
        """Extract user behavior features."""
        features = pd.DataFrame(index=user_behavior['user_id'].unique())

        # Behavior statistics
        behavior_stats = user_behavior.groupby('user_id').agg({
            'action': ['count', 'mean', 'std'],
            'stay_time': ['mean', 'sum', 'max']
        })
        behavior_stats.columns = ['_'.join(col).strip() for col in behavior_stats.columns.values]

        # Action-type distribution
        action_dummies = pd.get_dummies(user_behavior['action'], prefix='action')
        action_dummies['user_id'] = user_behavior['user_id']
        action_dist = action_dummies.groupby('user_id').sum()

        # Session features
        user_behavior['date'] = user_behavior['timestamp'].dt.date
        daily_actions = user_behavior.groupby(['user_id', 'date']).size()
        session_features = daily_actions.groupby('user_id').agg(['mean', 'std', 'max'])
        session_features.columns = ['daily_actions_' + col for col in session_features.columns]

        # Merge
        features = pd.concat([features, behavior_stats, action_dist, session_features], axis=1)
        return features.fillna(0)

    def _extract_profile_features(self, user_profiles):
        """Extract user profile features."""
        features = user_profiles.set_index('user_id')

        # Encode categorical variables
        categorical_cols = ['gender', 'income_level', 'device_preference', 'region']
        for col in categorical_cols:
            dummies = pd.get_dummies(features[col], prefix=col)
            features = pd.concat([features, dummies], axis=1)
            features.drop(col, axis=1, inplace=True)

        # Standardize numerical features
        numerical_cols = ['age', 'registration_days']
        for col in numerical_cols:
            features[col] = (features[col] - features[col].mean()) / features[col].std()

        return features

    def _extract_product_features(self, product_data, user_behavior):
        """Extract aggregated product features."""
        features = pd.DataFrame(index=user_behavior['user_id'].unique())

        # User-product interactions
        user_product = user_behavior.copy()
        # Simulated product-ID assignment (in practice, read from the data)
        product_ids = product_data['product_id'].tolist()
        user_product['product_id'] = np.random.choice(product_ids, len(user_product))

        # Join product attributes
        user_product = user_product.merge(product_data, on='product_id', how='left')

        # Aggregate product features per user
        agg_features = user_product.groupby('user_id').agg({
            'price': ['mean', 'min', 'max'],
            'rating': 'mean',
            'discount_rate': 'mean'
        })
        agg_features.columns = ['_'.join(col).strip() for col in agg_features.columns.values]

        # Category preferences
        category_dummies = pd.get_dummies(user_product['category'], prefix='category')
        category_dummies['user_id'] = user_product['user_id']
        category_pref = category_dummies.groupby('user_id').sum()

        # Merge
        features = pd.concat([features, agg_features, category_pref], axis=1)
        return features.fillna(0)

    def _extract_temporal_features(self, user_behavior):
        """Extract temporal features."""
        features = pd.DataFrame(index=user_behavior['user_id'].unique())

        # Time-window statistics
        latest_date = user_behavior['timestamp'].max()
        user_behavior['days_since_last'] = (latest_date - user_behavior['timestamp']).dt.days

        recency_features = user_behavior.groupby('user_id')['days_since_last'].agg(['min', 'mean'])
        recency_features.columns = ['days_since_last_' + col for col in recency_features.columns]

        # Active-hours features
        user_behavior['hour_sin'] = np.sin(2 * np.pi * user_behavior['hour'] / 24)
        user_behavior['hour_cos'] = np.cos(2 * np.pi * user_behavior['hour'] / 24)
        hour_features = user_behavior.groupby('user_id')[['hour_sin', 'hour_cos']].mean()

        # Time-decay features
        user_behavior['time_decay'] = np.exp(-user_behavior['days_since_last'] / 7)
        decay_features = user_behavior.groupby('user_id')['time_decay'].agg(['mean', 'sum'])
        decay_features.columns = ['time_decay_' + col for col in decay_features.columns]

        # Merge
        features = pd.concat([features, recency_features, hour_features, decay_features], axis=1)
        return features.fillna(0)

    def _extract_cross_features(self, user_features, profile_features):
        """Extract cross features."""
        features = pd.DataFrame(index=user_features.index)

        # Interactions between behavior and profile: cross a few key features
        if 'action_count' in user_features.columns and 'age' in profile_features.columns:
            features['action_age_interaction'] = user_features['action_count'] * profile_features['age']
        if 'stay_time_mean' in user_features.columns and 'registration_days' in profile_features.columns:
            features['engagement_seniority'] = user_features['stay_time_mean'] * profile_features['registration_days']

        return features.fillna(0)
```
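A quick smoke test of Part 1 (my addition, assuming the Part 1 code above has been run; the sizes are arbitrary small values, not the project's defaults):

```python
gen = MultiSourceDataGenerator()
behavior = gen.generate_user_behavior_data(n_users=20, n_days=5)
profiles = gen.generate_user_profiles(n_users=20)
products = gen.generate_product_data(n_products=10)
labels = gen.generate_training_labels(n_users=20)

X, y = MultiSourceFeatureEngineer().extract_features(behavior, profiles, products, labels)
print(X.shape, y.mean().round(3))   # feature-matrix size and positive rate
```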
```python
# ---------- Part 2: the dynamic ensemble model ----------

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report, confusion_matrix
import lightgbm as lgb
import xgboost as xgb
from sklearn.svm import SVC
from sklearn.calibration import CalibratedClassifierCV


class DynamicEnsembleModel:
    """Dynamic ensemble model."""

    def __init__(self, n_folds=5):
        self.n_folds = n_folds
        self.base_models = {}
        self.meta_model = None
        self.dynamic_weights = None
        self.is_fitted = False

    def _initialize_base_models(self):
        """Initialize the base models."""
        models = {
            'random_forest': RandomForestClassifier(
                n_estimators=100, max_depth=10, min_samples_split=5,
                random_state=42, n_jobs=-1
            ),
            'xgboost': xgb.XGBClassifier(
                n_estimators=100, learning_rate=0.1, max_depth=5,
                random_state=42, n_jobs=-1, eval_metric='logloss'
            ),
            'lightgbm': lgb.LGBMClassifier(
                n_estimators=100, learning_rate=0.05, num_leaves=31,
                random_state=42, n_jobs=-1
            ),
            'logistic': LogisticRegression(
                C=1.0, max_iter=1000, random_state=42
            ),
            'svm': CalibratedClassifierCV(
                SVC(probability=True, kernel='rbf', random_state=42), cv=3
            ),
            'neural_net': MLPClassifier(
                hidden_layer_sizes=(64, 32), activation='relu', solver='adam',
                alpha=0.0001, max_iter=500, random_state=42
            )
        }
        return models

    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """Train the dynamic ensemble model."""
        print("Training the dynamic ensemble model...")

        # Initialize the models
        self.base_models = self._initialize_base_models()

        # Without an explicit validation set, split one off the training set
        if X_val is None or y_val is None:
            X_train, X_val, y_train, y_val = train_test_split(
                X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
            )

        # Stage 1: train the base models
        base_predictions = {}
        base_scores = {}

        print("\nBase model training results:")
        print("-" * 50)

        for name, model in self.base_models.items():
            print(f"Training {name}...")
            model.fit(X_train, y_train)

            # Predict on the validation set
            if hasattr(model, 'predict_proba'):
                y_pred_proba = model.predict_proba(X_val)[:, 1]
            else:
                y_pred_proba = model.predict(X_val)

            # Score
            auc_score = roc_auc_score(y_val, y_pred_proba)
            base_scores[name] = auc_score

            # Keep the predictions
            base_predictions[name] = y_pred_proba
            print(f"  {name}: AUC = {auc_score:.4f}")

        print("-" * 50)

        # Compute dynamic weights from validation performance
        self.dynamic_weights = self._calculate_dynamic_weights(base_scores)

        print("\nModel weight allocation:")
        for name, weight in self.dynamic_weights.items():
            print(f"  {name}: {weight:.4f}")

        # Stage 2: train the meta-model on the base models' predictions
        print("\nTraining the meta-model...")
        meta_features = self._create_meta_features(base_predictions)

        self.meta_model = LogisticRegression(C=1.0, max_iter=1000, random_state=42)
        self.meta_model.fit(meta_features, y_val)

        # Retrain the base models on the full training set
        print("\nRetraining base models on the full training set...")
        for name, model in self.base_models.items():
            model.fit(X_train, y_train)

        self.is_fitted = True
        print("Model training complete")
        return self

    def _calculate_dynamic_weights(self, base_scores):
        """Compute dynamic weights."""
        # Weights derived from AUC
        scores = np.array(list(base_scores.values()))

        # Normalize into [0, 1]
        if scores.max() > scores.min():
            normalized_scores = (scores - scores.min()) / (scores.max() - scores.min())
        else:
            normalized_scores = np.ones_like(scores)

        # Softmax to obtain the weights
        weights = np.exp(normalized_scores) / np.exp(normalized_scores).sum()

        # Build the weight dictionary
        weight_dict = {name: weight for name, weight in zip(base_scores.keys(), weights)}
        return weight_dict

    def _create_meta_features(self, base_predictions):
        """Create meta-features: the base models' predictions."""
        meta_features = np.column_stack(list(base_predictions.values()))
        return meta_features

    def predict(self, X):
        """Predict purchase probabilities."""
        if not self.is_fitted:
            raise ValueError("Model not trained yet; call fit first")

        # Collect the base models' predictions
        base_preds = {}
        for name, model in self.base_models.items():
            if hasattr(model, 'predict_proba'):
                pred = model.predict_proba(X)[:, 1]
            else:
                pred = model.predict(X)
            base_preds[name] = pred

        # Build meta-features
        meta_features = self._create_meta_features(base_preds)

        # Meta-model prediction
        final_pred = self.meta_model.predict_proba(meta_features)[:, 1]
        return final_pred

    def predict_class(self, X, threshold=0.5):
        """Predict class labels."""
        pred_proba = self.predict(X)
        return (pred_proba > threshold).astype(int)


class AdaptiveEnsemble:
    """Adaptive ensemble strategy."""

    def __init__(self, confidence_threshold=0.7):
        self.confidence_threshold = confidence_threshold
        self.models = []
        self.model_names = []
        self.is_fitted = False

    def add_model(self, model, name):
        """Register a model."""
        self.models.append(model)
        self.model_names.append(name)

    def fit(self, X, y):
        """Train all registered models."""
        print("Training the adaptive ensemble...")
        for model, name in zip(self.models, self.model_names):
            print(f"Training {name}...")
            model.fit(X, y)
        self.is_fitted = True
        return self

    def predict_adaptive(self, X):
        """Adaptive prediction."""
        if not self.is_fitted:
            raise ValueError("Model not trained yet; call fit first")

        # Gather predictions and confidences from every model
        predictions = []
        confidences = []

        for model in self.models:
            if hasattr(model, 'predict_proba'):
                pred_proba = model.predict_proba(X)
                pred = pred_proba.argmax(axis=1)
                confidence = pred_proba.max(axis=1)
            else:
                pred = model.predict(X)
                confidence = np.ones(len(X)) * 0.5   # default confidence without probabilities

            predictions.append(pred)
            confidences.append(confidence)

        predictions = np.array(predictions)   # shape: (n_models, n_samples)
        confidences = np.array(confidences)   # shape: (n_models, n_samples)

        # Adaptive combination
        final_predictions = np.zeros(len(X), dtype=int)

        for i in range(len(X)):
            # Confidences of every model for this sample
            sample_confidences = confidences[:, i]

            # Most confident model
            max_confidence = sample_confidences.max()
            best_model_idx = sample_confidences.argmax()

            if max_confidence >= self.confidence_threshold:
                # High confidence: use the single best model
                final_predictions[i] = predictions[best_model_idx, i]
            else:
                # Low confidence: weighted voting
                # (sized by the largest predicted label so every vote has a slot)
                weighted_votes = np.zeros(int(predictions[:, i].max()) + 1)
                weights = self._calculate_weights(sample_confidences)
                for j, model_pred in enumerate(predictions[:, i]):
                    weighted_votes[model_pred] += weights[j]
                final_predictions[i] = weighted_votes.argmax()

        return final_predictions

    def _calculate_weights(self, confidences):
        """Compute weights via softmax."""
        weights = np.exp(confidences) / np.exp(confidences).sum()
        return weights
```
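The two-stage fit above is hand-rolled stacking. As a point of comparison (my addition, not part of the original project), scikit-learn's built-in StackingClassifier implements the same idea with internal cross-validation, so the meta-model is fitted on out-of-fold predictions rather than on a single held-out split:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
import xgboost as xgb

X, y = make_classification(n_samples=1000, n_features=20,
                           weights=[0.8, 0.2], random_state=42)

stack = StackingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('xgb', xgb.XGBClassifier(n_estimators=100, eval_metric='logloss',
                                  random_state=42)),
    ],
    final_estimator=LogisticRegression(max_iter=1000),
    cv=5,                          # out-of-fold predictions feed the meta-model
    stack_method='predict_proba'
)
stack.fit(X, y)
print(stack.predict_proba(X[:5])[:, 1].round(3))
```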
```python
# ---------- Part 3: model evaluation and visualization ----------

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc, precision_recall_curve


class ModelEvaluator:
    """Model evaluator."""

    @staticmethod
    def evaluate_model(y_true, y_pred, y_pred_proba=None, model_name='Model'):
        """Evaluate model performance."""
        print(f"\n{'=' * 50}")
        print(f"{model_name} evaluation results")
        print(f"{'=' * 50}")

        # Basic metrics
        accuracy = accuracy_score(y_true, y_pred)
        print(f"Accuracy: {accuracy:.4f}")

        auc_score = None   # stays None when no probabilities are provided
        if y_pred_proba is not None:
            auc_score = roc_auc_score(y_true, y_pred_proba)
            print(f"AUC: {auc_score:.4f}")

        # Classification report
        print("\nClassification report:")
        print(classification_report(y_true, y_pred))

        # Confusion matrix
        cm = confusion_matrix(y_true, y_pred)
        print("Confusion matrix:")
        print(cm)

        return {
            'accuracy': accuracy,
            'auc': auc_score,
            'confusion_matrix': cm
        }

    @staticmethod
    def plot_model_comparison(results, model_names, feature_importance=None):
        """Plot a model comparison dashboard."""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Accuracy comparison
        accuracies = [results[name]['accuracy'] for name in model_names]
        axes[0, 0].bar(model_names, accuracies,
                       color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4'])
        axes[0, 0].set_title('Model accuracy comparison', fontsize=14, fontweight='bold')
        axes[0, 0].set_ylabel('Accuracy', fontsize=12)
        axes[0, 0].set_ylim(0, 1)
        for i, acc in enumerate(accuracies):
            axes[0, 0].text(i, acc + 0.02, f"{acc:.3f}", ha='center', fontweight='bold')

        # 2. ROC curves
        ax = axes[0, 1]
        for name in model_names:
            if 'y_pred_proba' in results[name]:
                y_true = results[name]['y_true']
                y_pred_proba = results[name]['y_pred_proba']
                fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
                roc_auc = auc(fpr, tpr)
                ax.plot(fpr, tpr, lw=2, label=f"{name} (AUC = {roc_auc:.3f})")
        ax.plot([0, 1], [0, 1], 'k--', alpha=0.6)
        ax.set_xlabel('False positive rate', fontsize=12)
        ax.set_ylabel('True positive rate', fontsize=12)
        ax.set_title('ROC curves', fontsize=14, fontweight='bold')
        ax.legend(loc='lower right')
        ax.grid(True, alpha=0.3)

        # 3. Confusion-matrix heatmap (last model)
        ax = axes[1, 0]
        last_model = model_names[-1]
        cm = results[last_model]['confusion_matrix']
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
        ax.set_title(f"{last_model} confusion matrix", fontsize=14, fontweight='bold')
        ax.set_xlabel('Predicted label', fontsize=12)
        ax.set_ylabel('True label', fontsize=12)

        # 4. Feature-importance plot (if available)
        ax = axes[1, 1]
        if feature_importance is not None and len(feature_importance) > 0:
            # Show only the top 10 features
            top_features = feature_importance.head(10)
            colors = plt.cm.viridis(np.linspace(0.3, 0.9, len(top_features)))
            bars = ax.barh(range(len(top_features)), top_features['importance'], color=colors)
            ax.set_yticks(range(len(top_features)))
            ax.set_yticklabels(top_features['feature'], fontsize=10)
            ax.set_xlabel('Importance score', fontsize=12)
            ax.set_title('Top 10 feature importances', fontsize=14, fontweight='bold')
            ax.invert_yaxis()   # most important feature at the top
            ax.grid(True, alpha=0.3, axis='x')

            # Annotate the bars with values
            for i, (bar, importance) in enumerate(zip(bars, top_features['importance'])):
                ax.text(importance + 0.001, i, f"{importance:.4f}",
                        va='center', fontsize=9, fontweight='bold')
        else:
            ax.text(0.5, 0.5, 'Feature-importance plot\nrequires importance data',
                    ha='center', va='center', fontsize=14, transform=ax.transAxes)
            ax.set_title('Feature importance', fontsize=14, fontweight='bold')
            ax.axis('off')

        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_learning_curves(train_scores, val_scores, model_name):
        """Plot learning curves."""
        plt.figure(figsize=(10, 6))
        plt.plot(train_scores, label='Training set', color='#FF6B6B', linewidth=2)
        plt.plot(val_scores, label='Validation set', color='#4ECDC4', linewidth=2)
        plt.xlabel('Training round', fontsize=12)
        plt.ylabel('Accuracy', fontsize=12)
        plt.title(f"{model_name} learning curve", fontsize=14, fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
```
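Part 3 imports `precision_recall_curve` but only plots ROC. Since the introduction stresses that purchases are a rare class, a precision-recall view is often the more honest summary. A minimal sketch (my addition, reusing the same `results` dictionary layout as `plot_model_comparison`):

```python
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc

def plot_pr_curves(results, model_names):
    """Precision-recall curves for every model that stored probabilities."""
    plt.figure(figsize=(8, 6))
    for name in model_names:
        if 'y_pred_proba' in results[name]:
            precision, recall, _ = precision_recall_curve(
                results[name]['y_true'], results[name]['y_pred_proba'])
            pr_auc = auc(recall, precision)
            plt.plot(recall, precision, lw=2, label=f"{name} (PR-AUC = {pr_auc:.3f})")
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-recall curves')
    plt.legend(loc='lower left')
    plt.grid(True, alpha=0.3)
    plt.show()
```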
```python
# ---------- Part 4: main program ----------

def main():
    """Main entry point."""
    print("=" * 70)
    print("Advanced ensemble learning project: a dynamic ensemble system "
          "with multi-source data fusion")
    print("=" * 70)

    # 1. Generate simulated data
    print("\n1. Generating simulated data...")
    generator = MultiSourceDataGenerator()

    user_behavior = generator.generate_user_behavior_data(n_users=500, n_days=30)
    user_profiles = generator.generate_user_profiles(n_users=500)
    product_data = generator.generate_product_data(n_products=50)
    labels = generator.generate_training_labels(n_users=500)

    print(f"User behavior data: {user_behavior.shape}")
    print(f"User profile data: {user_profiles.shape}")
    print(f"Product data: {product_data.shape}")
    print(f"Label data: {labels.shape}")

    # 2. Feature engineering
    print("\n2. Feature engineering...")
    feature_engineer = MultiSourceFeatureEngineer()
    X, y = feature_engineer.extract_features(
        user_behavior, user_profiles, product_data, labels
    )

    print(f"Feature matrix: {X.shape}")
    print(f"Label distribution: {y.value_counts().to_dict()}")

    # 3. Train/test split
    print("\n3. Splitting into training and test sets...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )
    print(f"Training set: {X_train.shape}")
    print(f"Test set: {X_test.shape}")

    # 4. Train the dynamic ensemble model
    print("\n4. Training the dynamic ensemble model...")
    dynamic_ensemble = DynamicEnsembleModel()
    dynamic_ensemble.fit(X_train, y_train)

    # Predict
    y_pred_proba_dynamic = dynamic_ensemble.predict(X_test)
    y_pred_dynamic = dynamic_ensemble.predict_class(X_test, threshold=0.5)

    # 5. Train comparison models
    print("\n5. Training comparison models...")

    # Random forest
    print("Training random forest...")
    rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_model.fit(X_train, y_train)
    y_pred_rf = rf_model.predict(X_test)
    y_pred_proba_rf = rf_model.predict_proba(X_test)[:, 1]

    # XGBoost
    print("Training XGBoost...")
    xgb_model = xgb.XGBClassifier(n_estimators=100, random_state=42,
                                  eval_metric='logloss')
    xgb_model.fit(X_train, y_train)
    y_pred_xgb = xgb_model.predict(X_test)
    y_pred_proba_xgb = xgb_model.predict_proba(X_test)[:, 1]

    # Adaptive ensemble
    print("Training the adaptive ensemble...")
    adaptive_ensemble = AdaptiveEnsemble(confidence_threshold=0.7)
    adaptive_ensemble.add_model(rf_model, 'Random Forest')
    adaptive_ensemble.add_model(xgb_model, 'XGBoost')
    adaptive_ensemble.add_model(
        LogisticRegression(max_iter=1000, random_state=42), 'Logistic Regression'
    )
    adaptive_ensemble.fit(X_train, y_train)
    y_pred_adaptive = adaptive_ensemble.predict_adaptive(X_test)

    # 6. Model evaluation
    print("\n6. Evaluating models...")
    evaluator = ModelEvaluator()

    # Collect results
    results = {}

    # Dynamic ensemble
    results['Dynamic Ensemble'] = evaluator.evaluate_model(
        y_test, y_pred_dynamic, y_pred_proba_dynamic, 'Dynamic Ensemble'
    )
    results['Dynamic Ensemble']['y_true'] = y_test
    results['Dynamic Ensemble']['y_pred_proba'] = y_pred_proba_dynamic

    # Random forest
    results['Random Forest'] = evaluator.evaluate_model(
        y_test, y_pred_rf, y_pred_proba_rf, 'Random Forest'
    )
    results['Random Forest']['y_true'] = y_test
    results['Random Forest']['y_pred_proba'] = y_pred_proba_rf

    # XGBoost
    results['XGBoost'] = evaluator.evaluate_model(
        y_test, y_pred_xgb, y_pred_proba_xgb, 'XGBoost'
    )
    results['XGBoost']['y_true'] = y_test
    results['XGBoost']['y_pred_proba'] = y_pred_proba_xgb

    # Adaptive ensemble
    # Note: the adaptive ensemble has no probability output, so only labels are scored
    results['Adaptive Ensemble'] = evaluator.evaluate_model(
        y_test, y_pred_adaptive, None, 'Adaptive Ensemble'
    )
    results['Adaptive Ensemble']['y_true'] = y_test

    # Feature importances (from the random forest)
    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': rf_model.feature_importances_
    }).sort_values('importance', ascending=False)

    # 7. Visualization
    print("\n7. Visualizing results...")
    evaluator.plot_model_comparison(
        results,
        ['Random Forest', 'XGBoost', 'Dynamic Ensemble', 'Adaptive Ensemble'],
        feature_importance
    )

    # 8. Summary
    print("\n8. Project summary")
    print("-" * 50)

    best_model = None
    best_accuracy = 0
    for name, result in results.items():
        if result['accuracy'] > best_accuracy:
            best_accuracy = result['accuracy']
            best_model = name

    print(f"Best model: {best_model} (accuracy: {best_accuracy:.4f})")

    # Feature-importance analysis
    print("\nFeature importance analysis (top 10 features):")
    top_features = feature_importance.head(10)
    print(top_features.to_string(index=False))

    # Stand-alone feature-importance plot
    plt.figure(figsize=(12, 8))
    colors = plt.cm.viridis(np.linspace(0.3, 0.9, len(top_features)))
    bars = plt.barh(range(len(top_features)), top_features['importance'], color=colors)
    plt.yticks(range(len(top_features)), top_features['feature'], fontsize=11)
    plt.xlabel('Importance score', fontsize=12)
    plt.title('Top 10 feature importances', fontsize=14, fontweight='bold')
    plt.gca().invert_yaxis()
    plt.grid(True, alpha=0.3, axis='x')

    # Annotate the bars with values
    for i, (bar, importance) in enumerate(zip(bars, top_features['importance'])):
        plt.text(importance + 0.001, i, f"{importance:.4f}",
                 va='center', fontsize=9, fontweight='bold')

    plt.tight_layout()
    plt.show()

    print("\nProject run complete")
    print("=" * 70)


if __name__ == '__main__':
    # Check the required libraries
    try:
        import lightgbm
        import xgboost
        print("All dependencies installed, running the project...")
    except ImportError as e:
        print(f"Missing dependency: {e}")
        print("Please run: pip install lightgbm xgboost scikit-learn "
              "pandas numpy matplotlib seaborn")
        exit(1)

    # Run the main program
    main()
```
### 1. Comprehensive model comparison figure

Generated figure: a four-panel model comparison chart.

Summary: this composite figure compares the performance of the four models (random forest, XGBoost, dynamic ensemble, adaptive ensemble). The top-left bar chart shows each model's test-set accuracy at a glance, with the dynamic ensemble standing out as the most accurate. The top-right ROC curves show classification power: the closer a curve hugs the top-left corner, and the higher its AUC, the better the model separates positive from negative samples. The bottom-left confusion matrix details one model's predictions, with the diagonal counting correctly classified samples. The bottom-right feature-importance panel shows which features the model relies on most, clarifying which factors drive purchase prediction.

Technical value: the figure covers model evaluation from the macro level (accuracy) down to individual prediction detail, helping a data scientist quickly identify the best model and understand its decisions.
### 2. Feature-importance bar chart

Generated figure: a horizontal bar chart of the ten most important features.

Summary: the bars rank the ten features that most influence purchase behavior, from most to least important. The longest bar marks the top feature: for example, time_decay_mean (the average time-decay value) would indicate that recent behavior carries the most predictive weight, stay_time_mean (average dwell time) reflects user engagement, and age can capture the purchase tendencies of particular age groups. Each bar's length corresponds directly to that feature's contribution to the prediction, giving business decisions a direct reference.

Business insights:

- Time factors: time-decay features rank highest, so recent behavior predicts better than older history
- Engagement signals: the weight of dwell-time features marks engagement as a key leading indicator of purchase
- User attributes: demographic features such as age still play a significant role
- Behavioral patterns: behavior statistics (action counts, action-type distribution) quantify user habits

Practical value:

- Marketing: concentrate resources on the user segments behind the most important features
- Product optimization: prioritize the user-experience work tied to high-importance features
- Feature engineering: guide subsequent feature selection and engineering
- Business explainability: give model predictions interpretable business justifications