1. 简介
首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,讨论了互斥锁的一些缺点,引出了通道作为一种替代方案,并介绍了通道的基本使用和特性。接下来,通过实例演示了如何使用通道来实现并发下的数据汇总。
2. 问题引入
在请求处理过程中,经常需要通过RPC接口拉取数据。有时候,由于数据量较大,单个数据拉取操作可能会导致整个请求的处理时间较长。为了加快处理速度,我们通常考虑同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程需要汇总这些协程拉取到的数据,然后再返回结果。在这个过程中,往往涉及对共享资源的并发访问,为了保证线程安全性,通常会使用互斥锁。下面通过一个简单的代码来展示该过程:
package main
import (
"fmt"
"sync"
"time"
type Data struct {
ID int
Name string
}
var (
// 汇总结果
dataList []Data
// 互斥锁
mutex sync.Mutex
func fetchData(page int, wg *sync.WaitGroup {
// 模拟RPC接口拉取数据的耗时操作
time.Sleep(time.Second
// 假设从RPC接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page,
}
// 使用互斥锁保护共享数据的并发访问
mutex.Lock(
defer mutext.Unlock(
dataList = append(dataList, data
wg.Done(
}
func main( {
var wg sync.WaitGroup
// 定义需要拉取的数据页数
numPages := 10
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {
wg.Add(1
go fetchData(i, &wg
}
// 等待所有协程完成
wg.Wait(
// 打印拉取到的数据
fmt.Println("Fetched data:"
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name
}
}
在上述示例中,我们定义了一个共享的dataList
切片用于保存拉取到的数据。每个goroutine通过调用fetchData
函数来模拟拉取数据的过程,并使用互斥锁mutex
保护dataList
的并发访问。主协程使用sync.WaitGroup
等待所有协程完成数据拉取任务,然后打印出拉取到的数据。通过并发地拉取数据,并使用互斥锁保证线程安全,我们可以显著提高数据拉取的速度,并且确保数据的正确性和一致性。
那我们其实就有疑问,在协程并发下数据汇总的场景,是否存在其他方式,不需要通过使用互斥锁,也能够保证线程安全呢? 其实还真有,Go
语言中的channel
非常适用于这种情况。通过使用通道,我们可以实现线程安全的数据共享和同步,而无需显式地使用互斥锁。下面我们来了解一下channel
。
3. channel的使用
3.1 channel的基本介绍
3.1.1 基本说明
channel在Go语言中是一种特殊的数据结构,用于协程之间的通信和同步。它类似于一个先进先出(FIFO的队列,用于数据的传输和共享。在并发环境中,可以将数据发送到通道,也可以从通道中接收数据,而这两个操作都是线程安全的。
channel的优势在于它提供了内置的同步机制,无需显式地使用互斥锁来处理并发访问。
同时,当多个协程同时访问通道时,Go运行时系统会自动处理协程之间的同步和并发访问的细节,保证数据的正确性和一致性。从而可以放心地在多个协程中使用通道进行数据的发送和接收操作,而不需要额外的锁或同步机制来保证线程安全。
channel其实是可以避免常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性。
3.1.2 基本使用
channel的基本介绍,我们已经对channel
有了基本的了解,其实可以粗略理解其为一个并发安全的队列。下面来了解下channel
的基本语法,从而能够开始使用channel
。
channel,发送数据到channel
,接收channel
中的数据,以及关闭channel
。下面对其进行简单展示:
int、string
等:
ch := make(chan int
发送数据到channel:使用<-
操作符将数据发送到通道中
ch <- data
接收channel中的数据: 使用<-
操作符从通道中接收数据
result := <-ch
关闭channel,使用close
函数关闭通道。关闭通道后,仍然可以从通道接收数据,但无法再向通道发送数据
close(ch
通过上面channel
的四个基本操作,便能够实现在不同协程间线程安全得传递数据。最后通过一个例子,完整得展示channel
的基本使用。
package main
import "fmt"
func main( {
ch := make(chan string // 创建字符串通道
defer close(ch
go func( {
ch <- "hello, channel!" // 发送数据到通道
}(
result := <-ch // 从通道接收数据
fmt.Println(result
}
在这个示例中,我们创建了一个字符串通道ch
。然后,在一个单独的协程中,我们向通道发送了字符串"hello, channel!"。最后,主协程从通道中接收数据,并将其打印出来。
3.2 使用channel实现汇总数据
下面,我们使用channel
来实现并发数据汇总,替换掉之前使用互斥锁来保证线程安全的实现:
package main
import (
"fmt"
"sync"
"time"
type Data struct {
ID int
Name string
}
func fetchData(page int, ch chan Data, wg *sync.WaitGroup {
// 模拟 RPC 接口拉取数据的耗时操作
time.Sleep(time.Second
// 假设从 RPC 接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page,
}
ch <- data // 将数据发送到通道
wg.Done(
}
func main( {
var wg sync.WaitGroup
// 定义需要拉取的数据页数
numPages := 10
dataCh := make(chan Data, 10 // 创建用于接收数据的通道
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {
wg.Add(1
go fetchData(i, dataCh, &wg
}
go func( {
wg.Wait(
close(dataCh // 关闭通道,表示数据已经全部发送完成
}(
// 从通道接收数据并汇总
var dataList []Data
for data := range dataCh {
dataList = append(dataList, data
}
// 打印拉取到的数据
fmt.Println("Fetched data:"
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name
}
}
在修改后的代码中,我们创建了一个用于接收数据的 dataCh
。每个协程通过将数据发送到该channel
来完成数据的汇总。主协程通过从channel
接收数据,并将其添加到 dataList
中实现数据的汇总过程。这种方式不需要显式地加锁和解锁,并且避免了互斥锁带来的复杂性和性能问题。
channel,我们能够以一种更直观、更安全的方式实现协程之间的数据传递和同步。channel
在并发编程中起到了关键的作用,简化了并发操作的管理和实现。同时,它提供了内置的同步机制,保证了数据的正确性和一致性,避免了死锁和竞态条件的问题。
3.3 总结
在原始的实现中,使用了互斥锁来保护共享数据的并发访问。互斥锁提供了互斥访问的机制,确保同一时间只有一个协程可以访问共享数据,从而避免了数据竞争和不一致性。这种方式在保证线程安全的同时,引入了锁的开销和复杂性。
channel来实现协程间的安全数据传递可以更简洁和高效。每个协程可以将拉取到的数据通过channel
发送到主协程,主协程通过接收channel
中的数据来进行汇总。channel
提供了并发安全的数据传递机制,协程之间的数据传输是同步和有序的。由于channel
本身就提供了同步机制,不需要额外的锁和同步操作,能够更简洁地实现协程间的安全数据传递。
channel来实现是相对比较合适的。
4. 开源项目中的使用
etcd进行性能测试,此时需要模拟大量并发请求,对etcd
进行负载测试,并收集每个请求的执行时间、成功/失败状态等结果数据。然后主协程需要收集每一个请求的结果数据,并进行统计计算,生成相应的性能报告。基于此,能够计算出总请求数、请求成功率、平均执行时间、最慢/最快请求等统计信息,以及错误分布情况和慢速请求的详细信息。
channel来实现协程间的数据传输,是非常合适的。
etcd中对应的实现。etcd
中存在一个report
对象的实现,能够接受一系列的请求数据的结果,然后生成性能报告返回回去。结构体定义如下:
type report struct {
results chan Result
stats Stats
}
func (r *report Results( chan<- Result { return r.results }
// Result describes the timings for an operation.
type Result struct {
Start time.Time
End time.Time
Err error
}
func newReport(precision string *report {
r := &report{
results: make(chan Result, 16,
}
return r
}
Result
结构体为单个测试的结果,而report
结构体则用于整个测试过程的报告和统计信息。通过使用results
通道,可以将每个测试的结果发送到report
结构体中,以便进行统计和生成报告。newReport生成一个
report
对象,然后启动多个协程同时进行压测请求,每一个请求处理完成之后,便会生成一个处理结果,存储到Result
对象当中。然后基于report
对象的Results
方法获取到对应的channel
,将处理结果传输给主协程。report对象中的
results
变量对应的channel
,汇总计算所有处理结果,基于此便能够生成压测结果和报告。下面来看其具体流程。report对象,然后启动多个协程来处理请求,将结果发送到
report
对象中的results
对应的channel
中。// 这里NewReportSample方法,其实是对上面newReport方法的一个封装 r := NewReportSample("%f" // 这里假设只有一个协程,模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。 go func( { start := time.Now( for i := 0; i < 5; i++ { // 不真实进行请求,只是简单获取执行结果,将测试结果进行传输 end := start.Add(time.Second r.Results( <- Result{Start: start, End: end} start = end } r.Results( <- Result{Start: start, End: start.Add(time.Second, Err: fmt.Errorf("oops"} // 假设所有压测请求都执行完成了 close(r.Results( }( // 主协程 汇总所有的处理结果,然后生成压测报告 stats := <-r.Stats(
以上代码中,
r
是通过NewReportSample("%f"
创建的一个Report
对象。然后,在一个单独的协程中,执行了一系列的测试,并将测试结果发送到r.Results(
通道中。Report 对象的
results
通道中。通过使用r.Results(
方法返回的通道,可以将测试结果发送到报告对象中进行统计和处理。r.Results(方法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个方法其实是被封装在
r.Stas(
方法中,具体如下:func (r *report Stats( <-chan Stats { // 创建一个channel donec := make(chan Stats, 1 // 启动一个协程来执行 go func( { defer close(donec r.processResults( s := r.stats.copy( if r.sps != nil { s.TimeSeries = r.sps.getTimeSeries( } // 执行完成的话,将结果返回 donec <- s }( // 返回channel return donec } // Stats方法启动的协程中,实际运行的任务 func (r *report processResults( { st := time.Now( // 遍历r.results方法中channel中的数据,然后执行处理流程 for res := range r.results { r.processResult(&res } // 后续执行一些具体的计算逻辑 }
上述代码是
report
结构体中的两个方法,其中Stats(
方法返回一个只读的Stats
通道。这个方法会在一个单独的协程中执行,并处理results
通道中的测试结果。事实上就是汇总channel
中的数据,然后进行一定的处理,然后返回。5. 总结
基于以上内容的介绍,大概能够明确下,在数据传递和汇总的场景下,使用
channel
来实现可能是更为合适的,能够提高代码的可读性和并发安全性。希望以上内容对你有所帮助。