Skip to content

mapreduce.Finish中嵌套使用mapreduce.MapReduce会导致Finish变成非阻塞操作 #4

@lujin123

Description

@lujin123

嵌套的MapReducereducer中如果不调用writer.Write方法,会产生一个ErrReduceNoOutput错误,Finish 中 worker 返回异常会直接结束 Finish 调用,但是Finish中调用的MapReduceVoid会吞掉ErrReduceNoOutput错误返回一个 nil,从最后结果看是没有异常的成功调用,实际其他的 worker 都还在异步运行

例如下面这样的调用:

func main(){
        err := mapreduce.Finish(func() error {
		return worker1()
	}, func() error {
		val, err := mapreduce.MapReduce(func(source chan<- interface{}) {
			for i := 0;i<10;i++{
				source <- i
			}
		}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {
			i := item.(int)
			writer.Write(i * i)
		}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {
			var cnt int
			for i := range pipe{
				cnt += i.(int)
			}
                         // 这里不调用Write 会导致当前这个 worker 任务返回一个异常
			// writer.Write(cnt) 
		})
                 // 收到一个异常 `ErrReduceNoOutput`
		if err != nil {
			return err
		}
		fmt.Println("result:", val)
	})
       // 这里的 err 是 nil
       if err != nil {
           fmt.Println(err)
      }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions