博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ceph网络通信架构
阅读量:4216 次
发布时间:2019-05-26

本文共 4618 字,大约阅读时间需要 15 分钟。

ceph的网络通信的源码在src/msg下面目前实现由三个框架,这点从Messenger.cc 这个类中可以看出Messenger *Messenger::create(CephContext *cct, const string &type,			     entity_name_t name, string lname,			     uint64_t nonce, uint64_t cflags){  if (r == 0 || type == "simple")    return new SimpleMessenger(cct, name, std::move(lname), nonce);  else if (r == 1 || type.find("async") != std::string::npos)    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);#ifdef HAVE_XIO  else if ((type == "xio") &&	   cct->check_experimental_feature_enabled("ms-type-xio"))    return new XioMessenger(cct, name, std::move(lname), nonce, cflags);#endif  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;  return nullptr;}可以看到可以在create的时候根据type选择网络框架是simple/async/xio.其中选择xio的话需要的话需要定义HAVE_XIO 这个宏,并且xio支持tcp/ip。infiniband 这两种协议,而前两种支持tcp/ip 协议我们看看目前用到最多的simple框架SimpleMessenger 是simple框架具体的实现。从这个构造函数中可以看到SimpleMessenger 会在构造函数中分别建立结束accepter和发送dispatch_queue 两个线程SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,				 string mname, uint64_t _nonce)  : SimplePolicyMessenger(cct, name,mname, _nonce),    accepter(this, _nonce),    dispatch_queue(cct, this, mname),    reaper_thread(this),    nonce(_nonce),    lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),    global_seq(0),    cluster_protocol(0),    reaper_started(false), reaper_stop(false),    timeout(0),    local_connection(new PipeConnection(cct, this)){  ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),                             "SimpleMessenger read timeout");  init_local_connection();}这里以accepter为例看看是如何建立接受线程的class Accepter : public Thread {  SimpleMessenger *msgr;  bool done;  int listen_sd;  uint64_t nonce;  int shutdown_rd_fd;  int shutdown_wr_fd;  int create_selfpipe(int *pipe_rd, int *pipe_wr);public:  Accepter(SimpleMessenger *r, uint64_t n)     : msgr(r), done(false), listen_sd(-1), nonce(n),      shutdown_rd_fd(-1), shutdown_wr_fd(-1)    {}      void *entry() override;  void stop();  int bind(const entity_addr_t &bind_addr, const set
& avoid_ports); int rebind(const set
& avoid_port); int start();};可以看到accepter 是thread的子类所以我们先看看其start函数int Accepter::start(){ ldout(msgr->cct,1) << __func__ << dendl; // start thread create("ms_accepter"); return 0;}在start中通过create来新建一个接收线程,其name是ms_accepter其次我们在看看这个线程只要执行的工作,其在entry中实现void *Accepter::entry(){ ldout(msgr->cct,1) << __func__ << " start" << dendl; int errors = 0; int ch; struct pollfd pfd[2]; memset(pfd, 0, sizeof(pfd)); pfd[0].fd = listen_sd; pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; pfd[1].fd = shutdown_rd_fd; pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; #开始polling while (!done) { ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; int r = poll(pfd, 2, -1); if (r < 0) { if (errno == EINTR) { continue; } ldout(msgr->cct,1) << __func__ << " poll got error" << " errno " << errno << " " << cpp_strerror(errno) << dendl; break; } #检查是否polling返回error if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { ldout(msgr->cct,1) << __func__ << " poll got errors in revents " << pfd[0].revents << dendl; break; } if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { // We got "signaled" to exit the poll // clean the selfpipe #检查是否要退出polling if (::read(shutdown_rd_fd, &ch, 1) == -1) { if (errno != EAGAIN) ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " << " errno " << errno << " " << cpp_strerror(errno) << dendl; } break; } if (done) break; // accept #走到这里polling函数就正常返回了,通过accept函数开始接收 sockaddr_storage ss; socklen_t slen = sizeof(ss); int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); if (sd >= 0) { int r = set_close_on_exec(sd); if (r) { ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed " << cpp_strerror(r) << dendl; } errors = 0; ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; #实际从pipe中读入msg msgr->add_accept_pipe(sd); } else { ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd << " errno " << errno << " " << cpp_strerror(errno) << dendl; if (++errors > 4) break; } } ldout(msgr->cct,20) << __func__ << " closing" << dendl; // socket is closed right after the thread has joined. // closing it here might race if (shutdown_rd_fd >= 0) { ::close(shutdown_rd_fd); shutdown_rd_fd = -1; } ldout(msgr->cct,10) << __func__ << " stopping" << dendl; return 0;}

转载地址:http://sdnmi.baihongyu.com/

你可能感兴趣的文章
poj 3863Business Center
查看>>
Android编译系统简要介绍和学习计划
查看>>
Android编译系统环境初始化过程分析
查看>>
user2eng 笔记
查看>>
DRM in Android
查看>>
ARC MRC 变换
查看>>
Swift cell的自适应高度
查看>>
【linux】.fuse_hiddenXXXX 文件是如何生成的?
查看>>
【LKM】整合多个LKM为1个
查看>>
【Windows C++】调用powershell上传指定目录下所有文件
查看>>
Java图形界面中单选按钮JRadioButton和按钮Button事件处理
查看>>
小练习 - 排序:冒泡、选择、快排
查看>>
SparkStreaming 如何保证消费Kafka的数据不丢失不重复
查看>>
Spark Shuffle及其调优
查看>>
数据仓库分层
查看>>
常见数据结构-TrieTree/线段树/TreeSet
查看>>
Hive数据倾斜
查看>>
TopK问题
查看>>
Hive调优
查看>>
HQL排查数据倾斜
查看>>