Go并发模式之Context

wuchangjian2021-11-15 17:02:22编程学习

简介

在Go语言实现的服务器中,每个到达的请求被一个新的go协程处理。请求处理器经常开启额外的go协程访问后端,例如数据库、RPC服务。这一组服务同一个请求的go协程通常需要访问单个请求的某些值,例如终端用户的身份、权限token、请求的最后期限。
context包让同一个请求处理的go协程间传值更为方便。它还能够在处理一个请求的一组go协程间传递取消信号、截止时间。

Context

Context接口的源码如下:

// Context的方法可以被并发调用
type Context interface {

	// Deadline返回该Context将被取消的时间
	Deadline() (deadline time.Time, ok bool)

	// 如果该Context被取消或超时。Done将返回一个已关闭的通道
	Done() <-chan struct{}

	// Err返回为何这个Context被取消
	Err() error

	// Value返回和key相关的键值
	Value(key interface{}) interface{}
}

一个Context没有Cancel方法,并且Done方法返回的通道是单向的,只能接收,不能发送。这是因为一个接收取消信号的方法通常不负责发送取消信号。比如,一个父操作开启一些go协程,这些go协程不应该能够取消父go协程的工作。
一个Context可以被多个go协程并发访问。可以将一个Context传给多个go协程,然后通过取消该Context来取消所有go协程的工作。

衍生Context

context包提供了方法从既有的Context衍生出新的Context。这些Context组成一棵树。当一个Context被取消时,所有从它衍生出的Context都被取消。
BackgroundContext树的树根,它永远不会被取消。它通常被main函数、初始化或测试使用,并作为到达请求的顶级Context

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
	return background
}

WithCancelWithTimeout返回一个从传递的Context衍生的Context。通常在处理请求的处理器返回后取消与该请求相关的ContextWithCancel可以在使用多个副本时取消冗余请求。此外,WithTimeout可以在请求后端服务时设置截止时间。
WithCancel源码如下:

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	// 由parent Context构造一个Cancel Context
	c := newCancelCtx(parent)
	// 传播cancel,当父Context取消时,子Context也将取消
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

它调用了newCancelCtx

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

该函数构造了一个cancelCtx

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

propagateCancel意即繁衍cancel,即设置当父Context被取消时取消子Context。其源码如下:

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	// Done将返回一个只能接收的通道,如果done为空,则父Context永远不会取消
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	select {
	case <-done:
		// parent is already canceled
		// 父Context已被取消,将子Context也取消
		child.cancel(false, parent.Err())
		return
	default:
	}

	// parentCancelCtx返回parent对应的cancelCtx
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			// 父Context已经被取消,将子Context取消
			child.cancel(false, p.err)
		} else {
			// 父Context未被取消,初始化该父Context的children域,并将自身加到父CancelCtx的children域中
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		// parentCancelCtx
		atomic.AddInt32(&goroutines, +1)
		// 没有找到parent的CancelCtx,则开启一个go协程监听父Context,当它取消时取消子Context
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

cancel方法源于canceler接口。canceler可以用于取消Context、检查Context是否被取消:

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

cancelCtx实现了该接口。相比Context,它新增了一个互斥锁、一个通道、一个记录子Contextchildren、一个记录取消的err

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

cancel方法的逻辑:

// cancel方法关闭了cancelCtx的通道,取消方法接收者c的子Context,
// 如果removeFromParent设置为true,则将c从其父Context移除
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	// 由上文知done的类型为atomic.Value,其实际保存的类型为chan strcut{},
	// Load方法将返回最近一次Store方法保存的值,否则它返回nil
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		// c.done没有保存通道,存一个关闭了的通道
		c.done.Store(closedchan)
	} else {
		// 关闭通道
		close(d)
	}
	// 将children域保存的子Context取消
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

回到WithCancel方法,该方法实际上用cancelCtx封装了一下传入的Context,返回类型仍然是Context,并且将封装后的CancelCtxcancel方法返回,提供给掌握父Context的go协程进行取消。

// WithCancel返回了一个Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	// 由parent Context构造一个Cancel Context
	c := newCancelCtx(parent)
	// 向下传播cancel,当父Context取消时,子Context也将取消
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

例子

以下例子是一个处理请求的服务器。它注册了handleSearch来处理/search请求。处理器创建了一个初始Context,并且安排它在处理器返回后取消。如果请求中包含了timeout字段,Contexttimeout时长后自动取消。

func handleSearch(w http.ResponseWriter, req *http.Request) {
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    // 以根Context调用上文所述WithTimeout,得到子Context和取消方法cancel
    if err == nil {
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    // 在handleSearch方法返回时调用取消方法,将取消该Context下的所有子Context
    defer cancel()

Context的级联取消:

func main() {
	route := gin.Default()

	route.GET("/hello", func(c *gin.Context) {
		// gin默认返回的Context就是context包下的Background context
		ctx, cancel := context.WithCancel(c.Request.Context())
		// 额外的go协程来处理请求
		go doSomething(ctx)
		time.Sleep(5 * time.Second)
		cancel()
	})
	route.Run(":8080")
}

// doSomething将递归创建Context,直至父Context取消
func doSomething(ctx context.Context) {
	for {
		time.Sleep(time.Second)
		select {
		case <-ctx.Done():
			fmt.Println("context is done.")
			return
		default:
			fmt.Println("context is not done...")
			// 未对孙Context进行取消,但孙Context将因父Context取消而取消
			c, _ := context.WithCancel(ctx)
			doSomething(c)
		}
	}
}

参考资料

Go Concurrency Patterns: Context

context documentation

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。