Pytorch의 효율적인 메트릭 수집 : Torchmetrics의 성능 함정 방지
메트릭 컬렉션은 모든 머신 러닝 프로젝트의 필수 부분으로 모델 성능을 추적하고 교육 진행 상황을 모니터링 할 수 있습니다. 이상적으로는 교육 과정에 대한 추가 간접비를 도입하지 않고도 메트릭을 수집하고 계산해야합니다. 그러나 교육 루프의 다른 구성 요소와 마찬가지로 비효율적 인 메트릭 계산은 불필요한 오버 헤드를 도입하고 교육 단계 시간을 늘리고 교육 비용을 부풀릴 수 있습니다.
이 게시물은 Pytorch의 성능 프로파일 링 및 최적화에 관한 시리즈에서 7 번째입니다. 이 시리즈는 기계 학습 개발의 성능 분석 및 최적화의 중요한 역할을 강조하고자했습니다. 각 게시물은 교육 파이프 라인의 다양한 단계에 중점을 두어 리소스 활용 및 런타임 효율성을 분석하고 향상시키는 실용적인 도구 및 기술을 보여줍니다.
이 기사에서는 메트릭 컬렉션에 중점을 둡니다. 우리는 메트릭 컬렉션의 순진한 구현이 런타임 성능에 부정적인 영향을 미치고 분석 및 최적화를위한 도구와 기술을 탐색하는 방법을 보여줄 것입니다.
메트릭 컬렉션을 구현하기 위해 Pytorch에서 메트릭 계산을 단순화하고 표준화하도록 설계된 인기있는 라이브러리 인 Torchmetrics를 사용합니다. 우리의 목표는 다음과 같습니다.
- 런타임 오버 헤드를 보여줍니다 순진한 메트릭 컬렉션 구현으로 인해 발생합니다.
- Pytorch 프로파일을 사용하십시오 메트릭 계산에 의해 소개 된 성능 병목 현상을 정확히 찾아냅니다.
- 최적화 기술을 보여줍니다 메트릭 수집 오버 헤드를 줄입니다.
토론을 용이하게하기 위해 장난감 Pytorch 모델을 정의하고 메트릭 컬렉션이 런타임 성능에 어떤 영향을 줄 수 있는지 평가합니다. Pytorch 2.5.1 Docker Image 및 Torchmetrics 1.6.1을 사용하여 NVIDIA A40 GPU에서 실험을 실행할 것입니다.
메트릭 수집 동작은 하드웨어, 런타임 환경 및 모델 아키텍처에 따라 크게 다를 수 있습니다. 이 게시물에 제공된 코드 스 니펫은 시연 목적으로 만 사용됩니다. 도구 나 기술에 대한 우리의 언급을 그 사용에 대한 보증으로 해석하지 마십시오.
장난감 resnet 모델
아래 코드 블록에서 RESNET-18 백본으로 간단한 이미지 분류 모델을 정의합니다.
import time
import torch
import torchvision
device = "cuda"
model = torchvision.models.resnet18().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters())
장난감 모델을 훈련시키는 데 사용할 합성 데이터 세트를 정의합니다.
from torch.utils.data import Dataset, DataLoader
# A dataset with random images and labels
class FakeDataset(Dataset):
def __len__(self):
return 100000000
def __getitem__(self, index):
rand_image = torch.randn([3, 224, 224], dtype=torch.float32)
label = torch.tensor(data=index % 1000, dtype=torch.int64)
return rand_image, label
train_set = FakeDataset()
batch_size = 128
num_workers = 12
train_loader = DataLoader(
dataset=train_set,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=True
)
우리는 지표 계산을 활성화 또는 비활성화하기 위해 Torchmetrics의 표준 메트릭 모음을 정의합니다.
from torchmetrics import (
MeanMetric,
Accuracy,
Precision,
Recall,
F1Score,
)
# toggle to enable/disable metric collection
capture_metrics = False
if capture_metrics:
metrics = {
"avg_loss": MeanMetric(),
"accuracy": Accuracy(task="multiclass", num_classes=1000),
"precision": Precision(task="multiclass", num_classes=1000),
"recall": Recall(task="multiclass", num_classes=1000),
"f1_score": F1Score(task="multiclass", num_classes=1000),
}
# Move all metrics to the device
metrics = {name: metric.to(device) for name, metric in metrics.items()}
다음으로 Pytorch 프로파일 러 인스턴스와 프로파일 링을 활성화 또는 비활성화 할 수있는 제어 플래그와 함께 정의합니다. Pytorch Profiler 사용에 대한 자세한 자습서는이 시리즈의 첫 번째 게시물을 참조하십시오.
from torch import profiler
# toggle to enable/disable profiling
enable_profiler = True
if enable_profiler:
prof = profiler.profile(
schedule=profiler.schedule(wait=10, warmup=2, active=3, repeat=1),
on_trace_ready=profiler.tensorboard_trace_handler("./logs/"),
profile_memory=True,
with_stack=True
)
prof.start()
마지막으로 표준 교육 단계를 정의합니다.
model.train()
t0 = time.perf_counter()
total_time = 0
count = 0
for idx, (data, target) in enumerate(train_loader):
data = data.to(device, non_blocking=True)
target = target.to(device, non_blocking=True)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
if capture_metrics:
# update metrics
metrics["avg_loss"].update(loss)
for name, metric in metrics.items():
if name != "avg_loss":
metric.update(output, target)
if (idx + 1) % 100 == 0:
# compute metrics
metric_results = {
name: metric.compute().item()
for name, metric in metrics.items()
}
# print metrics
print(f"Step {idx + 1}: {metric_results}")
# reset metrics
for metric in metrics.values():
metric.reset()
elif (idx + 1) % 100 == 0:
# print last loss value
print(f"Step {idx + 1}: Loss = {loss.item():.4f}")
batch_time = time.perf_counter() - t0
t0 = time.perf_counter()
if idx > 10: # skip first steps
total_time += batch_time
count += 1
if enable_profiler:
prof.step()
if idx > 200:
break
if enable_profiler:
prof.stop()
avg_time = total_time/count
print(f'Average step time: {avg_time}')
print(f'Throughput: {batch_size/avg_time:.2f} images/sec')
메트릭 수집 오버 헤드
훈련 단계 시간에 대한 메트릭 컬렉션의 영향을 측정하기 위해 메트릭 계산 유무에 관계없이 교육 스크립트를 실행했습니다. 결과는 다음 표에 요약되어 있습니다.

순진한 메트릭 컬렉션은 런타임 성능이 거의 10% 감소했습니다 !! 메트릭 컬렉션은 기계 학습 개발에 필수적이지만 일반적으로 비교적 간단한 수학 작업이 포함되며 그러한 상당한 오버 헤드를 거의 보증하지 않습니다. 무슨 일이야? !!
Pytorch Profiler의 성능 문제를 식별합니다
성능 저하의 원인을 더 잘 이해하기 위해 Pytorch 프로파일 러가 활성화 된 교육 스크립트를 다시 reran습니다. 결과 추적은 다음과 같습니다.

이 트레이스는 GPU 활용에서 눈에 띄는 낙하와 일치하는 반복되는 “Cudastreamsynchronize”작업을 보여줍니다. 이러한 유형의 “CPU-GPU 동기화”이벤트는 시리즈의 2 부에서 자세히 논의되었습니다. 일반적인 교육 단계에서 CPU 및 GPU는 병렬로 작동합니다. CPU는 데이터 전송과 같은 작업을 GPU 및 커널로드로 관리하고 GPU는 입력 데이터에서 모델을 실행하고 가중치를 업데이트합니다. 이상적으로는 성능을 최대화하기 위해 CPU와 GPU 간의 동기화 지점을 최소화하고 싶습니다. 그러나 여기서는 메트릭 컬렉션이 CPU에서 GPU 데이터 사본을 수행하여 동기화 이벤트를 트리거했음을 알 수 있습니다. 이를 위해서는 GPU가 따라 잡을 때까지 CPU가 처리를 중단해야합니다. 따라서 GPU는 CPU가 후속 커널 작업을로드 할 때까지 대기합니다. 결론은 이러한 동기화 지점이 CPU와 GPU의 비효율적 인 활용으로 이어진다는 것입니다. 우리의 메트릭 수집 구현은 각 훈련 단계에 8 개의 동기화 이벤트를 추가합니다.
트레이스를 면밀히 검토 한 결과, 동기화 이벤트는 평균 계수 토치 메틱의 업데이트 호출에서 나오는 것으로 나타났습니다. 숙련 된 프로파일 링 전문가의 경우 근본 원인을 식별하기에 충분할 수 있지만, 한 걸음 더 나아가 Torch.profiler.record_function 유틸리티를 사용하여 정확한 불쾌한 코드 라인을 식별합니다.
Record_function으로 프로파일 링
동기화 이벤트의 정확한 소스를 정확히 찾기 위해 평균 메트릭 클래스를 확장하고 record_function 컨텍스트 블록을 사용하여 업데이트 메소드를 무시했습니다. 이 접근법을 통해 우리는 방법 내에서 개별 작업을 프로파일 링하고 성능 병목 현상을 식별 할 수 있습니다.
class ProfileMeanMetric(MeanMetric):
def update(self, value, weight = 1.0):
# broadcast weight to value shape
with profiler.record_function("process value"):
if not isinstance(value, torch.Tensor):
value = torch.as_tensor(value, dtype=self.dtype,
device=self.device)
with profiler.record_function("process weight"):
if weight is not None and not isinstance(weight, torch.Tensor):
weight = torch.as_tensor(weight, dtype=self.dtype,
device=self.device)
with profiler.record_function("broadcast weight"):
weight = torch.broadcast_to(weight, value.shape)
with profiler.record_function("cast_and_nan_check"):
value, weight = self._cast_and_nan_check_input(value, weight)
if value.numel() == 0:
return
with profiler.record_function("update value"):
self.mean_value += (value * weight).sum()
with profiler.record_function("update weight"):
self.weight += weight.sum()
그런 다음 새로 생성 된 프로파일 메트릭을 사용하여 AVG_LOSS 메트릭을 업데이트하고 훈련 스크립트를 재 조정했습니다.

업데이트 된 추적은 동기화 이벤트가 다음 줄에서 유래 함을 보여줍니다.
weight = torch.as_tensor(weight, dtype=self.dtype, device=self.device)
이 작업은 기본 스칼라 값을 변환합니다 weight=1.0
Pytorch 텐서에 들어가 GPU에 배치하십시오. 동기화 이벤트는이 작업이 CPU-GPU 데이터 사본을 트리거하기 때문에 발생합니다. 이는 CPU가 GPU가 복사 된 값을 처리 할 때까지 기다려야합니다.
최적화 1 : 가중치 값을 지정합니다
이제 우리는 문제의 출처를 찾았으므로 무게 우리의 가치 업데이트 부르다. 이렇게하면 런타임이 기본 스칼라를 변환하는 것을 방지합니다 weight=1.0
GPU의 텐서로 동기화 이벤트를 피하십시오.
# update metrics
if capture_metric:
metrics["avg_loss"].update(loss, weight=torch.ones_like(loss))
이 변경 사항을 적용한 후 스크립트를 다시 시작하면 초기 동기화 이벤트를 제거하는 데 성공했음을 알 수 있습니다. 이벤트는 새 이벤트를 발견하지 못했습니다. 이번에는 _cast_and_nan_check_input 함수에서 나옵니다.

record_function으로 프로파일 링 – 파트 2
새로운 SYNC 이벤트를 탐색하기 위해 추가 프로파일 링 프로브로 사용자 정의 메트릭을 확장하고 스크립트를 재실행했습니다.
class ProfileMeanMetric(MeanMetric):
def update(self, value, weight = 1.0):
# broadcast weight to value shape
with profiler.record_function("process value"):
if not isinstance(value, torch.Tensor):
value = torch.as_tensor(value, dtype=self.dtype,
device=self.device)
with profiler.record_function("process weight"):
if weight is not None and not isinstance(weight, torch.Tensor):
weight = torch.as_tensor(weight, dtype=self.dtype,
device=self.device)
with profiler.record_function("broadcast weight"):
weight = torch.broadcast_to(weight, value.shape)
with profiler.record_function("cast_and_nan_check"):
value, weight = self._cast_and_nan_check_input(value, weight)
if value.numel() == 0:
return
with profiler.record_function("update value"):
self.mean_value += (value * weight).sum()
with profiler.record_function("update weight"):
self.weight += weight.sum()
def _cast_and_nan_check_input(self, x, weight = None):
"""Convert input ``x`` to a tensor and check for Nans."""
with profiler.record_function("process x"):
if not isinstance(x, torch.Tensor):
x = torch.as_tensor(x, dtype=self.dtype,
device=self.device)
with profiler.record_function("process weight"):
if weight is not None and not isinstance(weight, torch.Tensor):
weight = torch.as_tensor(weight, dtype=self.dtype,
device=self.device)
nans = torch.isnan(x)
if weight is not None:
nans_weight = torch.isnan(weight)
else:
nans_weight = torch.zeros_like(nans).bool()
weight = torch.ones_like(x)
with profiler.record_function("any nans"):
anynans = nans.any() or nans_weight.any()
with profiler.record_function("process nans"):
if anynans:
if self.nan_strategy == "error":
raise RuntimeError("Encountered `nan` values in tensor")
if self.nan_strategy in ("ignore", "warn"):
if self.nan_strategy == "warn":
print("Encountered `nan` values in tensor."
" Will be removed.")
x = x[~(nans | nans_weight)]
weight = weight[~(nans | nans_weight)]
else:
if not isinstance(self.nan_strategy, float):
raise ValueError(f"`nan_strategy` shall be float"
f" but you pass {self.nan_strategy}")
x[nans | nans_weight] = self.nan_strategy
weight[nans | nans_weight] = self.nan_strategy
with profiler.record_function("return value"):
retval = x.to(self.dtype), weight.to(self.dtype)
return retval
결과 트레이스는 다음과 같습니다.

추적은 문제가되는 선을 직접 가리 킵니다.
anynans = nans.any() or nans_weight.any()
이 작업은 확인합니다 NaN
입력 텐서의 값은 작업에는 GPU에서 CPU로 데이터를 복사하는 것이 포함되기 때문에 비용이 많이 드는 CPU-GPU 동기화 이벤트를 도입합니다.
Torchmetric Basegegregation 클래스를 면밀히 검사하면 NAN 값 업데이트를 처리하기위한 몇 가지 옵션이 있습니다. 그러나 사용 사례의 경우 평균 손실 지표를 계산하는 경우이 점검은 불필요하며 런타임 성능 페널티를 정당화하지 않습니다.
최적화 2 : NAN 값 검사를 비활성화합니다
오버 헤드를 제거하기 위해 비활성화를 제안합니다 NaN
값을 재정의하여 값 점검 _cast_and_nan_check_input
기능. 정적 재정의 대신, 우리는 Basegegegator 클래스의 자손에게 유연하게 적용 할 수있는 동적 솔루션을 구현했습니다.
from torchmetrics.aggregation import BaseAggregator
def suppress_nan_check(MetricClass):
assert issubclass(MetricClass, BaseAggregator), MetricClass
class DisableNanCheck(MetricClass):
def _cast_and_nan_check_input(self, x, weight=None):
if not isinstance(x, torch.Tensor):
x = torch.as_tensor(x, dtype=self.dtype,
device=self.device)
if weight is not None and not isinstance(weight, torch.Tensor):
weight = torch.as_tensor(weight, dtype=self.dtype,
device=self.device)
if weight is None:
weight = torch.ones_like(x)
return x.to(self.dtype), weight.to(self.dtype)
return DisableNanCheck
NoNanMeanMetric = suppress_nan_check(MeanMetric)
metrics["avg_loss"] = NoNanMeanMetric().to(device)
사후 최적화 결과 : 성공
두 가지 최적화를 구현 한 후 – 중량 값을 지정하고 비활성화 NaN
점검 – 우리는 기준 실험과 일치하는 단계 시간 성능과 GPU 사용을 찾습니다. 또한, 결과적인 Pytorch Profiler Trace는 메트릭 컬렉션과 관련된 추가 된“Cudastreamsynchronize”이벤트가 제거되었음을 보여줍니다. 약간의 작은 변화로 인해 우리는 메트릭 컬렉션의 행동에 대한 변화없이 훈련 비용을 ~ 10% 줄였습니다.
다음 섹션에서는 추가 메트릭 수집 최적화를 탐색합니다.
예제 2 : 메트릭 장치 배치 최적화
이전 섹션에서는 메트릭 값이 GPU에 존재하여 GPU의 메트릭을 저장하고 계산하는 것이 논리적입니다. 그러나 CPU에 집계하려는 값이 상주하는 시나리오에서는 불필요한 장치 전송을 피하기 위해 CPU에 메트릭을 저장하는 것이 바람직 할 수 있습니다.
아래의 코드 블록에서는 스크립트를 수정하여 CPU의 평균 계수를 사용하여 평균 단계 시간을 계산합니다. 이 변경 사항은 교육 단계의 런타임 성능에 영향을 미치지 않습니다.
avg_time = NoNanMeanMetric()
t0 = time.perf_counter()
for idx, (data, target) in enumerate(train_loader):
# move data to device
data = data.to(device, non_blocking=True)
target = target.to(device, non_blocking=True)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
if capture_metrics:
metrics["avg_loss"].update(loss)
for name, metric in metrics.items():
if name != "avg_loss":
metric.update(output, target)
if (idx + 1) % 100 == 0:
# compute metrics
metric_results = {
name: metric.compute().item()
for name, metric in metrics.items()
}
# print metrics
print(f"Step {idx + 1}: {metric_results}")
# reset metrics
for metric in metrics.values():
metric.reset()
elif (idx + 1) % 100 == 0:
# print last loss value
print(f"Step {idx + 1}: Loss = {loss.item():.4f}")
batch_time = time.perf_counter() - t0
t0 = time.perf_counter()
if idx > 10: # skip first steps
avg_time.update(batch_time)
if enable_profiler:
prof.step()
if idx > 200:
break
if enable_profiler:
prof.stop()
avg_time = avg_time.compute().item()
print(f'Average step time: {avg_time}')
print(f'Throughput: {batch_size/avg_time:.2f} images/sec')
분산 교육을 지원하기 위해 스크립트를 확장하려고 할 때 문제가 발생합니다. 문제를 입증하기 위해 분산 DataparAllel (DDP)을 사용하도록 모델 정의를 수정했습니다.
# toggle to enable/disable ddp
use_ddp = True
if use_ddp:
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "29500"
dist.init_process_group("nccl", rank=0, world_size=1)
torch.cuda.set_device(0)
model = DDP(torchvision.models.resnet18().to(device))
else:
model = torchvision.models.resnet18().to(device)
# insert training loop
# append to end of the script:
if use_ddp:
# destroy the process group
dist.destroy_process_group()
DDP 수정은 다음과 같은 오류를 초래합니다.
RuntimeError: No backend type associated with device type cpu
기본적으로 분산 교육의 메트릭은 사용중인 모든 장치에서 동기화하도록 프로그래밍됩니다. 그러나 DDP가 사용하는 동기화 백엔드는 CPU에 저장된 메트릭을 지원하지 않습니다.
이를 해결하는 한 가지 방법은 교차 장치 메트릭 동기화를 비활성화하는 것입니다.
avg_time = NoNanMeanMetric(sync_on_compute=False)
우리의 경우 평균 시간을 측정하는 경우이 솔루션은 허용됩니다. 그러나 경우에 따라 메트릭 동기화가 필수적이며 메트릭을 GPU로 옮기는 것 외에는 선택의 여지가 없을 수 있습니다.
avg_time = NoNanMeanMetric().to(device)
불행히도,이 상황은 업데이트 기능에서 나오는 새로운 CPU-GPU 동기화 이벤트가 발생합니다.

이 동기화 이벤트는 놀라운 일이 아닙니다. 대체로 CPU에 상주하는 값으로 GPU 메트릭을 업데이트하여 메모리 사본이 필요합니다. 그러나 스칼라 메트릭의 경우 간단한 최적화 로이 데이터 전송을 완전히 피할 수 있습니다.
최적화 3 : 스칼라 대신 텐서로 메트릭 업데이트 수행
솔루션은 간단합니다. 플로트 값으로 메트릭을 업데이트하는 대신 전화하기 전에 텐서로 변환합니다. update
.
batch_time = torch.as_tensor(batch_time)
avg_time.update(batch_time, torch.ones_like(batch_time))
이 사소한 변경은 문제가있는 코드 라인을 우회하고 동기화 이벤트를 제거하며 기준 성능으로 단계 시간을 복원합니다.
언뜻보기 에이 결과는 놀라운 것처럼 보일 수 있습니다. CPU 텐서로 GPU 메트릭을 업데이트하려면 여전히 메모리 사본이 필요할 것으로 예상됩니다. 그러나 Pytorch는 명시 적 데이터 전송없이 추가를 수행하는 전용 커널을 사용하여 스칼라 텐서의 작업을 최적화합니다. 이것은 그렇지 않으면 비싼 동기화 이벤트를 피합니다.
요약
이 게시물에서는 Torchmetrics에 대한 순진한 접근 방식이 CPU-GPU 동기화 이벤트를 도입하고 Pytorch 교육 성능을 크게 저하시키는 방법을 탐색했습니다. Pytorch Profiler를 사용하여 이러한 동기화 이벤트를 담당하는 코드 라인을 식별하고이를 제거하기 위해 타겟팅 된 최적화를 적용했습니다.
- 호출 할 때 웨이트 텐서를 명시 적으로 지정하십시오
MeanMetric.update
기본값에 의존하는 대신 함수. - 기본에서 NAN 체크를 비활성화하십시오
Aggregator
클래스 또는보다 효율적인 대안으로 교체하십시오. - 불필요한 전송을 최소화하기 위해 각 메트릭의 장치 배치를 신중하게 관리하십시오.
- 필요하지 않은 경우 교차 기기 메트릭 동기화를 비활성화하십시오.
- 메트릭이 GPU에있는 경우 부동 소수점 스칼라를 텐서로 변환하기 전에 전달합니다.
update
암시 적 동기화를 피하는 기능.
우리는이 게시물에서 논의 된 일부 최적화를 다루는 Torchmetrics Github 페이지에 전용 풀 요청을 만들었습니다. 자신의 개선과 최적화를 자유롭게 기여하십시오!
출처 참조
Post Comment