import operator
import os
import time

import dllogger as logger
import numpy as np
import torch.cuda.profiler as profiler
from dllogger import JSONStreamBackend, StdOutBackend, Verbosity
from pytorch_lightning import Callback


def is_main_process():
    # In a distributed (DDP) run, only local rank 0 should write logs.
    return int(os.getenv("LOCAL_RANK", "0")) == 0


class PerformanceLoggingCallback(Callback):
    def __init__(self, log_file, global_batch_size, warmup_steps: int = 0, profile: bool = False):
        super().__init__()
        # Emit metrics both to a JSON file and to stdout.
        logger.init(backends=[JSONStreamBackend(Verbosity.VERBOSE, log_file), StdOutBackend(Verbosity.VERBOSE)])
        self.warmup_steps = warmup_steps
        self.global_batch_size = global_batch_size
        self.step = 0
        self.profile = profile
        self.timestamps = []

    def do_step(self):
        self.step += 1
        # Once warmup ends, optionally open the CUDA profiler capture range,
        # which an externally attached profiler (e.g. Nsight Systems) picks up.
        if self.profile and self.step == self.warmup_steps:
            profiler.start()
        # Record timestamps only for steady-state steps past the warmup phase.
        if self.step > self.warmup_steps:
            self.timestamps.append(time.time())

    def on_train_batch_start(self, trainer, pl_module, batch, batch_idx, dataloader_idx):
        self.do_step()

    def on_test_batch_start(self, trainer, pl_module, batch, batch_idx, dataloader_idx):
        self.do_step()

    def process_performance_stats(self, deltas):
        def _round3(val):
            return round(val, 3)

        # Throughput in images/s: one global batch is processed per step.
        throughput_imgps = _round3(self.global_batch_size / np.mean(deltas))
        timestamps_ms = 1000 * deltas
        stats = {
            "throughput": throughput_imgps,
            "latency_mean": _round3(timestamps_ms.mean()),
        }
        # Tail latencies: 90th, 95th and 99th percentiles of the step time.
        for level in [90, 95, 99]:
            stats.update({f"latency_{level}": _round3(np.percentile(timestamps_ms, level))})
        return stats
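
    # Worked example (illustrative numbers, not from a real run): with
    # global_batch_size=64 and post-warmup step deltas of 0.1 s each,
    # process_performance_stats reports throughput = 64 / 0.1 = 640.0 images/s
    # and latency_mean = 100.0 ms; with constant deltas, the 90th/95th/99th
    # percentile latencies all collapse to the same 100.0 ms.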

    def _log(self):
        if is_main_process():
            # Step durations are the pairwise differences between consecutive timestamps.
            diffs = list(map(operator.sub, self.timestamps[1:], self.timestamps[:-1]))
            deltas = np.array(diffs)
            stats = self.process_performance_stats(deltas)
            logger.log(step=(), data=stats)
            logger.flush()

    def on_train_end(self, trainer, pl_module):
        # Close the profiler capture range before reporting the final stats.
        if self.profile:
            profiler.stop()
        self._log()

    def on_epoch_end(self, trainer, pl_module):
        self._log()
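

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, assumption-laden): attaching the callback to a
# pytorch_lightning.Trainer. `MyModel`, the batch size, and the file name are
# hypothetical placeholders, not part of this module; the Trainer arguments
# assume the same (pre-2.0) Lightning API as the hook signatures above.
#
#     from pytorch_lightning import Trainer
#
#     perf = PerformanceLoggingCallback(
#         log_file="perf_log.json",  # hypothetical output path
#         global_batch_size=64,      # samples processed per step across all GPUs
#         warmup_steps=100,          # steps excluded from the measurements
#         profile=True,              # wrap steady-state steps in a CUDA profiler range
#     )
#     trainer = Trainer(gpus=1, callbacks=[perf])
#     trainer.fit(MyModel())         # MyModel is an assumed LightningModule
# ---------------------------------------------------------------------------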