醋醋百科网

Good Luck To You!

Go语言进阶:时间轮(golang时间轮)

时间轮概念

时间轮(Timing Wheel)是一种高效的定时任务调度数据结构,特别适合处理大量定时任务。它通过一个循环数组(轮盘)和多个槽位(buckets)来组织定时任务,每个槽位代表一个时间间隔。

核心思想

  • 循环数组:时间轮是一个环形结构,指针按固定时间间隔移动
  • 槽位(Buckets):每个槽位存储在该时间点需要执行的任务
  • 时间刻度:每个槽位代表一个时间单位(如1秒)
  • 高效操作:添加/删除定时任务的时间复杂度为 O(1)

优势

  1. 高效管理大量定时任务
  2. 避免频繁的系统调用
  3. 减少内存占用
  4. 适用于心跳检测、超时控制等场景

Go 实现的单级时间轮

// Task 定义定时任务结构
type Task struct {
	delay   time.Duration // 任务延迟执行的时间
	key     string        // 任务唯一标识
	job     func()        // 任务执行函数
	rounds  int           // 任务需要经历的轮次(用于处理长延迟任务)
	slotIdx int           // 任务所在槽位索引
	element *list.Element // 在链表中的位置(用于快速删除)
}

// TimeWheel 时间轮结构
type TimeWheel struct {
	tick         time.Duration    // 时间轮每次转动的时间间隔
	slotsNum     int              // 时间轮槽位数量
	slots        []*list.List     // 槽位数组,每个槽位是一个任务链表
	currentPos   int              // 当前指针位置
	ticker       *time.Ticker     // 定时触发器
	stopChan     chan struct{}    // 停止通道
	taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
	mutex        sync.Mutex       // 互斥锁,保证并发安全
	wg           sync.WaitGroup   // 等待组,用于优雅关闭
}

// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
	if tick < time.Millisecond {
		tick = time.Millisecond // 确保最小时间单位为毫秒
	}

	tw := &TimeWheel{
		tick:         tick,
		slotsNum:     slotsNum,
		slots:        make([]*list.List, slotsNum),
		currentPos:   0,
		stopChan:     make(chan struct{}),
		taskRegistry: make(map[string]*Task),
	}

	// 初始化每个槽位的链表
	for i := 0; i < slotsNum; i++ {
		tw.slots[i] = list.New()
	}

	return tw
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
	tw.ticker = time.NewTicker(tw.tick)
	tw.wg.Add(1)

	go func() {
		defer tw.wg.Done()

		for {
			select {
			case <-tw.ticker.C:
				tw.tickHandler() // 处理定时触发
			case <-tw.stopChan:
				tw.ticker.Stop()
				return
			}
		}
	}()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
	close(tw.stopChan) // 发送停止信号
	tw.wg.Wait()       // 等待所有goroutine退出
}

// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 检查任务是否已存在
	if _, exists := tw.taskRegistry[key]; exists {
		return fmt.Errorf("task with key '%s' already exists", key)
	}

	// 计算延迟对应的轮次和槽位
	delayTicks := int(delay / tw.tick)
	rounds := delayTicks / tw.slotsNum
	slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum

	// 创建新任务
	task := &Task{
		delay:   delay,
		key:     key,
		job:     job,
		rounds:  rounds,
		slotIdx: slotIdx,
	}

	// 将任务添加到对应槽位
	element := tw.slots[slotIdx].PushBack(task)
	task.element = element

	// 注册任务
	tw.taskRegistry[key] = task

	return nil
}

// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	task, exists := tw.taskRegistry[key]
	if !exists {
		return false
	}

	// 从槽位链表中移除
	tw.slots[task.slotIdx].Remove(task.element)

	// 从注册表中移除
	delete(tw.taskRegistry, key)

	return true
}

// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 获取当前槽位的任务链表
	currentList := tw.slots[tw.currentPos]

	// 处理当前槽位的所有任务
	var next *list.Element
	for e := currentList.Front(); e != nil; e = next {
		next = e.Next()
		task := e.Value.(*Task)

		// 如果任务轮次为0,执行任务
		if task.rounds == 0 {
			go task.job() // 异步执行任务
			currentList.Remove(e)
			delete(tw.taskRegistry, task.key)
		} else {
			// 减少轮次
			task.rounds--
		}
	}

	// 移动指针到下一个槽位
	tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}

// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()
	return len(tw.taskRegistry)
}

完整代码

package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// Task 定义定时任务结构
type Task struct {
	delay   time.Duration // 任务延迟执行的时间
	key     string        // 任务唯一标识
	job     func()        // 任务执行函数
	rounds  int           // 任务需要经历的轮次(用于处理长延迟任务)
	slotIdx int           // 任务所在槽位索引
	element *list.Element // 在链表中的位置(用于快速删除)
}

// TimeWheel 时间轮结构
type TimeWheel struct {
	tick         time.Duration    // 时间轮每次转动的时间间隔
	slotsNum     int              // 时间轮槽位数量
	slots        []*list.List     // 槽位数组,每个槽位是一个任务链表
	currentPos   int              // 当前指针位置
	ticker       *time.Ticker     // 定时触发器
	stopChan     chan struct{}    // 停止通道
	taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
	mutex        sync.Mutex       // 互斥锁,保证并发安全
	wg           sync.WaitGroup   // 等待组,用于优雅关闭
}

// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
	if tick < time.Millisecond {
		tick = time.Millisecond // 确保最小时间单位为毫秒
	}

	tw := &TimeWheel{
		tick:         tick,
		slotsNum:     slotsNum,
		slots:        make([]*list.List, slotsNum),
		currentPos:   0,
		stopChan:     make(chan struct{}),
		taskRegistry: make(map[string]*Task),
	}

	// 初始化每个槽位的链表
	for i := 0; i < slotsNum; i++ {
		tw.slots[i] = list.New()
	}

	return tw
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
	tw.ticker = time.NewTicker(tw.tick)
	tw.wg.Add(1)

	go func() {
		defer tw.wg.Done()

		for {
			select {
			case <-tw.ticker.C:
				tw.tickHandler() // 处理定时触发
			case <-tw.stopChan:
				tw.ticker.Stop()
				return
			}
		}
	}()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
	close(tw.stopChan) // 发送停止信号
	tw.wg.Wait()       // 等待所有goroutine退出
}

// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 检查任务是否已存在
	if _, exists := tw.taskRegistry[key]; exists {
		return fmt.Errorf("task with key '%s' already exists", key)
	}

	// 计算延迟对应的轮次和槽位
	delayTicks := int(delay / tw.tick)
	rounds := delayTicks / tw.slotsNum
	slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum

	// 创建新任务
	task := &Task{
		delay:   delay,
		key:     key,
		job:     job,
		rounds:  rounds,
		slotIdx: slotIdx,
	}

	// 将任务添加到对应槽位
	element := tw.slots[slotIdx].PushBack(task)
	task.element = element

	// 注册任务
	tw.taskRegistry[key] = task

	return nil
}

// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	task, exists := tw.taskRegistry[key]
	if !exists {
		return false
	}

	// 从槽位链表中移除
	tw.slots[task.slotIdx].Remove(task.element)

	// 从注册表中移除
	delete(tw.taskRegistry, key)

	return true
}

// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()

	// 获取当前槽位的任务链表
	currentList := tw.slots[tw.currentPos]

	// 处理当前槽位的所有任务
	var next *list.Element
	for e := currentList.Front(); e != nil; e = next {
		next = e.Next()
		task := e.Value.(*Task)

		// 如果任务轮次为0,执行任务
		if task.rounds == 0 {
			go task.job() // 异步执行任务
			currentList.Remove(e)
			delete(tw.taskRegistry, task.key)
		} else {
			// 减少轮次
			task.rounds--
		}
	}

	// 移动指针到下一个槽位
	tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}

// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
	tw.mutex.Lock()
	defer tw.mutex.Unlock()
	return len(tw.taskRegistry)
}

func main() {
	// 创建时间轮:1秒转动一次,共60个槽位(可管理60秒内的任务)
	tw := NewTimeWheel(time.Second, 60)
	tw.Start()
	defer tw.Stop() // 确保程序退出时停止时间轮

	// 添加3秒后执行的任务
	tw.AddTask("task1", 3*time.Second, func() {
		fmt.Println("Task1 executed at", time.Now().Format("15:04:05"))
	})

	// 添加10秒后执行的任务
	tw.AddTask("task2", 10*time.Second, func() {
		fmt.Println("Task2 executed at", time.Now().Format("15:04:05"))
	})

	// 添加61秒后执行的任务(测试跨轮次)
	tw.AddTask("task3", 61*time.Second, func() {
		fmt.Println("Task3 executed at", time.Now().Format("15:04:05"))
	})

	fmt.Println("Time wheel started. Current tasks:", tw.GetTaskCount())

	// 模拟运行一段时间
	time.Sleep(5 * time.Second)
	fmt.Println("After 5 seconds, tasks left:", tw.GetTaskCount())

	// 移除task2
	if tw.RemoveTask("task2") {
		fmt.Println("Task2 removed")
	}

	// 等待所有任务完成
	time.Sleep(65 * time.Second)
}

欢迎关注,持续更新~

#如何学习go语言##Go语言##golang#

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言