콘텐츠로 이동

생산자 소비자 문제

생산자 소비자 문제

멀티스레드 환경에서 데이터를 만드는 쪽과 데이터를 쓰는 쪽의 속도가 다를 수 있다. 두 스레드가 같은 자원(버퍼)을 공유하면 ==경쟁 상태(Race Condition)==가 발생한다.

문제 탄생 배경

  • 멀티스레드 환경에서 데이터를 만드는 쪽과 데이터를 쓰는 쪽의 속도가 다를 수 있다.
  • 두 스레드가 같은 자원(버퍼) 을 공유하면 경쟁 상태가 발생한다.
  • 목표
  • 데이터 유실 없음
  • CPU 낭비 없음
  • 동기화 보장

기본 개념

명칭 역할 설명
생산자(Producer) 데이터 생성 버퍼에 데이터를 저장한다.
소비자(Consumer) 데이터 소비 버퍼에 데이터를 꺼낸다.
버퍼(Buffer) 공유 자원 버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고받는다.

문제 사항

문제 상황 문제 해결 방안
생산자가 너무 빠를 때 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼에 데이터가 가득 찼을 때 넣으면 오버플로우 / 데이터 유실이 발생할 수 있다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
소비자가 너무 빠를 때 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비어 있을 때 소비자가 꺼내려고 시도하면 CPU를 낭비하며 의미 없는 경쟁을 발생시키므로, 조건이 만족될 때까지 대기해야 한다. 버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.

Object - wait, notify

메서드 설명
Object.wait() 현재 스레드가 가진 락을 반납하고 대기하고,
해당 스레드는 wait set으로 들어간다.
Object.notify() wait set에 대기 중인 스레드 중 하나를 깨운다.
Object.notifyAll() wait set에 대기 중인 모든 스레드를 깨운다.

한계

  • 비효율
  • 깨워야 할 스레드를 구분해서 깨울 수 없음
  • 잘못 깨우는 경우 발생
    • 소비자 → 소비자
    • 생산자 → 생산자
  • 스레드 기아
  • 어떤 스레드가 깨어날지 보장되지 않음
  • 특정 스레드가 계속 밀릴 수 있음
  • 임시 해결
  • notifyAll() 문제는 줄어들지만, 컨텍스트 스위칭 비용 증가

스레드 대기 집합(wait set)

정의

조건이 만족되지 않아 락을 반납하고 대기 중인 스레드들이
객체 상태 변화(notify)를 기다리는 공간이다

왜 필요한가?

“락을 반납한 채로, 객체의 상태(WAITING)가 바뀔 때까지 CPU를 쓰지 않고 기다리게 하기 위해” 필요하다.

예시 코드

public class Main {

    private final Queue<String> queue = new ArrayDeque<>(); // 버퍼
    private final int MAX; // 버퍼 최대 크기

    public Main(int max) {
        this.MAX = max;
    }

    public synchronized void put(String data) { // 생산자
        while (queue.size() == MAX) { // 버퍼 가득참
            try {
                wait(); // RUNNABLE -> WAITING, 락 반납
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data); // 락 획득 후 데이터 인입
        notify(); // 대기 스레드, WAITING -> BLOCKED -> RUNNABLE
    }

    public synchronized String take() { // 소비자
        while (queue.isEmpty()) { // 버퍼 비었음
            try {
                wait(); // RUNNABLE -> WAITING, 락 반납
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        String data =  queue.poll(); // 락 획득 후 데이터 꺼냄
        notify(); // 대기 스레드, WAITING -> BLOCKED -> RUNNABLE
        return data;
    }

}

ReentrantLock + Condition

Condition이란?

  • [Lock]에 종속된 조건 대기 큐다.
    ReentrantLock 1개
     ├─ Condition A (대기 큐 1)
     ├─ Condition B (대기 큐 2)
     └─ Condition C (대기 큐 3)
    

사용하는 이유

1. wait set 분리

  • synchronized + wait/notify단일 wait set만 제공
  • 모든 대기 스레드가 동일한 wait set에 들어가므로
  • 생산자/소비자, 읽기/쓰기 등 서로 다른 조건을 분리해 제어하기 어려움
  • ReentrantLock + Condition여러 개의 조건 대기 집합 생성 가능

Condition notEmpty = lock.newCondition(); // 소비자 대기
Condition notFull  = lock.newCondition(); // 생산자 대기
- 이를 통해 다음 패턴을 명확히 보장 - 생산자 → 소비자 - 소비자 → 생산자

2. FIFO 구조 보장

  • ReentrantLock(true) 사용 시 공정 락(Fair Lock) 제공
  • 락 대기 큐에서 선입선출(FIFO) 순서로 락 획득
  • synchronized는 락 획득 순서를 보장하지 않음

주요 메서드

  • await()
  • 현재 스레드를 Condition 대기 큐로 이동
  • 동작 과정
    • 락을 반납
    • WAITING 상태 진입
    • signal() 수신 시 깨어남
    • 다시 락 획득 시도
    • 반드시 락을 획득한 상태에서 호출해야 함
  • signal()
  • Condition 대기 큐에서 하나의 스레드를 깨움
  • 깨운 스레드는 즉시 실행되지 않음
  • 락 획득 대기 큐로 이동

락 대기 집합 흐름

synchronized

wait set 대기 집합 (WAITING)
        ↓ signal()
락 대기 큐 (BLOCKED)
        ↓ lock 획득
실행 상태 (RUNNABLE)

ReentrantLock

Condition 대기 집합 (WAITING)
        ↓ signal()
락 대기 큐 (WAITING)
        ↓ lock 획득
실행 상태 (RUNNABLE)

synchronized VS ReentrantLock 대기 구조 비교

synchronized

구조

  • 단일 wait set
  • wait() 호출 시
  • 모니터 락 반납
  • WAITING 상태 진입
  • notify() / notifyAll()
  • 어떤 스레드가 깨어날지 제어 불가

문제점

  • 생산자/소비자 혼재
  • 불필요한 깨움 발생 (spurious wakeup + notifyAll 남용)
  • 락 획득 순서 보장 불가

ReentrantLock + Condition

구조

  • 다중 Condition 대기 큐
  • 조건별로 대기 집합 분리 가능

장점

  • 조건별 명확한 신호 전달
  • 불필요한 스레드 깨움 방지
  • 공정 락 사용 시 FIFO 락 획득 보장
  • lockInterruptibly() 등 고급 제어 가능

BlockingQueue

  • BlockingQueue는 생산자–소비자 패턴을 안전하게 구현하기 위한 동기화 큐다.
  • BlockingQueue에서 “차단”은 락 대기가 아니라 조건 대기(Condition wait)다.
  • 큐가 비어 있음 → 소비자는 데이터가 들어올 때까지 대기
  • 큐가 가득 참 → 생산자는 공간이 생길 때까지 대기

대표적인 구현체

  • ArrayBlockingQueue
  • 배열 기반
  • 버퍼 크기 고정
  • LinkedBlockingQueue
  • 링크 기반
  • 버퍼 크기 고정 및 가변 가능

메서드 실패 정책

  • 큐를 상속 받는다. 큐의 기능들도 사용할 수 있다.
설명 Throws Exception(예외 기반) Special Value(즉시 반환 기반) Blocks(무한 대기 기반) Times Out(시간 제한 대기)
Insert(추가) add(e) offer(e) put(e) offer(e, time unit)
Remove(제거) remove() poll() take() poll(time, unit)
Examine(관찰) element() peek() not applicable not applicable