Opacusの実装を読んでみる (DP-SGD): Part 2
はじめに
PyTorch向けのDP-SGDライブラリであるOpacusの実装を引き続き読んでいきます。
前回の記事はこちら。
バージョン情報:Opacus v1.4.0
実装を読んでみる
以下はOpacusを用いた学習の実装例です。(モデル訓練部分)
from opacus.utils.batch_memory_manager import BatchMemoryManager
def train(model, train_loader, optimizer, epoch, device):
model.train()
criterion = nn.CrossEntropyLoss()
with BatchMemoryManager(
data_loader=train_loader,
max_physical_batch_size=MAX_PHYSICAL_BATCH_SIZE,
optimizer=optimizer
) as memory_safe_data_loader:
for i, (images, target) in enumerate(memory_safe_data_loader):
optimizer.zero_grad()
images = images.to(device)
target = target.to(device)
# compute output
output = model(images)
loss = criterion(output, target)
loss.backward()
optimizer.step()
DPOptimizer
DPOptimizer
はPyTorchのOptimizer
のサブクラスです。
まずzero_grad
メソッドでは、PyTorch標準のgrad
属性に加えて、Opacusで新たに追加されるgrad_sample
属性とsummed_grad
属性もクリアします。
-
grad_sample
: サンプルごとの勾配(クリッピング前) -
summed_grad
: ミニバッチ集約後の勾配(ノイズ付加前) -
grad
: 最終的な勾配
def zero_grad(self, set_to_none: bool = False):
for p in self.params:
p.grad_sample = None
if not self._is_last_step_skipped:
p.summed_grad = None
self.original_optimizer.zero_grad(set_to_none)
step
メソッドでは、最適化を実行する前にpre_step
メソッドで勾配のクリッピングとノイズ付加を行います。clip_and_accumulate
、add_noise
、scale_grad
、step_hook
の順に実行されます。
def pre_step(
self, closure: Optional[Callable[[], float]] = None
) -> Optional[float]:
self.clip_and_accumulate()
self.add_noise()
self.scale_grad()
if self.step_hook:
self.step_hook(self)
def step(self, closure: Optional[Callable[[], float]] = None) -> Optional[float]:
if self.pre_step():
return self.original_optimizer.step()
clip_and_accumulate
メソッドは、勾配のクリッピングとミニバッチ集約を行います。具体的には、grad_sample
のL2ノルムから求めたper_sample_clip_factor
とgrad_sample
を乗算することで、勾配の大きさがmax_grad_norm
を超えないように制限します。その結果を集約してsummed_grad
に格納します。
def clip_and_accumulate(self):
if len(self.grad_samples[0]) == 0:
# Empty batch
per_sample_clip_factor = torch.zeros((0,))
else:
per_param_norms = [
g.reshape(len(g), -1).norm(2, dim=-1) for g in self.grad_samples
]
per_sample_norms = torch.stack(per_param_norms, dim=1).norm(2, dim=1)
per_sample_clip_factor = (
self.max_grad_norm / (per_sample_norms + 1e-6)
).clamp(max=1.0)
for p in self.params:
grad_sample = self._get_flat_grad_sample(p)
grad = contract("i,i...", per_sample_clip_factor, grad_sample)
if p.summed_grad is not None:
p.summed_grad += grad
else:
p.summed_grad = grad
add_noise
メソッドは、勾配へのノイズ付加を行います。 _generate_noise
関数で平均が0
、分散がnoise_multiplier
とmax_grad_norm
の積である正規ノイズを生成し、summed_grad
に加えたものをgrad
とします。
def add_noise(self):
for p in self.params:
noise = _generate_noise(
std=self.noise_multiplier * self.max_grad_norm,
reference=p.summed_grad,
generator=self.generator,
secure_mode=self.secure_mode,
)
p.grad = (p.summed_grad + noise).view_as(p)
def _generate_noise(
std: float,
reference: torch.Tensor,
generator=None,
secure_mode: bool = False,
) -> torch.Tensor:
if secure_mode:
### 省略 ###
else:
return torch.normal(
mean=0,
std=std,
size=reference.shape,
device=reference.device,
generator=generator,
)
scale_grad
メソッドは、バッチサイズに基づいて勾配をスケーリングします。
def scale_grad(self):
if self.loss_reduction == "mean":
for p in self.params:
p.grad /= self.expected_batch_size * self.accumulated_iterations
step_hook
メソッドは、前回の記事でアタッチしたIAccountant
のhook_fn
関数を実行します。
def get_optimizer_hook_fn(
self, sample_rate: float
) -> Callable[[DPOptimizer], None]:
def hook_fn(optim: DPOptimizer):
self.step(
noise_multiplier=optim.noise_multiplier,
sample_rate=sample_rate * optim.accumulated_iterations,
)
return hook_fn
PRVAccountant
PRVAccountant
はIAccountant
のサブクラスで、下記論文の方法でプライバシーバジェットを計算します。
step
メソッドは、上述のDPOptimizer
のpre_step
メソッドの最後に呼び出され、history
にノイズ乗数、サンプルレート、ステップ数を記録します。
def step(self, *, noise_multiplier: float, sample_rate: float):
if len(self.history) >= 1:
(last_noise_multiplier, last_sample_rate, num_steps) = self.history.pop()
if (
last_noise_multiplier == noise_multiplier
and last_sample_rate == sample_rate
):
self.history.append(
(last_noise_multiplier, last_sample_rate, num_steps + 1)
)
else:
self.history.append(
(last_noise_multiplier, last_sample_rate, num_steps)
)
self.history.append((noise_multiplier, sample_rate, 1))
else:
self.history.append((noise_multiplier, sample_rate, 1))
get_epsilon
メソッドは、history
の記録から消費されたプライバシーバジェット
def get_epsilon(
self, delta: float, *, eps_error: float = 0.01, delta_error: float = None
) -> float:
if delta_error is None:
delta_error = delta / 1000
# we construct a discrete PRV from the history
dprv = self._get_dprv(eps_error=eps_error, delta_error=delta_error)
# this discrete PRV can be used to directly estimate and bound epsilon
_, _, eps_upper = dprv.compute_epsilon(delta, delta_error, eps_error)
# return upper bound as we want guarantee, not just estimate
return eps_upper
続く
モデル訓練中の処理は以上になります。
次回は、サンプルごとの勾配を計算するGradSampleModule
クラスの詳細を見ていきます。
続きの記事はこちらです。
Discussion