[30] EventLoopThread

  • 任何一个线程,只要创建并运行了EventLoop,都称之为IO线程
  • IO线程不一定是主线程
  • muduo并发模型one loop per thread + threadpool
  • 为了方便今后使用,定义了EventLoopThread类,该类封装了IO线程;EventLoopThread创建了一个线程
  • 在线程函数中创建了一个EventLoop对象并调用EventLoop::loop



// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
// This is a public header file, it must only include public header files.
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <boost/noncopyable.hpp>
namespace muduo
namespace net
class EventLoop;
// Owns one thread together with the mutex/condition pair and the EventLoop
// pointer that startLoop() and threadFunc() share.
// NOTE(review): braces, access specifiers and possibly some declarations
// (e.g. a destructor) are elided in this excerpt.
class EventLoopThread : boost::noncopyable
  // Signature of the optional callback stored in callback_.
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
  EventLoopThread(const ThreadInitCallback& cb =ThreadInitCallback());
  EventLoop* startLoop();   // Starts the thread; that thread then becomes the IO thread
  void threadFunc();        // Thread function: once it is running it creates an EventLoop object and loop_ is pointed at that object
  EventLoop* loop_;         // Points to an EventLoop object; an IO thread has exactly one EventLoop object
  bool exiting_;
  Thread thread_;       // The thread object
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;     // Callback invoked before the IO thread's event loop (EventLoop::loop) starts



#include <muduo/net/EventLoopThread.h>
#include <muduo/net/EventLoop.h>
#include <boost/bind.hpp>
using namespace muduo;
using namespace muduo::net;
// typedef boost::function<void(EventLoop*)> ThreadInitCallback;
// Constructor: loop_ starts out NULL, thread_ is bound to threadFunc on this
// object, and the supplied callback is stored in callback_.
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
  : loop_(NULL),
    thread_(boost::bind(&EventLoopThread::threadFunc, this)),
    callback_(cb)  // cb is empty by default
  // NOTE(review): the two statements below look like a destructor body whose
  // header and braces are elided in this excerpt. loop_ is initialised to NULL
  // above, so confirm it cannot still be NULL when quit() is called.
  exiting_ = true;
  loop_->quit();     // Quit the IO thread: make the IO thread's loop exit, which ends the IO thread
EventLoop* EventLoopThread::startLoop()
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
  return loop_;
/* Initialisation function for the loop (runs as the thread function). */
void EventLoopThread::threadFunc()
  EventLoop loop;   // local object; loop_ is pointed at it below
  // NOTE(review): the body of this `if` is elided (presumably it invokes
  // callback_ with &loop) — confirm against the full source.
  if (callback_)
    MutexLockGuard lock(mutex_);
    // loop_ points at an object on the stack, so the pointer becomes invalid
    // once threadFunc returns.
    // threadFunc returning means the thread has exited, and at that point the
    // EventLoopThread object no longer has any reason to exist.
    // So this does not cause any serious problem.
    loop_ = &loop;


#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
// Prints the process id and the id of the thread that executes this function.
void runInThread()
  printf("runInThread(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
// Test driver: prints the main thread's ids, starts an EventLoopThread and
// schedules runInThread on the loop it returns.
// NOTE(review): some statements appear to be elided in this excerpt.
int main()
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // Asynchronous call of runInThread: runInThread is handed to the IO thread that owns the loop object, and that IO thread executes it
  // runAfter also calls runInLoop internally, so this is an asynchronous call as well
  loop->runAfter(2, runInThread);
  printf("exit main().\n");
// Allocates a Timer from (cb, when, interval) and returns a TimerId built from
// the timer pointer and its sequence number.
// NOTE(review): the call that takes the boost::bind expression below as its
// argument is elided in this excerpt.
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
  Timer* timer = new Timer(cb, when, interval);
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  // addTimerInLoop(timer); must not be used here: if it is called directly, the assertion in addTimerInLoop fails
  return TimerId(timer, timer->sequence());
// Inserts `timer` and re-arms the timerfd when the insertion changed the
// earliest expiration.
void TimerQueue::addTimerInLoop(Timer* timer)
  // Inserting a timer may change which timer expires earliest
  bool earliestChanged = insert(timer);
  if (earliestChanged)
    // Reset the timer's expiration time (timerfd_settime)
    resetTimerfd(timerfd_, timer->expiration());
// Adds `timer` to both timers_ and activeTimers_.
// Returns true when timers_ was empty or the new timer expires before the
// current first entry.
// NOTE(review): braces are elided in this excerpt; the two `result`
// declarations presumably live in separate scopes in the full source.
bool TimerQueue::insert(Timer* timer)
  assert(timers_.size() == activeTimers_.size());
  // Whether the earliest expiration time has changed
  bool earliestChanged = false;
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  // timers_ is empty, or `when` is earlier than the earliest expiration in timers_
  if (it == timers_.end() || when < it->first)
    earliestChanged = true;
    // Insert into timers_
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
    // Insert into activeTimers_
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;


ubuntu@ubuntu-virtual-machine:~/pro/30$ ./build/debug/bin/reactor_test06
main(): pid = 29731, tid = 29731
20131022 03:03:50.315042Z 29732 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315588Z 29732 TRACE EventLoop EventLoop created 0xB786BF94 in thread 29732 - EventLoop.cc:62
20131022 03:03:50.315643Z 29732 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131022 03:03:50.315708Z 29732 TRACE loop EventLoop 0xB786BF94 start looping - EventLoop.cc:94
20131022 03:03:50.315976Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:50.316472Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
runInThread(): pid = 29731, tid = 29732
20131022 03:03:51.316968Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:51.317092Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:53.317414Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:53.317539Z 29732 TRACE printActiveChannels {4: IN }  - EventLoop.cc:257
20131022 03:03:53.317639Z 29732 TRACE readTimerfd TimerQueue::handleRead() 1 at 1382411033.317576 - TimerQueue.cc:62
runInThread(): pid = 29731, tid = 29732
exit main().
20131022 03:03:54.317901Z 29732 TRACE poll 1 events happended - EPollPoller.cc:65
20131022 03:03:54.317966Z 29732 TRACE printActiveChannels {5: IN }  - EventLoop.cc:257
20131022 03:03:54.317990Z 29732 TRACE loop EventLoop 0xB786BF94 stop looping - EventLoop.cc:119

[33] TcpConnection生存期管理



主要的改变是 TcpConnection 和 Channel 两个对象


#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
//#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>
//#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
namespace muduo
namespace net
class Channel;
class EventLoop;
class Socket;
/// TCP connection, for both client and server usage.
/// This is an interface class, so don't expose too much details.
/// Derives from enable_shared_from_this so that member functions can obtain a
/// shared_ptr to the object via shared_from_this().
/// NOTE(review): braces and access specifiers are elided in this excerpt.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
  /// Constructs a TcpConnection with a connected sockfd
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  // Simple accessors for the stored loop, name, addresses and state.
  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
  // Setters that store the user-supplied callbacks.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }
  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once
  // Connection states stored in state_.
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  // Handlers bound to the channel's read/close/error callbacks in the constructor.
  void handleRead(Timestamp receiveTime);
  void handleClose();
  void handleError();
  void setState(StateE s) { state_ = s; }
  EventLoop* loop_;         // The EventLoop this connection belongs to
  string name_;             // Connection name
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  CloseCallback closeCallback_;
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
#include <muduo/net/TcpConnection.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>
#include <boost/bind.hpp>
#include <errno.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
// Logs "<local ip:port> -> <peer ip:port> is UP|DOWN" at TRACE level, choosing
// UP or DOWN from conn->connected().
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
// Constructor: wraps sockfd in a Socket and a Channel and binds the channel's
// read/close/error events to this object's handlers.
// NOTE(review): several initialisers, the channel_->set...Callback( call
// prefixes, and the destructor header before the "dtor" log line are elided
// in this excerpt.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
  // When the channel becomes readable, call back TcpConnection::handleRead; _1 is the time the event occurred
      boost::bind(&TcpConnection::handleRead, this, _1));
  // When the connection is closed, call back TcpConnection::handleClose
      boost::bind(&TcpConnection::handleClose, this));
  // When an error occurs, call back TcpConnection::handleError
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd();
// Ties the channel to this connection and enables read events on it, logging
// the shared_ptr use count before and after.
// Precondition (asserted): state_ is kConnecting.
// NOTE(review): some statements appear to be elided in this excerpt.
void TcpConnection::connectEstablished()
  assert(state_ == kConnecting);
  LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
  channel_->tie(shared_from_this()); /* Weak reference: does not increase the pointer's reference count */
  channel_->enableReading(); // Add this TcpConnection's channel to the set the Poller watches
  LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
// Checks whether the connection is still in the kConnected state.
// NOTE(review): the body of the `if` and any following statements are elided
// in this excerpt.
void TcpConnection::connectDestroyed()
  if (state_ == kConnected)
// Read handler for the channel: reads from the channel's fd and forwards the
// data to messageCallback_ when the read returned more than zero bytes.
// NOTE(review): two variants of the body appear back to back here (one reading
// into inputBuffer_, one using ::read into a local array); braces and the
// bodies of the `n == 0` / error branches are elided. In the full source one
// variant is presumably disabled — confirm.
void TcpConnection::handleRead(Timestamp receiveTime)
  // Variant 1: read into inputBuffer_.
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  else if (n == 0)
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
  // Variant 2: read into a 64 KiB local array.
  int savedErrno = 0;
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  if (n > 0)
    messageCallback_(shared_from_this(), buf, n);
  else if (n == 0)
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
// Close handler: notifies connectionCallback_ and then closeCallback_, passing
// a shared_ptr to this object that is held for the duration of the call.
// Precondition (asserted): state_ is kConnected or kDisconnecting.
// NOTE(review): some statements appear to be elided in this excerpt.
void TcpConnection::handleClose()
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);       // This line is optional and may be left out
  LOG_TRACE << "[7] usecount=" << guardThis.use_count();
  // must be the last line
  closeCallback_(guardThis);    // Calls TcpServer::removeConnection
  LOG_TRACE << "[11] usecount=" << guardThis.use_count();
// Error handler: fetches the pending socket error for the channel's fd and
// logs it, with its text form, at ERROR level.
void TcpConnection::handleError()
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);


#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <muduo/base/Timestamp.h>
namespace muduo
namespace net
class EventLoop;
/// A selectable I/O channel.
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
/// NOTE(review): braces and access specifiers are elided in this excerpt.
class Channel : boost::noncopyable
  // Callback types: plain event callbacks, and a read callback that receives a Timestamp.
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;
  Channel(EventLoop* loop, int fd);
  void handleEvent(Timestamp receiveTime);
  // Setters that store the callback for each kind of event.
  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb)
  { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }
  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const boost::shared_ptr<void>&);
  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }
  // Each enable/disable helper edits the events_ mask and then calls update().
  void enableReading() { events_ |= kReadEvent; update(); }
  // void disableReading() { events_ &= ~kReadEvent; update(); }
  void enableWriting() { events_ |= kWriteEvent; update(); }
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  void disableAll() { events_ = kNoneEvent; update(); }
  bool isWriting() const { return events_ & kWriteEvent; }
  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }
  // for debug
  string reventsToString() const;
  void doNotLogHup() { logHup_ = false; }
  EventLoop* ownerLoop() { return loop_; }
  void remove();
  void update();
  void handleEventWithGuard(Timestamp receiveTime);
  // Event masks; defined in the .cc part from the poll(2) flags.
  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;
  EventLoop* loop_;         // The EventLoop this channel belongs to
  const int  fd_;           // File descriptor; the channel is not responsible for closing it
  int        events_;       // Events of interest
  int        revents_;      // Events returned by poll/epoll
  int        index_;        // used by Poller. Index within poll's event array
  bool       logHup_;       // for POLLHUP
  boost::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;      // Whether an event is currently being handled
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <sstream>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
// Event masks used in Channel::events_: none, readable (normal or priority
// data), and writable.
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
// Constructor: stores the owning loop.
// NOTE(review): the remaining member initialisers and the body are elided in
// this excerpt.
Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
// Stores `obj` in the weak_ptr tie_ and marks the channel as tied, so that
// handleEvent can later try to lock it.
void Channel::tie(const boost::shared_ptr<void>& obj)
  tie_ = obj;
  tied_ = true;
// Called by the enable*/disable* helpers after they change events_.
// NOTE(review): the body is elided in this excerpt.
void Channel::update()
// Make sure disableAll() has been called before calling this function.
// NOTE(review): the body is elided in this excerpt.
void Channel::remove()
// Entry point for event dispatch. When the channel is tied, tie_ is locked
// into a shared_ptr held in `guard` for the rest of the function, and the use
// count is traced.
// NOTE(review): the calls to handleEventWithGuard and the untied branch are
// elided in this excerpt.
void Channel::handleEvent(Timestamp receiveTime)
  boost::shared_ptr<void> guard;
  if (tied_)
    guard = tie_.lock();
    if (guard)   // non-null only if the tied object is still alive
      LOG_TRACE << "[6] usecount=" << guard.use_count();
      LOG_TRACE << "[12] usecount=" << guard.use_count();
// Dispatches revents_ to the registered callbacks; each callback is invoked
// only if it has been set. eventHandling_ is true for the duration.
// NOTE(review): braces are elided in this excerpt, so the nesting of the
// statements under each `if` must be checked against the full source.
void Channel::handleEventWithGuard(Timestamp receiveTime)
  eventHandling_ = true;
  // Hang-up without readable data: treated as a close.
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    if (logHup_)
      LOG_WARN << "Channel::handle_event() POLLHUP";
    if (closeCallback_) closeCallback_();
  // POLLNVAL is logged here and also handled as an error below.
  if (revents_ & POLLNVAL)
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  if (revents_ & (POLLERR | POLLNVAL))
    if (errorCallback_) errorCallback_();
  // Readable, priority data, or peer half-close all go to the read callback.
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    if (readCallback_) readCallback_(receiveTime);
  if (revents_ & POLLOUT)
    if (writeCallback_) writeCallback_();
  eventHandling_ = false;
// Debug helper: formats revents_ as "<fd>: " followed by one tag
// ("IN ", "PRI ", "OUT ", "HUP ", "RDHUP ", "ERR ", "NVAL ") per set flag.
string Channel::reventsToString() const
  std::ostringstream oss;
  oss << fd_ << ": ";
  if (revents_ & POLLIN)
    oss << "IN ";
  if (revents_ & POLLPRI)
    oss << "PRI ";
  if (revents_ & POLLOUT)
    oss << "OUT ";
  if (revents_ & POLLHUP)
    oss << "HUP ";
  if (revents_ & POLLRDHUP)
    oss << "RDHUP ";
  if (revents_ & POLLERR)
    oss << "ERR ";
  if (revents_ & POLLNVAL)
    oss << "NVAL ";
  // NOTE(review): going through c_str() builds the returned `string` from a C
  // string; presumably deliberate if `string` is not std::string — confirm.
  return oss.str().c_str();


#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
// Small test server: owns a TcpServer named "TestServer" and prints a line for
// each connection change and each received message.
// NOTE(review): braces, access specifiers, the callback-registration call
// prefixes, the body of start() and the printf argument lists in
// onConnection are elided in this excerpt.
class TestServer
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
        // Bind this object's handlers as the server's callbacks.
        boost::bind(&TestServer::onConnection, this, _1));
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
  void start()
  // Prints whether the connection is newly established or has gone down.
  void onConnection(const TcpConnectionPtr& conn)
    if (conn->connected())
      printf("onConnection(): new connection [%s] from %s\n",
      printf("onConnection(): connection [%s] is down\n",
  // Prints the number of bytes received and the connection's name.
  void onMessage(const TcpConnectionPtr& conn,
                   const char* data,
                   ssize_t len)
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  EventLoop* loop_;
  TcpServer server_;
// Test driver: prints the pid and constructs a TestServer on an InetAddress
// built from port 8888, using a local EventLoop.
// NOTE(review): the statements that start the server and run the loop are not
// visible in this excerpt.
int main()
  printf("main(): pid = %d\n", getpid());
  InetAddress listenAddr(8888);
  EventLoop loop;
  TestServer server(&loop, listenAddr);