context包

控制并发的方式:

WaitGroup Context

waitGroup

  • 并发启动多个子协程,等待所有的 goroutine 完成任务 wg.Wait()等待所有的子协程完成后再执行后续代码
  • 无法主动通知子协程的退出

使用channelselect通知 goroutine 的处理

子协程使用 for 循环定时轮询,stop信道有值(可读)则退出

select {
    case <-stop :
    	...
    	return
	default :
    	...
  
}

背景:

​ 网络请求Request 需要开启 goroutine 需要去跟踪 goroutine 的方案 从而进行控制 即 goroutine 的上下文

Context

  • 通知子协程退出(正常和超时)
  • 传递必要参数

使用原则:

  • 以参数形式传递 Context 且位于入口请求和出口请求链路的每个函数的首位
  • 函数方法传递 Context 不可传递 nil ——>tarce 追踪时断掉
  • Context 线程安全
  • Context 可传递给任意个数的 goroutine 子协程受根 context 控制

控制单协程

将 chan stop 转化为 Context 追踪 goroutine

	// Background 返回空的 Context 作为整个 context 树的根节点
// WithCanclel(parent) 创建可取消的子 context 作为参数传递给 goroutine 进行跟踪
	ctx,cancel:=context.WithCancel(context.Background())

	go func(ctx context.Context) {
		for  {
			select {
                // Done()判断接收到值则结束
			case <-ctx.Done():
				fmt.Println("quit ")
				return
			default:
				fmt.Println("goroutine is under control")
				time.Sleep(2*time.Second)
			}
		}
	}(ctx)
	time.Sleep(10*time.Second)
	fmt.Println("it is time to tell ctx stop control")

	cancel()
	time.Sleep(5*time.Second)
  • context.Backgroud()创建根 Context 作为顶层的 Context
  • context.WithCancle(parent) 创建可取消的子 Context 返回函数 cancel
  • 子协程中使用select调用<-ctx.Done() 判断是否需要退出
  • 主协程使用cancle()函数通知子协程退出

控制多协程

ctx,cancle:=context.WithCancel(context.Background())
go reqTask(ctx,"one")
go reqTask(ctx,"two")
go reqTask(ctx,"three")

time.Sleep(3*time.Second)
cancle()
time.Sleep(3*time.Second)

func reqTask(ctx context.Context,name string)  {
	for  {
		select {
		case <-ctx.Done():
			fmt.Println("stop",name)
			return
		default:
			fmt.Println("send request: ",name)
			time.Sleep(time.Second)
		}

	}

cancle()终止所有的子协程

context.Context是一个接口

type Context interface {
  Deadline() (deadline time.Time, ok bool)
  Done() <-chan struct{}
  Err() error
  Value(key interface{}) interface{}
}

WithCancel()返回context.Context接口

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 返回的是 parent的副本newCancleCtx
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
    // propagateCancel 是结束子context
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}
//  *ctx 本身是 cancleCtx 故而是地址传递
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

Context.WithValue

往子协程中传递参数

其中key应该自定义 避免内置数据类型

func WithValue(parent Context, key, val interface{}) Context

	type favKey string
	f := func(ctx context.Context, k favKey) {
		if v := ctx.Value(k); v != nil {
			fmt.Println("found value ", v)
			return
		}
		fmt.Println("not found key ", k)
	}

	k := favKey("language")
	ctx = context.WithValue(context.Background(), k, "Go")
	f(ctx, k)
	f(ctx, favKey("color"))
  • WithValue()创建基于ctx的子 context 携带值 options
  • 子协程使用ctx.Value(key) 获取传递的值

常用 API

WithTimeout() 设置超时时间(子协程最长执行时间)

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  • 可能使得子协程在 cancle()函数之前结束
  • 可使用 ctx.Err() 获取子协程退出原因

WithDeadline 设置控制子协程的最长推出时间

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

函数签名:

// 创建一个带有新的 Done channel 的 context,并且返回一个取消的方法
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 创建一个具有截止时间的 context
// 截止时间是 d 和 parent(如果有截止时间的话) 的截止时间中更早的那一个
// 当 parent 执行完毕,或 cancel 被调用 或者 截止时间到了的时候,这个 context done 掉
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
// 其实就是调用的 WithDeadline
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
type CancelFunc
type Context
	// 一般用于创建 root context,这个 context 永远也不会被取消,或者是 done
    func Background() Context
	// 底层和 Background 一致,但是含义不同,当不清楚用什么的时候或者是还没准备好的时候可以用它
    func TODO() Context
	// 为 context 附加值
	// key 应该具有可比性,一般不应该是 string int 这种默认类型,应该自己创建一个类型
	// 避免出现冲突,一般 key 不应该导出,如果要导出的话应该是一个接口或者是指针
    func WithValue(parent Context, key, val interface{}) Context

使用场景

超时控制

错误取消

eg:

​ main 并发调用f1 f2 当f1 错误返回 f2 处于阻塞,报错误 context被取消

func f1(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("f1: %w", ctx.Err())
	case <-time.After(time.Millisecond):
		return fmt.Errorf("f1 err in 1ms")
	}
}
func f2(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("f2 : %w", ctx.Err())
	case <-time.After(time.Hour):
		return nil
	}
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := f1(ctx); err != nil {
			fmt.Println(err)
			cancel()
		}
	}()
	go func() {
		defer wg.Done()
		if err := f2(ctx); err != nil {
			fmt.Println(err)
			cancel()
		}
	}()
	wg.Wait()
}

类似errgroup

传递共享数据

防止 goroutine 泄露