在 Python 下想将一个数组分成 batch 处理是非常容易的,但是在 Go 下好像就没那么容易了。
于是,自己动手,写了一个batch函数,支持并行操作。
// Batch splits the index range [0, count) into consecutive chunks of at most
// batchSize elements and calls eachFn once for every chunk.
//
// eachFn receives the zero-based batch index and the half-open range
// [start, end) covered by that batch; the last batch may be shorter than
// batchSize.
//
// parallel controls concurrency:
//   - 0 runs the batches one after another on the calling goroutine;
//   - a positive value runs at most that many batches concurrently;
//   - a negative value, or one larger than the number of batches, runs every
//     batch concurrently.
//
// Every batch is executed even when an earlier one fails. Batch returns the
// error of the lowest-indexed failing batch, or nil if all batches succeed.
// It returns nil without calling eachFn when count or batchSize is not
// positive.
func Batch(count, batchSize int, parallel int, eachFn func(idx, start, end int) error) error {
	if count <= 0 || batchSize <= 0 {
		return nil
	}

	// Ceiling division: a trailing partial batch still counts as a batch.
	numBatches := count / batchSize
	if count%batchSize != 0 {
		numBatches++
	}

	// One slot per batch, so concurrent batches never write the same element.
	errs := make([]error, numBatches)

	isParallel := parallel != 0
	if parallel < 0 || parallel > numBatches {
		parallel = numBatches
	}

	var wg sync.WaitGroup
	// Buffered channel used as a counting semaphore bounding in-flight batches.
	sem := make(chan struct{}, parallel)

	for idx := 0; idx < numBatches; idx++ {
		sta := idx * batchSize
		// Derive end from the remaining length instead of clamping
		// sta+batchSize afterwards: that sum can overflow int when count is
		// close to the maximum int value, while count-sta never does.
		end := count
		if batchSize < count-sta {
			end = sta + batchSize
		}

		if !isParallel {
			errs[idx] = eachFn(idx, sta, end)
			continue
		}

		wg.Add(1)
		sem <- struct{}{} // blocks while `parallel` batches are already running
		go func(index, sta, end int) {
			defer func() {
				wg.Done()
				<-sem
			}()
			errs[index] = eachFn(index, sta, end)
		}(idx, sta, end)
	}
	wg.Wait()

	for _, e := range errs {
		if e != nil {
			return e
		}
	}
	return nil
}
大概用法是这样的:
someList := ...
parallel := -1
err := util.Batch(len(someList), someBatchSize, parallel,
func(batchId, start, end int) error {
// do anything you want. for example:
data := someList[start:end]
for _, v := range data{
// do something
}
return nil
},
)
可以用 `parallel` 参数限制协程数量以控制并行程度:`0` 是不并行,`-1` 是不限协程数量。
还算比较好用
发表回复