반응형

다음 강의에 넣으려고 만들었던 전략 코드였는데, 전체 인스턴스 분산락은 구현도 어렵고 설명하기도 어려울 것 같아서 난이도 조절을 위해 제거하고 기록만 해둡니다.

캐시 만료 시점 동시 요청에 대한 Cache Stampede를 방지하기 위해 분산락을 잡는데, 전체 인스턴스 분산락으로 반드시 1회의 요청만 오리진 서버로 전파되도록 합니다.

타 요청들은 락 해제 및 갱신 완료될 때까지 대기하다가 갱신된 요청을 반환합니다.

polling 방식의 비효율로 인한 pub/sub 방식 사용하였지만, pub/sub 만으로는 해결 안되는 지점이 있어서 polling 방식도 혼용했습니다.

실제 적용하려면 테스트 및 검토는 필요합니다.

redis pub/sub + polling 기반으로 분산락 제어

 

- pub/sub 분산락 제공 클래스

@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedLockPubSubProvider {
	private final StringRedisTemplate redisTemplate;
	private final DistributedLockProvider distributedLockProvider;
	private final RedisMessageListenerContainer redisMessageListenerContainer;

	private final Map<String, CountDownLatch> latchMap = new ConcurrentHashMap<>();

	/**
	 * 지금은 단일 토픽으로 구현하지만, 처리량이 많다면 단일 토픽의 부하가 커질 수 있다.
	 * 레디스에서 부하를 분산하기 위해, 토픽도 어떠한 타입별로 분류 하거나 샤딩 전략을 취할 수도 있다.
	 */
	private static final ChannelTopic TOPIC = ChannelTopic.of("unlock-channel");

	@PostConstruct
	private void init() {
		redisMessageListenerContainer.addMessageListener((message, pattern) -> {
			/*
			아직 타 요청에서 latch.await 대기중인 요청이 없으면, latchMap에도 아직 latch가 없을 수 있다.
			하지만 만료되었다는 사실을 버리면, 이후 요청에서 이미 해제된 락이란 사실을 모르고 대기할 수 있으므로,
			MessageListener에서 latch.computeIfAbsent로 latch를 만들어서 해제된 락이란 사실을 기억하고 있는다.
			unlock을 대기중인 요청에 의해 제거되거나, 대기중인 요청이 앞으로도 없다면 스케줄링 작업에 의해 가비지는 제거된다.
			 */
			String idWithLockOwner = new String(message.getBody());
			latchMap.computeIfAbsent(idWithLockOwner, k -> new CountDownLatch(1)).countDown();
		}, TOPIC);

		Set<String> delMarkSet = new HashSet<>();
		Executors.newSingleThreadScheduledExecutor()
			.scheduleWithFixedDelay(() -> {
				/*
				latchMap에 가비지가 생길 수 있기 때문에 주기적으로 정리한다.
				[가비지 시나리오]
				1. A 요청에서 latchMap 생성
				2. B 요청에서 락 없다고 판단
				3. MessageListener에서 latchMap.countDown
				4. A 요청에서 latchMap.await=true 이후 latchMap.remove
				5. B 요청에서 latchMap 생성 <-- 가비지가 생길 수 있음
				한번에 제거하면 정상적으로 countDown된 latch도 제거될 수 있으므로, 1회 마킹 후에 제거한다.
				 */
				latchMap.entrySet().removeIf(entry -> delMarkSet.contains(entry.getKey()));
				delMarkSet.clear();
				latchMap.entrySet().stream()
					.filter(entry -> entry.getValue().getCount() == 0)
					.map(Map.Entry::getKey)
					.forEach(delMarkSet::add);
			}, 3, 3, TimeUnit.SECONDS);
	}

	/**
	 * 락을 점유하면 runnable 수행
	 * 락을 점유하지 못하면, 락을 점유한 요청에서 runnable 수행 및 락을 해제할 때까지 블로킹
	 */
	public boolean runIfLockedElseWait(String id, Duration ttl, Runnable runnable) {
		LockResult lockResult = distributedLockProvider.lockByOwner(id, UUID.randomUUID().toString(), ttl);
		String idWithLockOwner = id + ":" + lockResult.getLockOwner();
		if (lockResult.isAcquired()) {
			try {
				runnable.run();
			} catch (Exception e) {
				log.error("[DistributedLockPubSubProvider.runIfLockedElseWait] id={}", id, e);
			} finally {
				distributedLockProvider.unlock(id);

				/*
				락이 해제되었으면 알림을 보낸다.
				unlock을 기다리는 다른 요청들은 자신이 어떠한 owner의 lock을 대기하는지 알아야 한다.
				unlock이 되었다가 타 요청에 의해 즉시 lock이 재점유될 수 있는데,
				대기 중이던 요청들은 owner를 구분해야 자신이 대기하던 락이 해제되었다는 사실을 구분할 수 있음
				 */
				redisTemplate.convertAndSend(TOPIC.getTopic(), idWithLockOwner);
			}
			return true;
		}

		while (true) {
			/*
			polling + pub/sub 방식 혼합
			TTL로 만료된 경우는 해제 알림을 받을 수 없고, 타 요청에서 latchMap.remove 수행한 경우를 검증하기 위함
			 */
			if (!distributedLockProvider.isLockedByOwner(id, lockResult.getLockOwner())) {
				break;
			}

			try {
				boolean result = latchMap.computeIfAbsent(idWithLockOwner, k -> new CountDownLatch(1))
					.await(100, TimeUnit.MICROSECONDS);
				if (result) {
					break;
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
		latchMap.remove(idWithLockOwner);
		return false;
	}
}

 

- 분산락 제공 클래스

@Component
@RequiredArgsConstructor
public class DistributedLockProvider {
	private final StringRedisTemplate redisTemplate;

	private static final RedisScript<String> LOCK_SCRIPT = RedisScript.of(
		"""
		if redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) then
		  return ARGV[1]
		else
		  return redis.call("get", KEYS[1])
		end
		""",
		String.class
	);

	public boolean lock(String id, Duration ttl) {
		Boolean result = redisTemplate.opsForValue().setIfAbsent(genKey(id), "", ttl);
		return result != null && result;
	}

	public LockResult lockByOwner(String id, String requestedLockOwner, Duration ttl) {
		String savedLockOwner = redisTemplate.execute(
			LOCK_SCRIPT,
			List.of(genKey(id)),
			requestedLockOwner,
			String.valueOf(ttl.toMillis())
		);
		return LockResult.of(requestedLockOwner, savedLockOwner);
	}

	public boolean isLockedByOwner(String id, String lockOwner) {
		String result = redisTemplate.opsForValue().get(genKey(id));
		return result != null && result.equals(lockOwner);
	}

	public void unlock(String id) {
		redisTemplate.delete(genKey(id));
	}

	private String genKey(String id) {
		return "distributed-lock:" + id;
	}
}

 

 

- 캐시 핸들러 클래스

@Component
@RequiredArgsConstructor
public class RequestCollapsingBlockingCacheHandler implements KukeCacheHandler {
	private final StringRedisTemplate redisTemplate;
	private final DistributedLockPubSubProvider distributedLockPubSubProvider;

	@Override
	public <T> T fetch(String key, Duration ttl, Supplier<T> originSupplier, Class<T> clazz) {
		String cached = redisTemplate.opsForValue().get(key);
		if (cached != null) {
			return DataSerializer.deserializeOrNull(cached, clazz);
		}

		distributedLockPubSubProvider.runIfLockedElseWait(key, Duration.ofSeconds(5), () -> {
			T origin = originSupplier.get();
			put(key, ttl, origin);
		});

		cached = redisTemplate.opsForValue().get(key);
		if (cached != null) {
			return DataSerializer.deserializeOrNull(cached, clazz);
		} else {
			return originSupplier.get();
		}
	}

	@Override
	public void put(String key, Duration ttl, Object data) {
		redisTemplate.opsForValue().set(key, DataSerializer.serializeOrException(data), ttl);
	}

	@Override
	public void evict(String key) {
		redisTemplate.delete(key);
	}

	@Override
	public boolean supports(CacheStrategy cacheStrategy) {
		return CacheStrategy.REQUEST_COLLAPSING_BLOCKING == cacheStrategy;
	}
}

 

 

반응형

+ Recent posts