问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Go 并发:互斥体与通道的示例

创作时间:
作者:
@小白创作中心

Go 并发:互斥体与通道的示例

引用
1
来源
1.
https://m.php.cn/faq/1176631.html

在Go语言中构建并发应用程序时,同步对于确保安全访问共享数据至关重要。本文通过一个具体的计数器示例,详细讲解了如何使用互斥体、缓冲通道和无缓冲通道来实现线程安全的计数器。

介绍

在 go 中构建并发应用程序时,同步对于确保安全访问共享数据至关重要。在 go 中,互斥体通道是用于同步的主要工具。

动机

这几天我在学习golang,遇到一个有趣的问题,我需要构建一个可以安全并发使用的计数器。
但是,在提到的文章中,作者使用一种方法解决了该问题:互斥体。但我想知道是否可以使用缓冲通道无缓冲通道来解决同样的问题。

看看柜台代码:

package main

type counter struct {
    count int
}

func (c *counter) inc() {
    c.count++
}

func (c *counter) value() int {
    return c.count
}

为了确保我们的代码可以安全地同时使用,让我们开始编写一些测试。
我们先从最简单的方法开始。

1) 互斥体

互斥体(“互斥”的缩写)是一种同步原语,它确保一次只有一个goroutine 可以访问代码的关键部分。
它提供了一种锁定机制,当一个 goroutine 锁定一个互斥体时,其他试图锁定它的 goroutine 将会阻塞,直到该互斥体被解锁。因此,当您需要保护共享变量或资源免受竞争条件
影响时,通常会使用它

package main

import (
    "sync"
    "testing"
)

func testcounter(t *testing.T) {
    t.Run("using mutexes and wait groups", func(t *testing.T) {
        counter := counter{}
        wantedcount := 1000
        var wg sync.WaitGroup
        var mut sync.Mutex
        wg.Add(wantedcount)
        for i := 0; i < wantedcount; i++ {
            go func() {
                mut.Lock()
                counter.inc()
                mut.Unlock()
                wg.Done()
            }()
        }
        wg.Wait()
        if counter.value() != wantedcount {
            t.Errorf("got %d, want %d", counter.value(), wantedcount)
        }
    })
}
  • sync.WaitGroup等待组用于跟踪所有goroutine的完成情况。
  • sync.Mutex 用于防止多个 goroutine 同时访问共享计数器(以避免竞争条件)。
  • 循环启动 1000 个 goroutine。每个 goroutine 都会执行以下操作:
    1. mut.Lock():在访问计数器并调用其 inc() 方法之前首先锁定互斥锁。这确保一次只有一个 goroutine 可以增加计数器,从而防止出现竞争情况。
    2. counter.inc():由于互斥锁,一次只有一个 goroutine 可以调用此方法。
    3. mut.Unlock():在计数器递增后解锁互斥体。这允许其他 goroutine 获取锁并执行自己的增量操作。
    4. wg.Done():调用 wg.Done() 来表示它已完成其工作(递增计数器)。这会将 waitgroup 计数器减一。
  • wg.Wait():这使得主 goroutine 等待,直到所有 1000 个 worker goroutine 都完成。 wait() 方法会阻塞,直到 waitgroup 计数器达到零(当所有 wg.Done() 调用都已完成时)。

2) 缓冲通道

通道是 go 允许 goroutine 彼此安全通信的方式。它们支持 goroutine 之间的数据传输,并通过控制对正在传递的数据的访问来提供同步。
话虽如此,在我们的示例中,我们将在通道中利用这一事实来阻止 goroutine,并只让一个 goroutine 访问共享数据。
在这种情况下,缓冲通道具有固定容量,这意味着它们可以在阻止发送者之前容纳预定义数量的元素。发送方仅在缓冲区已满时阻塞。

package main

import (
    "sync"
    "testing"
)

func testcounter(t *testing.T) {
    t.Run("using buffered channels and wait groups", func(t *testing.T) {
        counter := counter{}
        wantedcount := 1000
        var wg sync.WaitGroup
        wg.Add(wantedcount)
        ch := make(chan struct{}, 1)
        ch <- struct{}{}
        for i := 0; i < wantedcount; i++ {
            go func() {
                <-ch
                counter.inc()
                ch <- struct{}{}
                wg.Done()
            }()
        }
        wg.Wait()
        if counter.value() != wantedcount {
            t.Errorf("got %d, want %d", counter.value(), wantedcount)
        }
    })
}
  • ch := make(chan struct{}, 1):创建一个容量为1的缓冲通道ch。缓冲区大小为 1 一次只允许一个 goroutine 写入通道。
  • chan struct{}:使用空结构而不是其他类型(如 int、bool 等),因为它不占用内存。它的大小为0 字节。这使得它非常适合信号发送等场景,您不需要传递任何实际数据,只需传递信号。另一方面,其他类型(如 int、bool 等)会消耗更多内存,当您只需要信号时,这是不必要的。
  • ch main 函数 发送到缓冲通道,以允许第一个 goroutine 启动。由于通道的容量为 1,因此该操作不会阻塞,并且使第一个工作 goroutine 能够继续进行。
  • 循环启动 1000 个 goroutine。每个 goroutine 都会执行以下操作:
    1. 或第一个循环中的第一个信号(前一点) 来增加计数器。
    2. counter.inc():一旦接收到信号,计数器就加1。
    3. ch

3) 无缓冲通道

这些通道没有缓冲区。它们会阻塞发送者,直到接收者准备好接收数据。这提供了严格的同步,数据在 goroutine 之间一次传递一个。

package main

import (
    "sync"
    "testing"
)

func testcounter(t *testing.T) {
    t.Run("using unbuffered channels and wait groups", func(t *testing.T) {
        counter := counter{}
        wantedcount := 1000
        var wg sync.WaitGroup
        wg.Add(wantedcount)
        ch := make(chan struct{})
        go func() {
            ch <- struct{}{}
        }()
        for i := 0; i < wantedcount; i++ {
            go func() {
                <-ch
                counter.inc()
                go func() {
                    ch <- struct{}{}
                }()
                wg.Done()
            }()
        }
        wg.Wait()
        if counter.value() != wantedcount {
            t.Errorf("got %d, want %d", counter.value(), wantedcount)
        }
    })
}
  • ch := make(chan struct{}):使用 struct{} 类型创建一个无缓冲通道。使用 struct{} 类型是因为它不保存任何数据,并且通道纯粹用于信号发送。
  • go func() { ch
  • 循环启动 1000 个 goroutine。每个 goroutine 都会执行以下操作:
    1. counter.inc():收到信号后计数器加 1。这是计数器更新的关键部分,它受到信号机制的保护,因此任何时候只有一个 goroutine 可以递增计数器。
    2. go func() { ch 在一个单独的goroutine中完成,以确保通道操作(ch 外部goroutine。这确保了通道操作不会导致死锁。

4) 没有等待组的缓冲通道

使用上述解决方案解决这个问题后,我问自己,“我可以在没有等待组的情况下解决它吗?”。其实我想出了两个解决方案。
事实上,
等待组使主函数等待,直到所有子协程完成。所以我认为我们可以使用无限循环来中断条件,或者我们可以使用另一个通道来跟踪 goroutine 的完成情况。
让我们使用
无限循环进入代码。

package main

import (
    "sync"
    "testing"
)

func testcounter(t *testing.T) {
    t.Run("using buffered channels without wait groups (infinite loop)", func(t *testing.T) {
        counter := counter{}
        wantedcount := 1000
        ch := make(chan struct{}, 1)
        ch <- struct{}{}
        for i := 0; i < wantedcount; i++ {
            go func() {
                <-ch
                counter.inc()
                ch <- struct{}{}
            }()
        }
        for {
            if counter.value() == wantedcount {
                break
            }
        }
        if counter.value() != wantedcount {
            t.Errorf("got %d, want %d", counter.value(), wantedcount)
        }
    })
}

如您所见,这是一个幼稚的解决方案,而不是
等待组,我使用的是无限循环**,它循环直到匹配此条件 counter.value() == wantcount这意味着所有 goroutine 都已完成。简单。**
另一个解决方案是使用
另一个渠道

package main

import (
    "sync"
    "testing"
)

func TestCounter(t *testing.T) {
    t.Run("using buffered channels without wait groups (another channel)", func(t *testing.T) {
        counter := Counter{}
        wantedCount := 1000
        ch := make(chan struct{}, 1)
        wc := make(chan struct{}, 1)
        ch <- struct{}{}
        for i := 0; i < wantedCount; i++ {
            go func() {
                <-ch
                counter.Inc()
                ch <- struct{}{}
                if counter.Value() == wantedCount {
                    close(wc)
                }
            }()
        }
        <-wc
        if counter.Value() != wantedCount {
            t.Errorf("got %d, want %d", counter.Value(), wantedCount)
        }
    })
}

如你所见,我正在使用另一个等待通道 wc,它将在最后一个 goroutine close(wc) 结束时关闭“signal”。

close(wc) 通过关闭 wc 通道,它向接收者 此时,wc 通道释放了块,这使我们保证所有 goroutine 都已完成。

结论

在本文中,我们探索了不同的方法来解决在 go 中构建可安全并发使用的计数器的问题。虽然我们引用的文章使用
mutexes实现了解决方案,但我们还讨论了使用bufferedunbuffered channels的替代方法。
了解这些工具以及何时使用它们是编写高效、安全的并发 go 程序的关键。
因此,无论您选择互斥体、缓冲通道还是无缓冲通道,掌握 go 中的同步都是至关重要的,它将帮助您构建可以轻松处理并发的健壮应用程序。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号