模块 6:模型提取与推理攻击
对手如何窃取专有模型、重建训练数据、推理成员身份, 以及利用加密网络流量——以及抵御这些攻击的防御措施。
模型提取基础
机器学习模型是一种知识产权。它编码了多年的领域专业知识、数百万标注训练样本、 大量计算投资,以及通过详尽实验优化的专有架构选择。据报道,训练GPT-4级别的模型 仅计算成本就需要数千万到数亿美元——这个数字还不包括在整理训练数据、调整超参数 或执行对齐工作方面投入的人力专长。模型提取,也称为模型窃取, 是通过查询模型的公开API并在查询-响应对上训练替代模型来克隆专有模型行为的做法。 其结果是:对手可以以原始训练成本的一小部分获得目标模型的功能等价物。
为什么模型提取很重要
攻击模型知识产权的动机是多样的,且往往相互叠加。首先,是直接的知识产权盗窃: 竞争对手可以提取生产模型并将其作为自己的产品部署,绕过原始开发者的许可费用、服务条款和竞争壁垒。 其次,被窃取的替代模型可以作为进一步攻击的跳板。当你拥有对模型梯度的白盒访问权限时, 生成对抗样本要容易得多。通过首先提取替代模型,攻击者将黑盒问题转化为白盒问题,使下游对抗攻击的 效果提高几个数量级。第三,提取可以用来规避速率限制和成本控制: 一旦你拥有本地副本,就可以在没有按查询收费或使用监控的情况下运行推理。
最后,也许是最令人担忧的是,提取能够实现隐私推理。在敏感数据 (医疗记录、财务记录、私人通信)上训练的模型,即使仅通过API访问,也可能泄露 有关其训练集的信息。提取为对手提供了一个持久的本地工件,可以在闲暇时进行探测, 而无需担心API提供商维护的审计追踪。
精确提取与功能等价
模型提取存在两个不同的目标。精确提取试图恢复目标模型的精确权重和架构, 在每个可能的输入上重现其行为——包括边界情况。对于简单的模型类别(例如小型ReLU网络), 这在理论上是可能的,其中唯一确定权重所需的查询数量随模型大小多项式增长, 但对于现代数十亿参数的LLM来说,这在计算上仍然是不可行的。
相比之下,功能等价满足于一个在与任务相关的输入分布上匹配目标模型行为的替代模型。 替代模型不需要共享目标的架构或权重;它只需要在攻击者关心的输入上产生类似的预测。 这是对大多数商业部署具有实际意义的威胁,比精确提取需要的查询少得多。研究表明, 商业NLP模型的功能等价物可以通过几十万次API调用实现——完全在资金充足的对手的预算范围内。 [Tramèr et al.]
攻击目标
在不访问权重或架构的情况下克隆模型行为。
攻击面
任何返回标签或概率分数的公共预测API。
攻击者成本
API查询费用 + 替代模型训练计算(通常比原始模型便宜100-10,000倍)。
受害者损失
收入、竞争优势、训练数据主体的下游隐私风险。
基于查询的模型窃取
经典的模型窃取攻击分为三个阶段:对目标API的系统化查询、查询-响应数据集的积累, 以及在该数据集上训练替代模型。这个流程的简单性掩盖了其有效性。现代研究表明, 即使替代模型与目标模型的架构不同,在经过精心选择的查询-响应对训练后, 也能达到几乎相同的任务性能。 [Tramèr et al.]
查询策略:主动学习以提高效率
一个天真的攻击者可能会均匀随机地采样输入。而老练的攻击者会使用主动学习 来选择能最大化信息增益的查询。核心洞察在于并非所有输入的信息量都是相等的: 靠近模型决策边界的点比牢固处于某一类区域的点携带更多关于模型函数的信息。 主动学习启发式方法(不确定性采样、委员会查询、核心集选择)使攻击者能够以 显著更少的查询构建准确的替代模型——有时比均匀采样少一个数量级。
攻击者从一组种子未标记输入开始,查询目标,然后通过询问以下问题来选择下一批查询: "如果标记了哪些输入,将最大程度地减少替代模型的不确定性?"每批之后重新训练替代模型, 然后重复这个过程。这个闭环正是模型窃取即使对仅返回硬标签(无概率)的API也能 具有毁灭性效率的原因:即使是二元成员信息也能逐步约束替代模型。
训练替代模型
一旦积累了足够的(输入, 目标响应)对,攻击者就训练一个替代模型——该模型不需要
共享目标的架构——以最小化这些对上的损失。软标签(概率分布)比硬标签的信息丰富得多:
[cat: 0.72, dog: 0.25, fox: 0.03]的预测传达了目标模型在该输入附近的
置信度几何结构,而硬标签cat则丢弃了相对分数。当API返回置信度分数时,
攻击者应将其用作训练目标(知识蒸馏)。当只有硬标签可用时,替代模型端的温度缩放
和标签平滑可以部分补偿。
实现代码
import numpy as np
import requests
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from scipy.stats import entropy
class ModelExtractor:
"""
Black-box model extraction via active-learning-guided query selection.
Trains a local surrogate MLPClassifier to mimic a remote target API.
"""
def __init__(self, target_api_url, n_classes=2, query_budget=5000):
self.target_url = target_api_url
self.n_classes = n_classes
self.query_budget = query_budget
self.queries = [] # List[np.ndarray] — inputs sent to target
self.responses = [] # List[int or list] — labels/probabilities returned
# Surrogate architecture: two hidden layers
self.surrogate = MLPClassifier(
hidden_layer_sizes=(100, 50),
activation='relu',
max_iter=500,
random_state=42
)
self.scaler = StandardScaler()
self._fitted = False
def query_target(self, inputs):
"""Send inputs to the target API and record (input, response) pairs."""
for x in inputs:
try:
resp = requests.post(
self.target_url,
json={"input": x.tolist()},
timeout=10
)
resp.raise_for_status()
data = resp.json()
# Accept either {"prediction": 1} or {"probabilities": [0.3, 0.7]}
label = data.get("prediction", np.argmax(data.get("probabilities", [0])))
self.queries.append(x)
self.responses.append(label)
except (requests.RequestException, Key错误) as e:
print(f"Query failed: {e}")
def train_surrogate(self):
"""Fit the surrogate on all accumulated (query, response) pairs."""
X = np.array(self.queries)
y = np.array(self.responses)
X_scaled = self.scaler.fit_transform(X)
self.surrogate.fit(X_scaled, y)
self._fitted = True
train_acc = self.surrogate.score(X_scaled, y)
return train_acc
def select_uncertain_batch(self, candidate_pool, batch_size=100):
"""
活跃 learning: pick the batch_size candidates where the surrogate
is most uncertain (highest entropy over class probabilities).
Requires surrogate to be trained at least once.
"""
if not self._fitted:
# Cold start — return random batch
idx = np.random.choice(len(candidate_pool), batch_size, replace=False)
return candidate_pool[idx]
X_scaled = self.scaler.transform(candidate_pool)
proba = self.surrogate.predict_proba(X_scaled) # (N, n_classes)
uncertainties = np.array([entropy(p) for p in proba]) # Shannon entropy
# Select top-k most uncertain samples
top_idx = np.argsort(uncertainties)[-batch_size:]
return candidate_pool[top_idx]
def run_extraction(self, input_domain_sampler, batch_size=100):
"""
Full extraction loop.
input_domain_sampler: callable() -> np.ndarray of shape (N, d)
"""
rounds = self.query_budget // batch_size
for round_i in range(rounds):
# 1. Sample a large candidate pool from the input domain
candidates = input_domain_sampler()
# 2. Use active learning to pick the most informative batch
batch = self.select_uncertain_batch(candidates, batch_size)
# 3. Query the target API
self.query_target(batch)
# 4. Retrain surrogate on all data so far
if len(self.queries) >= 200: # Minimum for meaningful training
acc = self.train_surrogate()
print(f"Round {round_i+1}: {len(self.queries)} queries, surrogate accuracy={acc:.3f}")
return self.surrogate
# ── 示例 usage ──────────────────────────────────────────────────────
# Suppose the target model classifies 20-dimensional input vectors
def domain_sampler():
"""Returns 500 random candidates from input domain."""
return np.random.uniform(-1, 1, size=(500, 20)).astype(np.float32)
extractor = ModelExtractor(
target_api_url="https://api.example.com/predict",
n_classes=3,
query_budget=2000
)
surrogate_model = extractor.run_extraction(domain_sampler, batch_size=100)
print(f"Extraction complete. Total queries: {len(extractor.queries)}")
最大化信息增益的优化
除了不确定性采样之外,攻击者还可以利用其他几种策略。 基于Jacobian的数据增强(JBDA)通过对现有标记输入施加小梯度步骤 来合成新的训练点,在不需要额外API调用的情况下生成靠近决策边界的输入。 无模型方法使用生成模型从头合成多样化的输入。对于NLP模型, 提示链——攻击者一次系统地改变一个语言维度——允许使用结构化查询集 高效覆盖响应面。 [Papernot et al.]
从LLM中提取训练数据
从精确的技术意义上说,大语言模型是其训练语料库的压缩摘要。它们学习统计规律—— 从拼写模式到完整的逐字段落——并将这些知识存储在数十亿参数中。这产生了一个漏洞: 足够精确的攻击者可以导致模型原样输出出现在训练集中的逐字文本。 Carlini et al. (2021)的里程碑研究在GPT-2上决定性地证明了这一点, 仅通过查询模型的公共API就提取了数百个逐字训练样本,包括全名、物理地址、电话号码和受版权保护的文本。 [Carlini et al., USENIX Security 2021]
为什么LLM会记忆训练数据
记忆化源于多种因素的结合。在语料库中重复多次的训练数据更可能被记忆—— GPT-2记忆了完整的MIT许可证文本,因为它们逐字出现在数十万个GitHub仓库中。 模型容量放大了这一点:更大的模型记忆更多,因为它们有更多参数来存储 罕见的训练样本。反直觉的是,更长时间的训练(更多轮次)也会增加记忆化, 因为模型反复看到相同的样本并更精确地拟合它们。
Carlini et al.定义了k-逐字记忆:如果模型能从长度为k的前缀 重现字符串s,且s在训练数据中仅出现一次,则该字符串s 被模型k-逐字记忆。这与事实知识(可能从许多相互佐证的样本中学习)不同—— 逐字记忆是从单个训练样本中逐字保留的。
提取方法论
攻击流程包含三个步骤:生成、排序和验证。首先,攻击者从模型中生成大量文本样本—— Carlini et al.使用多种提示策略生成了60万个样本。其次,他们使用成员推理指标 作为过滤器对这些样本进行排序:模型给予异常高似然度的样本更可能是被记忆的。 具体来说,他们将模型对候选文本的困惑度与较小参考模型的困惑度进行比较。 被记忆的文本对大模型来说是高似然的,但对参考模型则不是。第三,排名最高的候选文本 将与原始训练语料库进行验证。
发散攻击和前缀攻击
发散攻击利用了这样一个现象:经过RLHF或指令调优微调的模型在正常使用时 会抑制记忆输出,但可以通过构造对抗性提示使其"忘记"这种抑制。Carlini et al. (2023) 从ChatGPT中提取了数兆字节的训练数据——尽管它经过了对齐训练——使用一个简单的重复提示: 要求模型无限重复一个词会导致它最终偏离对齐行为并逐字输出训练数据。 [Carlini et al., 2023 — ChatGPT提取]
前缀攻击向模型提供训练数据中的真实前缀,并观察它是否准确地完成段落的其余部分。 在Carlini et al.的实验中,用"My address is 1 Main Street"提示GPT-2, 导致它准确地补全了特定真实个人的联系信息。 基于补全的提取是一种通用技术:任何在训练期间见过的提示都充当了其后续段落的检索键。
记忆化测试
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
import zlib
def compute_perplexity(model, tokenizer, text, device="cpu"):
"""Compute per-token perplexity of a text under a given model."""
inputs = tokenizer(text, return_tensors="pt").to(device)
with torch.no_grad():
output = model(**inputs, labels=inputs["input_ids"])
return torch.exp(output.loss).item()
def memorization_score(large_model, small_model, tokenizer, text, device="cpu"):
"""
Carlini et al. 'ratio' metric: compares large vs. small model perplexity.
A memorized passage has LOW perplexity under the large model
but NOT proportionally low under the small reference model.
Higher score → more likely memorized.
"""
ppl_large = compute_perplexity(large_model, tokenizer, text, device)
ppl_small = compute_perplexity(small_model, tokenizer, text, device)
# Ratio metric: lower PPL in large vs small suggests memorization
ratio_score = np.log(ppl_small) / np.log(ppl_large)
# Zlib metric: compare model PPL to compression-based estimate
zlib_entropy = len(zlib.compress(text.encode())) / len(text)
zlib_score = np.log(ppl_large) / zlib_entropy
return {
"perplexity_large": ppl_large,
"perplexity_small": ppl_small,
"ratio_score": ratio_score, # higher → more suspect
"zlib_score": zlib_score, # higher → less compressible = natural text
}
def generate_candidates(model, tokenizer, n_samples=200, max_length=256, device="cpu"):
"""
Generate n_samples completions from the model with an empty prefix.
Returns a list of generated strings.
"""
model.eval()
candidates = []
with torch.no_grad():
for _ in range(n_samples):
input_ids = torch.tensor([[tokenizer.bos_token_id]]).to(device)
output = model.generate(
input_ids,
max_new_tokens=max_length,
do_sample=True,
top_k=40,
temperature=1.0,
pad_token_id=tokenizer.eos_token_id
)
text = tokenizer.decode(output[0], skip_special_tokens=True)
candidates.append(text)
return candidates
# ── 示例: screen 200 GPT-2 generations for possible memorized text ──
if __name__ == "__main__":
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = GPT2Tokenizer.from_pretrained("gpt2-xl")
model_xl = GPT2LMHeadModel.from_pretrained("gpt2-xl").to(device)
model_sm = GPT2LMHeadModel.from_pretrained("gpt2").to(device) # reference
candidates = generate_candidates(model_xl, tokenizer, n_samples=200, device=device)
scored = []
for text in candidates:
scores = memorization_score(model_xl, model_sm, tokenizer, text, device)
scored.append((text, scores))
# 排序 by ratio score descending — highest ratio = most likely memorized
scored.sort(key=lambda x: x[1]["ratio_score"], reverse=True)
print("Top 5 candidates most likely to contain memorized training data:")
for text, scores in scored[:5]:
print(f" Ratio={scores['ratio_score']:.3f} | PPL(xl)={scores['perplexity_large']:.1f}")
print(f" Text: {text[:120]}...")
print()
在Carlini et al.的最佳攻击配置中,67%的排名最高的候选文本被确认为逐字训练样本。 在他们提取的604个唯一记忆序列中,46个包含个人姓名,32个包含联系信息—— 这些都是真实个人,其数据在不知情的情况下被收集到了GPT-2的CommonCrawl训练语料库中。 [USENIX Security 2021]
成员推理
设想一家医院训练了一个机器学习模型来预测患者再入院风险。该模型通过公共API部署, 供保险公司查询。现在考虑一个对手——也许是竞争对手、好管闲事的雇主或恶意的保险公司—— 他拥有某个特定个人的医疗记录,想知道:这个人的数据是否被用来训练这个模型? 这就是成员推理问题,它是现代机器学习中最具实际意义的隐私威胁之一。 [Shokri et al., IEEE S&P 2017]
为什么模型会泄露成员信息
根本原因是过拟合。在某个数据点上训练过的模型通常会为其分配更高的置信度、 更低的损失以及与未见数据点不同的内部表示。即使是经过良好正则化的模型也会在可测量的程度上 表现出这种差异。对于罕见的、不寻常的或完全重复的训练样本,这种差距更大—— 与训练数据提取相同的记忆化现象也使成员推理成为可能。
影子模型技术
Shokri et al. (2017)的开创性攻击引入了影子模型方法。由于攻击者 无法直接观察目标模型的训练集,他们会模拟它:在与目标训练数据相同分布的数据集上 训练多个影子模型。对于每个影子模型,攻击者确切知道哪些点在训练集中 (成员)哪些不在(非成员)。他们记录模型对每个点的输出向量并相应标注。 这产生了一个标注的(模型输出, 成员/非成员)对数据集。在此标注数据集上训练的 攻击分类器随后可以应用于目标模型的输出来推断成员身份。
该技术在实际实验中对Google机器学习服务训练的模型实现了94%的 中位成员推理准确率,对Amazon服务实现了74%的准确率。 [Shokri et al.]
基于损失的推理
一种避免训练影子模型的更简单方法是损失阈值攻击:计算模型在目标点上的 损失并与阈值进行比较。如果损失低于阈值(模型高度自信),则预测为成员。 这是因为训练样本往往比未见数据具有更低的损失,特别是对于过拟合的模型。 更复杂的变体使用参考模型:计算目标模型和在不相交数据上训练的 参考模型之间的似然比。该比率较高的点很可能是成员。
Python实现
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
class MembershipInferenceAttack:
"""
Shadow-model-based membership inference attack.
Assumes black-box access to a target model that returns
class probability vectors for input features.
"""
def __init__(self, n_shadow=4, shadow_architecture=None):
self.n_shadow = n_shadow
self.shadow_architecture = shadow_architecture or (64, 32)
self.attack_classifier = LogisticRegression(max_iter=500)
self.shadow_models = []
self._trained = False
def _train_shadow_model(self, X_train, y_train):
"""Train a single shadow model on a subset of proxy data."""
shadow = MLPClassifier(
hidden_layer_sizes=self.shadow_architecture,
max_iter=300,
random_state=np.random.randint(0, 10000)
)
shadow.fit(X_train, y_train)
return shadow
def _extract_features(self, model, X):
"""
Extract membership inference features from model outputs.
Uses the full probability vector as features (preserves calibration signal).
"""
proba = model.predict_proba(X) # shape: (n_samples, n_classes)
# Additional engineered features: confidence, entropy, top-2 gap
confidence = proba.max(axis=1, keepdims=True)
pred_entropy = -np.sum(proba * np.log(proba + 1e-9), axis=1, keepdims=True)
sorted_p = np.sort(proba, axis=1)[:, ::-1]
top2_gap = (sorted_p[:, 0] - sorted_p[:, 1]).reshape(-1, 1)
return np.hstack([proba, confidence, pred_entropy, top2_gap])
def train_attack(self, proxy_data_X, proxy_data_y):
"""
Build labeled (features, membership) dataset using shadow models,
then train the attack classifier.
proxy_data_X, proxy_data_y : dataset drawn from same distribution
as target's training set.
"""
all_features = []
all_labels = []
n = len(proxy_data_X)
split = n // 2
for i in range(self.n_shadow):
# Randomly partition proxy data into shadow-train and shadow-test
idx = np.random.permutation(n)
train_idx, test_idx = idx[:split], idx[split:]
X_tr, y_tr = proxy_data_X[train_idx], proxy_data_y[train_idx]
X_te, y_te = proxy_data_X[test_idx], proxy_data_y[test_idx]
shadow = self._train_shadow_model(X_tr, y_tr)
self.shadow_models.append(shadow)
# "In" examples: training set of this shadow model → label 1
f_in = self._extract_features(shadow, X_tr)
# "Out" examples: test set of this shadow model → label 0
f_out = self._extract_features(shadow, X_te)
all_features.append(np.vstack([f_in, f_out]))
all_labels.extend([1] * len(X_tr) + [0] * len(X_te))
print(f"Shadow model {i+1}/{self.n_shadow} trained. "
f"Accuracy={shadow.score(X_te, y_te):.3f}")
F = np.vstack(all_features)
L = np.array(all_labels)
self.attack_classifier.fit(F, L)
self._trained = True
def infer_membership(self, target_model, query_points):
"""
Given a trained target model and query data points,
return membership probability for each point.
1.0 = likely member, 0.0 = likely non-member.
"""
if not self._trained:
raise Runtime错误("Call train_attack() first.")
features = self._extract_features(target_model, query_points)
return self.attack_classifier.predict_proba(features)[:, 1]
def evaluate(self, target_model, member_X, nonmember_X):
"""Compute AUC of attack against target model."""
member_scores = self.infer_membership(target_model, member_X)
nonmember_scores = self.infer_membership(target_model, nonmember_X)
y_true = np.concatenate([np.ones(len(member_X)), np.zeros(len(nonmember_X))])
y_score = np.concatenate([member_scores, nonmember_scores])
auc = roc_auc_score(y_true, y_score)
print(f"成员推理 AUC: {auc:.4f}")
return auc
隐私影响:GDPR与HIPAA
成员推理直接违反了主要数据保护法律中规定的隐私原则。根据GDPR第17条 (被遗忘权),个人可以请求删除其数据——但如果已部署的模型揭示了成员身份, 有效删除就变得不可能,除非重新训练或应用机器遗忘技术。根据HIPAA, 未经去标识化就用于训练模型的健康信息,如果可以从已部署的模型中推断出成员身份, 可能使机构面临法律责任。监管机构越来越倾向于将可证明的成员推理漏洞视为合规失败, 而不仅仅是理论风险。
属性推理
成员推理是一个二元问题:这个人是否在训练集中?属性推理更为精细: 假设一个人在训练集中(或者甚至只是作为模型的输入),攻击者能否了解关于他们的 未作为输入明确提供的敏感属性?这类攻击特别有害,因为它可以在查询时操作, 而不仅仅是在训练时——任何预测API都可能泄露查询对象的人口统计或行为属性。
属性推理的工作原理
考虑一个在包含"批准特征"(收入、信用记录、贷款金额)和"受保护属性"(种族、 性别、邮编作为种族的代理)的数据集上训练的信用评分模型。即使受保护属性在 推理时被排除在模型的官方特征集之外,模型可能在训练期间已经吸收了这些相关性。 一个知道目标个人批准特征的对手可以查询模型,并通过观察预测来推断个人的 受保护属性。研究表明,即使是使用明确公平性约束训练的模型,仍然可以通过其输出分布泄露受保护属性。
该攻击通常通过训练一个重建模型来工作:攻击者收集大量 (已知特征, 模型输出)对,其中敏感属性也是已知的(来自单独的数据集或通过关联), 并训练一个分类器从模型的输出中预测敏感属性。Yeom et al. (2018)将其形式化为 一种在模型学会利用敏感属性与标签之间的相关性时就会成功的攻击。
人口统计和行为推理
LLM为属性推理提供了特别丰富的攻击面,因为它们产生细致的、开放式的输出。 研究表明,LLM可以仅从写作风格来预测用户的人口统计特征(年龄、性别、 政治倾向、国籍)。当LLM在用户交互日志上进行微调时,微调后的模型可能通过 响应查询方式的微妙系统性差异来暴露单个用户的特征——即使是属于微调集 而非当前查询的用户也是如此。
交叉引用攻击
一个强大的变体将属性推理与辅助数据相结合。攻击者不仅依赖模型本身; 他们将模型输出与公开可用的数据库、社交媒体个人资料和其他数据源结合。 例如,知道一个医疗模型对特征为(年龄=54, 邮编=90210, 诊断=T2D)的 患者预测73%的再入院风险,当与公共选民登记和财产记录交叉引用时, 可能足以唯一识别该患者并推断其完整的医疗记录。
缓解策略
防御属性推理需要训练时和部署时的双重干预。公平性对抗训练 惩罚允许判别器从中间表示中推断受保护属性的模型。输出限制—— 仅返回硬标签而非置信度分数——减少了攻击者可用的信息,但不能消除风险。 带有差分隐私的联邦学习可以限制模型权重中编码的个人级别 信息量,提供最强的理论保证。
LLM侧信道攻击:Whisper Leak
LLM部署中一个普遍的假设是TLS/HTTPS加密为用户查询提供了有意义的机密性。 Whisper Leak研究(2025)打破了这一假设。该攻击表明, 对用户加密网络流量具有被动访问权限的对手——例如互联网服务提供商、被入侵的路由器 或恶意Wi-Fi接入点——可以在不解密任何一个字节负载的情况下,以超过98%的准确率 推断用户LLM查询的主题,在测试的28个商业LLM中有17个达到了这一水平。 [Whisper Leak, arXiv 2511.03675, 2025]
攻击前提:为什么加密不够
TLS(使用AES-GCM等流密码)加密负载内容但保留了负载大小:
size(密文) = size(明文) + 常数。
当LLM逐Token生成流式响应时,每个Token作为单独的加密数据包发送。
由于Token长度可变(Tokenthe是3个字符;
antidisestablishmentarianism是28个字符),
数据包大小序列直接编码了Token长度序列。
关键洞察是不同主题会产生系统性不同的Token长度模式。 关于量子物理的回复使用更长、更不常见的技术术语。关于烹饪的回复使用更短、 更常见的词汇。关于法律事务的回复包含特定的法律术语和拉丁短语。 这些模式在不同用户提出类似问题时足够稳定,训练有素的分类器仅从数据包大小 就能识别主题——甚至不需要看到内容。
实验设置与结果
研究人员为每个模型收集了21,716个查询(100个主题变体 x 100次重复 + 11,716个Quora噪声查询),并使用tcpdump捕获的加密流量轨迹训练二元分类器。 测试针对来自主要提供商的28个LLM,包括OpenAI(GPT-4o-mini、GPT-4.1)、 Anthropic(Claude 3 Haiku)、Google(Gemini 1.5 Flash、2.5 Pro)、 Microsoft、xAI(Grok)、Mistral、DeepSeek、Meta(通过Lambda的LLaMA) 和Amazon(Nova)。 [Whisper Leak]
AUPRC(中位数)
28个模型中有17个超过98%。所有模型平均96.8%。
精确率
17/28个模型在5-20%召回率下达到100%精确率(如GPT-4o-mini、Mistral、Grok)。
最难攻击的目标
Google Gemini(81-84% AUPRC)和Amazon Nova(71-77%)最具抗性。
攻击者模型
被动网络观察者(ISP级别)。无需主动干预。
三种分类器架构
该论文在数据包大小+到达间隔时间特征序列上评估了三种分类器架构:
- LightGBM:在展平的、零填充的数据包大小/时间序列 (填充到第95百分位长度)上的梯度提升决策树集成。训练速度快,准确率有竞争力。
- BiLSTM:带注意力机制的双向LSTM。嵌入每个 (数据包大小, 到达间隔时间)对,使用BiLSTM+注意力处理, 然后通过两层MLP头进行分类。捕获序列依赖关系。
- 基于BERT的(DistilBERT):将数据包大小和时间离散化为 50个区间的Token,然后在这些Token序列上微调DistilBERT分类器。 在具有复杂数据包分布的模型上表现最佳。
概念性攻击代码:流量捕获与分类
import subprocess
import struct
import numpy as np
from collections import namedtuple
import joblib # for loading pre-trained LightGBM classifier
Packet追踪 = namedtuple("Packet追踪", ["sizes", "timings"])
def capture_llm_traffic(target_host, duration_sec=30, interface="eth0"):
"""
Passively capture encrypted HTTPS packets to/from an LLM API endpoint.
Returns a Packet追踪 with per-packet sizes and inter-arrival times.
IMPORTANT: Only use on networks/systems you are authorized to monitor.
This is a conceptual demonstration of the Whisper Leak methodology.
Uses tcpdump (requires appropriate privileges or packet capture capability).
"""
# In production, Whisper Leak uses tcpdump output parsed with Scapy
# Here we show the structure; actual implementation needs pcap parsing
cmd = [
"tcpdump", "-i", interface,
"-nn", "-q", "-tt", # timestamp + size, no DNS resolution
f"host {target_host} and port 443",
"-c", "2000" # capture up to 2000 packets
]
# Parse output lines of form: "1700000000.123456 IP src > dst: ... length NNN"
raw_lines = subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode()
sizes = []
timestamps = []
prev_ts = None
inter_arrivals = []
for line in raw_lines.splitlines():
parts = line.split()
if not parts: continue
try:
ts = float(parts[0])
# length is typically the last token starting with a digit
pkt_len = int([p for p in parts if p.isdigit()][-1])
sizes.append(pkt_len)
if prev_ts is not None:
inter_arrivals.append(ts - prev_ts)
prev_ts = ts
except (Value错误, Index错误):
continue
return Packet追踪(sizes=sizes, inter_arrivals=inter_arrivals)
def featurize_trace(trace, max_len=512):
"""
Pad/truncate packet size and inter-arrival sequences to fixed length,
then concatenate into a feature vector for LightGBM classification.
Mirrors the Whisper Leak featurization strategy.
"""
sizes = np.array(trace.sizes[:max_len], dtype=np.float32)
timings = np.array(trace.inter_arrivals[:max_len], dtype=np.float32)
# Zero-pad to max_len
sizes_padded = np.pad(sizes, (0, max_len - len(sizes)))
timings_padded = np.pad(timings, (0, max_len - len(timings)))
# Concatenate sizes + timings into one feature vector
feature_vector = np.concatenate([sizes_padded, timings_padded])
return feature_vector.reshape(1, -1)
def classify_prompt_topic(trace, classifier_path, topic_labels):
"""
Given a captured traffic trace, load a pre-trained LightGBM classifier
and predict the topic of the underlying LLM prompt.
"""
clf = joblib.load(classifier_path) # pre-trained binary or multi-class model
features = featurize_trace(trace)
proba = clf.predict_proba(features)[0]
results = sorted(zip(topic_labels, proba), key=lambda x: x[1], reverse=True)
print("Topic classification results:")
for topic, prob in results[:5]:
print(f" {topic:30} {prob:.3f}")
return results[0][0] # top predicted topic
防御
Whisper Leak论文评估了随机填充作为缓解措施:向每个流式Token附加随机长度的字符串 以隐藏其长度。Cloudflare在初始披露后实施了这一防御。然而,论文表明即使有填充, 数据包之间的时间信息仍然保留了显著的可分类信号。 唯一完全有效的防御是恒定形状流量:将所有响应填充到固定大小, 并在传输前批量处理Token,消除所有数据包大小变化。 这从根本上与大多数LLM提供商优化的低延迟流式用户体验相冲突。
Token长度侧信道
Whisper Leak攻击推断提示的主题。Weiss et al. (2024)的 研究追求更精确的目标:从加密数据包的大小推断LLM响应的确切内容——逐词推断。 这就是Token长度侧信道,它代表了LLM部署历史上最令人震惊的隐私失败之一。 [Microsoft Security Blog, 2025]
攻击方法论
当LLM逐Token流式传输响应时,每个TLS数据包恰好携带一个Token的字节。
数据包大小直接揭示Token的字节长度:3字节的数据包携带3个字符的Token;
7字节的数据包携带7个字符的Token。观察加密流的对手因此了解了响应中
每个Token的长度序列——例如,[4, 5, 1, 3, 6, 2, 7, ...]。
掌握这个长度序列后,Weiss et al.使用辅助LLM来重建与该确切Token长度模式 匹配的最合理句子。这个任务本质上是一个受约束的文本生成问题:生成一个其分词化 恰好产生观察到的长度序列的连贯句子。鉴于分词器词表是固定且广泛已知的 (例如OpenAI模型的tiktoken),这是一个约束极强的问题——远比无约束生成容易。
该攻击实现了大约27%的精确Token重建率。虽然这看起来不高, 但请考虑到即使部分重建也可能足以识别响应是否包含敏感的医疗诊断、 法律建议或私人个人信息。
为什么这种攻击在结构上难以修复
根本原因不是TLS的缺陷——而是流式API设计的根本属性。任何满足以下条件的系统:
- 使用固定的分词器(因此Token长度可从词表预测),
- 通过TLS逐一流式传输Token,以及
- 使用保留明文长度的流密码,
都容易受到这种攻击。唯一能完全中和它的缓解措施是: (a) 为所有Token添加确定性填充,使它们看起来大小相同, 或 (b) 在加密前将多个Token批量打包到每个数据包中——两者都会增加延迟 并降低使流式传输有价值的响应性。Cloudflare在Weiss et al.披露后 在CDN层实施了逐Token随机填充,但后续工作仍然证明了残余信息泄漏。 [Whisper Leak引用Cloudflare缓解措施]
对敏感部署的影响
任何在医疗、法律、金融或政府环境中使用流式API部署的LLM,都可能向 用户和提供商之间具有网络可见性的任何人泄露响应内容。这包括企业代理、 VPN提供商、ISP,以及潜在的政府监控基础设施。对于高度敏感的部署, 适当的安全态势是完全禁用流式传输(将完整响应作为单个数据包返回) 或在本地部署LLM推理,不暴露于外部网络。
高效推理中的计时攻击
在LLM服务市场中,性能竞争激烈,提供商在推理优化技术上投入巨大。 两个主要类别——推测解码和KV缓存共享—— 已被证明会引入可利用的侧信道,揭示有关用户输入和系统配置的私有信息。
推测解码侧信道
推测解码通过让一个小型、低成本的"草稿模型"预测前方几个Token, 然后让大型目标模型并行验证它们来加速LLM推理。当草稿模型猜测正确时, 一次验证通过接受多个Token。当它猜测错误时,模型回退到单Token自回归生成。 这种接受/拒绝模式是输入相关的:某些提示将以许多正确的推测 解码(每次迭代产生更大的数据包),而其他提示触发许多错误推测(每次迭代更小的数据包)。
Wei et al. (2024)证明,观察每次生成迭代数据包大小的被动网络对手可以重建 这个接受/拒绝轨迹,并使用它在四种不同的推测解码方案中以超过90%的准确率 指纹识别用户查询。 [Wei et al., "When Speculation Spills Secrets," 2024] 具体来说:REST达到约100%的查询识别准确率,LADE达到92%,BiLD达到95%, 即使是远程vLLM上的EAGLE也达到了77.6%的准确率。
除了查询指纹识别之外,拥有API访问权限的恶意用户还可以提取 私有数据存储内容,这些内容被检索增强的推测解码(如REST)使用: 通过构造旨在探测数据存储的输入并观察哪些Token被正确推测, 攻击者可以以每秒超过25个Token的速度泄露数据存储内容。这对于在检索语料库中 包含专有或机密文档的RAG系统来说是一个特别严重的威胁。
Carlini & Nasr:推理优化引起的计时变化
Carlini & Nasr (2024)展示了这类攻击的早期版本,表明商业模型(GPT-4、Claude) 中推理优化引起的计时变化可以通过数据包到达间隔时间作为侧信道被利用。 他们的工作为该研究领域建立了威胁模型,尽管后续研究表明到达间隔时间信号 比数据包大小信号噪声大得多——Wei et al.的方法在相同的vLLM设置上达到77.6%的 准确率,而Carlini & Nasr的方法仅达到14.4%。
InputSnatch:KV缓存计时攻击
第二类计时侧信道利用KV缓存共享。现代LLM推理后端 (vLLM、TensorRT-LLM)实现了前缀缓存:如果两个请求共享公共前缀, 为该前缀计算的KV缓存将被重用而不是重新计算。这产生了可测量的计时差异—— 缓存命中的响应明显快于缓存未命中。 [Zheng et al., "InputSnatch," arXiv 2411.18191, 2024]
Zheng et al. (2024)的InputSnatch攻击利用此漏洞 重建其他用户的缓存提示。该攻击通过系统地用候选输入查询目标服务 并观察首个Token时间(TTFT)是否指示缓存命中来工作。缓存命中揭示 候选前缀匹配了另一个用户的缓存查询。通过迭代构建越来越长的匹配前缀, 攻击者可以重建受害用户的确切提示——即使服务使用HTTPS加密。
在带有前缀缓存的医疗问答聊天机器人实验中,InputSnatch在提取确切疾病输入方面 实现了62%的成功率,精确症状描述的成功率为13.5%。 对于带有语义缓存的法律咨询RAG系统,语义提取成功率范围从43%到100%。
TPUXtract:硬件电磁泄漏攻击
到目前为止讨论的所有攻击都在API或网络层面运行。但还有另一个完全不同的攻击面: 运行模型的物理硬件。在专用芯片(TPU、GPU、NPU)上的神经网络推理会产生 与正在执行的操作相关的电磁(EM)辐射。与硬件物理邻近的对手—— 甚至是在数据中心相邻机架中有测量设备访问权限的人——可以使用这些辐射来推断模型的 架构。这就是TPUXtract。 [Keysight Security Blog, 2025]
攻击方法论
该攻击由北卡罗来纳州立大学的研究人员在Google张量处理单元(TPU)上演示。 基本观察是TPU的功耗根据正在处理的层配置产生可测量的变化。 不同的层类型(卷积、全连接、注意力)、不同的层大小和不同的连接模式 各自产生不同的电磁特征。
TPUXtract利用了神经网络中数据顺序流过各层的事实。 攻击者在TPU处理输入时测量随时间变化的电磁信号,并将不同的时间窗口 与不同层配置预期的电磁特征相关联。通过将观察到的电磁轨迹与 预先表征的层特征库进行匹配,攻击者逐层重建模型的架构。
与试图一次匹配整个模型相比,这种逐层方法大大降低了搜索复杂度。 该攻击在从TPU提取神经网络超参数方面达到了99.91%的准确率。
可以提取什么
TPUXtract可以恢复:
- 层数——网络的深度
- 层类型——全连接、卷积、注意力、归一化
- 层维度——每层的神经元/通道数
- 连接模式——跳跃连接、注意力头
关键的是,TPUXtract不能恢复模型权重——即训练期间学习的实际数值参数。 这是一个重要的限制:了解架构就像知道建筑的蓝图但不知道里面的家具。 然而,了解架构是极其有价值的:它使通过API进行模型提取更加高效 (替代模型的搜索空间现在大大受限),它揭示了专有的架构创新, 并为有针对性的对抗攻击提供了路线图。对于基于Transformer的LLM, 完整提取需要额外的步骤(清零特定层的权重以隔离其他层), 但论文证明了其可行性。
对模型知识产权安全的影响
云提供商和硬件制造商历来假设加速器的电磁辐射在多租户环境中不可被利用, 因为租户之间的隔离应该防止物理访问。TPUXtract挑战了这一假设: 在共同定位的数据中心中,来自相邻物理硬件的测量可能就足够了。 更广泛地说,任何在非端到端物理控制的硬件上运行AI推理的组织 都面临架构保密风险。有效的对策包括硬件级电磁屏蔽、噪声注入电路 和功耗掩盖——所有这些都是来自密码学硬件安全领域的成熟技术, 现在变得与AI部署相关。 [Dark Reading, 2024]
模型反演
模型反演攻击从模型输出重建代表性输入,本质上是反向运行推理。 攻击者不是问"这个模型对这个输入预测什么?",而是问"这个模型将什么输入 与这个预测关联?"在人脸识别领域,模型反演可以仅从模型的置信度分数 重建训练对象的可识别面部图像——这是一种深刻的隐私侵犯。
Fredrikson et al.:面部重建
Fredrikson, Jha和Ristenpart (CCS 2015)的开创性模型反演论文 证明了在一组命名个人上训练的人脸识别模型可以被反演,为模型标签空间中 任何目标个人产生可识别的面部图像。 [Fredrikson et al., CCS 2015] 该攻击通过在输入空间中基于梯度的优化工作:从随机噪声开始, 攻击者迭代调整输入以最大化模型对目标标签的置信度。优化收敛到一个 高度可识别为目标个人的输入——即使从未见过他们在训练集中的实际照片。
这种攻击之所以有效,是因为模型在其参数中编码了关于每个个人面部特征的 足够信息以做出自信的预测——基于梯度的反演可以将这些信息解码回图像空间。 该攻击成功地产生了人类评估者能以显著高于随机水平的比率正确识别为目标个人的面部图像。
基于梯度的反演代码
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
def model_inversion_attack(
model,
target_class,
input_shape,
n_iterations=2000,
lr=0.01,
reg_strength=0.001
):
"""
White-box model inversion: reconstruct an input that maximally activates
target_class in the model's output.
Args:
model : PyTorch classification model (should output logits or log-probs)
target_class : int, index of the class to invert
input_shape : tuple, e.g. (1, 3, 64, 64) for a single RGB 64×64 image
n_iterations : optimization steps
lr : learning rate
reg_strength : L2 regularization on input (prevents degenerate solutions)
Returns:
reconstructed_input : torch.Tensor of shape input_shape
"""
model.eval()
# Initialize from random noise in [0, 1]
x = torch.rand(input_shape, requires_grad=True)
optimizer = optim.Adam([x], lr=lr)
criterion = nn.CrossEntropyLoss()
target = torch.tensor([target_class], dtype=torch.long)
for step in range(n_iterations):
optimizer.zero_grad()
# Clamp input to valid image range
x_clamped = torch.clamp(x, 0.0, 1.0)
# Forward pass through the target model
logits = model(x_clamped)
# Loss: maximize confidence for target_class + regularize for naturalness
classification_loss = criterion(logits, target)
regularization = reg_strength * torch.norm(x_clamped, p=2)
total_loss = classification_loss + regularization
total_loss.backward()
optimizer.step()
if step % 500 == 0:
confidence = torch.softmax(logits, dim=-1)[0, target_class].item()
print(f"Step {step:4d} | Loss={total_loss.item():.4f} | "
f"Confidence for class {target_class}: {confidence:.3f}")
return x.detach().clamp(0, 1)
def black_box_inversion(model_query_fn, target_class, input_shape,
n_iterations=5000, population_size=50):
"""
Black-box model inversion using natural evolution strategy (NES).
model_query_fn: callable that takes an input array and returns confidence scores.
Uses estimated gradients from score differences in random directions.
"""
sigma = 0.1 # noise scale for gradient estimation
lr = 0.01 # learning rate
# 开始 from mean of uniform distribution
x = np.random.uniform(0, 1, input_shape).astype(np.float32)
for step in range(n_iterations):
# Estimate gradient via random perturbations
noise = np.random.randn(population_size, *input_shape).astype(np.float32)
rewards = np.zeros(population_size)
for i, n in enumerate(noise):
x_perturbed = np.clip(x + sigma * n, 0, 1)
scores = model_query_fn(x_perturbed)
rewards[i] = scores[target_class] # maximize target class confidence
# NES gradient estimate
grad_estimate = np.mean(
rewards[:, None, ...] * noise.reshape(population_size, -1),
axis=0
).reshape(input_shape) / sigma
x = np.clip(x + lr * grad_estimate, 0, 1)
if step % 1000 == 0:
curr_confidence = model_query_fn(x)[target_class]
print(f"Step {step} | Target confidence: {curr_confidence:.3f}")
return x
现代模型反演攻击通过利用生成对抗网络(GAN)和扩散模型作为输入分布的先验知识 变得极其强大。基于GAN的反演将搜索限制在同一领域训练的GAN的潜在空间中, 确保重建的图像是逼真的且语义有效的。这种方法已经实现了64x64分辨率的面部重建, 即使面对生产级人脸识别模型,人类评估者也能识别。
防御:速率限制
针对模型提取最简单且最容易部署的防御措施是速率限制: 约束任何单个用户或IP地址每单位时间可以进行的查询数量。由于模型提取需要 数千到数十万次API调用,经过良好校准的速率限制可以大幅增加攻击的时间和金钱成本, 有可能使其在经济上不可行。
自适应速率限制
静态速率限制是一种粗糙的工具——它可能会阻止合法的高级用户, 而老练的攻击者会将查询分散到多个账户或IP地址。自适应速率限制 监控区分合法使用和提取尝试的行为信号,并动态调整限制。 提取查询往往是系统性的(间隔规律,以结构化方式覆盖输入空间), 而合法查询往往是不规律且语义连贯的。 按用户查询预算、查询分布异常检测和渐进式节流创造了分层防御。
实现
import time
import collections
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Deque
@dataclass
class UserQueryRecord:
"""Per-user state for extraction detection."""
query_times: Deque[float] = field(default_factory=lambda: collections.deque(maxlen=1000))
query_count: int = 0
flagged_count: int = 0
throttle_until: float = 0.0 # Unix timestamp when throttling expires
class AdaptiveRateLimiter:
"""
Adaptive rate limiter that detects extraction-like query patterns.
Implements:
1. Hard per-minute rate limit
2. Daily budget per user
3. Regularity anomaly detection (extraction queries are too regular)
4. Progressive throttling on suspicious users
"""
def __init__(
self,
hard_limit_per_minute=60,
daily_budget=5000,
regularity_threshold=0.15, # coefficient of variation below which = suspicious
throttle_multiplier=4.0 # slow down suspicious users by this factor
):
self.hard_limit_per_minute = hard_limit_per_minute
self.daily_budget = daily_budget
self.regularity_threshold = regularity_threshold
self.throttle_multiplier = throttle_multiplier
self.users: Dict[str, UserQueryRecord] = {}
def _get_user(self, user_id: str) -> UserQueryRecord:
if user_id not in self.users:
self.users[user_id] = UserQueryRecord()
return self.users[user_id]
def _queries_last_minute(self, record: UserQueryRecord) -> int:
now = time.time()
cutoff = now - 60
return sum(1 for t in record.query_times if t > cutoff)
def _is_too_regular(self, record: UserQueryRecord) -> bool:
"""
Extraction bots tend to query at very regular intervals.
Compute coefficient of variation (CV) of inter-query intervals.
Low CV (regular) → flag as suspicious.
"""
times = list(record.query_times)
if len(times) < 20:
return False # insufficient data
intervals = np.diff(sorted(times[-50:])) # last 50 queries
if len(intervals) == 0 or np.mean(intervals) == 0:
return False
cv = np.std(intervals) / np.mean(intervals)
return cv < self.regularity_threshold
def check_and_record(self, user_id: str) -> dict:
"""
Check whether a query from user_id should be allowed.
Returns {"allowed": bool, "reason": str, "throttle_remaining": float}
Records the query time if allowed.
"""
record = self._get_user(user_id)
now = time.time()
# 1. Check if user is currently under active throttle
if now < record.throttle_until:
return {
"allowed": False,
"reason": "throttled",
"throttle_remaining": record.throttle_until - now
}
# 2. Check hard per-minute rate limit
if self._queries_last_minute(record) >= self.hard_limit_per_minute:
return {
"allowed": False,
"reason": "rate_limit_exceeded",
"throttle_remaining": 0
}
# 3. Check daily budget
if record.query_count >= self.daily_budget:
return {
"allowed": False,
"reason": "daily_budget_exceeded",
"throttle_remaining": 0
}
# 4. Check for extraction-like regularity
suspicious = self._is_too_regular(record)
if suspicious:
record.flagged_count += 1
# Progressive: each flag doubles the throttle window
throttle_seconds = min(60 * self.throttle_multiplier ** record.flagged_count, 86400)
record.throttle_until = now + throttle_seconds
return {
"allowed": False,
"reason": "extraction_pattern_detected",
"throttle_remaining": throttle_seconds
}
# 5. Allow: record the query
record.query_times.append(now)
record.query_count += 1
return {"allowed": True, "reason": "ok", "throttle_remaining": 0}
防御:输出扰动
即使攻击者成功收集了数千个查询-响应对,其替代模型的保真度取决于这些标签的质量。 如果目标模型的输出被扰动——通过添加校准噪声、四舍五入置信度分数或将输出限制为 top-k类——替代模型在损坏的监督上训练并降低质量。 输出扰动的艺术在于添加足够的噪声以阻碍提取,同时保留足够的信号以维持合法用户的效用。
输出的差分隐私机制
差分隐私理论中的拉普拉斯机制和高斯机制
提供了具有正式保证的输出噪声添加原则方法。对于返回[0,1]^k中
概率向量的模型,添加尺度为Δf / ε的拉普拉斯噪声
(其中Δf是输出函数的L1敏感度,ε是隐私参数)
确保(ε, 0)-差分隐私。在实践中,softmax输出的敏感度为2(有界范围),
因此噪声尺度为2/ε。
置信度分数四舍五入和Top-k限制
一种更简单的非概率方法是置信度分数四舍五入: 返回四舍五入到2位小数而非8位的概率。这大幅减少了每个查询响应的信息量, 同时保留了类别的序数排名(这是大多数合法用户所需要的)。 Top-k限制仅返回前k个预测而非完整分布, 进一步限制了替代模型可用的信息。
实现
import numpy as np
from scipy.special import softmax
class 输出PerturbationDefense:
"""
Defends against model extraction by perturbing model outputs before
returning them to the user.
Supports three strategies:
- 'laplace' : Add Laplace noise (differential privacy)
- 'rounding' : Round confidences to d decimal places
- 'topk' : Return only top-k predictions
"""
def __init__(self, strategy='laplace', epsilon=1.0, decimal_places=2, top_k=3):
assert strategy in ('laplace', 'gaussian', 'rounding', 'topk')
self.strategy = strategy
self.epsilon = epsilon # privacy budget (smaller = more noise)
self.decimal_places = decimal_places
self.top_k = top_k
def perturb(self, probabilities: np.ndarray) -> np.ndarray:
"""
Apply output perturbation to a probability vector.
probabilities: np.ndarray of shape (n_classes,), summing to 1.
Returns perturbed probabilities (not necessarily summing to 1 for noisy methods).
"""
proba = np.asarray(probabilities, dtype=np.float64)
if self.strategy == 'laplace':
return self._laplace_mechanism(proba)
elif self.strategy == 'gaussian':
return self._gaussian_mechanism(proba)
elif self.strategy == 'rounding':
return self._rounding(proba)
elif self.strategy == 'topk':
return self._topk_restriction(proba)
def _laplace_mechanism(self, proba):
"""
Add Laplace noise with scale = sensitivity / epsilon.
For probability vectors, L1 sensitivity is 2.
After noise addition, re-normalize and clip to [0, 1].
"""
sensitivity = 2.0 # L1 sensitivity of softmax output
scale = sensitivity / self.epsilon
noise = np.random.laplace(loc=0.0, scale=scale, size=proba.shape)
noisy = np.clip(proba + noise, 0.0, 1.0)
# Re-normalize to sum to 1 (project onto probability simplex)
total = noisy.sum()
return noisy / total if total > 0 else np.ones_like(noisy) / len(noisy)
def _gaussian_mechanism(self, proba):
"""
Gaussian mechanism with (epsilon, delta)-DP guarantee.
Uses delta=1e-5 by default; sigma calibrated to L2 sensitivity.
"""
delta = 1e-5
l2_sensitivity = np.sqrt(2) # L2 sensitivity of probability vector
sigma = l2_sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / self.epsilon
noise = np.random.normal(0, sigma, proba.shape)
noisy = np.clip(proba + noise, 0.0, 1.0)
total = noisy.sum()
return noisy / total if total > 0 else np.ones_like(noisy) / len(noisy)
def _rounding(self, proba):
"""Round each probability to d decimal places, then renormalize."""
rounded = np.round(proba, self.decimal_places)
total = rounded.sum()
return rounded / total if total > 0 else np.ones_like(rounded) / len(rounded)
def _topk_restriction(self, proba):
"""
Zero out all but the top-k classes and renormalize.
Returns a sparse vector with at most k non-zero entries.
"""
k = min(self.top_k, len(proba))
result = np.zeros_like(proba)
top_indices = np.argsort(proba)[-k:]
result[top_indices] = proba[top_indices]
total = result.sum()
return result / total if total > 0 else result
# ── Privacy-utility tradeoff demonstration ──────────────────────────────
if __name__ == "__main__":
original_proba = np.array([0.70, 0.20, 0.07, 0.03])
for eps in [0.1, 1.0, 5.0]:
defense = 输出PerturbationDefense(strategy='laplace', epsilon=eps)
perturbed = defense.perturb(original_proba)
print(f"ε={eps:.1f}: original={np.round(original_proba,3)} → perturbed={np.round(perturbed,3)}")
关键权衡:更小的ε意味着更多噪声(更强的隐私,更差的效用)。
对于大多数商业模型,ε = 1.0到ε = 5.0
代表了实用的操作范围——足够的噪声将替代模型的训练信号降低15-30%,
同时保持合法用户的预测准确率在可接受的范围内。
防御:模型水印
速率限制和输出扰动试图阻止提取。模型水印采取不同的方法: 它假设提取可能发生,并在模型的行为中嵌入一个可验证的签名, 该签名会持续存在于替代模型中,使原始模型所有者能够证明 一个疑似被盗的模型是从其源模型派生的。 [综述:深度学习的知识产权保护, arXiv 2411.05051, 2024]
基于后门的水印
最广泛部署的水印技术引入了一个秘密触发集: 一小组精心构造的输入-输出对,模型被训练为以特定的、不寻常的方式响应。 例如,人脸识别模型可能被水印化以将包含特定微妙纹理模式的图像 以非常高的置信度分类为指定的"水印类"。 模型的合法副本(包括通过原始模型输出的知识蒸馏训练的任何替代模型) 也将表现出这种行为,因为攻击者的训练查询包含了这些触发输入, 攻击者忠实地复制了相应的响应。当模型所有者怀疑有被盗副本时, 他们用触发集查询它并检查预期的水印行为。
参数空间水印
白盒水印将签名直接嵌入模型权重而非行为中。Uchida et al. (2017)提出 在训练期间使用正则化项将比特串嵌入特定层的权重值分布中。 水印可以通过计算权重向量与秘密密钥矩阵的内积来提取。 这种方法比基于后门的方法对模型修改(微调、剪枝)更具鲁棒性, 但需要访问模型的内部权重进行验证——只有当攻击者将其替代模型提供检查时才可能。
局限性和活跃研究
水印面临几个基本挑战。首先,知道水印存在的坚定对手可以尝试通过微调、 模型剪枝或知识蒸馏到新模型来移除它。其次,基于后门的水印引入了 真正的安全漏洞——如果触发模式被发现,它可以被用来操纵模型的行为。 第三,水印在事后提供归因,但不能防止提取造成的初始损害。 尽管如此,出于知识产权诉讼目的,一个在提取后仍然存在且可证明与 受害者模型非平凡相关的鲁棒水印提供了强有力的盗窃法律证据。
关于放射性数据的最新工作——用传播到在其上训练的模型中的 不可感知扰动来污染训练数据——提供了一种在数据而非模型层面运作的替代水印方法, 即使攻击者从被盗训练数据训练一个全新模型而不是从API蒸馏,也能提供归因。
API提取尝试监控
一个经过良好检测的API可以通过监控行为异常来检测正在进行的模型提取。 合法的API用户表现出特有的使用模式(与特定用例相关的突发查询、 NLP应用中的自然语言多样性、可预测的昼夜模式),这与提取攻击不同 (系统性覆盖输入空间、高查询量、机械生成的输入、低语义多样性)。 有效的API监控将统计异常检测与特定领域的提取启发式方法相结合。
提取行为特征
- 异常查询分布:提取查询往往均匀覆盖输入域或沿特定的 信息论标准分布,产生与自然使用截然不同的输入分布。
- 高查询速度:来自单个账户或关联账户的接近最大速率的API调用。
- 低语义连贯性:对于NLP模型,提取查询可能包含部分随机化的 文本、边界情况输入或不会产生于真正用户需求的语法异常构造。
- 缺乏反馈模式:合法用户通常会跟进错误或低置信度响应。 提取机器人通常不会。
- 跨账户协调:具有相似查询模式或查询输入空间互补区域的多个账户。
异常检测实现
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import collections
import math
class ExtractionDetector:
"""
Anomaly-based detector for model extraction attempts.
Builds a feature vector per user session from query statistics
and uses Isolation Forest to identify anomalous usage patterns.
"""
def __init__(self, window_size=100, contamination=0.05):
"""
window_size : number of recent queries to consider per user
contamination : expected fraction of anomalous users (for IsolationForest)
"""
self.window_size = window_size
self.detector = IsolationForest(contamination=contamination, random_state=42)
self.scaler = StandardScaler()
self.user_history = collections.defaultdict(list) # user_id -> [query_features]
self._fitted = False
def _compute_session_features(self, user_id: str) -> np.ndarray:
"""
Compute a feature vector summarizing a user's recent query behavior.
Features:
0: queries_per_minute (last window)
1: inter_query_cv (coefficient of variation of intervals)
2: input_entropy (diversity of input lengths)
3: confidence_mean (avg model confidence — low for adversarial inputs)
4: confidence_cv (variation in model confidence)
5: unique_input_ratio (fraction of distinct inputs — high for extraction)
"""
history = self.user_history[user_id][-self.window_size:]
n = len(history)
if n < 2:
return np.zeros(6)
timestamps = np.array([h["timestamp"] for h in history])
input_lengths = np.array([h["input_length"] for h in history])
confidences = np.array([h["top_confidence"] for h in history])
input_hashes = [h["input_hash"] for h in history]
intervals = np.diff(sorted(timestamps))
# Feature 0: query rate (queries per minute in the observed window)
time_span = timestamps.max() - timestamps.min()
qpm = (n / time_span * 60) if time_span > 0 else 0
# Feature 1: inter-query interval regularity (extraction bots → low CV)
cv_intervals = (np.std(intervals) / np.mean(intervals)) if np.mean(intervals) > 0 else 0
# Feature 2: entropy of input lengths (systematic scanning → low entropy)
length_counts = collections.Counter(input_lengths)
total = sum(length_counts.values())
entropy = -sum((c/total) * math.log2(c/total + 1e-9) for c in length_counts.values())
# Feature 3 & 4: model confidence statistics
conf_mean = np.mean(confidences)
conf_cv = np.std(confidences) / conf_mean if conf_mean > 0 else 0
# Feature 5: unique input ratio (close to 1.0 for systematic extraction)
unique_ratio = len(set(input_hashes)) / n
return np.array([qpm, cv_intervals, entropy, conf_mean, conf_cv, unique_ratio])
def record_query(self, user_id: str, timestamp: float, input_length: int,
top_confidence: float, input_hash: str):
"""Record metadata for a single API query (do not store raw inputs for privacy)."""
self.user_history[user_id].append({
"timestamp": timestamp,
"input_length": input_length,
"top_confidence": top_confidence,
"input_hash": input_hash,
})
def fit_baseline(self, baseline_user_ids):
"""Train the anomaly detector on baseline legitimate user sessions."""
features = [self._compute_session_features(uid) for uid in baseline_user_ids
if len(self.user_history[uid]) >= 10]
if not features:
raise Value错误("No baseline data available.")
X = np.array(features)
X_scaled = self.scaler.fit_transform(X)
self.detector.fit(X_scaled)
self._fitted = True
def score_user(self, user_id: str) -> float:
"""
Returns anomaly score for a user.
Negative score → anomalous (potential extraction).
Positive score → normal behavior.
"""
if not self._fitted:
raise Runtime错误("Call fit_baseline() first.")
features = self._compute_session_features(user_id).reshape(1, -1)
scaled = self.scaler.transform(features)
return self.detector.decision_function(scaled)[0]
def is_suspicious(self, user_id: str, threshold: float = 0.0) -> bool:
"""Returns True if the user's behavior is anomalous above threshold."""
return self.score_user(user_id) < threshold
差分隐私
到目前为止讨论的所有防御措施都在推理时运行——在模型训练完成之后。 差分隐私(DP)通过修改训练过程本身来解决根本原因, 限制任何单个训练样本对最终模型的影响程度。差分隐私模型在形式上保证 仅揭示关于任何单个训练样本的有限信息——提供可证明的对成员推理、 属性推理和训练数据提取攻击的抵抗力。
形式化定义
如果对于任何两个相邻数据集D和D'(仅在一条记录上不同), 以及对于任何输出集S,随机化机制M满足 (ε, δ)-差分隐私:
Pr[M(D) ∈ S] ≤ eε · Pr[M(D') ∈ S] + δ
更小的ε意味着更强的隐私。当δ = 0时(纯DP),保证是绝对的; 允许δ > 0(近似DP,通常δ < 1/n)以换取显著更好的效用而略微放松保证。 隐私预算ε跟踪机制所有使用中的总信息泄漏—— 它随每次查询、访问或训练步骤被消耗,提供了随时间管理隐私风险的定量框架。
DP-SGD:使用差分隐私进行训练
差分隐私随机梯度下降(DP-SGD),由Abadi et al. (2016)引入, 是使用差分隐私训练神经网络的标准机制。该算法以两种方式修改标准SGD:
- 梯度裁剪:逐样本单独计算梯度(而非在批次上平均), 并裁剪到最大L2范数C。这限制了任何单个训练样本对梯度更新的影响。
- 高斯噪声添加:在参数更新前,将标准差与
C × σ成比例的高斯噪声添加到裁剪后的梯度总和中, 其中σ是根据所需的(ε, δ)预算校准的噪声乘数。
DP-SGD的代价是模型准确率的降低:更多噪声意味着更少有效的梯度信号, 尤其是在训练早期。隐私-效用权衡由ε控制:ε < 1的值提供强隐私但准确率损失显著; ε = 1-10提供适度隐私和适度准确率成本(在分类基准上通常为2-5%)。
使用Opacus实现
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from opacus import PrivacyEngine
from opacus.utils.batch_memory_manager import BatchMemoryManager
import numpy as np
def train_with_dp(
model: nn.模块,
train_loader: DataLoader,
optimizer: torch.optim.Optimizer,
n_epochs: int,
target_epsilon: float = 1.0, # privacy budget
target_delta: float = 1e-5, # failure probability
max_grad_norm: float = 1.0, # gradient clipping norm
noise_multiplier:float = 1.1, # σ: larger → more privacy, less utility
device: str = "cpu"
) -> dict:
"""
Train a PyTorch model with (target_epsilon, target_delta)-differential privacy
using the Opacus library (meta-pytorch/opacus).
Returns: dict with final epsilon, delta, and per-epoch losses.
"""
model = model.to(device)
criterion = nn.CrossEntropyLoss()
# Attach the PrivacyEngine to enforce DP during training
privacy_engine = PrivacyEngine()
model, optimizer, train_loader = privacy_engine.make_private_with_epsilon(
module=model,
optimizer=optimizer,
data_loader=train_loader,
epochs=n_epochs,
target_epsilon=target_epsilon,
target_delta=target_delta,
max_grad_norm=max_grad_norm,
)
# Equivalent to specifying noise_multiplier directly:
# model, optimizer, train_loader = privacy_engine.make_private(
# module=model, optimizer=optimizer, data_loader=train_loader,
# noise_multiplier=noise_multiplier, max_grad_norm=max_grad_norm,
# )
history = []
for epoch in range(1, n_epochs + 1):
model.train()
epoch_losses = []
with BatchMemoryManager(
data_loader=train_loader,
max_physical_batch_size=64, # memory-efficient batching for DP
optimizer=optimizer
) as memory_safe_loader:
for data, target in memory_safe_loader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
epoch_losses.append(loss.item())
# Query current privacy budget spent
epsilon = privacy_engine.get_epsilon(target_delta)
mean_loss = np.mean(epoch_losses)
print(
f"Epoch {epoch:3d}/{n_epochs} | "
f"Loss: {mean_loss:.4f} | "
f"Privacy: (ε={epsilon:.2f}, δ={target_delta})"
)
history.append({"epoch": epoch, "loss": mean_loss, "epsilon": epsilon})
final_epsilon = privacy_engine.get_epsilon(target_delta)
print(f"\nFinal privacy budget spent: ε={final_epsilon:.3f}, δ={target_delta}")
return {"epsilon": final_epsilon, "delta": target_delta, "history": history}
# ── Privacy-utility tradeoff guide ─────────────────────────────────────
# ε ≈ 0.1 : Very strong privacy. Significant accuracy degradation (~10-20%).
# Membership inference reduced to near-random guessing.
# ε ≈ 1.0 : Strong privacy. Moderate accuracy degradation (~3-8%).
# Practical for medical / financial datasets.
# ε ≈ 10.0 : Moderate privacy. Minimal accuracy degradation (~1-3%).
# Reduces but does not eliminate membership inference risk.
# ε > 100 : Weak privacy. Near-original model utility.
# Provides little meaningful protection against determined attackers.
实际部署考虑
DP-SGD需要逐样本梯度计算,这比标准批量梯度计算成本更高——使用 Opacus等库通常在内存方面有2-3倍的开销, 在计算时间方面有1.5-2倍的开销。某些层类型(BatchNorm)与DP-SGD不兼容, 因为它们混合了跨样本信息;必须用GroupNorm或LayerNorm替换。 对于非常大的模型(LLM),DP微调比从头开始DP预训练更实际: 使用DP微调预训练模型需要更少的梯度步骤,因此隐私预算使用得更高效。
DP提供正式的、定量的保证——不仅仅是"我们添加了噪声,看起来更难攻击"。 当监管机构或法律机构问"这个模型有多私密?"时, 一个ε = 1.0, δ = 10-5的训练模型给出了精确的、可审计的答案。 这是相对于启发式防御的重大优势,也是为什么隐私法规越来越多地要求 敏感ML部署使用DP的原因。 [Opacus教程 — Meta AI]
模块总结
本模块涵盖了模型提取和推理攻击的完整生命周期——从基于API的模型克隆的经济学, 到影子模型、训练数据提取和加密流量侧信道攻击的技术机制, 再到速率限制、差分隐私、水印和异常检测的防御全景。
核心要点:
- 模型提取具有经济不对称性:攻击者可以用数千美元的API查询复制数百万美元的训练成果。
- 训练数据提取不是理论上的——Carlini et al.在GPT-2和ChatGPT生产系统上进行了演示。
- 成员推理将模型输出转变为隐私检测器,对GDPR、HIPAA以及医疗/金融ML具有实际影响。
- 侧信道攻击(Whisper Leak、Token长度、推测解码、KV缓存计时)表明仅HTTPS加密不足以保护隐私敏感的LLM部署。
- 硬件攻击(TPUXtract)证明与加速器的物理邻近性可以在没有任何API访问的情况下泄露架构秘密。
- 差分隐私是唯一具有正式定量保证的防御措施——代价是效用和计算开销。
- 分层防御(速率限制 + 输出扰动 + 水印 + 差分隐私)比任何单一机制都更加稳健。