Preface
Improved Denoising Diffusion Probabilistic Models (IDDPM) is a follow-up work that improves on Denoising Diffusion Probabilistic Models (DDPM), covered in the previous post.
The important formulas were already derived in the earlier post DDPM原理与代码剖析, so they are not repeated here. This post focuses on the improvements and on explaining the code.
This post follows the video 58、Improved Diffusion的PyTorch代码逐行深入讲解; the presenter explains things very clearly and the video is recommended.
This post is still being updated…
DDIM optimizes the sampling procedure, using a respacing trick to reduce the number of sampling steps; see DDIM原理及代码 (Denoising diffusion implicit models).
Getting the code of this paper to run took some setup; see Ubuntu 20.04下安装和配置MPI, with thanks to that article.
The mpi4py package kept failing to install; it turned out mpicc was missing.
Code
The walkthrough is based on the official OpenAI code: openai/improved-diffusion.
This part focuses on the forward diffusion, the reverse diffusion, sampling, and the loss computation. The model itself is a UNet with attention, which is not covered here.
We mainly look at the GaussianDiffusion class in improved_diffusion/gaussian_diffusion.py. Only the core code is excerpted; robustness code such as asserts and type conversions is omitted, so refer to the original repository if you want to run it.
GaussianDiffusion
__init__
The betas are passed in as a model parameter and are constructed as follows. The original DDPM uses a linear schedule, whereas IDDPM uses a cosine schedule.
# gaussian_diffusion.py
def get_named_beta_schedule(schedule_name, num_diffusion_timesteps):
    """
    Get a pre-defined beta schedule for the given name.
    """
    if schedule_name == "linear":
        # Linear schedule from Ho et al, extended to work for any number of
        # diffusion steps.
        scale = 1000 / num_diffusion_timesteps
        beta_start = scale * 0.0001
        beta_end = scale * 0.02
        return np.linspace(
            beta_start, beta_end, num_diffusion_timesteps, dtype=np.float64
        )
    elif schedule_name == "cosine":
        return betas_for_alpha_bar(
            num_diffusion_timesteps,
            lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
        )
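The helper betas_for_alpha_bar is not shown above; it turns the cosine $\overline{\alpha}(t)$ function into per-step betas via $\beta_t = 1 - \overline{\alpha}_t / \overline{\alpha}_{t-1}$, clipped near the end of the chain as in the IDDPM paper. A minimal sketch of that idea (an illustration rather than the repo's exact code):

import numpy as np

def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999):
    # beta_t = 1 - alpha_bar(t / T) / alpha_bar((t - 1) / T), clipped at max_beta
    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        betas.append(min(1 - alpha_bar(t2) / alpha_bar(t1), max_beta))
    return np.array(betas)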
alphas_cumprod is $\overline{\alpha}_t$, alphas_cumprod_prev is $\overline{\alpha}_{t-1}$, and alphas_cumprod_next is $\overline{\alpha}_{t+1}$:
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas, axis=0)
alphas_cumprod_prev = np.append(1.0, self.alphas_cumprod[:-1])
alphas_cumprod_next = np.append(self.alphas_cumprod[1:], 0.0)
$\sqrt{\overline{\alpha}_t}$ is sqrt_alphas_cumprod:
sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
$\sqrt{1-\overline{\alpha}_t}$ is sqrt_one_minus_alphas_cumprod:
sqrt_one_minus_alphas_cumprod = np.sqrt(1.0 - self.alphas_cumprod)
$\log(1-\overline{\alpha}_t)$ is log_one_minus_alphas_cumprod:
log_one_minus_alphas_cumprod = np.log(1.0 - self.alphas_cumprod)
$\sqrt{\frac{1}{\overline{\alpha}_t}}$ is sqrt_recip_alphas_cumprod:
sqrt_recip_alphas_cumprod = np.sqrt(1.0 / self.alphas_cumprod)
$\sqrt{\frac{1}{\overline{\alpha}_t}-1}$ is sqrt_recipm1_alphas_cumprod:
sqrt_recipm1_alphas_cumprod = np.sqrt(1.0 / self.alphas_cumprod - 1)
$\widetilde{\beta}_t = \frac{1-\overline{\alpha}_{t-1}}{1-\overline{\alpha}_t}\beta_t$:
# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = (
betas * (1.0 - self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
)
Then take the log:
# log calculation clipped because the posterior variance is 0 at the
# beginning of the diffusion chain.
posterior_log_variance_clipped = np.log(
np.append(self.posterior_variance[1], self.posterior_variance[1:])
)
$\widetilde{\mu}(X_t, X_0) = \frac{\sqrt{\overline{\alpha}_{t-1}}\,\beta_t}{1-\overline{\alpha}_t} X_0 + \frac{\sqrt{\alpha_t}(1-\overline{\alpha}_{t-1})}{1-\overline{\alpha}_{t}}X_t$, where the coefficient of $X_0$ corresponds to posterior_mean_coef1 and the coefficient of $X_t$ to posterior_mean_coef2:
posterior_mean_coef1 = (
betas * np.sqrt(self.alphas_cumprod_prev) / (1.0 - self.alphas_cumprod)
)
posterior_mean_coef2 = (
(1.0 - self.alphas_cumprod_prev)
* np.sqrt(alphas)
/ (1.0 - self.alphas_cumprod)
)
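As a concrete sanity check, here is a standalone sketch (plain NumPy, with names mirroring the attributes above) that precomputes these quantities for a toy schedule with 4 steps:

import numpy as np

T = 4
betas = np.linspace(1e-4, 0.02, T, dtype=np.float64)        # toy linear schedule
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas, axis=0)                  # \bar{alpha}_t
alphas_cumprod_prev = np.append(1.0, alphas_cumprod[:-1])    # \bar{alpha}_{t-1}

# \tilde{beta}_t and the two \tilde{mu} coefficients
posterior_variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
posterior_mean_coef1 = betas * np.sqrt(alphas_cumprod_prev) / (1.0 - alphas_cumprod)
posterior_mean_coef2 = (1.0 - alphas_cumprod_prev) * np.sqrt(alphas) / (1.0 - alphas_cumprod)

print(posterior_variance)  # first entry is 0, which is why posterior_log_variance_clipped above replaces it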
q_mean_variance
Given (x_start, t), return the mean and variance of $q(X_t|X_0) = \mathcal{N}(X_t;\ \sqrt{\overline{\alpha}_t}X_0,\ (1-\overline{\alpha}_t)I)$:
mean = (
_extract_into_tensor(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start
)
variance = _extract_into_tensor(1.0 - self.alphas_cumprod, t, x_start.shape)
log_variance = _extract_into_tensor(
self.log_one_minus_alphas_cumprod, t, x_start.shape
)
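The helper _extract_into_tensor picks the entry for timestep t out of a 1-D numpy array of per-step constants and broadcasts it over the image batch. A sketch of what such a helper does (an illustration of the behaviour; check the repo for the exact code):

import torch as th

def _extract_into_tensor(arr, timesteps, broadcast_shape):
    # arr: 1-D numpy array indexed by timestep; timesteps: LongTensor of shape (B,)
    res = th.from_numpy(arr).to(device=timesteps.device)[timesteps].float()
    while len(res.shape) < len(broadcast_shape):
        res = res[..., None]        # (B,) -> (B, 1, 1, 1) for broadcasting
    return res.expand(broadcast_shape)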
q_sample
Use the reparameterization trick to obtain the noised image: $X_t = \sqrt{\overline{\alpha}_t}X_0+\sqrt{1-\overline{\alpha}_t}\,\epsilon$
_extract_into_tensor(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start
+ _extract_into_tensor(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape)
* noise
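As a usage illustration, the same computation as a standalone function (hypothetical name q_sample_sketch, using the _extract_into_tensor sketch above; in the repo this logic lives inside GaussianDiffusion.q_sample with the precomputed self.sqrt_* arrays):

import torch as th

def q_sample_sketch(x_start, t, sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod, noise=None):
    # X_t = sqrt(abar_t) * X_0 + sqrt(1 - abar_t) * eps
    if noise is None:
        noise = th.randn_like(x_start)
    return (
        _extract_into_tensor(sqrt_alphas_cumprod, t, x_start.shape) * x_start
        + _extract_into_tensor(sqrt_one_minus_alphas_cumprod, t, x_start.shape) * noise
    )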
q_posterior_mean_variance
The mean and variance of the posterior $q(X_{t-1}|X_t, X_0)$:
$\widetilde{\mu}(X_t, X_0) = \frac{\sqrt{\overline{\alpha}_{t-1}}\,\beta_t}{1-\overline{\alpha}_t} X_0 + \frac{\sqrt{\alpha_t}(1-\overline{\alpha}_{t-1})}{1-\overline{\alpha}_{t}}X_t$
posterior_mean = (
_extract_into_tensor(self.posterior_mean_coef1, t, x_t.shape) * x_start
+ _extract_into_tensor(self.posterior_mean_coef2, t, x_t.shape) * x_t
)
$\widetilde{\beta}_t = \frac{1-\overline{\alpha}_{t-1}}{1-\overline{\alpha}_t} \beta_t$ was already computed in __init__, so it does not need to be recomputed here:
posterior_variance = _extract_into_tensor(self.posterior_variance, t, x_t.shape)
p_mean_variance
This function takes x at time $t$ and predicts the mean and variance at time $t-1$.
The variance can either be learned or held fixed.
(1) Learned variance, i.e. the following condition:
if self.model_var_type in [ModelVarType.LEARNED, ModelVarType.LEARNED_RANGE]:
Split the model output along the channel dimension:
model_output, model_var_values = th.split(model_output, C, dim=1)
There are two sub-cases. With ModelVarType.LEARNED, the network output is taken directly as the log-variance:
if self.model_var_type == ModelVarType.LEARNED:
model_log_variance = model_var_values
model_variance = th.exp(model_log_variance)
whereas improved DDPM (LEARNED_RANGE) predicts an interpolation coefficient, i.e. the $v$ in the following expression:
$\Sigma_{\theta}(X_t, t)=\exp(v\log\beta_t + (1-v)\log \widetilde{\beta}_t)$
Since $\widetilde{\beta}_t = \frac{1-\overline{\alpha}_{t-1}}{1-\overline{\alpha}_t} \beta_t$ and $1-\overline{\alpha}_{t-1} < 1-\overline{\alpha}_t$, we have $\widetilde{\beta}_t < \beta_t$.
So max_log is $\log \beta_t$:
max_log = _extract_into_tensor(np.log(self.betas), t, x.shape)
and min_log is $\log \widetilde{\beta}_t$:
min_log = _extract_into_tensor(
self.posterior_log_variance_clipped, t, x.shape)
Map the predicted value from [-1, 1] to [0, 1]:
# The model_var_values is [-1, 1] for [min_var, max_var].
frac = (model_var_values + 1) / 2
Then apply the formula $\Sigma_{\theta}(X_t, t)=\exp(v\log\beta_t + (1-v)\log \widetilde{\beta}_t)$:
model_log_variance = frac * max_log + (1 - frac) * min_log
model_variance = th.exp(model_log_variance)
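Stripped of the class context, the learned-range interpolation amounts to the following sketch (array names as defined in __init__; an illustration, not the repo's exact code):

import numpy as np

def learned_range_log_variance(model_var_values, t, betas, posterior_log_variance_clipped, x_shape):
    # model_var_values is the raw network output v' in [-1, 1]
    min_log = _extract_into_tensor(posterior_log_variance_clipped, t, x_shape)  # log(beta_tilde_t)
    max_log = _extract_into_tensor(np.log(betas), t, x_shape)                   # log(beta_t)
    frac = (model_var_values + 1) / 2             # map [-1, 1] to [0, 1]
    return frac * max_log + (1 - frac) * min_log  # v*log(beta_t) + (1-v)*log(beta_tilde_t)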
(2) Fixed (non-learned) variance
DDPM uses $\beta_t$, while IDDPM provides two options: $\beta_t$ or $\widetilde{\beta}_t$.
The larger variance is $\beta_t$:
ModelVarType.FIXED_LARGE: (
# for fixedlarge, we set the initial (log-)variance like so
# to get a better decoder log likelihood.
np.append(self.posterior_variance[1], self.betas[1:]),
np.log(np.append(self.posterior_variance[1], self.betas[1:])),
),
The smaller variance is $\widetilde{\beta}_t$:
ModelVarType.FIXED_SMALL: (
self.posterior_variance,
self.posterior_log_variance_clipped,
),
Note that the arrays above cover all timesteps; we only extract the entry for timestep t:
model_variance = _extract_into_tensor(model_variance, t, x.shape)
model_log_variance = _extract_into_tensor(model_log_variance, t, x.shape)
Next comes the prediction of the mean.
(1) Predicting the mean of $X_{t-1}$ directly
if self.model_mean_type == ModelMeanType.PREVIOUS_X:
Then the mean is simply the model output:
model_mean = model_output
As a by-product, $X_0$ is also predicted here; it is not used during training but is used during evaluation:
pred_xstart = process_xstart(
self._predict_xstart_from_xprev(x_t=x, t=t, xprev=model_output)
)
(2) Predicting $X_0$
if self.model_mean_type == ModelMeanType.START_X:
The output just goes through a post-processing function:
pred_xstart = process_xstart(model_output)
(3) Predicting the noise
ModelMeanType.EPSILON
pred_xstart = process_xstart(
self._predict_xstart_from_eps(x_t=x, t=t, eps=model_output)
)
In cases (2) and (3), the mean is then recovered from pred_xstart via the posterior formula $\widetilde{\mu}(X_t, X_0) = \frac{\sqrt{\overline{\alpha}_{t-1}}\,\beta_t}{1-\overline{\alpha}_t} X_0 + \frac{\sqrt{\alpha_t}(1-\overline{\alpha}_{t-1})}{1-\overline{\alpha}_{t}}X_t$:
model_mean, _, _ = self.q_posterior_mean_variance(
x_start=pred_xstart, x_t=x, t=t
)
_predict_xstart_from_xprev
Solve for $X_0$ from $\widetilde{\mu}(X_t, X_0) = \frac{\sqrt{\overline{\alpha}_{t-1}}\,\beta_t}{1-\overline{\alpha}_t} X_0 + \frac{\sqrt{\alpha_t}(1-\overline{\alpha}_{t-1})}{1-\overline{\alpha}_{t}}X_t$, treating the model's previous-step prediction xprev as $\widetilde{\mu}$:
return ( # (xprev - coef2*x_t) / coef1
_extract_into_tensor(1.0 / self.posterior_mean_coef1, t, x_t.shape) * xprev
- _extract_into_tensor(
self.posterior_mean_coef2 / self.posterior_mean_coef1, t, x_t.shape
)
* x_t
)
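The code is just the algebraic rearrangement of that formula, solving for $X_0$ when xprev $=\widetilde{\mu}$ is known:

$X_0 = \frac{1}{\mathrm{coef}_1}\left(\widetilde{\mu} - \mathrm{coef}_2\, X_t\right), \qquad \mathrm{coef}_1 = \frac{\sqrt{\overline{\alpha}_{t-1}}\,\beta_t}{1-\overline{\alpha}_t}, \quad \mathrm{coef}_2 = \frac{\sqrt{\alpha_t}\,(1-\overline{\alpha}_{t-1})}{1-\overline{\alpha}_t}$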
_predict_xstart_from_eps
Inverting the forward process $X_t = \sqrt{\overline{\alpha}_t}X_0+\sqrt{1-\overline{\alpha}_t}\,\epsilon$ for $X_0$ gives
$X_0 = \frac{1}{\sqrt{\overline{\alpha}_t}}\left(X_t-\sqrt{1-\overline{\alpha}_t}\,\epsilon\right)$
which can be rewritten as
$X_0 = \frac{1}{\sqrt{\overline{\alpha}_t}}X_t - \sqrt{\frac{1}{\overline{\alpha}_t}-1}~\epsilon$
_extract_into_tensor(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t
- _extract_into_tensor(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape) * eps
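A quick standalone consistency check of these two formulas (a sketch with made-up data): noise a tensor with the forward formula, then recover it exactly from the same noise.

import numpy as np
import torch as th

T = 100
betas = np.linspace(1e-4, 0.02, T)
alphas_cumprod = np.cumprod(1.0 - betas)

t = 42                                     # a single timestep, for simplicity
x0 = th.randn(2, 3, 8, 8)                  # fake "image" batch
eps = th.randn_like(x0)

# forward: X_t = sqrt(abar_t) X_0 + sqrt(1 - abar_t) eps
xt = float(np.sqrt(alphas_cumprod[t])) * x0 + float(np.sqrt(1.0 - alphas_cumprod[t])) * eps

# inverse: X_0 = sqrt(1 / abar_t) X_t - sqrt(1 / abar_t - 1) eps
x0_rec = (float(np.sqrt(1.0 / alphas_cumprod[t])) * xt
          - float(np.sqrt(1.0 / alphas_cumprod[t] - 1.0)) * eps)

print(th.allclose(x0, x0_rec, atol=1e-5))  # True up to float32 rounding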
p_sample
Sample $X_{t-1}$ given $X_t$.
out is { "mean": model_mean, "variance": model_variance, "log_variance": model_log_variance, "pred_xstart": pred_xstart }:
# get the mean, variance and log-variance of X[t-1], plus the predicted X[0]
out = self.p_mean_variance(
model,
x,
t,
clip_denoised=clip_denoised,
denoised_fn=denoised_fn,
model_kwargs=model_kwargs,
)
Reparameterized sampling:
noise = th.randn_like(x)
sample = out["mean"] + nonzero_mask * th.exp(0.5 * out["log_variance"]) * noise
p_sample_loop and p_sample_loop_progressive call this function iteratively.
p_sample_loop_progressive
Start from pure noise, $X_T$ (or from the noise passed in):
if noise is not None:
img = noise
else:
img = th.randn(*shape, device=device)
Timesteps T-1, T-2, …, 0:
indices = list(range(self.num_timesteps))[::-1]
Sample step by step:
for i in indices:
    t = th.tensor([i] * shape[0], device=device)
    with th.no_grad():
        out = self.p_sample(
            model,
            img,
            t,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
        )
        yield out
        img = out["sample"]
_vb_terms_bpd
vb stands for the variational (lower) bound, bpd for bits per dimension.
Compute the true mean and (log-)variance of the q distribution:
true_mean, _, true_log_variance_clipped = self.q_posterior_mean_variance(
x_start=x_start, x_t=x_t, t=t
)
Compute the mean and variance predicted by the model, i.e. the p distribution:
out = self.p_mean_variance(
model, x_t, t, clip_denoised=clip_denoised, model_kwargs=model_kwargs
)
Compute the KL divergence between the two Gaussians:
$L_{t-1} = D_{KL}\big(q(X_{t-1}|X_t, X_0)~\|~ p_\theta (X_{t-1}|X_t)\big)$
kl = normal_kl( true_mean, true_log_variance_clipped,
out["mean"], out["log_variance"])
kl = mean_flat(kl) / np.log(2.0)
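normal_kl is the closed-form KL divergence between two diagonal Gaussians given their means and log-variances, and mean_flat averages over every non-batch dimension (the division by log 2 converts nats to bits). A sketch of both helpers from the standard formula (an illustration; see improved_diffusion/losses.py for the originals):

import torch as th

def normal_kl(mean1, logvar1, mean2, logvar2):
    # KL( N(mean1, exp(logvar1)) || N(mean2, exp(logvar2)) ), elementwise, in nats
    return 0.5 * (
        -1.0
        + logvar2 - logvar1
        + th.exp(logvar1 - logvar2)
        + ((mean1 - mean2) ** 2) * th.exp(-logvar2)
    )

def mean_flat(tensor):
    # average over all dimensions except the batch dimension
    return tensor.mean(dim=list(range(1, len(tensor.shape))))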
$L_0=-\log p_{\theta}(X_0|X_1)$
The discrete decoder likelihood is modeled with differences of a (Gaussian) cumulative distribution function:
decoder_nll = -discretized_gaussian_log_likelihood(
x_start, means=out["mean"], log_scales=0.5 * out["log_variance"]
)
decoder_nll = mean_flat(decoder_nll) / np.log(2.0)
Combine them: use the decoder NLL at t == 0 and the KL term at every other timestep:
output = th.where((t == 0), decoder_nll, kl)
discretized_gaussian_log_likelihood
improved_diffusion/losses.py
def discretized_gaussian_log_likelihood(x, *, means, log_scales):
    """
    Compute the log-likelihood of a Gaussian distribution discretizing to a
    given image.
    :param x: the target images. It is assumed that this was uint8 values,
              rescaled to the range [-1, 1].
    :param means: the Gaussian mean Tensor.
    :param log_scales: the Gaussian log stddev Tensor.
    :return: a tensor like x of log probabilities (in nats).
    """
    assert x.shape == means.shape == log_scales.shape
    centered_x = x - means
    inv_stdv = th.exp(-log_scales)
    plus_in = inv_stdv * (centered_x + 1.0 / 255.0)
    cdf_plus = approx_standard_normal_cdf(plus_in)
    min_in = inv_stdv * (centered_x - 1.0 / 255.0)
    cdf_min = approx_standard_normal_cdf(min_in)
    log_cdf_plus = th.log(cdf_plus.clamp(min=1e-12))
    log_one_minus_cdf_min = th.log((1.0 - cdf_min).clamp(min=1e-12))
    cdf_delta = cdf_plus - cdf_min
    log_probs = th.where(
        x < -0.999,
        log_cdf_plus,
        th.where(x > 0.999, log_one_minus_cdf_min, th.log(cdf_delta.clamp(min=1e-12))),
    )
    assert log_probs.shape == x.shape
    return log_probs
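approx_standard_normal_cdf used above is a cheap approximation of the standard normal CDF $\Phi(x)$. One common tanh-based approximation (the constants below follow the usual PixelCNN++-style formula; treat this as an illustration of the idea):

import math
import torch as th

def approx_standard_normal_cdf(x):
    # tanh approximation of Phi(x), accurate enough for likelihood evaluation
    return 0.5 * (1.0 + th.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * th.pow(x, 3))))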
training_losses
If loss_type is KL (or RESCALED_KL):
if self.loss_type == LossType.KL or self.loss_type == LossType.RESCALED_KL:
call the _vb_terms_bpd function described above:
terms["loss"] = self._vb_terms_bpd(
model=model,
x_start=x_start,
x_t=x_t,
t=t,
clip_denoised=False,
model_kwargs=model_kwargs,
)["output"]
For the MSE loss, the handling depends on what the model predicts:
elif self.loss_type == LossType.MSE or self.loss_type == LossType.RESCALED_MSE:
(1) If the model also predicts the variance:
if self.model_var_type in [
ModelVarType.LEARNED,
ModelVarType.LEARNED_RANGE,
]:
split the output into model_output and model_var_values:
B, C = x_t.shape[:2]
assert model_output.shape == (B, C * 2, *x_t.shape[2:])
model_output, model_var_values = th.split(model_output, C, dim=1)
frozen_out = th.cat([model_output.detach(), model_var_values], dim=1)
terms["vb"] = self._vb_terms_bpd(
model=lambda *args, r=frozen_out: r,
x_start=x_start,
x_t=x_t,
t=t,
clip_denoised=False,
)["output"]
Since the model has already produced its prediction, there is no need to run it again, so the model passed in here is just an identity function that returns the cached output. frozen_out detaches model_output so that learning the variance does not interfere with optimizing the mean.
model=lambda *args, r=frozen_out: r
The lambda here is just an anonymous function:
def fun(*args, r=frozen_out):
return r
With rescaling:
if self.loss_type == LossType.RESCALED_MSE:
# Divide by 1000 for equivalence with initial implementation.
# Without a factor of 1/1000, the VB term hurts the MSE term.
terms["vb"] *= self.num_timesteps / 1000.0
The regression target then depends on what the model is set to predict: the posterior mean of $X_{t-1}$, $X_0$ itself, or the noise $\epsilon$:
target = {
ModelMeanType.PREVIOUS_X: self.q_posterior_mean_variance(
x_start=x_start, x_t=x_t, t=t
)[0],
ModelMeanType.START_X: x_start,
ModelMeanType.EPSILON: noise,
}[self.model_mean_type]
Then the MSE loss is computed:
terms["mse"] = mean_flat((target - model_output) ** 2)
Finally, combine the losses:
if "vb" in terms:
terms["loss"] = terms["mse"] + terms["vb"]
else:
terms["loss"] = terms["mse"]
In short, when the variance is learnable, the loss has an additional vb term.