broker
Broker
여러 producer가 보낸 메시지를 디스크에 저장하고, 패턴에 맞는 consumer로 전달하는 중앙 모듈입니다. TopicQueue, 세그먼트, 체크포인트, DLQ가 모두 들어 있습니다.
개요
broker는 mmmq의 핵심 모듈입니다.
POST /messages로 들어온 메시지를 토픽별 TopicQueue에 fsync와 함께 기록하고,
등록된 Dispatcher가 패턴이 맞는 토픽의 메시지를 consumer로 전달합니다.
저장 구조는 토픽당 segment 파일(.mmm)과 인덱스 파일(.idx)이 한 쌍입니다.
인덱스가 곧 가시성 경계(commit point) 역할을 하고, dispatcher별 체크포인트로 소비 위치를 따로 관리합니다.
참고
토픽당 라이터를 하나만 허용하기 때문에 메시지 순서가 자연스럽게 지켜집니다.
동시 읽기는 잠금 없이 처리되며, 인플라이트 항목은 인덱스 카운트로 격리됩니다.
기능
- 디스크 영속화 — FileChannel + fsync. CRC32C 체크섬으로 손상 감지
- 토픽별 단일 라이터 — ReentrantLock 기반 append, 동시 읽기 무잠금
- 세그먼트 회전 —
segment.max-bytes초과 시 새 세그먼트 생성 - 인덱스 기반 복구 — 부팅 시 인덱스가 source of truth, 부분 쓰기는 폐기
- Dispatcher 라우팅 — Ant 스타일 패턴 매칭으로 consumer로 전달
- 두 단계 재시도 — NACK 재시도(기본 3) + 네트워크 재시도(지수 백오프, 무한)
- DLQ 내장 — Counter / Timer 정책 두 가지 기본 제공, 다중 등록 가능
설치
JitPack 저장소와 broker 아티팩트를 추가합니다. broker는 보통 별도의 Spring Boot 애플리케이션으로 띄웁니다.
build.gradle
build.gradle
repositories { mavenCentral() maven { url "https://jitpack.io" } } dependencies { implementation "org.springframework.boot:spring-boot-starter-web" implementation "com.github.moko-meringue.mmmq:broker:0.0.1" }
설정
application.yml
application.yml
mmmq: broker: storage: root-dir: ./data # 토픽 데이터 루트 segment: max-bytes: 67108864 # 64 MiB 소프트 캡
application.properties (대안)
application.properties
mmmq.broker.storage.root-dir=./data mmmq.broker.segment.max-bytes=67108864
설정 키 표
| 키 | 타입 | 기본값 | 설명 |
|---|---|---|---|
mmmq.broker.storage.root-dir |
String | ./data |
토픽별 데이터·인덱스·체크포인트 루트 디렉토리. |
mmmq.broker.segment.max-bytes |
long | 67108864 |
세그먼트 회전 소프트 캡. 초과 시 새 segment 파일 생성. |
Dispatcher 빈 등록
Java · @Configuration
@Configuration public class DispatcherConfig { @Bean public Dispatcher orderDispatcher() { return new Dispatcher( "order-dispatcher", // 체크포인트 파일명 new Host(WebProtocol.HTTP, "localhost", 8081), // consumer 엔드포인트 List.of(new TopicPattern("order.*")) // 매칭 패턴 ); } }
DLQ 빈 등록
Java · DLQ
@Bean public DeadLetterQueue counterDlq(DeadLetterHandler handler) { return new CounterDeadLetterQueue("counter-dlq", handler, 50); } @Bean public DeadLetterHandler deadLetterFileWriter() { return new DeadLetterFileWriter(Paths.get("./dlq"), "order-dlq"); }
사용 예제
broker는 보통 별도의 Spring Boot 애플리케이션으로 띄웁니다.
Java · @SpringBootApplication
@SpringBootApplication public class BrokerApplication { public static void main(String[] args) { SpringApplication.run(BrokerApplication.class, args); } }
팁
storage.root-dir는 broker 프로세스 바깥의 안정적인 디스크에 둬 주세요.
컨테이너로 운영할 때는 볼륨 마운트가 필수입니다.
API 레퍼런스
핵심 클래스
| 클래스 | 설명 |
|---|---|
FrontDispatcher |
들어오는 메시지를 TopicQueue로 라우팅하는 진입점. |
TopicQueue |
토픽별 영속 큐. offer/peek/commit/subscribe 제공. |
Dispatcher |
패턴 매칭된 토픽의 메시지를 consumer로 전달. |
DeadLetterQueue |
NACK 재시도 실패 메시지 큐 인터페이스. |
CounterDeadLetterQueue |
누적 N건마다 핸들러 호출. |
TimerDeadLetterQueue |
일정 주기마다 핸들러 호출. |
DeadLetterHandler |
DLQ가 모은 메시지를 처리하는 콜백. |
CorruptionException |
CRC 검증 실패 시 발생, 단일 항목 격리. |
주요 이벤트
| 이벤트 | 발생 시점 |
|---|---|
TopicQueueInitializedEvent |
새 토픽 큐가 생성될 때. |
MessageArrivedEvent |
새 메시지가 영속화 완료된 직후. |