파이썬에서는 여러 스레드가 동시에 작업을 처리할 때 공유 자원에 접근하며 발생하는 문제를 해결하기 위해 동기화 기법을 사용합니다. 동기화는 데이터의 일관성과 무결성을 보장하고, 경쟁 조건(Race Condition)과 같은 복잡한 문제를 예방하는 데 중요한 역할을 합니다. 특히, 여러 스레드가 하나의 자원을 두고 동시에 접근하려 할 때 데이터가 꼬이는 현상을 막아줍니다.
파이썬 스레드 동기화 기술
파이썬의 threading
모듈은 스레드 기반 병렬 처리를 지원하며, 동기화를 위한 다양한 도구를 제공합니다.
1. Lock (뮤텍스 – 상호 배제 락)
Lock
은 가장 기본적이고 널리 쓰이는 동기화 메커니즘으로, 한 번에 하나의 스레드만 특정 코드 블록(임계 구역, Critical Section)에 접근하도록 허용합니다.
acquire()
메서드로 락을 획득하고,release()
메서드로 락을 해제합니다.- 이미 다른 스레드가 락을 획득했다면, 락을 기다리는 스레드는 락이 해제될 때까지 블록 상태로 대기합니다.
with
문을 활용하면acquire()
와release()
를 자동으로 처리해 주기 때문에 코드를 더 간결하고 안전하게 작성할 수 있습니다.
예시:
import threading
# 공유 자원
counter = 0
# 락 객체 생성
lock = threading.Lock()
def increase_counter():
global counter
for _ in range(100000):
# with 문을 사용하여 락 획득 및 해제 자동화
with lock:
counter += 1
print(f"스레드 {threading.current_thread().name} 작업 완료. 현재 카운터: {counter}")
threads = []
# 두 개의 스레드 생성
for i in range(2):
thread = threading.Thread(target=increase_counter, name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 모든 스레드가 종료될 때까지 대기
for thread in threads:
thread.join()
print(f"최종 카운터 값: {counter}")
2. RLock (재진입 락)
RLock
은 Lock
과 비슷하지만, 동일한 스레드가 이미 획득한 락을 다시 획득하는 것을 허용합니다. RLock
은 락을 획득한 횟수를 내부적으로 추적하여, 획득한 횟수만큼 release()
가 호출되어야 락이 완전히 해제됩니다.
Lock과 RLock의 차이점 및 RLock의 필요성:
Lock은 재진입이 불가능하여 한 스레드가 이미 획득한 락을 다시 획득하려고 시도하면 그 스레드 자신에게 블록되어 교착 상태(Deadlock)에 빠질 수 있습니다.
Lock
을 사용할 때 발생하는 교착 상태 예시:
import threading
my_lock = threading.Lock()
def func_a():
print(f"[{threading.current_thread().name}] 함수 A: 락 획득 시도 중...")
with my_lock: # 첫 번째 락 획득
print(f"[{threading.current_thread().name}] 함수 A: 락 획득.")
# 이 시점에서 락이 잠겨 있음
func_b() # <- 이 부분에서 문제 발생
print(f"[{threading.current_thread().name}] 함수 A: 락 해제.")
def func_b():
print(f"[{threading.current_thread().name}] 함수 B: 다시 락 획득 시도 중...")
with my_lock: # 동일 스레드가 이미 락을 가지고 있음에도 다시 획득 시도
print(f"[{threading.current_thread().name}] 함수 B: 락 재획득.")
print("--- Lock (교착 상태) 예시 ---")
t = threading.Thread(target=func_a, name="ProblemThread")
t.start()
t.join(timeout=2) # 2초 대기
if t.is_alive():
print(f"\n[{threading.current_thread().name}] 오류: 스레드가 종료되지 않았습니다. Lock으로 인한 교착 상태 발생!")
print("--- Lock 예시 종료 ---\n")
위 예시에서 ProblemThread
는 func_a
에서 my_lock
을 획득한 후 func_b
를 호출합니다. func_b
는 다시 my_lock
을 획득하려 하지만, 이미 락이 잠겨 있어 ProblemThread
는 무한 대기 상태에 빠집니다.
반면, RLock
은 이러한 상황을 방지할 수 있습니다. 한 스레드가 RLock
을 획득할 때마다 내부 카운트가 증가하고, release()
를 호출할 때마다 카운트가 감소합니다. 카운트가 0이 되어야 락이 완전히 해제되므로, 동일 스레드 내에서는 중첩된 락 획득이 가능합니다.
RLock
을 사용하여 문제 해결하는 예시:
import threading
import time
my_rlock = threading.RLock()
def func_a_safe():
print(f"[{threading.current_thread().name}] 함수 A: RLock 획득 시도...")
with my_rlock: # 첫 번째 RLock 획득 (카운트 1)
print(f"[{threading.current_thread().name}] 함수 A: RLock 획득.")
func_b_safe()
print(f"[{threading.current_thread().name}] 함수 A: RLock 해제.") # 카운트 1 감소
def func_b_safe():
print(f"[{threading.current_thread().name}] 함수 B: 다시 RLock 획득 시도...")
with my_rlock: # 동일 스레드 내 재진입 (카운트 2)
print(f"[{threading.current_thread().name}] 함수 B: RLock 재획득.")
time.sleep(0.1)
print(f"[{threading.current_thread().name}] 함수 B: RLock 해제.") # 카운트 1 감소
print("--- RLock (문제 해결) 예시 ---")
t_rl = threading.Thread(target=func_a_safe, name="SafeThread")
t_rl.start()
t_rl.join()
print(f"\n[{threading.current_thread().name}] SafeThread가 RLock 덕분에 정상적으로 종료되었습니다.")
print("--- RLock 예시 종료 ---")
따라서, 동일한 스레드 내에서 여러 함수나 메서드가 중첩적으로 동일한 락을 사용해야 할 가능성이 있다면, Lock 대신 RLock을 사용하는 것이 안전하고 바람직합니다.
3. Semaphore (세마포어)
Semaphore
는 동시에 자원에 접근할 수 있는 스레드의 수를 제한할 때 사용됩니다. 내부적으로 카운터를 가지고 있어, acquire()
호출 시 카운터가 감소하고, release()
호출 시 카운터가 증가합니다. 카운터가 0이 되면 acquire()
호출은 블록됩니다.
예시:
import threading
import time
# 최대 3개의 스레드만 동시에 접근 허용
semaphore = threading.Semaphore(3)
def worker(name):
print(f"{name}: 세마포어 획득 시도 중...")
with semaphore:
print(f"{name}: 세마포어 획득. 작업 시작.")
time.sleep(2)
print(f"{name}: 세마포어 해제.")
threads = []
# 5개의 스레드 생성
for i in range(5):
thread = threading.Thread(target=worker, args=(f"Worker-{i}",))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
4. Event (이벤트)
Event
는 하나의 스레드가 특정 이벤트의 발생을 알리면, 다른 스레드들이 그 이벤트가 발생할 때까지 기다리도록 할 때 사용됩니다.
set()
: 내부 플래그를True
로 설정하여 대기 중인 모든 스레드를 깨웁니다.clear()
: 내부 플래그를False
로 설정합니다.wait()
: 플래그가True
가 될 때까지 블록 상태로 대기합니다.
예시:
import threading
import time
event = threading.Event()
def worker():
print("작업자: 이벤트가 설정되기를 기다리는 중...")
event.wait() # 이벤트가 설정될 때까지 대기
print("작업자: 이벤트 발생! 작업을 진행합니다.")
def signaler():
time.sleep(3) # 3초 후 이벤트 설정
print("신호자: 이벤트를 설정합니다.")
event.set()
worker_thread = threading.Thread(target=worker)
signaler_thread = threading.Thread(target=signaler)
worker_thread.start()
signaler_thread.start()
worker_thread.join()
signaler_thread.join()
5. Condition (조건 변수)
Condition
은 복잡한 동기화 시나리오에서 스레드들이 특정 조건이 충족될 때까지 기다리거나, 조건이 충족되었을 때 다른 스레드에게 알리는 데 사용됩니다. Lock
과 함께 사용되며, wait()
, notify()
, notify_all()
메서드를 제공합니다. 생산자-소비자 모델
과 같은 상황에서 유용합니다.
예시 (생산자-소비자 모델):
import threading
import time
import collections
# 공유 버퍼
buffer = collections.deque(maxlen=10)
# 조건 변수 객체
condition = threading.Condition()
def producer():
for i in range(15):
with condition:
while len(buffer) == buffer.maxlen:
print("생산자: 버퍼가 가득 찼습니다. 소비자 대기 중...")
condition.wait() # 버퍼가 가득 차면 대기
item = f"item_{i}"
buffer.append(item)
print(f"생산자: {item} 생산. 현재 버퍼 크기: {len(buffer)}")
condition.notify() # 소비자에게 알림
time.sleep(0.1)
def consumer():
for i in range(15):
with condition:
while not buffer:
print("소비자: 버퍼가 비었습니다. 생산자 대기 중...")
condition.wait() # 버퍼가 비어있으면 대기
item = buffer.popleft()
print(f"소비자: {item} 소비. 현재 버퍼 크기: {len(buffer)}")
condition.notify() # 생산자에게 알림
time.sleep(0.2)
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
동기화 사용 시 유의사항
- GIL (Global Interpreter Lock)의 이해: 파이썬의 GIL은 한 번에 하나의 스레드만 파이썬 바이트코드를 실행하도록 합니다. 따라서 CPU 집중적인 작업에서는 스레딩이 진정한 병렬성을 제공하지 못하고 오버헤드만 증가시킬 수 있습니다. 반면, I/O 집중적인 작업 (예: 네트워크 통신, 파일 입출력)은 GIL의 영향을 덜 받으므로 스레딩이 매우 유용합니다.
- 락 사용 최소화: 락은 성능 저하를 유발하고 교착 상태의 위험을 높이므로, 락으로 보호되는 임계 구역의 코드를 최대한 짧게 유지하는 것이 좋습니다.
- 적합한 도구 선택: 해결하려는 문제에 가장 적합한 동기화 도구를 선택해야 합니다. 단순한 상호 배제에는
Lock
, 재진입이 필요하면RLock
, 자원 접근 수 제한에는Semaphore
가 적합합니다. with
문 활용:Lock
,RLock
,Semaphore
등의 동기화 객체는 컨텍스트 관리자를 지원하므로with
문을 사용하면 락 획득과 해제를 자동으로 처리하여 안전하게 코딩할 수 있습니다.- 교착 상태 방지: 여러 락을 사용할 때는 락 획득 순서를 일관되게 유지하여 교착 상태를 예방해야 합니다.