本文共 4618 字,大约阅读时间需要 15 分钟。
Ceph 网络通信的源码位于 src/msg 目录下,目前实现了三种框架,这一点从 Messenger.cc 中的 Messenger::create 函数可以看出:Messenger *Messenger::create(CephContext *cct, const string &type, entity_name_t name, string lname, uint64_t nonce, uint64_t cflags){ if (r == 0 || type == "simple") return new SimpleMessenger(cct, name, std::move(lname), nonce); else if (r == 1 || type.find("async") != std::string::npos) return new AsyncMessenger(cct, name, type, std::move(lname), nonce);#ifdef HAVE_XIO else if ((type == "xio") && cct->check_experimental_feature_enabled("ms-type-xio")) return new XioMessenger(cct, name, std::move(lname), nonce, cflags);#endif lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl; return nullptr;}可以看到,create 会根据 type 选择 simple、async 或 xio 网络框架,其中只要 type 中包含 "async" 就会选择 async 框架(上面的摘录省略了变量 r 的定义)。选择 xio 需要定义 HAVE_XIO 宏,并且需要启用实验特性 ms-type-xio;xio 支持 TCP/IP 和 InfiniBand 两种协议,而前两种框架支持 TCP/IP 协议。下面我们看看目前用得最多的 simple 框架,SimpleMessenger 是 simple 框架的具体实现。从下面的构造函数中可以看到,SimpleMessenger 在构造时初始化了用于接收连接的 accepter 和用于分发消息的 dispatch_queue 等成员(这里只是构造对象;以 accepter 为例,线程要到调用 start() 时才真正创建):SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce) : SimplePolicyMessenger(cct, name,mname, _nonce), accepter(this, _nonce), dispatch_queue(cct, this, mname), reaper_thread(this), nonce(_nonce), lock("SimpleMessenger::lock"), need_addr(true), did_bind(false), global_seq(0), cluster_protocol(0), reaper_started(false), reaper_stop(false), timeout(0), local_connection(new PipeConnection(cct, this)){ ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout), "SimpleMessenger read timeout"); init_local_connection();}这里以 accepter 为例,看看接收线程是如何建立的:class Accepter : public Thread { SimpleMessenger *msgr; bool done; int listen_sd; uint64_t nonce; int shutdown_rd_fd; int shutdown_wr_fd; int create_selfpipe(int *pipe_rd, int *pipe_wr);public: Accepter(SimpleMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n), shutdown_rd_fd(-1), shutdown_wr_fd(-1) {} void *entry() override; void stop(); int bind(const entity_addr_t &bind_addr, const set & avoid_ports); int rebind(const set & avoid_port); int start();};可以看到 accepter 
是 Thread 的子类,所以我们先看看它的 start 函数:int Accepter::start(){ ldout(msgr->cct,1) << __func__ << dendl; // start thread create("ms_accepter"); return 0;}在 start 中通过 create 新建一个接收线程,线程名为 ms_accepter。接下来我们再看看这个线程主要执行的工作,它在 entry 中实现:void *Accepter::entry(){ ldout(msgr->cct,1) << __func__ << " start" << dendl; int errors = 0; int ch; struct pollfd pfd[2]; memset(pfd, 0, sizeof(pfd)); pfd[0].fd = listen_sd; pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; pfd[1].fd = shutdown_rd_fd; pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; #开始polling while (!done) { ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; int r = poll(pfd, 2, -1); if (r < 0) { if (errno == EINTR) { continue; } ldout(msgr->cct,1) << __func__ << " poll got error" << " errno " << errno << " " << cpp_strerror(errno) << dendl; break; } #检查是否polling返回error if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { ldout(msgr->cct,1) << __func__ << " poll got errors in revents " << pfd[0].revents << dendl; break; } if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { // We got "signaled" to exit the poll // clean the selfpipe #检查是否要退出polling if (::read(shutdown_rd_fd, &ch, 1) == -1) { if (errno != EAGAIN) ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " << " errno " << errno << " " << cpp_strerror(errno) << dendl; } break; } if (done) break; // accept #走到这里polling函数就正常返回了,通过accept函数开始接收 sockaddr_storage ss; socklen_t slen = sizeof(ss); int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); if (sd >= 0) { int r = set_close_on_exec(sd); if (r) { ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed " << cpp_strerror(r) << dendl; } errors = 0; ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; #实际从pipe中读入msg msgr->add_accept_pipe(sd); } else { ldout(msgr->cct,0) << __func__ << " no incoming connection? 
sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ldout(msgr->cct,20) << __func__ << " closing" << dendl; // socket is closed right after the thread has joined. // closing it here might race if (shutdown_rd_fd >= 0) { ::close(shutdown_rd_fd); shutdown_rd_fd = -1; } ldout(msgr->cct,10) << __func__ << " stopping" << dendl; return 0;}
转载地址:http://sdnmi.baihongyu.com/