supuwoerc:
晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。
我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。
另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?
Medium 原文地址
我的改造:
package main
import "sync"
import "runtime"
import "fmt"
var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询
type GetListProductResponse struct {
Data []ProductListResponse `json:"data"`
}
type ProductListResponse struct {
Code string `json:"code"`
Name string `json:"name"`
Price string `json:"price"`
Status bool `json:"status"`
}
func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
// calling endpoint 3rd party
// parse to response
// and return the data
return &productList, nil
}
func main() {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
wg := sync.WaitGroup{}
doneChan := make(chan struct{}, 1)
productsChan := make(chan *GetListProductResponse)
errChan := make(chan error)
// LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量
maxConcurrency := 5
semaphore := make(chan struct{}, maxConcurrency)
wg.Add(len(LIST_PRODUCT_TYPE))
for key := range LIST_PRODUCT_TYPE {
req := &GetProductListRequest{
ProductType: LIST_PRODUCT_TYPE[key],
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case semaphore<-struct{}{}:
go func() {
defer wg.Done()
defer func() { <-semaphore }()
products, err := getProductList(ctx, req)
if err != nil {
errChan <- err
return
}
productsChan <- products
}()
}
}
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
var (
catalogues GetListProductResponse
data []ProductListResponse
)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errChan:
return nil, err
case products := <-productsChan:
data = append(data, products.Data...)
catalogues.Data = data
case <-doneChan:
return &catalogues, nil
}
}
}
Read More