Go 中的 Channel 与 Java BlockingQueue 的本质区别

知新闻 2023-07-05 17:29:38
56阅读

前言

最近在实现两个需求,由于两者之间并没有依赖关系,所以想利用队列进行解耦;但在 Go 的标准库中并没有现成可用并且并发安全的数据结构;但 Go 提供了一个更加优雅的解决方案,那就是 channel。

channel 应用

Go 与 Java 的一个很大的区别就是并发模型不同,Go 采用的是 CSP(Communicating sequential processes) 模型;用 Go 官方的说法:

Do not communicate by sharing memory; instead, share memory by communicating.

翻译过来就是:不用使用共享内存来通信,而是用通信来共享内存。

而这里所提到的通信,在 Go 里就是指代的 channel。

只讲概念并不能快速的理解与应用,所以接下来会结合几个实际案例更方便理解。

futrue task

Go 官方没有提供类似于 Java 的 FutureTask 支持:

 
  1.     public static void main(String[] args) throws InterruptedException, ExecutionException { 
  2.         ExecutorService executorService = Executors.newFixedThreadPool(2); 
  3.         Task task = new Task(); 
  4.         FutureTask<String> futureTask = new FutureTask<>(task); 
  5.         executorService.submit(futureTask); 
  6.         String s = futureTask.get(); 
  7.         System.out.println(s); 
  8.         executorService.shutdown(); 
  9.     } 
  10.  
  11. class Task implements Callable<String> { 
  12.     @Override 
  13.     public String call() throws Exception { 
  14.         // 模拟http 
  15.         System.out.println("http request"); 
  16.         Thread.sleep(1000); 
  17.  
  18.         return "request success"
  19.     } 

但我们可以使用 channel 配合 goroutine 实现类似的功能:

 
  1. func main() { 
  2.  ch := Request("https://github.com"
  3.  select { 
  4.  case r := <-ch: 
  5.   fmt.Println(r) 
  6.  } 
  7. func Request(url string) <-chan string { 
  8.  ch := make(chan string) 
  9.  go func() { 
  10.   // 模拟http请求 
  11.   time.Sleep(time.Second
  12.   ch <- fmt.Sprintf("url=%s, res=%s", url, "ok"
  13.  }() 
  14.  return ch 

goroutine 发起请求后直接将这个 channel 返回,调用方会在请求响应之前一直阻塞,直到 goroutine 拿到了响应结果。

goroutine 互相通信

 
  1. /** 
  2.   * 偶数线程 
  3.   */ 
  4.  public static class OuNum implements Runnable { 
  5.      private TwoThreadWaitNotifySimple number; 
  6.  
  7.      public OuNum(TwoThreadWaitNotifySimple number) { 
  8.          this.number = number; 
  9.      } 
  10.  
  11.      @Override 
  12.      public void run() { 
  13.          for (int i = 0; i < 11; i++) { 
  14.              synchronized (TwoThreadWaitNotifySimple.class) { 
  15.                  if (number.flag) { 
  16.                      if (i % 2 == 0) { 
  17.                          System.out.println(Thread.currentThread().getName() + "+-+偶数" + i); 
  18.  
  19.                          number.flag = false
  20.                          TwoThreadWaitNotifySimple.class.notify(); 
  21.                      } 
  22.  
  23.                  } else { 
  24.                      try { 
  25.                          TwoThreadWaitNotifySimple.class.wait(); 
  26.                      } catch (InterruptedException e) { 
  27.                          e.printStackTrace(); 
  28.                      } 
  29.                  } 
  30.              } 
  31.          } 
  32.      } 
  33.  } 
  34.  
  35.  
  36.  /** 
  37.   * 奇数线程 
  38.   */ 
  39.  public static class JiNum implements Runnable { 
  40.      private TwoThreadWaitNotifySimple number; 
  41.  
  42.      public JiNum(TwoThreadWaitNotifySimple number) { 
  43.          this.number = number; 
  44.      } 
  45.  
  46.      @Override 
  47.      public void run() { 
  48.          for (int i = 0; i < 11; i++) { 
  49.              synchronized (TwoThreadWaitNotifySimple.class) { 
  50.                  if (!number.flag) { 
  51.                      if (i % 2 == 1) { 
  52.                          System.out.println(Thread.currentThread().getName() + "+-+奇数" + i); 
  53.  
  54.                          number.flag = true
  55.                          TwoThreadWaitNotifySimple.class.notify(); 
  56.                      } 
  57.  
  58.                  } else { 
  59.                      try { 
  60.                          TwoThreadWaitNotifySimple.class.wait(); 
  61.                      } catch (InterruptedException e) { 
  62.                          e.printStackTrace(); 
  63.                      } 
  64.                  } 
  65.              } 
  66.          } 
  67.      } 
  68.  } 

这里截取了”两个线程交替打印奇偶数“的部分代码。

Java 提供了 object.wait()/object.notify() 这样的等待通知机制,可以实现两个线程间通信。

go 通过 channel 也能实现相同效果:

 
  1. func main() { 
  2.  ch := make(chan struct{}) 
  3.  go func() { 
  4.   for i := 1; i < 11; i++ { 
  5.    ch <- struct{}{} 
  6.    //奇数 
  7.    if i%2 == 1 { 
  8.     fmt.Println("奇数:", i) 
  9.    } 
  10.   } 
  11.  }() 
  12.  
  13.  go func() { 
  14.   for i := 1; i < 11; i++ { 
  15.    <-ch 
  16.    if i%2 == 0 { 
  17.     fmt.Println("偶数:", i) 
  18.    } 
  19.   } 
  20.  }() 
  21.  
  22.  time.Sleep(10 * time.Second

本质上他们都是利用了线程(goroutine)阻塞然后唤醒的特性,只是 Java 是通过 wait/notify 机制;

而 go 提供的 channel 也有类似的特性:

向 channel 发送数据时(ch<-struct{}{})会被阻塞,直到 channel 被消费(<-ch)。

以上针对于无缓冲 channel。

channel 本身是由 go 原生保证并发安全的,不用额外的同步措施,可以放心使用。

广播通知

不仅是两个 goroutine 之间通信,同样也能广播通知,类似于如下 Java 代码:

 
  1. public static void main(String[] args) throws InterruptedException { 
  2.     for (int i = 0; i < 10; i++) { 
  3.         new Thread(() -> { 
  4.             try { 
  5.                 synchronized (NotifyAll.class){ 
  6.                     NotifyAll.class.wait(); 
  7.                 } 
  8.                 System.out.println(Thread.currentThread().getName() + "done...."); 
  9.             } catch (InterruptedException e) { 
  10.                 e.printStackTrace(); 
  11.             } 
  12.         }).start(); 
  13.     } 
  14.     Thread.sleep(3000); 
  15.     synchronized (NotifyAll.class){ 
  16.         NotifyAll.class.notifyAll(); 
  17.     } 

主线程将所有等待的子线程全部唤醒,这个本质上也是通过 wait/notify 机制实现的,区别只是通知了所有等待的线程。

换做是 go 的实现:

 
  1. func main() { 
  2.  notify := make(chan struct{}) 
  3.  for i := 0; i < 10; i++ { 
  4.   go func(i int) { 
  5.    for { 
  6.     select { 
  7.     case <-notify: 
  8.      fmt.Println("done.......",i) 
  9.      return 
  10.     case <-time.After(1 * time.Second): 
  11.      fmt.Println("wait notify",i) 
  12.  
  13.     } 
  14.    } 
  15.   }(i) 
  16.  } 
  17.  time.Sleep(1 * time.Second
  18.  close(notify) 
  19.  time.Sleep(3 * time.Second

当关闭一个 channel 后,会使得所有获取 channel 的 goroutine 直接返回,不会阻塞,正是利用这一特性实现了广播通知所有 goroutine 的目的。

注意,同一个 channel 不能反复关闭,不然会出现panic。

channel 解耦

以上例子都是基于无缓冲的 channel,通常用于 goroutine 之间的同步;同时 channel 也具备缓冲的特性:

 
  1. ch :=make(chan T, 100) 

可以直接将其理解为队列,正是因为具有缓冲能力,所以我们可以将业务之间进行解耦,生产方只管往 channel 中丢数据,消费者只管将数据取出后做自己的业务。

同时也具有阻塞队列的特性:

  • 当 channel 写满时生产者将会被阻塞。
  • 当 channel 为空时消费者也会阻塞。

从上文的例子中可以看出,实现相同的功能 go 的写法会更加简单直接,相对的 Java 就会复杂许多(当然这也和这里使用的偏底层 api 有关)。

Java 中的 BlockingQueue

这些特性都与 Java 中的 BlockingQueue 非常类似,他们具有以下的相同点:

  • 可以通过两者来进行 goroutine/thread 通信。
  • 具备队列的特征,可以解耦业务。
  • 支持并发安全。

同样的他们又有很大的区别,从表现上看:

  • channel 支持 select 语法,对 channel 的管理更加简洁直观。
  • channel 支持关闭,不能向已关闭的 channel 发送消息。
  • channel 支持定义方向,在编译器的帮助下可以在语义上对行为的描述更加准确。

当然还有本质上的区别就是 channel 是 go 推荐的 CSP 模型的核心,具有编译器的支持,可以有很轻量的成本实现并发通信。

而 BlockingQueue 对于 Java 来说只是一个实现了并发安全的数据结构,即便不使用它也有其他的通信方式;只是他们都具有阻塞队列的特征,所有在初步接触 channel 时容易产生混淆。

相同点 channel 特有
阻塞策略 支持select
设置大小 支持关闭
并发安全 自定义方向
普通数据结构 编译器支持

总结

有过一门编程语言的使用经历在学习其他语言是确实是要方便许多,比如之前写过 Java 再看 Go 时就会发现许多类似之处,只是实现不同。

拿这里的并发通信来说,本质上是因为并发模型上的不同;

Go 更推荐使用通信来共享内存,而 Java 大部分场景都是使用共享内存来通信(这样就得加锁来同步)。

带着疑问来学习确实会事半功倍。

the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。