broker

Broker

producer가 보낸 메시지를 디스크에 영속화하고 consumer에게 전송하는 모듈입니다.

개요

producer로부터 받은 메시지를 디스크에 영속화하고, consumer에게 전송하는 모듈입니다.

요구사항

mmmq를 사용하려면 다음 환경이 필요합니다.

항목 최소 버전
Java 17
Spring Boot 3.2.0

설치

broker는 JitPack으로 배포됩니다.
build.gradle에 의존성을 추가해 주세요.

build.gradle (Groovy DSL)

build.gradle
repositories {
    mavenCentral()
    maven { url "https://jitpack.io" }
}

dependencies {
    implementation "com.github.moko-meringue.mmmq:broker:0.0.2"
}

build.gradle.kts (Kotlin DSL)

build.gradle.kts
repositories {
    mavenCentral()
    maven("https://jitpack.io")
}

dependencies {
    implementation("com.github.moko-meringue.mmmq:broker:0.0.2")
}

Broker

Broker는 producer로부터 메시지를 받아 TopicQueue에 저장하는 클래스입니다.
Spring 자동 구성으로 빈이 등록되므로 사용자가 직접 정의할 필요는 없습니다.

POST /messages 엔드포인트

Broker@PostMapping("/messages")를 통해 메시지를 수신합니다.
producer는 이 경로로 메시지를 보내고, broker는 메시지를 TopicQueue에 영속화한 뒤 ACK 혹은 NACK을 응답합니다.

응답
{ "acknowledgement": "ACK" }

응답

응답 의미
ACK 메시지가 디스크에 영속화되었습니다.
NACK broker가 디스크 쓰기에 실패했습니다. 메시지는 저장되지 않았습니다.

TopicQueue

TopicQueue는 토픽 별 메시지를 저장하는 append-only 영속 큐입니다.
같은 토픽으로 들어온 메시지는 모두 해당 토픽의 TopicQueue에 도착 순서대로 영속화돠고, 하나 이상의 Dispatcher가 자신의 오프셋을 통해 메시지를 소비합니다.
중앙 집중식 구조 덕분에, 메시지 생산과 소비과 완전히 분리되고, 메시지 순서가 보장되며, Dispatcher는 독립적으로 메시지를 소비할 수 있습니다.

메시지 영속화

TopicQueue는 메시지를 디스크에 영속화합니다.

메시지가 디스크에 어떤 형태로 보관되는지에 대한 자세한 내용은 메시지 영속화 항목을 참고하세요.

오프셋 기반 메시지 소비

TopicQueue에 쌓이는 각 메시지는 오프셋이라는 절대 순번으로 식별됩니다.
토픽의 첫 메시지는 0, 다음은 1, 그 다음은 2 형태로 도착 순서에 따라 1씩 증가합니다.

오프셋 읽기 모델
TopicQueue - order.new
┌──────┬──────┬──────┬──────┬──────┬──────┬─────┐
│ msg  │ msg  │ msg  │ msg  │ msg  │ msg  │ ... │   ← 메시지
└──────┴──────┴──────┴──────┴──────┴──────┴─────┘
0      1      2      3      4      5                ← 오프셋

메시지 읽기는 오프셋을 통해 이루어집니다.

오프셋을 사용해 TopicQueue에서 메시지를 읽는 자세한 과정은 메시지 영속화 항목을 참고하세요.

동적 TopicQueue 생성

등록되어 있지 않은 토픽의 메시지가 도착하면 broker는 새 TopicQueue를 생성합니다.

동적 생성 모델의 장점

  • 리플레이: consumer가 토픽보다 나중에 생성되어도 메시지가 디스크에 그대로 쌓여 있습니다. 오프셋 0부터 시작하면 그동안의 모든 메시지를 리플레이할 수 있습니다.
  • 운영 결합도 감소: 토픽을 미리 등록하는 단계가 없으므로, 새 토픽이 도입되어도 broker를 업데이트하거나 재시작할 필요가 없습니다.

동시성 모델

  • 쓰기: TopicQueue는 동시에 하나의 쓰기를 진행합니다 (ReentrantLock 기반).
    메시지는 도착한 순서대로 디스크에 영속화되며, 같은 토픽 안에서 메시지 순서가 보존됩니다.
  • 읽기: 읽기는 잠금 없이 진행됩니다.
    쓰기 도중 읽기가 발생하더라도, 애플리케이션 레벨에서 영속화가 완료된 메시지만 노출하므로, 적재 중인 메시지는 안전하게 보호됩니다.

Dispatcher

DispatcherTopicQueue에 영속화된 메시지를 consumer에게 전송하는 클래스입니다.
TopicPattern들을 구독하고, 구독 중인 TopicPattern들에 매칭되는 TopicQueue의 메시지를 consumer에게 전송합니다.

여러 Dispatcher를 동시에 빈으로 등록할 수 있고, 각 Dispatcher는 자신의 TopicPattern들에 따라 독립적으로 메시지를 전송합니다.

Spring Bean 등록

DispatcherConfig.Java
@Configuration
public class DispatcherConfig {

    @Bean
    public Dispatcher orderDispatcher() {
        Host consumerHost = new Host(WebProtocol.HTTP, "localhost", 8081);
        TopicPattern orderPattern = new TopicPattern("order.**");
        TopicPattern paymentPattern = new TopicPattern("payment.*");
        return new Dispatcher(
                "order-dispatcher",
                consumerHost,
                List.of(orderPattern, paymentPattern)
        );
    }

    @Bean
    public Dispatcher notificationDispatcher() {
        Host consumerHost = new Host(WebProtocol.HTTP, "localhost", 8082);
        TopicPattern notificationPattern = new TopicPattern("notification.**");
        return new Dispatcher(
                "notification-dispatcher",
                consumerHost,
                List.of(notificationPattern)
        );
    }
}

생성자 매개변수

매개변수 타입 설명
name String Dispatcher 식별자로, [A-Za-z0-9._-]+ 형식이어야 합니다
host Host 메시지를 전송할 consumer의 Host
patterns List<TopicPattern> 구독할 TopicPattern 목록

TopicQueue 구독 모델

Dispatcher는 자신의 TopicPattern들과 매칭되는 모든 TopicQueue를 구독합니다.

Dispatcher는 구독한 TopicQueue 별로 두 가지 자원을 관리합니다.

  • 오프셋: TopicQueue에서 다음에 읽을 메시지의 오프셋입니다.
    메시지 전송이 끝날 때마다 커밋을 통해 다음 값(+1)으로 업데이트합니다.
  • 워커 스레드: TopicQueue를 전담하는 스레드입니다.
    워커는 TopicQueue의 모든 메시지를 처리한 뒤 종료되고, 새 메시지가 도착하면 다시 실행됩니다.
    같은 TopicQueue의 메시지를 순서대로 직렬 처리하므로, 메시지 순서가 보장됩니다.

broker 구동 시

broker가 시작되면 모든 Dispatcher는 자신이 가진 TopicPattern들과 매칭되는 모든 TopicQueue를 구독합니다.
구독한 TopicQueue마다 오프셋과 워커 하나가 할당되고, 메시지 전송이 시작됩니다.

새 TopicQueue 생성 시

TopicQueue가 생성되면, TopicQueue초기화 이벤트가 발생합니다.
모든 Dispatcher는 이 이벤트를 수신하고, 새 TopicQueue의 토픽과 자신의 TopicPattern들을 비교해 매칭 여부를 판단합니다.
만약 자신의 TopicPattern과 새 TopicQueue의 토픽이 매칭된다면, 그 Dispatcher는 새 TopicQueue를 구독합니다.

메시지 도착 시

구독 중인 TopicQueue에 메시지가 도착하면 워커가 그 메시지를 읽어 consumer에게 전송합니다.
ACK을 받으면 오프셋을 커밋하고 다음 메시지를 처리합니다.
더 이상 처리할 메시지가 없으면 다음 메시지가 도착하기 전까지 대기합니다.

DispatcherTopicQueue에서 메시지를 읽어 오는 과정에 대한 자세한 내용은 메시지 영속화 항목을 참고하세요.

재시도와 지수적 백오프

consumer가 NACK을 반환하면, Dispatcher는 메시지를 재전송합니다.
메시지는 최대 세 번 재전송되며, 전부 실패하면 로그를 남기고 다음 메시지를 소비합니다.

consumer가 정상 응답(ACK 또는 NACK)을 반환하지 못하면 Dispatcher는 지수적 백오프 전략으로 메시지를 재전송합니다.
네트워크 단절이나 HTTP 4xx/5xx 응답이 해당합니다.
대기 시간은 1초에서 시작해 두 배씩 늘어나 최대 60초까지 길어지며, 정상 응답이 돌아올 때까지 재시도가 반복됩니다.

메시지 영속화

broker는 producer가 보낸 메시지를 토픽 별 디렉토리의 세그먼트 파일과 인덱스 파일에 나누어 영속화합니다.
디스크 동기화(fsync)가 완료된 후 producer에게 ACK을 반환하므로, ACK을 받은 메시지는 디스크에 영속화된 상태로 남습니다.
OS, 하드웨어, JVM 프로세스가 갑작스럽게 중단되더라도, ACK을 받은 메시지의 영속성은 보장됩니다.

디렉토리 레이아웃

루트 디렉토리 아래에 토픽 별 하위 디렉토리가 만들어지고, 각 토픽 디렉토리 안에 필요한 파일이 함께 들어 있습니다.

파일 트리
./data/                             # 기본 루트 디렉토리 (설정으로 변경 가능)
├── order.created/
│   ├── 0000000000000000000.mmm
│   ├── 0000000000000000000.idx
│   ├── 0000000000000123456.mmm
│   ├── 0000000000000123456.idx
│   └── checkpoints/                # Dispatcher별 체크포인트 디렉토리
│       └── order-created-dispatcher.checkpoint
│       └── order-completed-dispatcher.checkpoint
└── notification.sent/
    ├── 0000000000000000000.mmm
    ├── 0000000000000000000.idx
    └── checkpoints/
        └── notification-dispatcher.checkpoint

각 파일의 역할은 다음과 같습니다.

파일 역할
.mmm 세그먼트 파일
.idx 세그먼트 별 오프셋 인덱스 파일
.checkpoint Dispatcher 별 체크포인트 파일

루트 디렉토리는 다음 설정으로 변경할 수 있습니다.

application.yml
mmmq:
  broker:
    storage:
      root-dir: ./data

세그먼트

토픽 큐의 메시지는 시간이 흐르면서 무한히 쌓일 수 있습니다.
이를 한 파일에 모두 담으면 단일 파일이 한없이 커지므로, broker는 일정 크기마다 세그먼트를 만들어 분할 보관합니다.

세그먼트는 토픽 큐를 일정 크기 단위로 잘라 보관하는 append-only 디스크 파일입니다.
세그먼트는 메시지를 저장하는 세그먼트 파일과 오프셋 인덱스로 구성됩니다.
각 세그먼트의 파일명은 그 세그먼트가 시작하는 오프셋(19자리)으로 이루어집니다 (예: 0000000000000001234.mmm).

각 메시지는 세그먼트 파일 안에 entry로 직렬화되어 저장됩니다.

Entry 구조
┌──────────┬─────────┬─────────────────────────┐
│  length  │   CRC   │  message bytes (JSON)   │
│   4 B    │   4 B   │       length - 4 B      │
└──────────┴─────────┴─────────────────────────┘
  • length: CRC와 메시지 바이트의 합.
  • CRC: 메시지 바이트 전체에 대한 CRC32C 체크섬.
  • message bytes: Jackson으로 직렬화한 Message의 JSON 바이트.

영속화 레벨은 fsync와 영속성 보장 항목을 참고하세요.

세그먼트 체인

한 토픽 큐가 가진 모든 세그먼트는 세그먼트 체인으로 구성됩니다.
세그먼트 체인은 토픽 큐의 세그먼트들을 시작 오프셋 순으로 정렬해 둔 자료구조로, 토픽 큐와 세그먼트 사이의 의사소통을 담당합니다.
하나의 세그먼트의 용량이 가득 차면 회전이 발생해 새 세그먼트가 만들어지고, 그 세그먼트가 세그먼트 체인 끝에 추가됩니다.
메시지는 항상 세그먼트 체인의 가장 마지막 세그먼트에 추가됩니다.

세그먼트 회전

세그먼트 체인은 세그먼트 용량이 일정 수준을 넘으면 새 세그먼트를 생성하는 회전을 수행합니다.
이후에 도착하는 메시지는 새 세그먼트에 저장되고, 이전 세그먼트에는 더 이상 메시지가 저장되지 않습니다.
새 세그먼트의 시작 오프셋은 직전 세그먼트의 시작 오프셋 + 직전 세그먼트가 보관한 메시지 개수를 더한 값으로 설정됩니다.

회전 검사는 메시지를 추가하기 직전에 이루어집니다.
검사 시점의 세그먼트 용량이 임계값 미만이면 메시지는 현재 세그먼트에 추가되고, 추가된 메시지의 크기만큼 파일 크기가 늘어나면서 세그먼트 용량 임계값을 초과할 수 있습니다.

세그먼트 회전
        [ Segment 1 ]                            [ Segment 2 ]
   (0000000000000001200.log)               (0000000000000001601.log)
┌─────────────────────────────┐         ┌─────────────────────────────┐
│     message 1200 ~ 1600     |  ────▶  |        message 1601 ~       |
└─────────────────────────────┘         └─────────────────────────────┘
      Size: 64MB (Closed)                     Size: 12KB (Active)

새 세그먼트 시작 오프셋 = 직전 세그먼트 시작 오프셋(1200) + 직전 세그먼트 보관 메시지 개수(401) = 1601

회전 임계값은 기본 64 MB로 설정되어 있으며, 설정을 통해 조정할 수 있습니다.

application.yml
mmmq:
  broker:
    segment:
      max-bytes: 67108864 # 64 MB

오프셋 인덱스

세그먼트에는 가변 길이의 메시지가 순서대로 저장됩니다.
특정 오프셋의 메시지를 찾기 위해선 세그먼트 파일의 첫 부분부터 순차적으로 탐색해야 합니다.
이 과정에서 수많은 시스템 콜과 디스크 탐색이 발생하므로, 세그먼트에 메시지가 쌓일수록 읽기 비용이 크게 증가합니다.

오프셋 인덱스는 이 비용을 최소화하기 위해 메시지 별 시작 byte 주소를 영속화해두는 별도 파일입니다.
오프셋 인덱스는 각 세그먼트별로 하나씩 존재하며, 세그먼트 파일과 같은 시작 오프셋을 공유하는 .idx 파일로 구성됩니다.

오프셋 인덱스는 오프셋에 해당하는 메시지의 세그먼트 파일 내 byte 주소를 저장합니다.
모든 값은 8 byte 고정 길이이므로, 세그먼트의 N번째 메시지의 byte 주소는 N × 8 byte 위치에서 읽을 수 있습니다.

.idx 파일 구조
       ┌──────────┬──────────┬──────────┬──────────┐
       │   3425   │   3837   │   4002   │   4233   │
       └──────────┴──────────┴──────────┴──────────┘
byte    0          8         16         24

오프셋 인덱스는 세그먼트에 보관된 메시지의 Single Source Of Truth입니다.
특정 오프셋의 메시지가 세그먼트에 존재하는지는 세그먼트가 아니라 오프셋 인덱스를 기준으로 판단합니다.
세그먼트에 메시지가 영속화되어 있어도 오프셋 인덱스에 해당 메시지의 byte 주소가 등록되어 있지 않으면, 메시지는 절대 노출되지 않습니다.
메시지가 세그먼트에 영속화된 후, 오프셋 인덱스까지 영속화가 완료되어야 producer에게 ACK가 반환됩니다.

오프셋 인덱스로 메시지 찾기

세그먼트 체인은 입력 오프셋에 해당하는 메시지를 찾기 위해 다음 단계를 수행합니다.

  1. 세그먼트 선택: 시작 오프셋이 입력 오프셋 이하인 세그먼트 중 시작 오프셋이 가장 큰 세그먼트를 선택합니다.
  2. 상대 오프셋 계산: 입력 오프셋 - 세그먼트의 시작 오프셋으로 세그먼트 내에서의 상대 오프셋을 구합니다.
  3. byte 주소 계산: 오프셋 인덱스에서, 상대 오프셋 × 8 byte 주소에 있는 long 값을 읽어 세그먼트 파일 내에서 메시지가 시작하는 byte 주소를 얻습니다.
  4. entry 읽어 역직렬화: 세그먼트 파일에서, byte 주소에 위치한 entry를 읽어 메시지를 역직렬화합니다. 세그먼트 entry 구조는 세그먼트 항목을 참고하세요.

체크포인트

체크포인트는 DispatcherTopicQueue의 메시지를 어디까지 처리했는지를 영속화한 파일입니다.
각 토픽 디렉토리의 checkpoints/ 하위에 <dispatcher 이름>.checkpoint 형태로 저장됩니다.

Dispatcher가 한 메시지의 처리를 마친 뒤 체크포인트의 오프셋을 다음 값으로 옮기는 동작을 커밋이라 부릅니다.
커밋이 디스크에 반영되는 시점과 그로 인한 at-least-once 전달 보장에 대한 자세한 내용은 커밋 시점과 fsync 항목을 참고하세요.

파일 위치
./data/order.created/checkpoints/order-dispatcher.checkpoint

체크포인트 파일은 단일 8 byte long 값을 담습니다.
이 값은 해당 Dispatcher가 다음에 읽을 메시지의 오프셋입니다.

토픽의 메시지 스트림 위에서 체크포인트가 가리키는 위치는 다음과 같습니다.

체크포인트 포인터
토픽의 메시지 스트림 (오프셋)

    ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
    │  0  │  1  │  2  │  3  │  4  │  5  │ ... │
    └─────┴─────┴─────┴─────┴─────┴─────┴─────┘
                         ▲
                         │
            checkpoint = 3

체크포인트 값이 3이면 오프셋 0·1·2Dispatcher가 이미 처리한 메시지이고, 다음에 읽을 메시지는 오프셋 3입니다.

체크포인트는 Dispatcher 단위로 분리되어 있으므로, 같은 토픽을 여러 Dispatcher가 구독하더라도 각자의 진도를 독립적으로 관리합니다.

여러 Dispatcher의 독립 진도
order.created 토픽의 메시지 스트림 (오프셋)

    ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
    │  0  │  1  │  2  │  3  │  4  │  5  │ ... │
    └─────┴─────┴─────┴─────┴─────┴─────┴─────┘
       ▲                 ▲           ▲
       │                 │           │
       │                 │           └─── billing-dispatcher.checkpoint = 5
       │                 └─── analytics-dispatcher.checkpoint = 3
       └─── audit-dispatcher.checkpoint = 0    아직 처음부터 읽는 중

같은 메시지가 세 개의 Dispatcher에 의해 서로 다른 시점에 처리될 수 있고, 한 Dispatcher가 뒤처져도 다른 Dispatcher의 진행에는 영향이 없습니다.

커밋 시점과 fsync

Dispatcher 워커는 한 메시지를 consumer에게 전송하고 ACK을 받은 직후 TopicQueuecommit(name, offset)을 호출합니다.
이 호출은 해당 Dispatcher.checkpoint 파일 0번 위치에 다음 오프셋(8 byte long)을 덮어쓰고 fsync를 호출합니다.

commit 한 번의 디스크 작업
CheckpointFile.write(next)
  │
  ▼
ByteBuffer (8 byte long)
  │ writeFully(0, ..)
  ▼
.checkpoint               파일 첫 8 byte를 덮어쓰기
  │
  ▼
fsync (force(true))       디스크 매체까지 동기화
  │
  ▼
commit 반환 → 다음 오프셋으로 진행

체크포인트는 append가 아니라 같은 8 byte 영역을 매번 덮어쓰는 방식입니다.
파일 크기는 항상 8 byte이므로 디스크 사용량이 늘지 않으며, 절단된 entry나 누적된 garbage가 남을 일도 없습니다.
Dispatcher는 단 하나의 long 값으로 자신의 진행 위치 전체를 표현합니다.

fsync가 끝나야 commit이 반환되고, 그 직후에야 워커는 다음 오프셋의 메시지로 진행합니다.
덕분에 다음과 같은 보장이 성립합니다.

  • broker가 처리 도중 비정상 종료되더라도, 마지막으로 fsync가 끝난 오프셋이 .checkpoint에 그대로 남아 있습니다. 재시작 후 Dispatcher는 정확히 그 오프셋부터 다시 읽기 시작합니다.
  • 그 사이에 처리되었지만 commit이 끝나기 전에 종료된 메시지는 다시 한 번 consumer에게 전송됩니다 (at-least-once 전달).
  • consumer가 ACK을 보냈더라도 Dispatcher 측에서 fsync가 끝나기 전에 broker가 죽으면 그 메시지는 재전송됩니다.
주의
broker는 at-least-once 전달을 보장하므로, consumer 핸들러는 같은 메시지가 두 번 도착하더라도 결과가 동일하도록 멱등하게 구현해야 합니다.

NACK 재시도가 모두 소진되어 메시지가 넘어가는 경우에도 commit이 호출되므로 체크포인트는 다음 오프셋으로 이동합니다.
이 경우의 디스크 반영도 동일하게 fsync로 보장되며, 같은 메시지가 Dispatcher에 의해 다시 전송되지는 않습니다.

새 Dispatcher 등록

한 번도 등록된 적 없는 이름의 Dispatcher가 빈으로 등록되면, 해당 토픽의 첫 오프셋(0)부터 시작하는 새 체크포인트가 만들어집니다.
그 결과 디스크에 남아 있는 기존 메시지를 처음부터 다시 받게 되므로, 새 Dispatcher가 곧 토픽의 전체 이력에 대한 리플레이를 의미합니다.

주의
운영 중인 broker에 새 Dispatcher를 추가하면 그동안 디스크에 누적된 모든 메시지가 한꺼번에 consumer로 전송됩니다. consumer 측 부하와 멱등 처리 가능 여부를 미리 확인하세요.

메시지 탐색 과정

다음과 같이 두 세그먼트로 나뉜 토픽에서 절대 오프셋 1030의 메시지를 찾는 과정을 예로 듭니다.

  • 세그먼트 A: 파일명 0000000000000000000.mmm, 시작 오프셋 0, 메시지 1,024개 보관.
  • 세그먼트 B: 파일명 0000000000000001024.mmm, 시작 오프셋 1024, 그 이후 메시지 보관.
  1. 세그먼트 선택: 시작 오프셋이 1030 이하인 세그먼트 중 가장 큰 시작 오프셋은 1024이므로 세그먼트 B를 선택합니다.
  2. 상대 오프셋 계산: 1030 - 1024 = 6.
  3. byte 주소 계산: B의 .idx 파일에서 6 × 8 = 48 byte 위치의 8 byte long을 읽어 412를 얻습니다.
  4. entry 읽어 역직렬화: B의 .mmm 파일 byte 412부터 다음 순서로 처리합니다.
    • 4 byte를 읽어 length 값을 얻습니다.
    • 다음 4 byte를 읽어 저장된 CRC32C 체크섬을 얻습니다.
    • 다음 length - 4 byte를 읽어 메시지 JSON 바이트를 얻습니다.
    • 읽은 JSON 바이트로 CRC32C를 재계산해 저장된 체크섬과 비교합니다. 일치하지 않으면 CorruptionException으로 격리합니다.
    • 검증된 JSON 바이트를 Jackson으로 Message 객체로 역직렬화하여 반환합니다.
세그먼트 B에서 오프셋 1030 읽기
  .idx (세그먼트 B의 인덱스, 시작 오프셋 1024)
   ┌────────────┬────────────┬────────────┬────────────┐
   │     0      │     82     │    ...     │    412     │
   └────────────┴────────────┴────────────┴────────────┘
     entry 0      entry 1     entry 2~5     entry 6
     byte 0       byte 8                    byte 48
                                                │
                                                │ long 값 = 412 (.mmm 안 byte 주소)
                                                ▼
  .mmm (세그먼트 B, 시작 오프셋 1024)
   ┌────────────┬────────────┬────────────┬────────────┐
   │  entry 0   │  entry 1   │    ...     │  entry 6   │
   └────────────┴────────────┴────────────┴────────────┘
     byte 0       byte 82                   byte 412
                                                ▲
                                                │ 절대 오프셋 1030 메시지

fsync와 영속성 보장

fsync는 운영체제의 페이지 캐시에 머무르는 쓰기 데이터를 디스크 매체까지 강제로 내려보내는 시스템 콜입니다.
프로그램이 일반 write로 파일에 쓴 데이터는 곧장 디스크로 가지 않고 OS의 페이지 캐시에 우선 머무릅니다. OS는 자체 정책에 따라 나중에 디스크로 흘려보내므로, 그 사이에 OS나 하드웨어가 갑자기 멈추면 페이지 캐시의 내용이 디스크에 도달하지 못한 채 사라질 수 있습니다.

fsync는 페이지 캐시에 있는 해당 파일의 모든 쓰기를 디스크 장치까지 보낸 뒤에야 호출자에게 응답합니다.
fsync가 정상적으로 끝났다는 것은 OS·하드웨어 단위 장애가 일어나도 그 시점까지의 데이터가 디스크에 남아 있다는 뜻이며, broker는 이 보장 위에서 producer에게 ACK을 돌려줍니다.

자바에서는 FileChannel.force(true)가 이 fsync를 호출합니다.
인자 true는 파일 내용뿐 아니라 파일 길이와 같은 메타데이터까지 함께 동기화하라는 뜻이며, append 직후 파일 크기가 늘어난 상태도 함께 디스크에 반영되어야 하므로 broker의 모든 영속화 쓰기는 force(true)를 사용합니다.

broker가 fsync를 호출하는 지점은 다음 세 가지입니다.

파일 fsync 시점 의미
세그먼트 세그먼트 파일 한 메시지를 entry로 추가한 직후 메시지 바이트가 디스크에 영속화됩니다.
.idx 오프셋 인덱스 그 메시지의 byte 주소를 추가한 직후 가시성 카운트가 증가해 Dispatcher에게 노출됩니다.
.checkpoint Dispatcher가 다음 오프셋을 영속화한 직후 해당 오프셋 직전까지의 메시지 처리가 영속화됩니다.

한 메시지를 적재할 때 broker는 fsync를 다음 순서로 두 번 호출합니다.

한 메시지의 fsync 순서
1. SegmentFile.append      .mmm 에 entry 추가  ──fsync──▶ 디스크   메시지 바이트 영속화
                                  │
                                  ▼
2. OffsetIndexFile.append  .idx 에 byte 주소 추가  ──fsync──▶ 디스크   가시성 카운트 +1
                                  │
                                  ▼
3. broker → producer       ACK 응답

이 순서는 다음 두 가지를 동시에 보장합니다.

  • 데이터 우선: 메시지 바이트가 디스크에 안전하게 닿은 뒤에야 인덱스에 byte 주소가 등록됩니다. 인덱스에 보이는 entry는 항상 세그먼트에 실제로 존재하는 entry입니다.
  • 가시성은 인덱스 기준: 단계 1과 단계 2 사이에 비정상 종료가 발생하면 세그먼트에는 흔적이 남고 .idx에는 반영되지 않은 entry가 있을 수 있습니다. 그러나 가시성 카운트가 증가하지 않으므로 Dispatcher가 이를 읽어가는 일은 없으며, 다음 시작 시점에 그 영역은 폐기됩니다.

producer에게 ACK이 돌아가는 시점은 위 두 fsync가 모두 끝난 다음입니다.
따라서 ACK을 받은 메시지는 OS·하드웨어 단위 장애에도 디스크에 남아 있고, Dispatcher가 다음 시작 후 정확히 이어 읽어갑니다.

시작 복구

broker는 시작 시점에 데이터 루트 디렉토리를 스캔해, 그 안의 각 토픽 디렉토리마다 TopicQueue를 하나씩 만들어 등록합니다.
이전에 운영하던 모든 토픽이 시작 직후에 다시 활성화되며, 디스크에 남아 있는 메시지도 그대로 보존됩니다.

TopicQueue가 만들어지는 과정에서 그 안의 세그먼트 파일은 인덱스를 기준으로 삼아 자신의 길이를 맞춥니다.
인덱스에 영속화된 마지막 entry의 끝 위치보다 세그먼트 파일이 길면 그만큼을 잘라내고, 짧으면 손상으로 간주해 시작을 중단합니다.

시작 복구 흐름
broker 시작
       │
       ▼
data/ 루트 디렉토리 스캔
       │
       ├──▶ order.created/       ──▶ TopicQueue 생성
       ├──▶ notification.sent/   ──▶ TopicQueue 생성
       └──▶ ...                  ──▶ ...

       │
       ▼ 각 TopicQueue 내부에서 세그먼트마다 길이 검사

세그먼트 길이 vs 인덱스 마지막 entry 끝 위치
       │
       ├── 같음 ─────▶ 정상, 그대로 사용
       │
       ├── 더 김 ─────▶ truncate (인덱스 미등록 byte 폐기)
       │
       └── 더 짧음 ───▶ 손상, broker 시작 중단

이 절차는 fsync와 영속성 보장 항목에서 설명한 fsync 순서와 짝을 이룹니다.

세그먼트의 fsync는 끝났지만 .idx의 fsync 전에 비정상 종료가 일어난 경우, 다음 시작 시점에 세그먼트의 꼬리에 인덱스에 등록되지 않은 byte 영역이 남아 있을 수 있습니다.
broker는 이 영역을 절반만 쓰여 안전을 보장할 수 없는 데이터로 간주하고 truncate하며, 이 truncate 자체도 fsync로 디스크에 반영합니다.

이 절차 덕분에 broker가 비정상 종료된 뒤 다시 시작되어도 항상 인덱스 기준의 일관된 시점에서 이어 동작합니다.

무결성 검증

저장 시점에 메시지 바이트 전체에 대해 CRC32C 체크섬을 계산해 entry 헤더에 함께 영속화해 둡니다.
읽을 때마다 같은 알고리즘으로 다시 계산하여 저장된 값과 비교하고, 값이 다르면 해당 entry는 CorruptionException으로 격리됩니다.

이 검증은 디스크 비트 플립이나 부분 쓰기처럼 fsync 보장을 우회하는 저수준 손상까지 검출합니다.
체크섬은 entry마다 독립적이므로, 한 entry가 손상되어도 같은 세그먼트의 다른 entry나 다른 세그먼트의 entry는 영향을 받지 않습니다.

Dispatcher는 손상된 entry를 만나면 로그를 남기고 그 메시지만 건너뛴 뒤 다음 오프셋으로 넘어가므로, 단일 entry의 손상이 토픽 전체의 처리를 멈추지는 않습니다.

사용 예제

broker는 @SpringBootApplication 클래스 하나로 시작하고, Dispatcher@Configuration 안에 빈으로 등록합니다.

BrokerApplication.Java

BrokerApplication.Java
@SpringBootApplication
public class BrokerApplication {

    public static void main(String[] args) {
        SpringApplication.run(BrokerApplication.class, args);
    }
}

application.yml

application.yml
server:
  port: 8080

mmmq:
  broker:
    storage:
      root-dir: ./data
    segment:
      max-bytes: 67108864

DispatcherConfig.Java

DispatcherConfig.Java
@Configuration
public class DispatcherConfig {

    @Bean
    public Dispatcher orderDispatcher() {
        Host consumerHost = new Host(WebProtocol.HTTP, "localhost", 8081);
        TopicPattern pattern = new TopicPattern("order.*");
        return new Dispatcher("order-dispatcher", consumerHost, List.of(pattern));
    }

    @Bean
    public Dispatcher notificationDispatcher() {
        Host consumerHost = new Host(WebProtocol.HTTP, "localhost", 8082);
        TopicPattern pattern = new TopicPattern("notification.**");
        return new Dispatcher("notification-dispatcher", consumerHost, List.of(pattern));
    }
}