파이썬 스레드 동기화 방법

By | 9월 12, 2025

파이썬에서는 여러 스레드가 동시에 작업을 처리할 때 공유 자원에 접근하며 발생하는 문제를 해결하기 위해 동기화 기법을 사용합니다. 동기화는 데이터의 일관성과 무결성을 보장하고, 경쟁 조건(Race Condition)과 같은 복잡한 문제를 예방하는 데 중요한 역할을 합니다. 특히, 여러 스레드가 하나의 자원을 두고 동시에 접근하려 할 때 데이터가 꼬이는 현상을 막아줍니다.


파이썬 스레드 동기화 기술

파이썬의 threading 모듈은 스레드 기반 병렬 처리를 지원하며, 동기화를 위한 다양한 도구를 제공합니다.

1. Lock (뮤텍스 – 상호 배제 락)

Lock은 가장 기본적이고 널리 쓰이는 동기화 메커니즘으로, 한 번에 하나의 스레드만 특정 코드 블록(임계 구역, Critical Section)에 접근하도록 허용합니다.

  • acquire() 메서드로 락을 획득하고, release() 메서드로 락을 해제합니다.
  • 이미 다른 스레드가 락을 획득했다면, 락을 기다리는 스레드는 락이 해제될 때까지 블록 상태로 대기합니다.
  • with을 활용하면 acquire()release()를 자동으로 처리해 주기 때문에 코드를 더 간결하고 안전하게 작성할 수 있습니다.

예시:

import threading

# 공유 자원
counter = 0
# 락 객체 생성
lock = threading.Lock()

def increase_counter():
    global counter
    for _ in range(100000):
        # with 문을 사용하여 락 획득 및 해제 자동화
        with lock:
            counter += 1
    print(f"스레드 {threading.current_thread().name} 작업 완료. 현재 카운터: {counter}")

threads = []
# 두 개의 스레드 생성
for i in range(2):
    thread = threading.Thread(target=increase_counter, name=f"Thread-{i}")
    threads.append(thread)
    thread.start()

# 모든 스레드가 종료될 때까지 대기
for thread in threads:
    thread.join()

print(f"최종 카운터 값: {counter}")

2. RLock (재진입 락)

RLockLock과 비슷하지만, 동일한 스레드가 이미 획득한 락을 다시 획득하는 것을 허용합니다. RLock은 락을 획득한 횟수를 내부적으로 추적하여, 획득한 횟수만큼 release()가 호출되어야 락이 완전히 해제됩니다.

Lock과 RLock의 차이점 및 RLock의 필요성:

Lock은 재진입이 불가능하여 한 스레드가 이미 획득한 락을 다시 획득하려고 시도하면 그 스레드 자신에게 블록되어 교착 상태(Deadlock)에 빠질 수 있습니다.

Lock을 사용할 때 발생하는 교착 상태 예시:

import threading

my_lock = threading.Lock()

def func_a():
    print(f"[{threading.current_thread().name}] 함수 A: 락 획득 시도 중...")
    with my_lock: # 첫 번째 락 획득
        print(f"[{threading.current_thread().name}] 함수 A: 락 획득.")
        # 이 시점에서 락이 잠겨 있음
        func_b() # <- 이 부분에서 문제 발생
        print(f"[{threading.current_thread().name}] 함수 A: 락 해제.")

def func_b():
    print(f"[{threading.current_thread().name}] 함수 B: 다시 락 획득 시도 중...")
    with my_lock: # 동일 스레드가 이미 락을 가지고 있음에도 다시 획득 시도
        print(f"[{threading.current_thread().name}] 함수 B: 락 재획득.")
        
print("--- Lock (교착 상태) 예시 ---")
t = threading.Thread(target=func_a, name="ProblemThread")
t.start()
t.join(timeout=2) # 2초 대기

if t.is_alive():
    print(f"\n[{threading.current_thread().name}] 오류: 스레드가 종료되지 않았습니다. Lock으로 인한 교착 상태 발생!")
print("--- Lock 예시 종료 ---\n")

위 예시에서 ProblemThreadfunc_a에서 my_lock을 획득한 후 func_b를 호출합니다. func_b는 다시 my_lock을 획득하려 하지만, 이미 락이 잠겨 있어 ProblemThread는 무한 대기 상태에 빠집니다.

반면, RLock은 이러한 상황을 방지할 수 있습니다. 한 스레드가 RLock을 획득할 때마다 내부 카운트가 증가하고, release()를 호출할 때마다 카운트가 감소합니다. 카운트가 0이 되어야 락이 완전히 해제되므로, 동일 스레드 내에서는 중첩된 락 획득이 가능합니다.

RLock을 사용하여 문제 해결하는 예시:

import threading
import time

my_rlock = threading.RLock()

def func_a_safe():
    print(f"[{threading.current_thread().name}] 함수 A: RLock 획득 시도...")
    with my_rlock: # 첫 번째 RLock 획득 (카운트 1)
        print(f"[{threading.current_thread().name}] 함수 A: RLock 획득.")
        func_b_safe()
        print(f"[{threading.current_thread().name}] 함수 A: RLock 해제.") # 카운트 1 감소

def func_b_safe():
    print(f"[{threading.current_thread().name}] 함수 B: 다시 RLock 획득 시도...")
    with my_rlock: # 동일 스레드 내 재진입 (카운트 2)
        print(f"[{threading.current_thread().name}] 함수 B: RLock 재획득.")
        time.sleep(0.1)
        print(f"[{threading.current_thread().name}] 함수 B: RLock 해제.") # 카운트 1 감소

print("--- RLock (문제 해결) 예시 ---")
t_rl = threading.Thread(target=func_a_safe, name="SafeThread")
t_rl.start()
t_rl.join() 

print(f"\n[{threading.current_thread().name}] SafeThread가 RLock 덕분에 정상적으로 종료되었습니다.")
print("--- RLock 예시 종료 ---")

따라서, 동일한 스레드 내에서 여러 함수나 메서드가 중첩적으로 동일한 락을 사용해야 할 가능성이 있다면, Lock 대신 RLock을 사용하는 것이 안전하고 바람직합니다.


3. Semaphore (세마포어)

Semaphore동시에 자원에 접근할 수 있는 스레드의 수를 제한할 때 사용됩니다. 내부적으로 카운터를 가지고 있어, acquire() 호출 시 카운터가 감소하고, release() 호출 시 카운터가 증가합니다. 카운터가 0이 되면 acquire() 호출은 블록됩니다.

예시:

import threading
import time

# 최대 3개의 스레드만 동시에 접근 허용
semaphore = threading.Semaphore(3)

def worker(name):
    print(f"{name}: 세마포어 획득 시도 중...")
    with semaphore:
        print(f"{name}: 세마포어 획득. 작업 시작.")
        time.sleep(2) 
        print(f"{name}: 세마포어 해제.")

threads = []
# 5개의 스레드 생성
for i in range(5):
    thread = threading.Thread(target=worker, args=(f"Worker-{i}",))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

4. Event (이벤트)

Event는 하나의 스레드가 특정 이벤트의 발생을 알리면, 다른 스레드들이 그 이벤트가 발생할 때까지 기다리도록 할 때 사용됩니다.

  • set(): 내부 플래그를 True로 설정하여 대기 중인 모든 스레드를 깨웁니다.
  • clear(): 내부 플래그를 False로 설정합니다.
  • wait(): 플래그가 True가 될 때까지 블록 상태로 대기합니다.

예시:

import threading
import time

event = threading.Event()

def worker():
    print("작업자: 이벤트가 설정되기를 기다리는 중...")
    event.wait() # 이벤트가 설정될 때까지 대기
    print("작업자: 이벤트 발생! 작업을 진행합니다.")

def signaler():
    time.sleep(3) # 3초 후 이벤트 설정
    print("신호자: 이벤트를 설정합니다.")
    event.set()

worker_thread = threading.Thread(target=worker)
signaler_thread = threading.Thread(target=signaler)

worker_thread.start()
signaler_thread.start()

worker_thread.join()
signaler_thread.join()

5. Condition (조건 변수)

Condition은 복잡한 동기화 시나리오에서 스레드들이 특정 조건이 충족될 때까지 기다리거나, 조건이 충족되었을 때 다른 스레드에게 알리는 데 사용됩니다. Lock과 함께 사용되며, wait(), notify(), notify_all() 메서드를 제공합니다. 생산자-소비자 모델과 같은 상황에서 유용합니다.

예시 (생산자-소비자 모델):

import threading
import time
import collections

# 공유 버퍼
buffer = collections.deque(maxlen=10)
# 조건 변수 객체
condition = threading.Condition()

def producer():
    for i in range(15):
        with condition:
            while len(buffer) == buffer.maxlen:
                print("생산자: 버퍼가 가득 찼습니다. 소비자 대기 중...")
                condition.wait() # 버퍼가 가득 차면 대기
            item = f"item_{i}"
            buffer.append(item)
            print(f"생산자: {item} 생산. 현재 버퍼 크기: {len(buffer)}")
            condition.notify() # 소비자에게 알림
        time.sleep(0.1)

def consumer():
    for i in range(15):
        with condition:
            while not buffer:
                print("소비자: 버퍼가 비었습니다. 생산자 대기 중...")
                condition.wait() # 버퍼가 비어있으면 대기
            item = buffer.popleft()
            print(f"소비자: {item} 소비. 현재 버퍼 크기: {len(buffer)}")
            condition.notify() # 생산자에게 알림
        time.sleep(0.2)

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

동기화 사용 시 유의사항

  • GIL (Global Interpreter Lock)의 이해: 파이썬의 GIL은 한 번에 하나의 스레드만 파이썬 바이트코드를 실행하도록 합니다. 따라서 CPU 집중적인 작업에서는 스레딩이 진정한 병렬성을 제공하지 못하고 오버헤드만 증가시킬 수 있습니다. 반면, I/O 집중적인 작업 (예: 네트워크 통신, 파일 입출력)은 GIL의 영향을 덜 받으므로 스레딩이 매우 유용합니다.
  • 락 사용 최소화: 락은 성능 저하를 유발하고 교착 상태의 위험을 높이므로, 락으로 보호되는 임계 구역의 코드를 최대한 짧게 유지하는 것이 좋습니다.
  • 적합한 도구 선택: 해결하려는 문제에 가장 적합한 동기화 도구를 선택해야 합니다. 단순한 상호 배제에는 Lock, 재진입이 필요하면 RLock, 자원 접근 수 제한에는 Semaphore가 적합합니다.
  • with 문 활용: Lock, RLock, Semaphore 등의 동기화 객체는 컨텍스트 관리자를 지원하므로 with 문을 사용하면 락 획득과 해제를 자동으로 처리하여 안전하게 코딩할 수 있습니다.
  • 교착 상태 방지: 여러 락을 사용할 때는 락 획득 순서를 일관되게 유지하여 교착 상태를 예방해야 합니다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다