forest-net
an overlay network for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
RouterCoreDoowon.cpp
1 
9 #include "RouterCore.h"
10 
11 using namespace forest;
12 
23 bool processArgs(int argc, char *argv[], RouterInfo& args) {
24  // set default values
25  args.mode = "local";
26  args.myAdr = args.bootIp = args.nmAdr = args.nmIp = 0;
27  args.ccAdr = args.firstLeafAdr = args.lastLeafAdr = 0;
28  args.ifTbl = ""; args.lnkTbl = ""; args.comtTbl = "";
29  args.rteTbl = ""; args.statSpec = "";
30  args.portNum = 0; args.finTime = 0;
31 
32  string s;
33  for (int i = 1; i < argc; i++) {
34  s = argv[i];
35  if (s.compare(0,10,"mode=local") == 0) {
36  args.mode = "local";
37  } else if (s.compare(0,11,"mode=remote") == 0) {
38  args.mode = "remote";
39  } else if (s.compare(0,6,"myAdr=") == 0) {
40  args.myAdr = Forest::forestAdr(&argv[i][6]);
41  } else if (s.compare(0,7,"bootIp=") == 0) {
42  args.bootIp = Np4d::ipAddress(&argv[i][7]);
43  } else if (s.compare(0,6,"nmAdr=") == 0) {
44  args.nmAdr = Forest::forestAdr(&argv[i][6]);
45  } else if (s.compare(0,5,"nmIp=") == 0) {
46  args.nmIp = Np4d::ipAddress(&argv[i][5]);
47  } else if (s.compare(0,6,"ccAdr=") == 0) {
48  args.ccAdr = Forest::forestAdr(&argv[i][6]);
49  } else if (s.compare(0,13,"firstLeafAdr=") == 0) {
50  args.firstLeafAdr = Forest::forestAdr(&argv[i][13]);
51  } else if (s.compare(0,12,"lastLeafAdr=") == 0) {
52  args.lastLeafAdr = Forest::forestAdr(&argv[i][12]);
53  } else if (s.compare(0,6,"ifTbl=") == 0) {
54  args.ifTbl = &argv[i][6];
55  } else if (s.compare(0,7,"lnkTbl=") == 0) {
56  args.lnkTbl = &argv[i][7];
57  } else if (s.compare(0,8,"comtTbl=") == 0) {
58  args.comtTbl = &argv[i][8];
59  } else if (s.compare(0,7,"rteTbl=") == 0) {
60  args.rteTbl = &argv[i][7];
61  } else if (s.compare(0,9,"statSpec=") == 0) {
62  args.statSpec = &argv[i][9];
63  } else if (s.compare(0,8,"portNum=") == 0) {
64  sscanf(&argv[i][8],"%hd",&args.portNum);
65  } else if (s.compare(0,8,"finTime=") == 0) {
66  sscanf(&argv[i][8],"%d",&args.finTime);
67  } else {
68  cerr << "unrecognized argument: " << argv[i] << endl;
69  return false;
70  }
71  }
72  if (args.mode.compare("local") == 0 &&
73  (args.myAdr == 0 || args.firstLeafAdr == 0 ||
74  args.lastLeafAdr ==0 || args.lastLeafAdr < args.firstLeafAdr)) {
75  cerr << "processArgs: local configuration requires myAdr, "
76  "firstLeafAdr, lastLeafAdr and that firstLeafAdr "
77  "be no larger than lastLeafAdr\n";
78  return false;
79  } else if (args.mode.compare("remote") == 0 &&
80  (args.bootIp == 0 || args.myAdr == 0 ||
81  args.nmIp == 0 || args.nmAdr == 0)) {
82  cerr << "processArgs: remote configuration requires bootIp, "
83  "myAdr, netMgrIp and netMgrAdr\n";
84  return false;
85  }
86  return true;
87 }
88 
91 int main(int argc, char *argv[]) {
92  RouterInfo args;
93  if (!processArgs(argc, argv, args))
94  fatal("fRouter: error processing command line arguments");
95  bool booting = args.mode.compare("remote") == 0;
96  RouterCore router(booting,args);
97 
98  if (!router.readTables(args))
99  fatal("router: could not read specified config files");
100  if (!booting) {
101  if (!router.setup())
102  fatal("router: inconsistency in config files");
103  }
104  router.run(args.finTime);
105  cout << endl;
106  router.dump(cout); // print final tables
107  cout << endl;
108  return 0;
109 }
110 
111 namespace forest {
112 
117 RouterCore::RouterCore(bool booting1, const RouterInfo& config)
118  : booting(booting1) {
119  nIfaces = 50; nLnks = 1000;
120  nComts = 5000; nRts = 100000;
121  nPkts = 100000; nBufs = 50000; nQus = 10000;
122 
123  myAdr = config.myAdr;
124  bootIp = config.bootIp;
125  nmAdr = config.nmAdr;
126  nmIp = config.nmIp;
127  ccAdr = config.ccAdr;
128  firstLeafAdr = config.firstLeafAdr;
129 
130  ps = new PacketStore(nPkts, nBufs);
131  ift = new IfaceTable(nIfaces);
132  lt = new LinkTable(nLnks);
133  ctt = new ComtreeTable(nComts,10*nComts,lt);
134  rt = new RouteTable(nRts,myAdr,ctt);
135  sm = new StatsModule(1000, nLnks, nQus, ctt);
136  iop = new IoProcessor(nIfaces, config.portNum, ift, lt, ps, sm);
137  qm = new QuManager(nLnks, nPkts, nQus, min(50,5*nPkts/nLnks), ps, sm);
138  pktLog = new PacketLog(20000,500,ps);
139 
140  if (!booting)
141  leafAdr = new UiSetPair((config.lastLeafAdr - firstLeafAdr)+1);
142 
143  seqNum = 1;
144  pending = new map<uint64_t,CpInfo>;
145 }
146 
147 RouterCore::~RouterCore() {
148  delete pktLog; delete qm; delete iop; delete sm;
149  delete rt; delete ctt; delete lt; delete ift; delete ps;
150  if (leafAdr != 0) delete leafAdr;
151 }
152 
159 bool RouterCore::readTables(const RouterInfo& config) {
160  if (config.ifTbl.compare("") != 0) {
161  ifstream fs; fs.open(config.ifTbl.c_str());
162  if (fs.fail() || !ift->read(fs)) {
163  cerr << "RouterCore::init: can't read "
164  << "interface table\n";
165  return false;
166  }
167  fs.close();
168  }
169  if (config.lnkTbl.compare("") != 0) {
170  ifstream fs; fs.open(config.lnkTbl.c_str());
171  if (fs.fail() || !lt->read(fs)) {
172  cerr << "RouterCore::init: can't read "
173  << "link table\n";
174  return false;
175  }
176  fs.close();
177  }
178  if (config.comtTbl.compare("") != 0) {
179  ifstream fs; fs.open(config.comtTbl.c_str());
180  if (fs.fail() || !ctt->read(fs)) {
181  cerr << "RouterCore::init: can't read "
182  << "comtree table\n";
183  return false;
184  }
185  fs.close();
186  }
187  if (config.rteTbl.compare("") != 0) {
188  ifstream fs; fs.open(config.rteTbl.c_str());
189  if (fs.fail() || !rt->read(fs)) {
190  cerr << "RouterCore::init: can't read "
191  << "routing table\n";
192  return false;
193  }
194  fs.close();
195  }
196  if (config.statSpec.compare("") != 0) {
197  ifstream fs; fs.open(config.statSpec.c_str());
198  if (fs.fail() || !sm->read(fs)) {
199  cerr << "RouterCore::init: can't read "
200  << "statistics spec\n";
201  return false;
202  }
203  fs.close();
204  }
205  return true;
206 }
207 
212 bool RouterCore::setup() {
213  dump(cout);
214  if (!setupIfaces()) return false;
215  if (!setupLeafAddresses()) return false;
216  if (!setupQueues()) return false;
217  if (!checkTables()) return false;
218  if (!setAvailRates()) return false;
219  addLocalRoutes();
220 
221  return true;
222 }
223 
// Body of the interface setup routine; the error message below names it
// RouterCore::setupIfaces.
// NOTE(review): the line carrying the function signature is absent from this
// listing - confirm against the original file.
// Walks every interface in the interface table. Interfaces that the
// IoProcessor already reports as ready are skipped; the rest are set up.
// Returns false at the first interface whose setup fails, true otherwise.
229  for (int iface = ift->firstIface(); iface != 0;
230  iface = ift->nextIface(iface)) {
231  if (iop->ready(iface)) continue;
232  if (!iop->setup(iface)) {
233  cerr << "RouterCore::setupIfaces: could not "
234  "setup interface " << iface << endl;
235  return false;
236  }
237  }
238  return true;
239 }
240 
241 
// Body of the leaf address setup routine (presumably
// RouterCore::setupLeafAddresses, the second step called by setup()).
// NOTE(review): the line carrying the function signature is absent from this
// listing - confirm against the original file.
// For every link whose peer is not a router, the peer's address is passed to
// allocLeafAdr; nothing is allocated while the router is booting.
// Returns false as soon as one allocation fails, true otherwise.
248  for (int lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
249  if (booting || lt->getPeerType(lnk) == Forest::ROUTER) continue;
250  if (!allocLeafAdr(lt->getPeerAdr(lnk)))
251  return false;
252  }
253  return true;
254 }
255 
// Body of the queue setup routine (presumably RouterCore::setupQueues, the
// third step called by setup()).
// NOTE(review): the function signature and the declaration of rs (used in
// setQRates below) are absent from this listing - confirm against the
// original file.
// Returns false if a queue cannot be allocated, true otherwise.
263  // Set link rates in QuManager
264  for (int lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
265  qm->setLinkRates(lnk,lt->getRates(lnk));
266  }
// Allocate one queue for every comtree link of every comtree, give it the
// rates in rs, and clear its statistics.
269  int ctx;
270  for (ctx = ctt->firstComtIndex(); ctx != 0;
271  ctx = ctt->nextComtIndex(ctx)) {
272  set<int>& links = ctt->getLinks(ctx);
273  set<int>::iterator p;
274  for (p = links.begin(); p != links.end(); p++) {
275  int cLnk = *p; int lnk = ctt->getLink(cLnk);
276  int qid = qm->allocQ(lnk);
// a queue id of zero is treated as an allocation failure
277  if (qid == 0) return false;
278  ctt->setLinkQ(cLnk,qid);
279  qm->setQRates(qid,rs);
// queues on links to routers get larger limits than queues on other links
280  if (lt->getPeerType(lnk) == Forest::ROUTER)
281  qm->setQLimits(qid,100,200000);
282  else
283  qm->setQLimits(qid,50,100000);
284  sm->clearQuStats(qid);
285  }
286  }
287  return true;
288 }
289 
// Body of the table consistency check; the error messages below name it
// RouterCore::checkTables.
// NOTE(review): the line carrying the function signature is absent from this
// listing - confirm against the original file.
// Checks the interface, link and comtree tables, writing one message to cerr
// for each problem found. Returns true only if no check failed.
297  bool success = true;
298 
299  // verify that the default interface is valid and
300  // that each interface has a non-zero IP address
301  if (!ift->valid(ift->getDefaultIface())) {
302  cerr << "RouterCore::checkTables: specified default iface "
303  << ift->getDefaultIface() << " is invalid\n";
304  success = false;
305  }
306  int iface;
307  for (iface = ift->firstIface(); iface != 0;
308  iface = ift->nextIface(iface)) {
309  if (ift->getIpAdr(iface) == 0) {
310  cerr << "RouterCore::checkTables: interface "
311  << iface << " has zero for IP address\n";
312  success = false;
313  }
314  }
315 
316  // verify that each link is assigned to a valid interface,
317  // that the peer IP address is non-zero for links to routers,
318  // and that the peer address is a valid
319  // unicast forest address
320  int lnk;
321  for (lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
322  iface = lt->getIface(lnk);
323  if (!ift->valid(iface)) {
324  cerr << "RouterCore::checkTables: interface " << iface
325  << " for link " << lnk << " is not valid\n";
326  success = false;
327  }
328  if (lt->getPeerIpAdr(lnk) == 0 &&
329  lt->getPeerType(lnk) == Forest::ROUTER) {
330  cerr << "RouterCore::checkTables: invalid peer IP "
331  << "for link " << lnk << endl;
332  success = false;
333  }
334  if (!Forest::validUcastAdr(lt->getPeerAdr(lnk))) {
335  cerr << "RouterCore::checkTables: invalid peer address "
336  << "for link " << lnk << endl;
337  success = false;
338  }
339  }
340 
341  // verify that the links in each comtree are valid,
342  // that the router links and core links refer to peer routers,
343  // and that each comtree link is consistent
344  int ctx;
345  for (ctx = ctt->firstComtIndex(); ctx != 0;
346  ctx = ctt->nextComtIndex(ctx)) {
347  int comt = ctt->getComtree(ctx);
348  int plnk = ctt->getPlink(ctx);
349  int pcLnk = ctt->getPCLink(ctx);
// the parent link must be the link behind the parent comtree link
350  if (plnk != ctt->getLink(pcLnk)) {
351  cerr << "RouterCore::checkTables: parent link "
352  << plnk << " not consistent with pcLnk\n";
353  success = false;
354  }
355  if (ctt->inCore(ctx) && plnk != 0 && !ctt->isCoreLink(pcLnk)) {
356  cerr << "RouterCore::checkTables: parent link "
357  << plnk << " of core node does not lead to "
358  << "another core node\n";
359  success = false;
360  }
361  set<int>& links = ctt->getLinks(ctx);
362  set<int>::iterator p;
363  for (p = links.begin(); p != links.end(); p++) {
364  int cLnk = *p; int lnk = ctt->getLink(cLnk);
365  if (!lt->valid(lnk)) {
366  cerr << "RouterCore::checkTables: link "
367  << lnk << " in comtree " << comt
368  << " not in link table" << endl;
369  success = false;
370  continue;
371  }
// a zero dest is accepted; a non-zero dest must be a valid unicast address
372  fAdr_t dest = ctt->getDest(cLnk);
373  if (dest != 0 && !Forest::validUcastAdr(dest)) {
374  cerr << "RouterCore::checkTables: dest addr "
375  << "for " << lnk << " in comtree " << comt
376  << " is not valid" << endl;
377  success = false;
378  }
379  int qid = ctt->getLinkQ(cLnk);
380  if (qid == 0) {
381  cerr << "RouterCore::checkTables: queue id "
382  << "for " << lnk << " in comtree " << comt
383  << " is zero" << endl;
384  success = false;
385  }
386  }
// stop here if any check so far has failed; the router-link and core-link
// checks below are then skipped for this and all remaining comtrees
387  if (!success) break;
388  set<int>& rtrLinks = ctt->getRtrLinks(ctx);
389  for (p = rtrLinks.begin(); p != rtrLinks.end(); p++) {
390  int cLnk = *p; int lnk = ctt->getLink(cLnk);
391  if (!ctt->isLink(ctx,lnk)) {
392  cerr << "RouterCore::checkTables: router link "
393  << lnk << " is not valid in comtree "
394  << comt << endl;
395  success = false;
396  }
397  if (lt->getPeerType(lnk) != Forest::ROUTER) {
398  cerr << "RouterCore::checkTables: router link "
399  << lnk << " in comtree " << comt
400  << " connects to non-router peer\n";
401  success = false;
402  }
403  }
404  set<int>& coreLinks = ctt->getCoreLinks(ctx);
405  for (p = coreLinks.begin(); p != coreLinks.end(); p++) {
406  int cLnk = *p; int lnk = ctt->getLink(cLnk);
407  if (!ctt->isRtrLink(ctx,lnk)) {
408  cerr << "RouterCore::checkTables: core link "
409  << lnk << " is not a router link "
410  << comt << endl;
411  success = false;
412  }
413  }
414  }
415  // come back later and add checks for route table
416  return success;
417 }
418 
// Body of the available-rate setup; the error messages below name it
// RouterCore::setAvailRates.
// NOTE(review): the function signature and the declarations of minRates and
// maxRates are absent from this listing - confirm against the original file.
// Works in three passes (interfaces, links, comtree links); a failure found
// in one pass ends the function before the next pass starts.
// Returns true only if no rate check failed.
425  bool success = true;
// pass 1: every interface's rates must lie between minRates and maxRates;
// its available rates start out equal to its configured rates
430  int iface;
431  for (iface = ift->firstIface(); iface != 0;
432  iface = ift->nextIface(iface)) {
433  RateSpec ifRates = ift->getRates(iface);
434  if (!minRates.leq(ifRates) || !ifRates.leq(maxRates)) {
435  cerr << "RouterCore::setAvailRates: interface rates "
436  "outside allowed range\n";
437  success = false;
438  }
439  ift->getAvailRates(iface) = ifRates;
440  }
441  if (!success) return false;
// pass 2: check each link's rates and subtract them from the available
// rates of the link's interface
442  int lnk;
443  for (lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
444  RateSpec lnkRates = lt->getRates(lnk);
445  if (!minRates.leq(lnkRates) || !lnkRates.leq(maxRates)) {
446  cerr << "RouterCore::setAvailRates: link rates "
447  "outside allowed range\n";
448  success = false;
449  }
450  iface = lt->getIface(lnk);
// NOTE(review): ifAvail is read with getRates (the interface's configured
// rates), not getAvailRates, so the test below compares each link against
// the whole interface rather than what earlier links left over - confirm
// this is intended
451  RateSpec ifAvail = ift->getRates(iface);
452  if (!lnkRates.leq(ifAvail)) {
453  cerr << "RouterCore::setAvailRates: oversubscribing "
454  "interface " << iface << endl;
455  success = false;
456  }
457  ift->getAvailRates(iface).subtract(lnkRates);
458  lnkRates.scale(.9); // allocate at most 90% of link
459  lt->getAvailRates(lnk) = lnkRates;
460  sm->clearLnkStats(lnk);
461  }
462  if (!success) return false;
// pass 3: subtract the rates of every comtree link from the available rates
// of the link it uses
463  int ctx;
464  for (ctx = ctt->firstComtIndex(); ctx != 0;
465  ctx = ctt->nextComtIndex(ctx)) {
466  set<int>& comtLinks = ctt->getLinks(ctx);
467  set<int>::iterator p;
468  for (p = comtLinks.begin(); p != comtLinks.end(); p++) {
469  int cLnk = *p; int lnk = ctt->getLink(cLnk);
470  RateSpec comtRates = ctt->getRates(cLnk);
471  if (!comtRates.leq(lt->getAvailRates(lnk))) {
472  cerr << "RouterCore::setAvailRates: "
473  "oversubscribing link "
474  << lnk << endl;
475  success = false;
476  }
477  lt->getAvailRates(lnk).subtract(comtRates);
478  }
479  }
480  return success;
481 }
482 
// Body of the local route setup (presumably RouterCore::addLocalRoutes, the
// last step called by setup()).
// NOTE(review): the function signature and listing line 497, which completes
// the condition started on lines 495-496, are absent from this listing -
// confirm against the original file.
// For each comtree link, adds a route to the link's peer address unless the
// (incomplete) router test below skips it or a route already exists.
487  for (int ctx = ctt->firstComtIndex(); ctx != 0;
488  ctx = ctt->nextComtIndex(ctx)) {
489  int comt = ctt->getComtree(ctx);
490  set<int>& comtLinks = ctt->getLinks(ctx);
491  set<int>::iterator p;
492  for (p = comtLinks.begin(); p != comtLinks.end(); p++) {
493  int cLnk = *p; int lnk = ctt->getLink(cLnk);
494  fAdr_t peerAdr = lt->getPeerAdr(lnk);
495  if (lt->getPeerType(lnk) == Forest::ROUTER &&
496  Forest::zipCode(peerAdr)
498  continue;
// an existing route for (comtree, peer address) is left untouched
499  if (rt->getRteIndex(comt,peerAdr) != 0)
500  continue;
501  rt->addEntry(comt,peerAdr,cLnk);
502  }
503  }
504 }
505 
509 void RouterCore::dump(ostream& out) {
510  string s;
511  out << "Interface Table\n\n" << ift->toString(s) << endl;
512  out << "Link Table\n\n" << lt->toString(s) << endl;
513  out << "Comtree Table\n\n" << ctt->toString(s) << endl;
514  out << "Routing Table\n\n" << rt->toString(s) << endl;
515  out << "Statistics\n\n" << sm->toString(s) << endl;
516 }
517 
546 void RouterCore::run(uint64_t finishTime) {
547  now = Misc::getTimeNs();
548  if (booting) {
549  if (!iop->setupBootSock(bootIp,nmIp))
550  fatal("RouterCore:run: could not setup boot socket\n");
551  string s1;
552  cout << "sending boot request to " << Np4d::ip2string(nmIp,s1)
553  << endl;
554  CtlPkt cp(CtlPkt::BOOT_ROUTER,CtlPkt::REQUEST,0);
555  if (!sendCpReq(cp,nmAdr))
556  fatal("RouterCore::run: could not send boot request\n");
557  }
558  // create packet log to record a sample of packets handled
559 
560  uint64_t statsTime = 0; // time statistics were last processed
561  bool didNothing;
562  int controlCount = 20; // used to limit overhead of control
563  // packet processing
564  queue<int> ctlQ; // queue for control packets
565 
566  now = Misc::getTimeNs();
567  finishTime *= 1000000000; // convert from seconds to ns
568  while (finishTime == 0 || now < finishTime) {
569  didNothing = true;
570 
571  // input processing
572  pktx px = iop->receive();
573  if (px != 0) {
574  didNothing = false;
575  Packet& p = ps->getPacket(px);
576  int ptype = p.type;
577  pktLog->log(px,p.inLink,false,now);
578  int ctx = ctt->getComtIndex(p.comtree);
579  if (!pktCheck(px,ctx)) {
580  ps->free(px);
581  } else if (booting) {
582  handleCtlPkt(px);
583  } else if (ptype == Forest::CLIENT_DATA) {
584  forward(px,ctx);
585  } else if (ptype == Forest::SUB_UNSUB) {
586  subUnsub(px,ctx);
587  } else if (ptype == Forest::RTE_REPLY) {
588  handleRteReply(px,ctx);
589  } else if (ptype == Forest::CONNECT ||
590  ptype == Forest::DISCONNECT) {
591  handleConnDisc(px);
592  } else if (p.dstAdr != myAdr) {
593  forward(px,ctx);
594  } else {
595  ctlQ.push(px);
596  }
597  }
598 
599  // output processing
600  int lnk;
601  while ((px = qm->deq(lnk, now)) != 0) {
602  didNothing = false;
603  pktLog->log(px,lnk,true,now);
604  iop->send(px,lnk);
605  }
606 
607  // control packet processing
608  if (!ctlQ.empty() && (didNothing || --controlCount <= 0)) {
609  handleCtlPkt(ctlQ.front());
610  ctlQ.pop();
611  controlCount = 20; // do one control packet for
612  // every 20 iterations when busy
613  }
614 
615  // every 300 ms, update statistics and check for un-acked
616  // control packets
617  if (now - statsTime > 300000000) {
618  sm->record(now);
619  statsTime = now;
620  resendCpReq();
621  didNothing = false;
622  }
623 
624  // if did nothing on that pass, sleep for a millisecond.
625  if (didNothing) { usleep(1000); }
626 
627  // update current time
628  now = Misc::getTimeNs();
629  }
630 
631  // write out recorded events
632  pktLog->write(cout);
633  cout << endl;
634  cout << sm->iPktCnt(0) << " packets received, "
635  << sm->oPktCnt(0) << " packets sent\n";
636  cout << sm->iPktCnt(-1) << " from routers, "
637  << sm->oPktCnt(-1) << " to routers\n";
638  cout << sm->iPktCnt(-2) << " from clients, "
639  << sm->oPktCnt(-2) << " to clients\n";
640 }
641 
// Perform error checks on a received packet.
// px is the index of the packet in the packet store; ctx is the comtree
// index looked up by the caller for the packet's comtree.
// Returns true if the packet passes every check that applies to it,
// false otherwise.
// NOTE(review): listing line 658 is absent, so the return statement in the
// booting branch below is incomplete - confirm against the original file.
647 bool RouterCore::pktCheck(pktx px, int ctx) {
648  Packet& p = ps->getPacket(px);
649  // check version and length
650  if (p.version != Forest::FOREST_VERSION) {
651  return false;
652  }
653  if (p.length != p.bufferLen || p.length < Forest::HDR_LENG) {
654  return false;
655  }
// while booting, no check beyond the ones in this branch is applied
656  if (booting) {
657  return p.tunIp == nmIp && p.type == Forest::NET_SIG
659  }
// connect/disconnect packets are judged on their length alone
660  if (p.type == Forest::CONNECT || p.type == Forest::DISCONNECT)
661  return (p.length == Forest::OVERHEAD+8);
662 
663  if (!ctt->validComtIndex(ctx)) {
664  return false;
665  }
// the destination must be a valid unicast address or a multicast address
666  fAdr_t adr = p.dstAdr;
667  if (!Forest::validUcastAdr(adr) && !Forest::mcastAdr(adr)) {
668  return false;
669  }
670 
// the incoming link must be a link of the packet's comtree
671  int inLink = p.inLink;
672  if (inLink == 0) return false;
673  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),inLink);
674  if (cLnk == 0) {
675  return false;
676  }
677 
678  // extra checks for packets from untrusted peers
679  if (lt->getPeerType(inLink) < Forest::TRUSTED) {
680  // check for spoofed source address
681  if (lt->getPeerAdr(inLink) != p.srcAdr) return false;
682  // and that destination restrictions are respected
683  fAdr_t dest = ctt->getDest(cLnk);
684  if (dest!=0 && p.dstAdr != dest && p.dstAdr != myAdr)
685  return false;
686  // verify that type is valid
687  Forest::ptyp_t ptype = p.type;
688  if (ptype != Forest::CLIENT_DATA &&
689  ptype != Forest::CONNECT && ptype != Forest::DISCONNECT &&
690  ptype != Forest::SUB_UNSUB && ptype != Forest::CLIENT_SIG)
691  return false;
// connect/disconnect and client signalling packets are only accepted on
// their designated comtrees
692  int comt = ctt->getComtree(ctx);
693  if ((ptype == Forest::CONNECT || ptype == Forest::DISCONNECT) &&
694  comt != (int) Forest::CONNECT_COMT)
695  return false;
696  if (ptype == Forest::CLIENT_SIG &&
697  comt != (int) Forest::CLIENT_SIG_COMT)
698  return false;
699  }
700  return true;
701 }
702 
// Look up the route for a packet and forward it.
// px is the index of the packet in the packet store; ctx is the comtree
// index for the packet's comtree.
// With a route: a unicast packet is queued on the route's link, and freed if
// that link is the one it arrived on or the queue rejects it; a multicast
// packet is handed to multiSend.
// Without a route: the packet is handed to multiSend with a route index of 0.
// NOTE(review): listing line 740 is absent from the no-route unicast branch
// below - confirm against the original file.
712 void RouterCore::forward(pktx px, int ctx) {
713  Packet& p = ps->getPacket(px);
714  int rtx = rt->getRteIndex(p.comtree,p.dstAdr);
715  if (rtx != 0) { // valid route case
716  // reply to route request
717  if ((p.flags & Forest::RTE_REQ)) {
718  sendRteReply(px,ctx);
// clear the request flag so the packet is forwarded without it
719  p.flags = (p.flags & (~Forest::RTE_REQ));
720  p.pack();
721  p.hdrErrUpdate();
722  }
723  if (Forest::validUcastAdr(p.dstAdr)) {
724  int rcLnk = rt->getLink(rtx);
725  int lnk = ctt->getLink(rcLnk);
726  int qid = ctt->getLinkQ(rcLnk);
727  if (lnk == p.inLink || !qm->enq(px,qid,now)) {
728  ps->free(px);
729  }
730  return;
731  }
732  // multicast data packet
733  multiSend(px,ctx,rtx);
734  return;
735  }
736  // no valid route
737 // think about suppressing flooding, if address is in range for this router
738  if (Forest::validUcastAdr(p.dstAdr)) {
739  // send to neighboring routers in comtree
741  p.pack(); p.hdrErrUpdate();
742  }
743  multiSend(px,ctx,rtx);
744  return;
745 }
756 void RouterCore::multiSend(pktx px, int ctx, int rtx) {
757  int qvec[nLnks]; int n = 0;
758  Packet& p = ps->getPacket(px);
759 
760  int inLink = p.inLink;
761  if (Forest::validUcastAdr(p.dstAdr)) {
762  // flooding a unicast packet to neighboring routers
763  int myZip = Forest::zipCode(myAdr);
764  int pZip = Forest::zipCode(p.dstAdr);
765  set<int>& rtrLinks = ctt->getRtrLinks(ctx);
766  set<int>::iterator lp;
767  for (lp = rtrLinks.begin(); lp != rtrLinks.end(); lp++) {
768  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
769  int peerZip = Forest::zipCode(lt->getPeerAdr(lnk));
770  if (pZip == myZip && peerZip != myZip) continue;
771  if (lnk == inLink) continue;
772  qvec[n++] = ctt->getLinkQ(rcLnk);
773  }
774  } else {
775  // forwarding a multicast packet
776  // first identify neighboring core routers to get copies
777  int pLink = ctt->getPlink(ctx);
778  set<int>& coreLinks = ctt->getCoreLinks(ctx);
779  set<int>::iterator lp;
780  for (lp = coreLinks.begin(); lp != coreLinks.end(); lp++) {
781  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
782  if (lnk == inLink || lnk == pLink) continue;
783  qvec[n++] = ctt->getLinkQ(rcLnk);
784  }
785  // now copy for parent
786  if (pLink != 0 && pLink != inLink) {
787  qvec[n++] = ctt->getLinkQ(ctt->getPCLink(ctx));
788  }
789  // now, copies for subscribers if any
790  if (rtx != 0) {
791  set<int>& subLinks = rt->getSubLinks(rtx);
792  for (lp = subLinks.begin(); lp !=subLinks.end(); lp++) {
793  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
794  if (lnk == inLink) continue;
795  qvec[n++] = ctt->getLinkQ(rcLnk);
796  }
797  }
798  }
799 
800  if (n == 0) { ps->free(px); return; }
801 
802  // make copies and queue them
803  pktx px1 = px;
804  for (int i = 0; i < n-1; i++) { // process first n-1 copies
805  if (qm->enq(px1,qvec[i],now)) {
806  px1 = ps->clone(px);
807  }
808  }
809  // process last copy
810  if (!qm->enq(px1,qvec[n-1],now)) {
811  ps->free(px1);
812  }
813 }
814 
821 void RouterCore::sendRteReply(pktx px, int ctx) {
822  Packet& p = ps->getPacket(px);
823 
824  pktx px1 = ps->alloc();
825  Packet& p1 = ps->getPacket(px1);
826  p1.length = Forest::HDR_LENG + 8;
827  p1.type = Forest::RTE_REPLY;
828  p1.flags = 0;
829  p1.comtree = p.comtree;
830  p1.srcAdr = myAdr;
831  p1.dstAdr = p.srcAdr;
832 
833  p1.pack();
834  (p1.payload())[0] = htonl(p.dstAdr);
835  p1.hdrErrUpdate(); p.payErrUpdate();
836 
837  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),p.inLink);
838  qm->enq(px1,ctt->getLinkQ(cLnk),now);
839 }
840 
// Handle a route reply packet.
// px is the index of the packet in the packet store; ctx is the comtree
// index for the packet's comtree.
// Adds a route to the address carried in the first payload word if that
// address is a valid unicast address with no route yet, then passes the
// reply on: via multiSend when there is no route to its destination,
// otherwise on the route's link when that link leads to a router.
// NOTE(review): listing line 862 is absent from the rtx == 0 branch below -
// confirm against the original file.
849 void RouterCore::handleRteReply(pktx px, int ctx) {
850  Packet& p = ps->getPacket(px);
851  int rtx = rt->getRteIndex(p.comtree, p.dstAdr);
852  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),p.inLink);
853  if ((p.flags & Forest::RTE_REQ) && rtx != 0)
854  sendRteReply(px,ctx);
// learn a route to the advertised address via the link the reply came in on
855  int adr = ntohl((p.payload())[0]);
856  if (Forest::validUcastAdr(adr) &&
857  rt->getRteIndex(p.comtree,adr) == 0) {
858  rt->addEntry(p.comtree,adr,cLnk);
859  }
860  if (rtx == 0) {
861  // send to neighboring routers in comtree
863  p.pack(); p.hdrErrUpdate();
864  multiSend(px,ctx,rtx);
865  return;
866  }
867  int dcLnk = rt->getLink(rtx); int dLnk = ctt->getLink(dcLnk);
// NOTE(review): enq is given the link number dLnk here, while the other
// callers in this file pass a queue id from ctt->getLinkQ - looks like it
// should be ctt->getLinkQ(dcLnk); confirm
868  if (lt->getPeerType(dLnk) != Forest::ROUTER || !qm->enq(px,dLnk,now))
869  ps->free(px);
870  return;
871 }
872 
880 void RouterCore::subUnsub(pktx px, int ctx) {
881  Packet& p = ps->getPacket(px);
882  uint32_t *pp = p.payload();
883 
884  // add/remove branches from routes
885  // if non-core node, also propagate requests upward as
886  // appropriate
887  int comt = ctt->getComtree(ctx);
888  int inLink = p.inLink;
889  int cLnk = ctt->getComtLink(comt,inLink);
890  // ignore subscriptions from the parent or core neighbors
891  if (inLink == ctt->getPlink(ctx) || ctt->isCoreLink(cLnk)) {
892  ps->free(px); return;
893  }
894  bool propagate = false;
895  int rtx; fAdr_t addr;
896 
897  // add subscriptions
898  int addcnt = ntohl(pp[0]);
899  if (addcnt < 0 || addcnt > 350 ||
900  Forest::OVERHEAD + (addcnt + 2)*4 > p.length) {
901  ps->free(px); return;
902  }
903  for (int i = 1; i <= addcnt; i++) {
904  addr = ntohl(pp[i]);
905  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
906  rtx = rt->getRteIndex(comt,addr);
907  if (rtx == 0) {
908  rtx = rt->addEntry(comt,addr,cLnk);
909  propagate = true;
910  } else if (!rt->isLink(rtx,cLnk)) {
911  rt->addLink(rtx,cLnk);
912  pp[i] = 0; // so, parent will ignore
913  }
914  }
915  // remove subscriptions
916  int dropcnt = ntohl(pp[addcnt+1]);
917  if (dropcnt < 0 || addcnt + dropcnt > 350 ||
918  Forest::OVERHEAD + (addcnt + dropcnt + 2)*4 > p.length) {
919  ps->free(px); return;
920  }
921  for (int i = addcnt + 2; i <= addcnt + dropcnt + 1; i++) {
922  addr = ntohl(pp[i]);
923  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
924  rtx = rt->getRteIndex(comt,addr);
925  if (rtx == 0) continue;
926  rt->removeLink(rtx,cLnk);
927  if (rt->noLinks(rtx)) {
928  rt->removeEntry(rtx);
929  propagate = true;
930  } else {
931  pp[i] = 0;
932  }
933  }
934  // propagate subscription packet to parent if not a core node
935  if (propagate && !ctt->inCore(ctx) && ctt->getPlink(ctx) != 0) {
936  p.payErrUpdate();
937  int qid = ctt->getLinkQ(ctt->getPCLink(ctx));
938  if (qm->enq(px,qid,now)) {
939  return;
940  }
941  }
942  ps->free(px); return;
943 }
944 
948 void RouterCore::handleConnDisc(pktx px) {
949  Packet& p = ps->getPacket(px);
950  int inLnk = p.inLink;
951 
952  if (p.srcAdr != lt->getPeerAdr(inLnk) ||
953  p.length != Forest::OVERHEAD + 8) {
954  ps->free(px); return;
955  }
956  uint64_t nonce = ntohl(p.payload()[0]);
957  nonce <<= 32;
958  nonce |= ntohl(p.payload()[1]);
959  if (nonce != lt->getNonce(inLnk)) { ps->free(px); return; }
960  if ((p.flags & Forest::ACK_FLAG) != 0) {
961  uint64_t x = 1; x <<= 63; x |= nonce;
962  pending->erase(x); ps->free(px); return;
963  }
964  if (p.type == Forest::CONNECT) {
965  if (lt->isConnected(inLnk) && !lt->revertEntry(inLnk)) {
966  ps->free(px); return;
967  }
968  if (!lt->remapEntry(inLnk,p.tunIp,p.tunPort)) {
969  ps->free(px); return;
970  }
971  lt->setConnectStatus(inLnk,true);
972  if (nmAdr != 0 && lt->getPeerType(inLnk) == Forest::CLIENT) {
973  CtlPkt cp(CtlPkt::CLIENT_CONNECT,CtlPkt::REQUEST,0);
974  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
975  sendCpReq(cp,nmAdr);
976  }
977  } else if (p.type == Forest::DISCONNECT) {
978  lt->setConnectStatus(inLnk,false);
979  lt->revertEntry(inLnk);
980  if (nmAdr != 0 && lt->getPeerType(inLnk) == Forest::CLIENT) {
981  dropLink(inLnk);
982  CtlPkt cp(CtlPkt::CLIENT_DISCONNECT,CtlPkt::REQUEST,0);
983  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
984  sendCpReq(cp,nmAdr);
985  }
986  }
987  // send ack back to sender
988  p.flags |= Forest::ACK_FLAG; p.dstAdr = p.srcAdr; p.srcAdr = myAdr;
989  p.pack();
990  iop->send(px,inLnk);
991  return;
992 }
993 
998 void RouterCore::handleCtlPkt(int px) {
999  Packet& p = ps->getPacket(px);
1000  CtlPkt cp(p.payload(), p.length - Packet::OVERHEAD);
1001 
1002  if (p.type != Forest::NET_SIG || p.comtree != Forest::NET_SIG_COMT) {
1003  ps->free(px); return;
1004  }
1005  if (!cp.unpack()) {
1006  string s;
1007  cerr << "RouterCore::handleCtlPkt: misformatted control "
1008  " packet\n" << p.toString(s);
1009  cp.reset(cp.type,CtlPkt::NEG_REPLY,cp.seqNum);
1010  cp.mode = CtlPkt::NEG_REPLY;
1011  cp.errMsg = "misformatted control packet";
1012  returnToSender(px,cp);
1013  return;
1014  }
1015  if (cp.mode != CtlPkt::REQUEST) {
1016  handleCpReply(px,cp); return;
1017  }
1018 
1019  // Prepare positive reply packet for use where appropriate
1020  CtlPkt reply(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
1021 
1022  switch (cp.type) {
1023 
1024  // configuring logical interfaces
1025  case CtlPkt::ADD_IFACE: addIface(cp,reply); break;
1026  case CtlPkt::DROP_IFACE: dropIface(cp,reply); break;
1027  case CtlPkt::GET_IFACE: getIface(cp,reply); break;
1028  case CtlPkt::MOD_IFACE: modIface(cp,reply); break;
1029 
1030  // configuring links
1031  case CtlPkt::ADD_LINK: addLink(cp,reply); break;
1032  case CtlPkt::DROP_LINK: dropLink(cp,reply); break;
1033  case CtlPkt::GET_LINK: getLink(cp,reply); break;
1034  case CtlPkt::MOD_LINK: modLink(cp,reply); break;
1035 
1036  // configuring comtrees
1037  case CtlPkt::ADD_COMTREE: addComtree(cp,reply); break;
1038  case CtlPkt::DROP_COMTREE: dropComtree(cp,reply); break;
1039  case CtlPkt::GET_COMTREE: getComtree(cp,reply); break;
1040  case CtlPkt::MOD_COMTREE: modComtree(cp,reply); break;
1041  case CtlPkt::ADD_COMTREE_LINK: addComtreeLink(cp,reply); break;
1042  case CtlPkt::DROP_COMTREE_LINK: dropComtreeLink(cp,reply); break;
1043  case CtlPkt::GET_COMTREE_LINK: getComtreeLink(cp,reply); break;
1044  case CtlPkt::MOD_COMTREE_LINK: modComtreeLink(cp,reply); break;
1045 
1046  // configuring routes
1047  case CtlPkt::ADD_ROUTE: addRoute(cp,reply); break;
1048  case CtlPkt::DROP_ROUTE: dropRoute(cp,reply); break;
1049  case CtlPkt::GET_ROUTE: getRoute(cp,reply); break;
1050  case CtlPkt::MOD_ROUTE: modRoute(cp,reply); break;
1051 
1052  // setting parameters
1053  case CtlPkt::SET_LEAF_RANGE: setLeafRange(cp,reply); break;
1054  //feng get link table info
1055  case CtlPkt::GET_LINK_SET: getLinkSet(cp,reply); break;
1056  default:
1057  cerr << "unrecognized control packet type " << cp.type
1058  << endl;
1059  reply.errMsg = "invalid control packet for router";
1060  reply.mode = CtlPkt::NEG_REPLY;
1061  break;
1062  }
1063 
1064  returnToSender(px,reply);
1065 
1066  return;
1067 }
1068 
1069 
1070 
// Handle an ADD_IFACE control packet.
// cp is the request; reply is the reply to be returned to the sender, and it
// is turned into a negative reply with an error message on failure.
// Returns true if the interface was added, or already exists with matching
// settings; false otherwise.
// NOTE(review): listing lines 1085, 1087, 1089 and 1091 are absent, so the
// RateSpec constructor call below is incomplete - confirm against the
// original file.
1082 bool RouterCore::addIface(CtlPkt& cp, CtlPkt& reply) {
1083  int iface = cp.iface;
1084  RateSpec rs(max(min(cp.rspec1.bitRateUp, Forest::MAXBITRATE),
1086  max(min(cp.rspec1.bitRateDown,Forest::MAXBITRATE),
1088  max(min(cp.rspec1.pktRateUp, Forest::MAXPKTRATE),
1090  max(min(cp.rspec1.pktRateDown,Forest::MAXPKTRATE),
1092  if (ift->valid(iface)) {
// NOTE(review): this compares the interface number cp.iface with the
// interface's IP address; cp.ip1 looks like the intended operand - confirm
1093  if (cp.iface != ift->getIpAdr(iface) ||
1094  !rs.equals(ift->getRates(iface))) {
1095  reply.errMsg = "add iface: requested interface "
1096  "conflicts with existing interface";
1097  reply.mode = CtlPkt::NEG_REPLY;
1098  return false;
1099  }
1100  // means reply to earlier add iface was lost
1101  reply.ip1 = ift->getIpAdr(iface);
1102  reply.port1 = ift->getPort(iface);
1103  return true;
1104  } else if (!ift->addEntry(iface, cp.ip1, 0, rs)) {
1105  reply.errMsg = "add iface: cannot add interface";
1106  reply.mode = CtlPkt::NEG_REPLY;
1107  return false;
1108  } else if (!iop->setup(iface)) {
1109  reply.errMsg = "add iface: could not setup interface";
1110  reply.mode = CtlPkt::NEG_REPLY;
1111  return false;
1112  }
// report the address and port of the newly added interface
1113  reply.ip1 = ift->getIpAdr(iface);
1114  reply.port1 = ift->getPort(iface);
1115  return true;
1116 }
1117 
1118 bool RouterCore::dropIface(CtlPkt& cp, CtlPkt& reply) {
1119  ift->removeEntry(cp.iface);
1120  return true;
1121 }
1122 
1123 bool RouterCore::getIface(CtlPkt& cp, CtlPkt& reply) {
1124  int iface = cp.iface;
1125  if (ift->valid(iface)) {
1126  reply.iface = iface;
1127  reply.ip1 = ift->getIpAdr(iface);
1128  reply.port1 = ift->getPort(iface);
1129  reply.rspec1 = ift->getRates(iface);
1130  reply.rspec2 = ift->getAvailRates(iface);
1131  return true;
1132  }
1133  reply.errMsg = "get iface: invalid interface";
1134  reply.mode = CtlPkt::NEG_REPLY;
1135  return false;
1136 }
1137 
1138 bool RouterCore::modIface(CtlPkt& cp, CtlPkt& reply) {
1139  int iface = cp.iface;
1140  if (ift->valid(iface)) {
1141  ift->getRates(iface) = cp.rspec1;
1142  return true;
1143  }
1144  reply.errMsg = "mod iface: invalid interface";
1145  reply.mode = CtlPkt::NEG_REPLY;
1146  return false;
1147 }
1148 
// Handle an add-link request.
// cp supplies the peer node type, interface number, optional link number,
// peer (ip,port), peer forest address and nonce. On success reply.link and
// reply.adr1 describe the link (plus reply.ip1 when an existing link is
// re-reported) and true is returned; on failure reply.errMsg is set,
// reply.mode is set to NEG_REPLY and false is returned.
//
// NOTE(review): this listing skips embedded lines 1181-1182 and 1234, so the
// declaration of `rs` and the body of the final `if` are not visible here.
// NOTE(review): cp.iface is used with ift->getAvailRates() without a visible
// ift->valid() check - confirm whether validation happens elsewhere.
1149 bool RouterCore::addLink(CtlPkt& cp, CtlPkt& reply) {
1150  Forest::ntyp_t peerType = cp.nodeType;
1151  if (peerType == Forest::ROUTER && cp.adr1 == 0) {
1152  reply.errMsg = "add link: adding link to router, but no peer "
1153  "address supplied";
1154  reply.mode = CtlPkt::NEG_REPLY;
1155  return false;
1156  }
1157  int iface = cp.iface;
1158 
// look for an existing link, either by peer (ip,port) or by link number
1159  int xlnk = lt->lookup(cp.ip1,cp.port1);
1160  if (xlnk != 0 || (cp.link != 0 && lt->valid(cp.link))) {
// NOTE(review): the last two tests compare the peer's ip/port from the
// request with the local interface's ip/port (ift), not with the stored
// peer ip/port of xlnk - confirm this is intended.
1161  if (cp.link != xlnk ||
1162  (peerType != lt->getPeerType(xlnk)) ||
1163  (cp.iface != lt->getIface(xlnk)) ||
1164  (cp.adr1 != 0 && cp.adr1 != lt->getPeerAdr(xlnk)) ||
1165  (cp.ip1 != 0 && cp.ip1 != ift->getIpAdr(iface)) ||
1166  (cp.port1 != 0 && cp.port1 != ift->getPort(iface))) {
1167  reply.errMsg = "add link: new link conflicts "
1168  "with existing link";
1169  reply.mode = CtlPkt::NEG_REPLY;
1170  return false;
1171  }
1172  // assume response to previous add link was lost
1173  reply.link = xlnk;
1174  reply.adr1 = lt->getPeerAdr(xlnk);
1175  reply.ip1 = lt->getPeerIpAdr(xlnk);
1176  return true;
1177  }
1178 
1179  // first ensure that the interface has enough
1180  // capacity to support a new link of minimum capacity
// NOTE(review): `rs` is used below but not declared in the visible text;
// presumably it is a minimum-rate RateSpec declared on the skipped
// lines - confirm against the full source.
1183  RateSpec& availRates = ift->getAvailRates(iface);
1184  if (!rs.leq(availRates)) {
1185  reply.errMsg = "add link: requested link "
1186  "exceeds interface capacity";
1187  reply.mode = CtlPkt::NEG_REPLY;
1188  return false;
1189  }
1190 
1191  // setting up link
1192  // case 1: adding link to leaf; peer (ip,port) not known, use nonce
1193  // subcase - lnk, peer forest address is known
1194  // (for preconfigured leaf)
1195  // subcase - lnk, peer address to be assigned by router
1196  // case 2: adding link to router not yet up; port not known, use nonce
1197  // lnk, peer address specified
1198  // case 3: adding link to a router that is already up
1199  // lnk, peer address specified, peer (ip,port) specified
1200 
1201  // add table entry with (ip,port) or nonce
1202  // note: when lt->addEntry succeeds, link rates are
1203  // initialized to Forest minimum rates
1204  int lnk = lt->addEntry(cp.link,cp.ip1,cp.port1,cp.nonce);
1205  if (lnk == 0) {
1206  reply.errMsg = "add link: cannot add requested link";
1207  reply.mode = CtlPkt::NEG_REPLY;
1208  return false;
1209  }
1210 
1211  if (peerType == Forest::ROUTER) {
1212  lt->setPeerAdr(lnk,cp.adr1);
1213  } else { // case 1
// use the requested leaf address if one was given, otherwise allocate
// one; on failure the new link table entry is removed again
1214  fAdr_t peerAdr = 0;
1215  if (cp.adr1 == 0) peerAdr = allocLeafAdr();
1216  else if (allocLeafAdr(cp.adr1)) peerAdr = cp.adr1;
1217  if (peerAdr == 0) {
1218  lt->removeEntry(lnk);
1219  reply.errMsg = "add link: cannot add link using "
1220  "specified address";
1221  reply.mode = CtlPkt::NEG_REPLY;
1222  return false;
1223  }
1224  lt->setPeerAdr(lnk,peerAdr);
1225  }
1226 
// charge the new link against the interface and finish initializing it
1227  availRates.subtract(rs);
1228  lt->setIface(lnk,iface);
1229  lt->setPeerType(lnk,peerType);
1230  lt->setConnectStatus(lnk,false);
1231  sm->clearLnkStats(lnk);
1232  if (peerType == Forest::ROUTER && cp.ip1 != 0 && cp.port1 != 0) {
1233  // link to a router that's already up, so connect
// NOTE(review): the statement that performs the connect (embedded line
// 1234) is missing from this listing.
1235  }
1236 
1237  reply.link = lnk;
1238  reply.adr1 = lt->getPeerAdr(lnk);
1239  return true;
1240 }
1241 
1242 bool RouterCore::dropLink(CtlPkt& cp, CtlPkt& reply) {
1243  dropLink(cp.link, cp.adr1);
1244  return true;
1245 }
1246 
1255 void RouterCore::dropLink(int lnk, fAdr_t peerAdr) {
1256  if (lnk == 0) lnk = lt->lookup(peerAdr);
1257  int *comtVec = new int[lt->getComtSet(lnk).size()];
1258  int i = 0;
1259  for (int comt : lt->getComtSet(lnk)) comtVec[i++] = comt;
1260  while (--i >= 0) {
1261  int ctx = comtVec[i];
1262  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),lnk);
1263  dropComtreeLink(ctx,lnk,cLnk);
1264  }
1265  int iface = lt->getIface(lnk);
1266  ift->getAvailRates(iface).add(lt->getRates(lnk));
1267  lt->removeEntry(lnk);
1268  freeLeafAdr(lt->getPeerAdr(lnk));
1269 }
1270 
1271 bool RouterCore::getLink(CtlPkt& cp, CtlPkt& reply) {
1272  int link = cp.link;
1273  if (lt->valid(link)) {
1274  reply.link = link;
1275  reply.iface = lt->getIface(link);
1276  reply.ip1 = lt->getPeerIpAdr(link);
1277  reply.nodeType = lt->getPeerType(link);
1278  reply.port1 = lt->getPeerPort(link);
1279  reply.adr1 = lt->getPeerAdr(link);
1280  reply.rspec1 = lt->getRates(link);
1281  reply.rspec2 = lt->getAvailRates(link);
1282  return true;
1283  }
1284  reply.errMsg = "get link: invalid link number";
1285  reply.mode = CtlPkt::NEG_REPLY;
1286  return false;
1287 }
1288 
1289 bool RouterCore::modLink(CtlPkt& cp, CtlPkt& reply) {
1290  int link = cp.link;
1291  if (!lt->valid(link)) {
1292  reply.errMsg = "get link: invalid link number";
1293  reply.mode = CtlPkt::NEG_REPLY;
1294  return false;
1295  }
1296  reply.link = link;
1297  int iface = lt->getIface(link);
1298  if (cp.rspec1.isSet()) {
1299  RateSpec& linkRates = lt->getRates(link);
1300  RateSpec& ifAvail = ift->getAvailRates(iface);
1301  RateSpec delta = cp.rspec1;
1302  delta.subtract(linkRates);
1303  if (!delta.leq(ifAvail)) {
1304  string s;
1305  reply.errMsg = "mod link: request "
1306  + cp.rspec1.toString(s) +
1307  "exceeds interface capacity";
1308  reply.mode = CtlPkt::NEG_REPLY;
1309  return false;
1310  }
1311  ifAvail.subtract(delta);
1312  linkRates = cp.rspec1;
1313  qm->setLinkRates(link,cp.rspec1);
1314  cp.rspec1.scale(.9); // allocate at most 90% of link
1315  lt->getAvailRates(link) = cp.rspec1;
1316  }
1317  return true;
1318 }
1319 
1320 bool RouterCore::addComtree(CtlPkt& cp, CtlPkt& reply) {
1321  int comt = cp.comtree;
1322  if(ctt->validComtree(comt) || ctt->addEntry(comt) != 0)
1323  return true;
1324  reply.errMsg = "add comtree: cannot add comtree";
1325  reply.mode = CtlPkt::NEG_REPLY;
1326  return false;
1327 }
1328 
1329 bool RouterCore::dropComtree(CtlPkt& cp, CtlPkt& reply) {
1330  int comt = cp.comtree;
1331  int ctx = ctt->getComtIndex(comt);
1332  if (!ctt->validComtIndex(ctx))
1333  return true; // so dropComtree op is idempotent
1334 
1335  // remove all routes involving this comtree
1336  // also degisters each route in the comtree table
1337  rt->purgeRoutes(comt);
1338 
1339  // remove all the comtree links
1340  // first, copy out the comtree links, then remove them
1341  // two step process is needed because dropComtreeLink modifies linkSet
1342  set<int>& linkSet = ctt->getLinks(ctx);
1343  set<int>::iterator pp;
1344  int *clnks = new int[linkSet.size()]; int i = 0;
1345  for (pp = linkSet.begin(); pp != linkSet.end(); pp++) clnks[i++] = *pp;
1346  while (--i >= 0) {
1347  dropComtreeLink(ctx,ctt->getLink(clnks[i]),clnks[i]);
1348  }
1349  delete [] clnks;
1350 
1351  ctt->removeEntry(ctx); // and finally drop entry in comtree table
1352  return true;
1353 }
1354 
1355 bool RouterCore::getComtree(CtlPkt& cp, CtlPkt& reply) {
1356  comt_t comt = cp.comtree;
1357  int ctx = ctt->getComtIndex(comt);
1358  if (ctx == 0) {
1359  reply.errMsg = "get comtree: invalid comtree";
1360  reply.mode = CtlPkt::NEG_REPLY;
1361  return false;
1362  }
1363  reply.comtree = comt;
1364  reply.coreFlag = ctt->inCore(ctx);
1365  reply.link = ctt->getPlink(ctx);
1366  reply.count = ctt->getLinkCount(ctx);
1367  return true;
1368 }
1369 
1370 bool RouterCore::modComtree(CtlPkt& cp, CtlPkt& reply) {
1371  comt_t comt = cp.comtree;
1372  int ctx = ctt->getComtIndex(comt);
1373  if (ctx != 0) {
1374  if (cp.coreFlag >= 0)
1375  ctt->setCoreFlag(ctx,cp.coreFlag);
1376  if (cp.link != 0) {
1377  int plnk = cp.link;
1378  if (plnk != 0 && !ctt->isLink(ctx,plnk)) {
1379  reply.errMsg = "specified link does "
1380  "not belong to comtree";
1381  reply.mode = CtlPkt::NEG_REPLY;
1382  return false;
1383  }
1384  if (plnk != 0 && !ctt->isRtrLink(ctx,plnk)) {
1385  reply.errMsg = "specified link does "
1386  "not connect to a router";
1387  reply.mode = CtlPkt::NEG_REPLY;
1388  return false;
1389  }
1390  ctt->setPlink(ctx,plnk);
1391  }
1392  return true;
1393  }
1394  reply.errMsg = "modify comtree: invalid comtree";
1395  reply.mode = CtlPkt::NEG_REPLY;
1396  return false;
1397 }
1398 
// Handle an add-comtree-link request.
// The link is identified by cp.link, or else by peer (ip,port), or else by
// peer forest address. On success the link is added to the comtree, a
// unicast route is installed where needed, a queue is allocated and bound to
// the comtree link, rates are reserved on the link, reply.link is set and
// true is returned. On failure reply.errMsg is set, reply.mode is set to
// NEG_REPLY and false is returned.
//
// NOTE(review): this listing skips embedded lines 1481-1482, so the
// declaration of `minRates` is not visible here.
1399 bool RouterCore::addComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1400  comt_t comt = cp.comtree;
1401  int ctx = ctt->getComtIndex(comt);
1402  if (ctx == 0) {
1403  reply.errMsg = "add comtree link: invalid comtree";
1404  reply.mode = CtlPkt::NEG_REPLY;
1405  return false;
1406  }
// resolve the link: explicit number first, then peer (ip,port), then
// peer forest address
1407  int lnk = 0;
1408  if (cp.link != 0) {
1409  lnk = cp.link;
1410  } else if (cp.ip1 != 0 && cp.port1 != 0) {
1411  lnk = lt->lookup(cp.ip1, cp.port1);
1412  } else if (cp.adr1 != 0) {
1413  lnk = lt->lookup(cp.adr1);
1414  }
1415  if (!lt->valid(lnk)) {
1416  reply.errMsg = "add comtree link: invalid link or "
1417  "peer IP and port";
1418  reply.mode = CtlPkt::NEG_REPLY;
1419  return false;
1420  }
// the core flag is only meaningful (and required) on links to routers
1421  bool isRtr = (lt->getPeerType(lnk) == Forest::ROUTER);
1422  bool isCore = false;
1423  if (isRtr) {
1424  if (cp.coreFlag < 0) {
1425  reply.errMsg = "add comtree link: must specify "
1426  "core flag on links to routers";
1427  reply.mode = CtlPkt::NEG_REPLY;
1428  return false;
1429  }
1430  isCore = cp.coreFlag;
1431  }
// if the link is already in the comtree, a request with matching
// attributes succeeds again; a mismatching one is rejected
1432  int cLnk = ctt->getComtLink(comt,lnk);
1433  if (cLnk != 0) {
1434  if (ctt->isRtrLink(cLnk) == isRtr &&
1435  ctt->isCoreLink(cLnk) == isCore) {
1436  reply.link = lnk;
1437  return true;
1438  } else {
1439  reply.errMsg = "add comtree link: specified "
1440  "link already in comtree";
1441  reply.mode = CtlPkt::NEG_REPLY;
1442  return false;
1443  }
1444  }
1445  // define new comtree link
1446  if (!ctt->addLink(ctx,lnk,isRtr,isCore)) {
1447  reply.errMsg = "add comtree link: cannot add "
1448  "requested comtree link";
1449  reply.mode = CtlPkt::NEG_REPLY;
1450  return false;
1451  }
1452  cLnk = ctt->getComtLink(comt,lnk);
1453 
1454  // add unicast route to cLnk if peer is a leaf or a router
1455  // in a different zip code
1456  fAdr_t peerAdr = lt->getPeerAdr(lnk);
1457  if (lt->getPeerType(lnk) != Forest::ROUTER) {
1458  int rtx = rt->getRteIndex(comt,peerAdr);
1459  if (rtx == 0) rt->addEntry(comt,peerAdr,cLnk);
1460  } else {
1461  int zipPeer = Forest::zipCode(peerAdr);
1462  if (zipPeer != Forest::zipCode(myAdr)) {
1463  fAdr_t dest = Forest::forestAdr(zipPeer,0);
1464  int rtx = rt->getRteIndex(comt,dest);
1465  if (rtx == 0) rt->addEntry(comt,dest,cLnk);
1466  }
1467  }
1468 
1469  // allocate queue and bind it to lnk and comtree link
1470  int qid = qm->allocQ(lnk);
1471  if (qid == 0) {
// NOTE(review): the comtree link is removed here, but any route added
// above is left in place - confirm whether removeLink cleans it up.
1472  ctt->removeLink(ctx,cLnk);
1473  reply.errMsg = "add comtree link: no queues "
1474  "available for link";
1475  reply.mode = CtlPkt::NEG_REPLY;
1476  return false;
1477  }
1478  ctt->setLinkQ(cLnk,qid);
1479 
1480  // adjust rates for link comtree and queue
// NOTE(review): `minRates` is used below but not declared in the visible
// text; presumably it is declared on the skipped lines - confirm.
// NOTE(review): this failure path returns without releasing the queue,
// the comtree link or the route set up above - looks like a leak of
// partially built state; confirm and add cleanup if so.
1483  if (!minRates.leq(lt->getAvailRates(lnk))) {
1484  reply.errMsg = "add comtree link: request "
1485  "exceeds link capacity";
1486  reply.mode = CtlPkt::NEG_REPLY;
1487  return false;
1488  }
1489  lt->getAvailRates(lnk).subtract(minRates);
1490  ctt->getRates(cLnk) = minRates;
1491 
1492  qm->setQRates(qid,minRates);
// both branches currently apply the same queue limits
1493  if (isRtr) qm->setQLimits(qid,500,1000000);
1494  else qm->setQLimits(qid,500,1000000);
1495  sm->clearQuStats(qid);
1496  reply.link = lnk;
1497  return true;
1498 }
1499 
1500 bool RouterCore::dropComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1501  comt_t comt = cp.comtree;
1502  int ctx = ctt->getComtIndex(comt);
1503  if (ctx == 0) {
1504  reply.errMsg = "drop comtree link: invalid comtree";
1505  reply.mode = CtlPkt::NEG_REPLY;
1506  return false;
1507  }
1508  int lnk = 0;
1509  if (cp.link != 0) {
1510  lnk = cp.link;
1511  } else if (cp.ip1 != 0 && cp.port1 != 0) {
1512  lnk = lt->lookup(cp.ip1, cp.port1);
1513  } else if (cp.adr1 != 0) {
1514  lnk = lt->lookup(cp.adr1);
1515  }
1516  if (!lt->valid(lnk)) {
1517  reply.errMsg = "drop comtree link: invalid link "
1518  "or peer IP and port";
1519  reply.mode = CtlPkt::NEG_REPLY;
1520  return false;
1521  }
1522  int cLnk = ctt->getComtLink(comt,lnk);
1523  if (cLnk != 0) {
1524  dropComtreeLink(ctx,lnk,cLnk);
1525  }
1526  return true;
1527 }
1528 
1529 void RouterCore::dropComtreeLink(int ctx, int lnk, int cLnk) {
1530  // release the link bandwidth used by comtree link
1531  lt->getAvailRates(lnk).add(ctt->getRates(cLnk));
1532 
1533  // remove unicast route for this comtree
1534  fAdr_t peerAdr = lt->getPeerAdr(lnk);
1535  int comt = ctt->getComtree(ctx);
1536  if (lt->getPeerType(lnk) != Forest::ROUTER) {
1537  int rtx = rt->getRteIndex(comt,peerAdr);
1538  if (rtx != 0) rt->removeEntry(rtx);
1539  } else {
1540  int zipPeer = Forest::zipCode(peerAdr);
1541  if (zipPeer != Forest::zipCode(myAdr)) {
1542  fAdr_t dest = Forest::forestAdr(zipPeer,0);
1543  int rtx = rt->getRteIndex(comt,dest);
1544  if (rtx != 0) rt->removeEntry(rtx);
1545  }
1546  }
1547  // remove cLnk from multicast routes for this comtree
1548  // must first copy out the routes that are registered for
1549  // cLnk, then remove cLnk from all these routes;
1550  // two step process is needed because the removal of the link from
1551  // the route, also deregisters the route in the comtree table
1552  // causing the rteSet to change
1553  set<int>& rteSet = ctt->getRteSet(cLnk);
1554  set<int>::iterator rp;
1555  int *routes = new int[rteSet.size()];
1556  int i = 0;
1557  for (rp = rteSet.begin(); rp != rteSet.end(); rp++) routes[i++] = *rp;
1558  while (--i >= 0) rt->removeLink(routes[i],cLnk);
1559  delete [] routes;
1560 
1561  // release queue and remove link from comtree
1562  // (which also deregisters comtree with lnk)
1563  int qid = ctt->getLinkQ(cLnk);
1564  qm->freeQ(qid);
1565  if (!ctt->removeLink(ctx,cLnk)) {
1566  cerr << "dropComtreeLink: internal error detected "
1567  "final removeLink failed\n";
1568  }
1569 }
1570 
1571 bool RouterCore::modComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1572  comt_t comt = cp.comtree;
1573  int ctx = ctt->getComtIndex(comt);
1574  if (ctx == 0) {
1575  reply.errMsg = "modify comtree link: invalid comtree";
1576  reply.mode = CtlPkt::NEG_REPLY;
1577  return false;
1578  }
1579  int lnk = cp.link;
1580  if (!lt->valid(lnk)) {
1581  reply.errMsg = "modify comtree link: invalid link number";
1582  reply.mode = CtlPkt::NEG_REPLY;
1583  return false;
1584  }
1585  int cLnk = ctt->getComtLink(comt,lnk);
1586  if (cLnk == 0) {
1587  reply.errMsg = "modify comtree link: specified link "
1588  "not defined in specified comtree";
1589  reply.mode = CtlPkt::NEG_REPLY;
1590  return false;
1591  }
1592 
1593  RateSpec rs = cp.rspec1;
1594  if (!rs.isSet()) return true;
1595  RateSpec diff = rs; diff.subtract(ctt->getRates(cLnk));
1596  if (!diff.leq(lt->getAvailRates(lnk))) {
1597  reply.errMsg = "modify comtree link: new rate spec "
1598  "exceeds available link capacity";
1599  return false;
1600  }
1601  lt->getAvailRates(lnk).subtract(diff);
1602  ctt->getRates(cLnk) = rs;
1603  return true;
1604 }
1605 
1606 bool RouterCore::getComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1607  comt_t comt = cp.comtree;
1608  int ctx = ctt->getComtIndex(comt);
1609  if (ctx == 0) {
1610  reply.errMsg = "get comtree link: invalid comtree";
1611  reply.mode = CtlPkt::NEG_REPLY;
1612  return false;
1613  }
1614  int lnk = cp.link;
1615  if (!lt->valid(lnk)) {
1616  reply.errMsg = "get comtree link: invalid link number";
1617  reply.mode = CtlPkt::NEG_REPLY;
1618  return false;
1619  }
1620  int cLnk = ctt->getComtLink(comt,lnk);
1621  if (cLnk == 0) {
1622  reply.errMsg = "get comtree link: specified link "
1623  "not defined in specified comtree";
1624  reply.mode = CtlPkt::NEG_REPLY;
1625  return false;
1626  }
1627  reply.comtree = comt;
1628  reply.link = lnk;
1629  reply.queue = ctt->getLinkQ(cLnk);
1630  reply.adr1 = ctt->getDest(cLnk);
1631  reply.rspec1 = ctt->getRates(cLnk);
1632 
1633  return true;
1634 }
1635 
1636 bool RouterCore::addRoute(CtlPkt& cp, CtlPkt& reply) {
1637  comt_t comt = cp.comtree;
1638  if (!ctt->validComtree(comt)) {
1639  reply.errMsg = "comtree not defined at this router\n";
1640  reply.mode = CtlPkt::NEG_REPLY;
1641  return false;
1642  }
1643  fAdr_t dest = cp.adr1;
1644  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1645  reply.errMsg = "invalid address\n";
1646  reply.mode = CtlPkt::NEG_REPLY;
1647  return false;
1648  }
1649  int lnk = cp.link;
1650  int cLnk = ctt->getComtLink(comt,lnk);
1651  int rtx = rt->getRteIndex(comt,dest);
1652  if (rtx != 0) {
1653  if ((Forest::validUcastAdr(dest) && rt->getLink(rtx) == cLnk) ||
1654  (Forest::mcastAdr(dest) && rt->isLink(rtx,cLnk))) {
1655  return true;
1656  } else {
1657  reply.errMsg = "add route: requested route "
1658  "conflicts with existing route";
1659  reply.mode = CtlPkt::NEG_REPLY;
1660  return false;
1661  }
1662  } else if (rt->addEntry(comt, dest, lnk)) {
1663  return true;
1664  }
1665  reply.errMsg = "add route: cannot add route";
1666  reply.mode = CtlPkt::NEG_REPLY;
1667  return false;
1668 }
1669 
1670 bool RouterCore::dropRoute(CtlPkt& cp, CtlPkt& reply) {
1671  comt_t comt = cp.comtree;
1672  if (!ctt->validComtree(comt)) {
1673  reply.errMsg = "comtree not defined at this router\n";
1674  reply.mode = CtlPkt::NEG_REPLY;
1675  return false;
1676  }
1677  fAdr_t dest = cp.adr1;
1678  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1679  reply.errMsg = "invalid address\n";
1680  reply.mode = CtlPkt::NEG_REPLY;
1681  return false;
1682  }
1683  int rtx = rt->getRteIndex(comt,dest);
1684  rt->removeEntry(rtx);
1685  return true;
1686 }
1687 
1688 bool RouterCore::getRoute(CtlPkt& cp, CtlPkt& reply) {
1689  comt_t comt = cp.comtree;
1690  if (!ctt->validComtree(comt)) {
1691  reply.errMsg = "comtree not defined at this router\n";
1692  reply.mode = CtlPkt::NEG_REPLY;
1693  return false;
1694  }
1695  fAdr_t dest = cp.adr1;
1696  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1697  reply.errMsg = "invalid address\n";
1698  reply.mode = CtlPkt::NEG_REPLY;
1699  return false;
1700  }
1701  int rtx = rt->getRteIndex(comt,dest);
1702  if (rtx != 0) {
1703  reply.comtree = comt;
1704  reply.adr1 = dest;
1705  if (Forest::validUcastAdr(dest)) {
1706  int lnk = ctt->getLink(rt->getLink(rtx));
1707  reply.link = lnk;
1708  } else {
1709  reply.link = 0;
1710  }
1711  return true;
1712  }
1713  reply.errMsg = "get route: no route for specified address";
1714  reply.mode = CtlPkt::NEG_REPLY;
1715  return false;
1716 }
1717 
1721 bool RouterCore::getLinkSet(CtlPkt& cp, CtlPkt& reply) {
1722  reply.lt = lt;
1723  reply.firstLinkNum = cp.firstLinkNum;
1724  reply.numOfLinks = cp.numOfLinks;
1725  cout << "RouterCore " << __FUNCTION__ << endl;
1726  return true;
1727 }
1728 
1729 bool RouterCore::modRoute(CtlPkt& cp, CtlPkt& reply) {
1730  comt_t comt = cp.comtree;
1731  if (!ctt->validComtree(comt)) {
1732  reply.errMsg = "comtree not defined at this router\n";
1733  reply.mode = CtlPkt::NEG_REPLY;
1734  return false;
1735  }
1736  fAdr_t dest = cp.adr1;
1737  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1738  reply.errMsg = "invalid address\n";
1739  reply.mode = CtlPkt::NEG_REPLY;
1740  return false;
1741  }
1742  int rtx = rt->getRteIndex(comt,dest);
1743  if (rtx != 0) {
1744  if (cp.link != 0) {
1745  if (Forest::mcastAdr(dest)) {
1746  reply.errMsg = "modify route: cannot "
1747  "set link in multicast route";
1748  reply.mode = CtlPkt::NEG_REPLY;
1749  return false;
1750  }
1751  rt->setLink(rtx,cp.link);
1752  }
1753  return true;
1754  }
1755  reply.errMsg = "modify route: invalid route";
1756  reply.mode = CtlPkt::NEG_REPLY;
1757  return false;
1758 }
1759 
1768 bool RouterCore::setLeafRange(CtlPkt& cp, CtlPkt& reply) {
1769  if (!booting) {
1770  reply.errMsg = "attempting to set leaf address range when "
1771  "not booting";
1772  reply.mode = CtlPkt::NEG_REPLY;
1773  return false;
1774  }
1775  firstLeafAdr = cp.adr1;
1776  fAdr_t lastLeafAdr = cp.adr2;
1777  if (firstLeafAdr > lastLeafAdr) {
1778  reply.errMsg = "request contained empty leaf address range";
1779  reply.mode = CtlPkt::NEG_REPLY;
1780  return false;
1781  }
1782  leafAdr = new UiSetPair((lastLeafAdr - firstLeafAdr)+1);
1783  return true;
1784 }
1785 
1789 void RouterCore::sendConnDisc(int lnk, Forest::ptyp_t type) {
1790  pktx px = ps->alloc();
1791  Packet& p = ps->getPacket(px);
1792 
1793  uint64_t nonce = lt->getNonce(lnk);
1794  p.length = Forest::OVERHEAD + 8; p.type = type; p.flags = 0;
1795  p.comtree = Forest::CONNECT_COMT;
1796  p.srcAdr = myAdr; p.dstAdr = lt->getPeerAdr(lnk);
1797  p.payload()[0] = htonl((int) (nonce >> 32));
1798  p.payload()[1] = htonl((int) (nonce & 0xffffffff));
1799  p.inLink = lnk; // hack to force consistent routing of conn/disc
1800  p.pack();
1801 
1802  // save a record of the packet in pending map
1803  pair<uint64_t,CpInfo> cpp;
1804  cpp.first = 1; cpp.first <<= 63; cpp.first |= nonce;
1805  cpp.second.px = px; cpp.second.nSent = 1; cpp.second.timestamp = now;
1806  pending->insert(cpp);
1807 
1808  // now, make copy of packet and send the copy
1809  int copy = ps->fullCopy(px);
1810  if (copy == 0) {
1811  cerr << "RouterCore::sendConnect: no packets left in packet "
1812  "store\n";
1813  return;
1814  }
1815  iop->send(copy,lnk);
1816 }
1817 
// Send a control packet request.
// Packs cp (as a REQUEST with the next sequence number) into a newly
// allocated packet addressed to dest, records the packet in the pending map
// under its sequence number, and sends a copy: directly through iop while
// booting, otherwise through forward().
// Returns true if a copy was sent; false if no packet could be allocated,
// the control packet could not be packed, or no copy could be made.
//
// NOTE(review): this listing skips embedded line 1845, so the assignment of
// p.comtree (read at the bottom of the function) is not visible here.
1826 bool RouterCore::sendCpReq(CtlPkt& cp, fAdr_t dest) {
1827  pktx px = ps->alloc();
1828  if (px == 0) {
1829  cerr << "RouterCore::sendCpReq: no packets left in packet "
1830  "store\n";
1831  return false;
1832  }
1833  Packet& p = ps->getPacket(px);
1834 
1835  // pack cp into p, setting mode and seq number
1836  cp.mode = CtlPkt::REQUEST;
1837  cp.seqNum = seqNum;
1838  cp.payload = p.payload();
1839  if (cp.pack() == 0) {
// NOTE(review): px is not freed on this path - looks like a packet
// leak; confirm and add ps->free(px) if so.
1840  cerr << "RouterCore::sendCpReq: control packet packing error\n";
1841  return false;
1842  }
1843  p.length = Forest::OVERHEAD + cp.paylen;
1844  p.type = Forest::NET_SIG; p.flags = 0;
1846  p.srcAdr = myAdr; p.dstAdr = dest;
1847  p.inLink = 0;
1848  p.pack();
1849 
1850  // save a record of the packet in pending map
1851  pair<uint64_t,CpInfo> cpp;
1852  cpp.first = seqNum;
1853  cpp.second.px = px; cpp.second.nSent = 1; cpp.second.timestamp = now;
1854  pending->insert(cpp);
1855  seqNum++;
1856 
1857  // now, make copy of packet and send the copy
1858  int copy = ps->fullCopy(px);
1859  if (copy == 0) {
// the request stays in the pending map even though false is returned
1860  cerr << "RouterCore::sendCpReq: no packets left in packet "
1861  "store\n";
1862  return false;
1863  }
1864  if (booting) {
1865  iop->send(copy,0);
1866  pktLog->log(copy,0,true,now);
1867  } else
1868  forward(copy,ctt->getComtIndex(p.comtree));
1869 
1870  return true;
1871 }
1872 
1879 void RouterCore::resendCpReq() {
1880  map<uint64_t,CpInfo>::iterator pp;
1881  for (pp = pending->begin(); pp != pending->end(); pp++) {
1882  if (now < pp->second.timestamp + 1000000000) continue;
1883  pktx px = pp->second.px;
1884  Packet& p = ps->getPacket(px);
1885  if (pp->second.nSent >= 3) { // give up on this packet
1886  string s;
1887  cerr << "RouterCore::resendCpReq: received no reply to "
1888  "control packet after three attempts\n"
1889  << p.toString(s);
1890  ps->free(px);
1891  pending->erase(pp->first);
1892  continue;
1893  }
1894  string s1;
1895  cout << "resending control packet\n" << p.toString(s1);
1896  // make copy of packet and send the copy
1897  pp->second.timestamp = now;
1898  pp->second.nSent++;
1899  pktx copy = ps->fullCopy(px);
1900  if (copy == 0) {
1901  cerr << "RouterCore::resendCpReq: no packets left in "
1902  "packet store\n";
1903  return;
1904  }
1905  if (p.type == Forest::CONNECT || p.type == Forest::DISCONNECT) {
1906  iop->send(copy,p.inLink);
1907  } else if (booting) {
1908  pktLog->log(copy,0,true,now);
1909  iop->send(copy,0);
1910  } else {
1911  forward(copy,ctt->getComtIndex(p.comtree));
1912  }
1913  }
1914 }
1915 
1925 void RouterCore::handleCpReply(pktx reply, CtlPkt& cpr) {
1926  map<uint64_t,CpInfo>::iterator pp = pending->find(cpr.seqNum);
1927  if (pp == pending->end()) {
1928  // this is a reply to a request we never sent, or
1929  // possibly, a reply to a request we gave up on
1930  ps->free(reply);
1931  return;
1932  }
1933  // an expected reply, so remove it from the map of pending requests
1934  ps->free(pp->second.px);
1935  pending->erase(pp);
1936 
1937  // and then handle it
1938  switch (cpr.type) {
1939  case CtlPkt::CLIENT_CONNECT: case CtlPkt::CLIENT_DISCONNECT:
1940  if (cpr.mode == CtlPkt::NEG_REPLY) {
1941  cerr << "RouterCore::handleCpReply: got negative reply "
1942  "to a connect or disconnect request: "
1943  << cpr.errMsg << endl;
1944  break;
1945  }
1946  // otherwise, nothing to do
1947  break;
1948  case CtlPkt::BOOT_ROUTER: {
1949  if (cpr.mode == CtlPkt::NEG_REPLY) {
1950  cerr << "RouterCore::handleCpReply: got "
1951  "negative reply to a boot request: "
1952  << cpr.errMsg << endl;
1953  break;
1954  }
1955  if (booting && !setup()) {
1956  cerr << "RouterCore::handleCpReply: setup failed after "
1957  "completion of boot phase\n";
1958  perror("");
1959  pktLog->write(cout);
1960  exit(1);
1961  }
1962  iop->closeBootSock();
1963  booting = false;
1964  break;
1965  }
1966  default:
1967  cerr << "RouterCore::handleCpReply: unexpected control packet "
1968  "type\n";
1969  break;
1970  }
1971  ps->free(reply);
1972 }
1973 
1979 void RouterCore::returnToSender(pktx px, CtlPkt& cp) {
1980  Packet& p = ps->getPacket(px);
1981  cp.payload = p.payload();
1982  int paylen = cp.pack();
1983  if (paylen == 0) {
1984  cerr << "RouterCore::returnToSender: control packet formatting "
1985  "error, zero payload length\n";
1986  ps->free(px);
1987  }
1988  p.length = (Packet::OVERHEAD + paylen);
1989  p.flags = 0;
1990  p.dstAdr = p.srcAdr;
1991  p.srcAdr = myAdr;
1992  p.pack();
1993 
1994  if (booting) {
1995  pktLog->log(px,0,true,now);
1996  iop->send(px,0);
1997  return;
1998  }
1999 
2000  int cLnk = ctt->getComtLink(p.comtree,p.inLink);
2001  int qn = ctt->getLinkQ(cLnk);
2002  if (!qm->enq(px,qn,now)) { ps->free(px); }
2003 }
2004 
2005 } // ends namespace