forest-net
an overlay network for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
RouterOutProc.cpp
Go to the documentation of this file.
1 
9 #include "RouterOutProc.h"
10 
11 using namespace forest;
12 
13 namespace forest {
14 
19 RouterOutProc::RouterOutProc(Router *rtr1) : rtr(rtr1) {
20 }
21 
22 RouterOutProc::~RouterOutProc() {
23 }
24 
30 void RouterOutProc::run(seconds runTime) {
31  bool didNothing;
32  int controlCount = 20; // used to limit overhead of control
33  // packet processing
34  unique_lock ltLock( rtr->ltMtx,defer_lock);
35  unique_lock cttLock(rtr->cttMtx,defer_lock);
36  unique_lock rtLock(rtr->rtMtx, defer_lock);
37  lock(ltLock, cttLock, rtLock);
38 
39  nanoseconds temp = high_resolution_clock::now() - rtr->tZero;
40  now = temp.count(); // time since router started running
41  int64_t startTime = now;
42  int64_t statsTime = now;
43  int64_t finishTime = runTime.count();
44  finishTime *= 1000000000; finishTime += now;
45  while (runTime.count() == 0 || now < finishTime) {
46  // update time
47  nanoseconds temp = high_resolution_clock::now() - rtr->tZero;
48  now = temp.count();
49 
50  didNothing = true;
51 
52  // process packet from transfer queue, if any
53  if (!rtr->xferQ.empty()) {
54  didNothing = false;
55  pktx px = rtr->xferQ.deq();
56  Packet& p = rtr->ps->getPacket(px);
57  lock(ltLock, cttLock, rtLock);
58  int ctx = rtr->ctt->getComtIndex(p.comtree);
59  if (p.destAdr != rtr->myAdr) {
60  if (p.outLink == 0) {
61  forward(px,ctx);
62  } else if (p.outLink != 0) {
63  // enqueue it on specified link
64  int cLnk = rtr->ctt->getComtLink(
65  p.comtree,p.outLink);
66  int qid = rtr->ctt->getLinkQ(ctx,cLnk)
67  if (!rtr->qm->enq(px,qid,now))
68  rtr->ps->free(px);
69  }
70  } else if (p.flags & Forest::ACK_FLAG) {
71  // find and remove matching request
72  int64_t seqNum =htohl((p.payload())[0]);
73  seqNum <<= 32;
74  seqNum |= ntohl((p.payload())[1]);
75  int rx = rptr->deleteMatch(seqNum);
76  if (rx != 0) ps->free(rx);
77  } else if (p.type == Forest::SUB_UNSUB) {
78  subUnsub(px,ctx);
79  } else if (p.type == Forest::RTE_REPLY) {
80  handleRteReply(px,ctx);
81  } else if (p.type == Forest::CONNECT ||
82  p.type == Forest::DISCONNECT) {
83  handleConnDisc(px);
84  } else {
85  ps->free(px);
86  }
87  // unlock everything
88  ltLock.unlock();
89  cttLock.unlock();
90  rtLock.unlock();
91  }
92 
93  // output processing
94  int lnk;
95  while ((px = rtr->qm->deq(lnk, now)) != 0) {
96  didNothing = false;
97  pktLog->log(px,lnk,true,now);
98  send(px,lnk);
99  }
100 
101  // every 300 ms, update statistics and check for un-acked
102  // in-band control packets
103  if (now > statsTime + 300*1000000) {
104  sm->record(now); statsTime = now; resendControl();
105  didNothing = false;
106  }
107 
108  // if did nothing on that pass, sleep for a millisecond.
109  if (didNothing)
110  this_thread::sleep_for(chrono::milliseconds(1));
111  }
112 
113  // write out recorded events
114  pktLog->write(cout);
115  cout << endl;
116  cout << sm->iPktCnt(0) << " packets received, "
117  << sm->oPktCnt(0) << " packets sent\n";
118  cout << sm->iPktCnt(-1) << " from routers, "
119  << sm->oPktCnt(-1) << " to routers\n";
120  cout << sm->iPktCnt(-2) << " from clients, "
121  << sm->oPktCnt(-2) << " to clients\n";
122 }
123 
128 void RouterOutProc::forward(pktx px, int ctx) {
129  Packet& p = rtr->ps->getPacket(px);
130  int rtx = rtr->rt->getRtx(p.comtree,p.dstAdr);
131  if (rtx != 0) { // valid route case
132  // reply to route request
133  if ((p.flags & Forest::RTE_REQ)) {
134  sendRteReply(px,ctx);
135  p.flags = (p.flags & (~Forest::RTE_REQ));
136  p.pack();
137  p.hdrErrUpdate();
138  }
139  if (Forest::validUcastAdr(p.dstAdr)) {
140  int rcLnk = rtr->rt->getClnkNum(p.comtree, p.inLink);
141  int qid = rtr->ctt->getLinkQ(ctx,rcLnk);
142  if (lnk == p.inLink || !rtr->qm->enq(px,qid,now)) {
143  rtr->ps->free(px);
144  }
145  return;
146  }
147  // multicast data packet
148  multiSend(px,ctx,rtx);
149  return;
150  }
151  // no valid route
152  if (Forest::validUcastAdr(p.dstAdr)) {
153  if (rtr->firstLeafAdr <= p.dstAdr &&
154  p.dstAdr <= rtr->lastLeafAdr)) {
155  // send error packet to client
157  (p.payload())[0] = htonl(p.dstAdr);
158  p.dstAdr = p.srcAdr;
159  p.srcAdr = rtr->myAdr;
160  p.length = Forest::OVERHEAD + sizeof(fAdr_t);
161  p.pack(); p.hdrErrUpdate(); p.payErrUpdate();
162  int cLnk = rtr->ctt->getClnkNum(ctx,p.inLink);
163  int qid = rtr->ctt->getLinkQ(ctx,cLnk);
164  if (!rtr->qm->enq(px,qid,now)) rtr->ps->free(px);
165  return;
166  }
167  // send to neighboring routers in comtree
169  p.pack(); p.hdrErrUpdate();
170  }
171  multiSend(px,ctx,rtx);
172  return;
173 }
174 
185 void RouterOutProc::multiSend(pktx px, int ctx, int rtx) {
186  int qvec[nLnks]; int n = 0;
187  Packet& p = rtr->ps->getPacket(px);
188 
189  int inLink = p.inLink;
190  if (Forest::validUcastAdr(p.dstAdr)) {
191  // flooding a unicast packet to neighboring routers
192  int myZip = Forest::zipCode(myAdr);
193  int pZip = Forest::zipCode(p.dstAdr);
194  for (int rcLnk = rtr->ctt->firstRtrLink(ctx); rcLnk != 0;
195  rcLnk = rtr->ctt->nextRtrLink(ctx,rcLnk)) {
196  int lnk = rtr->ctt->getLink(ctx,rcLnk);
197  int peerZip = Forest::zipCode(rtr->lt->getPeerAdr(lnk));
198  if (pZip == myZip && peerZip != myZip) continue;
199  if (lnk == inLink) continue;
200  qvec[n++] = rtr->ctt->getLinkQ(ctx,rcLnk);
201  }
202  } else {
203  // forwarding a multicast packet
204  // first identify neighboring core routers to get copies
205  int pLink = rtr->ctt->getPlink(ctx);
206  for (int rcLnk = rtr->ctt->firstCoreLink(ctx); rcLnk != 0;
207  rcLnk = rtr->ctt->nextCoreLink(ctx,rcLnk)) {
208  int lnk = rtr->ctt->getLink(ctx,rcLnk);
209  if (lnk == inLink || lnk == pLink) continue;
210  qvec[n++] = rtr->ctt->getLinkQ(ctx,rcLnk);
211  }
212  // now copy for parent
213  if (pLink != 0 && pLink != inLink) {
214  qvec[n++] = rtr->ctt->getLinkQ(ctt->getPCLink(ctx));
215  }
216  // now, copies for subscribers if any
217  if (rtx != 0) {
218  for (int rcLnk = rtr->rt->firstComtLink(rtx); rcLnk!=0;
219  rcLnk = rtr->rt->nextComtLink(rtx)) {
220  int lnk = rtr->ctt->getLink(rcLnk);
221  if (lnk == inLink) continue;
222  qvec[n++] = rtr->ctt->getLinkQ(rcLnk);
223  }
224  }
225  }
226 
227  if (n == 0) { rtr->ps->free(px); return; }
228 
229  // make copies and queue them
230  pktx px1 = px;
231  for (int i = 0; i < n-1; i++) { // process first n-1 copies
232  if (rtr->qm->enq(px1,qvec[i],now)) {
233  px1 = rtr->ps->clone(px);
234  }
235  }
236  // process last copy
237  if (!rtr->qm->enq(px1,qvec[n-1],now)) {
238  rtr->ps->free(px1);
239  }
240 }
241 
248 void RouterOutProc::sendRteReply(pktx px, int ctx) {
249  Packet& p = rtr->ps->getPacket(px);
250 
251  pktx px1 = rtr->ps->alloc();
252  Packet& p1 = rtr->ps->getPacket(px1);
253  p1.length = Forest::OVERHEAD + sizeof(fAdr_t);
254  p1.type = Forest::RTE_REPLY;
255  p1.flags = 0;
256  p1.comtree = p.comtree;
257  p1.srcAdr = myAdr;
258  p1.dstAdr = p.srcAdr;
259 
260  p1.pack();
261  (p1.payload())[0] = htonl(p.dstAdr);
262  p1.hdrErrUpdate(); p.payErrUpdate();
263 
264  int cLnk = rtr->ctt->getComtLink(rtr->ctt->getComtree(ctx),p.inLink);
265  if (!rtr->qm->enq(px1,rtr->ctt->getLinkQ(cLnk),now)) rtr->ps->free(px1);
266 }
267 
276 void RouterOutProc::handleRteReply(pktx px, int ctx) {
277  Packet& p = rtr->ps->getPacket(px);
278  int rtx = rtr->rt->getRteIndex(p.comtree, p.dstAdr);
279  int cLnk = rtr->ctt->getComtLink(ctt->getComtree(ctx),p.inLink);
280  if ((p.flags & Forest::RTE_REQ) && rtx != 0)
281  sendRteReply(px,ctx);
282  int adr = ntohl((p.payload())[0]);
283  if (Forest::validUcastAdr(adr) &&
284  rtr->rt->getRteIndex(p.comtree,adr) == 0) {
285  rtr->rt->addEntry(p.comtree,adr,cLnk);
286  }
287  if (rtx == 0) {
288  // send to neighboring routers in comtree
290  p.pack(); p.hdrErrUpdate();
291  multiSend(px,ctx,rtx);
292  return;
293  }
294  int dcLnk = rtr->rt->getLink(rtx); int dLnk = rtr->ctt->getLink(dcLnk);
295  if (rtr->lt->getPeerType(dLnk) != Forest::ROUTER ||
296  !rtr->qm->enq(px,dLnk,now))
297  rtr->ps->free(px);
298  return;
299 }
300 
308 void RouterOutProc::subUnsub(pktx px, int ctx) {
309  Packet& p = rtr->ps->getPacket(px);
310  uint32_t *pp = p.payload();
311 
312  // add/remove branches from routes
313  // if non-core node, also propagate requests upward as appropriate
314  int comt = rtr->ctt->getComtree(ctx);
315  int inLink = p.inLink;
316  int cLnk = rtr->ctt->getClnkNum(comt,inLink);
317 
318  // ignore subscriptions from the parent or core neighbors
319  if (inLink == rtr->ctt->getPlink(ctx) ||
320  rtr->ctt->isCoreLink(ctx,cLnk)) {
321  rtr->ps->free(px); return;
322  }
323 
324  // make copy to be used for ack
325  pktx cx = rtr->ps->fullCopy(px);
326  Packet& copy = rtr->ps->getPacket(cx);
327 
328  bool propagate = false;
329  int rtx; fAdr_t addr;
330 
331  // add subscriptions
332  int addcnt = ntohl(pp[2]);
333  if (addcnt < 0 || addcnt > 350 ||
334  Forest::OVERHEAD + (addcnt + 4)*4 > p.length) {
335  rtr->ps->free(px); rtr->ps->free(cx); return;
336  }
337  for (int i = 3; i <= addcnt + 2; i++) {
338  addr = ntohl(pp[i]);
339  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
340  rtx = rtr->rt->getRteIndex(comt,addr);
341  if (rtx == 0) {
342  rtx = rtr->rt->addRoute(comt,addr,cLnk);
343  propagate = true;
344  } else if (!rt->isLink(rtx,cLnk)) {
345  rtr->rt->addLink(rtx,cLnk);
346  pp[i] = 0; // so, parent will ignore
347  }
348  }
349  // remove subscriptions
350  int dropcnt = ntohl(pp[addcnt+3]);
351  if (dropcnt < 0 || addcnt + dropcnt > 350 ||
352  Forest::OVERHEAD + (addcnt + dropcnt + 4)*4 > p.length) {
353  rtr->ps->free(px); rtr->ps->free(cx); return;
354  }
355  for (int i = addcnt + 4; i <= addcnt + dropcnt + 3; i++) {
356  addr = ntohl(pp[i]);
357  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
358  rtx = rtr->rt->getRtx(comt,addr);
359  if (rtx == 0) continue;
360  rtr->rt->removeLink(rtx,cLnk);
361  if (rtr->rt->noLinks(rtx)) {
362  rtr->rt->removeRoute(rtx);
363  propagate = true;
364  } else {
365  pp[i] = 0;
366  }
367  }
368  // propagate subscription packet to parent if not a core node
369  if (propagate && !ctt->inCore(ctx) && rtr->ctt->getPlink(ctx) != 0) {
370  pp[0] = htonl((uint32_t) (seqNum >> 32));
371  pp[1] = htonl((uint32_t) (seqNum & 0xffffffff));
372  p.srcAdr = myAdr;
373  p.dstAdr = rtr->lt->getPeerAdr(rtr->ctt->getPlink(ctx));
374 // change
375  sendControl(px,seqNum++,ctt->getPlink(ctx));
376  } else {
377  rtr->ps->free(px);
378  }
379  // send ack back to sender
380  // note that we send ack before getting ack from sender
381  // this is by design
382  copy.flags |= Forest::ACK_FLAG;
383  copy.dstAdr = copy.srcAdr; copy.srcAdr = myAdr;
384  copy.pack();
385  int qid = rtr->ctt->getLinkQ(cLnk);
386  if (!rtr->qm->enq(cx,qid,now)) rtr->ps->free(cx);
387  return;
388 }
389 
393 void RouterOutProc::handleConnDisc(pktx px) {
394  Packet& p = rtr->ps->getPacket(px);
395  int inLnk = p.inLink;
396 
397  if (p.srcAdr != rtr->lt->getPeerAdr(inLnk) ||
398  p.length != Forest::OVERHEAD + 8) {
399  rtr->ps->free(px); return;
400  }
401 
402  uint64_t nonce = ntohl(p.payload()[0]);
403  nonce <<= 32; nonce |= ntohl(p.payload()[1]);
404  if (nonce != rtr->lt->getNonce(inLnk)) { rtr->ps->free(px); return; }
405  if (p.type == Forest::CONNECT) {
406  if (rtr->lt->isConnected(inLnk) &&
407  !rtr->lt->revertEntry(inLnk)) {
408  rtr->ps->free(px); return;
409  }
410  if (!rtr->lt->remapEntry(inLnk,p.tunIp,p.tunPort)) {
411  rtr->ps->free(px); return;
412  }
413  lt->setConnectStatus(inLnk,true);
414  if (nmAdr != 0 && rtr->lt->getPeerType(inLnk)==Forest::CLIENT) {
415  CtlPkt cp(CtlPkt::CLIENT_CONNECT,CtlPkt::REQUEST,0);
416  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
417  sendCpReq(cp,nmAdr);
418  }
419  } else if (p.type == Forest::DISCONNECT) {
420  rtr->lt->setConnectStatus(inLnk,false);
421  rtr->lt->revertEntry(inLnk);
422  if (nmAdr != 0 && rtr->lt->getPeerType(inLnk)==Forest::CLIENT) {
423  dropLink(inLnk);
424  CtlPkt cp(CtlPkt::CLIENT_DISCONNECT,CtlPkt::REQUEST,0);
425  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
426 // todo
427  sendCpReq(cp,nmAdr);
428  }
429  }
430  // send ack back to sender
431  p.flags |= Forest::ACK_FLAG; p.dstAdr = p.srcAdr; p.srcAdr = myAdr;
432  p.pack();
433  pktLog->log(px,inLnk,true,now); send(px,inLnk);
434  return;
435 
436 
441 void RouterOutProc::sendConnDisc(int lnk, Forest::ptyp_t type) {
442  pktx px = rtr->ps->alloc();
443  Packet& p = rtr->ps->getPacket(px);
444 
445  uint64_t nonce = rtr->lt->getNonce(lnk);
446  p.length = Forest::OVERHEAD + 8; p.type = type; p.flags = 0;
447  p.comtree = Forest::CONNECT_COMT;
448  p.srcAdr = myAdr; p.dstAdr = rtr->lt->getPeerAdr(lnk);
449  p.payload()[2] = htonl((uint32_t) (nonce >> 32));
450  p.payload()[3] = htonl((uint32_t) (nonce & 0xffffffff));
451 
452  sendControl(px,nonce,lnk);
453 }
454 
457 bool RouterOutProc::sendControl(pktx px, int lnk) {
458  Packet& p = rtr->ps->getPacket(px);
459  uint64_t = rtr->nextSeqNum();
460  p.payload()[0] = htonl((uint32_t) (seqNum >> 32));
461  p.payload()[1] = htonl((uint32_t) (seqNum & 0xffffffff));
462  p.pack();
463 
464  // now, make copy of packet and save it in pending
465  pktx cx = rtr->ps->clone(px);
466  if (cx == 0) {
467  cerr << "RouterOutProc::sendControl: no packets left in packet "
468  "store\n";
469  return false;
470  }
471 
472  // save a record of the packet in pending map
473  int x = resendMap->insert(pid,px);
474 
475  // send the packet
476 // need to review this
477 // another issue, when adding/removing a link, we need to
478 / send a conn/disc packet
479  comt_t comt = p.comtree;
480  if (rtr->booting) {
481  pktLog->log(cx,lnk,true,now); bootSend(cx,0);
482  } else if (lnk != 0) {
483  int ctx; int clnk; int qid;
484  if ((ctx = rtr->ctt->getComtIndex(comt)) == 0 ||
485  (clnk = rtr->ctt->getClnkNum(comt,lnk)) == 0 ||
486  (qid = rtr->ctt->getLinkQ(clnk)) == 0 ||
487  (!rtr->qm->enq(cx,qid,now))) {
488  rtr->ps->free(cx);
489  }
490  } else {
491  forward(cx,rtr->ctt->getComtIndex(comt));
492  }
493  return true;
494 }
495 
502 void RouterOutProc::resendControl() {
503  map<uint64_t,ControlInfo>::iterator pp;
504  list<uint64_t> dropList;
505  for (pp = pending->begin(); pp != pending->end(); pp++) {
506  if (now < pp->second.timestamp + 1000000000) continue;
507  pktx px = pp->second.px;
508  Packet& p = rtr->ps->getPacket(px);
509  string s;
510  if (pp->second.nSent >= 3) { // give up on this packet
511  cerr << "RouterOutProc::resendControl: received no "
512  "reply to control packet after 3 attempts\n"
513  << p.toString(s);
514  rtr->ps->free(px); dropList.push_front(pp->first);
515  continue;
516  }
517  // make copy of packet and send the copy
518  pp->second.timestamp = now;
519  pp->second.nSent++;
520  pktx cx = rtr->ps->fullCopy(px);
521  if (cx == 0) {
522  cerr << "RouterOutProc::resendControl: no packets left in "
523  "packet store\n";
524  break;
525  }
526  int lnk = pp->second.lnk; comt_t comt = p.comtree;
527  if (booting) {
528  pktLog->log(cx,lnk,true,now); iop->send(cx,lnk);
529  } else if (lnk != 0) {
530  int ctx; int clnk; int qid;
531  if ((ctx = rtr->ctt->getComtIndex(comt)) == 0 ||
532  (clnk = rtr->ctt->getComtLink(comt,lnk)) == 0 ||
533  (qid = rtr->ctt->getLinkQ(clnk)) == 0 ||
534  (!rtr->qm->enq(cx,qid,now)))
535  rtr->ps->free(cx);
536  } else {
537  forward(cx,ctt->getComtIndex(comt));
538  }
539  }
540  // remove expired entries from pending list
541  for (uint64_t pid : dropList) pending->erase(pid);
542 }
543 
552 void RouterOutProc::handleControlReply(pktx rx) {
553  Packet& reply = rtr->ps->getPacket(rx);
554  uint64_t pid;
555 
556  CtlPkt cpr;
557  if (reply.type == Forest::CONNECT || reply.type == Forest::DISCONNECT ||
558  reply.type == Forest::SUB_UNSUB) {
559  pid = ntohl(reply.payload()[0]); pid <<= 32;
560  pid |= ntohl(reply.payload()[1]);
561  } else if (reply.type == Forest::NET_SIG) {
562  cpr.reset(reply); pid = cpr.seqNum;
563  } else {
564  string s;
565  cerr << "RouterOutProc::handleControlReply: unexpected reply "
566  << reply.toString(s);
567  rtr->ps->free(rx); return;
568  }
569 
570  map<uint64_t,ControlInfo>::iterator pp = pending->find(pid);
571  if (pp == pending->end()) {
572  // this is a reply to a request we never sent, or
573  // possibly, a reply to a request we gave up on
574  string s;
575  cerr << "RouterOutProc::handleControlReply: unexpected reply "
576  << reply.toString(s);
577  rtr->ps->free(rx); return;
578  }
579  // handle signalling packets
580  if (reply.type == Forest::NET_SIG) {
581  if (cpr.mode == CtlPkt::NEG_REPLY) {
582  string s;
583  cerr << "RouterOutProc::handleControlReply: got negative "
584  "reply to "
585  << rtr->ps->getPacket(pp->second.px).toString(s);
586  cerr << "reply=" << reply.toString(s);
587  } else if (cpr.type == CtlPkt::BOOT_ROUTER) {
588  if (booting && !setup()) {
589  cerr << "RouterOutProc::handleControlReply: "
590  "setup failed after completion of boot "
591  "phase\n";
592  perror("");
593  pktLog->write(cout);
594  exit(1);
595  }
596  iop->closeBootSock();
597  booting = false;
598  }
599  }
600 
601  // free both packets and erase pending entry
602  rtr->ps->free(pp->second.px); rtr->ps->free(rx); pending->erase(pp);
603 }
604 
610 void RouterOutProc::returnToSender(pktx px, CtlPkt& cp) {
611  Packet& p = rtr->ps->getPacket(px);
612  cp.payload = p.payload();
613  int paylen = cp.pack();
614  if (paylen == 0) {
615  cerr << "RouterOutProc::returnToSender: control packet formatting "
616  "error, zero payload length\n";
617  rtr->ps->free(px);
618  }
619  p.length = (Packet::OVERHEAD + paylen);
620  p.flags = 0;
621  p.dstAdr = p.srcAdr;
622  p.srcAdr = myAdr;
623  p.pack();
624 
625  if (booting) {
626  pktLog->log(px,0,true,now);
627  iop->send(px,0);
628  return;
629  }
630 
631  int cLnk = rtr->ctt->getComtLink(p.comtree,p.inLink);
632  int qn = rtr->ctt->getLinkQ(cLnk);
633  if (!rtr->qm->enq(px,qn,now)) { rtr->ps->free(px); }
634 }
635 
636 } // ends namespace