协程并发下数据汇总:除了互斥锁,还有其他方式吗

科技资讯 投稿 5100 0 评论

协程并发下数据汇总:除了互斥锁,还有其他方式吗

1. 简介

首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,讨论了互斥锁的一些缺点,引出了通道作为一种替代方案,并介绍了通道的基本使用和特性。接下来,通过实例演示了如何使用通道来实现并发下的数据汇总。

2. 问题引入

在请求处理过程中,经常需要通过RPC接口拉取数据。有时候,由于数据量较大,单个数据拉取操作可能会导致整个请求的处理时间较长。为了加快处理速度,我们通常考虑同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程需要汇总这些协程拉取到的数据,然后再返回结果。在这个过程中,往往涉及对共享资源的并发访问,为了保证线程安全性,通常会使用互斥锁。下面通过一个简单的代码来展示该过程:

package main

import (
        "fmt"
        "sync"
        "time"


type Data struct {
        ID   int
        Name string
}

var (
        // 汇总结果
        dataList []Data
        // 互斥锁
        mutex    sync.Mutex


func fetchData(page int, wg *sync.WaitGroup {
        // 模拟RPC接口拉取数据的耗时操作
        time.Sleep(time.Second

        // 假设从RPC接口获取到了一批数据
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page,
        }

        // 使用互斥锁保护共享数据的并发访问
        mutex.Lock(
        defer mutext.Unlock(
        dataList = append(dataList, data

        wg.Done(
}

func main( {
        var wg sync.WaitGroup

        // 定义需要拉取的数据页数
        numPages := 10

        // 启动多个协程并发地拉取数据
        for i := 1; i <= numPages; i++ {
            wg.Add(1
            go fetchData(i, &wg
        }

        // 等待所有协程完成
        wg.Wait(

        // 打印拉取到的数据
        fmt.Println("Fetched data:"
        for _, data := range dataList {
            fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name
        }
}

在上述示例中,我们定义了一个共享的dataList切片用于保存拉取到的数据。每个goroutine通过调用fetchData函数来模拟拉取数据的过程,并使用互斥锁mutex保护dataList的并发访问。主协程使用sync.WaitGroup等待所有协程完成数据拉取任务,然后打印出拉取到的数据。通过并发地拉取数据,并使用互斥锁保证线程安全,我们可以显著提高数据拉取的速度,并且确保数据的正确性和一致性。

那我们其实就有疑问,在协程并发下数据汇总的场景,是否存在其他方式,不需要通过使用互斥锁,也能够保证线程安全呢? 其实还真有,Go语言中的channel非常适用于这种情况。通过使用通道,我们可以实现线程安全的数据共享和同步,而无需显式地使用互斥锁。下面我们来了解一下channel

3. channel的使用

3.1 channel的基本介绍

3.1.1 基本说明

channel在Go语言中是一种特殊的数据结构,用于协程之间的通信和同步。它类似于一个先进先出(FIFO的队列,用于数据的传输和共享。在并发环境中,可以将数据发送到通道,也可以从通道中接收数据,而这两个操作都是线程安全的。

channel的优势在于它提供了内置的同步机制,无需显式地使用互斥锁来处理并发访问。

同时,当多个协程同时访问通道时,Go运行时系统会自动处理协程之间的同步和并发访问的细节,保证数据的正确性和一致性。从而可以放心地在多个协程中使用通道进行数据的发送和接收操作,而不需要额外的锁或同步机制来保证线程安全。

channel其实是可以避免常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性。

3.1.2 基本使用

channel的基本介绍,我们已经对channel有了基本的了解,其实可以粗略理解其为一个并发安全的队列。下面来了解下channel的基本语法,从而能够开始使用channel

channel,发送数据到channel,接收channel中的数据,以及关闭channel。下面对其进行简单展示:

int、string等:

ch := make(chan int

发送数据到channel:使用<-操作符将数据发送到通道中

ch <- data

接收channel中的数据: 使用<-操作符从通道中接收数据

result := <-ch

关闭channel,使用close函数关闭通道。关闭通道后,仍然可以从通道接收数据,但无法再向通道发送数据

close(ch

通过上面channel的四个基本操作,便能够实现在不同协程间线程安全得传递数据。最后通过一个例子,完整得展示channel的基本使用。

package main

import "fmt"

func main( {
        ch := make(chan string // 创建字符串通道
        defer close(ch
        go func( {
                ch <- "hello, channel!" // 发送数据到通道
        }(

        result := <-ch // 从通道接收数据
        fmt.Println(result
}

在这个示例中,我们创建了一个字符串通道ch。然后,在一个单独的协程中,我们向通道发送了字符串"hello, channel!"。最后,主协程从通道中接收数据,并将其打印出来。

3.2 使用channel实现汇总数据

下面,我们使用channel来实现并发数据汇总,替换掉之前使用互斥锁来保证线程安全的实现:

package main

import (
        "fmt"
        "sync"
        "time"


type Data struct {
        ID   int
        Name string
}

func fetchData(page int, ch chan Data, wg *sync.WaitGroup {
        // 模拟 RPC 接口拉取数据的耗时操作
        time.Sleep(time.Second

        // 假设从 RPC 接口获取到了一批数据
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page,
        }

        ch <- data // 将数据发送到通道

        wg.Done(
}

func main( {
        var wg sync.WaitGroup

        // 定义需要拉取的数据页数
        numPages := 10

        dataCh := make(chan Data, 10 // 创建用于接收数据的通道

        // 启动多个协程并发地拉取数据
        for i := 1; i <= numPages; i++ {
                wg.Add(1
                go fetchData(i, dataCh, &wg
        }

        go func( {
                wg.Wait(
                close(dataCh // 关闭通道,表示数据已经全部发送完成
        }(

        // 从通道接收数据并汇总
        var dataList []Data
        for data := range dataCh {
            dataList = append(dataList, data
        }

        // 打印拉取到的数据
        fmt.Println("Fetched data:"
        for _, data := range dataList {
                fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name
        }
}

在修改后的代码中,我们创建了一个用于接收数据的 dataCh。每个协程通过将数据发送到该channel 来完成数据的汇总。主协程通过从channel接收数据,并将其添加到 dataList 中实现数据的汇总过程。这种方式不需要显式地加锁和解锁,并且避免了互斥锁带来的复杂性和性能问题。

channel,我们能够以一种更直观、更安全的方式实现协程之间的数据传递和同步。channel在并发编程中起到了关键的作用,简化了并发操作的管理和实现。同时,它提供了内置的同步机制,保证了数据的正确性和一致性,避免了死锁和竞态条件的问题。

3.3 总结

在原始的实现中,使用了互斥锁来保护共享数据的并发访问。互斥锁提供了互斥访问的机制,确保同一时间只有一个协程可以访问共享数据,从而避免了数据竞争和不一致性。这种方式在保证线程安全的同时,引入了锁的开销和复杂性。

channel来实现协程间的安全数据传递可以更简洁和高效。每个协程可以将拉取到的数据通过channel发送到主协程,主协程通过接收channel中的数据来进行汇总。channel提供了并发安全的数据传递机制,协程之间的数据传输是同步和有序的。由于channel本身就提供了同步机制,不需要额外的锁和同步操作,能够更简洁地实现协程间的安全数据传递。

channel来实现是相对比较合适的。

4. 开源项目中的使用

etcd进行性能测试,此时需要模拟大量并发请求,对etcd进行负载测试,并收集每个请求的执行时间、成功/失败状态等结果数据。然后主协程需要收集每一个请求的结果数据,并进行统计计算,生成相应的性能报告。基于此,能够计算出总请求数、请求成功率、平均执行时间、最慢/最快请求等统计信息,以及错误分布情况和慢速请求的详细信息。

channel来实现协程间的数据传输,是非常合适的。

etcd中对应的实现。etcd中存在一个report对象的实现,能够接受一系列的请求数据的结果,然后生成性能报告返回回去。结构体定义如下:

type report struct {
   results   chan Result
   stats Stats
}
func (r *report Results( chan<- Result { return r.results }

// Result describes the timings for an operation.
type Result struct {
   Start  time.Time
   End    time.Time
   Err    error
}

func newReport(precision string *report {
   r := &report{
      results:   make(chan Result, 16,
   }
   return r
}
Result结构体为单个测试的结果,而 report 结构体则用于整个测试过程的报告和统计信息。通过使用 results 通道,可以将每个测试的结果发送到 report 结构体中,以便进行统计和生成报告。

newReport生成一个report对象,然后启动多个协程同时进行压测请求,每一个请求处理完成之后,便会生成一个处理结果,存储到Result对象当中。然后基于report对象的Results方法获取到对应的channel,将处理结果传输给主协程。

report对象中的results变量对应的channel,汇总计算所有处理结果,基于此便能够生成压测结果和报告。下面来看其具体流程。

report对象,然后启动多个协程来处理请求,将结果发送到report对象中的results对应的channel中。

// 这里NewReportSample方法,其实是对上面newReport方法的一个封装
r := NewReportSample("%f"
// 这里假设只有一个协程,模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。
go func( {
   start := time.Now(
   for i := 0; i < 5; i++ {
      // 不真实进行请求,只是简单获取执行结果,将测试结果进行传输
      end := start.Add(time.Second
      r.Results( <- Result{Start: start, End: end}
      start = end
   }
   r.Results( <- Result{Start: start, End: start.Add(time.Second, Err: fmt.Errorf("oops"}
   // 假设所有压测请求都执行完成了
   close(r.Results(
}(
// 主协程 汇总所有的处理结果,然后生成压测报告
stats := <-r.Stats(

以上代码中,r 是通过 NewReportSample("%f" 创建的一个 Report 对象。然后,在一个单独的协程中,执行了一系列的测试,并将测试结果发送到 r.Results( 通道中。

Report 对象的 results 通道中。通过使用 r.Results( 方法返回的通道,可以将测试结果发送到报告对象中进行统计和处理。

r.Results(方法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个方法其实是被封装在r.Stas(方法中,具体如下:

func (r *report Stats( <-chan Stats {
    // 创建一个channel
   donec := make(chan Stats, 1
   // 启动一个协程来执行
   go func( {
      defer close(donec
      r.processResults(
      s := r.stats.copy(
      if r.sps != nil {
         s.TimeSeries = r.sps.getTimeSeries(
      }
      // 执行完成的话,将结果返回
      donec <- s
   }(
   // 返回channel
   return donec
}

// Stats方法启动的协程中,实际运行的任务
func (r *report processResults( {
   st := time.Now(
   // 遍历r.results方法中channel中的数据,然后执行处理流程
   for res := range r.results {
      r.processResult(&res
   }
   // 后续执行一些具体的计算逻辑
}

上述代码是 report 结构体中的两个方法,其中 Stats( 方法返回一个只读的 Stats 通道。这个方法会在一个单独的协程中执行,并处理 results 通道中的测试结果。事实上就是汇总channel中的数据,然后进行一定的处理,然后返回。

5. 总结

基于以上内容的介绍,大概能够明确下,在数据传递和汇总的场景下,使用channel来实现可能是更为合适的,能够提高代码的可读性和并发安全性。希望以上内容对你有所帮助。

编程笔记 » 协程并发下数据汇总:除了互斥锁,还有其他方式吗

赞同 (20) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽