Channel增强
# 1. 无线缓冲通道
# 1. 任意类型版本
package main
import (
"fmt"
"sync"
"time"
)
// InfiniteChannel 封装了无限缓冲的通道
type InfiniteChannel struct {
inputChan chan interface{}
outputChan chan interface{}
closeChan chan struct{}
wg sync.WaitGroup
}
// NewInfiniteChannel 创建并初始化一个InfiniteChannel
func NewInfiniteChannel() *InfiniteChannel {
ic := &InfiniteChannel{
inputChan: make(chan interface{}),
outputChan: make(chan interface{}),
closeChan: make(chan struct{}),
}
ic.wg.Add(1)
go ic.manageData()
return ic
}
// manageData 处理所有数据的内部缓冲和发送
func (ic *InfiniteChannel) manageData() {
defer ic.wg.Done()
buffer := make([]interface{}, 0)
for {
if len(buffer) > 0 {
select {
case ic.outputChan <- buffer[0]:
// 发送成功后,移除已发送的数据
buffer = buffer[1:]
case <-ic.closeChan:
return
}
}
select {
case newElement := <-ic.inputChan:
buffer = append(buffer, newElement)
case <-ic.closeChan:
return
}
}
}
// Send 向通道发送数据
func (ic *InfiniteChannel) Send(data interface{}) error {
select {
case ic.inputChan <- data:
return nil
case <-ic.closeChan:
return fmt.Errorf("channel is closed")
}
}
// Receive 从通道接收数据
func (ic *InfiniteChannel) Receive() (interface{}, error) {
select {
case data := <-ic.outputChan:
return data, nil
case <-ic.closeChan:
return nil, fmt.Errorf("channel is closed")
}
}
// Close 关闭通道,并等待内部处理完成
func (ic *InfiniteChannel) Close() {
close(ic.closeChan)
ic.wg.Wait()
}
// 示例代码
func main() {
infChan := NewInfiniteChannel()
// 生产者
go func() {
for i := 0; i < 10; i++ {
if err := infChan.Send(i); err != nil {
fmt.Println("Send Error:", err)
break
}
fmt.Println("Sent:", i)
time.Sleep(1 * time.Second)
}
infChan.Close()
}()
// 消费者
go func() {
for {
value, err := infChan.Receive()
if err != nil {
fmt.Println("Receive Error:", err)
break
}
fmt.Println("Received:", value)
time.Sleep(2 * time.Second)
}
}()
time.Sleep(20 * time.Second)
}
# 2. 泛型版本
package main
import (
"fmt"
"sync"
"time"
)
// InfiniteChannel 封装了无限缓冲的通道
type InfiniteChannel[T interface{}] struct {
inputChan chan T
outputChan chan T
closeChan chan struct{}
wg sync.WaitGroup
}
// NewInfiniteChannel 创建并初始化一个InfiniteChannel
func NewInfiniteChannel[T interface{}]() *InfiniteChannel[T] {
ic := &InfiniteChannel[T]{
inputChan: make(chan T),
outputChan: make(chan T),
closeChan: make(chan struct{}),
}
ic.wg.Add(1)
go ic.manageData()
return ic
}
// manageData 处理所有数据的内部缓冲和发送
func (ic *InfiniteChannel[T]) manageData() {
defer ic.wg.Done()
buffer := make([]T, 0)
for {
if len(buffer) > 0 {
select {
case ic.outputChan <- buffer[0]:
// 发送成功后,移除已发送的数据
buffer = buffer[1:]
case <-ic.closeChan:
return
}
}
select {
case newElement := <-ic.inputChan:
buffer = append(buffer, newElement)
case <-ic.closeChan:
return
}
}
}
// Send 向通道发送数据
func (ic *InfiniteChannel[T]) Send(data T) error {
select {
case ic.inputChan <- data:
return nil
case <-ic.closeChan:
return fmt.Errorf("channel is closed")
}
}
// Receive 从通道接收数据
func (ic *InfiniteChannel[T]) Receive() (interface{}, error) {
select {
case data := <-ic.outputChan:
return data, nil
case <-ic.closeChan:
return nil, fmt.Errorf("channel is closed")
}
}
// Close 关闭通道,并等待内部处理完成
func (ic *InfiniteChannel[T]) Close() {
close(ic.closeChan)
ic.wg.Wait()
}
func main() {
infChan := NewInfiniteChannel[int]()
// 生产者
go func() {
for i := 0; i < 10; i++ {
if err := infChan.Send(i); err != nil {
fmt.Println("Send Error:", err)
break
}
fmt.Println("Sent:", i)
time.Sleep(1 * time.Second)
}
infChan.Close()
}()
// 消费者
go func() {
for {
value, err := infChan.Receive()
if err != nil {
fmt.Println("Receive Error:", err)
break
}
fmt.Println("Received:", value)
time.Sleep(2 * time.Second)
}
}()
time.Sleep(20 * time.Second)
}
InfiniteChannel
是线程安全的
- 专用的通道操作:
使用了单独的 Go 通道(
inputChan
和outputChan
)来接收和发送数据。Go 通道本身是并发安全的,意味着它们在设计时就考虑到了多协程同时进行发送和接收操作的情况。这保证了在多个协程同时尝试发送或接收数据时,通道的内部状态不会发生冲突。
- 清晰的协程职责划分:
管理数据的协程 (
manageData
) 负责从 inputChan 读取数据到内部缓冲区,并从内部缓冲区向outputChan
输出数据。由于这一过程是在单个协程中完成的,因此在任何时刻都只有一个协程操作内部缓冲区,从而避免了并发修改的问题。
- 同步机制的使用:
使用
closeChan
和sync.WaitGroup
来控制通道的关闭和协程的终止。通过closeChan
,无论是发送还是接收操作,都可以安全地检查通道是否已经关闭,并在尝试进行进一步操作前优雅地退出。sync.WaitGroup
确保了在关闭通道时,所有在执行的数据处理都可以正确完成,避免在协程还在运行时程序就结束了。
- 防止数据竞争的方法:
Send
和Receive
方法使用了select
语句,这不仅可以处理正常的数据发送和接收,还可以同时监听closeChan
,这样一来,在通道关闭的情况下,可以立即停止操作并返回错误,避免了在关闭通道后还继续操作通道引起的竞争。
# 2. 缓冲缓冲区
type RingBuffer struct {
buffer []TimestampedValue
size int
start int
end int
full bool
}
func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{
buffer: make([]TimestampedValue, size),
size: size,
}
}
func (r *RingBuffer) Push(value TimestampedValue) {
r.buffer[r.end] = value
r.end = (r.end + 1) % r.size
if r.full {
r.start = (r.start + 1) % r.size
}
if r.end == r.start {
r.full = true
}
}
func (r *RingBuffer) FetchRecentData() []TimestampedValue {
recentData := make([]TimestampedValue, 0)
i := r.start
now := time.Now().Add(-time.Second)
for {
if !r.full && i == r.end {
break
}
if r.buffer[i].Timestamp.After(now) {
recentData = append(recentData, r.buffer[i])
}
i = (i + 1) % r.size
if i == r.start {
break
}
}
return recentData
}
- 环形缓冲区中的模运算示例
考虑一个容量为 size 的环形缓冲区,其中的索引需要在 0 到 size-1 的范围内循环。以下是模运算在环形缓冲区代码中的两个主要应用:
更新尾部索引(Pushing an element):
r.end = (r.end + 1) % r.size
这行代码用于在向缓冲区添加元素后更新尾部索引 r.end。当 r.end 加1后,模 r.size 的结果确保了如果 r.end 达到了数组的最大长度(即 r.size),它将被重置为0,从而在数组的末尾自动“回绕”到开头。这样就创建了一个循环的效果。
- 遍历环形缓冲区(Fetching data):
i = (i + 1) % r.size
在遍历缓冲区的元素时,此模运算同样保证索引 i 在到达数组末尾后会从头开始,即索引也是循环的。
# 3. OverwriteBuffer 覆盖 BufferChan
适用于高性能覆盖场景
- 消息队列
- 实时日志框架
- 网络丢弃机制
package main
import (
"fmt"
"sync"
"time"
)
type OverwriteBuffer struct {
data []interface{}
size int
start int
count int
lock sync.Mutex
}
func NewOverwriteBuffer(size int) *OverwriteBuffer {
return &OverwriteBuffer{
data: make([]interface{}, size),
size: size,
start: 0,
count: 0,
}
}
func (b *OverwriteBuffer) Put(item interface{}) {
b.lock.Lock()
defer b.lock.Unlock()
if b.count == b.size { // Buffer is full
// Overwrite the oldest item
b.start = (b.start + 1) % b.size
} else {
b.count++
}
index := (b.start + b.count - 1) % b.size
b.data[index] = item
}
func (b *OverwriteBuffer) Get() (interface{}, bool) {
b.lock.Lock()
defer b.lock.Unlock()
if b.count == 0 {
return nil, false
}
item := b.data[b.start]
b.start = (b.start + 1) % b.size
b.count--
return item, true
}
func main() {
buffer := NewOverwriteBuffer(5)
// Producer
go func() {
for i := 0; i < 10; i++ {
buffer.Put(i)
fmt.Println("Produced:", i)
time.Sleep(1 * time.Second)
}
}()
// Consumer
go func() {
for {
item, ok := buffer.Get()
if ok {
fmt.Println("Consumed:", item)
}
time.Sleep(2 * time.Second)
}
}()
time.Sleep(20 * time.Second)
}