1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#define __STDC_LIMIT_MACROS
#include <muduo/net/TimerQueue.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Timer.h>
#include <muduo/net/TimerId.h>
#include <boost/bind.hpp>
#include <sys/timerfd.h>
namespace muduo
{
namespace net
{
namespace detail
{
// 创建定时器
int createTimerfd()
{
int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0)
{
LOG_SYSFATAL << "Failed in timerfd_create";
}
return timerfd;
}
// 计算超时时刻与当前时间的时间差
struct timespec howMuchTimeFromNow(Timestamp when)
{
int64_t microseconds = when.microSecondsSinceEpoch()
- Timestamp::now().microSecondsSinceEpoch();
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts;
ts.tv_sec = static_cast<time_t>(
microseconds / Timestamp::kMicroSecondsPerSecond);
ts.tv_nsec = static_cast<long>(
(microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
return ts;
}
// 清除定时器,避免一直触发
void readTimerfd(int timerfd, Timestamp now)
{
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
if (n != sizeof howmany)
{
LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
}
}
// 重置定时器的超时时间
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
bzero(&newValue, sizeof newValue);
bzero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}
}
}
}
using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;
TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
boost::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
/*把timerfd 加入到poller来关注*/
timerfdChannel_.enableReading();
}
TimerQueue::~TimerQueue()
{
::close(timerfd_);
// do not remove channel, since we're in EventLoop::dtor();
for (TimerList::iterator it = timers_.begin();
it != timers_.end(); ++it)
{
delete it->second;
}
}
/*增加一个定时器addTimer()----->addTimerInLoop()*/
TimerId TimerQueue::addTimer(const TimerCallback& cb,
Timestamp when,
double interval)
{
/**
注意:addTimer虽然是线程安全的,但是这里把安全实现的代码给注释掉了,所以以下的代码必须在所属
的EventLoop IO线程中调用
*/
Timer* timer = new Timer(cb, when, interval);
/*
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
*/
addTimerInLoop(timer);
return TimerId(timer, timer->sequence());
}
void TimerQueue::cancel(TimerId timerId)
{
/*
loop_->runInLoop(
boost::bind(&TimerQueue::cancelInLoop, this, timerId));
*/
cancelInLoop(timerId);
}
/*添加定时器到所属eventloop IO线程中*/
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();
// 插入一个定时器,有可能会使得最早到期的定时器发生改变
bool earliestChanged = insert(timer);
if (earliestChanged)
{
// 重置定时器的超时时刻(timerfd_settime)
resetTimerfd(timerfd_, timer->expiration());
}
}
/*取消某个timer*/
void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
// 查找该定时器
ActiveTimerSet::iterator it = activeTimers_.find(timer);
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please,如果用了unique_ptr,这里就不需要手动删除了
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
// 已经到期,并且正在调用回调函数的定时器 , 那么将其加到cancelingtimers中,
// 以便在其回调处理完后, 在reset(expired, now)里时无效,不需要重置
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}
/*定时器触发的回调函数*/
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now); // 清除该事件,避免一直触发
// 获取该时刻之前所有的定时器列表(即超时定时器列表)
std::vector<Entry> expired = getExpired(now);
//已经处于处理到期定时器当中
callingExpiredTimers_ = true;
//清除上一次的超时定时器队列
cancelingTimers_.clear();
// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
// 这里回调定时器处理函数
it->second->run();
}
callingExpiredTimers_ = false;
// 不是一次性定时器,需要重启
reset(expired, now);
}
// rvo 优化
//返回已经超时的timers
// TimerQueue = 1,1,1,3,4,5,7,9
// 那么触发第一个1时,其实后面的2个1也被触发了
// timerQueue 只管队列中的第一个定时器
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired;
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// 返回第一个未到期的Timer的迭代器
// lower_bound的含义是返回第一个值>=sentry的元素的iterator
// 即*end >= sentry,从而end->first > now
TimerList::iterator end = timers_.lower_bound(sentry);
assert(end == timers_.end() || now < end->first);
// 将到期的定时器插入到expired中
std::copy(timers_.begin(), end, back_inserter(expired));
// 从timers_中移除到期的定时器
timers_.erase(timers_.begin(), end);
// 从activeTimers_中移除到期的定时器
for (std::vector<Entry>::iterator it = expired.begin();it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
size_t n = activeTimers_.erase(timer);
assert(n == 1); (void)n;
}
assert(timers_.size() == activeTimers_.size());
return expired;
}
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;
for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
// 如果是重复的定时器并且是未取消定时器(未被其他线程取消掉),则重启该定时器
// cancelingTimers_.find(timer) == cancelingTimers_.end() 在取消的队列中找到不到timer
if (it->second->repeat()&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
it->second->restart(now);
insert(it->second);
}
else
{
// 一次性定时器或者已被取消的定时器是不能重置的,因此删除该定时器
// FIXME move to a free list
delete it->second; // FIXME: no delete please
}
}
if (!timers_.empty())
{
// 获取最早到期的定时器超时时间
nextExpire = timers_.begin()->second->expiration();
}
if (nextExpire.valid())
{
// 重置定时器的超时时刻(timerfd_settime)
resetTimerfd(timerfd_, nextExpire);
}
}
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
// 最早到期时间是否改变
bool earliestChanged = false;
//获取传进来的定时器的超时时间
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
// 如果timers_为空或者when小于timers_中的最早到期时间
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
}
{
// 插入到timers_中
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
/*断言插入操作是否成功*/
assert(result.second); (void)result;
}
{
// 插入到activeTimers_中
std::pair<ActiveTimerSet::iterator, bool> result
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(result.second); (void)result;
}
//断言是否成功操作了
assert(timers_.size() == activeTimers_.size());
return earliestChanged;
}
|