目录
1. 背景与动机
1.1 推荐系统的演进
传统方法的限制:
- 协同过滤: 仅能建模用户-项目互动,无法利用特征信息
- 逻辑回归(LR): 仅能学习线性关系,需要人工特征工程
- FM (Factorization Machine): 能学习二阶互动,但高阶互动能力有限
1.2 为什么需要DeepFM?
DeepFM解决了以下关键问题:
- 自动特征互动: 无需手动设计特征组合
- 低阶+高阶: 同时学习低阶和高阶特征互动
- 端到端学习: 特征学习和预测在同一个模型中完成
- 处理稀疏特征: 适用于推荐场景中的高维稀疏数据
1.3 DeepFM的创新点
| 模型 | 低阶互动 | 高阶互动 | 需要预训练 |
|---|---|---|---|
| FM | 二阶 | - | - |
| FFM | 二阶 | - | - |
| Wide&Deep | 需人工 | - | - |
| DeepFM | 自动 | 自动 | - |
2. 核心概念
2.1 基本概念
CTR预估问题:
- 输入: 用户特征 + 项目特征 + 上下文特征
- 输出: 点击概率 p ∈ [0, 1]
- 目标: 预测用户是否会点击某个项目
特征类型:
类别特征: 性别[男/女], 城市[北京/上海/...]
数值特征: 年龄, 价格, 评分
序列特征: 历史行为序列
2.2 特征工程
原始特征处理:
# 示例:电商推荐场景 原始特征: - 用户ID: 123456 - 商品ID: 789012 - 性别: 男 - 年龄: 28 - 类目: 电子产品 - 价格: 3999 转换为稀疏向量: [0,0,1,0,0, ...., 0,1,0, ...., 28, ...., 3999] 用户ID编码 商品ID编码 年龄 价格
3. 技术原理
3.1 整体架构
DeepFM = FM组件 + Deep组件 + 共享Embedding
输入层 (Sparse Features)
|
v
Embedding层 (共享)
/ \
/ \
v v
FM组件 Deep组件
(低阶交互) (高阶交互)
| |
v v
FM输出 DNN输出
\ /
\ /
v v
Sigmoid
|
v
预测概率
3.2 FM组件 (Factorization Machine)
目的: 学习一阶和二阶特征互动
公式:
y_FM = w? + Σ? w?x? + Σ? Σ?>? <v?, v?> x?x?
其中:
- w?: 全局偏置
- w?: 一阶权重
- v?: 特征i的embedding向量
- <v?, v?>: 向量内积
核心思想:
- 一阶项: 单个特征的影响
- 二阶项: 两两特征的互动影响
- 通过embedding内积避免参数爆炸
3.3 Deep组件 (Deep Neural Network)
目的: 学习高阶非线性特征互动
结构:
Embedding层
↓
[拼接所有embedding]
↓
全连接层1 (ReLU)
↓
全连接层2 (ReLU)
↓
全连接层3 (ReLU)
↓
输出层
特点:
- 多层感知机(MLP)结构
- 自动学习高阶互动
- 非线性激活函数
3.4 共享Embedding
为什么共享?
- 减少参数量: 避免重复学习
- 统一特征表示: FM和Deep看到相同的特征空间
- 提高泛化能力: 两个组件相互增强
4. 数学原理详解
4.1 问题定义
给定训练集 D = {(x?, y?), (x?, y?), …, (x?, y?)}
- x? ∈ R?: 第i个样本的特征向量 (高维稀疏)
- y? ∈ {0, 1}: 标签 (1=点击, 0=未点击)
目标: 学习函数 f: R? → [0, 1]
4.2 DeepFM完整公式
? = sigmoid(y_FM + y_DNN)
其中:
y_FM = <w, x> + Σ? Σ?>? <V?, V?> x?x?
y_DNN = W^(H+1) · a^(H) + b^(H+1)
a^(l+1) = σ(W^(l) · a^(l) + b^(l))
a^(0) = [V?, V?, ..., V?] (拼接所有embedding)
4.3 FM二阶项详解
原始形式 (复杂度 O(n?)):
Σ? Σ?>? <V?, V?> x?x?
优化形式 (复杂度 O(nk)):
? Σf=1? [(Σ? V?f x?)? - Σ? (V?f x?)?]
证明:
Σ? Σ?>? <V?, V?> x?x?
= ?[Σ? Σ? <V?, V?> x?x? - Σ? <V?, V?> x??]
= ? Σf [(Σ? V?f x?)? - Σ? V?f? x??]
直观理解:
# 对于特征互动 [用户ID, 商品ID, 性别] # FM会学习: <V_用户, V_商品> # 用户对商品的偏好 <V_用户, V_性别> # 用户特征和人口统计 <V_商品, V_性别> # 商品和性别的关系
4.4 损失函数
二分类交叉熵:
L = -1/N Σ? [y? log(??) + (1-y?) log(1-??)]
优化目标:
min L + λ||Θ||?
Θ (损失) (L2正则化)
其中 Θ = {w?, w, V, W^(l), b^(l)}
5. 网络架构
5.1 详细结构图
输入: [user_id, item_id, gender, age, category, ...]
↓ (one-hot/multi-hot编码)
稀疏特征: [0,0,1,0,...,0,1,0,...,1,0,...,28,...]
↓
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Embedding层 (共享)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
↓
[V?, V?, V?, ..., V?] (每个特征一个embedding)
↓
┌────────────────────────┐
│ │
↓ ↓
┌─────────┐ ┌──────────┐
│ FM组件 │ │ Deep组件 │
└─────────┘ └──────────┘
│ │
↓ ↓
一阶项 + 二阶项 Flatten层
│ ↓
│ Dense(256, relu)
│ ↓
│ Dense(128, relu)
│ ↓
│ Dense(64, relu)
│ ↓
│ Dense(1)
│ │
└────────┬───────────────┘
↓
Add层
↓
Sigmoid激活
↓
预测概率
5.2 参数规模分析
假设:
- 特征维度: d = 10,000 (稀疏)
- Embedding维度: k = 16
- DNN结构: [256, 128, 64]
- 实际非零特征: m = 50 (每个样本)
参数量:
Embedding层: d × k = 10,000 × 16 = 160,000
FM组件:
- 一阶权重: d = 10,000
- 二阶(共享embedding): 0 (使用Embedding层)
Deep组件:
- 输入层: m × k × 256 = 50 × 16 × 256 = 204,800
- 隐藏层1: 256 × 128 = 32,768
- 隐藏层2: 128 × 64 = 8,192
- 输出层: 64 × 1 = 64
总计: ≈ 406K 参数
6. 代码实现
6.1 PyTorch实现
import torch
import torch.nn as nn
class DeepFM(nn.Module):
def __init__(self, feature_sizes, embedding_size=16,
hidden_dims=[256, 128, 64], dropout=0.5):
"""
Args:
feature_sizes: List[int], 每个特征的取值数量
例: [1000, 2000, 2, 100] 表示4个特征
embedding_size: int, embedding维度
hidden_dims: List[int], DNN隐藏层维度
dropout: float, dropout比例
"""
super(DeepFM, self).__init__()
self.feature_sizes = feature_sizes
self.embedding_size = embedding_size
self.num_features = len(feature_sizes)
Embedding层 (共享)
self.embeddings = nn.ModuleList([
nn.Embedding(size, embedding_size)
for size in feature_sizes
])
FM组件
一阶权重
self.fm_first_order_weights = nn.ModuleList([
nn.Embedding(size, 1) for size in feature_sizes
])
self.fm_bias = nn.Parameter(torch.zeros(1))
二阶权重 (采用共享embedding)
无须额外参数
Deep组件
输入维度: num_features * embedding_size
input_dim = self.num_features * embedding_size
layers = []
for hidden_dim in hidden_dims:
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
input_dim = hidden_dim
layers.append(nn.Linear(input_dim, 1))
self.dnn = nn.Sequential(*layers)
前向传播函数
def forward(self, x):
"""
Args:
x: Tensor [batch_size, num_features]
每个元素代表特征的索引
Returns:
predictions: Tensor [batch_size, 1]
"""
# ============ Embedding查表 ============
# embeddings: [batch_size, num_features, embedding_size]
embeddings = [
emb(x[:, i]) for i, emb in enumerate(self.embeddings)
]
embeddings = torch.stack(embeddings, dim=1)
# ============ FM部分 ============
# 一阶项
first_order = [
w(x[:, i]) for i, w in enumerate(self.fm_first_order_weights)
]
first_order = torch.stack(first_order, dim=1)
y_first_order = torch.sum(first_order, dim=1) + self.fm_bias
# 二阶项: ?[Σ(Σ)? - Σ(?)]
sum_square = torch.sum(embeddings, dim=1) ** 2 # [bs, emb_size]
square_sum = torch.sum(embeddings ** 2, dim=1) # [bs, emb_size]
y_second_order = 0.5 * torch.sum(
sum_square - square_sum, dim=1, keepdim=True
) # [bs, 1]
y_fm = y_first_order + y_second_order
# ============ Deep部分 ============
# 展平: [batch_size, num_features * embedding_size]
dnn_input = embeddings.view(embeddings.size(0), -1)
y_dnn = self.dnn(dnn_input)
# ============ 组合输出 ============
y = y_fm + y_dnn
predictions = torch.sigmoid(y)
return predictions
训练代码
def train_deepfm(model, train_loader, optimizer, criterion, device):
model.train()
total_loss = 0
for batch_idx, (features, labels) in enumerate(train_loader):
features = features.to(device)
labels = labels.float().to(device)
# 正向传播
predictions = model(features)
loss = criterion(predictions, labels.unsqueeze(1))
# 反向传播
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
# ============ 评估代码 ============
def evaluate_deepfm(model, val_loader, device):
from sklearn.metrics import roc_auc_score
model.eval()
predictions_list = []
labels_list = []
with torch.no_grad():
for features, labels in val_loader:
features = features.to(device)
predictions = model(features)
predictions_list.append(predictions.cpu().numpy())
labels_list.append(labels.numpy())
predictions = np.concatenate(predictions_list)
labels = np.concatenate(labels_list)
auc = roc_auc_score(labels, predictions)
return auc
6.2 TensorFlow/Keras实现
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
class DeepFM_TF(keras.Model):
def __init__(self, feature_sizes, embedding_size=16,
hidden_dims=[256, 128, 64], dropout=0.5):
super(DeepFM_TF, self).__init__()
self.feature_sizes = feature_sizes
self.embedding_size = embedding_size
self.num_features = len(feature_sizes)
# 嵌入层
self.embeddings = [
layers.Embedding(size, embedding_size)
for size in feature_sizes
]
# FM一阶权重
self.fm_first_order = [
layers.Embedding(size, 1)
for size in feature_sizes
]
# 深层部分
self.dnn_layers = []
for dim in hidden_dims:
self.dnn_layers.extend([
layers.Dense(dim, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(dropout)
])
self.dnn_layers.append(layers.Dense(1))
self.dnn = keras.Sequential(self.dnn_layers)
def call(self, inputs, training=False):
# 分割每个特征
feature_list = tf.split(inputs, self.num_features, axis=1)
feature_list = [tf.squeeze(f, axis=1) for f in feature_list]
# 嵌入
embeddings = [
emb(f) for f, emb in zip(feature_list, self.embeddings)
]
embeddings = tf.stack(embeddings, axis=1)
# FM一阶项
first_order = [
w(f) for f, w in zip(feature_list, self.fm_first_order)
]
first_order = tf.concat(first_order, axis=1)
y_first_order = tf.reduce_sum(first_order, axis=1, keepdims=True)
# FM二阶项
sum_square = tf.square(tf.reduce_sum(embeddings, axis=1))
square_sum = tf.reduce_sum(tf.square(embeddings), axis=1)
y_second_order = 0.5 * tf.reduce_sum(
sum_square - square_sum, axis=1, keepdims=True
)
y_fm = y_first_order + y_second_order
# Deep部分
dnn_input = tf.reshape(embeddings, [-1, self.num_features * self.embedding_size])
y_dnn = self.dnn(dnn_input, training=training)
# 组合
y = y_fm + y_dnn
return tf.sigmoid(y)
# 使用示例
model = DeepFM_TF(
feature_sizes=[1000, 2000, 2, 100],
embedding_size=16,
hidden_dims=[256, 128, 64]
)
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['AUC']
)
# 训练
history = model.fit(
train_dataset,
validation_data=val_dataset,
epochs=10,
batch_size=512
)
6.3 特征预处理代码
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
class FeatureProcessor:
def __init__(self):
self.encoders = {}
self.feature_sizes = []
def fit_transform(self, df, categorical_cols, numerical_cols):
"""
处理类别特征和数值特征
Args:
df: 原始数据DataFrame
categorical_cols: 类别特征列名列表
numerical_cols: 数值特征列名列表
Returns:
features: numpy数组 [n_samples, n_features]
feature_sizes: 每个特征的取值数量
"""
features = []
# 处理类别特征
for col in categorical_cols:
le = LabelEncoder()
encoded = le.fit_transform(df[col].astype(str))
features.append(encoded)
self.encoders[col] = le
self.feature_sizes.append(len(le.classes_))
# 处理数值特征 (分桶)
for col in numerical_cols:
# 等频分桶
binned = pd.qcut(df[col], q=10, labels=False, duplicates='drop')
features.append(binned)
n_bins = binned.max() + 1
self.feature_sizes.append(n_bins)
features = np.column_stack(features)
return features, self.feature_sizes
def transform(self, df, categorical_cols, numerical_cols):
"""转换新数据"""
features = []
for col in categorical_cols:
le = self.encoders[col]
encoded = le.transform(df[col].astype(str))
features.append(encoded)
# 数值特征处理...
return np.column_stack(features)
# 使用示例
df = pd.DataFrame({
'user_id': [1, 2, 3, 4, 5],
'item_id': [101, 102, 103, 104, 105],
'gender': ['M', 'F', 'M', 'F', 'M'],
'age': [25, 32, 45, 28, 35],
'price': [99.9, 199.9, 49.9, 299.9, 149.9],
'clicked': [1, 0, 1, 1, 0]
})
processor = FeatureProcessor()
X, feature_sizes = processor.fit_transform(
df,
categorical_cols=['user_id', 'item_id', 'gender'],
numerical_cols=['age', 'price']
)
y = df['clicked'].values
print(f"Features shape: {X.shape}")
print(f"Feature sizes: {feature_sizes}")
7. 实战运用
7.1 完整训练步骤
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
# ============ 数据集定义 ============
class CTRDataset(Dataset):
def __init__(self, features, labels):
self.features = torch.LongTensor(features)
self.labels = torch.FloatTensor(labels)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return self.features[idx], self.labels[idx]
# ============ 主训练流程 ============
def main():
# 1. 数据读取
# 假定已完成特征工程
# X: [样本数量, 特征数量]
# y: [样本数量]
# feature_sizes: [特征1尺寸, 特征2尺寸, ...]
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42
)
train_dataset = CTRDataset(X_train, y_train)
val_dataset = CTRDataset(X_val, y_val)
train_loader = DataLoader(
train_dataset, batch_size=512, shuffle=True, num_workers=4
)
val_loader = DataLoader(
val_dataset, batch_size=1024, shuffle=False, num_workers=4
)
# 2. 模型初始化
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DeepFM(
feature_sizes=feature_sizes,
embedding_size=16,
hidden_dims=[256, 128, 64],
dropout=0.5
).to(device)
# 3. 优化器和损失函数
optimizer = torch.optim.Adam(
model.parameters(),
lr=1e-3,
weight_decay=1e-5 # L2正则化
)
criterion = nn.BCELoss() # 二分类交叉熵
# 学习率调整器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='max', factor=0.5, patience=2
)
# 4. 训练循环
best_auc = 0
patience = 5
patience_counter = 0
for epoch in range(50):
# 训练
train_loss = train_deepfm(
model, train_loader, optimizer, criterion, device
)
# 验证
val_auc = evaluate_deepfm(model, val_loader, device)
print(f'Epoch {epoch+1}: Train Loss={train_loss:.4f}, Val AUC={val_auc:.4f}')
# 调整学习率
scheduler.step(val_auc)
# 早期停止
if val_auc > best_auc:
best_auc = val_auc
patience_counter = 0
# 保存最优模型
torch.save(model.state_dict(), 'best_deepfm.pth')
else:
patience_counter += 1
if patience_counter >= patience:
print(f'Early stopping at epoch {epoch+1}')
break
print(f'Best validation AUC: {best_auc:.4f}')
# 5. 加载最优模型
model.load_state_dict(torch.load('best_deepfm.pth'))
return model
if __name__ == '__main__':
model = main()
7.2 在线推理
class DeepFMPredictor:
def __init__(self, model_path, feature_processor, device='cpu'):
self.device = torch.device(device)
self.feature_processor = feature_processor
# 加载模型
self.model = DeepFM(
feature_sizes=feature_processor.feature_sizes,
embedding_size=16,
hidden_dims=[256, 128, 64]
)
self.model.load_state_dict(torch.load(model_path))
self.model.to(self.device)
self.model.eval()
def predict(self, user_features, item_features):
"""
单次预测
Args:
user_features: dict, {'user_id': 123, 'age': 25, ...}
item_features: dict, {'item_id': 456, 'price': 99.9, ...}
Returns:
score: float, 点击可能性
"""
# 整合特征
features = {**user_features, **item_features}
# 处理特征
X = self.feature_processor.transform(pd.DataFrame([features]))
X = torch.LongTensor(X).to(self.device)
# 进行预测
with torch.no_grad():
score = self.model(X).cpu().item()
return score
def batch_predict(self, users, items):
"""
批量预测 (用于排序)
Args:
users: List[dict], 用户特征列表
items: List[dict], 商品特征列表
Returns:
scores: numpy数组, 点击可能性
"""
# 创建特征
features_list = []
for user, item in zip(users, items):
features_list.append({**user, **item})
df = pd.DataFrame(features_list)
X = self.feature_processor.transform(df)
X = torch.LongTensor(X).to(self.device)
# 批量预测
with torch.no_grad():
scores = self.model(X).cpu().numpy().flatten()
return scores
# 使用实例
predictor = DeepFMPredictor(
model_path='best_deepfm.pth',
feature_processor=processor,
device='cuda'
)
# 单次预测
user = {'user_id': 123, 'age': 25, 'gender': 'M'}
item = {'item_id': 456, 'price': 99.9, 'category': '电子产品'}
score = predictor.predict(user, item)
print(f"Click probability: {score:.4f}")
# 批量预测
users = [user] * 100
items = [{'item_id': i, 'price': 99.9, 'category': '电子产品'} for i in range(100)]
scores = predictor.batch_predict(users, items)
# 排序推荐
top_k = 10
top_indices = np.argsort(scores)[::-1][:top_k]
recommended_items = [items[i] for i in top_indices]
7.3 实际案例: 音乐推荐
# 音乐推荐场景的特征设计
class MusicRecommendationFeatures:
"""
用户特征:
- user_id: 用户标识
- age_group: 年龄区间 [<18, 18-25, 25-35, 35-45, >45]
- gender: 性别
- city_level: 城市级别
- member_level: 会员级别
- listening_days: 听歌日数
物品特征:
- song_id: 歌曲标识
- artist_id: 歌手标识
- genre: 音乐类别
- language: 语言
- release_year: 发行年份
- duration: 持续时间
- popularity: 受欢迎程度
交叉特征:
- user_genre_match: 用户风格匹配
- time_of_day: 时间段 [早晨/上午/下午/晚上/深夜]
- device_type: 设备种类
"""
def __init__(self):
self.categorical_features = [
'user_id', 'song_id', 'artist_id', 'genre',
'gender', 'age_group', 'city_level', 'member_level',
'language', 'time_of_day', 'device_type'
]
self.numerical_features = [
'listening_days', 'duration', 'popularity', 'release_year'
]
def extract_features(self, user, song, context):
"""提取特征"""
features = {}
# 用户特征
features['user_id'] = user['id']
features['age_group'] = self._get_age_group(user['age'])
features['gender'] = user['gender']
features['city_level'] = user['city_level']
features['member_level'] = user['member_level']
features['listening_days'] = user['listening_days']
# 歌曲特征
features['song_id'] = song['id']
features['artist_id'] = song['artist_id']
features['genre'] = song['genre']
features['language'] = song['language']
features['release_year'] = song['release_year']
features['duration'] = song['duration']
features['popularity'] = song['popularity']
# 上下文特征
features['time_of_day'] = self._get_time_period(context['timestamp'])
features['device_type'] = context['device']
return features
def _get_age_group(self, age):
if age < 18:
return '<18'
elif age < 25:
return '18-25'
elif age < 35:
return '25-35'
elif age < 45:
return '35-45'
else:
return '>45'
def _get_time_period(self, timestamp):
hour = timestamp.hour
if 6 <= hour < 9:
return '早晨'
elif 9 <= hour < 12:
return '上午'
elif 12 <= hour < 18:
return '下午'
elif 18 <= hour < 23:
return '晚上'
else:
return '深夜'
8. 调优技巧
8.1 超参数调优
关键超参数:
hyperparameters = {
# Embedding维度
'embedding_size': [8, 16, 32, 64],
# 建议: 8-16对多数场景足够
# DNN结构
'hidden_dims': [
[256, 128, 64],
[512, 256, 128],
[128, 64, 32]
],
# 建议: 逐层减半,3-4层适宜
# Dropout
'dropout': [0.3, 0.5, 0.7],
# 建议: 0.5是一个好的起点
# 学习率
'learning_rate': [1e-4, 5e-4, 1e-3, 5e-3],
# 建议: 从1e-3开始,使用学习率衰减
# Batch size
'batch_size': [256, 512, 1024, 2048],
# 建议: 尽可能大,受限于GPU内存
# L2正则化
'weight_decay': [1e-6, 1e-5, 1e-4],
# 建议: 1e-5是一个好的起点
}
网格搜索示例:
from sklearn.model_selection import ParameterGrid
param_grid = {
'embedding_size': [16, 32],
'hidden_dims': [[256, 128, 64], [512, 256, 128]],
'dropout': [0.3, 0.5],
'learning_rate': [1e-3, 5e-4]
}
best_auc = 0
best_params = None
for params in ParameterGrid(param_grid):
print(f"Testing params: {params}")
model = DeepFM(
feature_sizes=feature_sizes,
embedding_size=params['embedding_size'],
hidden_dims=params['hidden_dims'],
dropout=params['dropout']
)
optimizer = torch.optim.Adam(
model.parameters(),
lr=params['learning_rate']
)
# 训练和验证...
val_auc = train_and_evaluate(model, optimizer, ...)
if val_auc > best_auc:
best_auc = val_auc
best_params = params
print(f"Best params: {best_params}")
print(f"Best AUC: {best_auc}")
8.2 特征工程技巧
- 类别特征处理:
- 数值特征处理:
- 序列特征处理:
# 高基数特征: Hash编码
def hash_feature(value, num_buckets=10000):
return hash(value) % num_buckets
# 低频特征: 归并到<UNK>
def handle_rare_categories(df, col, threshold=100):
value_counts = df[col].value_counts()
rare_values = value_counts[value_counts < threshold].index
df[col] = df[col].apply(
lambda x: '<UNK>' if x in rare_values else x
)
return df
# 归一化
from sklearn.preprocessing import StandardScaler, MinMaxScaler
scaler = MinMaxScaler()
df['age_normalized'] = scaler.fit_transform(df[['age']])
# 分桶 (更推荐)
df['age_bucket'] = pd.cut(
df['age'],
bins=[0, 18, 25, 35, 45, 60, 100],
labels=['0-18', '18-25', '25-35', '35-45', '45-60', '60+']
)
用户历史行为序列
def encode_sequence_feature(user_history, max_len=50):
"""
将用户历史行为转换为固定长度序列
"""
# 截短或补充
if len(user_history) > max_len:
user_history = user_history[-max_len:]
else:
user_history = [0] * (max_len - len(user_history)) + user_history
return user_history
统计特征
def extract_sequence_statistics(user_history):
"""从序列中获取统计特征"""
return {
'history_length': len(user_history),
'unique_items': len(set(user_history)),
'most_common_item': Counter(user_history).most_common(1)[0][0],
# 可用作DeepFM的额外特征
}
8.3 训练技巧
- 负采样
class NegativeSampler: """负采样: 减少负面示例的数量,加快训练速度""" def __init__(self, neg_ratio=4): self.neg_ratio = neg_ratio def sample(self, X, y): pos_indices = np.where(y == 1)[0] neg_indices = np.where(y == 0)[0] # 随机选取负面示例 num_neg_samples = len(pos_indices) * self.neg_ratio sampled_neg_indices = np.random.choice( neg_indices, size=min(num_neg_samples, len(neg_indices)), replace=False ) # 结合 indices = np.concatenate([pos_indices, sampled_neg_indices]) np.random.shuffle(indices) return X[indices], y[indices] - 类别不平衡处理
# 方法1: 权重损失 pos_weight = (y == 0).sum() / (y == 1).sum() criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([pos_weight])) # 方法2: 焦点损失 class FocalLoss(nn.Module): def __init__(self, alpha=0.25, gamma=2): super(FocalLoss, self).__init__() self.alpha = alpha self.gamma = gamma def forward(self, inputs, targets): BCE_loss = F.binary_cross_entropy_with_logits( inputs, targets, reduction='none' ) pt = torch.exp(-BCE_loss) F_loss = self.alpha * (1-pt)**self.gamma * BCE_loss return F_loss.mean() - 梯度裁剪
# 防止梯度爆炸 torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
8.4 性能优化
- 批处理优化
# 使用DataLoader的pin_memory和num_workers train_loader = DataLoader( dataset, batch_size=1024, shuffle=True, num_workers=4, # 多线程加载 pin_memory=True, # 锁定页面内存,加速GPU传输 prefetch_factor=2 # 提前获取批次 ) - 混合精度训练
from torch.cuda.amp import autocast, GradScaler scaler = GradScaler() for features, labels in train_loader: optimizer.zero_grad() # 自动混合精度 with autocast(): predictions = model(features)
loss = criterion(predictions, labels)
# 梯度放大
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
3. 模型量化 (加速推理)
:
# 动态量化
quantized_model = torch.quantization.quantize_dynamic(
model,
{nn.Linear, nn.Embedding},
dtype=torch.qint8
)
# 模型体积缩减2-4倍,推理速率提高2-3倍
9. 优缺点解析
9.1 优点
1. 端到端的学习
不需手动特征工程
特征表达和预测同步优化
减少开发投入
2. 同时捕捉低级和高级交互
FM模块: 二级交互,具有较强的解释性
DNN模块: 高级复杂交互
共享嵌入: 增强泛化性能
3. 适应稀疏数据
嵌入技术管理高维稀疏特性
参数共享减轻过拟合
在CTR预估场景中表现出色
4. 行业验证
华为应用市场CTR预估
多家公司在线部署
成效显著改善
9.2 缺点
1. 计算成本高
相比传统模型:
- LR: 推理耗时 1ms
- FM: 推理耗时 5ms
- DeepFM: 推理耗时 20-50ms
解决方案:
- 模型蒸馏
- 量化加速
- 工程优化
2. 参数调整复杂
超参数众多
需要广泛测试
依赖实践经验
3. 解释性较低
DNN部分像黑箱
难以阐明特定预测
不利于业务剖析
解决方案:
# 引入注意力机制提升解释性
class DeepFM_with_Attention(nn.Module):
def __init__(self, ...):
# ... 原始架构
self.attention = nn.MultiheadAttention(
embed_dim=embedding_size,
num_heads=4
)
def forward(self, x):
# ... 前部
# 注意力层
embeddings_attended, attention_weights = self.attention(
embeddings, embeddings, embeddings
)
# attention_weights有助于解释
# ... 后部
4. 冷启动难题
新用户/新项目缺乏训练资料
嵌入质量不佳
解决方案:
# 元学习 + 特征填充
class MetaDeepFM(nn.Module):
"""利用元特征解决冷启动问题"""
def __init__(self, ...):
# 主模型
self.main_model = DeepFM(...)
# 元特征网络 (用于冷启动)
self.meta_network = nn.Sequential(
nn.Linear(meta_feature_dim, embedding_size),
nn.ReLU()
)
def forward(self, x, is_cold_start, meta_features=None):
if is_cold_start:
# 利用元特征生成嵌入
cold_embedding = self.meta_network(meta_features)
# 替代冷启动特征的嵌入
# ...
return self.main_model(x)
9.3 与其它模型的比较
模型
AUC提升
推理时间
训练难度
解释性
LR
基准
1x
低
高
FM
+1-2%
3-5x
低
中
FFM
+2-3%
10-15x
中
中
Wide&Deep
+2-4%
15-20x
中
低
DeepFM
+3-5%
20-50x
中
低
xDeepFM
+4-6%
30-60x
高
低
AutoInt
+4-6%
40-80x
高
中
10. 高级主题
10.1 DeepFM变体
1. xDeepFM (极限DeepFM)
创新: Compressed Interaction Network (CIN)
- 显式学习高阶特征交互
- 向量级别的交互
- 可控的网络复杂度
结构:
Input → Embedding → CIN + DNN → Output
2. AFM (注意型FM)
创新: 注意力机制加权特征交互
- 不同特征对的重要性不同
- 自适应学习交互权重
公式:
y = w? + Σw?x? + Σ? Σ?>? α?? <v?, v?> x?x?
↑ 注意力权重
3. FGCNN (特征生成CNN)
创新: CNN自动生成新特征
- 卷积层挖掘局部模式
- 生成组合特征
- 动态特征工程
10.2 多任务学习
class MultiTaskDeepFM(nn.Module):
"""
同时预测多个目标:
- 点击 (CTR)
- 转化 (CVR)
- 时长 (Duration)
"""
def __init__(self, feature_sizes, embedding_size=16,
shared_dims=[256, 128], task_dims=[64, 32]):
super().__init__()
# 共享部分
self.embeddings = nn.ModuleList([...])
self.shared_dnn = nn.Sequential([...])
# 任务特异部分
self.ctr_tower = nn.Sequential([...])
self.cvr_tower = nn.Sequential([...])
self.duration_tower = nn.Sequential([...])
def forward(self, x):
# 嵌入
embeddings = self.get_embeddings(x)
# FM模块
y_fm = self.fm_layer(embeddings)
# 共享DNN
shared_out = self.shared_dnn(embeddings.flatten(1))
# 任务塔
ctr_out = self.ctr_tower(shared_out)
cvr_out = self.cvr_tower(shared_out)
duration_out = self.duration_tower(shared_out)
# 组合输出
ctr_pred = torch.sigmoid(y_fm + ctr_out)
cvr_pred = torch.sigmoid(cvr_out)
duration_pred = F.relu(duration_out)
return ctr_pred, cvr_pred, duration_pred
# 多任务损失
def multi_task_loss(ctr_pred, cvr_pred, duration_pred,
ctr_label, cvr_label, duration_label):
loss_ctr = F.binary_cross_entropy(ctr_pred, ctr_label)
loss_cvr = F.binary_cross_entropy(cvr_pred, cvr_label)
loss_duration = F.mse_loss(duration_pred, duration_label)
# 权重组合
total_loss = 0.5 * loss_ctr + 0.3 * loss_cvr + 0.2 * loss_duration
return total_loss
10.3 序列建模增强
class DeepFM_with_DIN(nn.Module):
"""
DeepFM + 深度兴趣网络
考虑用户行为序列,捕捉兴趣演变
"""
def __init__(self, ...):
super().__init__()
# 原有DeepFM组件
self.deepfm = DeepFM(...)
# 注意力层 (DIN核心)
self.attention = nn.Sequential(
nn.Linear(embedding_size * 3, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, x, user_history, candidate_item):
"""
参数:
x: 基础特征
user_history: [batch_size, seq_len, embedding_size]
candidate_item: [batch_size, embedding_size]
"""
# DeepFM基础输出
base_output = self.deepfm(x)
# 注意力计算
# 将候选项目与每个历史项目对比
candidate_expanded = candidate_item.unsqueeze(1).expand_as(user_history)
# [user, candidate, user*candidate]
attention_input = torch.cat([
user_history,
candidate_expanded,
user_history * candidate_expanded
], dim=-1)
# [batch_size, seq_len, 1]
attention_weights = F.softmax(
self.attention(attention_input), dim=1
)
# 加权求和
user_interest = torch.sum(
attention_weights * user_history, dim=1
)
# 组合DeepFM输出和用户兴趣
# ... (额外的融合层)
return final_output
10.4 图增强
class DeepFM_with_GNN(nn.Module):
"""
DeepFM + 图神经网络
利用用户-物品交互图
"""
def __init__(self, ...):
super().__init__()
self.deepfm = DeepFM(...)
# GNN组件
from torch_geometric.nn import GCNConv
self.gcn1 = GCNConv(embedding_size, embedding_size)
self.gcn2 = GCNConv(embedding_size, embedding_size)
def forward(self, x, edge_index, batch):
"""
Args:
edge_index: 图的连接 [2, num_edges]
batch: 节点至样本的映射
"""
# GNN传递
node_embeddings = self.get_initial_embeddings(x)
node_embeddings = F.relu(self.gcn1(node_embeddings, edge_index))
node_embeddings = self.gcn2(node_embeddings, edge_index)
# 汇集图数据
graph_features = global_mean_pool(node_embeddings, batch)
# 结合DeepFM
deepfm_output = self.deepfm(x)
# 整合
combined = torch.cat([deepfm_output, graph_features], dim=-1)
# ... (整合层)
return final_output
10.5 AutoML与神经架构搜索
# 用于DeepFM的神经架构搜索
class SearchableDeepFM(nn.Module):
"""可探索的DeepFM框架"""
def __init__(self, feature_sizes, embedding_size=16):
super().__init__()
# 嵌入 (固定)
self.embeddings = nn.ModuleList([...])
# 可探索的DNN结构
self.operations = nn.ModuleList([
nn.Linear(input_size, 256),
nn.Linear(input_size, 512),
nn.Linear(input_size, 128),
# ... 更多备选操作
])
# 架构参数 (可训练)
self.arch_params = nn.Parameter(
torch.randn(len(self.operations))
)
def forward(self, x):
# 嵌入
embeddings = self.get_embeddings(x)
# FM部分 (固定)
y_fm = self.fm_layer(embeddings)
# 可探索DNN
# 使用Gumbel-Softmax抽样
weights = F.gumbel_softmax(self.arch_params, tau=1, hard=False)
dnn_input = embeddings.flatten(1)
dnn_output = sum(
w * op(dnn_input)
for w, op in zip(weights, self.operations)
)
return torch.sigmoid(y_fm + dnn_output)
# 使用DARTS进行架构搜索
def search_architecture(model, train_loader, val_loader):
arch_optimizer = torch.optim.Adam(
[model.arch_params], lr=3e-4
)
weight_optimizer = torch.optim.Adam(
model.parameters(), lr=1e-3
)
for epoch in range(50):
# 训练权重
for x, y in train_loader:
loss = train_step(model, x, y, weight_optimizer)
# 训练架构
for x, y in val_loader:
arch_loss = train_step(model, x, y, arch_optimizer)
# 确定最优架构
best_arch = torch.argmax(model.arch_params)
return best_arch
总结与最佳实践
核心要点回顾
DeepFM = FM + DNN + 共享嵌入
FM学习低级交互 (一阶+二阶)
DNN学习高级非线性交互
共享嵌入减少参数,提高泛化能力
适用场景
点击率预测 (广告、推荐系统)
点击率估计
高维稀疏特征环境
实现关键
特征工程: 类别编码、数值分桶
超参数: embedding_size=16, hidden=[256,128,64]
训练技巧: 负例采样、类别均衡、提前停止
性能优化
批量处理: 尽可能增大batch_size
混合精度训练
模型量化 (推理)
学习路径建议
第1周: 理论学习
- FM原理
- DNN基础
- DeepFM论文
第2周: 代码实现
- PyTorch基础
- 简单数据集实验
- 理解每个组件
第3周: 实战项目
- 真实数据处理
- 特征工程
- 完整训练流程
第4周: 调优与部署
- 超参数调优
- 线上推理
- 性能优化
推荐资源
论文:
- DeepFM原论文: “DeepFM: A Factorization-Machine based Neural Network for CTR Prediction” (IJCAI 2017)
- FM论文: “Factorization Machines” (ICDM 2010)
- Wide&Deep: “Wide & Deep Learning for Recommender Systems” (DLRS 2016)
代码:
- GitHub: https://github.com/shenweichen/DeepCTR-Torch - 开源实现集合,包含多个CTR模型
书籍:
- 《深度学习推荐系统》王喆
- 《推荐系统实践》项亮
课程:
- Stanford CS246: Mining Massive Datasets
- Coursera: Recommender Systems Specialization
附录
A. 常见问题FAQ
Q1: DeepFM vs Wide&Deep?
A: DeepFM无需手动设计Wide部分特征,FM自动学习二阶交互;共享嵌入更高效。
Q2: 嵌入维度如何选择?
A: 通常在8-64之间,大数据量使用16-32,小数据量使用8-16。过大容易过拟合。
Q3: 如何处理新特征值?
A:
训练时预留 token
使用哈希编码
基于元特征生成嵌入
Q4: 训练很慢怎么办?
A:
增大batch_size
负采样减少负样本
使用混合精度训练
多GPU并行
Q5: 如何解释模型预测?
A:
分析FM二阶项权重
添加注意力机制
使用SHAP/LIME解释工具
B. 数学符号表
| 符号 | 含义 |
|---|---|
| x | 输入特征向量 |
| y | 标签 |
| ? | 预测值 |
| d | 特征维度 |
| k | 嵌入维度 |
| V | 嵌入矩阵 |
| w | 一阶权重 |
| W | 神经网络权重矩阵 |
| σ | Sigmoid激活函数 |
| <·,·> | 向量内积 |
C. 代码清单
完整代码已在第6节提供,包括:
- PyTorch实现
- TensorFlow实现
- 特征预处理
- 训练流程
- 推理代码
D. 实验结果示例
| 数据集 | 模型 | AUC | Logloss |
|---|---|---|---|
| Criteo | LR | 0.7812 | 0.4652 |
| Criteo | FM | 0.7925 | 0.4521 |
| Criteo | DeepFM | 0.8043 | 0.4412 |
| Avazu | LR | 0.7623 | 0.3812 |
| Avazu | FM | 0.7741 | 0.3745 |
| Avazu | DeepFM | 0.7856 | 0.3671 |
总结
DeepFM是推荐系统中的经典模型,平衡了模型复杂度和效果提升。通过本教程,您应该已经掌握:
- DeepFM的原理和架构
- 数学公式和推导
- 完整代码实现
- 实战应用技巧
- 调优和部署方法
下一步行动
- 在自己的数据集上实验
- 尝试不同的特征工程
- 调优超参数
- 探索变体模型 (xDeepFM, AutoInt等)
祝您学习愉快!如有问题欢迎交流。


雷达卡


京公网安备 11010802022788号







