[25] muduo_net库源码分析

TCP网络编程本质

TCP网络编程最本质是的处理三个半事件

  • 连接建立:服务器accept(被动)接受连接,客户端connect(主动)发起连接
  • 连接断开:主动断开(close、shutdown),被动断开(read返回0)
  • 消息到达:文件描述符可读
  • 消息发送完毕:这算半个。对于低流量的服务,可不必关心这个事件;这里的发送完毕是指数据写入操作系统缓冲区,将由TCP协议栈负责数据的发送与重传,不代表对方已经接收到数据。

EchoServer类图

什么都不做的EventLoop

  • one loop per thread

意思是说每个线程最多只能有一个EventLoop对象。

  • EventLoop对象构造的时候,会检查当前线程是否已经创建了其他EventLoop对象,如果已创建,终止程序(LOG_FATAL)EventLoop构造函数会记住本对象所属线程(threadId_)。
  • 创建了EventLoop对象的线程称为IO线程,其功能是运行事件循环(EventLoop::loop)

EventLoop 头文件

EventLoop.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
  // Copyright 2010, Shuo Chen.  All rights reserved.
  // http://code.google.com/p/muduo/
  //
  // Use of this source code is governed by a BSD-style license
  // that can be found in the License file.
  
  // Author: Shuo Chen (chenshuo at chenshuo dot com)
  //
  // This is a public header file, it must only include public header files.
  
  #ifndef MUDUO_NET_EVENTLOOP_H
  #define MUDUO_NET_EVENTLOOP_H
  
  #include <boost/noncopyable.hpp>
  
  #include <muduo/base/CurrentThread.h>
  #include <muduo/base/Thread.h>
  
  namespace muduo
  {
  namespace net
  {
  
  ///
  /// Reactor, at most one per thread.
  ///
  /// This is an interface class, so don't expose too much details.
  class EventLoop : boost::noncopyable
  {
  public:
    EventLoop();
    ~EventLoop();  // force out-line dtor, for scoped_ptr members.
  
    ///
    /// Loops forever.
    ///
    /// Must be called in the same thread as creation of the object.
    ///
    void loop();
  
    //断言是否是当前的线程
  
    void assertInLoopThread()
    {
        //过是当前线程,直接跳过,否则调用abortNotInLoopThread
      if (!isInLoopThread())
      {
        abortNotInLoopThread();
      }
    }
    //测试是否为当前线程
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
  
    static EventLoop* getEventLoopOfCurrentThread();
  
  private:
    void abortNotInLoopThread();
  
    bool looping_; /* atomic , 是否处于事件循环*/
    const pid_t threadId_;        // 当前对象所属线程ID
  };
  
  }
  }
  #endif  // MUDUO_NET_EVENTLOOP_H

### EventLoop 源文件

EventLoop.cpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
  // Copyright 2010, Shuo Chen.  All rights reserved.
  // http://code.google.com/p/muduo/
  //
  // Use of this source code is governed by a BSD-style license
  // that can be found in the License file.
  
  // Author: Shuo Chen (chenshuo at chenshuo dot com)
  
  #include <muduo/net/EventLoop.h>
  
  #include <muduo/base/Logging.h>
  
  #include <poll.h>
  
  using namespace muduo;
  using namespace muduo::net;
  
  namespace
  {
  // 当前线程EventLoop对象指针
  // 线程局部存储 ,如果不用__thread修饰的话,那么就是线程共共享的了
  //这样达不到目标,。。
  //初始化时,设为0就OK了
  __thread EventLoop* t_loopInThisThread = 0;
  }
  
  EventLoop* EventLoop::getEventLoopOfCurrentThread()
  {
    return t_loopInThisThread;
  }
  
  EventLoop::EventLoop()
    : looping_(false), //初始化为false ,表示当前还没有处于事件循环状态
      threadId_(CurrentThread::tid()) //当前的线程Id ,用于标识
  {
    LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
    // 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL),否者的话,设为当前线程this
    if (t_loopInThisThread)
    {
      LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                << " exists in this thread " << threadId_;
    }
    else
    {
      t_loopInThisThread = this;
    }
  }
  
  EventLoop::~EventLoop()
  {
    t_loopInThisThread = NULL;
  }
  
  // 事件循环,该函数不能跨线程调用
  // 只能在创建该对象的线程中调用
  void EventLoop::loop()
  {
  //断言还没有事件循环
    assert(!looping_);
    // 断言当前处于创建该对象的线程中
    assertInLoopThread();
    //把事件循环标识设为true
    looping_ = true;
    LOG_TRACE << "EventLoop " << this << " start looping";
    //关注事件为NULL,个数为0,延时5000
    ::poll(NULL, 0, 5*1000);
  
    LOG_TRACE << "EventLoop " << this << " stop looping";
    //事件循环标识设为false ,这只是测试程序
    looping_ = false;
  }
  
  //终止程序
  void EventLoop::abortNotInLoopThread()
  {
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
              << " was created in threadId_ = " << threadId_
              << ", current thread id = " <<  CurrentThread::tid();
  }

EventLoop 的测试程序1

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <muduo/net/EventLoop.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
/*
该程序 主要是用来测试EventLoop 是否是 一个Thread一个EventLoop
**/
void threadFunc()
{
    printf("threadFunc(): pid = %d, tid = %d\n",
        getpid(), CurrentThread::tid());
 
    EventLoop loop;
    loop.loop();
}
 
int main(void)
{
    printf("main(): pid = %d, tid = %d\n",
        getpid(), CurrentThread::tid());
 
    EventLoop loop;
 
    Thread t(threadFunc);
    t.start();
 
    loop.loop();
    t.join();
    return 0;
}

程序输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
[root@localhost bin]# ./reactor_test01
main(): pid = 2724, tid = 2724
20131018 04:12:55.305234Z  2724 TRACE EventLoop EventLoop created 0xBFA6B2C0 in thread 2724 - EventLoop.cc:36
20131018 04:12:55.305599Z  2724 TRACE loop EventLoop 0xBFA6B2C0 start looping - EventLoop.cc:62
threadFunc(): pid = 2724, tid = 2725
20131018 04:12:55.305792Z  2725 TRACE EventLoop EventLoop created 0xB77E0068 in thread 2725 - EventLoop.cc:36
20131018 04:12:55.305809Z  2725 TRACE loop EventLoop 0xB77E0068 start looping - EventLoop.cc:62
20131018 04:13:00.321713Z  2724 TRACE loop EventLoop 0xBFA6B2C0 stop looping - EventLoop.cc:66
20131018 04:13:00.321779Z  2725 TRACE loop EventLoop 0xB77E0068 stop looping - EventLoop.cc:66
[root@localhost bin]#

测试程序2

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <muduo/net/EventLoop.h>
 
#include <stdio.h>
/**
该程序主要用来测试,多个线程使用同一个eventloop时,程序将会错误终止!!
**/
using namespace muduo;
using namespace muduo::net;
 
EventLoop* g_loop;
 
void threadFunc()
{
    g_loop->loop();
}
 
int main(void)
{
    EventLoop loop;
    g_loop = &loop;
    Thread t(threadFunc);
    t.start();
    t.join();
    return 0;
}

程序输出

1
2
3
4
5
[root@localhost bin]# ./reactor_test02
20131018 04:15:09.010234Z  2730 TRACE EventLoop EventLoop created 0xBFD53730 in thread 2730 - EventLoop.cc:36
20131018 04:15:09.010768Z  2731 FATAL EventLoop::abortNotInLoopThread - EventLoop 0xBFD53730 was created in threadId_ = 2730, current thread id = 2731 - EventLoop.cc:72
Aborted
[root@localhost bin]#

[27] EpollPoller

EpollPoller的头文件

epollpller.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
 
#include <muduo/net/Poller.h>
 
#include <map>
#include <vector>
 
struct epoll_event;
 
namespace muduo
{
namespace net
{
 
///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  virtual ~EPollPoller();
  // timeoutMs 超时事件
  // activeChannels活动通道
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
  //更新通道
  virtual void updateChannel(Channel* channel);
  //移除通道
  virtual void removeChannel(Channel* channel);
 
 private:
  // EventListd的初始值
  static const int kInitEventListSize = 16;
 
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  //更新
  void update(int operation, Channel* channel);
 
  typedef std::vector<struct epoll_event> EventList;
 
  typedef std::map<int, Channel*> ChannelMap;
 
  //文件描述符 = epoll_create1(EPOLL_CLOEXEC),用来表示要关注事件的fd的集合的描述符
  int epollfd_;
  // epoll_wait返回的活动的通道channelList
  EventList events_;
  //通道map
  ChannelMap channels_;
};
 
}
}
#endif  // MUDUO_NET_POLLER_EPOLLPOLLER_H

EpollPoller的源文件

epollpoller.cc

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/poller/EPollPoller.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
 
#include <boost/static_assert.hpp>
 
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
 
using namespace muduo;
using namespace muduo::net;
 
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
BOOST_STATIC_ASSERT(EPOLLIN == POLLIN);
BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI);
BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT);
BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP);
BOOST_STATIC_ASSERT(EPOLLERR == POLLERR);
BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP);
 
namespace
{
const int kNew = -1; //新通道
const int kAdded = 1; //要关注的通道
const int kDeleted = 2; //要删除的通道
}
 
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)
{
  if (epollfd_ < 0)
  {
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}
 
EPollPoller::~EPollPoller()
{
  ::close(epollfd_);
}
 
// IO 线程
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  //监听事件的到来
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
 
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    /*添加活动通道channel*/
    fillActiveChannels(numEvents, activeChannels);
    //如果活动通道的容器已满,则增加活动通道容器的容量
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "EPollPoller::poll()";
  }
  return now;
}
 
/*添加活动通道channel*/
void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const
{
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
    //如果是调试状态,则
#ifndef NDEBUG
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    //否则直接跳到这里
    // 设置channel的“可用事件”
    channel->set_revents(events_[i].events);
    // 加入活动通道容器
    activeChannels->push_back(channel);
  }
}
 
//更新某个通道 channel
void EPollPoller::updateChannel(Channel* channel)
{
  //断言 在IO线程中
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
 
  //channel 的默认值是 -1 , ---->>channel class
  const int index = channel->index();
  // kNew 表示有新的通道要增加,kDeleted表示将已不关注事件的fd重新关注事件,及时重新加到epollfd_中去
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      //如果是新的channel,那么在channels_里面是找不到的
      assert(channels_.find(fd) == channels_.end());
      //添加到channels_中
      channels_[fd] = channel;
    }
 
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    //
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    //断言已经在channels_里面了,并且已在epollfd_中
    assert(index == kAdded);
 
    //剔除channel的关注事件
    //如果channel没有事件关注了,就把他从epollfd_中剔除掉
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);
      //更新index = kDeleted
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}
 
// 移除channel
void EPollPoller::removeChannel(Channel* channel)
{
  //断言实在IO线程中
  Poller::assertInLoopThread();
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  //断言能在channels_里面找到channel
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  //断言所要移除的channel已经没有事件关注了,但是此时在event_里面可能还有他的记录
  assert(channel->isNoneEvent());
  int index = channel->index();
  //断言
  assert(index == kAdded || index == kDeleted);
  //真正从channels_里面删除掉channel
  size_t n = channels_.erase(fd);
  (void)n;
  assert(n == 1);
 
  //从event_中剔除channel
  if (index == kAdded)
  {
    update(EPOLL_CTL_DEL, channel);
  }
 
  // channel现在变成新的channel了
  channel->set_index(kNew);
}
 
void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event event;
  bzero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;
  int fd = channel->fd();
  //更新操作
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
  {
    //写入日志
    if (operation == EPOLL_CTL_DEL)
    {
      LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
    }
    else
    {
      LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
    }
  }
}

测试程序

Reactor_test03.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
#include <sys/timerfd.h>
 
using namespace muduo;
using namespace muduo::net;
 
EventLoop* g_loop;
int timerfd;
 
void timeout(Timestamp receiveTime)
{
    printf("Timeout!\n");
    uint64_t howmany;
    ::read(timerfd, &howmany, sizeof howmany);
    g_loop->quit();
}
 
int main(void)
{
    EventLoop loop;
    g_loop = &loop;
 
    timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    Channel channel(&loop, timerfd);
    channel.setReadCallback(boost::bind(timeout, _1));
    channel.enableReading();
 
    struct itimerspec howlong;
    bzero(&howlong, sizeof howlong);
    howlong.it_value.tv_sec = 1;
    ::timerfd_settime(timerfd, 0, &howlong, NULL);
 
    loop.loop();
 
    ::close(timerfd);
}

程序输出

1
2
3
4
5
6
7
8
9
[root@localhost bin]# ./reactor_test03
20131020 02:24:05.657327Z  4009 TRACE EventLoop EventLoop created 0xBFD2AAD4 in thread 4009 - EventLoop.cc:42
20131020 02:24:05.657513Z  4009 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131020 02:24:05.657554Z  4009 TRACE loop EventLoop 0xBFD2AAD4 start looping - EventLoop.cc:68
20131020 02:24:06.658756Z  4009 TRACE poll 1 events happended - EPollPoller.cc:65
20131020 02:24:06.658972Z  4009 TRACE printActiveChannels {4: IN }  - EventLoop.cc:139
Timeout!
20131020 02:24:06.659008Z  4009 TRACE loop EventLoop 0xBFD2AAD4 stop looping - EventLoop.cc:93
[root@localhost bin]#

[28] muduo的定时器由三个类实现,TimerId(定时器)、Timer(最上层的抽象)、TimerQueue(定时器的列表)

  • muduo的定时器由三个类实现,TimerId(定时器)、Timer(最上层的抽象,并没有调用定时器的相关函数)、TimerQueue(定时器的列表),用户只能看到第一个类,其它两个都是内部实现细节
  • TimerQueue的接口很简单,只有两个函数addTimer和cancel。实际上这两个函数也没有向外部开发,访问它要通过EventLoop的相关方法
  • EventLoop

    1
    2
    3
    4
    5
    6
    7
    
       runAt    在某个时刻运行定时器------>TimerQueue.addTimer
    
       runAfter    过一段时间运行定时器---->TimeQueue.addTimer
    
       runEvery    每隔一段时间运行定时器->TimerQueue.addTimer
    
       cancel    取消定时器 ----》TimeQueue.cancal
  • TimerQueue数据结构的选择,能快速根据当前时间找到已到期的定时器,也要高效的添加和删除Timer,因而可以用二叉搜索树,用map或者set;如果选择map的话,是有问题的,可能TimeQueue里面有时间戳是一样的,但定时器timer*是不一样的,可以考虑multimap,不过最后还是不要使用

1
2
 typedef std::pair<Timestamp, Timer*> Entry;
 typedef std::set<Entry> TimerList;

时序图

Timer头文件

timer.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H
 
#include <boost/noncopyable.hpp>
 
#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
 
namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }
 
  void run() const
  {
    callback_();
  }
 
  Timestamp expiration() const  { return expiration_; }
  bool repeat() const { return repeat_; }
  int64_t sequence() const { return sequence_; }
 
  void restart(Timestamp now);
 
  static int64_t numCreated() { return s_numCreated_.get(); }
 
 private:
  const TimerCallback callback_;        // 定时器回调函数
  Timestamp expiration_;                // 下一次的超时时刻
  const double interval_;               // 超时时间间隔,如果是一次性定时器,该值为0
  const bool repeat_;                   // 是否重复
  const int64_t sequence_;              // 定时器序号
 
  static AtomicInt64 s_numCreated_;     // 定时器计数,当前已经创建的定时器数量
};
}
}
#endif  // MUDUO_NET_TIMER_H

Timer源文件

timer.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/Timer.h>
 
using namespace muduo;
using namespace muduo::net;
 
AtomicInt64 Timer::s_numCreated_;
 
void Timer::restart(Timestamp now)
{
  if (repeat_)
  {
    // 重新计算下一个超时时刻
    expiration_ = addTime(now, interval_);
  }
  else
  {
    expiration_ = Timestamp::invalid();
  }
}

TimerId头文件

timerid.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TIMERID_H
#define MUDUO_NET_TIMERID_H
 
#include <muduo/base/copyable.h>
 
namespace muduo
{
namespace net
{
 
class Timer;
 
///是一个不透明的ID ,是外部可见的一个类
/// An opaque identifier, for canceling Timer.
///
class TimerId : public muduo::copyable
{
 public:
  TimerId()
    : timer_(NULL),
      sequence_(0)
  {
  }
 
  TimerId(Timer* timer, int64_t seq)
    : timer_(timer),
      sequence_(seq)
  {
  }
 
  // default copy-ctor, dtor and assignment are okay
 
  friend class TimerQueue;
 
 private:
  Timer* timer_; //定时器的地址 ,timer里面也包含了timer的序号
  int64_t sequence_; //定时器的序号
};
 
}
}
 
#endif  // MUDUO_NET_TIMERID_H

TimerQueue头文件

timerqueue.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_TIMER_H
#define MUDUO_NET_TIMER_H
 
#include <boost/noncopyable.hpp>
 
#include <muduo/base/Atomic.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
 
namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }
 
  void run() const
  {
    callback_();
  }
 
  Timestamp expiration() const  { return expiration_; }
  bool repeat() const { return repeat_; }
  int64_t sequence() const { return sequence_; }
 
  void restart(Timestamp now);
 
  static int64_t numCreated() { return s_numCreated_.get(); }
 
 private:
  const TimerCallback callback_;        // 定时器回调函数
  Timestamp expiration_;                // 下一次的超时时刻
  const double interval_;               // 超时时间间隔,如果是一次性定时器,该值为0
  const bool repeat_;                   // 是否重复
  const int64_t sequence_;              // 定时器序号
 
  static AtomicInt64 s_numCreated_;     // 定时器计数,当前已经创建的定时器数量
};
}
}
#endif  // MUDUO_NET_TIMER_H

TimerQueue源文件

timerqueue.cc

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#define __STDC_LIMIT_MACROS
#include <muduo/net/TimerQueue.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Timer.h>
#include <muduo/net/TimerId.h>
 
#include <boost/bind.hpp>
 
#include <sys/timerfd.h>
 
namespace muduo
{
namespace net
{
namespace detail
{
 
// 创建定时器
int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}
 
// 计算超时时刻与当前时间的时间差
struct timespec howMuchTimeFromNow(Timestamp when)
{
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}
 
// 清除定时器,避免一直触发
void readTimerfd(int timerfd, Timestamp now)
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}
 
// 重置定时器的超时时间
void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  bzero(&newValue, sizeof newValue);
  bzero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}
 
}
}
}
 
using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;
 
TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      boost::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  /*把timerfd 加入到poller来关注*/
  timerfdChannel_.enableReading();
}
 
TimerQueue::~TimerQueue()
{
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  for (TimerList::iterator it = timers_.begin();
      it != timers_.end(); ++it)
  {
    delete it->second;
  }
}
/*增加一个定时器addTimer()----->addTimerInLoop()*/
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
 
  /**
注意:addTimer虽然是线程安全的,但是这里把安全实现的代码给注释掉了,所以以下的代码必须在所属
的EventLoop IO线程中调用
  */
  Timer* timer = new Timer(cb, when, interval);
  /*
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
      */
  addTimerInLoop(timer);
  return TimerId(timer, timer->sequence());
}
 
void TimerQueue::cancel(TimerId timerId)
{
  /*
  loop_->runInLoop(
      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
      */
  cancelInLoop(timerId);
}
/*添加定时器到所属eventloop IO线程中*/
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  // 插入一个定时器,有可能会使得最早到期的定时器发生改变
  bool earliestChanged = insert(timer);
 
  if (earliestChanged)
  {
    // 重置定时器的超时时刻(timerfd_settime)
    resetTimerfd(timerfd_, timer->expiration());
  }
}
 
/*取消某个timer*/
void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  // 查找该定时器
  ActiveTimerSet::iterator it = activeTimers_.find(timer);
  if (it != activeTimers_.end())
  {
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please,如果用了unique_ptr,这里就不需要手动删除了
    activeTimers_.erase(it);
  }
  else if (callingExpiredTimers_)
  {
    // 已经到期,并且正在调用回调函数的定时器 , 那么将其加到cancelingtimers中,
    // 以便在其回调处理完后, 在reset(expired, now)里时无效,不需要重置
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}
 
/*定时器触发的回调函数*/
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);       // 清除该事件,避免一直触发
 
  // 获取该时刻之前所有的定时器列表(即超时定时器列表)
  std::vector<Entry> expired = getExpired(now);
  //已经处于处理到期定时器当中
  callingExpiredTimers_ = true;
  //清除上一次的超时定时器队列
  cancelingTimers_.clear();
  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    // 这里回调定时器处理函数
    it->second->run();
  }
  callingExpiredTimers_ = false;
 
  // 不是一次性定时器,需要重启
  reset(expired, now);
}
 
// rvo 优化
//返回已经超时的timers
// TimerQueue = 1,1,1,3,4,5,7,9
// 那么触发第一个1时,其实后面的2个1也被触发了
// timerQueue 只管队列中的第一个定时器
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  // 返回第一个未到期的Timer的迭代器
  // lower_bound的含义是返回第一个值>=sentry的元素的iterator
  // 即*end >= sentry,从而end->first > now
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  // 将到期的定时器插入到expired中
  std::copy(timers_.begin(), end, back_inserter(expired));
  // 从timers_中移除到期的定时器
  timers_.erase(timers_.begin(), end);
 
  // 从activeTimers_中移除到期的定时器
  for (std::vector<Entry>::iterator it = expired.begin();it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    size_t n = activeTimers_.erase(timer);
    assert(n == 1); (void)n;
  }
 
  assert(timers_.size() == activeTimers_.size());
  return expired;
}
 
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;
 
  for (std::vector<Entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    // 如果是重复的定时器并且是未取消定时器(未被其他线程取消掉),则重启该定时器
    // cancelingTimers_.find(timer) == cancelingTimers_.end()  在取消的队列中找到不到timer
    if (it->second->repeat()&& cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
      it->second->restart(now);
      insert(it->second);
    }
    else
    {
      // 一次性定时器或者已被取消的定时器是不能重置的,因此删除该定时器
      // FIXME move to a free list
      delete it->second; // FIXME: no delete please
    }
  }
 
  if (!timers_.empty())
  {
    // 获取最早到期的定时器超时时间
    nextExpire = timers_.begin()->second->expiration();
  }
 
  if (nextExpire.valid())
  {
    // 重置定时器的超时时刻(timerfd_settime)
    resetTimerfd(timerfd_, nextExpire);
  }
}
 
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  // 最早到期时间是否改变
  bool earliestChanged = false;
  //获取传进来的定时器的超时时间
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  // 如果timers_为空或者when小于timers_中的最早到期时间
  if (it == timers_.end() || when < it->first)
  {
 
    earliestChanged = true;
  }
  {
    // 插入到timers_中
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    /*断言插入操作是否成功*/ 
    assert(result.second); (void)result;
  }
  {
    // 插入到activeTimers_中
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }
  //断言是否成功操作了
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}

[29] EventLoop再分析之IO线程(29)

EventLoop IO 线程的简单描述

eventloopThread 分析:

eventloop现成循环调用loop()函数:

  • 获取活跃的通道
  • 执行活跃通道的回调函数
  • 调用doPending Functors,获取queueLoop的任务,依次执行(这些任务会阻塞IO线程,即IO线程也做了计算任务,所以这些任务不应该是耗时的任务)

注意

eventFd在这里用于唤醒eventloop的事件,试想一下,加入当前只有IO线程的计算任务,那么eventloop会阻塞在poll()函数里面,直到超时,这是不可取的策略。

进程(线程)wait/notify

  • pipe
  • socketpair
  • eventfd

eventfd 是一个比pipe更高效的线程间事件通知机制,一方面它比pipe少用一个file descripor,节省了资源;另一方面,eventfd的缓冲区管理也简单得多,全部buffer只有定长8 bytes,不像pipe那样可能有不定长的真正buffer

EventLoop头文件

eventloop.h

NOTIFY:

这里的eventloopmuduo_net库源码分析(26-1) 里面的内容有些不一样,这里的eventloop加了一些内容, 在阅读eventloop时,要注意runInLoopqueueInloop这两个函数,wakeup()也得注意.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H
 
#include <vector>
 
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
 
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>
 
namespace muduo
{
namespace net
{
 
class Channel;
class Poller;
class TimerQueue;
///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
 public:
  /* IO事件回调函数,就像“监听socketfd” ,我们可以把 监听socketfd 的处理函数 做成一个回调函数,
  把这个函数放到pendingFunctors_中,然后由IO线程自己回调处理*/
  typedef boost::function<void()> Functor;
 
  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.
 
  ///
  /// Loops forever.
  ///
  /// Must be called in the same thread as creation of the object.
  ///
  void loop();
 
  void quit();
 
  ///
  /// Time when poll returns, usually means data arrivial.
  ///
  Timestamp pollReturnTime() const { return pollReturnTime_; }
 
  /// Runs callback immediately in the loop thread.
  /// It wakes up the loop, and run the cb.
  /// If in the same loop thread, cb is run within the function.
  /// Safe to call from other threads.
  void runInLoop(const Functor& cb);
  /// Queues callback in the loop thread.
  /// Runs after finish pooling.
  /// Safe to call from other threads.
  void queueInLoop(const Functor& cb);
 
  // timers
 
  ///
  /// Runs callback at 'time'.
  /// Safe to call from other threads.
  ///
  TimerId runAt(const Timestamp& time, const TimerCallback& cb);
  ///
  /// Runs callback after @c delay seconds.
  /// Safe to call from other threads.
  ///
  TimerId runAfter(double delay, const TimerCallback& cb);
  ///
  /// Runs callback every @c interval seconds.
  /// Safe to call from other threads.
  ///
  TimerId runEvery(double interval, const TimerCallback& cb);
  ///
  /// Cancels the timer.
  /// Safe to call from other threads.
  ///
  void cancel(TimerId timerId);
 
  // internal usage
  void wakeup();
  void updateChannel(Channel* channel);     // 在Poller中添加或者更新通道
  void removeChannel(Channel* channel);     // 从Poller中移除通道
 
  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
 
  bool eventHandling() const { return eventHandling_; }
 
  static EventLoop* getEventLoopOfCurrentThread();
 
 private:
  void abortNotInLoopThread();
  void handleRead();  // waked up
  void doPendingFunctors();
 
  void printActiveChannels() const; // DEBUG
 
  typedef std::vector<Channel*> ChannelList;
 
  bool looping_; /* atomic */
  bool quit_; /* atomic */
  bool eventHandling_; /*事件处理函数状态 atomic */
  bool callingPendingFunctors_; /*是否处于IO回调函数的处理状态中 atomic */
  const pid_t threadId_;        // 当前对象所属线程ID
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
 
/*****
 
wakeupFd_ : 这个描述符主要是未了
 
****/
  int wakeupFd_;                // 用于eventfd所创建的文件描述符
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  /*
  wakeupChannel 和EventLoop 是组合的关系,生命周期由EventLoop管理
  一定要看下面的内容,如果不明白下面的内容的话,代码是很难懂的(个人观点,高手勿喷!!^V^)
  当其他线程跟IO线程通信时, 他们使用的是同一个通道(wakeupChannel_) , 当其他线程wakeup(write)时,
poll会返回。然后IO执行handleRead,但是主要的回调函数不是handleRead ,而是pendingFunctors_里面的函数,
hadnleRead只是为了不被多次触发而已,也就是说其他线程要想IO线程执行他们所需的函调函数,只要把回调函数
注入到pendingFunctors_,然后wakeup(write)唤醒一下。
  当然IO线程也可以和IO线程进行通信,但是情况有些不一样,具体哪些不同,大家自己看着办吧!
 
  注: IO线程就是EventLoop所属的线程
  */
  boost::scoped_ptr<Channel> wakeupChannel_;  //wakeupFd_所对应的通道 该通道将会纳入poller_来管理
  ChannelList activeChannels_;      // Poller返回的活动通道
  Channel* currentActiveChannel_;   // 当前正在处理的活动通道
  MutexLock mutex_;
  // IO线程 回调函数容器
  std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_
};
 
}
}
#endif  // MUDUO_NET_EVENTLOOP_H

EventLoop源文件

这里的eventloop和 *muduo_net库源码分析(26-1)*里面的内容有些不一样,这里的eventloop加了一些内容,具体情况可以回去看一看.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/EventLoop.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/TimerQueue.h>
 
//#include <poll.h>
#include <boost/bind.hpp>
 
#include <sys/eventfd.h>
 
using namespace muduo;
using namespace muduo::net;
 
namespace
{
// 当前线程EventLoop对象指针
// 线程局部存储
__thread EventLoop* t_loopInThisThread = 0;
 
const int kPollTimeMs = 10000;
 
int createEventfd()
{
  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (evtfd < 0)
  {
    LOG_SYSERR << "Failed in eventfd";
    abort();
  }
  return evtfd;
}
 
}
 
EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}
 
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()), //创建 wakeupFd_
    wakeupChannel_(new Channel(this, wakeupFd_)), //创建wakeupFd_对应的channel
    currentActiveChannel_(NULL)
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  // 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL)
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
 
  // 设定wakeupChannel的回调函数
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  //纳入poller来管理
  wakeupChannel_->enableReading();
}
 
EventLoop::~EventLoop()
{
  ::close(wakeupFd_);
  t_loopInThisThread = NULL;
}
 
// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
  assert(!looping_);
  // 断言当前处于创建该对象的线程中
  assertInLoopThread();
  looping_ = true;
  quit_ = false;
  LOG_TRACE << "EventLoop " << this << " start looping";
 
  //::poll(NULL, 0, 5*1000);
  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    //++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    // 让IO线程也能执行一些计算任务
    doPendingFunctors();
  }
 
  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}
 
// 该函数可以跨线程调用
void EventLoop::quit()
{
  quit_ = true;
  //如果不是由当前IO线程调用的,那么我们还要唤醒IO线程
  if (!isInLoopThread())
  {
    wakeup();
  }
}
/*
sublime 注释的好方法,都是苦力活
===========         ===================               ======         ===================
||        |||      |||=================             =                         {}
||        ****      ||                             =                          {}
||       ****       ||                            =                           {}
|||||||||           ||=================              =  = = =                 {}
||      \\\         ||==================                      ///             {}
||        \\        ||                                      = //              {}
                    //                                        //              {}
||        \\\       || ==================                    //               {}
||          \\      ||===================           =========                 {}
*/
// 在I/O线程中注册某个回调函数,该函数可以跨线程调用
void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    // 如果是当前IO线程调用runInLoop,则同步调用cb
    cb();
  }
  else
  {
    // 如果是其它线程调用runInLoop,则异步地将cb添加到队列,让EventLoop线程来执行这个cb()函数
    queueInLoop(cb);
  }
}
 
// 把IO回调函数注册到pendingFunctors_中
void EventLoop::queueInLoop(const Functor& cb)
{
  {
    // 保护临界区
  MutexLockGuard lock(mutex_);
    // 将任务添加到任务队列中
  pendingFunctors_.push_back(cb);
  }
 
  // 1.--->调用queueInLoop的线程不是IO线程需要唤醒,以便IO线程及时处理任务
  // 2.--->或者调用queueInLoop的线程是IO线程,并且此时正在调用pending functor,需要唤醒
   /*   如果我们不唤醒,那么下一次poll就没法获取该cb的触发事件,这样就会使事件处理延迟了 ,请参照loop函数 */
  // 3---->只有IO线程的事件回调中调用queueInLoop才不需要唤醒 ,因为handleEvent执行后,
  //下面接着执行doPendingFunctors
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}
 
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
  return timerQueue_->addTimer(cb, time, 0.0);
}
 
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, cb);
}
 
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(cb, time, interval);
}
 
void EventLoop::cancel(TimerId timerId)
{
  return timerQueue_->cancel(timerId);
}
 
void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}
 
void EventLoop::removeChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  if (eventHandling_)
  {
    assert(currentActiveChannel_ == channel ||
        std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
  }
  poller_->removeChannel(channel);
}
 
void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}
 
/*
 
 
唤醒等待的线程
*/
void EventLoop::wakeup()
{
  uint64_t one = 1;
  //ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  /*向wakeupFd_中写入数据*/
  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}
 
// IO事件回调函数
void EventLoop::handleRead()
{
  uint64_t one = 1;
  //ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);--->>muduolib
  ssize_t n = ::read(wakeupFd_, &one, sizeof one); // 这是先这样设定
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}
 
/*IO线程的回调函数集 处理函数*/
void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  /*正在处于IO线程处理函数中*/
  callingPendingFunctors_ = true;
 
  {
    // 加锁
  MutexLockGuard lock(mutex_);
  //交换,顺带将pendingFunctors清空
  functors.swap(pendingFunctors_);
  }
 
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  /*清空IO线程回调处理函数中 的状态*/
  callingPendingFunctors_ = false;
}
 
void EventLoop::printActiveChannels() const
{
  for (ChannelList::const_iterator it = activeChannels_.begin();
      it != activeChannels_.end(); ++it)
  {
    const Channel* ch = *it;
    LOG_TRACE << "{" << ch->reventsToString() << "} ";
  }
}

测试程序源代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThread.h>
//#include <muduo/base/Thread.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
EventLoop* g_loop;
int g_flag = 0;
 
void run4()
{
  printf("run4(): pid = %d, flag = %d\n", getpid(), g_flag);
  g_loop->quit();
}
 
void run3()
{
  printf("run3(): pid = %d, flag = %d\n", getpid(), g_flag);
 
  g_loop->runAfter(3, run4);
  g_flag = 3;
}
 
void run2()
{
  printf("run2(): pid = %d, flag = %d\n", getpid(), g_flag);
    /*在IO线程中直接调用queueInLoop,并且if (!isInLoopThread() || callingPendingFunctors_)时,
  不用唤醒(wakeup())
    */
  g_loop->queueInLoop(run3);
}
 
void run1()
{
  g_flag = 1;
  printf("run1(): pid = %d, flag = %d\n", getpid(), g_flag);
  /*在IO线程中,调用runInLoop时,不用经过queueInLoop()就可以直接调用Functors*/
  g_loop->runInLoop(run2);
  g_flag = 2;
}
 
int main()
{
  printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
 
  EventLoop loop;
  g_loop = &loop;
 
  loop.runAfter(2, run1);
  loop.loop();
  printf("main(): pid = %d, flag = %d\n", getpid(), g_flag);
}

测试输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
ubuntu@ubuntu-virtual-machine:~/29/jmuduo$ ../build/debug/bin/reactor_test05
main(): pid = 28193, flag = 0
20131022 01:36:15.466526Z 28193 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131022 01:36:15.467517Z 28193 TRACE EventLoop EventLoop created 0xBFEDB874 in thread 28193 - EventLoop.cc:62
20131022 01:36:15.467586Z 28193 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131022 01:36:15.467898Z 28193 TRACE loop EventLoop 0xBFEDB874 start looping - EventLoop.cc:94
20131022 01:36:17.469365Z 28193 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 01:36:17.482824Z 28193 TRACE printActiveChannels {4: IN }  - EventLoop.cc:257
20131022 01:36:17.483170Z 28193 TRACE readTimerfd TimerQueue::handleRead() 1 at 1382405777.482909 - TimerQueue.cc:62
run1(): pid = 28193, flag = 1
run2(): pid = 28193, flag = 1
run3(): pid = 28193, flag = 2
20131022 01:36:20.483953Z 28193 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 01:36:20.484074Z 28193 TRACE printActiveChannels {4: IN }  - EventLoop.cc:257
20131022 01:36:20.484119Z 28193 TRACE readTimerfd TimerQueue::handleRead() 1 at 1382405780.484109 - TimerQueue.cc:62
run4(): pid = 28193, flag = 3
20131022 01:36:20.484229Z 28193 TRACE loop EventLoop 0xBFEDB874 stop looping - EventLoop.cc:119
main(): pid = 28193, flag = 3
ubuntu@ubuntu-virtual-machine:~/29/jmuduo$

[30] EventLoopThread

任何一个线程,只要创建并运行了EventLoop,都称之为IO线程

IO线程不一定是主线程

muduo并发模型one loop per thread + threadpool

为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程EventLoopThread创建了一个线程

在线程函数中创建了一个EvenLoop对象并调用EventLoop::loop

EventLoopThread头文件

eventloopthread.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_EVENTLOOPTHREAD_H
#define MUDUO_NET_EVENTLOOPTHREAD_H
 
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
 
#include <boost/noncopyable.hpp>
 
namespace muduo
{
namespace net
{
 
class EventLoop;
 
class EventLoopThread : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
  EventLoopThread(const ThreadInitCallback& cb =ThreadInitCallback());
  ~EventLoopThread();
  EventLoop* startLoop();   // 启动线程,该线程就成为了IO线程
 
 private:
  void threadFunc();        // 线程函数 ,这个线程函数启动起来之后,会创建一个eventloop对象,loop_指针会指向这个对象
 
  EventLoop* loop_;         // loop_指针指向一个EventLoop对象,一个IO线程有且只有一个eventloop对象
  bool exiting_;
  Thread thread_;       //线程对象
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;     // 回调函数在EventLoop::loop IO线程的事件循环之前被调用
};
 
}
}
 
#endif  // MUDUO_NET_EVENTLOOPTHREAD_H

EventLoopThread源文件

eventloopthread.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/EventLoopThread.h>
 
#include <muduo/net/EventLoop.h>
 
#include <boost/bind.hpp>
 
using namespace muduo;
using namespace muduo::net;
// typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this)),
    mutex_(),
    cond_(mutex_),
    callback_(cb)  //cb 默认为空
{
}
 
EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  loop_->quit();     // 退出IO线程,让IO线程的loop循环退出,从而退出了IO线程
  thread_.join();
}
 
/*启动线程,也就是启动loop函数EventLoopThread::threadFunc这个函数*/
EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  /*线程启动时会调用*/
  thread_.start();
 
  {
    MutexLockGuard lock(mutex_);
    /*eventloop对象创建完成*/
    while (loop_ == NULL)
    {
      cond_.wait();
    }
  }
 
  return loop_;
}
 
/*loop 的初始化函数*/
void EventLoopThread::threadFunc()
{
  EventLoop loop;
  /*创建eventloop对象*/
  if (callback_)
  {
    callback_(&loop);
  }
 
  {
    MutexLockGuard lock(mutex_);
    // loop_指针指向了一个栈上的对象,threadFunc函数退出之后,这个指针就失效了
    // threadFunc函数退出,就意味着线程退出了,EventLoopThread对象也就没有存在的价值了。
    // 因而不会有什么大的问题
    loop_ = &loop;
    cond_.notify();
  }
  /*启动loop()函数*/
  loop.loop();
  //assert(exiting_);
}

测试程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
void runInThread()
{
  printf("runInThread(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
}
 
int main()
{
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
 
  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter内部也调用了runInLoop,所以这里也是异步调用
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();
 
  printf("exit main().\n");
}
 
/*
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(cb, when, interval);
 
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
 
  //addTimerInLoop(timer); 不能用这个,如果直接调用,那么addTimerInLoop断言失败
  return TimerId(timer, timer->sequence());
}
 
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  // 插入一个定时器,有可能会使得最早到期的定时器发生改变
  bool earliestChanged = insert(timer);
 
  if (earliestChanged)
  {
    // 重置定时器的超时时刻(timerfd_settime)
    resetTimerfd(timerfd_, timer->expiration());
  }
}
 
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  // 最早到期时间是否改变
  bool earliestChanged = false;
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  // 如果timers_为空或者when小于timers_中的最早到期时间
  if (it == timers_.end() || when < it->first)
  {
    earliestChanged = true;
  }
  {
    // 插入到timers_中
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
    // 插入到activeTimers_中
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }
 
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}
 
 
*/

程序输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
ubuntu@ubuntu-virtual-machine:~/pro/30$ ./build/debug/bin/reactor_test06
main(): pid = 29731, tid = 29731
20131022 03:03:50.315042Z 29732 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315588Z 29732 TRACE EventLoop EventLoop created 0xB786BF94 in thread 29732 - EventLoop.cc:62
20131022 03:03:50.315643Z 29732 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315708Z 29732 TRACE loop EventLoop 0xB786BF94 start looping - EventLoop.cc:94
20131022 03:03:50.315976Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:50.316472Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
runInThread(): pid = 29731, tid = 29732
20131022 03:03:51.316968Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:51.317092Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:53.317414Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:53.317539Z 29732 TRACE printActiveChannels {4: IN }  - EventLoop.cc:257
20131022 03:03:53.317639Z 29732 TRACE readTimerfd TimerQueue::handleRead() 1 at 1382411033.317576 - TimerQueue.cc:62
runInThread(): pid = 29731, tid = 29732
exit main().
20131022 03:03:54.317901Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:54.317966Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:54.317990Z 29732 TRACE loop EventLoop 0xB786BF94 stop looping - EventLoop.cc:119
ubuntu@ubuntu-virtual-machine:~/pro/30$

[31] Socket封装

  • Endian.h 封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中)。
  • SocketsOps.h/ SocketsOps.cc 封装了socket相关系统调用(全局函数,位于muduo::net::sockets名称空间中)。
  • Socket.h/Socket.cc(Socket类) 用RAII方法封装socket file descriptor
  • InetAddress.h/InetAddress.cc(InetAddress类) 网际地址sockaddr_in封装

Endian头文件

endian.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_ENDIAN_H
#define MUDUO_NET_ENDIAN_H
 
#include <stdint.h>
#include <endian.h>
 
namespace muduo
{
namespace net
{
namespace sockets
{
 
// the inline assembler code makes type blur,
// so we disable warnings for a while.
#if __GNUC_MINOR__ >= 6
#pragma GCC diagnostic push
#endif
#pragma GCC diagnostic ignored "-Wconversion"
#pragma GCC diagnostic ignored "-Wold-style-cast"
inline uint64_t hostToNetwork64(uint64_t host64)
{
  return htobe64(host64);
}
 
inline uint32_t hostToNetwork32(uint32_t host32)
{
  return htobe32(host32);
}
 
inline uint16_t hostToNetwork16(uint16_t host16)
{
  return htobe16(host16);
}
 
inline uint64_t networkToHost64(uint64_t net64)
{
  return be64toh(net64);
}
 
inline uint32_t networkToHost32(uint32_t net32)
{
  return be32toh(net32);
}
 
inline uint16_t networkToHost16(uint16_t net16)
{
  return be16toh(net16);
}
#if __GNUC_MINOR__ >= 6
#pragma GCC diagnostic pop
#else
#pragma GCC diagnostic error "-Wconversion"
#pragma GCC diagnostic error "-Wold-style-cast"
#endif
 
 
}
}
}
 
#endif  // MUDUO_NET_ENDIAN_H

InetAddress头文件

inetaddress.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_INETADDRESS_H
#define MUDUO_NET_INETADDRESS_H
 
#include <muduo/base/copyable.h>
#include <muduo/base/StringPiece.h>
 
#include <netinet/in.h>
 
namespace muduo
{
namespace net
{
 
///
/// Wrapper of sockaddr_in.
///
/// This is an POD interface class.
class InetAddress : public muduo::copyable
{
 public:
  /// Constructs an endpoint with given port number.
  /// Mostly used in TcpServer listening.
  // 仅仅指定port,不指定ip,则ip为INADDR_ANY(即0.0.0.0)
  explicit InetAddress(uint16_t port);
 
  /// Constructs an endpoint with given ip and port.
  /// @c ip should be "1.2.3.4"
  InetAddress(const StringPiece& ip, uint16_t port);
 
  /// Constructs an endpoint with given struct @c sockaddr_in
  /// Mostly used when accepting new connections
  InetAddress(const struct sockaddr_in& addr)
    : addr_(addr)
  { }
 
  string toIp() const;
  string toIpPort() const;
 
  // __attribute__ ((deprecated)) 表示该函数是过时的,被淘汰的
  // 这样使用该函数,在编译的时候,会发出警告
  /* 保留这个函数的原因是 为了 软件升级使用,最后不要调用这个函数,直接调用toIpPort就行了*/
  string toHostPort() const __attribute__ ((deprecated))
  { return toIpPort(); }
 
  // default copy/assignment are Okay
 
  const struct sockaddr_in& getSockAddrInet() const { return addr_; }
  void setSockAddrInet(const struct sockaddr_in& addr) { addr_ = addr; }
 
  uint32_t ipNetEndian() const { return addr_.sin_addr.s_addr; }
  uint16_t portNetEndian() const { return addr_.sin_port; }
 
 private:
  struct sockaddr_in addr_;
};
 
}
}
 
#endif  // MUDUO_NET_INETADDRESS_H

InetAddress源文件

inetaddress.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/InetAddress.h>
 
#include <muduo/net/Endian.h>
#include <muduo/net/SocketsOps.h>
 
#include <strings.h>  // bzero
#include <netinet/in.h>
 
#include <boost/static_assert.hpp>
 
// INADDR_ANY use (type)value casting.
#pragma GCC diagnostic ignored "-Wold-style-cast"
static const in_addr_t kInaddrAny = INADDR_ANY;
#pragma GCC diagnostic error "-Wold-style-cast"
 
//     /* Structure describing an Internet socket address.  */
//     struct sockaddr_in {
//         sa_family_t    sin_family; /* address family: AF_INET */
//         uint16_t       sin_port;   /* port in network byte order */
//         struct in_addr sin_addr;   /* internet address */
//     };
 
//     /* Internet address. */
//     typedef uint32_t in_addr_t;
//     struct in_addr {
//         in_addr_t       s_addr;     /* address in network byte order */
//     };
 
using namespace muduo;
using namespace muduo::net;
 
BOOST_STATIC_ASSERT(sizeof(InetAddress) == sizeof(struct sockaddr_in));
 
InetAddress::InetAddress(uint16_t port)
{
  bzero(&addr_, sizeof addr_);
  addr_.sin_family = AF_INET;
  addr_.sin_addr.s_addr = sockets::hostToNetwork32(kInaddrAny);
  addr_.sin_port = sockets::hostToNetwork16(port);
}
 
InetAddress::InetAddress(const StringPiece& ip, uint16_t port)
{
  bzero(&addr_, sizeof addr_);
  sockets::fromIpPort(ip.data(), port, &addr_);
}
 
string InetAddress::toIpPort() const
{
  char buf[32];
  sockets::toIpPort(buf, sizeof buf, addr_);
  return buf;
}
 
string InetAddress::toIp() const
{
  char buf[32];
  sockets::toIp(buf, sizeof buf, addr_);
  return buf;
}

Sokcet头文件

Socket.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_SOCKET_H
#define MUDUO_NET_SOCKET_H
 
#include <boost/noncopyable.hpp>
 
namespace muduo
{
///
/// TCP networking.
///
namespace net
{
 
class InetAddress;
 
///
/// Wrapper of socket file descriptor.
///
/// It closes the sockfd when desctructs.
/// It's thread safe, all operations are delagated to OS.
class Socket : boost::noncopyable
{
 public:
  explicit Socket(int sockfd)
    : sockfd_(sockfd) //已已经初始化的sockfd
  { }
 
  // Socket(Socket&&) // move constructor in C++11
  ~Socket();
 
  int fd() const { return sockfd_; }
 
  /// abort if address in use
  void bindAddress(const InetAddress& localaddr);
  /// abort if address in use
  void listen();
 
  /// On success, returns a non-negative integer that is
  /// a descriptor for the accepted socket, which has been
  /// set to non-blocking and close-on-exec. *peeraddr is assigned.
  /// On error, -1 is returned, and *peeraddr is untouched.
  int accept(InetAddress* peeraddr);
 
  void shutdownWrite();
 
  ///
  /// Enable/disable TCP_NODELAY (disable/enable Nagle's algorithm).
  ///
  // Nagle算法可以一定程度上避免网络拥塞
  // TCP_NODELAY选项可以禁用Nagle算法
  // 禁用Nagle算法,可以避免连续发包出现延迟,这对于编写低延迟的网络服务很重要
  void setTcpNoDelay(bool on);
 
  ///
  /// Enable/disable SO_REUSEADDR
  ///
  void setReuseAddr(bool on);
 
  ///
  /// Enable/disable SO_KEEPALIVE
  ///
  // TCP keepalive是指定期探测连接是否存在,如果应用层有心跳的话,这个选项不是必需要设置的
  void setKeepAlive(bool on);
 
 private:
  const int sockfd_;
};
 
}
}
#endif  // MUDUO_NET_SOCKET_H

Socket源文件

Socket.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/Socket.h>
 
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>
 
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <strings.h>  // bzero
 
using namespace muduo;
using namespace muduo::net;
 
Socket::~Socket()
{
  sockets::close(sockfd_);
}
 
void Socket::bindAddress(const InetAddress& addr)
{
  sockets::bindOrDie(sockfd_, addr.getSockAddrInet());
}
 
void Socket::listen()
{
  sockets::listenOrDie(sockfd_);
}
 
int Socket::accept(InetAddress* peeraddr)
{
  struct sockaddr_in addr;
  bzero(&addr, sizeof addr);
  int connfd = sockets::accept(sockfd_, &addr);
  if (connfd >= 0)
  {
    peeraddr->setSockAddrInet(addr);
  }
  return connfd;
}
 
void Socket::shutdownWrite()
{
  sockets::shutdownWrite(sockfd_);
}
 
void Socket::setTcpNoDelay(bool on)
{
  int optval = on ? 1 : 0;
  ::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY,
               &optval, sizeof optval);
  // FIXME CHECK
}
 
void Socket::setReuseAddr(bool on)
{
  int optval = on ? 1 : 0;
  ::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR,
               &optval, sizeof optval);
  // FIXME CHECK
}
 
void Socket::setKeepAlive(bool on)
{
  int optval = on ? 1 : 0;
  ::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE,
               &optval, sizeof optval);
  // FIXME CHECK
}

程序程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <muduo/net/InetAddress.h>
 
//#define BOOST_TEST_MODULE InetAddressTest
#define BOOST_TEST_MAIN
#define BOOST_TEST_DYN_LINK
#include <boost/test/unit_test.hpp>
 
using muduo::string;
using muduo::net::InetAddress;
 
BOOST_AUTO_TEST_CASE(testInetAddress)
{
  InetAddress addr1(1234);
  BOOST_CHECK_EQUAL(addr1.toIp(), string("0.0.0.0"));
  BOOST_CHECK_EQUAL(addr1.toIpPort(), string("0.0.0.0:1234"));
 
  InetAddress addr2("1.2.3.4", 8888);
  BOOST_CHECK_EQUAL(addr2.toIp(), string("1.2.3.4"));
  BOOST_CHECK_EQUAL(addr2.toIpPort(), string("1.2.3.4:8888"));
 
  InetAddress addr3("255.255.255.255", 65535);
  BOOST_CHECK_EQUAL(addr3.toIp(), string("255.255.255.255"));
  BOOST_CHECK_EQUAL(addr3.toIpPort(), string("255.255.255.255:65535"));
}

[32] Acceptor

  • Acceptor用于accept(2)接受TCP连接
  • Acceptor的数据成员包括SocketChannelAcceptorsocketlistening socket(即server socket)。Channel用于观察此socketreadable事件,并回调Accptor::handleRead(),后者调用accept(2)来接受新连接,并回调用户callback

Acceptor的头文件

Acceptor.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_ACCEPTOR_H
#define MUDUO_NET_ACCEPTOR_H
 
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
 
#include <muduo/net/Channel.h>
#include <muduo/net/Socket.h>
 
namespace muduo
{
namespace net
{
 
class EventLoop;
class InetAddress;
 
///
/// Acceptor of incoming TCP connections.
///
class Acceptor : boost::noncopyable
{
 public:
  typedef boost::function<void (int sockfd,
                                const InetAddress&)> NewConnectionCallback;
 
  Acceptor(EventLoop* loop, const InetAddress& listenAddr);
  ~Acceptor();
 
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }
 
  bool listenning() const { return listenning_; }
  void listen();
 
 private:
  /*这是监听套接字的可读事件的套接字*/
  void handleRead();
 
  EventLoop* loop_;
  Socket acceptSocket_; /*监听套接字*/
  ;/*通道 会观察acceptSocket_监听套接字的可读事件,这个channel所属的eventloop是loop_*/
  Channel acceptChannel_;
  NewConnectionCallback newConnectionCallback_;
  /*是否处于监听的状态*/
  bool listenning_;
  int idleFd_;
};
 
}
}
 
#endif  // MUDUO_NET_ACCEPTOR_H

Acceptor源文件

Acceptor.cc

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <errno.h>
#include <fcntl.h>
//#include <sys/types.h>
//#include <sys/stat.h>
 
using namespace muduo;
using namespace muduo::net;
 
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr)
  : loop_(loop),
  /*创建一个监听套接字*/
    acceptSocket_(sockets::createNonblockingOrDie()),
  /*acceptchannel 关注acceptSocket_套接字的事件 */
    acceptChannel_(loop, acceptSocket_.fd()),
    listenning_(false),
    /*预先准备一个文件描述符*/
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  /*设置地址重复利用*/
  acceptSocket_.setReuseAddr(true);
  /*绑定地址*/
  acceptSocket_.bindAddress(listenAddr);
  /*设置监听套接字“读”的套接字*/
  acceptChannel_.setReadCallback(
      boost::bind(&Acceptor::handleRead, this));
}
 
/*虚构函数*/
Acceptor::~Acceptor()
{
  /*先取消 acceptSocket_的所有事件,然后才移除*/
  acceptChannel_.disableAll();
  acceptChannel_.remove();
  ::close(idleFd_);
}
/*监听 监听套接字*/
void Acceptor::listen()
{/*断言在IO线程中*/
  loop_->assertInLoopThread();
 
  listenning_ = true;
  acceptSocket_.listen();
  /*关注acceptSocket_的可读事件*/
  acceptChannel_.enableReading();
}
 
/*监听套接字的可读事件的回调函数*/
void Acceptor::handleRead()
{
  /*断言在IO线程中*/
  loop_->assertInLoopThread();
  /*创建一个对等方的地址,这是客户端的地址*/
  InetAddress peerAddr(0);
  //FIXME loop until no more
  /*对等连接套接字*/
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    /*回调上层(应用层)的回调函数*/
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    /*如果应用层没有回调函数,则直接关闭此连接套接字*/
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of livev.
    /*如果是文件符超出限制*/
    if (errno == EMFILE)
    {
      /**/
      ::close(idleFd_);
      /*因为这是电平触发,所以为了防止一直触发,可以先接受,然后关闭掉*/
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}

测试程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
void newConnection(int sockfd, const InetAddress& peerAddr)
{
  printf("newConnection(): accepted a new connection from %s\n",
         peerAddr.toIpPort().c_str());
  ::write(sockfd, "How are you?\n", 13);
  sockets::close(sockfd);
}
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  Acceptor acceptor(&loop, listenAddr);
  acceptor.setNewConnectionCallback(newConnection);
  acceptor.listen();
 
  loop.loop();
}

程序输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ubuntu@ubuntu-virtual-machine:~$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
How Are You ?
Connection closed by foreign host.
ubuntu@ubuntu-virtual-machine:~$ telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
How Are You ?
Connection closed by foreign host.
ubuntu@ubuntu-virtual-machine:~$