forest-net
an overlay networks for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
RouterInProc.cpp
Go to the documentation of this file.
1 
9 #include "RouterInProc.h"
10 
11 using namespace forest;
12 
13 namespace forest {
14 
15 RouterInProc::RouterInProc(Router *rtr1) : rtr(rtr1) {
16  nRdy = 0; maxSockNum = -1;
17  sockets = new fd_set;
18 
19  // setup thread pool
20  tpool = new ThreadInfo[numThreads+1];
21  for (int i = 1; i <= numThreads; i++) {
22  tpool[i].q.resize(100);
23  tpool[i].rc = RouterControl(rtr,i,tpool[i].q,retQ);
24  tpool[i].thred = thread(RouterControl::start,&tpool[i].rc);
25  }
26 
27  comtSet = new HashSet<comt_t>(Hash::hash_u32,numThreads,false);
28  rptr = new Repeater(numThreads);
29  repH = new RepeatHandler(maxReplies);
30 }
31 
32 RouterInProc::~RouterInProc() {
33  delete sockets; delete [] tpool;
34  delete comtSet; delete rptr; delete repH;
35 }
36 
40 bool RouterInProc::start(RouterInProc *self) { self->run(); }
41 
45  nanoseconds temp = high_resolution_clock::now() - rtr->tZero;
46  now = temp.count();
47 
48  if (rtr->booting) {
49  if (!bootStart()) {
50  Util::fatal("RouterInProc::run: could not initiate "
51  "remote boot");
52  }
53  }
54 
55  uint64_t statsTime = 0; // time statistics were last processed
56  bool didNothing;
57  high_resolution_clock::time_point
58  finishTime = now + nanoseconds(rtr->runLength);
59  while (finishTime.count == 0 || now < finishTime) {
60  temp = high_resolution_clock::now() - rtr->tZero;
61  now = temp.count();
62  didNothing = !inBound();
63  didNothing &= !outBound();
64 
65  if (didNothing) {
66  pair<int,int> pp = rptr->overdue(now);
67  if (pp.first > 0) {
68  pktx cx = ps.clone(pp.first);
69  if (cx != 0) forward(cx);
70  didNothing = false;
71  } else if (pp.first < 0) {
72  // no more retries, return to thread
73  // with a NO_REPLY mode
74  int px = -pp.first;
75  Packet& p = ps->getPacket(px);
76  CtlPkt cp(p);
77  cp.mode = CtlPkt::NO_REPLY;
78  cp.fmtBase();
80  p.pack();
81  tpool[pp.second].q.enq(px);
82  didNothing = false;
83  }
84  }
85 
86  if (didNothing) {
87  // check for old entries in RepeatHandler
88  int px = repH.expired(now);
89  if (px != 0) {
90  ps->free(px); didNothing = false;
91  }
92  }
93 
94  // every 300 ms, update statistics and check for un-acked
95  // control packets
96  if (now - statsTime > 300*1000000) {
97  rtr->sm->record(now); statsTime = now;
98  didNothing = false;
99  }
100 
101  // if did nothing on that pass, sleep for a millisecond.
102  if (didNothing) {
103  this_thread::sleep_for(milliseconds(1));
104  }
105  }
106 }
107 
108 bool RouterInProc::bootStart() {
109  // open datagram socket
111  if (bootSock < 0) {
112  cerr << "RouterInProc::bootStart: socket call failed\n";
113  return false;
114  }
115  // bind it to the bootIp
116  if (!Np4d::bind4d(bootSock, bootIp, 0)) {
117  cerr << "RouterInProc::bootStart: bind call failed, "
118  << "check boot IP address\n";
119  return false;
120  }
121 
122  // send boot request to net manager
123  int px = ps->alloc();
124  if (px == 0) {
125  Util::fatal("RouterInProc::bootStart: no packets left");
126  }
127  Packet& p = ps->getPacket(px);
128  CtlPkt cp(CtlPkt::BOOT_ROUTER, CtlPkt::REQUEST, rtr->seqNum++);
129  cp.payoad = p.payload; cp.pack();
130 
131  for (int i = 0; i <= 3; i++) {
132  bootSend(px);
133  for (int j = 0; j < 9; j++) {
134  this_thread::sleep_for(milliseconds(100));
135  int rx = bootReceive();
136  if (rx == 0) continue;
137  Packet& reply = ps->getPacket(rx);
138  CtlPkt cpr(reply);
139  if (cpr.type == CtlPkt::BOOT_REQUEST &&
140  cpr.mode == CtlPkt::POS_REPLY)
141  return true;
142  // discard other packets until boot reply comes in
143  ps->free(rx);
144  }
145  }
146 
147  return false;
148 }
149 
153  int nbytes; // number of bytes in received packet
154 
155  pktx px = ps->alloc();
156  if (px == 0) return 0;
157  Packet& p = ps->getPacket(px);
158 
159  ipa_t sIpAdr; ipp_t sPort;
160  nbytes = Np4d::recvfrom4d(bootSock, (void *) p.buffer, 1500,
161  sIpAdr, sPort);
162  p.bufferLen = nbytes;
163  if (nbytes < 0) {
164  if (errno == EAGAIN) {
165  ps->free(px); return 0;
166  }
167  Util::fatal("RouterInProc::bootReceive:receive: error in "
168  "recvfrom call");
169  }
170  if (sIpAdr != nmIp || sPort != Forest::NM_PORT) {
171  ps->free(px); return 0;
172  }
173  p.unpack();
174  if (!p.hdrErrCheck() ||
175  p.srcAdr != rtr->nmAdr || p.type != Forest::NET_SIG) {
176  ps->free(px); return 0;
177  }
178  p.tunIp = sIpAdr; p.tunPort = sPort; p.inLink = 0;
179  return px;
180 }
181 
185 void RouterInProc::bootSend(pktx px) {
186  Packet p = ps->getPacket(px);
187  p.srcAdr = rtr->myAdr;
188  p.dstAdr = rtr->nmAdr;
189  p.comtree = 0;
190  p.pack();
191  int rv, lim = 0;
192  do {
193  rv = Np4d::sendto4d(bootSock,(void *) p.buffer, p.length,
195  } while (rv == -1 && errno == EAGAIN && lim++ < 10);
196  if (rv == -1) {
197  Util::fatal("RouterInProc:: send: failure in sendto");
198  }
199  ps->free(px);
200  return;
201 }
202 
207  pktx px;
208  if (rtr->booting) {
209  px = bootReceive();
210  if (px == 0) return false;
211  Packet& p = ps->getPacket(px);
212  CtlPkt cp(p);
213  if (cp.type == CtlPkt::BOOT_COMPLETE &&
214  cp.mode == CtlPkt::REQUEST) {
215  cp.mode = CtlPkt::POS_REPLY;
216  cp.fmtBase();
217  rtr->booting = false;
218  bootSend(px); return true;
219  } else if (cp.type == CtlPkt::BOOT_ABORT &&
220  cp.mode == CtlPkt::REQUEST) {
221  cp.mode = CtlPkt::POS_REPLY; cp.fmtBase();
222  bootSend(px);
223  Util::fatal("RouterInProc::inBound: remote boot "
224  "aborted by NetMgr");
225  }
226  } else {
227  px = receive();
228  if (px == 0) return false;
229  }
230  Packet& p = ps->getPacket(px);
231  p.outLink = 0;
232  int ptype = p.type;
233  p.rcvSeqNum = ++rcvSeqNum;
234  pktLog->log(px,p.inLink,false,now);
235  // lock ctt
236  int ctx = ctt->getComtIndex(p.comtree);
237  if (!pktCheck(px,ctx)) {
238  // unlock ctt
239  ps->free(px);
240  } else if (p.dstAdr != myAdr &&
241  (p.typ == Forest::CLIENT_DATA ||
242  p.typ == Forest::NET_SIG)) {
243  // unlock ctt
244  forward(px);
245  } else if (p.dstAdr == myAdr &&
246  (p.typ == Forest::NET_SIG || p.typ == Forest::CLIENT_SIG)) {
247  CtlPkt cp(p);
248  if (cp.mode != CtlPkt::REQUEST) {
249  // reply to a request sent earlier
250  pair<int,int> pp = rptr.deleteMatch(p.srcAdr,cp.seqNum);
251  if (pp.first == 0) { // no matching request
252  ps->free(px); return true;
253  }
254  ps->free(pp->first); // free saved copy of request
255  // pass reply to responsible thread; remember rcvSeqNum
256  tpool[pp.second].rcvSeqNum = p.rcvSeqNum;
257  tpool[pp.second].q.enq(px);
258  return true;
259  }
260  if ((pktx sx = replyH->find(p.srcAdr,cp.seqNum)) != 0) {
261  // repeat of a request we've already received
262  ps->free(px);
263  Packet& saved = ps->getPacket(sx);
264  CtlPkt scp(saved);
265  if (scp.mode != CtlPkt::REQUEST) {
266  // already replied to this request, reply again
267  pktx cx = ps->clone(sx);
268  forward(cx);
269  }
270  return true;
271  }
272  // new request packet
273  if (Forest::isSigComt(p.comtree)) {
274  // so not a comtree control packet
275  // assign worker thread
276  thx = freeThreads.first();
277  if (thx == 0) {
278  // no threads available to handle it
279  // return negative reply
280  cp.fmtError("to busy to handle request, "
281  "retry later");
282  p.dstAdr = p.srcAdr;
283  p.srcAdr = rtr->myAdr;
284  p.pack();
285  forward(px);
286  return true;
287  }
288  freeThreads.removeFirst();
289  tpool[thx].rcvSeqNum = p.rcvSeqNum;
290  // save a copy, so we can detect repeats
291  pktx cx = ps->clone(px);
292  pktx ox = repH.saveReq(cx,p.srcAdr,cp.seqNum,now);
293  if (ox != 0) { // old entry was removed to make room
294  ps->free(ox);
295  }
296  // and send original to the thread
297  tpool[thx].q.enq(px);
298  return true;
299  } else {
300  // request for changing comtree
301  index thx = comtSet->find(p.comtree);
302  if (thx == 0) {
303  // no thread assigned to this comtree yet
304  // assign a worker thread
305  thx = freeThreads.first();
306  if (thx == 0) {
307  cp.fmtError("to busy to handle request,"
308  " retry later");
309  p.dstAdr = p.srcAdr;
310  p.srcAdr = rtr->myAdr;
311  p.pack();
312  forward(px);
313  return true;
314  }
315  freeThreads.removeFirst();
316  comtSet->insert(p.comtree,thx);
317  }
318  tpool[thx].rcvSeqNum = p.rcvSeqNum;
319  tpool[thx].q.enq(px);
320  return true;
321  }
322  } else {
323  cerr << "RouterInProc::inBound: unrecognized packet "
324  + p.toString() << endl;
325  rtr->ps->free(px);
326  }
327  return true;
328 }
329 
334  if (retQ.empty()) return false;
335  pair<int,int> retp = retQ.deq();
336 
337  int thx = retp.first(); // index of sending thread
338  pktx px = retp.second(); // packet index of outgoing packet
339 
340  Packet& p = ps->getPacket(px);
341  if (thx < 0) {
342  // means thread is ready to be released and packet is
343  // a dummy used to pass rcvSeqNum back to ensure clean
344  // deallocation of thread
345  //
346  // needed to handle comtree signalling
347  thx = -thx; // restore proper thread index
348  if (tpool[thx].rcvSeqNum == p.rcvSeqNum) {
349  if (comtSet->valid(thx))
350  comtSet->remove(comtSet->retrieve(thx));
351  freeThreads->addFirst(thx);
352  }
353  ps->free(px); return true;
354  }
355  if (p.type != Forest::CLIENT_SIG || p.type != Forest::NET_SIG) {
356  forward(px); return true;
357  }
358  CtlPkt cp(p);
359  if (cp.mode == CtlPkt::REQUEST) {
360  // assign sequence number to packet
361  cp.seqNum = rtr->nextSeqNum();
362  cp.updateSeqNum();
363  // make and send copy
364  pktx cx = ps->clone(px);
365  forward(cx);
366  // save original request packet
367  rptr.saveReq(px, cp.seqNum, now, thx);
368  return true;
369  }
370  // it's a reply, send copy, save original in repeat handler and
371  // recycle corresponding request that was stored in repeat handler
372  pktx cx = ps->clone(px);
373  forward(cx);
374  int sx = repH.saveRep(px, px.dstAdr, cp.seqNum);
375  if (sx != 0) ps->free(sx);
376  return true;
377 }
378 
379 // Return next waiting packet or 0 if there is none.
380 pktx RouterInProc::receive() {
381  if (nRdy == 0) { // if no ready interface check for new arrivals
382  FD_ZERO(sockets);
383  for (int i = ift->firstIface(); i != 0; i = ift->nextIface(i)) {
384  FD_SET(rtr->sock[i],sockets);
385  }
386  struct timeval zero; zero.tv_sec = zero.tv_usec = 0;
387  int cnt = 0;
388  do {
389  nRdy = select(maxSockNum+1,sockets,
390  (fd_set *)NULL, (fd_set *)NULL, &zero);
391  } while (nRdy < 0 && cnt++ < 10);
392  if (cnt > 5) {
393  cerr << "RouterInProc::receive: select failed "
394  << cnt-1 << " times\n";
395  }
396  if (nRdy < 0) {
397  fatal("RouterInProc::receive: select failed");
398  }
399  if (nRdy == 0) return 0;
400  cIf = 0;
401  }
402  while (1) { // find next ready interface
403  cIf++;
404  if (cIf > Forest::MAXINTF) return 0; // should never reach here
405  if (ift->valid(cIf) && FD_ISSET(rtr->sock[cIf],sockets)) {
406  nRdy--; break;
407  }
408  }
409  // Now, read the packet from the interface
410  int nbytes; // number of bytes in received packet
411  int lnk; // # of link on which packet received
412 
413  pktx px;
414  // if packetCache not empty, allocate a pktx from there,
415  // else allocate one from ps
416  if (px == 0) {
417  cerr << "RouterInProc:receive: out of packets\n";
418  return 0;
419  }
420  Packet& p = ps->getPacket(px);
421  buffer_t& b = *p.buffer;
422 
423  ipa_t sIpAdr; ipp_t sPort;
424  int nbytes = Np4d::recvfrom4d(rtr->sock[cIf], (void *) &b[0], 1500,
425  sIpAdr, sPort);
426  if (nbytes < 0) fatal("RouterInProc::receive: error in recvfrom call");
427 
428  p.unpack();
429 
430  if (!p.hdrErrCheck()) { ps->free(px); return 0; }
431  lnk = lt->lookup(sIpAdr, sPort);
432  if (lnk == 0 && p.type == Forest::CONNECT
433  && p.length == Forest::OVERHEAD+2*sizeof(uint64_t)) {
434  uint64_t nonce = ntohl(p.payload()[2]); nonce <<= 32;
435  nonce |= ntohl(p.payload()[3]);
436  lnk = lt->lookup(nonce); // check for "startup" entry
437  }
438  if (lnk == 0 || cIf != lt->getIface(lnk)) {
439  string s;
440  cerr << "RouterInProc::receive: bad packet: lnk=" << lnk << " "
441  << p.toString(s);
442  cerr << "sender=(" << Np4d::ip2string(sIpAdr,s) << ","
443  << sPort << ")\n";
444  ps->free(px); return 0;
445  }
446 
447  p.inLink = lnk; p.outLink = 0;
448  p.bufferLen = nbytes;
449  p.tunIp = sIpAdr; p.tunPort = sPort;
450 
451  sm->cntInLink(lnk,Forest::truPktLeng(nbytes),
452  (lt->getPeerType(lnk) == Forest::ROUTER));
453  return px;
454 }
455 
461 bool RouterInProc::pktCheck(pktx px, int ctx) {
462  Packet& p = ps->getPacket(px);
463  // check version and length
464  if (p.version != Forest::FOREST_VERSION) {
465  return false;
466  }
467  if (p.length != p.bufferLen || p.length < Forest::HDR_LENG) {
468  return false;
469  }
470  if (p.type == Forest::CONNECT || p.type == Forest::DISCONNECT)
471  return (p.length == Forest::OVERHEAD+2*sizeof(uint64_t));
472 
473  fAdr_t adr = p.dstAdr;
474  if (!Forest::validUcastAdr(adr) && !Forest::mcastAdr(adr)) {
475  return false;
476  }
477 
478  int inLink = p.inLink;
479  if (inLink == 0) return false;
480  if (ctx != 0) {
481  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),inLink);
482  if (cLnk == 0) {
483  return false;
484  }
485  }
486 
487  // extra checks for packets from untrusted peers
488  if (lt->getPeerType(inLink) < Forest::TRUSTED) {
489  // verify that type is valid
490  Forest::ptyp_t ptype = p.type;
491  if (ptype != Forest::CLIENT_DATA &&
492  ptype != Forest::CONNECT && ptype != Forest::DISCONNECT &&
493  ptype != Forest::SUB_UNSUB && ptype != Forest::CLIENT_SIG)
494  return false;
495  // check for spoofed source address
496  if (lt->getPeerAdr(inLink) != p.srcAdr) return false;
497  // check that only client signalling packets on new comt
498  if (ctx == 0) return ptype == Forest::CLIENT_SIG);
499  // verify that header consistent with comtree constraints
500  fAdr_t dest = ctt->getDest(cLnk);
501  if (dest!=0 && p.dstAdr != dest && p.dstAdr != myAdr)
502  return false;
503  int comt = ctt->getComtree(ctx);
504  if ((ptype == Forest::CONNECT || ptype == Forest::DISCONNECT) &&
505  comt != (int) Forest::CONNECT_COMT)
506  return false;
507  if (ptype == Forest::CLIENT_SIG &&
508  comt != (int) Forest::CLIENT_SIG_COMT)
509  return false;
510  } else if (ctx == 0) {
511  return p.type == Forest::NET_SIG;
512  }
513  return true;
514 }
515 
516 } // ends namespace