[30] EventLoopThread

  • 任何一个线程,只要创建并运行了EventLoop,都称之为IO线程
  • IO线程不一定是主线程
  • muduo并发模型one loop per thread + threadpool
  • 为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程:EventLoopThread创建了一个线程,
  • 并在线程函数中创建了一个EventLoop对象并调用EventLoop::loop

EventLoopThread头文件

eventloopthread.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_EVENTLOOPTHREAD_H
#define MUDUO_NET_EVENTLOOPTHREAD_H
 
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
 
#include <boost/noncopyable.hpp>
 
namespace muduo
{
namespace net
{
 
class EventLoop;
 
/// Wraps one IO thread: a Thread whose entry point (threadFunc) creates an
/// EventLoop and runs EventLoop::loop() on it. Non-copyable.
///
/// NOTE(review): boost::function is used below, but this header does not
/// include <boost/function.hpp> itself — presumably it arrives transitively
/// through the muduo base headers; confirm.
class EventLoopThread : boost::noncopyable
{
 public:
  /// Signature of the optional per-thread init hook; it receives the
  /// thread's EventLoop.
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
  /// @param cb  init hook stored in callback_; defaults to an empty function.
  EventLoopThread(const ThreadInitCallback& cb =ThreadInitCallback());
  ~EventLoopThread();
  EventLoop* startLoop();   // starts the thread, which thereby becomes the IO thread
 
 private:
  void threadFunc();        // thread entry point: creates an EventLoop object and makes loop_ point at it
 
  EventLoop* loop_;         // points at the IO thread's EventLoop; one IO thread has exactly one EventLoop
  bool exiting_;            // set to true by the destructor
  Thread thread_;       // the thread object
  MutexLock mutex_;         // held while loop_ is published/awaited
  Condition cond_;          // notified once loop_ has been set by threadFunc
  ThreadInitCallback callback_;     // invoked in the IO thread before EventLoop::loop() starts
};
 
}
}
 
#endif  // MUDUO_NET_EVENTLOOPTHREAD_H

EventLoopThread源文件

eventloopthread.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/EventLoopThread.h>
 
#include <muduo/net/EventLoop.h>
 
#include <boost/bind.hpp>
 
using namespace muduo;
using namespace muduo::net;
// typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
// Initializes all members. The Thread is constructed with threadFunc bound to
// this object but is not started here; startLoop() calls thread_.start().
// cond_ is constructed on top of mutex_.
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this)),
    mutex_(),
    cond_(mutex_),
    callback_(cb)  // cb is empty by default
{
}
 
// Asks the IO thread's loop to quit (if it is still running) and waits for the
// thread to finish.
//
// Safe in both of the following situations, which previously dereferenced an
// invalid loop_:
//   - startLoop() was never called (loop_ is still NULL, thread not started);
//   - the loop already exited, e.g. because the owner called
//     EventLoop::quit() itself before destroying this object.
EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  {
    // threadFunc() resets loop_ to NULL under mutex_ *before* its stack
    // EventLoop is destroyed, so a non-NULL loop_ seen under the lock still
    // refers to a live object.
    MutexLockGuard lock(mutex_);
    if (loop_ != NULL)
    {
      loop_->quit();   // make EventLoop::loop() return so the IO thread can exit
    }
  }
  if (thread_.started())
  {
    thread_.join();
  }
}

// Starts the thread (whose entry point is threadFunc) and blocks until that
// thread has created its EventLoop. Returns a pointer to that EventLoop.
// Must be called at most once.
EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  // The new thread runs EventLoopThread::threadFunc.
  thread_.start();

  EventLoop* loop = NULL;
  {
    MutexLockGuard lock(mutex_);
    // Wait until threadFunc has published the EventLoop; the loop form
    // tolerates spurious wakeups.
    while (loop_ == NULL)
    {
      cond_.wait();
    }
    // Copy while still holding the lock: threadFunc may reset loop_ later.
    loop = loop_;
  }

  return loop;
}

// Thread entry point: creates the EventLoop on this thread's stack, runs the
// optional init callback, publishes the loop through loop_, and then runs the
// event loop until it is asked to quit.
void EventLoopThread::threadFunc()
{
  EventLoop loop;

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    // loop_ points at a stack object; it is only valid while this function
    // is running, which is why it is cleared again below.
    loop_ = &loop;
    cond_.notify();
  }

  loop.loop();
  //assert(exiting_);

  // The loop has stopped and `loop` is about to be destroyed: un-publish it so
  // the destructor never touches a dangling pointer. `lock` is released before
  // `loop` is destroyed because locals are destroyed in reverse order.
  MutexLockGuard lock(mutex_);
  loop_ = NULL;
}

测试程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
// Prints the process id and the id of the thread that executes this function.
void runInThread()
{
  const pid_t processId = getpid();
  const pid_t threadId = CurrentThread::tid();
  printf("runInThread(): pid = %d, tid = %d\n", processId, threadId);
}
 
int main()
{
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
 
  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter内部也调用了runInLoop,所以这里也是异步调用
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();
 
  printf("exit main().\n");
}
 
/*
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(cb, when, interval);
 
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
 
  //addTimerInLoop(timer); 不能用这个,如果直接调用,那么addTimerInLoop断言失败
  return TimerId(timer, timer->sequence());
}
 
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  // 插入一个定时器,有可能会使得最早到期的定时器发生改变
  bool earliestChanged = insert(timer);
 
  if (earliestChanged)
  {
    // 重置定时器的超时时刻(timerfd_settime)
    resetTimerfd(timerfd_, timer->expiration());
  }
}
 
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  // 最早到期时间是否改变
  bool earliestChanged = false;
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  // 如果timers_为空或者when小于timers_中的最早到期时间
  if (it == timers_.end() || when < it->first)
  {
    earliestChanged = true;
  }
  {
    // 插入到timers_中
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
    // 插入到activeTimers_中
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }
 
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}
 
*/

程序输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
ubuntu@ubuntu-virtual-machine:~/pro/30$ ./build/debug/bin/reactor_test06
main(): pid = 29731, tid = 29731
20131022 03:03:50.315042Z 29732 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315588Z 29732 TRACE EventLoop EventLoop created 0xB786BF94 in thread 29732 - EventLoop.cc:62
20131022 03:03:50.315643Z 29732 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315708Z 29732 TRACE loop EventLoop 0xB786BF94 start looping - EventLoop.cc:94
20131022 03:03:50.315976Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:50.316472Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
runInThread(): pid = 29731, tid = 29732
20131022 03:03:51.316968Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:51.317092Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:53.317414Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:53.317539Z 29732 TRACE printActiveChannels {4: IN }  - EventLoop.cc:257
20131022 03:03:53.317639Z 29732 TRACE readTimerfd TimerQueue::handleRead() 1 at 1382411033.317576 - TimerQueue.cc:62
runInThread(): pid = 29731, tid = 29732
exit main().
20131022 03:03:54.317901Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:54.317966Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:54.317990Z 29732 TRACE loop EventLoop 0xB786BF94 stop looping - EventLoop.cc:119
ubuntu@ubuntu-virtual-machine:~/pro/30$

[33] TcpConnection生存期管理

时序图

![http://laohanlinux.github.io/images/img/blog/muduo-base-library-2/34.png]

主要的改变是 TcpConnection 和 Channel 两个对象

TcpConnection

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H
 
#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
//#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>
 
//#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Channel;
class EventLoop;
class Socket;
 
 
///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();
 
  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
 
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }
 
  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once
 
 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleClose();
  void handleError();
  void setState(StateE s) { state_ = s; }
 
  EventLoop* loop_;         // 所属EventLoop
  string name_;             // 连接名
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  /*内部断开的回调函数*/
  CloseCallback closeCallback_;
};
 
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
 
}
}
 
#endif  // MUDUO_NET_TCPCONNECTION_H
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/TcpConnection.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <errno.h>
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
/*
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
}
 
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}
*/
// Builds a connection object around an already-connected socket fd and wires
// the channel's read/close/error callbacks to this object's handlers.
// The initial state is kConnecting; the channel is not enabled for any event
// here (connectEstablished() calls enableReading()).
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr)/*,
    highWaterMark_(64*1024*1024)*/
{
  // When the channel becomes readable, call TcpConnection::handleRead; _1 is the event time
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  // When the connection is closed, call TcpConnection::handleClose
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  // When an error occurs, call TcpConnection::handleError
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}
 
// Only logs the destruction; socket_ and channel_ are scoped_ptr members and
// are released automatically after this body runs.
TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd();
}
 
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
  channel_->tie(shared_from_this()); /*弱引用,不会使指针引用计算加一*/
  channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注
 
  /*用户的回调函数*/
  connectionCallback_(shared_from_this());
  LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}
 
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();
 
    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}
 
// Channel read callback: reads once from the socket and dispatches on the
// result.
//   n > 0  -> pass the bytes to the user's message callback
//   n == 0 -> the peer closed the connection; run handleClose()
//   n < 0  -> log the system error and run handleError()
// Must run in the loop thread. receiveTime is not used by this simplified
// (buffer-less) version.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  /* Buffer-based version, disabled at this stage of the tutorial:
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  char buf[65536];
  const ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  // Capture errno immediately after read(). Previously savedErrno stayed 0 and
  // was written back to errno, so LOG_SYSERR reported no error at all.
  const int savedErrno = errno;
  if (n > 0)
  {
    messageCallback_(shared_from_this(), buf, n);
  }
  // A closed connection also shows up as a readable event (read returns 0).
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }

}
 
// Internal handler run when the connection is closed (called from handleRead
// on a zero-length read, and installed as the channel's close callback).
// Marks the connection disconnected, stops watching all events, then notifies
// the user callback and finally the close callback. Must run in the loop thread.
void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();

  // Hold a strong reference for the rest of this function so that the
  // callbacks below cannot destroy this object while it is still in use.
  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);       // original author's note: this call may be omitted
  LOG_TRACE << "[7] usecount=" << guardThis.use_count();
  // must be the last line
  closeCallback_(guardThis);    // original note: calls TcpServer::removeConnection (binding not visible here)
  LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}
 
void TcpConnection::handleError()
{
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

Channel

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H
 
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
 
#include <muduo/base/Timestamp.h>
 
namespace muduo
{
namespace net
{
 
class EventLoop;
 
///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : boost::noncopyable
{
 public:
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;
 
  Channel(EventLoop* loop, int fd);
  ~Channel();
 
  /// Dispatches the events stored by set_revents() to the callbacks below.
  void handleEvent(Timestamp receiveTime);
  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb)
  { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }
 
  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const boost::shared_ptr<void>&);
 
  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }
 
  // Each enable*/disable* call changes the interest mask and then calls update().
  void enableReading() { events_ |= kReadEvent; update(); }
  // void disableReading() { events_ &= ~kReadEvent; update(); }
  void enableWriting() { events_ |= kWriteEvent; update(); }
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  void disableAll() { events_ = kNoneEvent; update(); }
  bool isWriting() const { return events_ & kWriteEvent; }
 
  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }
 
  // for debug
  string reventsToString() const;
 
  void doNotLogHup() { logHup_ = false; }
 
  EventLoop* ownerLoop() { return loop_; }
  void remove();
 
 private:
  void update();
  void handleEventWithGuard(Timestamp receiveTime);
 
  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;
 
  EventLoop* loop_;         // the EventLoop this channel belongs to
  const int  fd_;           // file descriptor; this class is not responsible for closing it
  int        events_;       // events we are interested in
  int        revents_;      // events returned by poll/epoll
  int        index_;        // used by Poller; index in the poller's event array
  bool       logHup_;       // for POLLHUP
 
  // weak reference to the owner object set by tie()
  boost::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;      // true while events are being handled
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
};
 
}
}
#endif  // MUDUO_NET_CHANNEL_H
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
 
#include <sstream>
 
#include <poll.h>
 
using namespace muduo;
using namespace muduo::net;
 
// Interest masks used by enableReading()/enableWriting()/disableAll(),
// expressed as poll(2) event flags.
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
 
// Creates a channel for fd__ on the given loop with no events of interest,
// no returned events, index -1, HUP logging enabled, not tied, and not
// currently handling events. Nothing is registered with the loop here.
Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false)
{
}
 
// A channel must not be destroyed while handleEventWithGuard() is running
// on it; the assertion checks the flag that function sets.
Channel::~Channel()
{
  assert(!eventHandling_);
}
 
// Remembers obj as a weak reference (tie_ is a weak_ptr) and marks the
// channel as tied; handleEvent() then locks tie_ before dispatching.
void Channel::tie(const boost::shared_ptr<void>& obj)
{
  tie_ = obj;
  tied_ = true;
}
 
// Forwards this channel to the owning loop's updateChannel(); called after
// every change to events_.
void Channel::update()
{
  loop_->updateChannel(this);
}
 
// Removes this channel from its loop. disableAll() must have been called
// first: the assertion requires that no events are of interest any more.
void Channel::remove()
{
  assert(isNoneEvent());
  loop_->removeChannel(this);
}
 
// Entry point used to dispatch the events recorded in revents_.
// An untied channel dispatches directly. A tied channel first promotes its
// weak reference; events are dispatched only if the owner is still alive,
// and the owner is kept alive until this function returns.
void Channel::handleEvent(Timestamp receiveTime)
{
  if (!tied_)
  {
    handleEventWithGuard(receiveTime);
    return;
  }

  const boost::shared_ptr<void> owner(tie_.lock());
  if (owner)
  {
    LOG_TRACE << "[6] usecount=" << owner.use_count();
    handleEventWithGuard(receiveTime);
    LOG_TRACE << "[12] usecount=" << owner.use_count();
  }
}
 
// Dispatches revents_ to the registered callbacks, in this fixed order:
// close (hang-up without readable data), error, read, write. Each callback is
// invoked only if it has been set. eventHandling_ is true for the duration so
// that ~Channel() can assert the channel is not destroyed mid-dispatch.
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  // Hang-up with nothing left to read: treat as close.
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  // Invalid fd: warn here; the error callback below also fires for POLLNVAL.
  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  // Readable, urgent data, or peer half-close all go to the read callback.
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}
 
// Debug helper: renders "<fd>: " followed by one space-terminated label for
// every flag set in revents_, in the order IN, PRI, OUT, HUP, RDHUP, ERR, NVAL.
string Channel::reventsToString() const
{
  struct FlagName
  {
    int mask;
    const char* label;
  };
  static const FlagName kFlagNames[] =
  {
    { POLLIN,    "IN "    },
    { POLLPRI,   "PRI "   },
    { POLLOUT,   "OUT "   },
    { POLLHUP,   "HUP "   },
    { POLLRDHUP, "RDHUP " },
    { POLLERR,   "ERR "   },
    { POLLNVAL,  "NVAL "  },
  };
  const size_t kFlagCount = sizeof kFlagNames / sizeof kFlagNames[0];

  std::ostringstream text;
  text << fd_ << ": ";
  for (size_t i = 0; i < kFlagCount; ++i)
  {
    if (revents_ & kFlagNames[i].mask)
    {
      text << kFlagNames[i].label;
    }
  }

  return text.str().c_str();
}

测试程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    //增加了对等方关闭时的回调处理函数
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                   const char* data,
                   ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  }
 
  EventLoop* loop_;
  TcpServer server_;
};
 
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr);
  server.start();
 
  loop.loop();
}