https://suho0303.tistory.com/48
잘못된 방식이니 따라하지 마세요!!!!
이전 게시물에서 단일 파티션에서의 동시성 처리에 대해서 작성해보았는데요.
단일 파티션으로 동시성을 처리하게 되면, 현재 상태에서는 문제가 없겠지만 이후 프로젝트가 확장할 시에 많은 성능적 문제가 발생하게 됩니다.
문제점
단일 파티션을 하게되면 한 파티션에만 메세지가 쌓이기 때문에 순서 보장이 됩니다.
그러나, 단일 파티션으로 구성시에는 모든 메세지가 한 파티션에 쌓이게 되면서 처리 성능이 매우 떨어지고, 해당 파티션에 문제가 발생하 면 모든 데이터가 유실될 가능성도 생깁니다.
멀티 파티션을 적용해서 개선해보고자 했습니다.
이를 통해서 메세지를 병렬적으로 처리하면서 처리량을 향상시키도록 구현해보았습니다.
아직 완벽하게 동시성을 처리하는지는 확실치 않지만, 한번 확인해봐주시면 감사하겠습니다.
멀티 파티션
@Bean
public NewTopic topic() {
return new NewTopic("orders", 3, (short) 1);
}
우선은 특정 토픽에 대한 파티션 개수를 3개로 늘려보았습니다.
그다음 메세지를 발급하는 메소드 입니다.
@Transactional
public void sendMessage(Long id) {
OutboxMessage outboxMessage = outboxMessageService.findById(id);
retryTemplate.execute(context -> {
kafkaTemplate.send(outboxMessage.getTopic(), outboxMessage.getPayload(), outboxMessage.getMessage());
outboxMessageService.delete(outboxMessage);
return null;
}, context -> {
log.error("Failed to send message: {}", context.getLastThrowable().getMessage());
return null;
});
}
저는 재고에 대한 동시성 처리를 했기 때문에 키값을 주문ID값으로 주고 진행을 했는데요.
멀티파티션의 경우, 여러 파티션에 들어가는 기준이 키값으로 나뉘게 되는데, 같은 주문번호를 가진 메세지들끼리 특정 파티션에 쌓이게 됩니다.
하지만 파티션 내부에서는 순서보장이 되지만, 여러 파티션간에는 순서보장이 되지않아 먼저 메세지가 발급되었더라도, 뒤늦게 컨슈밍되는 경우가 발생합니다.
해당 메세지들은 아래 메소드에서 컨슈밍 됩니다.
@KafkaListener(topics = "orders", groupId = "group_id")
public void orderToOrder(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
queue.add(message);
while (!queue.isEmpty()) {
ConsumerRecord<String, String> record = queue.poll();
executorService.submit(() -> {
try {
log.info("record: {}, {}, {}", record.timestamp(), record.value(), record.key());
orderService.tryOrder(record.key());
} catch (Exception e) {
log.error("Error processing message", e);
}
});
}
ack.acknowledge();
} catch (Exception e) {
log.error("Error processing messages", e);
}
}
따라서 저는 메세지가 발급된 타임스탬프 기준으로 메세지들을 입력받았는데요. 그렇게 되면 멀티 파티션끼리도 순서를 보장할 수 있게 됩니다.
하지만,
하지만 아직 뭔가 완벽하게 동시성 처리를 수행했다고 하기에는 잘 이해가 되지않습니다.
주문 ID순으로 처리가 되고 있다고 해서, 동시성이 실제로 처리되는지 확인할 수 없기 때문입니다.
아직 완벽하게 마무리했다고 하지 못할 것같습니다.
따라서 UI for Kafka로 실제 카프카에 발급되는 메세지들을 확인하거나, 다른 추가적인 방법을 더 생각해보고 코드를 개선해볼 생각입니다.
'나의 공부' 카테고리의 다른 글
Cache 탐험 (0) | 2023.08.28 |
---|---|
Kafka와 Redis를 활용한 재고에 대한 동시성 처리기 - 3 (0) | 2023.07.19 |
kafka를 활용한 재고에 대한 동시성 처리기 - 1 (0) | 2023.07.16 |
NCP에서 크레딧을 지원받았다.. (1) | 2023.06.19 |
제네릭이란 뭘까.. (0) | 2023.05.18 |
댓글