context包
控制并发的方式:
WaitGroup
Context
waitGroup
- 并发启动多个子协程,等待所有的 goroutine 完成任务
wg.Wait()
等待所有的子协程完成后再执行后续代码 - 无法主动通知子协程的退出
使用channel
和select
通知 goroutine 的处理
子协程使用 for 循环定时轮询,stop
信道有值(可读)则退出
select {
case <-stop :
...
return
default :
...
}
背景:
网络请求Request 需要开启 goroutine 需要去跟踪 goroutine 的方案 从而进行控制 即 goroutine 的上下文
Context
- 通知子协程退出(正常和超时)
- 传递必要参数
使用原则:
- 以参数形式传递 Context 且位于入口请求和出口请求链路的每个函数的首位
- 函数方法传递 Context 不可传递 nil ——>tarce 追踪时断掉
- Context 线程安全
- Context 可传递给任意个数的 goroutine 子协程受根 context 控制
控制单协程
将 chan stop 转化为 Context 追踪 goroutine
// Background 返回空的 Context 作为整个 context 树的根节点
// WithCanclel(parent) 创建可取消的子 context 作为参数传递给 goroutine 进行跟踪
ctx,cancel:=context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
// Done()判断接收到值则结束
case <-ctx.Done():
fmt.Println("quit ")
return
default:
fmt.Println("goroutine is under control")
time.Sleep(2*time.Second)
}
}
}(ctx)
time.Sleep(10*time.Second)
fmt.Println("it is time to tell ctx stop control")
cancel()
time.Sleep(5*time.Second)
context.Backgroud()
创建根 Context 作为顶层的 Contextcontext.WithCancle(parent)
创建可取消的子 Context 返回函数cancel
- 子协程中使用
select
调用<-ctx.Done()
判断是否需要退出 - 主协程使用
cancle()
函数通知子协程退出
控制多协程
ctx,cancle:=context.WithCancel(context.Background())
go reqTask(ctx,"one")
go reqTask(ctx,"two")
go reqTask(ctx,"three")
time.Sleep(3*time.Second)
cancle()
time.Sleep(3*time.Second)
func reqTask(ctx context.Context,name string) {
for {
select {
case <-ctx.Done():
fmt.Println("stop",name)
return
default:
fmt.Println("send request: ",name)
time.Sleep(time.Second)
}
}
cancle()
终止所有的子协程
context.Context
是一个接口
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
WithCancel()
返回context.Context
接口
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 返回的是 parent的副本newCancleCtx
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
// propagateCancel 是结束子context
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// *ctx 本身是 cancleCtx 故而是地址传递
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
Context.WithValue
往子协程中传递参数
其中key应该自定义 避免内置数据类型
func WithValue(parent Context, key, val interface{}) Context
type favKey string
f := func(ctx context.Context, k favKey) {
if v := ctx.Value(k); v != nil {
fmt.Println("found value ", v)
return
}
fmt.Println("not found key ", k)
}
k := favKey("language")
ctx = context.WithValue(context.Background(), k, "Go")
f(ctx, k)
f(ctx, favKey("color"))
WithValue()
创建基于ctx
的子 context 携带值 options- 子协程使用
ctx.Value(key)
获取传递的值
常用 API
WithTimeout()
设置超时时间(子协程最长执行时间)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
- 可能使得子协程在 cancle()函数之前结束
- 可使用
ctx.Err()
获取子协程退出原因
WithDeadline
设置控制子协程的最长推出时间
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
函数签名:
// 创建一个带有新的 Done channel 的 context,并且返回一个取消的方法
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 创建一个具有截止时间的 context
// 截止时间是 d 和 parent(如果有截止时间的话) 的截止时间中更早的那一个
// 当 parent 执行完毕,或 cancel 被调用 或者 截止时间到了的时候,这个 context done 掉
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
// 其实就是调用的 WithDeadline
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
type CancelFunc
type Context
// 一般用于创建 root context,这个 context 永远也不会被取消,或者是 done
func Background() Context
// 底层和 Background 一致,但是含义不同,当不清楚用什么的时候或者是还没准备好的时候可以用它
func TODO() Context
// 为 context 附加值
// key 应该具有可比性,一般不应该是 string int 这种默认类型,应该自己创建一个类型
// 避免出现冲突,一般 key 不应该导出,如果要导出的话应该是一个接口或者是指针
func WithValue(parent Context, key, val interface{}) Context
使用场景
超时控制
错误取消
eg:
main 并发调用f1 f2 当f1 错误返回 f2 处于阻塞,报错误 context被取消
func f1(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("f1: %w", ctx.Err())
case <-time.After(time.Millisecond):
return fmt.Errorf("f1 err in 1ms")
}
}
func f2(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("f2 : %w", ctx.Err())
case <-time.After(time.Hour):
return nil
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if err := f1(ctx); err != nil {
fmt.Println(err)
cancel()
}
}()
go func() {
defer wg.Done()
if err := f2(ctx); err != nil {
fmt.Println(err)
cancel()
}
}()
wg.Wait()
}
类似errgroup