这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

concurrency的源码学习

Dapr concurrency package的源码学习

concurrency packge的代码不多,暂时只有一个 limiter.go。

1 - limiter.go的源码学习

rating limiter的代码实现和使用场景

Dapr concurrency package中的 limiter.go 文件的源码学习,rating limiter的代码实现和使用场景。

重点:充分利用 golang chan 的特性

代码实现

Limiter 结构体定义

// Limiter object
type Limiter struct {
   limit         int
   tickets       chan int
   numInProgress int32
}

字段说明:

  • limit:最大并发数的限制,这是一个配置项,默认100,初始化后不再修改。
  • tickets:用 go 的 chan 来保存和分发 tickets
  • numInProgress:当前正在执行中的数量,这是一个实时状态

构建Limiter

const (
   // DefaultLimit is the default concurrency limit
   DefaultLimit = 100
)

// NewLimiter allocates a new ConcurrencyLimiter
func NewLimiter(limit int) *Limiter {
   if limit <= 0 {
      limit = DefaultLimit
   }

   // allocate a limiter instance
   c := &Limiter{
      limit:   limit,
      // tickets chan 的 size 设置为 limit
      tickets: make(chan int, limit),
   }

   // allocate the tickets:
   // 开始时先准备和limit数量相当的可用 tickets
   for i := 0; i < c.limit; i++ {
      c.tickets <- i
   }

   return c
}

Limiter的实现

// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
   // 从 chan 中拿一个有效票据
   // 如果当前 chan 中有票据,则说明 go routines 的数量还没有达到 limit 的最大限制,还可以继续启动go routine执行job
   // 如果当前 chan 中没有票据,则说明 go routines 的数量已经达到 limit 的最大限制,需要限速了。execute方法会阻塞在这里,等待有job执行完成释放票据
   ticket := <-c.tickets
   // 拿到之后更新numInProgress,数量加一,要求是原子操作
   atomic.AddInt32(&c.numInProgress, 1)
   // 启动 go routine 执行 job
   go func(param interface{}) {
      // 通过defer来做 job 完成后的清理
      defer func() {
         // 将票据释放给 chan,这样后续的 job 有机会申请到
         c.tickets <- ticket
         // 更新numInProgress,数量减一,要求是原子操作
         atomic.AddInt32(&c.numInProgress, -1)
      }()

      // 执行job
      job(param)
   }(param)
   
   // 返回当前的票据号
   return ticket
}

wait方法

wait方法会阻塞并等待所有的已经通过 execute() 方法拿到票据的 go routine 执行完毕。

// Wait will block all the previously Executed jobs completed running.
//
// IMPORTANT: calling the Wait function while keep calling Execute leads to
//            un-desired race conditions
func (c *Limiter) Wait() {
   // 这是从 chan 中读取所有的票据,只要有任何票据被 job 释放都会去争抢
   // 最后wait()方法获取到所有的票据,其他 job 自然就无法获取票据从而阻塞住所有job的工作
   // 但这并不能保证一定能第一时间抢的到,如果还有其他的 job 也在调用 execute() 方法申请票据,那只有等这个 job 完成工作释放票据时再次争抢
   for i := 0; i < c.limit; i++ {
      <-c.tickets
   }
}

使用场景

并行执行批量操作时限速

pkg/grpc/api.gopkg/http/api.go 的 GetBulkState()方法中,通过 limiter 来限制批量操作的并发数量:

// 构建limiter,limit参数由 请求参数中的 Parallelism 制定
limiter := concurrency.NewLimiter(int(in.Parallelism))
n := len(reqs)
for i := 0; i < n; i++ {
   fn := func(param interface{}) {
		......
   }
    // 提交 job 给 limiter
   limiter.Execute(fn, &reqs[i])
}

// 等待所有的 job 执行完成
limiter.Wait()

在 actor 中也有类似的代码:

limiter := concurrency.NewLimiter(actorMetadata.RemindersMetadata.PartitionCount)
for i := range getRequests {
    fn := func(param interface{}) {
    	......
    }
    limiter.Execute(fn, &bulkResponse[i])
}
limiter.Wait()