[34] TcpServer/TcpConnection

Acceptor类的主要功能是socket、bind、listen
般来说,在上层应用程序中,我们不直接使用Acceptor,而是把它作为TcpServer的成员
TcpServer还包含了一个TcpConnection列表
TcpConnection与Acceptor类似,有两个重要的数据成员,Socket与Channel


时序图

TcpServer头文件

TcpServer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H
 
#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>
 
#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Acceptor;
class EventLoop;
 
///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : boost::noncopyable
{
 public:
  //typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.
 
  /*返回服务器的名称*/
  const string& hostport() const { return hostport_; }
  /*返回服务器的监听端口*/
  const string& name() const { return name_; }
 
  /// Starts the server if it's not listenning.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();
 
  /// Set connection callback.
  /// Not thread safe.
  // 设置连接到来或者连接关闭回调函数
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  /// Set message callback.
  /// Not thread safe.
  // 设置消息到来回调函数
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
 
 private:
  /// Not thread safe, but in loop
  // Acceptor::handleRead函数中会回调用TcpServer::newConnection
  // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
  //应该还在IO线程里面
  // newConnection和connectionCallback_都是连接回调函数,但是connectionCallback_
  // 是给应用程序使用的,newConnection是库函数,不会暴露给用户。
  // 其实就是newConnection函数主要负责注册connectionCallback_ 函数
  void newConnection(int sockfd, const InetAddress& peerAddr);
 
  typedef std::map<string, TcpConnectionPtr> ConnectionMap;
 
  EventLoop* loop_;  // the acceptor's loop,不一定是连接所属的eventloop
  const string hostport_;       // 服务端口
  const string name_;           // 服务名
  boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  /*连接到来的回调函数*/
  ConnectionCallback connectionCallback_;
  /*消息到来的回调函数*/
  MessageCallback messageCallback_;
  bool started_; /*连接是否启动*/
  // always in loop thread
  int nextConnId_;              // 下一个连接ID
  ConnectionMap connections_;   // 连接列表
};
 
}
}
 
#endif  // MUDUO_NET_TCPSERVER_H

TcpServer源文件

TcpServer.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/TcpServer.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
//#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>  // snprintf
 
using namespace muduo;
using namespace muduo::net;
 
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    /*threadPool_(new EventLoopThreadPool(loop)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),*/
    started_(false),
    nextConnId_(1)
{
  // Acceptor::handleRead函数中会回调用TcpServer::newConnection
  // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}
 
TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
}
 
// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
  if (!started_)
  {
    started_ = true;
  }
 
  /*如果还没有被执行,那么让他在IO线程中执行listen函数*/
  if (!acceptor_->listenning())
  {
    // get_pointer返回原生指针
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}
 
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;
 
  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /*构建一个连接对象*/
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
 
  conn->connectEstablished();
}

TcpConnection头文件

TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H
 
#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
//#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>
 
//#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Channel;
class EventLoop;
class Socket;
 
///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();
 
  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
 
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
 
 private:
  enum StateE { /*kDisconnected, */kConnecting, kConnected/*, kDisconnecting*/ };
  void handleRead(Timestamp receiveTime);
  void setState(StateE s) { state_ = s; }
 
  EventLoop* loop_;         // 所属EventLoop
  string name_;             // 连接名
  /*连接状态*/
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  /*连接套接字的回调函数*/
  ConnectionCallback connectionCallback_;
  /*消息被读到应用层后的回调函数*/
  MessageCallback messageCallback_;
};
 
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
 
}
}
 
#endif  // MUDUO_NET_TCPCONNECTION_H

TcpConnection源文件

TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/TcpConnection.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <errno.h>
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
/*
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
}
 
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}
*/
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr)/*,
    highWaterMark_(64*1024*1024)*/
{
  // 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}
 
TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd();
}
 
void TcpConnection::connectEstablished()
{
  //断言在eventloop IO线程中
  loop_->assertInLoopThread();
  //断言还没有连接
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  /*如果连接成功,则关注可读事件*/
  channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注
 
  connectionCallback_(shared_from_this());
}
 
void TcpConnection::handleRead(Timestamp receiveTime)
{
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  messageCallback_(shared_from_this(), buf, n);
}

CallBack源文件

CallBack.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_CALLBACKS_H
#define MUDUO_NET_CALLBACKS_H
 
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
 
#include <muduo/base/Timestamp.h>
 
namespace muduo
{
/*
// Adapted from google-protobuf stubs/common.h
// see License in muduo/base/Types.h
template<typename To, typename From>
inline ::boost::shared_ptr<To> down_pointer_cast(const ::boost::shared_ptr<From>& f) {
  if (false) {
    implicit_cast<From*, To*>(0);
  }
 
#ifndef NDEBUG
  assert(f == NULL || dynamic_cast<To*>(get_pointer(f)) != NULL);
#endif
  return ::boost::static_pointer_cast<To>(f);
}*/
 
namespace net
{
 
// All client visible callbacks go here.
/*
class Buffer;
*/
class TcpConnection;
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
 
typedef boost::function<void()> TimerCallback;
 
typedef boost::function<void (const TcpConnectionPtr&)> ConnectionCallback;
/*typedef boost::function<void (const TcpConnectionPtr&)> CloseCallback;
typedef boost::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
typedef boost::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
 
// the data has been read to (buf, len)
typedef boost::function<void (const TcpConnectionPtr&,
                              Buffer*,
                              Timestamp)> MessageCallback;
 
void defaultConnectionCallback(const TcpConnectionPtr& conn);
void defaultMessageCallback(const TcpConnectionPtr& conn,
                            Buffer* buffer,
                            Timestamp receiveTime);
                            */
typedef boost::function<void (const TcpConnectionPtr&,
                              const char* data,
                              ssize_t len)> MessageCallback;
 
}
}
 
#endif  // MUDUO_NET_CALLBACKS_H

测试程序1

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <stdio.h>
 
/*
 
这个程序主要用来测试TcpServer、TcpConnection
由于这里的TcpServer还没有实现关闭的事件处理,所以当对等放关闭时,服务器会一直处于高电平状态,
也就是说会一直触发
**/
using namespace muduo;
using namespace muduo::net;
 
void onConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    printf("onConnection(): new connection [%s] from %s\n",
           conn->name().c_str(),
           conn->peerAddress().toIpPort().c_str());
  }
  else
  {
    printf("onConnection(): connection [%s] is down\n",
           conn->name().c_str());
  }
}
 
void onMessage(const TcpConnectionPtr& conn,
               const char* data,
               ssize_t len)
{
  printf("onMessage(): received %zd bytes from connection [%s]\n",
         len, conn->name().c_str());
}
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TcpServer server(&loop, listenAddr, "TestServer");
  server.setConnectionCallback(onConnection);
  server.setMessageCallback(onMessage);
  server.start();
 
  loop.loop();
}

程序输出:

客户端

aaahuiahsdui crosoft Telnet Client
 
????????haracter is 'CTRL+]'
sadasd
 
asduhasuidhuiahsdiahsdjksdjfhsdiohahsoidoasudguagsdhjgashjdgajhksd
as
d
a
s
d
sad
 
Microsoft Telnet> quit
 
C:\Users\LaoHan>

服务器端

^Cubuntu@ubuntu-virtual-machine:~/pro/33/jmuduo$ ../build/debug/bin/reactor_test08
main(): pid = 6066
20131023 01:08:39.593827Z  6066 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131023 01:08:39.594123Z  6066 TRACE EventLoop EventLoop created 0xBFBEBEDC in thread 6066 - EventLoop.cc:62
20131023 01:08:39.594172Z  6066 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131023 01:08:39.594331Z  6066 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131023 01:08:39.594387Z  6066 TRACE loop EventLoop 0xBFBEBEDC start looping - EventLoop.cc:94
20131023 01:08:43.065719Z  6066 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:43.066560Z  6066 TRACE printActiveChannels {6: IN }  - EventLoop.cc:257
20131023 01:08:43.066665Z  6066 INFO  TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 172.21.95.221:50007 - TcpServer.cc:74
20131023 01:08:43.066762Z  6066 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x9D80468 fd=8 - TcpConnection.cc:56
20131023 01:08:43.066802Z  6066 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 172.21.95.221:50007
20131023 01:08:43.845782Z  6066 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:43.845853Z  6066 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 1 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:44.122627Z  6066 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:44.122695Z  6066 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 1 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:44.306479Z  6066 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:44.306545Z  6066 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 1 bytes from connection [TestServer:0.0.0.0:8888#1]
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429136Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429156Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429186Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429205Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429235Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429253Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429283Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429302Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429332Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429351Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429381Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429399Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429447Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429466Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429497Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 01:08:23.429515Z  6063 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257
onMessage(): received 0 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 01:08:23.429545Z  6063 TRACE poll 1 events happended - EPollPoller.cc:65

测试程序2

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                   const char* data,
                   ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  }
 
  EventLoop* loop_;
  TcpServer server_;
};
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr);
  server.start();
 
  loop.loop();
}

[35-1] multiple reactors

  • muduo库如何支持多线程
  • EventLoopThreadIO线程类)
  • EventLoopThreadPoolIO线程池类)
  • IO线程池的功能是开启若干个IO线程,并让这些IO线程处于事件循环的状态

下面的这些代码可能和前面给出的源代码有些不一样,阅读的同学请注意了

EventLoopThreadPool头文件

eventloopthreadpool.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_EVENTLOOPTHREADPOOL_H
#define MUDUO_NET_EVENTLOOPTHREADPOOL_H
 
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
 
#include <vector>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
 
namespace muduo
{
 
namespace net
{
 
class EventLoop;
class EventLoopThread;
 
class EventLoopThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
  EventLoopThreadPool(EventLoop* baseLoop);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());
  EventLoop* getNextLoop();
 
 private:
 
  EventLoop* baseLoop_; // 与Acceptor所属EventLoop相同
  bool started_;        /*是否启动*/
  int numThreads_;      // 线程数
  int next_;            // 新连接到来,所选择的EventLoop对象下标
  boost::ptr_vector<EventLoopThread> threads_;        // IO线程列表
  std::vector<EventLoop*> loops_;                 // EventLoop列表
};
 
}
}
 
#endif  // MUDUO_NET_EVENTLOOPTHREADPOOL_H

EventLoopThreadPool源文件

eventloopthreadpool.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/EventLoopThreadPool.h>
 
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
 
#include <boost/bind.hpp>
 
using namespace muduo;
using namespace muduo::net;
 
 
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
  : baseLoop_(baseLoop),  // 与Acceptor所属EventLoop相同
    started_(false),
    numThreads_(0),
    next_(0)
{
}
 
EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop, it's stack variable
}
/*每个线程初始化时的回调函数cb*/
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();
 
  started_ = true;
 
  for (int i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = new EventLoopThread(cb);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());    // 启动EventLoopThread线程,在进入事件循环之前,会调用cb
  }
  if (numThreads_ == 0 && cb)
  {
    // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
    cb(baseLoop_);
  }
}
 
EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;
 
  // 如果loops_为空,则loop指向baseLoop_
  // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

TcpServer头文件

TcpServer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H
 
#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>
 
#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Acceptor;
class EventLoop;
class EventLoopThreadPool;
 
///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
 
  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.
 
  const string& hostport() const { return hostport_; }
  const string& name() const { return name_; }
 
  /// Set the number of threads for handling input.
  ///
  /// Always accepts new connection in loop's thread.
  /// Must be called before @c start
  /// @param numThreads
  /// - 0 means all I/O in loop's thread, no thread will created.
  ///   this is the default value.
  /// - 1 means all I/O in another thread.
  /// - N means a thread pool with N threads, new connections
  ///   are assigned on a round-robin basis.
  void setThreadNum(int numThreads);
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }
 
  /// Starts the server if it's not listenning.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();
 
  /// Set connection callback.
  /// Not thread safe.
  // 设置连接到来或者连接关闭回调函数
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  /// Set message callback.
  /// Not thread safe.
  // 设置消息到来回调函数
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
 
 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd, const InetAddress& peerAddr);
  /// Thread safe.
  void removeConnection(const TcpConnectionPtr& conn);
  /// Not thread safe, but in loop
  void removeConnectionInLoop(const TcpConnectionPtr& conn);
 
  typedef std::map<string, TcpConnectionPtr> ConnectionMap;
 
  EventLoop* loop_;  // the acceptor loop
  const string hostport_;       // 服务端口
  const string name_;           // 服务名
  boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  boost::scoped_ptr<EventLoopThreadPool> threadPool_; //线程池
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  ThreadInitCallback threadInitCallback_;   // IO线程池中的线程在进入事件循环前,会回调用此函数
  bool started_;
  // always in loop thread
  int nextConnId_;              // 下一个连接ID
  ConnectionMap connections_;   // 连接列表
};
 
}
}
 
#endif  // MUDUO_NET_TCPSERVER_H

TcpServer源文件

TcpServer.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/TcpServer.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>  // snprintf
 
using namespace muduo;
using namespace muduo::net;
 
TcpServer::TcpServer(EventLoop* loop, /*这是main reactor*/
                     const InetAddress& listenAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    threadPool_(new EventLoopThreadPool(loop)),
    /*connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),*/
    started_(false),
    nextConnId_(1)
{
  // Acceptor::handleRead函数中会回调用TcpServer::newConnection
  // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress)
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}
 
TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
 
  for (ConnectionMap::iterator it(connections_.begin());
      it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn = it->second;
    it->second.reset();      // 释放当前所控制的对象,引用计数减一
    conn->getLoop()->runInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
    conn.reset();           // 释放当前所控制的对象,引用计数减一
  }
}
 
void TcpServer::setThreadNum(int numThreads)
{
  /*numThreads不包含main reactor thread*/
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}
 
// 该函数多次调用是无害的
// 该函数可以跨线程调用
void TcpServer::start()
{
  if (!started_)
  {
    started_ = true;
    /*启动线程池*/
    threadPool_->start(threadInitCallback_);
  }
 
  if (!acceptor_->listenning())
  {
    // get_pointer返回原生指针
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}
 
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  // 按照轮叫的方式选择一个EventLoop
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;
 
  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /*TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));*/
 
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
 
  LOG_TRACE << "[1] usecount=" << conn.use_count();
  connections_[connName] = conn;
  LOG_TRACE << "[2] usecount=" << conn.use_count();
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
 
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1));
 
  // conn->connectEstablished(); 这个表示直接在当前线程中调用
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
  LOG_TRACE << "[5] usecount=" << conn.use_count();
 
}
 
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
    /*
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
 
 
  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();
 
  (void)n;
  assert(n == 1);
 
  loop_->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();
  */
 
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
 
}
 
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
 
 
  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();
 
  (void)n;
  assert(n == 1);
 
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
 
  //loop_->queueInLoop(
  //    boost::bind(&TcpConnection::connectDestroyed, conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count(); 
}

测试程序

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr, int numThreads)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer"),
      numThreads_(numThreads)
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
    server_.setThreadNum(numThreads);
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                   const char* data,
                   ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  }
 
  EventLoop* loop_;
  TcpServer server_;
  int numThreads_;
};
 
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr,4);
  server.start();
 
  loop.loop();
}

   return n;
}

[35-2] 文件描述符的分析

code

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                   const char* data,
                   ssize_t len)
  {
    printf("onMessage(): received %zd bytes from connection [%s]\n",
           len, conn->name().c_str());
  }
 
  EventLoop* loop_;
  TcpServer server_;
};
 
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr);
  server.start();
 
  loop.loop();
}

分析结果

0 , 1  ,2 进程默认打开
3 EpollerFd
4 timerFd
5 wakeupFd
6 listenFd
7 idleFd
 
ubuntu@ubuntu-virtual-machine:~/pro/35$ ./build/debug/bin/reactor_test09
main(): pid = 10797
20131023 08:05:04.423104Z 10797 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131023 08:05:04.423413Z 10797 TRACE EventLoop EventLoop created 0xBF97AAB4 in thread 10797 - EventLoop.cc:62
20131023 08:05:04.423489Z 10797 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131023 08:05:04.423883Z 10797 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131023 08:05:04.423968Z 10797 TRACE loop EventLoop 0xBF97AAB4 start looping - EventLoop.cc:94
20131023 08:05:13.376959Z 10797 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 08:05:13.377558Z 10797 TRACE printActiveChannels {6: IN }  - EventLoop.cc:257-----》》有连接到来
20131023 08:05:13.377681Z 10797 INFO  TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:57756 - TcpServer.cc:93
20131023 08:05:13.377719Z 10797 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x8BB3490 fd=8 - TcpConnection.cc:62-------》》已连接的套接字
20131023 08:05:13.377746Z 10797 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20131023 08:05:13.377772Z 10797 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20131023 08:05:13.377819Z 10797 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:78
20131023 08:05:13.377846Z 10797 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:57756
20131023 08:05:13.377902Z 10797 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:83
20131023 08:05:13.377915Z 10797 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20131023 08:05:23.388224Z 10797 TRACE poll  nothing happended - EPollPoller.cc:74
20131023 08:05:32.910851Z 10797 TRACE poll 1 events happended - EPollPoller.cc:65
20131023 08:05:32.910969Z 10797 TRACE printActiveChannels {8: IN }  - EventLoop.cc:257----->>已连接的套接字有可读事件
20131023 08:05:32.910990Z 10797 TRACE handleEvent [6] usecount=2 - Channel.cc:67
onMessage(): received 5 bytes from connection [TestServer:0.0.0.0:8888#1]
20131023 08:05:32.911066Z 10797 TRACE handleEvent [12] usecount=2 - Channel.cc:69

[36-1] BUFFER

1. 应用层缓冲区`Buffer`设计
2. 为什么需要有应用层缓冲区(详见`MuduoManual.pdf  P76`)
3. `Buffer`结构
4. `epoll`使用`LT`模式的原因
5. 与`poll`兼容
6. `LT`模式不会发生漏掉事件的`BUG`,但`POLLOUT`事件不能一开始就关注,否则会出现`busy loop`,而应该在`write`无法完全写入内核缓冲区的时候才关注,将未写入内核缓冲区的数据添加到应用层`output buffer`,直到应用层`output buffer`写完,停止关注`POLLOUT`事件。读写的时候不必等候`EAGAIN`,可以节省系统调用次数,降低延迟。(注:如果用`ET`模式,读的时候读到`EAGAIN`,写的时候直到`output buffer`写完或者`EAGAIN`)

BUFFER头文件

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
/*
接收到数据,存至input buffer,通知上层的应用程序,OnMessage(Buffer *buf)回调,根据应用层协议
判断是否一个完整的包,codec,如果不是一条完整的消息,不会取走数据,也不会进行相应的处理,
如果是一条完整的信息,将取走这条信息,并进行相应的处理
 
对方发送了两条消息,50,350
接收方第一次接收了200,第二次接收了200
 
从socket读了 200 个字节,写到buffer中
从buffer中取回50 个字节
又从socket读了200个字节,写到buffer中
从buffer中取回350个字节,写到buffer中
从buffer中取回350个字节
 
*/
#ifndef MUDUO_NET_BUFFER_H
#define MUDUO_NET_BUFFER_H
 
#include <muduo/base/copyable.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
 
#include <muduo/net/Endian.h>
 
#include <algorithm>
#include <vector>
 
#include <assert.h>
#include <string.h>
//#include <unistd.h>  // ssize_t
 
namespace muduo
{
namespace net
{
 
/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes |  readable bytes  |  writable bytes  |
/// |                   |     (CONTENT)    |                  |
/// +-------------------+------------------+------------------+
/// |                   |                  |                  |
/// 0      <=      readerIndex   <=   writerIndex    <=     size
/// @endcode
class Buffer : public muduo::copyable
{
 public:
  static const size_t kCheapPrepend = 8;
  static const size_t kInitialSize = 1024;
 
  Buffer()
  /*缓冲区的大小*/
    : buffer_(kCheapPrepend + kInitialSize),
    /*可读的开始位置*/
      readerIndex_(kCheapPrepend),
    /*可写的开始位置*/
      writerIndex_(kCheapPrepend)
  {
    assert(readableBytes() == 0);
    assert(writableBytes() == kInitialSize);
    assert(prependableBytes() == kCheapPrepend);
  }
 
  // default copy-ctor(拷贝构造函数), dtor and assignment are fine
 
  void swap(Buffer& rhs)
  {
    buffer_.swap(rhs.buffer_);
    std::swap(readerIndex_, rhs.readerIndex_);
    std::swap(writerIndex_, rhs.writerIndex_);
  }
    /*可读的字节数*/
  size_t readableBytes() const
  { return writerIndex_ - readerIndex_; }
    /*可写的字节数*/
  size_t writableBytes() const
  { return buffer_.size() - writerIndex_; }
    /*可预留的空间*/
  size_t prependableBytes() const
  { return readerIndex_; }
    /*可读位置指针*/
  const char* peek() const
  { return begin() + readerIndex_; }
 
  const char* findCRLF() const
  {
    const char* crlf = std::search(peek(), beginWrite(), kCRLF, kCRLF+2);
    return crlf == beginWrite() ? NULL : crlf;
  }
 
  const char* findCRLF(const char* start) const
  {
    assert(peek() <= start);
    assert(start <= beginWrite());
    const char* crlf = std::search(start, beginWrite(), kCRLF, kCRLF+2);
    return crlf == beginWrite() ? NULL : crlf;
  }
 
  // retrieve returns void, to prevent
  // string str(retrieve(readableBytes()), readableBytes());
  // the evaluation of two functions are unspecified
  /*取回数据*/
  void retrieve(size_t len)
  {
    assert(len <= readableBytes());
    if (len < readableBytes())
    {
      readerIndex_ += len;
    }
    else
    {
      retrieveAll();
    }
  }
  /*取回到指定位置的数据*/
  void retrieveUntil(const char* end)
  {
    assert(peek() <= end);
    assert(end <= beginWrite());
    retrieve(end - peek());
  }
  /*取回一个整数*/
  void retrieveInt32()
  {
    retrieve(sizeof(int32_t));
  }
  /*取回两个字节*/
  void retrieveInt16()
  {
    retrieve(sizeof(int16_t));
  }
  /*取回一个字节*/
  void retrieveInt8()
  {
    retrieve(sizeof(int8_t));
  }
  /*取回全部数据*/
  void retrieveAll()
  {
    readerIndex_ = kCheapPrepend;
    writerIndex_ = kCheapPrepend;
  }
 
  string retrieveAllAsString()
  {
    return retrieveAsString(readableBytes());;
  }
 
  string retrieveAsString(size_t len)
  {
    assert(len <= readableBytes());
    string result(peek(), len);
    retrieve(len);
    return result;
  }
 
  StringPiece toStringPiece() const
  {
    return StringPiece(peek(), static_cast<int>(readableBytes()));
  }
  /*追加数据*/
  void append(const StringPiece& str)
  {
    append(str.data(), str.size());
  }
 
  void append(const char* /*restrict*/ data, size_t len)
  {
    ensureWritableBytes(len);
    std::copy(data, data+len, beginWrite());
    hasWritten(len);
  }
 
  void append(const void* /*restrict*/ data, size_t len)
  {
    append(static_cast<const char*>(data), len);
  }
 
  // 确保缓冲区可写空间>=len,如果不足则扩充
  void ensureWritableBytes(size_t len)
  {
    if (writableBytes() < len)
    {
      makeSpace(len);
    }
    assert(writableBytes() >= len);
  }
  /*可写位置的指针*/
  char* beginWrite()
  { return begin() + writerIndex_; }
 
  const char* beginWrite() const
  { return begin() + writerIndex_; }
 
  void hasWritten(size_t len)
  { writerIndex_ += len; }
 
  ///
  /// Append int32_t using network endian
  /// 使用网络字节序追加到buffer
  void appendInt32(int32_t x)
  {
    int32_t be32 = sockets::hostToNetwork32(x);
    append(&be32, sizeof be32);
  }
 
  void appendInt16(int16_t x)
  {
    int16_t be16 = sockets::hostToNetwork16(x);
    append(&be16, sizeof be16);
  }
 
  void appendInt8(int8_t x)
  {
    append(&x, sizeof x);
  }
 
  ///
  /// Read int32_t from network endian
  ///
  /// Require: buf->readableBytes() >= sizeof(int32_t)
  int32_t readInt32()
  {
    int32_t result = peekInt32();
    retrieveInt32();
    return result;
  }
 
  int16_t readInt16()
  {
    int16_t result = peekInt16();
    retrieveInt16();
    return result;
  }
 
  int8_t readInt8()
  {
    int8_t result = peekInt8();
    retrieveInt8();
    return result;
  }
 
  ///
  /// Peek int32_t from network endian
  ///
  /// Require: buf->readableBytes() >= sizeof(int32_t)
  int32_t peekInt32() const
  {
    assert(readableBytes() >= sizeof(int32_t));
    int32_t be32 = 0;
    ::memcpy(&be32, peek(), sizeof be32);
    return sockets::networkToHost32(be32);
  }
 
  int16_t peekInt16() const
  {
    assert(readableBytes() >= sizeof(int16_t));
    int16_t be16 = 0;
    ::memcpy(&be16, peek(), sizeof be16);
    return sockets::networkToHost16(be16);
  }
 
  int8_t peekInt8() const
  {
    assert(readableBytes() >= sizeof(int8_t));
    int8_t x = *peek();
    return x;
  }
 
  ///
  /// Prepend int32_t using network endian
  ///
  void prependInt32(int32_t x)
  {
    int32_t be32 = sockets::hostToNetwork32(x);
    prepend(&be32, sizeof be32);
  }
 
  void prependInt16(int16_t x)
  {
    int16_t be16 = sockets::hostToNetwork16(x);
    prepend(&be16, sizeof be16);
  }
 
  void prependInt8(int8_t x)
  {
    prepend(&x, sizeof x);
  }
 
  void prepend(const void* /*restrict*/ data, size_t len)
  {
    assert(len <= prependableBytes());
    readerIndex_ -= len;
    const char* d = static_cast<const char*>(data);
    std::copy(d, d+len, begin()+readerIndex_);
  }
 
  // 收缩,保留reserve个字节
  void shrink(size_t reserve)
  {
    // FIXME: use vector::shrink_to_fit() in C++ 11 if possible.
    Buffer other;
    other.ensureWritableBytes(readableBytes()+reserve);
    other.append(toStringPiece());
    swap(other);
  }
 
  /// Read data directly into buffer.
  ///
  /// It may implement with readv(2)
  /// @return result of read(2), @c errno is saved
  ssize_t readFd(int fd, int* savedErrno);
 
 private:
 
  char* begin()
  { return &*buffer_.begin(); }
 
  const char* begin() const
  { return &*buffer_.begin(); }
 
  void makeSpace(size_t len)
  {
    if (writableBytes() + prependableBytes() < len + kCheapPrepend)
    {
      // FIXME: move readable data
      buffer_.resize(writerIndex_+len);
    }
    else
    {
      // move readable data to the front, make space inside buffer
      assert(kCheapPrepend < readerIndex_);
      size_t readable = readableBytes();
      std::copy(begin()+readerIndex_,
                begin()+writerIndex_,
                begin()+kCheapPrepend);
      readerIndex_ = kCheapPrepend;
      writerIndex_ = readerIndex_ + readable;
      assert(readable == readableBytes());
    }
  }
 
 private:
  std::vector<char> buffer_;  // vector用于替代固定大小数组
  size_t readerIndex_;          // 读位置
  size_t writerIndex_;          // 写位置
 
  static const char kCRLF[];    // "\r\n"
};
 
}
}
#endif  // MUDUO_NET_BUFFER_H

BUFFER源文件

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
#include <muduo/net/Buffer.h>
 
#include <muduo/net/SocketsOps.h>
 
#include <errno.h>
#include <sys/uio.h>
 
using namespace muduo;
using namespace muduo::net;
 
const char Buffer::kCRLF[] = "\r\n";
 
const size_t Buffer::kCheapPrepend;
const size_t Buffer::kInitialSize;
 
// 结合栈上的空间,避免内存使用过大,提高内存使用率
// 如果有5K个连接,每个连接就分配64K+64K的缓冲区的话,将占用640M内存,
// 而大多数时候,这些缓冲区的使用率很低
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
  // saved an ioctl()/FIONREAD call to tell how much to read
  // 节省一次ioctl系统调用(获取有多少可读数据)
  char extrabuf[65536];
  struct iovec vec[2];
  const size_t writable = writableBytes();
  // 第一块缓冲区
  vec[0].iov_base = begin()+writerIndex_;
  vec[0].iov_len = writable;
  // 第二块缓冲区
  vec[1].iov_base = extrabuf;
  vec[1].iov_len = sizeof extrabuf;
  const ssize_t n = sockets::readv(fd, vec, 2);
  if (n < 0)
  {
    *savedErrno = errno;
  }
  else if (implicit_cast<size_t>(n) <= writable)   //第一块缓冲区足够容纳
  {
    writerIndex_ += n;
  }
  else      // 当前缓冲区,不够容纳,因而数据被接收到了第二块缓冲区extrabuf,将其append至buffer
  {
    writerIndex_ = buffer_.size();
    append(extrabuf, n - writable);
  }
  // if (n == writable + sizeof extrabuf)
  // {
  //   goto line_30;
  // }
  return n;
}

[37] BUFFER

应用程序想关闭连接,但是可能正处于发送数据的过程中,output buffer中有数据还没有发送完,不能直接调用`close`, `conn->send(buff)`==>`conn->shutdown()`==>`POLLOUT`事件.
关闭写的这一半: 连接状态更改为`kDisconnection`,并没有关闭连接
服务器主动断开与客户端的连接, 这意味着 客户端 `read`返回`0`, `close(conn)` ==> `POLLHUP|POLLIN`
如果是客户端主动关闭连接,那么服务器端只返回`POLLIN`,如果是服务器端主动关闭,然后客户端再关闭,那么服务器端返回`POLLIN`和`POLLHUP`事件

测试程序

服务器

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
 
    message1_.resize(100);
    message2_.resize(200);
    std::fill(message1_.begin(), message1_.end(), 'A');
    std::fill(message2_.begin(), message2_.end(), 'B');
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
      conn->send(message1_);
      conn->send(message2_);
      conn->shutdown();
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime)
  {
    muduo::string msg(buf->retrieveAllAsString());
    printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
           msg.size(),
           conn->name().c_str(),
           receiveTime.toFormattedString().c_str());
 
    conn->send(msg);
  }
 
  EventLoop* loop_;
  TcpServer server_;
 
  muduo::string message1_;
  muduo::string message2_;
};
 
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr);
  server.start();
 
  loop.loop();
}

客户端

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
 
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
 //该程序可能出现粘包问题
#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)
 
int main(void)
{
    int sock;
    if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        ERR_EXIT("socket");
 
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8888);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
 
    if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        ERR_EXIT("connect");
 
    char buf[1024] ={0};
    while (1)
    {
        int ret = read(sock, buf, sizeof(buf));
        printf("ret=%d\n", ret);
        if (ret == 0)
            break;
 
        fputs(buf, stdout);
        fflush(stdout);
        memset(buf, 0, sizeof(buf));
    }
 
    sleep(10);
    close(sock);
 
    return 0;
}

程序输出

服务器:

ubuntu@ubuntu-virtual-machine:~/pro/37$ ./build/debug/bin/reactor_test12
main(): pid = 8683
20131024 07:19:37.457271Z  8683 TRACE updateChannel fd = 4 events = 3 - EPollPoller.cc:104
20131024 07:19:37.457470Z  8683 TRACE EventLoop EventLoop created 0xBFDDAB44 in thread 8683 - EventLoop.cc:62
20131024 07:19:37.457485Z  8683 TRACE updateChannel fd = 5 events = 3 - EPollPoller.cc:104
20131024 07:19:37.457645Z  8683 TRACE updateChannel fd = 6 events = 3 - EPollPoller.cc:104
20131024 07:19:37.457661Z  8683 TRACE loop EventLoop 0xBFDDAB44 start looping - EventLoop.cc:94
20131024 07:19:47.468016Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:19:57.478500Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:07.488977Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:17.499503Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:27.509985Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:37.520451Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:47.528439Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:20:57.538942Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:21:01.262358Z  8683 TRACE poll 1 events happended - EPollPoller.cc:65
20131024 07:21:01.262792Z  8683 TRACE printActiveChannels {6: IN }  - EventLoop.cc:257
20131024 07:21:01.262820Z  8683 TRACE handleEventWithGuard 2222222222222222POLLIN22222222222222222222 - Channel.cc:89
20131024 07:21:01.262881Z  8683 INFO  TcpServer::newConnection [TestServer] - new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56659 - TcpServer.cc:93
20131024 07:21:01.262916Z  8683 DEBUG TcpConnection TcpConnection::ctor[TestServer:0.0.0.0:8888#1] at 0x96C65D0 fd=8 - TcpConnection.cc:62
20131024 07:21:01.262934Z  8683 TRACE newConnection [1] usecount=1 - TcpServer.cc:111
20131024 07:21:01.262961Z  8683 TRACE newConnection [2] usecount=2 - TcpServer.cc:113
20131024 07:21:01.262981Z  8683 TRACE connectEstablished [3] usecount=6 - TcpConnection.cc:231
20131024 07:21:01.262998Z  8683 TRACE updateChannel fd = 8 events = 3 - EPollPoller.cc:104
onConnection(): new connection [TestServer:0.0.0.0:8888#1] from 127.0.0.1:56659
20131024 07:21:01.263363Z  8683 TRACE connectEstablished [4] usecount=6 - TcpConnection.cc:236
20131024 07:21:01.263378Z  8683 TRACE newConnection [5] usecount=2 - TcpServer.cc:122
20131024 07:21:11.265287Z  8683 TRACE poll 1 events happended - EPollPoller.cc:65
20131024 07:21:11.265469Z  8683 TRACE printActiveChannels {8: IN HUP }  - EventLoop.cc:257
20131024 07:21:11.265483Z  8683 TRACE handleEvent [6] usecount=2 - Channel.cc:67
20131024 07:21:11.265491Z  8683 TRACE handleEventWithGuard 1111111111111111POLLHUP1111111111111111111 - Channel.cc:85
20131024 07:21:11.265498Z  8683 TRACE handleEventWithGuard 2222222222222222POLLIN22222222222222222222 - Channel.cc:89
20131024 07:21:11.265556Z  8683 TRACE handleClose fd = 8 state = 3 - TcpConnection.cc:297
20131024 07:21:11.265567Z  8683 TRACE updateChannel fd = 8 events = 0 - EPollPoller.cc:104
onConnection(): connection [TestServer:0.0.0.0:8888#1] is down
20131024 07:21:11.265588Z  8683 TRACE handleClose [7] usecount=3 - TcpConnection.cc:305
20131024 07:21:11.265604Z  8683 INFO  TcpServer::removeConnectionInLoop [TestServer] - connection TestServer:0.0.0.0:8888#1 - TcpServer.cc:153
20131024 07:21:11.265611Z  8683 TRACE removeConnectionInLoop [8] usecount=6 - TcpServer.cc:157
20131024 07:21:11.265626Z  8683 TRACE removeConnectionInLoop [9] usecount=5 - TcpServer.cc:159
20131024 07:21:11.265639Z  8683 TRACE removeConnectionInLoop [10] usecount=6 - TcpServer.cc:170
20131024 07:21:11.265646Z  8683 TRACE handleClose [11] usecount=3 - TcpConnection.cc:308
20131024 07:21:11.265652Z  8683 TRACE handleEvent [12] usecount=2 - Channel.cc:69
20131024 07:21:11.265659Z  8683 TRACE removeChannel fd = 8 - EPollPoller.cc:147
20131024 07:21:11.265682Z  8683 DEBUG ~TcpConnection TcpConnection::dtor[TestServer:0.0.0.0:8888#1] at 0x96C65D0 fd=8 - TcpConnection.cc:69
20131024 07:21:21.275816Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:21:31.285830Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
20131024 07:21:41.296314Z  8683 TRACE poll  nothing happended - EPollPoller.cc:74
^C
ubuntu@ubuntu-virtual-machine:~/pro/37$

客户端:

ubuntu@ubuntu-virtual-machine:~/pro/37$ ./a.out
ret=300
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBret=0

[38] 完善TcpConnection

完善TcpConnection WriteCompleteCallback含义 HighWaterMarkCallback含义 boost::any context_ signal(SIGPIPE, SIG_IGN) // 请看channel class 可变类型解决方案 void*. 这种方法不是类型安全的 boost::any boost::any 任意类型的类型安全存储以及安全的取回 在标准库容器中存放不同类型的方法,比如说vector<boost::any> TcpConnection 完整的头文件

TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H
 
#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Channel;
class EventLoop;
class Socket;
 
///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();
 
  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
 
  // void send(string&& message); // C++11
  void send(const void* message, size_t len);
  void send(const StringPiece& message);
  // void send(Buffer&& message); // C++11
  void send(Buffer* message);  // this one will swap data
  void shutdown(); // NOT thread safe, no simultaneous calling
  void setTcpNoDelay(bool on);
 
  void setContext(const boost::any& context)
  { context_ = context; }
 
  const boost::any& getContext() const
  { return context_; }
 
  boost::any* getMutableContext()
  { return &context_; }
 
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }
 
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
 
  Buffer* inputBuffer()
  { return &inputBuffer_; }
 
  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }
 
  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once
 
 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleWrite();
  void handleClose();
  void handleError();
  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();
  void setState(StateE s) { state_ = s; }
 
  EventLoop* loop_;         // 所属EventLoop
  string name_;             // 连接名
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;     // 数据发送完毕回调函数,即所有的用户数据都已拷贝到内核缓冲区时回调该函数
                                                    // outputBuffer_被清空也会回调该函数,可以理解为低水位标回调函数
  HighWaterMarkCallback highWaterMarkCallback_;     // 高水位标回调函数
  CloseCallback closeCallback_;
  size_t highWaterMark_;        // 高水位标
  Buffer inputBuffer_;          // 应用层接收缓冲区
  Buffer outputBuffer_;         // 应用层发送缓冲区
  boost::any context_;          // 绑定一个未知类型的上下文对象
};
 
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
 
}
}
 
#endif  // MUDUO_NET_TCPCONNECTION_H

TcpConnection 完整的源文件

TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include <muduo/net/TcpConnection.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <errno.h>
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
}
 
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}
 
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
  // 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  // 通道可写事件到来的时候,回调TcpConnection::handleWrite
  channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  // 连接关闭,回调TcpConnection::handleClose
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  // 发生错误,回调TcpConnection::handleError
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}
 
TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd();
}
 
// 线程安全,可以跨线程调用
void TcpConnection::send(const void* data, size_t len)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(data, len);
    }
    else
    {
      string message(static_cast<const char*>(data), len);
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop,
                      this,
                      message));
    }
  }
}
 
// 线程安全,可以跨线程调用
void TcpConnection::send(const StringPiece& message)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(message);
    }
    else
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop,
                      this,
                      message.as_string()));
                    //std::forward<string>(message)));
    }
  }
}
 
// 线程安全,可以跨线程调用
void TcpConnection::send(Buffer* buf)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop,
                      this,
                      buf->retrieveAllAsString()));
                    //std::forward<string>(message)));
    }
  }
}
 
void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}
 
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  /*
  loop_->assertInLoopThread();
  sockets::write(channel_->fd(), data, len);
  */
 
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool error = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  // 通道没有关注可写事件并且发送缓冲区没有数据,直接write
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // 写完了,回调writeCompleteCallback_
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE) // FIXME: any others?
        {
          error = true;
        }
      }
    }
  }
 
  assert(remaining <= len);
  // 没有错误,并且还有未写完的数据(说明内核发送缓冲区满,要将未写完的数据添加到output buffer中)
  if (!error && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // 如果超过highWaterMark_(高水位标),回调highWaterMarkCallback_
    //即使oldLen + remaining >= highWaterMark_ ,remain的数据也是可以存放到buffer中的,因为buffer
    //自动伸缩的
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();     // 关注POLLOUT事件
    }
  }
}
 
void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}
 
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}
 
void TcpConnection::setTcpNoDelay(bool on)
{
  socket_->setTcpNoDelay(on);
}
 
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
  channel_->tie(shared_from_this());
  channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注
 
  connectionCallback_(shared_from_this());
  LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}
 
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();
 
    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}
 
void TcpConnection::handleRead(Timestamp receiveTime)
{
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
 
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), buf, n);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  int savedErrno = 0;
  /*一次性读完内核缓冲区的数据,这可以防止busy loop*/
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    //读到文件eof ,就是对等方已经关闭连接
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}
 
// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  /*通道有关注write事件 */
  if (channel_->isWriting())
  {
    /*写数据到内核中*/
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)    // 发送缓冲区已清空
      {
        channel_->disableWriting();      // 停止关注POLLOUT事件,以免出现busy loop
        if (writeCompleteCallback_)     // 回调writeCompleteCallback_
        {
          // 应用层发送缓冲区被清空,就回调用writeCompleteCallback_
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        /*如果状态码是kDisconnecting ,说明应用层已经发起shutdown命令了,这是最后一个数据包,
        所以发完就可以关闭了。
        请参照void TcpConnection::shutdownInLoop()
        */
        if (state_ == kDisconnecting)   // 发送缓冲区已清空并且连接状态是kDisconnecting, 要关闭连接
        {
          shutdownInLoop();     // 关闭连接
        }
      }
      else
      {
        LOG_TRACE << "I am going to write more data";
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}
 
void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();
 
  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);       // 这一行,可以不调用
  LOG_TRACE << "[7] usecount=" << guardThis.use_count();
  // must be the last line
  closeCallback_(guardThis);    // 调用TcpServer::removeConnection
  LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}
 
void TcpConnection::handleError()
{
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

测试程序

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
/*
程序说明:
  客户端请求连接-------》服务器马上发送一堆数据
 
*/
using namespace muduo;
using namespace muduo::net;
 
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
    server_.setWriteCompleteCallback(
      boost::bind(&TestServer::onWriteComplete, this, _1));
 
    // 这是一个数据生成协议
    string line;
    for (int i = 33; i < 127; ++i)
    {
      line.push_back(char(i));
    }
    line += line;
 
    for (size_t i = 0; i < 127-33; ++i)
    {
      message_ += line.substr(i, 72) + '\n';
    }
  }
 
  void start()
  {
      server_.start();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
 
      conn->setTcpNoDelay(true);
      /*连接到来直接发送数据*/
      conn->send(message_);
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime)
  {
    muduo::string msg(buf->retrieveAllAsString());
    printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
           msg.size(),
           conn->name().c_str(),
           receiveTime.toFormattedString().c_str());
 
    conn->send(msg);
  }
 
  void onWriteComplete(const TcpConnectionPtr& conn)
  { /*可写事件,在发送*/
    conn->send(message_);
  }
 
  EventLoop* loop_;
  TcpServer server_;
 
  muduo::string message_;
};
 
 
int main()
{
  printf("main(): pid = %d\n", getpid());
 
  InetAddress listenAddr(8888);
  EventLoop loop;
 
  TestServer server(&loop, listenAddr);
  server.start();
 
  loop.loop();
}

[39] TcpClient和connector

  • muduo库对编写tcp(主动发起连接TcpClient)
  • 客户端程序的支持Connector(包含了一个Connector对象)

测试程序

  • Reactor_test11.cc     // echo server
  • TcpClient_test.cc     // echo client

TcpClient 头文件

TcpClient.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H
 
#include <boost/noncopyable.hpp>
 
#include <muduo/base/Mutex.h>
#include <muduo/net/TcpConnection.h>
 
namespace muduo
{
namespace net
{
 
class Connector;
typedef boost::shared_ptr<Connector> ConnectorPtr;
 
class TcpClient : boost::noncopyable
{
 public:
  // TcpClient(EventLoop* loop);
  // TcpClient(EventLoop* loop, const string& host, uint16_t port);
  TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& name);
  ~TcpClient();  // force out-line dtor, for scoped_ptr members.
 
  void connect();
  void disconnect();
  void stop();
 
  TcpConnectionPtr connection() const
  {
    MutexLockGuard lock(mutex_);
    return connection_;
  }
 
  bool retry() const;
  void enableRetry() { retry_ = true; }
 
  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }
 
  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }
 
  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }
 
 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd);
  /// Not thread safe, but in loop
  void removeConnection(const TcpConnectionPtr& conn);
 
  EventLoop* loop_;
  ConnectorPtr connector_;  // 用于主动发起连接
  const string name_;       // 名称
  ConnectionCallback connectionCallback_;       // 连接建立回调函数
  MessageCallback messageCallback_;             // 消息到来回调函数
  WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数
  bool retry_;   // 重连,是指连接建立之后又断开的时候是否重连,
                //这个跟connector里面的retry是不一样的,connector里面的retry表示连接不成功时是否要重连
  bool connect_; // atomic 是否发起连接
  // always in loop thread
  int nextConnId_;          // name_ + nextConnId_用于标识一个连接
  mutable MutexLock mutex_;
  TcpConnectionPtr connection_; // Connector连接成功以后,得到一个TcpConnection
};
 
}
}
 
#endif  // MUDUO_NET_TCPCLIENT_H

TcpClient 源文件

TcpClient.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/TcpClient.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Connector.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>  // snprintf
 
using namespace muduo;
using namespace muduo::net;
 
// TcpClient::TcpClient(EventLoop* loop)
//   : loop_(loop)
// {
// }
 
// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
//   : loop_(CHECK_NOTNULL(loop)),
//     serverAddr_(host, port)
// {
// }
 
namespace muduo
{
namespace net
{
namespace detail
{
 
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
  loop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}
 
void removeConnector(const ConnectorPtr& connector)
{
  //connector->
}
 
}
}
}
 
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& name)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(name),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  // 设置连接成功回调函数
  connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}
 
TcpClient::~TcpClient()
{
  LOG_INFO << "TcpClient::~TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
  TcpConnectionPtr conn;
  {
    MutexLockGuard lock(mutex_);
    conn = connection_;
  }
  if (conn)
  {
    // FIXME: not 100% safe, if we are in different thread
 
    // 重新设置TcpConnection中的closeCallback_为detail::removeConnection
    //这里不用TcpClient::removeConnection的原因是 TcpClient::removeConnection 有重连功能,
    //这里已经不需要重连了,直接调用detail::removeConnection就行了
    CloseCallback cb = boost::bind(&detail::removeConnection, loop_, _1);
    loop_->runInLoop(
        boost::bind(&TcpConnection::setCloseCallback, conn, cb));
  }
  else
  {
    // 这种情况,说明connector处于未连接状态,将connector_停止
    connector_->stop();
    // FIXME: HACK
    loop_->runAfter(1, boost::bind(&detail::removeConnector, connector_));
  }
}
 
void TcpClient::connect()
{
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();   // 发起连接
}
 
// 用于连接已建立的情况下,关闭连接
void TcpClient::disconnect()
{
  connect_ = false;
 
  {
    MutexLockGuard lock(mutex_);
    if (connection_)
    {
      connection_->shutdown();
    }
  }
}
 
// 停止connector_ ,连接尚未成功时进行断开
void TcpClient::stop()
{
  connect_ = false;
  connector_->stop();
}
 
//有新的连接到来
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;
 
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
 
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;     // 保存TcpConnection
  }
  conn->connectEstablished();        // 这里回调connectionCallback_
}
 
/*移除connector*/
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());
 
  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    connection_.reset();
  }
  /*放到IO线程中销毁*/
  loop_->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
  //如果需要重连,则重新连接
  if (retry_ && connect_)
  {
    LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
             << connector_->serverAddress().toIpPort();
    // 这里的重连是指连接建立成功之后被断开的重连
    connector_->restart();
  }
}

Connector 头文件

Connector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_CONNECTOR_H
#define MUDUO_NET_CONNECTOR_H
 
#include <muduo/net/InetAddress.h>
 
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class Channel;
class EventLoop;
 
// 主动发起连接,带有自动重连功能
class Connector : boost::noncopyable,
                  public boost::enable_shared_from_this<Connector>
{
 public:
  typedef boost::function<void (int sockfd)> NewConnectionCallback;
 
  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();
  /*设置连接成功后的回调函数*/
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }
 
  void start();  // can be called in any thread
  void restart();  // must be called in loop thread
  void stop();  // can be called in any thread
//服务器IP地址
  const InetAddress& serverAddress() const { return serverAddr_; }
 
 private:
  enum States { kDisconnected/*断开状态*/, kConnecting/*正在连接中*/, kConnected/*已连接*/ };
  static const int kMaxRetryDelayMs = 30*1000;          // 30秒,最大重连延迟时间
  static const int kInitRetryDelayMs = 500;             // 0.5秒,初始状态,连接不上,0.5秒后重连
 
  void setState(States s) { state_ = s; }
  void startInLoop();
  void stopInLoop();
  void connect();
  void connecting(int sockfd);
  void handleWrite();
  void handleError();
  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();
 
  EventLoop* loop_;         // 所属EventLoop
  InetAddress serverAddr_;  // 服务器端地址
  bool connect_; // atomic是否连接
  States state_;  // FIXME: use atomic variable
  boost::scoped_ptr<Channel> channel_;    // Connector所对应的Channel
  NewConnectionCallback newConnectionCallback_;     // 连接成功回调函数,
  int retryDelayMs_;        // 重连延迟时间(单位:毫秒)
};
 
}
}
 
#endif  // MUDUO_NET_CONNECTOR_H

Connector 源文件

Connector.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/Connector.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>
 
#include <boost/bind.hpp>
 
#include <errno.h>
 
using namespace muduo;
using namespace muduo::net;
 
const int Connector::kMaxRetryDelayMs;
 
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false),
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)
{
  LOG_DEBUG << "ctor[" << this << "]";
}
 
Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!channel_);
}
 
// 可以跨线程调用 ,把重新发起的连接函数放到IO线程去,所以是线程
// 安全的
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
 
/*重新发起连接*/
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  {
    /*如果为true 重新连接,
    如果调用stop 后,那么connect_ = false
    */
    connect();
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}
 
/*让IO线程线程停止连接,这个连接处于*/
void Connector::stop()
{
  connect_ = false;
  loop_->runInLoop(boost::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}
 
void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnecting)
  {
    setState(kDisconnected);
    int sockfd = removeAndResetChannel();   // 将通道从poller中移除关注,并将channel置空
    retry(sockfd);      // 这里并非要重连,只是调用sockets::close(sockfd);
  }
}
 
/*发起连接  */
void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie();   // 创建非阻塞套接字
  /*进行连接请求*/
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddrInet());
 
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0:
    case EINPROGRESS:   // 非阻塞套接字,未连接成功返回码是EINPROGRESS表示正在连接
    case EINTR:
    case EISCONN:           // 连接成功
      connecting(sockfd);
      break;
 
    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);        // 重连
      break;
 
    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);   // 不能重连,关闭sockfd
      break;
 
    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}
 
// 不能跨线程调用
void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;
  connect_ = true;
  startInLoop();
}
 
void Connector::connecting(int sockfd)
{
  //不管是真正连接还是连接成功了,都把状态设置为kConnecting
  setState(kConnecting);
  assert(!channel_);
  // Channel与sockfd关联
  channel_.reset(new Channel(loop_, sockfd));
  // 设置可写回调函数,主要是为了“正在连接”状态设置的
  channel_->setWriteCallback(
      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  // 设置错误回调函数
  channel_->setErrorCallback(
      boost::bind(&Connector::handleError, this)); // FIXME: unsafe
 
  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
 
  channel_->enableWriting();     // 让Poller关注可写事件,writing一定产生,不管是连接成功还是连接失败,
                                //
}
 
int Connector::removeAndResetChannel()
{
  channel_->disableAll();
  channel_->remove();            // 从poller移除关注
  int sockfd = channel_->fd();
  // Can't reset channel_ here, because we are inside Channel::handleEvent
  // 不能在这里重置channel_,因为正在调用Channel::handleEvent
  loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe
  return sockfd;
}
 
void Connector::resetChannel()
{
  channel_.reset();     // channel_ 置空
}
 
 
void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;
  /*由于在Connector::connecting注册可写事件时,“状态” 被设置为kConnecting(不管完不完成连接),
  所以第一次handleWrite时,得把“状态”设置为kConnected*/
  if (state_ == kConnecting)
  {
    /*这里不用再关注可写事件了,这里也是防止busy loop*/
    int sockfd = removeAndResetChannel();   // 从poller中移除关注,并将channel置空
    // socket可写并不意味着连接一定建立成功
    // 还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, ...)再次确认一下。
    int err = sockets::getSocketError(sockfd);
    if (err)        // 有错误
    {
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd);        // 重连
    }
    else if (sockets::isSelfConnect(sockfd))        // 自连接
    {
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);        // 重连
    }
    else    // 连接成功
    {
      setState(kConnected);
      if (connect_)
      {
        newConnectionCallback_(sockfd);     // 回调连接成功的回调函数
      }
      else
      {
        sockets::close(sockfd);
      }
    }
  }
  else
  {
    // what happened?
    assert(state_ == kDisconnected);
  }
}
 
/*连接出错的处理函数*/
void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError";
  assert(state_ == kConnecting);
 
  int sockfd = removeAndResetChannel();     // 从poller中移除关注,并将channel置空
  int err = sockets::getSocketError(sockfd);
  LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
  retry(sockfd);
}
 
// 采用back-off策略重连,即重连时间逐渐延长,0.5s, 1s, 2s, ...直至30s
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (connect_)
  {
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    // 注册一个定时操作,重连
    loop_->runAfter(retryDelayMs_/1000.0,
                    boost::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

测试程序

#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
 
#include <boost/bind.hpp>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
class TestClient
{
 public:
  TestClient(EventLoop* loop, const InetAddress& listenAddr)
    : loop_(loop),
      client_(loop, listenAddr, "TestClient"),
      stdinChannel_(loop, 0)
  {
    client_.setConnectionCallback(
        boost::bind(&TestClient::onConnection, this, _1));
    client_.setMessageCallback(
        boost::bind(&TestClient::onMessage, this, _1, _2, _3));
    //client_.enableRetry();
    // 标准输入缓冲区中有数据的时候,回调TestClient::handleRead
    stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
    stdinChannel_.enableReading();
  }
 
  void connect()
  {
    client_.connect();
  }
 
 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }
 
  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    printf("onMessage(): recv a message [%s]\n", msg.c_str());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
  }
 
  // 标准输入缓冲区中有数据的时候,回调该函数
  void handleRead()
  {
    char buf[1024] = {0};
    fgets(buf, 1024, stdin);
    buf[strlen(buf)-1] = '\0';      // 去除\n
    client_.connection()->send(buf);
  }
 
  EventLoop* loop_;
  TcpClient client_;
  Channel stdinChannel_;        // 标准输入Channel
};
 
int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  EventLoop loop;
  InetAddress serverAddr("127.0.0.1", 8888);
  TestClient client(&loop, serverAddr);
  client.connect();
  loop.loop();
}

[40] http 简单描述

http request

request line + header + body (header分为普通报头,请求报头与实体报头) headerbody之间有一空行(CRLF)

请求方法有

Get, Post, Head, Put, Delete

协议版本号:1.0, 1.1

常用请求头

  • Accept:浏览器可接受的媒体(MIME)类型;
  • Accept-Language:浏览器所希望的语言种类
  • Accept-Encoding:浏览器能够解码的编码方法,如gzip,deflate等
  • User-Agent:告诉HTTP服务器, 客户端使用的操作系统和浏览器的名称和版本
  • Connection:表示是否需要持久连接,Keep-Alive表示长连接,close表示短连接

http response

status line + header + body (header分为普通报头,响应报头与实体报头) header与body之间有一空行(CRLF)

状态响应码

  • 1XX 提示信息 表示请求已被成功接收,继续处理
  • 2XX 成功 表示请求已被成功接收,理解,接受
  • 3XX 重定向 要完成请求必须进行更进一步的处理
  • 4XX 客户端错误 请求有语法错误或请求无法实现
  • 5XX 服务器端错误 服务器执行一个有效请求失败

一个典型的http请求

GET / HTTP/1.1 (请求行)
Accept: image/jpeg, application/x-ms-application, image/gif, application/xaml+xml, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*(请求头)
Accept-Language: zh-CN(请求头)
User-Agent: Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; Tablet PC 2.0)(请求头)
Accept-Encoding: gzip, deflate(请求头)
Host: 192.168.159.188:800(请求头)
Connection: Keep-Alive(请求头)

/*当前是get请求,没有body,如果是post请求的话会有实体*/

一个典型的http应答

HTTP/1.1 200 OK(请求行)
Content-Length: 112(头部)
Connection: Keep-Alive(头部)
Content-Type: text/html(头部)
Server: Muduo(头部)

<html><head><title>This is title</title></head><body><h1>Hello</h1>Now is 20130611 02:14:31.518462</body></html>(实体)

muduo_http库涉及到的类

HttpRequest:http请求类封装
HttpResponse:http响应类封装
HttpContext:http协议解析类
HttpServer:http服务器类封装

[40-1] http实现源码

HttpRequest头文件(包含实现)

HttpResquest.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_HTTP_HTTPREQUEST_H
#define MUDUO_NET_HTTP_HTTPREQUEST_H
 
#include <muduo/base/copyable.h>
#include <muduo/base/Timestamp.h>
#include <muduo/base/Types.h>
 
#include <map>
#include <assert.h>
#include <stdio.h>
 
namespace muduo
{
namespace net
{
 
class HttpRequest : public muduo::copyable
{
 public:
  enum Method
  {
    kInvalid, kGet, kPost, kHead, kPut, kDelete
  };
  enum Version
  {
    kUnknown, kHttp10, kHttp11
  };
 
  HttpRequest()
    : method_(kInvalid),
      version_(kUnknown)
  {
  }
 
  void setVersion(Version v)
  {
    version_ = v;
  }
 
  Version getVersion() const
  { return version_; }
 
/*设置请求方法*/
  bool setMethod(const char* start, const char* end)
  {
    assert(method_ == kInvalid);
    string m(start, end);
    if (m == "GET")
    {
      method_ = kGet;
    }
    else if (m == "POST")
    {
      method_ = kPost;
    }
    else if (m == "HEAD")
    {
      method_ = kHead;
    }
    else if (m == "PUT")
    {
      method_ = kPut;
    }
    else if (m == "DELETE")
    {
      method_ = kDelete;
    }
    else
    {
      method_ = kInvalid;
    }
    return method_ != kInvalid;
  }
 
/*返回请求方法*/
  Method method() const
  { return method_; }
 
/*请求方法转为字符串*/
  const char* methodString() const
  {
    const char* result = "UNKNOWN";
    switch(method_)
    {
      case kGet:
        result = "GET";
        break;
      case kPost:
        result = "POST";
        break;
      case kHead:
        result = "HEAD";
        break;
      case kPut:
        result = "PUT";
        break;
      case kDelete:
        result = "DELETE";
        break;
      default:
        break;
    }
    return result;
  }
 
/*设置请求路径*/
  void setPath(const char* start, const char* end)
  {
    path_.assign(start, end);
  }
 
/*返回请求路径*/
  const string& path() const
  { return path_; }
 
/*设置接收时间*/
  void setReceiveTime(Timestamp t)
  { receiveTime_ = t; }
 
  Timestamp receiveTime() const
  { return receiveTime_; }
 
/*添加一个头部信息*/
  /*
Accept-Language(start):(colon) zh-CN(end)
 
-->Accept-Language
   zh-CN
*/
  void addHeader(const char* start, const char* colon, const char* end)
  {
    string field(start, colon);     // header域
    ++colon;
    // 去除左空格
    while (colon < end && isspace(*colon))
    {
      ++colon;
    }
    string value(colon, end);       // header值
    // 去除右空格
    while (!value.empty() && isspace(value[value.size()-1]))
    {
      value.resize(value.size()-1);
    }
    headers_[field] = value;
  }
 
/*根据头域,获得头域的信息*/
  string getHeader(const string& field) const
  {
    string result;
    std::map<string, string>::const_iterator it = headers_.find(field);
    if (it != headers_.end())
    {
      result = it->second;
    }
    return result;
  }
 
  const std::map<string, string>& headers() const
  { return headers_; }
 
  void swap(HttpRequest& that)
  {
    /*貌似少了version的交换*/
    std::swap(method_, that.method_);
    path_.swap(that.path_);
    receiveTime_.swap(that.receiveTime_);
    headers_.swap(that.headers_);
  }
 
 private:
  Method method_;       // 请求方法
  Version version_;     // 协议版本1.0/1.1
  string path_;         // 请求路径
  Timestamp receiveTime_;   // 请求时间
  std::map<string, string> headers_;  // header列表
};
 
}
}
 
#endif  // MUDUO_NET_HTTP_HTTPREQUEST_H

HttpResponse头文件

HttpResponse.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_HTTP_HTTPRESPONSE_H
#define MUDUO_NET_HTTP_HTTPRESPONSE_H
 
#include <muduo/base/copyable.h>
#include <muduo/base/Types.h>
 
#include <map>
 
namespace muduo
{
namespace net
{
 
class Buffer;
class HttpResponse : public muduo::copyable
{
 public:
  enum HttpStatusCode
  {
    kUnknown,
    k200Ok = 200,       // 成功
    k301MovedPermanently = 301,     // 301重定向,请求的页面永久性移至另一个地址
    k400BadRequest = 400,           // 错误的请求,语法格式有错,服务器无法处理此请求
    k404NotFound = 404,     // 请求的网页不存在
  };
 
  explicit HttpResponse(bool close)
    : statusCode_(kUnknown),
      closeConnection_(close)
  {
  }
 
  void setStatusCode(HttpStatusCode code)
  { statusCode_ = code; }
 
  void setStatusMessage(const string& message)
  { statusMessage_ = message; }
 
  void setCloseConnection(bool on)
  { closeConnection_ = on; }
 
  bool closeConnection() const
  { return closeConnection_; }
 
  // 设置文档媒体类型(MIME)
  void setContentType(const string& contentType)
  { addHeader("Content-Type", contentType); }
 
  // FIXME: replace string with StringPiece
  void addHeader(const string& key, const string& value)
  { headers_[key] = value; }
 
  void setBody(const string& body)
  { body_ = body; }
 
  void appendToBuffer(Buffer* output) const;    // 将HttpResponse添加到Buffer
 
 private:
  std::map<string, string> headers_;  // header列表
  HttpStatusCode statusCode_;           // 状态响应码
  // FIXME: add http version
  string statusMessage_;                // 状态响应码对应的文本信息
  bool closeConnection_;                // 是否关闭连接
  string body_;                         // 实体
};
 
}
}
 
#endif  // MUDUO_NET_HTTP_HTTPRESPONSE_H

HttpResponse源文件

HttpResponse.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/http/HttpResponse.h>
#include <muduo/net/Buffer.h>
 
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
void HttpResponse::appendToBuffer(Buffer* output) const
{
  char buf[32];
  // 添加响应头
  snprintf(buf, sizeof buf, "HTTP/1.1 %d ", statusCode_);
  output->append(buf);
  output->append(statusMessage_);
  output->append("\r\n");
 
  if (closeConnection_)
  {
    // 如果是短连接,不需要告诉浏览器Content-Length,浏览器也能正确处理
    // 短链接不会出现粘包问题
    output->append("Connection: close\r\n");
  }
  else
  {
    snprintf(buf, sizeof buf, "Content-Length: %zd\r\n", body_.size()); // 实体长度
    output->append(buf);
    output->append("Connection: Keep-Alive\r\n");
  }
 
  // header列表
  for (std::map<string, string>::const_iterator it = headers_.begin();
       it != headers_.end();
       ++it)
  {
    output->append(it->first);
    output->append(": ");
    output->append(it->second);
    output->append("\r\n");
  }
 
  output->append("\r\n");    // header与body之间的空行
  output->append(body_);
}

HttpContext文件

HttpContext.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_HTTP_HTTPCONTEXT_H
#define MUDUO_NET_HTTP_HTTPCONTEXT_H
 
#include <muduo/base/copyable.h>
 
#include <muduo/net/http/HttpRequest.h>
 
namespace muduo
{
namespace net
{
 
class HttpContext : public muduo::copyable
{
 public:
  enum HttpRequestParseState
  {
    kExpectRequestLine,//正处于请求行的状态
    kExpectHeaders,   //正处于请求头的状态
    kExpectBody,      //正处于请求实体的状态
    kGotAll,          //全部都解析完毕
  };
 
  HttpContext()
    : state_(kExpectRequestLine)
  {
  }
 
  // default copy-ctor, dtor and assignment are fine
 
  bool expectRequestLine() const
  { return state_ == kExpectRequestLine; }
 
  bool expectHeaders() const
  { return state_ == kExpectHeaders; }
 
  bool expectBody() const
  { return state_ == kExpectBody; }
 
  bool gotAll() const
  { return state_ == kGotAll; }
 
  void receiveRequestLine()
  { state_ = kExpectHeaders; }
  /*注意这里没有处理body实体的请求*/
  void receiveHeaders()
  { state_ = kGotAll; }  // FIXME
 
  // 重置HttpContext状态
  void reset()
  {
    state_ = kExpectRequestLine;
    HttpRequest dummy;
    request_.swap(dummy);
  }
 
  const HttpRequest& request() const
  { return request_; }
 
  HttpRequest& request()
  { return request_; }
 
 private:
  HttpRequestParseState state_;     // 请求解析状态
  HttpRequest request_;             // http请求
};
 
}
}
 
#endif  // MUDUO_NET_HTTP_HTTPCONTEXT_H

HttpServer头文件

HttpServer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_HTTP_HTTPSERVER_H
#define MUDUO_NET_HTTP_HTTPSERVER_H
 
#include <muduo/net/TcpServer.h>
#include <boost/noncopyable.hpp>
 
namespace muduo
{
namespace net
{
 
class HttpRequest;
class HttpResponse;
 
/// A simple embeddable HTTP server designed for report status of a program.
/// It is not a fully HTTP 1.1 compliant server, but provides minimum features
/// that can communicate with HttpClient and Web browser.
/// It is synchronous, just like Java Servlet.
class HttpServer : boost::noncopyable
{
 public:
  typedef boost::function<void (const HttpRequest&,
                                HttpResponse*)> HttpCallback;
 
  HttpServer(EventLoop* loop,
             const InetAddress& listenAddr,
             const string& name);
 
  ~HttpServer();  // force out-line dtor, for scoped_ptr members.
 
  /// Not thread safe, callback be registered before calling start().
  void setHttpCallback(const HttpCallback& cb)
  {
    httpCallback_ = cb;
  }
 
  /*subIO thread numbers*/
  void setThreadNum(int numThreads)
  {
    server_.setThreadNum(numThreads);
  }
 
  void start();
 
 private:
  void onConnection(const TcpConnectionPtr& conn);
  /*当以http请求到来时
onMessage--->onRequest--->HttpCallback
  */
  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime);
  void onRequest(const TcpConnectionPtr&, const HttpRequest&);
 
  TcpServer server_;
  HttpCallback httpCallback_;   // 在处理http请求(即调用onRequest)的过程中回调此函数,对请求进行具体的处理
};
 
}
}
 
#endif  // MUDUO_NET_HTTP_HTTPSERVER_H
````

### HttpServer源文件

HttpServer.cc

``` c++

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/http/HttpServer.h>
 
#include <muduo/base/Logging.h>
#include <muduo/net/http/HttpContext.h>
#include <muduo/net/http/HttpRequest.h>
#include <muduo/net/http/HttpResponse.h>
 
#include <boost/bind.hpp>
 
using namespace muduo;
using namespace muduo::net;
/*
GET / HTTP/1.1 (请求行)
Accept: image/jpeg, application/x-ms-application, image/gif, application/xaml+xml, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword(请求头)
Accept-Language: zh-CN(请求头)
User-Agent: Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; Tablet PC 2.0)(请求头)
Accept-Encoding: gzip, deflate(请求头)
Host: 192.168.159.188:800(请求头)
Connection: Keep-Alive(请求头)
 
*/
namespace muduo
{
namespace net
{
namespace detail
{
 
// FIXME: move to HttpContext class
bool processRequestLine(const char* begin, const char* end, HttpContext* context)
{
  bool succeed = false;
  const char* start = begin;
  const char* space = std::find(start, end, ' ');
  HttpRequest& request = context->request();
  if (space != end && request.setMethod(start, space))      // 解析请求方法
  {
    start = space+1;
    space = std::find(start, end, ' ');
    if (space != end)
    {
      request.setPath(start, space);    // 解析PATH
      start = space+1;
      succeed = end-start == 8 && std::equal(start, end-1, "HTTP/1.");
      if (succeed)
      {
        if (*(end-1) == '1')
        {
          request.setVersion(HttpRequest::kHttp11);     // HTTP/1.1
        }
        else if (*(end-1) == '0')
        {
          request.setVersion(HttpRequest::kHttp10);     // HTTP/1.0
        }
        else
        {
          succeed = false;
        }
      }
    }
  }
  return succeed;
}
 
// FIXME: move to HttpContext class
// return false if any error
// 开始解析请求
bool parseRequest(Buffer* buf, HttpContext* context, Timestamp receiveTime)
{
  bool ok = true;
  bool hasMore = true;
  while (hasMore)
  {
    if (context->expectRequestLine())    // 处于解析请求行状态
    {
      const char* crlf = buf->findCRLF();
      if (crlf)
      {
        ok = processRequestLine(buf->peek(), crlf, context); // 解析请求行 ,*crlf ='\r'
        if (ok)
        {
          context->request().setReceiveTime(receiveTime);        // 设置请求时间
          buf->retrieveUntil(crlf + 2);      // 将请求行从buf中取回,包括\r\n
          context->receiveRequestLine(); // 将HttpContext状态改为kExpectHeaders
        }
        else
        {
          hasMore = false;
        }
      }
      else
      {
        hasMore = false;
      }
    }
    else if (context->expectHeaders())       // 解析header
    {
      const char* crlf = buf->findCRLF();
      if (crlf)
      {
        const char* colon = std::find(buf->peek(), crlf, ':');       //冒号所在位置
        if (colon != crlf)
        {
          context->request().addHeader(buf->peek(), colon, crlf);
        }
        else
        {
          // empty line, end of header
          context->receiveHeaders();     // HttpContext将状态改为kGotAll
          hasMore = !context->gotAll();
        }
        buf->retrieveUntil(crlf + 2);        // 将header从buf中取回,包括\r\n
      }
      else
      {
        hasMore = false;
      }
    }
    else if (context->expectBody())          // 当前还不支持请求中带body
    {
      // FIXME:
    }
  }
  return ok;
}
 
void defaultHttpCallback(const HttpRequest&, HttpResponse* resp)
{
  resp->setStatusCode(HttpResponse::k404NotFound);
  resp->setStatusMessage("Not Found");
  resp->setCloseConnection(true);
}
 
}
}
}
 
HttpServer::HttpServer(EventLoop* loop,
                       const InetAddress& listenAddr,
                       const string& name)
  : server_(loop, listenAddr, name),
    httpCallback_(detail::defaultHttpCallback)
{
  server_.setConnectionCallback(
      boost::bind(&HttpServer::onConnection, this, _1));
  server_.setMessageCallback(
      boost::bind(&HttpServer::onMessage, this, _1, _2, _3));
}
 
HttpServer::~HttpServer()
{
}
 
void HttpServer::start()
{
  LOG_WARN << "HttpServer[" << server_.name()
    << "] starts listenning on " << server_.hostport();
  server_.start();
}
 
void HttpServer::onConnection(const TcpConnectionPtr& conn)
{
  if (conn->connected())
  {
    //boost any
    conn->setContext(HttpContext()); // TcpConnection与一个HttpContext绑定
  }
}
 
void HttpServer::onMessage(const TcpConnectionPtr& conn,
                           Buffer* buf,
                           Timestamp receiveTime)
{
  HttpContext* context = boost::any_cast<HttpContext>(conn->getMutableContext());
 
  if (!detail::parseRequest(buf, context, receiveTime))
  {
    conn->send("HTTP/1.1 400 Bad Request\r\n\r\n");
    conn->shutdown();
  }
 
  // 请求消息解析完毕
  if (context->gotAll())
  {
    onRequest(conn, context->request());
    context->reset();        // 本次请求处理完毕,重置HttpContext,适用于长连接
  }
}
 
void HttpServer::onRequest(const TcpConnectionPtr& conn, const HttpRequest& req)
{
  const string& connection = req.getHeader("Connection");
  //是否关闭连接
  bool close = connection == "close" ||
    (req.getVersion() == HttpRequest::kHttp10 && connection != "Keep-Alive");
  HttpResponse response(close);
  httpCallback_(req, &response);//回调用户函数
  Buffer buf;
  response.appendToBuffer(&buf);
  conn->send(&buf);
  if (response.closeConnection())
  {
    conn->shutdown();
  }
}

测试程序

#include <muduo/net/http/HttpServer.h>
#include <muduo/net/http/HttpRequest.h>
#include <muduo/net/http/HttpResponse.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
 
#include <iostream>
#include <map>
 
using namespace muduo;
using namespace muduo::net;
 
extern char favicon[555];
bool benchmark = false;
 
// 实际的请求处理
void onRequest(const HttpRequest& req, HttpResponse* resp)
{
  std::cout << "Headers " << req.methodString() << " " << req.path() << std::endl;
  if (!benchmark)
  {
    const std::map<string, string>& headers = req.headers();
    for (std::map<string, string>::const_iterator it = headers.begin();
         it != headers.end();
         ++it)
    {
      std::cout << it->first << ": " << it->second << std::endl;
    }
  }
 
  if (req.path() == "/")
  {
    resp->setStatusCode(HttpResponse::k200Ok);
    resp->setStatusMessage("OK");
    resp->setContentType("text/html");
    resp->addHeader("Server", "Muduo");
    string now = Timestamp::now().toFormattedString();
    resp->setBody("<html><head><title>This is title</title></head>"
        "<body><h1>Hello</h1>Now is " + now +
        "</body></html>");
  }
  else if (req.path() == "/favicon.ico")
  {
    resp->setStatusCode(HttpResponse::k200Ok);
    resp->setStatusMessage("OK");
    resp->setContentType("image/png");
    resp->setBody(string(favicon, sizeof favicon));
  }
  else if (req.path() == "/hello")
  {
    resp->setStatusCode(HttpResponse::k200Ok);
    resp->setStatusMessage("OK");
    resp->setContentType("text/plain");
    resp->addHeader("Server", "Muduo");
    resp->setBody("hello, world!\n");
  }
  else
  {
    resp->setStatusCode(HttpResponse::k404NotFound);
    resp->setStatusMessage("Not Found");
    resp->setCloseConnection(true);
  }
}
 
int main(int argc, char* argv[])
{
  int numThreads = 0;
  if (argc > 1)
  {
    benchmark = true;
    Logger::setLogLevel(Logger::WARN);
    numThreads = atoi(argv[1]);
  }
  EventLoop loop;
  HttpServer server(&loop, InetAddress(8000), "dummy");
  server.setHttpCallback(onRequest);
  server.setThreadNum(numThreads);
  server.start();
  loop.loop();
}
 
// 这是一个图片数据
char favicon[555] = {
  '\x89', 'P', 'N', 'G', '\xD', '\xA', '\x1A', '\xA',
  '\x0', '\x0', '\x0', '\xD', 'I', 'H', 'D', 'R',
  '\x0', '\x0', '\x0', '\x10', '\x0', '\x0', '\x0', '\x10',
  '\x8', '\x6', '\x0', '\x0', '\x0', '\x1F', '\xF3', '\xFF',
  'a', '\x0', '\x0', '\x0', '\x19', 't', 'E', 'X',
  't', 'S', 'o', 'f', 't', 'w', 'a', 'r',
  'e', '\x0', 'A', 'd', 'o', 'b', 'e', '\x20',
  'I', 'm', 'a', 'g', 'e', 'R', 'e', 'a',
  'd', 'y', 'q', '\xC9', 'e', '\x3C', '\x0', '\x0',
  '\x1', '\xCD', 'I', 'D', 'A', 'T', 'x', '\xDA',
  '\x94', '\x93', '9', 'H', '\x3', 'A', '\x14', '\x86',
  '\xFF', '\x5D', 'b', '\xA7', '\x4', 'R', '\xC4', 'm',
  '\x22', '\x1E', '\xA0', 'F', '\x24', '\x8', '\x16', '\x16',
  'v', '\xA', '6', '\xBA', 'J', '\x9A', '\x80', '\x8',
  'A', '\xB4', 'q', '\x85', 'X', '\x89', 'G', '\xB0',
  'I', '\xA9', 'Q', '\x24', '\xCD', '\xA6', '\x8', '\xA4',
  'H', 'c', '\x91', 'B', '\xB', '\xAF', 'V', '\xC1',
  'F', '\xB4', '\x15', '\xCF', '\x22', 'X', '\x98', '\xB',
  'T', 'H', '\x8A', 'd', '\x93', '\x8D', '\xFB', 'F',
  'g', '\xC9', '\x1A', '\x14', '\x7D', '\xF0', 'f', 'v',
  'f', '\xDF', '\x7C', '\xEF', '\xE7', 'g', 'F', '\xA8',
  '\xD5', 'j', 'H', '\x24', '\x12', '\x2A', '\x0', '\x5',
  '\xBF', 'G', '\xD4', '\xEF', '\xF7', '\x2F', '6', '\xEC',
  '\x12', '\x20', '\x1E', '\x8F', '\xD7', '\xAA', '\xD5', '\xEA',
  '\xAF', 'I', '5', 'F', '\xAA', 'T', '\x5F', '\x9F',
  '\x22', 'A', '\x2A', '\x95', '\xA', '\x83', '\xE5', 'r',
  '9', 'd', '\xB3', 'Y', '\x96', '\x99', 'L', '\x6',
  '\xE9', 't', '\x9A', '\x25', '\x85', '\x2C', '\xCB', 'T',
  '\xA7', '\xC4', 'b', '1', '\xB5', '\x5E', '\x0', '\x3',
  'h', '\x9A', '\xC6', '\x16', '\x82', '\x20', 'X', 'R',
  '\x14', 'E', '6', 'S', '\x94', '\xCB', 'e', 'x',
  '\xBD', '\x5E', '\xAA', 'U', 'T', '\x23', 'L', '\xC0',
  '\xE0', '\xE2', '\xC1', '\x8F', '\x0', '\x9E', '\xBC', '\x9',
  'A', '\x7C', '\x3E', '\x1F', '\x83', 'D', '\x22', '\x11',
  '\xD5', 'T', '\x40', '\x3F', '8', '\x80', 'w', '\xE5',
  '3', '\x7', '\xB8', '\x5C', '\x2E', 'H', '\x92', '\x4',
  '\x87', '\xC3', '\x81', '\x40', '\x20', '\x40', 'g', '\x98',
  '\xE9', '6', '\x1A', '\xA6', 'g', '\x15', '\x4', '\xE3',
  '\xD7', '\xC8', '\xBD', '\x15', '\xE1', 'i', '\xB7', 'C',
  '\xAB', '\xEA', 'x', '\x2F', 'j', 'X', '\x92', '\xBB',
  '\x18', '\x20', '\x9F', '\xCF', '3', '\xC3', '\xB8', '\xE9',
  'N', '\xA7', '\xD3', 'l', 'J', '\x0', 'i', '6',
  '\x7C', '\x8E', '\xE1', '\xFE', 'V', '\x84', '\xE7', '\x3C',
  '\x9F', 'r', '\x2B', '\x3A', 'B', '\x7B', '7', 'f',
  'w', '\xAE', '\x8E', '\xE', '\xF3', '\xBD', 'R', '\xA9',
  'd', '\x2', 'B', '\xAF', '\x85', '2', 'f', 'F',
  '\xBA', '\xC', '\xD9', '\x9F', '\x1D', '\x9A', 'l', '\x22',
  '\xE6', '\xC7', '\x3A', '\x2C', '\x80', '\xEF', '\xC1', '\x15',
  '\x90', '\x7', '\x93', '\xA2', '\x28', '\xA0', 'S', 'j',
  '\xB1', '\xB8', '\xDF', '\x29', '5', 'C', '\xE', '\x3F',
  'X', '\xFC', '\x98', '\xDA', 'y', 'j', 'P', '\x40',
  '\x0', '\x87', '\xAE', '\x1B', '\x17', 'B', '\xB4', '\x3A',
  '\x3F', '\xBE', 'y', '\xC7', '\xA', '\x26', '\xB6', '\xEE',
  '\xD9', '\x9A', '\x60', '\x14', '\x93', '\xDB', '\x8F', '\xD',
  '\xA', '\x2E', '\xE9', '\x23', '\x95', '\x29', 'X', '\x0',
  '\x27', '\xEB', 'n', 'V', 'p', '\xBC', '\xD6', '\xCB',
  '\xD6', 'G', '\xAB', '\x3D', 'l', '\x7D', '\xB8', '\xD2',
  '\xDD', '\xA0', '\x60', '\x83', '\xBA', '\xEF', '\x5F', '\xA4',
  '\xEA', '\xCC', '\x2', 'N', '\xAE', '\x5E', 'p', '\x1A',
  '\xEC', '\xB3', '\x40', '9', '\xAC', '\xFE', '\xF2', '\x91',
  '\x89', 'g', '\x91', '\x85', '\x21', '\xA8', '\x87', '\xB7',
  'X', '\x7E', '\x7E', '\x85', '\xBB', '\xCD', 'N', 'N',
  'b', 't', '\x40', '\xFA', '\x93', '\x89', '\xEC', '\x1E',
  '\xEC', '\x86', '\x2', 'H', '\x26', '\x93', '\xD0', 'u',
  '\x1D', '\x7F', '\x9', '2', '\x95', '\xBF', '\x1F', '\xDB',
  '\xD7', 'c', '\x8A', '\x1A', '\xF7', '\x5C', '\xC1', '\xFF',
  '\x22', 'J', '\xC3', '\x87', '\x0', '\x3', '\x0', 'K',
  '\xBB', '\xF8', '\xD6', '\x2A', 'v', '\x98', 'I', '\x0',
  '\x0', '\x0', '\x0', 'I', 'E', 'N', 'D', '\xAE',
  'B', '\x60', '\x82',
};

[41] muduo_inspect库源码分析

  • muduo_inspect库通过HTTP方式为服务器提供监控接口
  • 接受了多少个TCP连接
  • 当前有多少个活动连接
  • 一共响应了多少次请求
  • 每次请求的平均响应时间多少毫秒

  • Inspector // 包含了一个HttpServer对象

  • ProcessInspector // 通过ProcessInfo返回进程信息

  • ProcessInfo // 获取进程相关信息

ProcessInspectort头文件

ProcessInspector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.
 
#ifndef MUDUO_NET_INSPECT_PROCESSINSPECTOR_H
#define MUDUO_NET_INSPECT_PROCESSINSPECTOR_H
 
#include <muduo/net/inspect/Inspector.h>
#include <boost/noncopyable.hpp>
 
namespace muduo
{
namespace net
{
 
class ProcessInspector : boost::noncopyable
{
 public:
  void registerCommands(Inspector* ins);    // 注册命令接口
 
 private:
  static string pid(HttpRequest::Method, const Inspector::ArgList&);
  static string procStatus(HttpRequest::Method, const Inspector::ArgList&);
  static string openedFiles(HttpRequest::Method, const Inspector::ArgList&);
  static string threads(HttpRequest::Method, const Inspector::ArgList&);
 
};
 
}
}
 
#endif  // MUDUO_NET_INSPECT_PROCESSINSPECTOR_H

ProcessInspectort源文件

ProcessInspector.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/inspect/ProcessInspector.h>
#include <muduo/base/ProcessInfo.h>
#include <stdio.h>
 
using namespace muduo;
using namespace muduo::net;
 
/*命令注册*/
void ProcessInspector::registerCommands(Inspector* ins)
{
  /*muduo             :   proc
    command           :   pid
    commandCallback   :   pid
    commnadHelp       :   print id
  */
  ins->add("proc", "pid", ProcessInspector::pid, "print pid");
  ins->add("proc", "status", ProcessInspector::procStatus, "print /proc/self/status");
  ins->add("proc", "opened_files", ProcessInspector::openedFiles, "count /proc/self/fd");
  ins->add("proc", "threads", ProcessInspector::threads, "list /proc/self/task");
}
 
string ProcessInspector::pid(HttpRequest::Method, const Inspector::ArgList&)
{
  char buf[32];
  snprintf(buf, sizeof buf, "%d", ProcessInfo::pid());
  return buf;
}
 
string ProcessInspector::procStatus(HttpRequest::Method, const Inspector::ArgList&)
{
  return ProcessInfo::procStatus();
}
 
string ProcessInspector::openedFiles(HttpRequest::Method, const Inspector::ArgList&)
{
  char buf[32];
  snprintf(buf, sizeof buf, "%d", ProcessInfo::openedFiles());
  return buf;
}
 
string ProcessInspector::threads(HttpRequest::Method, const Inspector::ArgList&)
{
  std::vector<pid_t> threads = ProcessInfo::threads();
  string result;
  for (size_t i = 0; i < threads.size(); ++i)
  {
    char buf[32];
    snprintf(buf, sizeof buf, "%d\n", threads[i]);
    result += buf;
  }
  return result;
}

Inspector头文件

Inspector.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
 
#ifndef MUDUO_NET_INSPECT_INSPECTOR_H
#define MUDUO_NET_INSPECT_INSPECTOR_H
 
#include <muduo/base/Mutex.h>
#include <muduo/net/http/HttpRequest.h>
#include <muduo/net/http/HttpServer.h>
 
#include <map>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
 
namespace muduo
{
namespace net
{
 
class ProcessInspector;
 
// A internal inspector of the running process, usually a singleton.
class Inspector : boost::noncopyable
{
 public:
  typedef std::vector<string> ArgList;
  typedef boost::function<string (HttpRequest::Method, const ArgList& args)> Callback;
  Inspector(EventLoop* loop,
            const InetAddress& httpAddr,
            const string& name);
  ~Inspector();
 
  // 如add("proc", "pid", ProcessInspector::pid, "print pid");
  // http://192.168.159.188:12345/proc/pid这个http请求就会相应的调用ProcessInspector::pid来处理
  void add(const string& module,
           const string& command,
           const Callback& cb,
           const string& help);
 
 private:
  typedef std::map<string, Callback> CommandList;
  typedef std::map<string, string> HelpList;
 
  void start();
  void onRequest(const HttpRequest& req, HttpResponse* resp);
 
  HttpServer server_;
  boost::scoped_ptr<ProcessInspector> processInspector_;
  MutexLock mutex_;
  std::map<string, CommandList> commands_;
  std::map<string, HelpList> helps_;
  /*
commands_ --- > <muduo commandlist>
helps_    ----> <muduo helplist>
 
commandlist---> <command callback>
HelpList  ----> <command helplist>
 
commands_[][]=xx
helps_[][]=xxx
 
  */
};
 
}
}
 
#endif  // MUDUO_NET_INSPECT_INSPECTOR_H

Inspector源文件

Inspector.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
 
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
 
#include <muduo/net/inspect/Inspector.h>
 
#include <muduo/base/Thread.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/http/HttpRequest.h>
#include <muduo/net/http/HttpResponse.h>
#include <muduo/net/inspect/ProcessInspector.h>
 
//#include <iostream>
//#include <iterator>
//#include <sstream>
#include <boost/bind.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
 
using namespace muduo;
using namespace muduo::net;
 
namespace
{
Inspector* g_globalInspector = 0;
 
// Looks buggy
std::vector<string> split(const string& str)
{
  std::vector<string> result;
  size_t start = 0;
  size_t pos = str.find('/');
  while (pos != string::npos)
  {
    if (pos > start)
    {
      result.push_back(str.substr(start, pos-start));
    }
    start = pos+1;
    pos = str.find('/', start);
  }
 
  if (start < str.length())      // 说明最后一个字符不是'/'
  {
    result.push_back(str.substr(start));
  }
 
  return result;
}
 
}
 
Inspector::Inspector(EventLoop* loop,
                     const InetAddress& httpAddr,
                     const string& name)
    : server_(loop, httpAddr, "Inspector:"+name),
      processInspector_(new ProcessInspector)
{
  //断言在主线程当中
  assert(CurrentThread::isMainThread());
  assert(g_globalInspector == 0);
  g_globalInspector = this;
  /*设置请求的回调函数,这里的请求是完成了协议的解析之后*/
  server_.setHttpCallback(boost::bind(&Inspector::onRequest, this, _1, _2));
  /*注册命令*/
  processInspector_->registerCommands(this);
  // 这样子做法是为了防止竞态问题
  // 如果直接调用start,(当前线程不是loop所属的IO线程,是主线程)那么有可能,当前构造函数还没返回,
  // HttpServer所在的IO线程可能已经收到了http客户端的请求了(因为这时候HttpServer已启动),那么就会回调
  // Inspector::onRequest,而这时候构造函数还没返回,也就是说对象还没完全构造好。那么就会出现问题了
  loop->runAfter(0, boost::bind(&Inspector::start, this)); // little race condition
}
 
Inspector::~Inspector()
{
  assert(CurrentThread::isMainThread());
  g_globalInspector = NULL;
}
 
void Inspector::add(const string& module,
                    const string& command,
                    const Callback& cb,
                    const string& help)
{
  /*这里要不要加锁??
因为在构造函数时 程序是注册完 processInspector_->registerCommands(this); 才执行
loop->runAfter(0, boost::bind(&Inspector::start, this)); // little race condition。
 
如果程序进行扩充时,就要了。
class TcpInspector : boost::noncopyable
{
 public:
  void registerCommands(Inspector* ins);  // 注册命令接口
 
 private:
  static string pid(HttpRequest::Method, const Inspector::ArgList&);
  static string procStatus(HttpRequest::Method, const Inspector::ArgList&);
  static string openedFiles(HttpRequest::Method, const Inspector::ArgList&);
  static string threads(HttpRequest::Method, const Inspector::ArgList&);
 
};
 
MyInspector :public ProcessInspector
{
   TcpInspector
} ;
那么
MyInspector{
 
  HttpServer server_;
  boost::scoped_ptr<ProcessInspector> processInspector;
  boost::scoped_ptr<ProcessInspector> TcpInspector;
  MutexLock mutex_;
  std::map<string, CommandList> commands_;
  std::map<string, HelpList> helps_;
}
那么MyInspector 在初始化时,会初始化ProcessInspector 和TcpInspector,
如果ProcessInspector初始化完毕后,启动了OnreRequest(),而TcpInspector还没注册完,
也就是说TcpInspector还在add()函数中,那么不加锁的话,就会出现问题了。
  */
  MutexLockGuard lock(mutex_);
  commands_[module][command] = cb;
  helps_[module][command] = help;
}
 
void Inspector::start()
{
  server_.start();
}
 
void Inspector::onRequest(const HttpRequest& req, HttpResponse* resp)
{
  if (req.path() == "/")
  {
    string result;
    MutexLockGuard lock(mutex_);
    // 遍历helps
    for (std::map<string, HelpList>::const_iterator helpListI = helps_.begin();
         helpListI != helps_.end();
         ++helpListI)
    {
      const HelpList& list = helpListI->second;
      for (HelpList::const_iterator it = list.begin();
           it != list.end();
           ++it)
      {
        result += "/";
        result += helpListI->first;      // module
        result += "/";
        result += it->first;         // command
        result += "\t";
        result += it->second;            // help
        result += "\n";
      }
    }
    resp->setStatusCode(HttpResponse::k200Ok);
    resp->setStatusMessage("OK");
    resp->setContentType("text/plain");
    resp->setBody(result);
  }
  else
  {
    // 以"/"进行分割,将得到的字符串保存在result中
    std::vector<string> result = split(req.path());
    // boost::split(result, req.path(), boost::is_any_of("/"));
    //std::copy(result.begin(), result.end(), std::ostream_iterator<string>(std::cout, ", "));
    //std::cout << "\n";
    bool ok = false;
    if (result.size() == 0)
    {
      // 这种情况是错误的,因此ok仍为false
    }
    else if (result.size() == 1)
    {
      // 只有module,没有command也是错的,因此ok仍为false
      string module = result[0];
    }
    else
    {
      string module = result[0];
      // 查找module所对应的命令列表
      std::map<string, CommandList>::const_iterator commListI = commands_.find(module);
      if (commListI != commands_.end())
      {
        string command = result[1];
        const CommandList& commList = commListI->second;
        // 查找command对应的命令
        CommandList::const_iterator it = commList.find(command);
        if (it != commList.end())
        {
          ArgList args(result.begin()+2, result.end());     // 传递给回调函数的参数表
          if (it->second)
          {
            resp->setStatusCode(HttpResponse::k200Ok);
            resp->setStatusMessage("OK");
            resp->setContentType("text/plain");
            const Callback& cb = it->second;
            resp->setBody(cb(req.method(), args));       // 调用cb将返回的字符串传给setBody
            ok = true;
          }
        }
      }
 
    }
 
    if (!ok)
    {
      resp->setStatusCode(HttpResponse::k404NotFound);
      resp->setStatusMessage("Not Found");
    }
    //resp->setCloseConnection(true);
  }
}

测试程序

#include <muduo/net/inspect/Inspector.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
 
using namespace muduo;
using namespace muduo::net;
 
int main()
{
  EventLoop loop;
  EventLoopThread t;    // 监控线程 ,这里“线程该没有真正创建,t.startLoop()函数才是真正创建”
  /*Inspector 的loop 和main thread 的loop是不一样的,也就是说,现在已经有两个loop了*/
  Inspector ins(t.startLoop(), InetAddress(12345), "test");
  loop.loop();
}

在浏览器中输入127.0.0.1:12345/proc/status

Name:   inspector_test
State:  S (sleeping)
Tgid:   2030
Pid:    2030
PPid:   1907
TracerPid:  0
Uid:    1000    1000    1000    1000
Gid:    1000    1000    1000    1000
FDSize: 256
Groups: 4 20 24 46 116 118 124 1000
VmPeak:    12348 kB
VmSize:    12348 kB
VmLck:         0 kB
VmHWM:      1580 kB
VmRSS:      1580 kB
VmData:     8408 kB
VmStk:       136 kB
VmExe:       860 kB
VmLib:      2868 kB
VmPTE:        32 kB
VmSwap:        0 kB
Threads:    2
SigQ:   0/9772
SigPnd: 0000000000000000
ShdPnd: 0000000000000000
SigBlk: 0000000000000000
SigIgn: 0000000000001000
SigCgt: 0000000180000000
CapInh: 0000000000000000
CapPrm: 0000000000000000
CapEff: 0000000000000000
CapBnd: ffffffffffffffff
Cpus_allowed:   3
Cpus_allowed_list:  0-1
Mems_allowed:   1
Mems_allowed_list:  0
voluntary_ctxt_switches:    22
nonvoluntary_ctxt_switches: 14