파이썬에서는 여러 스레드가 동시에 작업을 처리할 때 공유 자원에 접근하며 발생하는 문제를 해결하기 위해 동기화 기법을 사용합니다. 동기화는 데이터의 일관성과 무결성을 보장하고, 경쟁 조건(Race Condition)과 같은 복잡한 문제를 예방하는 데 중요한 역할을 합니다. 특히, 여러 스레드가 하나의 자원을 두고 동시에 접근하려 할 때 데이터가 꼬이는 현상을 막아줍니다.
파이썬 스레드 동기화 기술
파이썬의 threading 모듈은 스레드 기반 병렬 처리를 지원하며, 동기화를 위한 다양한 도구를 제공합니다.
1. Lock (뮤텍스 – 상호 배제 락)
Lock은 가장 기본적이고 널리 쓰이는 동기화 메커니즘으로, 한 번에 하나의 스레드만 특정 코드 블록(임계 구역, Critical Section)에 접근하도록 허용합니다.
acquire()메서드로 락을 획득하고,release()메서드로 락을 해제합니다.- 이미 다른 스레드가 락을 획득했다면, 락을 기다리는 스레드는 락이 해제될 때까지 블록 상태로 대기합니다.
with문을 활용하면acquire()와release()를 자동으로 처리해 주기 때문에 코드를 더 간결하고 안전하게 작성할 수 있습니다.
예시:
import threading
# 공유 자원
counter = 0
# 락 객체 생성
lock = threading.Lock()
def increase_counter():
global counter
for _ in range(100000):
# with 문을 사용하여 락 획득 및 해제 자동화
with lock:
counter += 1
print(f"스레드 {threading.current_thread().name} 작업 완료. 현재 카운터: {counter}")
threads = []
# 두 개의 스레드 생성
for i in range(2):
thread = threading.Thread(target=increase_counter, name=f"Thread-{i}")
threads.append(thread)
thread.start()
# 모든 스레드가 종료될 때까지 대기
for thread in threads:
thread.join()
print(f"최종 카운터 값: {counter}")
2. RLock (재진입 락)
RLock은 Lock과 비슷하지만, 동일한 스레드가 이미 획득한 락을 다시 획득하는 것을 허용합니다. RLock은 락을 획득한 횟수를 내부적으로 추적하여, 획득한 횟수만큼 release()가 호출되어야 락이 완전히 해제됩니다.
Lock과 RLock의 차이점 및 RLock의 필요성:
Lock은 재진입이 불가능하여 한 스레드가 이미 획득한 락을 다시 획득하려고 시도하면 그 스레드 자신에게 블록되어 교착 상태(Deadlock)에 빠질 수 있습니다.
Lock을 사용할 때 발생하는 교착 상태 예시:
import threading
my_lock = threading.Lock()
def func_a():
print(f"[{threading.current_thread().name}] 함수 A: 락 획득 시도 중...")
with my_lock: # 첫 번째 락 획득
print(f"[{threading.current_thread().name}] 함수 A: 락 획득.")
# 이 시점에서 락이 잠겨 있음
func_b() # <- 이 부분에서 문제 발생
print(f"[{threading.current_thread().name}] 함수 A: 락 해제.")
def func_b():
print(f"[{threading.current_thread().name}] 함수 B: 다시 락 획득 시도 중...")
with my_lock: # 동일 스레드가 이미 락을 가지고 있음에도 다시 획득 시도
print(f"[{threading.current_thread().name}] 함수 B: 락 재획득.")
print("--- Lock (교착 상태) 예시 ---")
t = threading.Thread(target=func_a, name="ProblemThread")
t.start()
t.join(timeout=2) # 2초 대기
if t.is_alive():
print(f"\n[{threading.current_thread().name}] 오류: 스레드가 종료되지 않았습니다. Lock으로 인한 교착 상태 발생!")
print("--- Lock 예시 종료 ---\n")
위 예시에서 ProblemThread는 func_a에서 my_lock을 획득한 후 func_b를 호출합니다. func_b는 다시 my_lock을 획득하려 하지만, 이미 락이 잠겨 있어 ProblemThread는 무한 대기 상태에 빠집니다.
반면, RLock은 이러한 상황을 방지할 수 있습니다. 한 스레드가 RLock을 획득할 때마다 내부 카운트가 증가하고, release()를 호출할 때마다 카운트가 감소합니다. 카운트가 0이 되어야 락이 완전히 해제되므로, 동일 스레드 내에서는 중첩된 락 획득이 가능합니다.
RLock을 사용하여 문제 해결하는 예시:
import threading
import time
my_rlock = threading.RLock()
def func_a_safe():
print(f"[{threading.current_thread().name}] 함수 A: RLock 획득 시도...")
with my_rlock: # 첫 번째 RLock 획득 (카운트 1)
print(f"[{threading.current_thread().name}] 함수 A: RLock 획득.")
func_b_safe()
print(f"[{threading.current_thread().name}] 함수 A: RLock 해제.") # 카운트 1 감소
def func_b_safe():
print(f"[{threading.current_thread().name}] 함수 B: 다시 RLock 획득 시도...")
with my_rlock: # 동일 스레드 내 재진입 (카운트 2)
print(f"[{threading.current_thread().name}] 함수 B: RLock 재획득.")
time.sleep(0.1)
print(f"[{threading.current_thread().name}] 함수 B: RLock 해제.") # 카운트 1 감소
print("--- RLock (문제 해결) 예시 ---")
t_rl = threading.Thread(target=func_a_safe, name="SafeThread")
t_rl.start()
t_rl.join()
print(f"\n[{threading.current_thread().name}] SafeThread가 RLock 덕분에 정상적으로 종료되었습니다.")
print("--- RLock 예시 종료 ---")
따라서, 동일한 스레드 내에서 여러 함수나 메서드가 중첩적으로 동일한 락을 사용해야 할 가능성이 있다면, Lock 대신 RLock을 사용하는 것이 안전하고 바람직합니다.
3. Semaphore (세마포어)
Semaphore는 동시에 자원에 접근할 수 있는 스레드의 수를 제한할 때 사용됩니다. 내부적으로 카운터를 가지고 있어, acquire() 호출 시 카운터가 감소하고, release() 호출 시 카운터가 증가합니다. 카운터가 0이 되면 acquire() 호출은 블록됩니다.
예시:
import threading
import time
# 최대 3개의 스레드만 동시에 접근 허용
semaphore = threading.Semaphore(3)
def worker(name):
print(f"{name}: 세마포어 획득 시도 중...")
with semaphore:
print(f"{name}: 세마포어 획득. 작업 시작.")
time.sleep(2)
print(f"{name}: 세마포어 해제.")
threads = []
# 5개의 스레드 생성
for i in range(5):
thread = threading.Thread(target=worker, args=(f"Worker-{i}",))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
4. Event (이벤트)
Event는 하나의 스레드가 특정 이벤트의 발생을 알리면, 다른 스레드들이 그 이벤트가 발생할 때까지 기다리도록 할 때 사용됩니다.
set(): 내부 플래그를True로 설정하여 대기 중인 모든 스레드를 깨웁니다.clear(): 내부 플래그를False로 설정합니다.wait(): 플래그가True가 될 때까지 블록 상태로 대기합니다.
예시:
import threading
import time
event = threading.Event()
def worker():
print("작업자: 이벤트가 설정되기를 기다리는 중...")
event.wait() # 이벤트가 설정될 때까지 대기
print("작업자: 이벤트 발생! 작업을 진행합니다.")
def signaler():
time.sleep(3) # 3초 후 이벤트 설정
print("신호자: 이벤트를 설정합니다.")
event.set()
worker_thread = threading.Thread(target=worker)
signaler_thread = threading.Thread(target=signaler)
worker_thread.start()
signaler_thread.start()
worker_thread.join()
signaler_thread.join()
5. Condition (조건 변수)
Condition은 복잡한 동기화 시나리오에서 스레드들이 특정 조건이 충족될 때까지 기다리거나, 조건이 충족되었을 때 다른 스레드에게 알리는 데 사용됩니다. Lock과 함께 사용되며, wait(), notify(), notify_all() 메서드를 제공합니다. 생산자-소비자 모델과 같은 상황에서 유용합니다.
예시 (생산자-소비자 모델):
import threading
import time
import collections
# 공유 버퍼
buffer = collections.deque(maxlen=10)
# 조건 변수 객체
condition = threading.Condition()
def producer():
for i in range(15):
with condition:
while len(buffer) == buffer.maxlen:
print("생산자: 버퍼가 가득 찼습니다. 소비자 대기 중...")
condition.wait() # 버퍼가 가득 차면 대기
item = f"item_{i}"
buffer.append(item)
print(f"생산자: {item} 생산. 현재 버퍼 크기: {len(buffer)}")
condition.notify() # 소비자에게 알림
time.sleep(0.1)
def consumer():
for i in range(15):
with condition:
while not buffer:
print("소비자: 버퍼가 비었습니다. 생산자 대기 중...")
condition.wait() # 버퍼가 비어있으면 대기
item = buffer.popleft()
print(f"소비자: {item} 소비. 현재 버퍼 크기: {len(buffer)}")
condition.notify() # 생산자에게 알림
time.sleep(0.2)
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
동기화 사용 시 유의사항
- GIL (Global Interpreter Lock)의 이해: 파이썬의 GIL은 한 번에 하나의 스레드만 파이썬 바이트코드를 실행하도록 합니다. 따라서 CPU 집중적인 작업에서는 스레딩이 진정한 병렬성을 제공하지 못하고 오버헤드만 증가시킬 수 있습니다. 반면, I/O 집중적인 작업 (예: 네트워크 통신, 파일 입출력)은 GIL의 영향을 덜 받으므로 스레딩이 매우 유용합니다.
- 락 사용 최소화: 락은 성능 저하를 유발하고 교착 상태의 위험을 높이므로, 락으로 보호되는 임계 구역의 코드를 최대한 짧게 유지하는 것이 좋습니다.
- 적합한 도구 선택: 해결하려는 문제에 가장 적합한 동기화 도구를 선택해야 합니다. 단순한 상호 배제에는
Lock, 재진입이 필요하면RLock, 자원 접근 수 제한에는Semaphore가 적합합니다. with문 활용:Lock,RLock,Semaphore등의 동기화 객체는 컨텍스트 관리자를 지원하므로with문을 사용하면 락 획득과 해제를 자동으로 처리하여 안전하게 코딩할 수 있습니다.- 교착 상태 방지: 여러 락을 사용할 때는 락 획득 순서를 일관되게 유지하여 교착 상태를 예방해야 합니다.