broker

Broker

여러 producer가 보낸 메시지를 디스크에 저장하고, 패턴에 맞는 consumer로 전달하는 중앙 모듈입니다. TopicQueue, 세그먼트, 체크포인트, DLQ가 모두 들어 있습니다.

개요

broker는 mmmq의 핵심 모듈입니다. POST /messages로 들어온 메시지를 토픽별 TopicQueue에 fsync와 함께 기록하고, 등록된 Dispatcher가 패턴이 맞는 토픽의 메시지를 consumer로 전달합니다.

저장 구조는 토픽당 segment 파일(.mmm)과 인덱스 파일(.idx)이 한 쌍입니다. 인덱스가 곧 가시성 경계(commit point) 역할을 하고, dispatcher별 체크포인트로 소비 위치를 따로 관리합니다.

참고
토픽당 라이터를 하나만 허용하기 때문에 메시지 순서가 자연스럽게 지켜집니다. 동시 읽기는 잠금 없이 처리되며, 인플라이트 항목은 인덱스 카운트로 격리됩니다.

기능

  • 디스크 영속화 — FileChannel + fsync. CRC32C 체크섬으로 손상 감지
  • 토픽별 단일 라이터 — ReentrantLock 기반 append, 동시 읽기 무잠금
  • 세그먼트 회전segment.max-bytes 초과 시 새 세그먼트 생성
  • 인덱스 기반 복구 — 부팅 시 인덱스가 source of truth, 부분 쓰기는 폐기
  • Dispatcher 라우팅 — Ant 스타일 패턴 매칭으로 consumer로 전달
  • 두 단계 재시도 — NACK 재시도(기본 3) + 네트워크 재시도(지수 백오프, 무한)
  • DLQ 내장 — Counter / Timer 정책 두 가지 기본 제공, 다중 등록 가능

설치

JitPack 저장소와 broker 아티팩트를 추가합니다. broker는 보통 별도의 Spring Boot 애플리케이션으로 띄웁니다.

build.gradle

build.gradle
repositories {
    mavenCentral()
    maven { url "https://jitpack.io" }
}

dependencies {
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "com.github.moko-meringue.mmmq:broker:0.0.1"
}

설정

application.yml

application.yml
mmmq:
  broker:
    storage:
      root-dir: ./data             # 토픽 데이터 루트
    segment:
      max-bytes: 67108864          # 64 MiB 소프트 캡

application.properties (대안)

application.properties
mmmq.broker.storage.root-dir=./data
mmmq.broker.segment.max-bytes=67108864

설정 키 표

타입 기본값 설명
mmmq.broker.storage.root-dir String ./data 토픽별 데이터·인덱스·체크포인트 루트 디렉토리.
mmmq.broker.segment.max-bytes long 67108864 세그먼트 회전 소프트 캡. 초과 시 새 segment 파일 생성.

Dispatcher 빈 등록

Java · @Configuration
@Configuration
public class DispatcherConfig {

    @Bean
    public Dispatcher orderDispatcher() {
        return new Dispatcher(
            "order-dispatcher",                                     // 체크포인트 파일명
            new Host(WebProtocol.HTTP, "localhost", 8081),       // consumer 엔드포인트
            List.of(new TopicPattern("order.*"))                // 매칭 패턴
        );
    }
}

DLQ 빈 등록

Java · DLQ
@Bean
public DeadLetterQueue counterDlq(DeadLetterHandler handler) {
    return new CounterDeadLetterQueue("counter-dlq", handler, 50);
}

@Bean
public DeadLetterHandler deadLetterFileWriter() {
    return new DeadLetterFileWriter(Paths.get("./dlq"), "order-dlq");
}

사용 예제

broker는 보통 별도의 Spring Boot 애플리케이션으로 띄웁니다.

Java · @SpringBootApplication
@SpringBootApplication
public class BrokerApplication {
    public static void main(String[] args) {
        SpringApplication.run(BrokerApplication.class, args);
    }
}
storage.root-dir는 broker 프로세스 바깥의 안정적인 디스크에 둬 주세요. 컨테이너로 운영할 때는 볼륨 마운트가 필수입니다.

API 레퍼런스

핵심 클래스

클래스 설명
FrontDispatcher 들어오는 메시지를 TopicQueue로 라우팅하는 진입점.
TopicQueue 토픽별 영속 큐. offer/peek/commit/subscribe 제공.
Dispatcher 패턴 매칭된 토픽의 메시지를 consumer로 전달.
DeadLetterQueue NACK 재시도 실패 메시지 큐 인터페이스.
CounterDeadLetterQueue 누적 N건마다 핸들러 호출.
TimerDeadLetterQueue 일정 주기마다 핸들러 호출.
DeadLetterHandler DLQ가 모은 메시지를 처리하는 콜백.
CorruptionException CRC 검증 실패 시 발생, 단일 항목 격리.

주요 이벤트

이벤트 발생 시점
TopicQueueInitializedEvent 새 토픽 큐가 생성될 때.
MessageArrivedEvent 새 메시지가 영속화 완료된 직후.