如何优雅地实现并发编排任务

前端 2023-07-05 17:29:38
126阅读

 

文中转载微信公众平台「吴亲强的深夜厨房」,创作者吴亲杜兰特。转截文中请联络吴亲强的深夜厨房微信公众号。

业务场景

在做任务开发设计的情况下,大家一定会遇到下列情景:

情景1:启用第三方接口的情况下, 一个要求你需要启用不一样的插口,做数据拼装。

情景2:一个运用主页很有可能借助于许多 服务项目。那么就牵涉到在载入网页页面时必须另外要求好几个服务项目的插口。这一步通常是由后端开发统一启用拼装数据信息再回到给前面,也就是说白了的 BFF(Backend For Frontend) 层。

对于之上二种情景,假定在沒有强相互依赖下,挑选串行通信启用,那麼总用时即:

 
  1. time=s1 s2 ....sn 

依照当今秒入上百万的有为青年,那么长期早已将你祖宗十八代问好了一遍。

为了更好地杰出的KPI,大家通常会挑选高并发地启用这种依靠插口。那麼总用时便是:

 
  1. time=max(s1,s2,s3.....,sn) 

自然逐渐堆业务流程的情况下能够先串行化,直到上边的人心急的情况下,展示绝技。

那样,年末 PPT 就可以再加上浓厚的一笔流水账单:为业务流程某一插口提升 百分XXX特性,间接性造成XXX使用价值。

自然这一切的前提条件是,做老总不明白技术性,做技术性”懂”你。

大破冲霄楼,假如改动成高并发启用,你很有可能会那么写,

 
  1. package main 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "sync" 
  6.     "time" 
  7.  
  8. func main() { 
  9.     var wg sync.WaitGroup 
  10.     wg.Add(2) 
  11.  
  12.     var userInfo *User 
  13.     var productList []Product 
  14.  
  15.     go func() { 
  16.         defer wg.Done() 
  17.         userInfo, _ = getUser() 
  18.     }() 
  19.  
  20.     go func() { 
  21.         defer wg.Done() 
  22.         productList, _ = getProductList() 
  23.     }() 
  24.     wg.Wait() 
  25.     fmt.Printf("客户信息:% v\n", userInfo) 
  26.     fmt.Printf("产品信息:% v\n", productList) 
  27.  
  28.  
  29. /********客户服务**********/ 
  30.  
  31. type User struct { 
  32.     Name string 
  33.     Age uint8 
  34.  
  35. func getUser() (*User, error) { 
  36.     time.Sleep(500 * time.Millisecond) 
  37.     var u User 
  38.     u.Name = "wuqinqiang" 
  39.     u.Age = 18 
  40.     return &u, nil 
  41.  
  42. /********产品服务项目**********/ 
  43.  
  44. type Product struct { 
  45.     Title string 
  46.     Price uint32 
  47.  
  48. func getProductList() ([]Product, error) { 
  49.     time.Sleep(400 * time.Millisecond) 
  50.     var list []Product 
  51.     list = append(list, Product{ 
  52.         Title: "SHib"
  53.         Price: 10, 
  54.     }) 
  55.     return list, nil 

从完成上而言,必须是多少服务项目,会开多少个 G,运用 sync.WaitGroup 的特点,

完成高并发编辑每日任务的实际效果。

仿佛,问题不大。

可是伴随着编号 996 业务场景的提升,你能发觉,许多控制模块都是有类似的作用,仅仅相匹配的业务场景不一样罢了。

那麼大家能否抽像出一套对于此业务场景的专用工具,而把实际业务流程完成交到业务流程方。

应用

秉着不反复造轮子的标准,去搜过下开源软件,最后看到了 go-zero 里边的一个专用工具 mapreduce。

能够自主 Google 这一专有名词。

应用非常简单。大家根据它更新改造一下上边的编码:

 
  1. package main 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "github.com/tal-tech/go-zero/core/mr" 
  6.     "time" 
  7.  
  8. func main() { 
  9.     var userInfo *User 
  10.     var productList []Product 
  11.     _ = mr.Finish(func() (err error) { 
  12.         userInfo, err = getUser() 
  13.         return err 
  14.     }, func() (err error) { 
  15.         productList, err = getProductList() 
  16.         return err 
  17.     }) 
  18.     fmt.Printf("客户信息:% v\n", userInfo) 
  19.     fmt.Printf("产品信息:% v\n", productList) 
  20. //打印出 
  21. 客户信息:&{Name:wuqinqiang Age:18} 
  22. 产品信息:[{Title:SHib Price:10}] 

是否舒适多了。

可是这儿还必须留意一点,假定你启用的在其中一个服务项目不正确,而且你 return err 相匹配的不正确,那麼别的启用的服务项目会被撤销。

例如大家改动 getProductList 立即回应不正确。

 
  1. func getProductList() ([]Product, error) { 
  2.     return nil, errors.New("test error"
  3. //打印出 
  4. // 客户信息:<nil> 
  5. // 产品信息:[] 

那麼最后打印出的情况下连客户信息都是会为空,由于发生一个服务项目不正确,客户服务要求被取消了。

一般状况下,在要求服务项目不正确的情况下大家会出现最低实际操作,一个服务项目不正确不可以危害别的要求的結果。

因此在应用的情况下实际解决在于业务场景。

源代码

即然用了,那麼就追下源码吧。

 
  1. func Finish(fns ...func() error) error { 
  2.     if len(fns) == 0 { 
  3.         return nil 
  4.     } 
  5.  
  6.     return MapReduceVoid(func(source chan<- interface{}) { 
  7.         for _, fn := range fns { 
  8.             source <- fn 
  9.         } 
  10.     }, func(item interface{}, writer Writer, cancel func(error)) { 
  11.         fn := item.(func() error) 
  12.         if err := fn(); err != nil { 
  13.             cancel(err) 
  14.         } 
  15.     }, func(pipe <-chan interface{}, cancel func(error)) { 
  16.         drain(pipe) 
  17.     }, WithWorkers(len(fns))) 
  18. }  
 
  1. func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { 
  2.     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { 
  3.         reducer(input, cancel) 
  4.         drain(input) 
  5.         // We need to write a placeholder to let MapReduce to continue on reducer done, 
  6.         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. 
  7.         writer.Write(lang.Placeholder) 
  8.     }, opts...) 
  9.     return err 

针对 MapReduceVoid涵数,关键查询三个闭包主要参数。

  • 第一个 GenerateFunc 用以生产制造数据信息。
  • MapperFunc 载入生产制造出的数据信息,开展解决。
  • VoidReducerFunc 这儿表明不对 mapper 后的数据信息做汇聚回到。因此这一闭包在这里实际操作基本上0功效。
 
  1. func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { 
  2.     source := buildSource(generate)  
  3.     return MapReduceWithSource(source, mapper, reducer, opts...) 
  4.  
  5. func buildSource(generate GenerateFunc) chan interface{} { 
  6.     source := make(chan interface{})// 建立无缓存安全通道 
  7.     threading.GoSafe(func() { 
  8.         defer close(source) 
  9.         generate(source) //逐渐生产制造数据信息 
  10.     }) 
  11.  
  12.     return source //回到无缓存安全通道 

buildSource涵数中,回到一个无缓存的安全通道。并打开一个 G 运作 generate(source),往无缓存安全通道塞数据信息。这一generate(source) 不便是一开始 Finish 传送的第一个闭包主要参数。

 
  1. return MapReduceVoid(func(source chan<- interface{}) { 
  2.     // 就这个 
  3.         for _, fn := range fns { 
  4.             source <- fn 
  5.         } 
  6.     }) 

随后查询 MapReduceWithSource 涵数,

 
  1. func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, 
  2.     opts ...Option) (interface{}, error) { 
  3.     options := buildOptions(opts...) 
  4.     //每日任务实行完毕通告数据信号 
  5.     output := make(chan interface{}) 
  6.     //将mapper解决完的数据信息载入collector 
  7.     collector := make(chan interface{}, options.workers) 
  8.     // 撤销实际操作数据信号 
  9.     done := syncx.NewDoneChan() 
  10.     writer := newGuardedWriter(output, done.Done()) 
  11.     var closeOnce sync.Once 
  12.     var retErr errorx.AtomicError 
  13.     finish := func() { 
  14.         closeOnce.Do(func() { 
  15.             done.Close() 
  16.             close(output
  17.         }) 
  18.     } 
  19.     cancel := once(func(err error) { 
  20.         if err != nil { 
  21.             retErr.Set(err) 
  22.         } else { 
  23.             retErr.Set(ErrCancelWithNil) 
  24.         } 
  25.  
  26.         drain(source) 
  27.         finish() 
  28.     }) 
  29.  
  30.     go func() { 
  31.         defer func() { 
  32.             if r := recover(); r != nil { 
  33.                 cancel(fmt.Errorf("%v", r)) 
  34.             } else { 
  35.                 finish() 
  36.             } 
  37.         }() 
  38.         reducer(collector, writer, cancel) 
  39.         drain(collector) 
  40.     }() 
  41.     // 真真正正从制作器安全通道取数据信息实行Mapper 
  42.     go executeMappers(func(item interface{}, w Writer) { 
  43.         mapper(item, w, cancel) 
  44.     }, source, collector, done.Done(), options.workers) 
  45.  
  46.     value, ok := <-output 
  47.     if err := retErr.Load(); err != nil { 
  48.         return nil, err 
  49.     } else if ok { 
  50.         return value, nil 
  51.     } else { 
  52.         return nil, ErrReduceNoOutput 
  53.     } 

这一段编码挺长的,大家说下关键的点。这儿应用一个G 启用 executeMappers 方式。

 
  1. go executeMappers(func(item interface{}, w Writer) { 
  2.         mapper(item, w, cancel) 
  3.     }, source, collector, done.Done(), options.workers) 
 
  1. func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, 
  2.     done <-chan lang.PlaceholderType, workers int) { 
  3.     var wg sync.WaitGroup 
  4.     defer func() { 
  5.         // 等候全部每日任务所有实行结束 
  6.         wg.Wait() 
  7.         // 关掉安全通道 
  8.         close(collector) 
  9.     }() 
  10.    //依据特定总数建立 worker池 
  11.     pool := make(chan lang.PlaceholderType, workers)  
  12.     writer := newGuardedWriter(collector, done) 
  13.     for { 
  14.         select { 
  15.         case <-done: 
  16.             return 
  17.         case pool <- lang.Placeholder: 
  18.             // 从buildSource() 回到的无缓存安全通道取数据信息 
  19.             item, ok := <-input  
  20.             // 当安全通道关掉,完毕 
  21.             if !ok { 
  22.                 <-pool 
  23.                 return 
  24.             } 
  25.  
  26.             wg.Add(1) 
  27.             // better to safely run caller defined method 
  28.             threading.GoSafe(func() { 
  29.                 defer func() { 
  30.                     wg.Done() 
  31.                     <-pool 
  32.                 }() 
  33.                 //真真正正运作闭包涵数的地区 
  34.                // func(item interface{}, w Writer) { 
  35.                // mapper(item, w, cancel) 
  36.                // } 
  37.                 mapper(item, writer) 
  38.             }) 
  39.         } 
  40.     } 

实际的逻辑性已备注名称,编码非常容易懂。

一旦 executeMappers 涵数回到,关掉 collector 安全通道,那麼实行 reducer 不会再堵塞。

 
  1. go func() { 
  2.         defer func() { 
  3.             if r := recover(); r != nil { 
  4.                 cancel(fmt.Errorf("%v", r)) 
  5.             } else { 
  6.                 finish() 
  7.             } 
  8.         }() 
  9.         reducer(collector, writer, cancel) 
  10.         //这儿 
  11.         drain(collector) 
  12.     }() 

这儿的 reducer(collector, writer, cancel) 实际上就是以 MapReduceVoid 传送的第三个闭包涵数。

 
  1. func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { 
  2.     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { 
  3.         reducer(input, cancel) 
  4.         //这儿 
  5.         drain(input) 
  6.         // We need to write a placeholder to let MapReduce to continue on reducer done, 
  7.         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. 
  8.         writer.Write(lang.Placeholder) 
  9.     }, opts...) 
  10.     return err 

随后这一闭包涵数又实行了 reducer(input, cancel),这儿的 reducer 便是大家一开始表述过的 VoidReducerFunc,从 Finish() 而成。

这些,见到上边三个地区的 drain(input)了没有?

 
  1. // drain drains the channel. 
  2. func drain(channel <-chan interface{}) { 
  3.     // drain the channel 
  4.     for range channel { 
  5.     } 

实际上便是一个排尽 channel 的实际操作,可是三个地区都对同一个 channel做一样的实际操作,也是要我难以相信。

也有更关键的一点。

 
  1. go func() { 
  2.         defer func() { 
  3.             if r := recover(); r != nil { 
  4.                 cancel(fmt.Errorf("%v", r)) 
  5.             } else { 
  6.                 finish() 
  7.             } 
  8.         }() 
  9.         reducer(collector, writer, cancel) 
  10.         drain(collector) 
  11.     }() 

上边的编码,倘若实行 reducer,writer 载入引起 panic,那麼drain(collector) 将沒有机遇实行。

但是创作者早已修补了这个问题,立即把 drain(collector) 放进到 defer。

实际 issues[1]。

到这儿,有关 Finish 的源代码也就结束了。有兴趣的能够看一下别的源代码。

很喜欢 go-zero 里的一些专用工具,可是专用工具通常并不单独,取决于别的文件包,造成本来只为应用在其中一个专用工具却必须安裝全部包。

因此最后的結果便是扒源代码,建立无依靠库工具箱,遵照 MIT 就可以。

附则[1]https://github.com/tal-tech/go-zero/issues/676

the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。