concurrency
pattern
- Future: a channel with buffer size 1 carrying a single result, so the producer can store the result without blocking
- Queue: an unbuffered channel carrying a stream of messages
- Present a synchronous API to callers; implement the concurrency internally with channels and goroutines
- If the sending side does not check for cancellation, the receiving side must receive the result even when it knows the result is useless, so that the sender is not blocked forever
- Serial execution can be enforced with a single-element channel used as a mutex
- A worker pool can bound the peak resource usage of the program
- Limit concurrency by using a channel whose buffer size is the limit as a semaphore: "Recall that we acquire this semaphore by sending a token, and we release it by discarding a token."
Refs
- https://go.dev/blog/pipelines
- Google I/O 2012 - Go Concurrency Patterns
- Google I/O 2013 - Advanced Go Concurrency Patterns
- Concurrency is not Parallelism by Rob Pike
- GopherCon 2017: Kavya Joshi - Understanding Channels
- GopherCon 2018: Bryan C. Mills - Rethinking Classical Concurrency Patterns
- GopherCon 2018: Filippo Valsorda - Asynchronous Networking Patterns
- Concurrency Patterns In Go
pipelines
Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function
Fan-out, fan-in
Multiple functions can read from the same channel until that channel is closed; this is called fan-out
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in
merge
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
Explicit cancellation
func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
Bryan Mills's talk on concurrency patterns
asynchronous api
An asynchronous API returns to the caller before its result is ready.
The caller obtains the result via a callback
future api
The caller obtains the result later through a handle returned by the API
In Go, return a channel with buffer size 1 and let the caller receive the result from it:
c := make(chan Item, 1)
PRODUCER–CONSUMER QUEUE: API
The producer and consumer communicate through a channel
CALLER-SIDE CONCURRENCY: SYNCHRONOUS API
VARIABLES
Run the fetches concurrently with errgroup goroutines; each closure assigns its result to a captured variable:
var a, b Item
g, ctx := errgroup.WithContext(ctx)
g.Go(func() (err error) {
    a, err = Fetch(ctx, "a")
    return err
})
g.Go(func() (err error) {
    b, err = Fetch(ctx, "b")
    return err
})
err := g.Wait()
[...]
consume(a, b)
condition
problems with condition variables
- no way to cancel a pending Wait
- wakeups are hard to control: Signal cannot target a specific waiter, and Broadcast wakes everyone
COMMUNICATION: RESOURCE POOL
func (p *Pool) Acquire(ctx context.Context) (net.Conn, error) {
    select {
    case conn := <-p.idle:
        return conn, nil
    case p.sem <- token{}:
        conn, err := dial()
        if err != nil {
            <-p.sem
        }
        return conn, err
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}
COMMUNICATION: QUEUE CANCELLATION
func (q *Queue) Get(ctx context.Context) (Item, error) {
    var items []Item
    select {
    case <-ctx.Done():
        var zero Item
        return zero, ctx.Err()
    case items = <-q.items:
    }
    item := items[0]
    if len(items) == 1 {
        q.empty <- true
    } else {
        q.items <- items[1:]
    }
    return item, nil
}
SEMAPHORE CHANNEL: LIMITING CONCURRENCY
sem := make(chan token, limit)
for _, task := range hugeSlice {
    sem <- token{}
    go func(task Task) {
        perform(task)
        <-sem
    }(task)
}