반응형
다음 강의에 넣으려고 만들었던 전략 코드였는데, 전체 인스턴스 분산락은 구현도 어렵고 설명하기도 어려울 것 같아서 난이도 조절을 위해 제거하고 기록만 해둡니다.
캐시 만료 시점 동시 요청에 대한 Cache Stampede를 방지하기 위해 분산락을 잡는데, 전체 인스턴스 분산락으로 반드시 1회의 요청만 오리진 서버로 전파되도록 합니다.
타 요청들은 락 해제 및 갱신 완료될 때까지 대기하다가 갱신된 요청을 반환합니다.
polling 방식의 비효율로 인한 pub/sub 방식 사용하였지만, pub/sub 만으로는 해결 안되는 지점이 있어서 polling 방식도 혼용했습니다.
실제 적용하려면 테스트 및 검토는 필요합니다.
redis pub/sub + polling 기반으로 분산락 제어
- pub/sub 분산락 제공 클래스
@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedLockPubSubProvider {
private final StringRedisTemplate redisTemplate;
private final DistributedLockProvider distributedLockProvider;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final Map<String, CountDownLatch> latchMap = new ConcurrentHashMap<>();
/**
* 지금은 단일 토픽으로 구현하지만, 처리량이 많다면 단일 토픽의 부하가 커질 수 있다.
* 레디스에서 부하를 분산하기 위해, 토픽도 어떠한 타입별로 분류 하거나 샤딩 전략을 취할 수도 있다.
*/
private static final ChannelTopic TOPIC = ChannelTopic.of("unlock-channel");
@PostConstruct
private void init() {
redisMessageListenerContainer.addMessageListener((message, pattern) -> {
/*
아직 타 요청에서 latch.await 대기중인 요청이 없으면, latchMap에도 아직 latch가 없을 수 있다.
하지만 만료되었다는 사실을 버리면, 이후 요청에서 이미 해제된 락이란 사실을 모르고 대기할 수 있으므로,
MessageListener에서 latch.computeIfAbsent로 latch를 만들어서 해제된 락이란 사실을 기억하고 있는다.
unlock을 대기중인 요청에 의해 제거되거나, 대기중인 요청이 앞으로도 없다면 스케줄링 작업에 의해 가비지는 제거된다.
*/
String idWithLockOwner = new String(message.getBody());
latchMap.computeIfAbsent(idWithLockOwner, k -> new CountDownLatch(1)).countDown();
}, TOPIC);
Set<String> delMarkSet = new HashSet<>();
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(() -> {
/*
latchMap에 가비지가 생길 수 있기 때문에 주기적으로 정리한다.
[가비지 시나리오]
1. A 요청에서 latchMap 생성
2. B 요청에서 락 없다고 판단
3. MessageListener에서 latchMap.countDown
4. A 요청에서 latchMap.await=true 이후 latchMap.remove
5. B 요청에서 latchMap 생성 <-- 가비지가 생길 수 있음
한번에 제거하면 정상적으로 countDown된 latch도 제거될 수 있으므로, 1회 마킹 후에 제거한다.
*/
latchMap.entrySet().removeIf(entry -> delMarkSet.contains(entry.getKey()));
delMarkSet.clear();
latchMap.entrySet().stream()
.filter(entry -> entry.getValue().getCount() == 0)
.map(Map.Entry::getKey)
.forEach(delMarkSet::add);
}, 3, 3, TimeUnit.SECONDS);
}
/**
* 락을 점유하면 runnable 수행
* 락을 점유하지 못하면, 락을 점유한 요청에서 runnable 수행 및 락을 해제할 때까지 블로킹
*/
public boolean runIfLockedElseWait(String id, Duration ttl, Runnable runnable) {
LockResult lockResult = distributedLockProvider.lockByOwner(id, UUID.randomUUID().toString(), ttl);
String idWithLockOwner = id + ":" + lockResult.getLockOwner();
if (lockResult.isAcquired()) {
try {
runnable.run();
} catch (Exception e) {
log.error("[DistributedLockPubSubProvider.runIfLockedElseWait] id={}", id, e);
} finally {
distributedLockProvider.unlock(id);
/*
락이 해제되었으면 알림을 보낸다.
unlock을 기다리는 다른 요청들은 자신이 어떠한 owner의 lock을 대기하는지 알아야 한다.
unlock이 되었다가 타 요청에 의해 즉시 lock이 재점유될 수 있는데,
대기 중이던 요청들은 owner를 구분해야 자신이 대기하던 락이 해제되었다는 사실을 구분할 수 있음
*/
redisTemplate.convertAndSend(TOPIC.getTopic(), idWithLockOwner);
}
return true;
}
while (true) {
/*
polling + pub/sub 방식 혼합
TTL로 만료된 경우는 해제 알림을 받을 수 없고, 타 요청에서 latchMap.remove 수행한 경우를 검증하기 위함
*/
if (!distributedLockProvider.isLockedByOwner(id, lockResult.getLockOwner())) {
break;
}
try {
boolean result = latchMap.computeIfAbsent(idWithLockOwner, k -> new CountDownLatch(1))
.await(100, TimeUnit.MICROSECONDS);
if (result) {
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
latchMap.remove(idWithLockOwner);
return false;
}
}
- 분산락 제공 클래스
@Component
@RequiredArgsConstructor
public class DistributedLockProvider {
private final StringRedisTemplate redisTemplate;
private static final RedisScript<String> LOCK_SCRIPT = RedisScript.of(
"""
if redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) then
return ARGV[1]
else
return redis.call("get", KEYS[1])
end
""",
String.class
);
public boolean lock(String id, Duration ttl) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(genKey(id), "", ttl);
return result != null && result;
}
public LockResult lockByOwner(String id, String requestedLockOwner, Duration ttl) {
String savedLockOwner = redisTemplate.execute(
LOCK_SCRIPT,
List.of(genKey(id)),
requestedLockOwner,
String.valueOf(ttl.toMillis())
);
return LockResult.of(requestedLockOwner, savedLockOwner);
}
public boolean isLockedByOwner(String id, String lockOwner) {
String result = redisTemplate.opsForValue().get(genKey(id));
return result != null && result.equals(lockOwner);
}
public void unlock(String id) {
redisTemplate.delete(genKey(id));
}
private String genKey(String id) {
return "distributed-lock:" + id;
}
}
- 캐시 핸들러 클래스
@Component
@RequiredArgsConstructor
public class RequestCollapsingBlockingCacheHandler implements KukeCacheHandler {
private final StringRedisTemplate redisTemplate;
private final DistributedLockPubSubProvider distributedLockPubSubProvider;
@Override
public <T> T fetch(String key, Duration ttl, Supplier<T> originSupplier, Class<T> clazz) {
String cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
return DataSerializer.deserializeOrNull(cached, clazz);
}
distributedLockPubSubProvider.runIfLockedElseWait(key, Duration.ofSeconds(5), () -> {
T origin = originSupplier.get();
put(key, ttl, origin);
});
cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
return DataSerializer.deserializeOrNull(cached, clazz);
} else {
return originSupplier.get();
}
}
@Override
public void put(String key, Duration ttl, Object data) {
redisTemplate.opsForValue().set(key, DataSerializer.serializeOrException(data), ttl);
}
@Override
public void evict(String key) {
redisTemplate.delete(key);
}
@Override
public boolean supports(CacheStrategy cacheStrategy) {
return CacheStrategy.REQUEST_COLLAPSING_BLOCKING == cacheStrategy;
}
}
반응형
'Spring' 카테고리의 다른 글
| 스프링부트 대규모 시스템 캐시 전략 강의 오픈 (0) | 2025.11.08 |
|---|---|
| Spring Boot 파일 업로드 및 테스트(multipart/form-data) (5) | 2021.12.12 |
| JPA cascade = CascadeType.REMOVE와 @OnDelete(action = OnDeleteAction.CASCADE)의 차이 (5) | 2021.12.09 |
| 스프링부트 레디스(redis)로 캐시 사용 중 에러 RedisCommandExecutionException (0) | 2021.11.10 |
| 웹 채팅 방 인원 공유파일 전송 (spring boot, react, nextjs, redux, redux-saga, aws s3) (0) | 2021.11.10 |