时间轮概念
时间轮(Timing Wheel)是一种高效的定时任务调度数据结构,特别适合处理大量定时任务。它通过一个循环数组(轮盘)和多个槽位(buckets)来组织定时任务,每个槽位代表一个时间间隔。
核心思想
- 循环数组:时间轮是一个环形结构,指针按固定时间间隔移动
- 槽位(Buckets):每个槽位存储在该时间点需要执行的任务
- 时间刻度:每个槽位代表一个时间单位(如1秒)
- 高效操作:添加/删除定时任务的时间复杂度为 O(1)
优势
- 高效管理大量定时任务
- 避免频繁的系统调用
- 减少内存占用
- 适用于心跳检测、超时控制等场景
Go 实现的单级时间轮
// Task 定义定时任务结构
type Task struct {
delay time.Duration // 任务延迟执行的时间
key string // 任务唯一标识
job func() // 任务执行函数
rounds int // 任务需要经历的轮次(用于处理长延迟任务)
slotIdx int // 任务所在槽位索引
element *list.Element // 在链表中的位置(用于快速删除)
}
// TimeWheel 时间轮结构
type TimeWheel struct {
tick time.Duration // 时间轮每次转动的时间间隔
slotsNum int // 时间轮槽位数量
slots []*list.List // 槽位数组,每个槽位是一个任务链表
currentPos int // 当前指针位置
ticker *time.Ticker // 定时触发器
stopChan chan struct{} // 停止通道
taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
mutex sync.Mutex // 互斥锁,保证并发安全
wg sync.WaitGroup // 等待组,用于优雅关闭
}
// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
if tick < time.Millisecond {
tick = time.Millisecond // 确保最小时间单位为毫秒
}
tw := &TimeWheel{
tick: tick,
slotsNum: slotsNum,
slots: make([]*list.List, slotsNum),
currentPos: 0,
stopChan: make(chan struct{}),
taskRegistry: make(map[string]*Task),
}
// 初始化每个槽位的链表
for i := 0; i < slotsNum; i++ {
tw.slots[i] = list.New()
}
return tw
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.tick)
tw.wg.Add(1)
go func() {
defer tw.wg.Done()
for {
select {
case <-tw.ticker.C:
tw.tickHandler() // 处理定时触发
case <-tw.stopChan:
tw.ticker.Stop()
return
}
}
}()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
close(tw.stopChan) // 发送停止信号
tw.wg.Wait() // 等待所有goroutine退出
}
// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
tw.mutex.Lock()
defer tw.mutex.Unlock()
// 检查任务是否已存在
if _, exists := tw.taskRegistry[key]; exists {
return fmt.Errorf("task with key '%s' already exists", key)
}
// 计算延迟对应的轮次和槽位
delayTicks := int(delay / tw.tick)
rounds := delayTicks / tw.slotsNum
slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum
// 创建新任务
task := &Task{
delay: delay,
key: key,
job: job,
rounds: rounds,
slotIdx: slotIdx,
}
// 将任务添加到对应槽位
element := tw.slots[slotIdx].PushBack(task)
task.element = element
// 注册任务
tw.taskRegistry[key] = task
return nil
}
// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
tw.mutex.Lock()
defer tw.mutex.Unlock()
task, exists := tw.taskRegistry[key]
if !exists {
return false
}
// 从槽位链表中移除
tw.slots[task.slotIdx].Remove(task.element)
// 从注册表中移除
delete(tw.taskRegistry, key)
return true
}
// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
tw.mutex.Lock()
defer tw.mutex.Unlock()
// 获取当前槽位的任务链表
currentList := tw.slots[tw.currentPos]
// 处理当前槽位的所有任务
var next *list.Element
for e := currentList.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
// 如果任务轮次为0,执行任务
if task.rounds == 0 {
go task.job() // 异步执行任务
currentList.Remove(e)
delete(tw.taskRegistry, task.key)
} else {
// 减少轮次
task.rounds--
}
}
// 移动指针到下一个槽位
tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}
// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
tw.mutex.Lock()
defer tw.mutex.Unlock()
return len(tw.taskRegistry)
}
完整代码
package main
import (
"container/list"
"fmt"
"sync"
"time"
)
// Task 定义定时任务结构
type Task struct {
delay time.Duration // 任务延迟执行的时间
key string // 任务唯一标识
job func() // 任务执行函数
rounds int // 任务需要经历的轮次(用于处理长延迟任务)
slotIdx int // 任务所在槽位索引
element *list.Element // 在链表中的位置(用于快速删除)
}
// TimeWheel 时间轮结构
type TimeWheel struct {
tick time.Duration // 时间轮每次转动的时间间隔
slotsNum int // 时间轮槽位数量
slots []*list.List // 槽位数组,每个槽位是一个任务链表
currentPos int // 当前指针位置
ticker *time.Ticker // 定时触发器
stopChan chan struct{} // 停止通道
taskRegistry map[string]*Task // 任务注册表(用于快速查找任务)
mutex sync.Mutex // 互斥锁,保证并发安全
wg sync.WaitGroup // 等待组,用于优雅关闭
}
// NewTimeWheel 创建时间轮实例
func NewTimeWheel(tick time.Duration, slotsNum int) *TimeWheel {
if tick < time.Millisecond {
tick = time.Millisecond // 确保最小时间单位为毫秒
}
tw := &TimeWheel{
tick: tick,
slotsNum: slotsNum,
slots: make([]*list.List, slotsNum),
currentPos: 0,
stopChan: make(chan struct{}),
taskRegistry: make(map[string]*Task),
}
// 初始化每个槽位的链表
for i := 0; i < slotsNum; i++ {
tw.slots[i] = list.New()
}
return tw
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.tick)
tw.wg.Add(1)
go func() {
defer tw.wg.Done()
for {
select {
case <-tw.ticker.C:
tw.tickHandler() // 处理定时触发
case <-tw.stopChan:
tw.ticker.Stop()
return
}
}
}()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
close(tw.stopChan) // 发送停止信号
tw.wg.Wait() // 等待所有goroutine退出
}
// AddTask 添加定时任务
func (tw *TimeWheel) AddTask(key string, delay time.Duration, job func()) error {
tw.mutex.Lock()
defer tw.mutex.Unlock()
// 检查任务是否已存在
if _, exists := tw.taskRegistry[key]; exists {
return fmt.Errorf("task with key '%s' already exists", key)
}
// 计算延迟对应的轮次和槽位
delayTicks := int(delay / tw.tick)
rounds := delayTicks / tw.slotsNum
slotIdx := (tw.currentPos + delayTicks) % tw.slotsNum
// 创建新任务
task := &Task{
delay: delay,
key: key,
job: job,
rounds: rounds,
slotIdx: slotIdx,
}
// 将任务添加到对应槽位
element := tw.slots[slotIdx].PushBack(task)
task.element = element
// 注册任务
tw.taskRegistry[key] = task
return nil
}
// RemoveTask 移除定时任务
func (tw *TimeWheel) RemoveTask(key string) bool {
tw.mutex.Lock()
defer tw.mutex.Unlock()
task, exists := tw.taskRegistry[key]
if !exists {
return false
}
// 从槽位链表中移除
tw.slots[task.slotIdx].Remove(task.element)
// 从注册表中移除
delete(tw.taskRegistry, key)
return true
}
// tickHandler 时间轮转动处理
func (tw *TimeWheel) tickHandler() {
tw.mutex.Lock()
defer tw.mutex.Unlock()
// 获取当前槽位的任务链表
currentList := tw.slots[tw.currentPos]
// 处理当前槽位的所有任务
var next *list.Element
for e := currentList.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
// 如果任务轮次为0,执行任务
if task.rounds == 0 {
go task.job() // 异步执行任务
currentList.Remove(e)
delete(tw.taskRegistry, task.key)
} else {
// 减少轮次
task.rounds--
}
}
// 移动指针到下一个槽位
tw.currentPos = (tw.currentPos + 1) % tw.slotsNum
}
// GetTaskCount 获取当前任务总数
func (tw *TimeWheel) GetTaskCount() int {
tw.mutex.Lock()
defer tw.mutex.Unlock()
return len(tw.taskRegistry)
}
func main() {
// 创建时间轮:1秒转动一次,共60个槽位(可管理60秒内的任务)
tw := NewTimeWheel(time.Second, 60)
tw.Start()
defer tw.Stop() // 确保程序退出时停止时间轮
// 添加3秒后执行的任务
tw.AddTask("task1", 3*time.Second, func() {
fmt.Println("Task1 executed at", time.Now().Format("15:04:05"))
})
// 添加10秒后执行的任务
tw.AddTask("task2", 10*time.Second, func() {
fmt.Println("Task2 executed at", time.Now().Format("15:04:05"))
})
// 添加61秒后执行的任务(测试跨轮次)
tw.AddTask("task3", 61*time.Second, func() {
fmt.Println("Task3 executed at", time.Now().Format("15:04:05"))
})
fmt.Println("Time wheel started. Current tasks:", tw.GetTaskCount())
// 模拟运行一段时间
time.Sleep(5 * time.Second)
fmt.Println("After 5 seconds, tasks left:", tw.GetTaskCount())
// 移除task2
if tw.RemoveTask("task2") {
fmt.Println("Task2 removed")
}
// 等待所有任务完成
time.Sleep(65 * time.Second)
}
欢迎关注,持续更新~