首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Java高并发利器:时间轮算法详解与实战

  • 25-04-24 20:41
  • 3700
  • 12397
juejin.cn

大家好,今天咱们聊聊一个在高并发场景下非常实用但又容易被忽视的技术——时间轮算法。如果你的系统需要处理大量的定时任务,但又被 Timer 或 ScheduledThreadPoolExecutor 的性能问题困扰,那这篇文章正是你需要的!

时间轮是个啥?

时间轮本质上是一个环形数组(或链表),每个槽位存储定时任务。有一个指针按固定频率旋转,指到哪个槽,就触发该槽里的所有任务。

时间轮结构
1
0
2
3
...
N-1
当前指针

时间轮的核心特点是:添加任务的时间复杂度为 O(1),触发任务的时间复杂度为 O(k)(k 为当前槽位的任务数)。平均情况下接近 O(1),最坏情况下为 O(n)(n 为总任务数)。相比之下,传统的 Timer 或 ScheduledThreadPoolExecutor 在任务量大时,性能会急剧下降。

时间轮的工作原理

单层时间轮

最简单的时间轮就是单层的,就像秒针一样,所有任务都放在一个环形数组里。

槽位1中的任务
单层时间轮
任务1
任务2
...
1
0
2
3
...
N-1
当前指针

举个例子:如果我们的时间轮有 60 个槽,每个槽代表 1 秒,当前指针为 20,如果要添加一个 30 秒后执行的任务,就应该放到槽(20+30)%60=50 的位置。

单层时间轮的延迟范围公式:最大延迟 = wheelSize × tickMs。例如 60 槽、每槽 1 秒的时间轮,最大延迟为 60 秒。

多层时间轮

为了解决长延时问题,我们可以用多层时间轮,就像钟表有时针、分针、秒针一样。

多层时间轮的工作方式:当内层时间轮(细粒度)转完一圈后,外层时间轮(粗粒度)前进一格。外层前进时,会把该格的任务重新插入内层时间轮。这种层级结构让我们能处理跨度很大的定时任务。

多层时间轮的总延迟范围计算公式:总延迟 = Σ(层 i 的粒度 × 层 i 的轮大小)。例如:

  • 第 1 层:1 秒/槽 × 60 槽 = 60 秒
  • 第 2 层:60 秒/槽 × 60 槽 = 3600 秒(1 小时)
  • 第 3 层:3600 秒/槽 × 24 槽 = 86400 秒(24 小时)

总计可支持延迟范围为:60 + 3600 + 86400 = 90060 秒(约 25 小时)

层间同步与任务级联

多层时间轮工作方式类似机械钟表的级联触发:

多层时间轮的核心是"层级递减"机制:外层时间轮负责粗粒度管理,当指针到达任务所在槽位时,任务会被"降级"到内层,直到最终到达秒级时间轮触发执行。

核心代码示例:

java
代码解读
复制代码
// 多层时间轮任务级联示例 class MultiLevelTimeWheel { private final TimeWheel secondWheel; // 秒级时间轮 private final TimeWheel minuteWheel; // 分级时间轮 private final TimeWheel hourWheel; // 时级时间轮 public MultiLevelTimeWheel() { // 初始化三层时间轮 secondWheel = new TimeWheel(60, 1000); // 60槽,每槽1秒 minuteWheel = new TimeWheel(60, 60000); // 60槽,每槽1分钟 hourWheel = new TimeWheel(24, 3600000); // 24槽,每槽1小时 // 设置级联关系 secondWheel.setOverflowHandler(task -> { minuteWheel.addTask(task); }); minuteWheel.setOverflowHandler(task -> { hourWheel.addTask(task); }); } public void addTask(TimerTask task, long delayMs) { // 根据延迟时间决定放入哪层时间轮 if (delayMs < 60000) { // 小于1分钟 secondWheel.addTask(task, delayMs); } else if (delayMs < 3600000) { // 小于1小时 minuteWheel.addTask(task, delayMs); } else { // 大于1小时 hourWheel.addTask(task, delayMs); } } // 推进时间(模拟时间流逝) public void advance() { // 秒轮前进一格 boolean secondRound = secondWheel.advance(); // 如果秒轮转完一圈,分轮前进一格 if (secondRound) { boolean minuteRound = minuteWheel.advance(); // 如果分轮转完一圈,时轮前进一格 if (minuteRound) { hourWheel.advance(); } } } } class TimeWheel { private final int wheelSize; private final long tickDuration; private int currentPosition = 0; private OverflowHandler overflowHandler; private final List> slots; public TimeWheel(int wheelSize, long tickDuration) { this.wheelSize = wheelSize; this.tickDuration = tickDuration; this.slots = new ArrayList<>(wheelSize); for (int i = 0; i < wheelSize; i++) { slots.add(new ArrayList<>()); } } // 当前轮任务溢出时的处理器(降级到上一层) interface OverflowHandler { void handle(TimerTask task); } public void setOverflowHandler(OverflowHandler handler) { this.overflowHandler = handler; } public void addTask(TimerTask task, long delayMs) { // 计算任务放在哪个槽位 long ticks = delayMs / tickDuration; int slotIndex = (currentPosition + (int)(ticks % wheelSize)) % wheelSize; // 计算剩余轮数 long rounds = ticks / wheelSize; task.setRemainingRounds(rounds); // 添加到槽位 slots.get(slotIndex).add(task); } // 前进时间轮,返回是否转完一圈 public boolean advance() { // 执行当前槽位到期任务 List currentTasks = slots.get(currentPosition); List remainingTasks = new ArrayList<>(); for (TimerTask task : currentTasks) { if (task.reduceRoundAndCheck()) { // 任务到期,执行 task.run(); } else { // 任务未到期,保留 remainingTasks.add(task); } } // 更新当前槽位任务 currentTasks.clear(); currentTasks.addAll(remainingTasks); // 指针前进 currentPosition = (currentPosition + 1) % wheelSize; // 如果回到起点,说明转完一圈 return currentPosition == 0; } } class TimerTask { private final Runnable task; private long remainingRounds; public TimerTask(Runnable task) { this.task = task; } public void setRemainingRounds(long rounds) { this.remainingRounds = rounds; } public boolean reduceRoundAndCheck() { if (remainingRounds <= 0) { return true; } remainingRounds--; return remainingRounds <= 0; } public void run() { task.run(); } }

层级参数设计公式

在设计多层时间轮时,层级参数的选择很关键。以 Kafka 为例,它使用指数增长的粒度:

scss
代码解读
复制代码
第n层粒度 = 第(n-1)层粒度 × 轮大小

对于轮大小固定为 20 的 Kafka 来说:

  • 第 1 层:1ms/槽 × 20 槽 = 20ms 范围
  • 第 2 层:20ms/槽 × 20 槽 = 400ms 范围
  • 第 3 层:400ms/槽 × 20 槽 = 8s 范围
  • 第 4 层:8s/槽 × 20 槽 = 160s 范围

如果你想设计自己的层级,可以用这个公式:

scss
代码解读
复制代码
总时间范围 = 单层范围 × (增长系数^层数 - 1) / (增长系数 - 1)

例如,要设计一个支持 1 天的时间轮,单层 60 格,增长系数 60:

  • 第 1 层:1 秒/槽 × 60 槽 = 60 秒
  • 第 2 层:60 秒/槽 × 60 槽 = 3600 秒(1 小时)
  • 第 3 层:3600 秒/槽 × 60 槽 = 216000 秒(60 小时)

那么我们只需要 3 层就能覆盖超过 1 天的时间范围。

业界成熟实现对比

Netty 和 Kafka 都实现了时间轮算法,但设计思路和侧重点不同:

特性Netty HashedWheelTimerKafka TimingWheel
层级设计单层(通过 remainingRounds 支持多轮)多层(层级粒度指数增长)
时间精度受 Tick 粒度限制第一层保持高精度,上层粗粒度
任务存储双向链表数组+链表
任务查找/取消O(1)(通过 timeout 对象直接引用)O(n)(需遍历查找)
内存占用相对较低随层数增加
并发控制单 Worker 线程模式读写锁
适用场景网络超时、连接管理延迟消息、日志清理

Netty 的 HashedWheelTimer

Netty 框架实现了一个很优秀的时间轮算法——HashedWheelTimer,它的核心思想是:

  1. 用数组存储时间轮的槽位
  2. 每个槽位维护一个任务链表
  3. 一个工作线程定期检查当前槽位的任务
  4. 对于超过一轮的任务,记录需要经过的轮数,每转一圈减一轮

Netty 源码关键部分剖析

Netty 的时间轮实现有几个巧妙的设计:

java
代码解读
复制代码
// HashedWheelTimer.java核心部分 // 1. 位运算优化:使用2的幂作为轮大小,用位运算代替取模 private static final int DEFAULT_WHEEL_SIZE = 512; // 必须是2的幂 private final int mask; // 用于位运算替代取模 public HashedWheelTimer() { // ... wheel = createWheel(wheelSize); mask = wheel.length - 1; // 掩码 = 轮大小-1 // ... } // 计算槽位时用位运算 int stopIndex = (int) (deadline & mask); // 2. 双向链表实现槽位,便于快速添加和删除 private static final class HashedWheelBucket { private HashedWheelTimeout head; private HashedWheelTimeout tail; public void addTimeout(HashedWheelTimeout timeout) { if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } public void remove(HashedWheelTimeout timeout) { if (timeout.prev != null) { timeout.prev.next = timeout.next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { head = timeout.next; } if (timeout == tail) { tail = timeout.prev; } } } // 3. 单线程Worker模式,避免并发问题 private final class Worker implements Runnable { @Override public void run() { while (workerState.get() == WORKER_STATE_STARTED) { // 等待到下一个tick waitForNextTick(); // 处理当前槽位任务 int idx = (int) (tick & mask); processBucket(wheel[idx], tick); // 指针前进 tick++; } } }

这里的实现关键点:

  1. 使用位运算deadline & mask代替deadline % wheelSize,将时间复杂度从 O(1)进一步优化
  2. 用双向链表存储任务,添加/删除复杂度为 O(1),不需要像 ArrayList 那样在删除时移动元素
  3. 单线程 Worker 避免并发控制开销,整个时间轮只有一个工作线程处理任务触发

Kafka 的 TimingWheel 实现细节

Kafka 的TimingWheel使用了层级结构,每层的粒度依次增大:

java
代码解读
复制代码
// Kafka TimingWheel核心源码分析 // 1. 多层级设计 public class TimingWheel { private final long tickMs; // 每格时间间隔 private final int wheelSize; // 轮大小 private final long interval; // 整个轮的时间跨度 = tickMs * wheelSize private final TimerTaskList[] buckets; // 槽位数组 private final AtomicLong currentTime; // 当前时间 // 外层时间轮,粒度更大 private TimingWheel overflowWheel = null; // 2. 任务添加核心逻辑 public boolean add(TimerTaskEntry timerTaskEntry) { long expiration = timerTaskEntry.expirationMs; // 如果已过期,返回true让上层立即执行 if (expiration < currentTime.get() + tickMs) { return true; // 如果超出当前轮范围,添加到外层 } else if (expiration >= currentTime.get() + interval) { // 延迟初始化外层时间轮 if (overflowWheel == null) { overflowWheel = new TimingWheel( tickMs * wheelSize, wheelSize, currentTime.get() ); } return overflowWheel.add(timerTaskEntry); // 添加到当前轮的对应槽位 } else { int virtualId = (int) (expiration / tickMs); int bucketIndex = virtualId % wheelSize; buckets[bucketIndex].add(timerTaskEntry); return false; } } // 3. 推进时间轮并级联触发任务 public void advanceClock(long timeMs) { if (timeMs >= currentTime.get() + tickMs) { currentTime.set(timeMs - (timeMs % tickMs)); // 如果外层时间轮存在,也推进外层时间轮 if (overflowWheel != null) { overflowWheel.advanceClock(currentTime.get()); } } } }

Kafka 的设计亮点:

  1. 使用绝对时间计算槽位,避免了相对时间带来的精度问题
  2. 懒初始化外层时间轮,节省内存
  3. 级联时间推进,保证多层时间轮的同步

时间轮的应用场景

1. 网络连接超时处理

在处理大量网络连接时,每个连接都需要定时检查是否超时。比如说咱们在开发一个聊天服务器,需要检测客户端是否长时间没响应。

传统方案:

java
代码解读
复制代码
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); // 对每个连接设置超时检测 for (Connection conn : connections) { executor.schedule(() -> { if (conn.isIdle()) { conn.close(); } }, 30, TimeUnit.SECONDS); }

当连接数达到几万时,这种方式会导致任务队列变得超级大,性能直线下降。

时间轮方案:

java
代码解读
复制代码
HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); // 对每个连接设置超时检测 for (Connection conn : connections) { timer.newTimeout(timeout -> { if (conn.isIdle()) { conn.close(); } }, 30, TimeUnit.SECONDS); }

无论有多少连接,时间轮算法的性能都很稳定,因为添加任务的复杂度是 O(1)。

2. 延迟消息队列

想象一下电商平台的订单系统:用户下单后 15 分钟如果没支付,就要自动取消订单。如何实现?

传统方案:用 Redis 的 zset 存储订单 ID 和过期时间,然后定期扫描。

java
代码解读
复制代码
// 添加订单超时任务 jedis.zadd("order_timeout", System.currentTimeMillis() + 15*60*1000, orderId); // 定期扫描过期订单 while (true) { Set expiredOrders = jedis.zrangeByScore("order_timeout", 0, System.currentTimeMillis()); for (String orderId : expiredOrders) { // 处理超时订单 processTimeoutOrder(orderId); jedis.zrem("order_timeout", orderId); } Thread.sleep(1000); }

当订单量很大时,扫描操作会越来越慢。

时间轮方案:直接用时间轮调度延迟任务,到时间就执行。

java
代码解读
复制代码
// 创建时间轮 TimingWheel orderTimeoutWheel = new TimingWheel<>(1, TimeUnit.SECONDS, 900); // 15分钟 // 添加订单超时任务 orderTimeoutWheel.addTask(orderId, 15 * 60, TimeUnit.SECONDS, this::processTimeoutOrder); // 时间轮会自动在到期时触发任务

时间轮方案避免了扫描操作,效率更高。

3. 限流器实现

假设我们的 API 网关需要对每个 API 实现精确的 QPS 限制。

传统方案:使用固定速率的令牌桶。

java
代码解读
复制代码
RateLimiter rateLimiter = RateLimiter.create(100.0); // 100 QPS public boolean allowRequest() { return rateLimiter.tryAcquire(); }

时间轮方案:使用时间轮实现更精确的令牌投放。

java
代码解读
复制代码
class TimeWheelRateLimiter { private final int capacity; private final AtomicInteger tokens; private final HashedWheelTimer timer; private final int refillRate; public TimeWheelRateLimiter(int capacity, int refillRate) { this.capacity = capacity; this.tokens = new AtomicInteger(capacity); this.refillRate = refillRate; this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); // 每秒添加指定数量的令牌 timer.newTimeout(this::refill, 1, TimeUnit.SECONDS); } private void refill(Timeout timeout) { int currentTokens = tokens.get(); if (currentTokens < capacity) { tokens.updateAndGet(current -> Math.min(current + refillRate, capacity)); } timer.newTimeout(this::refill, 1, TimeUnit.SECONDS); } public boolean tryAcquire() { return tokens.getAndUpdate(current -> current > 0 ? current - 1 : 0) > 0; } }

时间轮方案可以更灵活地控制令牌的投放节奏,适合实现复杂的限流策略。

4. 缓存过期策略

实现一个本地缓存,支持不同过期时间的缓存项,该怎么做?

传统方案:定期扫描所有缓存项,检查是否过期。

java
代码解读
复制代码
class SimpleCache { private final Map> cache = new ConcurrentHashMap<>(); public void put(K key, V value, long expireAfter, TimeUnit unit) { long expireTime = System.currentTimeMillis() + unit.toMillis(expireAfter); cache.put(key, new CacheItem<>(value, expireTime)); } public V get(K key) { CacheItem item = cache.get(key); if (item != null && !item.isExpired()) { return item.value; } else { cache.remove(key); return null; } } // 定期清理过期缓存 public void cleanUp() { Iterator>> it = cache.entrySet().iterator(); while (it.hasNext()) { if (it.next().getValue().isExpired()) { it.remove(); } } } static class CacheItem { final V value; final long expireTime; CacheItem(V value, long expireTime) { this.value = value; this.expireTime = expireTime; } boolean isExpired() { return System.currentTimeMillis() > expireTime; } } }

随着缓存项增多,定期扫描会消耗大量 CPU。

时间轮方案:使用时间轮在缓存项过期时自动删除。

java
代码解读
复制代码
class TimeWheelCache { private final Map cache = new ConcurrentHashMap<>(); private final HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); public void put(K key, V value, long expireAfter, TimeUnit unit) { cache.put(key, value); timer.newTimeout(timeout -> cache.remove(key), expireAfter, unit); } public V get(K key) { return cache.get(key); } }

时间轮方案避免了全量扫描,只在缓存项真正过期时执行删除操作,效率更高。

开源框架中的时间轮应用

不同框架根据各自业务场景对时间轮进行了定制:

框架时间轮应用特点场景
NettyHashedWheelTimer单层轮+轮数记录,双向链表网络连接超时、空闲检测
KafkaTimingWheel多层时间轮,指数级增长消息延迟交付、日志分段删除
RocketMQTimerWheel分层时间轮+持久化延迟消息、事务消息回查
QuartzJobStore Timer数据库持久化+时间轮调度分布式任务调度
AkkaTimerWheel多层时间轮,Actor 模型分布式系统心跳检测

举个例子,RocketMQ 的延迟消息实现:

  1. 将延迟消息存储在特定的主题中
  2. 使用时间轮扫描到期消息
  3. 到期后投递到目标主题

ElasticJob 的分布式任务调度:

  1. 使用 ZooKeeper 存储任务元数据
  2. 使用时间轮进行本地任务调度
  3. 通过一致性哈希分片,确保任务均匀分布

实现一个简单时间轮

下面我们来实现一个简单的时间轮,增加了任务取消功能和更完善的参数校验:

java
代码解读
复制代码
import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class SimpleTimeWheel { // 时间轮大小 private final int wheelSize; // 每格时间间隔(毫秒) private final long tickMs; // 时间轮 private final List wheel; // 当前指针位置 private int currentTickIndex = 0; // 启动时间 private long startMs; // 任务执行线程池 private final ExecutorService executor; // 时间轮转动线程 private final Thread wheelThread; // 是否运行标志 private final AtomicBoolean running = new AtomicBoolean(false); // 任务总数 private long taskCount = 0; // 任务存储,用于取消 private final Map taskEntries = new ConcurrentHashMap<>(); public SimpleTimeWheel(int wheelSize, long tickMs, int threadPoolSize) { // 参数校验 if (wheelSize <= 0) { throw new IllegalArgumentException("Wheel size must be positive"); } if (tickMs <= 0) { throw new IllegalArgumentException("Tick duration must be positive"); } if (threadPoolSize <= 0) { throw new IllegalArgumentException("Thread pool size must be positive"); } this.wheelSize = wheelSize; this.tickMs = tickMs; this.wheel = new ArrayList<>(wheelSize); // 初始化时间轮槽位 for (int i = 0; i < wheelSize; i++) { wheel.add(new TaskList()); } // 初始化执行线程池 this.executor = Executors.newFixedThreadPool(threadPoolSize); // 初始化时间轮转动线程 this.wheelThread = new Thread(() -> { startMs = System.currentTimeMillis(); while (running.get()) { try { // 等待到下一个tick long sleepMs = tickMs - (System.currentTimeMillis() - startMs) % tickMs; Thread.sleep(sleepMs); // 推进时间轮 advance(); } catch (InterruptedException e) { break; } } }); } /** * 启动时间轮 */ public void start() { if (running.compareAndSet(false, true)) { wheelThread.start(); } } /** * 停止时间轮 */ public void stop() { if (running.compareAndSet(true, false)) { wheelThread.interrupt(); executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } } } /** * 添加延迟任务 * * @param task 任务 * @param delayMs 延迟毫秒数 * @return 任务ID */ public long addTask(Runnable task, long delayMs) { if (delayMs < 0) { throw new IllegalArgumentException("Delay must be non-negative"); } // 计算需要的tick数 (使用向上取整确保最小延迟) long ticks = (delayMs + tickMs - 1) / tickMs; // 计算任务应该放在哪个槽位 int targetIndex = (currentTickIndex + (int)(ticks % wheelSize)) % wheelSize; // 计算需要经过多少轮 long rounds = ticks / wheelSize; // 创建定时任务 long taskId = taskCount++; TimerTask timerTask = new TimerTask(taskId, task, rounds); // 添加到对应槽位 wheel.get(targetIndex).addTask(timerTask); // 记录任务入口,用于取消 taskEntries.put(taskId, new TaskEntry(timerTask, wheel.get(targetIndex))); return taskId; } /** * 取消任务 * * @param taskId 任务ID * @return 是否成功取消 */ public boolean cancelTask(long taskId) { TaskEntry entry = taskEntries.remove(taskId); if (entry != null) { return entry.taskList.removeTask(entry.task); } return false; } /** * 推进时间轮 */ private void advance() { // 获取当前槽位 TaskList taskList = wheel.get(currentTickIndex); // 执行当前槽位中到期的任务 List expiredTasks = taskList.getAndRemoveExpiredTasks(); for (TimerTask task : expiredTasks) { // 移除任务记录 taskEntries.remove(task.getId()); executor.submit(task.getTask()); } // 指针前进 currentTickIndex = (currentTickIndex + 1) % wheelSize; } /** * 任务列表,每个槽位对应一个任务列表 */ private static class TaskList { private final List tasks = new ArrayList<>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** * 添加任务 */ public void addTask(TimerTask task) { lock.writeLock().lock(); try { tasks.add(task); } finally { lock.writeLock().unlock(); } } /** * 获取并移除到期的任务 */ public List getAndRemoveExpiredTasks() { List expiredTasks = new ArrayList<>(); List remainingTasks = new ArrayList<>(); lock.writeLock().lock(); try { for (TimerTask task : tasks) { if (task.reduceRoundAndCheck()) { expiredTasks.add(task); } else { remainingTasks.add(task); } } // 更新任务列表,只保留未到期的任务 tasks.clear(); tasks.addAll(remainingTasks); } finally { lock.writeLock().unlock(); } return expiredTasks; } /** * 移除指定任务 */ public boolean removeTask(TimerTask task) { lock.writeLock().lock(); try { return tasks.remove(task); } finally { lock.writeLock().unlock(); } } } /** * 定时任务 */ private static class TimerTask { private final long id; private final Runnable task; private long remainingRounds; public TimerTask(long id, Runnable task, long rounds) { this.id = id; this.task = task; this.remainingRounds = rounds; } public long getId() { return id; } public Runnable getTask() { return task; } /** * 减少剩余轮数并检查是否到期 * * @return 如果任务到期返回true,否则返回false */ public boolean reduceRoundAndCheck() { if (remainingRounds <= 0) { return true; } remainingRounds--; return remainingRounds <= 0; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null || getClass() != obj.getClass()) return false; TimerTask that = (TimerTask) obj; return id == that.id; } @Override public int hashCode() { return Long.hashCode(id); } } /** * 任务入口,用于快速定位和取消任务 */ private static class TaskEntry { final TimerTask task; final TaskList taskList; TaskEntry(TimerTask task, TaskList taskList) { this.task = task; this.taskList = taskList; } } }

使用示例,包括任务取消:

java
代码解读
复制代码
public class TimeWheelDemo { public static void main(String[] args) throws InterruptedException { // 创建一个时间轮:60个槽位,每个槽位1秒,10个线程执行任务 SimpleTimeWheel timeWheel = new SimpleTimeWheel(60, 1000, 10); timeWheel.start(); System.out.println("时间轮启动..."); // 添加一些测试任务 long task1Id = timeWheel.addTask(() -> System.out.println("任务1执行,延迟3秒,时间: " + System.currentTimeMillis()), 3000); long task2Id = timeWheel.addTask(() -> System.out.println("任务2执行,延迟10秒,时间: " + System.currentTimeMillis()), 10000); long task3Id = timeWheel.addTask(() -> System.out.println("任务3执行,延迟5秒,时间: " + System.currentTimeMillis()), 5000); long task4Id = timeWheel.addTask(() -> System.out.println("任务4执行,延迟70秒,时间: " + System.currentTimeMillis()), 70000); // 取消任务2 Thread.sleep(2000); System.out.println("取消任务2: " + timeWheel.cancelTask(task2Id)); // 等待剩余任务执行完毕 Thread.sleep(80000); // 停止时间轮 timeWheel.stop(); System.out.println("时间轮停止"); } }

时间轮实现的关键设计点

1. 槽位冲突处理

当多个任务映射到同一槽位但实际过期时间不同时,如何处理?Netty 的 HashedWheelTimer 采用了两种机制:

  1. 剩余轮数(remainingRounds):记录任务还需要等待的轮数
  2. 双向链表:将同一槽位的所有任务组织成链表,便于快速添加和删除
槽位冲突处理
任务A: 剩余1轮
槽位
任务B: 剩余0轮
任务C: 剩余2轮

每次时间轮转动到该槽位时,只执行剩余轮数为 0 的任务。

2. 高效数据结构选择

数组 vs 链表对比

在时间轮实现中,任务存储结构的选择很关键。我们来对比下两种主流方案:

特性数组 (ArrayList)链表 (LinkedList)
添加任务O(1) (摊销)O(1)
删除任务O(n)O(1) (已知节点位置)
内存占用低(连续内存)高(额外指针)
缓存友好性高低
扩容开销需要不需要
适用场景查询多、删除少频繁添加删除

Netty 选择链表是因为:

  1. 频繁的任务取消操作需要 O(1)的删除时间
  2. 避免 ArrayList 扩容带来的性能抖动
  3. 任务量大时,删除耗时的差异更显著

双向链表的简化实现:

java
代码解读
复制代码
class TimerTaskList { private TimerTask head = null; private TimerTask tail = null; public void add(TimerTask task) { if (head == null) { head = tail = task; task.next = task.prev = null; } else { tail.next = task; task.prev = tail; task.next = null; tail = task; } } public boolean remove(TimerTask task) { if (task.prev != null) { task.prev.next = task.next; } else { head = task.next; } if (task.next != null) { task.next.prev = task.prev; } else { tail = task.prev; } task.next = task.prev = null; return true; } }

无锁化设计

高并发场景下,锁竞争会成为性能瓶颈。无锁化设计思路:

  1. 单线程触发模型:Netty 采用单 Worker 线程处理所有槽位任务
  2. CAS 操作:任务提交使用原子操作避免锁
  3. 多生产者单消费者队列:使用 MpscQueue 缓冲任务提交

无锁模型实现示例:

java
代码解读
复制代码
// 多生产者单消费者队列实现 class MpscTaskQueue { private final MpscLinkedQueue queue = new MpscLinkedQueue<>(); private final AtomicBoolean processing = new AtomicBoolean(false); private final ExecutorService executor; public MpscTaskQueue(ExecutorService executor) { this.executor = executor; } // 多线程安全的添加任务 public void add(Runnable task) { queue.offer(task); tryProcess(); } private void tryProcess() { // CAS尝试获取处理权 if (processing.compareAndSet(false, true)) { executor.execute(this::processQueue); } } private void processQueue() { try { Runnable task; while ((task = queue.poll()) != null) { try { task.run(); } catch (Throwable t) { // 处理异常 } } } finally { // 重置处理状态,允许下一轮处理 processing.set(false); // 如果队列不空,再次尝试处理 // (处理期间可能有新任务被添加) if (!queue.isEmpty()) { tryProcess(); } } } }

这种设计在高并发环境中可以避免锁争用,提高吞吐量。

分布式时间轮实现技术

在分布式系统中,单机时间轮无法满足高可用需求。分布式时间轮的关键技术点:

1. 任务路由与分片策略

分布式时间轮需要将任务分配到不同节点,常用的分片算法有:

  1. 哈希取模: 任务 ID % 节点数
  2. 一致性哈希: 任务 ID 映射到哈希环上的节点
  3. 可配置分片: 通过配置表指定任务到节点的映射

一致性哈希的优势在于当节点数变化时,只有少部分任务需要重新分配,避免了全量迁移。简化实现:

java
代码解读
复制代码
// 一致性哈希分片简化示例 class ConsistentHashRouter { private final TreeMap ring = new TreeMap<>(); private final int virtualNodesPerNode = 100; // 每个物理节点的虚拟节点数 public void addNode(String node) { for (int i = 0; i < virtualNodesPerNode; i++) { long hash = hash(node + "#" + i); ring.put(hash, node); } } public void removeNode(String node) { for (int i = 0; i < virtualNodesPerNode; i++) { long hash = hash(node + "#" + i); ring.remove(hash); } } public String getNode(String taskId) { if (ring.isEmpty()) return null; long hash = hash(taskId); Map.Entry entry = ring.ceilingEntry(hash); if (entry == null) { // 环回到第一个节点 entry = ring.firstEntry(); } return entry.getValue(); } private long hash(String key) { // 使用MurmurHash或其他快速哈希算法 return Math.abs(key.hashCode()); } }

2. 持久化方案选择

分布式环境下,任务持久化是保证可靠性的关键。不同持久化方案适用于不同场景:

存储方案优势劣势适用场景
Redis高性能,支持 TTL内存有限,持久化有代价高频小任务,短延迟
RocketMQ可靠性高,支持延迟消息精度有限(18 个级别)中等延迟任务
MySQL可靠性最高,支持复杂查询性能较低长延迟关键任务
TiKV线性扩展,一致性强部署复杂大规模分布式

Redis 方案示例:

java
代码解读
复制代码
// Redis持久化任务简化示例 class RedisTimeWheel { private final JedisPool jedisPool; private final String taskKey = "time:wheel:tasks"; public void addTask(String taskId, long delayMs) { long executeAt = System.currentTimeMillis() + delayMs; try (Jedis jedis = jedisPool.getResource()) { // 使用Sorted Set存储任务,分数为执行时间 jedis.zadd(taskKey, executeAt, taskId); } } public Set getExpiredTasks() { try (Jedis jedis = jedisPool.getResource()) { long now = System.currentTimeMillis(); return jedis.zrangeByScore(taskKey, 0, now); } } public void removeTask(String taskId) { try (Jedis jedis = jedisPool.getResource()) { jedis.zrem(taskKey, taskId); } } }

3. 分布式协调简化版

使用 ZooKeeper 进行节点协调与故障转移的简化示例:

java
代码解读
复制代码
// ZooKeeper协调简化示例 class ZookeeperCoordinator { private final ZooKeeper zk; private final String basePath = "/timewheel"; private final String nodesPath = basePath + "/nodes"; private final String nodeName; private final ConsistentHashRouter router; public ZookeeperCoordinator(String zkConnectString, String nodeName) throws Exception { this.nodeName = nodeName; this.zk = new ZooKeeper(zkConnectString, 3000, null); this.router = new ConsistentHashRouter(); // 注册节点 registerNode(); // 监听节点变化 watchNodes(); } private void registerNode() throws Exception { // 创建临时节点,节点断开连接时自动删除 zk.create(nodesPath + "/" + nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } private void watchNodes() throws Exception { // 监视节点变化 List nodes = zk.getChildren(nodesPath, event -> { if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { // 节点变化,重新加载路由表 try { updateRouter(); watchNodes(); // 继续监听 } catch (Exception e) { // 异常处理 } } }); // 更新路由表 updateRouter(); } private void updateRouter() throws Exception { List nodes = zk.getChildren(nodesPath, false); for (String node : nodes) { router.addNode(node); } } public String getNodeForTask(String taskId) { return router.getNode(taskId); } }

真实项目性能对比

我们在 8 核 16G 服务器上对不同实现进行了性能测试:

不同槽位数下性能对比
触发耗时(ms)
64槽: 280
128槽: 142
256槽: 75
512槽: 45
1024槽: 28
2048槽: 24

从压测数据可以得出几个关键结论:

  1. 数据结构选择:使用链表比数组在高频添加/删除场景下快约 13%
  2. 无锁 vs 加锁:无锁设计比锁方案快约 16%,且压力增大时差距更明显
  3. 槽位数量:512 槽位是添加性能的最佳平衡点
  4. 触发性能:槽位数与触发耗时成反比,槽位越多,单槽任务越少,触发越快

不同 Tick 粒度对 CPU 占用的影响:

Tick 粒度CPU 占用时间精度误差适用场景
1ms5-7%<1ms高频交易、实时游戏
10ms1-2%<10msAPI 超时控制、网络连接管理
100ms0.2-0.5%<100ms一般业务定时任务
1000ms<0.1%<1s日志清理、统计汇总

实践经验:在 10ms-100ms 的 tick 粒度下,可以获得较好的性能和精度平衡。

常见问题与解决方案

1. 时间精度问题

问题:时间轮的精度受 tick 大小限制,如果 tick 设置为 100ms,任务执行时间最多有 100ms 误差。

解决方案:

  • 使用更小的 tick 值提高精度(但会增加 CPU 占用)
  • 对要求高精度的任务,可以在任务触发时再检查实际时间,决定是否立即执行
java
代码解读
复制代码
timeWheel.addTask(() -> { long expectedTime = System.currentTimeMillis(); long actualTime = System.currentTimeMillis(); if (actualTime >= expectedTime) { // 立即执行 executeTask(); } else { // 精确等待剩余时间 try { Thread.sleep(expectedTime - actualTime); executeTask(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }, delayMs);

2. 时间轮指针漂移问题

问题:长时间运行时,由于 System.currentTimeMillis()的精度和系统时钟调整,可能导致时间轮指针漂移。

解决方案:

  • 定期校准时间轮(如每小时同步一次系统时间)
  • 使用更稳定的时间源如 System.nanoTime()
  • 实现自适应 tick 调整,动态缩短或延长 tick 时间
java
代码解读
复制代码
// 自适应tick调整示例 long expectedTickTime = startTime + (tick * tickDuration); long currentTime = System.currentTimeMillis(); long drift = currentTime - expectedTickTime; // 如果漂移超过阈值,调整下一个tick时间 if (Math.abs(drift) > DRIFT_THRESHOLD) { nextTickDuration = tickDuration - drift; } else { nextTickDuration = tickDuration; }

3. 任务堆积问题

问题:如果某个槽位的任务太多或任务执行时间过长,可能导致任务堆积。

解决方案:

  • 使用更大的线程池
  • 实现任务优先级机制(如为任务队列添加优先级排序)
  • 监控每个槽位的任务数量,必要时进行告警或拒绝新任务
  • 分析业务模式,避免大量任务集中在同一时间点

4. 跨 JVM 时区问题

问题:在分布式环境下,不同 JVM 可能使用不同时区,导致任务触发时间不一致。

解决方案:

  • 统一使用 UTC 时间处理所有时间计算
  • 在任务定义中明确时区信息
  • 添加任务时进行时区转换,确保一致性
java
代码解读
复制代码
// 处理时区问题示例 ZonedDateTime targetTime = ZonedDateTime.now(ZoneId.of("Asia/Shanghai")) .plusMinutes(15); ZonedDateTime utcTime = targetTime.withZoneSameInstant(ZoneId.of("UTC")); long delayMs = ChronoUnit.MILLIS.between( ZonedDateTime.now(ZoneId.of("UTC")), utcTime ); timeWheel.addTask(task, delayMs);

时间轮与其他定时器对比

特性时间轮算法ScheduledThreadPoolDelayQueue
添加任务复杂度O(1)O(log n)O(log n)
触发任务复杂度O(k) (k 为当前槽任务数)O(1)O(1)
内存占用固定随任务增加而增加随任务增加而增加
时间精度受 tick 大小限制较高较高
大量任务性能非常好一般较差
实现复杂度较复杂简单简单
适用场景高并发,大量定时任务中等数量定时任务少量定时任务,优先级调度

DelayQueue基于优先队列实现,适合任务量较小但对优先级有要求的场景;ScheduledThreadPool是 JDK 内置解决方案,适合一般场景;时间轮算法则是高并发大量定时任务场景的首选。

总结

特性详细说明
时间复杂度添加任务:O(1),触发任务:O(k) (k 为当前槽位任务数)
适用场景高并发定时任务、网络超时管理、延迟消息队列、缓存过期管理
实现方式单层时间轮、多层时间轮
主要优势高效处理大量定时任务、内存占用稳定、性能随任务量增加优势明显
主要局限1. 时间精度受 tick 大小限制 2. 单层实现不适合跨度太大的任务 3. 实现复杂度高
常见实现Netty 的 HashedWheelTimer、Kafka 的 TimingWheel
优化方向1. 槽位数量选择 2. 多层实现 3. 任务执行策略 4. 并发控制

时间轮通过牺牲一定的时间精度换取极高的处理效率,在高并发场景下是传统定时器的理想替代品。选择合适的槽位数量和 tick 大小,结合业务特点进行优化,可以让时间轮在你的系统中发挥最大价值。

注:本文转载自juejin.cn的异常君的文章"https://juejin.cn/post/7495677243491008562"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2491) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top