본문
LLM (Ollama)가 동작하지 않을 때 Resilience4j를 사용하여 재시도 처리 로직을 추가했지만 DLQ 로직은 구현되지 않았었다.
DLQ (Dead Letter Queue)
메시지 큐 시스템(Kafka, RabbitMQ, SQS 등)이나 이벤트 기반 아키텍처에서 처리 실패한 메시지를 따로 보관하는 큐로 장애나 외로 인해 메시지가 실패했을 때 재시도해도 계속 실패하는 메시지들을 나중에 재처리(reprocess)하거나 분석하기 위해 DLQ에 보관
운영자는 DLQ에 쌓인 메시지를 모니터링 / 복구 / 재처리
DLQ가 해결하는 점
1. 실패 메시지를 격리시켜 서비스 정상화
2. DLQ에 보관해 나중에 로그·데이터 분석 가능
3. DLQ로 분리해서 다른 메시지 처리는 정상 진행
DLQ를 Kafka로 구현해보려 하지만 아직 사용이 미숙하여 DB에 로그를 저장 후 일정 시간 간격에 스케줄러로 재시도 처리하는 로직으로 임시 구현하기로 함
구현 방식
1. AiResponder에서 실패 시 호출되는 메서드를 단순 로그 출력이 아닌 DlqMessage를 생성해 DB에 저장
2. 스케줄러가 주기적으로 DB에 저장된 DLQ 로그를 보고 재시도 처리
@Service
@RequiredArgsConstructor
@Slf4j
public class AiResponder {
private final CommentService commentService;
private final ChatClient chatClient;
private final DlqMessageRepository dlqMessageRepository;
public void aiAnswerComment(User user, Inquiry inquiry) {
String answer = chatClient
.prompt()
.user(inquiry.getContent())
.call()
.content();
if (answer == null || answer.isBlank()) {
throw new RetryableAiException("Blank content from LLM");
}
commentService.saveAiComment(answer, user, inquiry);
}
// 3회 실패 시 호출되어 실행하는 메서드
@Transactional
public void sendToDlq(UUID inquiryId, Throwable cause) {
String causeMsg = cause == null ? "Unknown error" : cause.getClass().getName() + ": " + cause.getMessage();
DlqMessage dlq = DlqMessage.createDlq(DlqStatus.PENDING, causeMsg, inquiryId);
dlqMessageRepository.save(dlq);
}
}
Ollama 연결과 Resilience4j 설정의 경우 링크 참조
https://kimfishes.tistory.com/3
코드
DlqMessage
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class DlqMessage {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(columnDefinition = "TEXT")
private String cause;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private DlqStatus dlqStatus;
@Column(nullable = false)
private UUID inquiryId;
@CreationTimestamp
private LocalDateTime createdAt;
@UpdateTimestamp
private LocalDateTime updatedAt;
private LocalDateTime nextRunTime;
private int retryCount = 0;
public DlqMessage(DlqStatus dlqStatus, String cause, UUID inquiryId) {
this.dlqStatus = dlqStatus;
this.cause = cause;
this.inquiryId = inquiryId;
}
public static DlqMessage createDlq(DlqStatus dlqStatus, String cause, UUID inquiryId) {
DlqMessage dlqMessage = new DlqMessage(dlqStatus, cause, inquiryId);
dlqMessage.nextRunTime = null;
return dlqMessage;
}
public void updateDlqStatus(DlqStatus dlqStatus) {
this.dlqStatus = dlqStatus;
}
public void updateNextRunTime(LocalDateTime nextRunTime) {
this.nextRunTime = nextRunTime;
}
public void retryUpCount(){
retryCount++;
}
}
public enum DlqStatus {
PENDING, FAILED, SUCCESS, PROCESSING
}
RecoveryService (스케줄러에서 호출 될 예정)
@Service
@RequiredArgsConstructor
public class RecoveryService {
private final AiResponder aiResponder;
private final InquiryRepository inquiryRepository;
private final UserRepository userRepository;
@Transactional
public void reprocessOnce(UUID inquiryId) {
Inquiry inquiry = inquiryRepository.findById(inquiryId).orElseThrow(() -> new IllegalStateException("Inquiry not found: " + inquiryId));
User user = userRepository.findById(inquiry.getUserId()).orElseThrow(() -> new BusinessException(ErrorCode.USER_NOT_FOUND));
// 기존 AI 답변 로직 재실행
aiResponder.aiAnswerComment(user, inquiry);
}
}
DlqMessageRepository
- 재처리 가능한 DLQ 메시지들을 비관적 락(PESSIMISTIC_WRITE)**을 걸어 조회
- 아직 재시도 횟수가 한도(maxCount) 미만이고, 재실행 가능 시점(nextRunTime)이 도래한 DLQ 메시지”를 다른 스레드와 동시에 처리되지 않도록 락을 걸고 가져오는 역할
@Repository
public interface DlqMessageRepository extends JpaRepository<DlqMessage, UUID> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("""
select d
from DlqMessage d
where d.dlqStatus = :status
and d.retryCount < :maxCount
and d.nextRunTime is null or d.nextRunTime <= :nowTime
order by coalesce(d.nextRunTime, d.createdAt) asc, d.createdAt asc
""")
List<DlqMessage> pendingDlqLock(@Param("status") DlqStatus status,@Param("maxCount")int maxCount, @Param("nowTime") LocalDateTime nowTime, Pageable pageable);
// 한 번도 시도 안 한 DLQ → 맨 앞에 위치 시키고 이미 시도했지만 백오프된 DLQ → nextRunTime이 이른 순서대로
// coalesce(A, B)은 A가 null이면 B를 대신 써라 라는 뜻이며 끝에 d.createdAt asc를 한번 더 붙여 값이 같은 경우 이 방식으로 정렬
// 만약 null이면 createAt를 사용하니 여러 값들이 같은 상태이기 때무
// 엔티티 객체를 건드리지 않고 DB에 직접 쿼리를 날려 DB와 JPA 캐시의 상태 불일치로 인한 버그를 예방하는 역할
// 상태를 PROCESSING으로 바꾸고 곧바로 같은 트랜잭션에서 dlqMessage.getDlqStatus()를 호출해도 최신 상태 반영
@Modifying(clearAutomatically = true, flushAutomatically = true)
@Query("""
update DlqMessage d
set d.dlqStatus = :to
where d.id in :ids and d.dlqStatus= :from
""")
int changeStatus(@Param("ids")List<UUID>ids, @Param("from") DlqStatus from, @Param("to") DlqStatus to);
}
recoverDlq
- 스케줄러로 총 3번 재시도 처리를 하게 됨
@Component
@RequiredArgsConstructor
public class recoverDlq {
private static final int BATCH_SIZE = 50; // 실패 작업을 한번에 몇 개씩 재시도 할지
private static final int MAX_ATTEMPTS = 3; // 3번 하고도 실패면 종결
private final DlqMessageRepository dlqMessageRepository;
private final RecoveryService recoveryService;
// ⏱ 매 1분마다 실행 (Asia/Seoul)
@Scheduled(cron = "0 * * * * *", zone = "Asia/Seoul")
@Transactional
public void recoverDlq() {
LocalDateTime now = LocalDateTime.now();
// 1) 지금 처리할 차례가 된 것들만 잠그며 소량 집기
List<DlqMessage> dlqMessages = dlqMessageRepository.pendingDlqLock(DlqStatus.PENDING, MAX_ATTEMPTS, now, PageRequest.of(0, BATCH_SIZE));
if (dlqMessages.isEmpty()) return;
// 2) 집자마자 PROCESSING으로 전이 (중복 집기 차단)
List<UUID> ids = dlqMessages.stream().map(DlqMessage::getId).toList();
int changed = dlqMessageRepository.changeStatus(ids, DlqStatus.PENDING, DlqStatus.PROCESSING);
if (changed == 0) return;
// 3) 각 메시지 처리
for (DlqMessage dlqMessage : dlqMessages) {
try {
recoveryService.reprocessOnce(dlqMessage.getInquiryId());
dlqMessage.updateDlqStatus(DlqStatus.SUCCESS);
} catch (Exception e) {
// 위에서 재시도 처리를 하고 성공하면 냅두지만 실패 시 Exception이 발생해 로직들 수행
int retryCount = dlqMessage.getRetryCount() + 1;
dlqMessage.retryUpCount();
if (retryCount >= MAX_ATTEMPTS) {
dlqMessage.updateDlqStatus(DlqStatus.FAILED); // 그냥 실패로 처리하고 더는 재처리 하지 않음
// TODO 관리자에게 알림 추가 예정
}
else {
dlqMessage.updateDlqStatus(DlqStatus.PENDING);
dlqMessage.updateNextRunTime(now.plus(backoff(retryCount)));
}
}
}
}
private Duration backoff(int retryCount) {
return switch (retryCount) {
case 1 -> Duration.ofMinutes(1); // 1번째 실패 후: +1분
case 2 -> Duration.ofMinutes(5); // 2번째 실패 후: +5분
default -> Duration.ofMinutes(30); // 3번째 실패 후: +30분 (다음이 마지막 시도)
};
}
}
'Spring Boot > LLM' 카테고리의 다른 글
| PGVector Window 설치 방법 (0) | 2026.04.07 |
|---|---|
| RAG 구현 전 Vector DB 선택 (PGVector VS Qdrant 중 무엇이 좋을까?) (0) | 2026.04.07 |
| SpringAI 사용해서 LLM (Ollama) 연결 + Resilience4j 도입 (Spring Boot) (0) | 2025.10.20 |