생산자 소비자 문제
생산자 소비자 문제
멀티스레드 환경에서 데이터를 만드는 쪽과 데이터를 쓰는 쪽의 속도가 다를 수 있다.
두 스레드가 같은 자원(버퍼)을 공유하면 ==경쟁 상태(Race Condition)==가 발생한다.
문제 탄생 배경
- 멀티스레드 환경에서 데이터를 만드는 쪽과 데이터를 쓰는 쪽의 속도가 다를 수 있다.
- 두 스레드가 같은 자원(버퍼) 을 공유하면 경쟁 상태가 발생한다.
- 목표
- 데이터 유실 없음
- CPU 낭비 없음
- 동기화 보장
기본 개념
| 명칭 |
역할 |
설명 |
| 생산자(Producer) |
데이터 생성 |
버퍼에 데이터를 저장한다. |
| 소비자(Consumer) |
데이터 소비 |
버퍼에 데이터를 꺼낸다. |
| 버퍼(Buffer) |
공유 자원 |
버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고받는다. |
문제 사항
| 문제 |
상황 |
문제 |
해결 방안 |
| 생산자가 너무 빠를 때 |
버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. |
버퍼에 데이터가 가득 찼을 때 넣으면 오버플로우 / 데이터 유실이 발생할 수 있다. |
버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다. |
| 소비자가 너무 빠를 때 |
버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. |
버퍼가 비어 있을 때 소비자가 꺼내려고 시도하면 CPU를 낭비하며 의미 없는 경쟁을 발생시키므로, 조건이 만족될 때까지 대기해야 한다. |
버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다. |
Object - wait, notify
| 메서드 |
설명 |
| Object.wait() |
현재 스레드가 가진 락을 반납하고 대기하고, 해당 스레드는 wait set으로 들어간다. |
| Object.notify() |
wait set에 대기 중인 스레드 중 하나를 깨운다. |
| Object.notifyAll() |
wait set에 대기 중인 모든 스레드를 깨운다. |
한계
- 비효율
- 깨워야 할 스레드를 구분해서 깨울 수 없음
- 잘못 깨우는 경우 발생
- 스레드 기아
- 어떤 스레드가 깨어날지 보장되지 않음
- 특정 스레드가 계속 밀릴 수 있음
- 임시 해결
- notifyAll() 문제는 줄어들지만, 컨텍스트 스위칭 비용 증가
스레드 대기 집합(wait set)
정의
조건이 만족되지 않아 락을 반납하고 대기 중인 스레드들이
객체 상태 변화(notify)를 기다리는 공간이다
왜 필요한가?
“락을 반납한 채로, 객체의 상태(WAITING)가 바뀔 때까지 CPU를 쓰지 않고 기다리게 하기 위해” 필요하다.
예시 코드
public class Main {
private final Queue<String> queue = new ArrayDeque<>(); // 버퍼
private final int MAX; // 버퍼 최대 크기
public Main(int max) {
this.MAX = max;
}
public synchronized void put(String data) { // 생산자
while (queue.size() == MAX) { // 버퍼 가득참
try {
wait(); // RUNNABLE -> WAITING, 락 반납
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data); // 락 획득 후 데이터 인입
notify(); // 대기 스레드, WAITING -> BLOCKED -> RUNNABLE
}
public synchronized String take() { // 소비자
while (queue.isEmpty()) { // 버퍼 비었음
try {
wait(); // RUNNABLE -> WAITING, 락 반납
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll(); // 락 획득 후 데이터 꺼냄
notify(); // 대기 스레드, WAITING -> BLOCKED -> RUNNABLE
return data;
}
}
ReentrantLock + Condition
Condition이란?
- [Lock]에 종속된 조건 대기 큐다.
ReentrantLock 1개
├─ Condition A (대기 큐 1)
├─ Condition B (대기 큐 2)
└─ Condition C (대기 큐 3)
사용하는 이유
1. wait set 분리
synchronized + wait/notify는 단일 wait set만 제공
- 모든 대기 스레드가 동일한 wait set에 들어가므로
- 생산자/소비자, 읽기/쓰기 등 서로 다른 조건을 분리해 제어하기 어려움
ReentrantLock + Condition은 여러 개의 조건 대기 집합 생성 가능
Condition notEmpty = lock.newCondition(); // 소비자 대기
Condition notFull = lock.newCondition(); // 생산자 대기
- 이를 통해 다음 패턴을 명확히 보장
- 생산자 → 소비자
- 소비자 → 생산자
2. FIFO 구조 보장
- ReentrantLock(true) 사용 시 공정 락(Fair Lock) 제공
- 락 대기 큐에서 선입선출(FIFO) 순서로 락 획득
- synchronized는 락 획득 순서를 보장하지 않음
주요 메서드
- await()
- 현재 스레드를 Condition 대기 큐로 이동
- 동작 과정
- 락을 반납
- WAITING 상태 진입
- signal() 수신 시 깨어남
- 다시 락 획득 시도
- 반드시 락을 획득한 상태에서 호출해야 함
- signal()
- Condition 대기 큐에서 하나의 스레드를 깨움
- 깨운 스레드는 즉시 실행되지 않음
- 락 획득 대기 큐로 이동
락 대기 집합 흐름
synchronized
wait set 대기 집합 (WAITING)
↓ signal()
락 대기 큐 (BLOCKED)
↓ lock 획득
실행 상태 (RUNNABLE)
ReentrantLock
Condition 대기 집합 (WAITING)
↓ signal()
락 대기 큐 (WAITING)
↓ lock 획득
실행 상태 (RUNNABLE)
synchronized VS ReentrantLock 대기 구조 비교
synchronized
구조
- 단일 wait set
- wait() 호출 시
- 모니터 락 반납
- WAITING 상태 진입
- notify() / notifyAll()
- 어떤 스레드가 깨어날지 제어 불가
문제점
- 생산자/소비자 혼재
- 불필요한 깨움 발생 (spurious wakeup + notifyAll 남용)
- 락 획득 순서 보장 불가
ReentrantLock + Condition
구조
- 다중 Condition 대기 큐
- 조건별로 대기 집합 분리 가능
장점
- 조건별 명확한 신호 전달
- 불필요한 스레드 깨움 방지
- 공정 락 사용 시 FIFO 락 획득 보장
- lockInterruptibly() 등 고급 제어 가능
BlockingQueue
- BlockingQueue는 생산자–소비자 패턴을 안전하게 구현하기 위한 동기화 큐다.
- BlockingQueue에서 “차단”은 락 대기가 아니라 조건 대기(Condition wait)다.
- 큐가 비어 있음 → 소비자는 데이터가 들어올 때까지 대기
- 큐가 가득 참 → 생산자는 공간이 생길 때까지 대기
대표적인 구현체
- ArrayBlockingQueue
- 배열 기반
- 버퍼 크기 고정
- LinkedBlockingQueue
- 링크 기반
- 버퍼 크기 고정 및 가변 가능
메서드 실패 정책
- 큐를 상속 받는다. 큐의 기능들도 사용할 수 있다.
| 설명 |
Throws Exception(예외 기반) |
Special Value(즉시 반환 기반) |
Blocks(무한 대기 기반) |
Times Out(시간 제한 대기) |
| Insert(추가) |
add(e) |
offer(e) |
put(e) |
offer(e, time unit) |
| Remove(제거) |
remove() |
poll() |
take() |
poll(time, unit) |
| Examine(관찰) |
element() |
peek() |
not applicable |
not applicable |