빠른 흐름
jobs := make(chan Job)
results := make(chan Result)
for i := 0; i < workers; i++ {
go worker(ctx, jobs, results)
}worker pool과 pipeline은 "goroutine을 많이 만든다"보다 작업량 제한, 단계 분리, 종료 신호를 같이 설계하는 패턴입니다.
갈리는 기준
worker pool은 동시 실행 수를 제한한다
외부 API 호출, 이미지 처리, 파일 변환처럼 작업은 많지만 동시에 너무 많이 돌리면 위험한 경우 worker pool이 맞습니다.
func worker(ctx context.Context, jobs <-chan Job, results chan<- Result) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
results <- process(job)
}
}
}핵심은 worker 수를 정해 두고, jobs channel로 작업을 흘려보내는 것입니다.
pipeline은 처리 단계를 분리한다
pipeline은 입력을 여러 단계로 변환합니다.
read files -> parse -> validate -> store각 단계는 channel을 통해 값을 넘기고, 다음 단계는 이전 단계 결과를 받아 처리합니다.
func parse(in <-chan []byte) <-chan Item {
out := make(chan Item)
go func() {
defer close(out)
for raw := range in {
out <- parseItem(raw)
}
}()
return out
}단계별 책임이 분명하면 테스트와 병목 확인이 쉬워집니다.
종료 설계
channel close는 보내는 쪽 책임이다
일반적으로 channel을 닫는 쪽은 값을 보내는 쪽입니다. 받는 쪽이 닫으면 아직 보내려는 goroutine에서 panic이 날 수 있습니다.
close(jobs)작업 투입이 끝났다는 신호로 jobs를 닫고, worker는 range jobs나 ok 값을 보고 종료합니다.
WaitGroup은 goroutine 종료 대기용이다
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
worker(ctx, jobs, results)
}()
}
go func() {
wg.Wait()
close(results)
}()WaitGroup은 결과 전달 도구가 아니라, 종료를 기다리는 도구입니다.
context는 중간 실패와 취소 전파에 필요하다
한 단계에서 실패했는데 다른 goroutine이 계속 기다리면 누수가 생깁니다. context를 넘기고, 실패 시 cancel을 호출하면 각 단계가 빠져나올 경로를 가질 수 있습니다.
선택 기준
| 상황 | 먼저 떠올릴 선택 |
|---|---|
| 동시에 처리할 작업 수 제한 | worker pool |
| 여러 처리 단계를 순서대로 연결 | pipeline |
| 작업 입력 종료 알림 | 보내는 쪽에서 channel close |
| goroutine 종료 대기 | sync.WaitGroup |
| 중간 실패/취소 전파 | context |
주의할 점
worker pool과 pipeline에서 가장 위험한 부분은 처리 로직보다 종료 경계입니다. 누가 channel을 닫는지, 누가 WaitGroup을 기다리는지, 실패 시 누가 cancel()을 호출하는지가 흐리면 goroutine이 조용히 남습니다. 동시성 패턴은 코드 모양보다 종료 계약이 먼저입니다.
참고 링크
3 sources