forest-net
an overlay network for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
RouterCore_feng.cpp
1 
9 #include "RouterCore.h"
10 
11 using namespace forest;
12 
23 bool processArgs(int argc, char *argv[], RouterInfo& args) {
24  // set default values
25  args.mode = "local";
26  args.myAdr = args.bootIp = args.nmAdr = args.nmIp = 0;
27  args.ccAdr = args.firstLeafAdr = args.lastLeafAdr = 0;
28  args.ifTbl = ""; args.lnkTbl = ""; args.comtTbl = "";
29  args.rteTbl = ""; args.statSpec = "";
30  args.finTime = 0;
31 
32  string s;
33  for (int i = 1; i < argc; i++) {
34  s = argv[i];
35  if (s.compare(0,10,"mode=local") == 0) {
36  args.mode = "local";
37  } else if (s.compare(0,11,"mode=remote") == 0) {
38  args.mode = "remote";
39  } else if (s.compare(0,6,"myAdr=") == 0) {
40  args.myAdr = Forest::forestAdr(&argv[i][6]);
41  } else if (s.compare(0,7,"bootIp=") == 0) {
42  args.bootIp = Np4d::ipAddress(&argv[i][7]);
43  } else if (s.compare(0,6,"nmAdr=") == 0) {
44  args.nmAdr = Forest::forestAdr(&argv[i][6]);
45  } else if (s.compare(0,5,"nmIp=") == 0) {
46  args.nmIp = Np4d::ipAddress(&argv[i][5]);
47  } else if (s.compare(0,6,"ccAdr=") == 0) {
48  args.ccAdr = Forest::forestAdr(&argv[i][6]);
49  } else if (s.compare(0,13,"firstLeafAdr=") == 0) {
50  args.firstLeafAdr = Forest::forestAdr(&argv[i][13]);
51  } else if (s.compare(0,12,"lastLeafAdr=") == 0) {
52  args.lastLeafAdr = Forest::forestAdr(&argv[i][12]);
53  } else if (s.compare(0,6,"ifTbl=") == 0) {
54  args.ifTbl = &argv[i][6];
55  } else if (s.compare(0,7,"lnkTbl=") == 0) {
56  args.lnkTbl = &argv[i][7];
57  } else if (s.compare(0,8,"comtTbl=") == 0) {
58  args.comtTbl = &argv[i][8];
59  } else if (s.compare(0,7,"rteTbl=") == 0) {
60  args.rteTbl = &argv[i][7];
61  } else if (s.compare(0,9,"statSpec=") == 0) {
62  args.statSpec = &argv[i][9];
63  } else if (s.compare(0,8,"finTime=") == 0) {
64  sscanf(&argv[i][8],"%d",&args.finTime);
65  } else {
66  cerr << "unrecognized argument: " << argv[i] << endl;
67  return false;
68  }
69  }
70  if (args.mode.compare("local") == 0 &&
71  (args.myAdr == 0 || args.firstLeafAdr == 0 ||
72  args.lastLeafAdr ==0 || args.lastLeafAdr < args.firstLeafAdr)) {
73  cerr << "processArgs: local configuration requires myAdr, "
74  "firstLeafAdr, lastLeafAdr and that firstLeafAdr "
75  "be no larger than lastLeafAdr\n";
76  return false;
77  } else if (args.mode.compare("remote") == 0 &&
78  (args.bootIp == 0 || args.myAdr == 0 ||
79  args.nmIp == 0 || args.nmAdr == 0)) {
80  cerr << "processArgs: remote configuration requires bootIp, "
81  "myAdr, netMgrIp and netMgrAdr\n";
82  return false;
83  }
84  return true;
85 }
86 
89 int main(int argc, char *argv[]) {
90  RouterInfo args;
91  if (!processArgs(argc, argv, args))
92  fatal("fRouter: error processing command line arguments");
93  bool booting = args.mode.compare("remote") == 0;
94  RouterCore router(booting,args);
95 
96  if (!router.readTables(args))
97  fatal("router: could not read specified config files");
98  if (!booting) {
99  if (!router.setup())
100  fatal("router: inconsistency in config files");
101  }
102  router.run(args.finTime);
103  cout << endl;
104  router.dump(cout); // print final tables
105  cout << endl;
106  return 0;
107 }
108 
109 namespace forest {
110 
115 RouterCore::RouterCore(bool booting1, const RouterInfo& config)
116  : booting(booting1) {
117  nIfaces = 50; nLnks = 1000;
118  nComts = 5000; nRts = 100000;
119  nPkts = 100000; nBufs = 50000; nQus = 10000;
120 
121  myAdr = config.myAdr;
122  bootIp = config.bootIp;
123  nmAdr = config.nmAdr;
124  nmIp = config.nmIp;
125  ccAdr = config.ccAdr;
126  firstLeafAdr = config.firstLeafAdr;
127 
128  ps = new PacketStore(nPkts, nBufs);
129  ift = new IfaceTable(nIfaces);
130  lt = new LinkTable(nLnks);
131  ctt = new ComtreeTable(nComts,10*nComts,lt);
132  rt = new RouteTable(nRts,myAdr,ctt);
133  sm = new StatsModule(1000, nLnks, nQus, ctt);
134  iop = new IoProcessor(nIfaces, ift, lt, ps, sm);
135  qm = new QuManager(nLnks, nPkts, nQus, min(50,5*nPkts/nLnks), ps, sm);
136  pktLog = new PacketLog(20000,500,ps);
137 
138  if (!booting)
139  leafAdr = new UiSetPair((config.lastLeafAdr - firstLeafAdr)+1);
140 
141  seqNum = 1;
142  pending = new map<uint64_t,CpInfo>;
143 }
144 
145 RouterCore::~RouterCore() {
146  delete pktLog; delete qm; delete iop; delete sm;
147  delete rt; delete ctt; delete lt; delete ift; delete ps;
148  if (leafAdr != 0) delete leafAdr;
149 }
150 
157 bool RouterCore::readTables(const RouterInfo& config) {
158  if (config.ifTbl.compare("") != 0) {
159  ifstream fs; fs.open(config.ifTbl.c_str());
160  if (fs.fail() || !ift->read(fs)) {
161  cerr << "RouterCore::init: can't read "
162  << "interface table\n";
163  return false;
164  }
165  fs.close();
166  }
167  if (config.lnkTbl.compare("") != 0) {
168  ifstream fs; fs.open(config.lnkTbl.c_str());
169  if (fs.fail() || !lt->read(fs)) {
170  cerr << "RouterCore::init: can't read "
171  << "link table\n";
172  return false;
173  }
174  fs.close();
175  }
176  if (config.comtTbl.compare("") != 0) {
177  ifstream fs; fs.open(config.comtTbl.c_str());
178  if (fs.fail() || !ctt->read(fs)) {
179  cerr << "RouterCore::init: can't read "
180  << "comtree table\n";
181  return false;
182  }
183  fs.close();
184  }
185  if (config.rteTbl.compare("") != 0) {
186  ifstream fs; fs.open(config.rteTbl.c_str());
187  if (fs.fail() || !rt->read(fs)) {
188  cerr << "RouterCore::init: can't read "
189  << "routing table\n";
190  return false;
191  }
192  fs.close();
193  }
194  if (config.statSpec.compare("") != 0) {
195  ifstream fs; fs.open(config.statSpec.c_str());
196  if (fs.fail() || !sm->read(fs)) {
197  cerr << "RouterCore::init: can't read "
198  << "statistics spec\n";
199  return false;
200  }
201  fs.close();
202  }
203  return true;
204 }
205 
210 bool RouterCore::setup() {
211  dump(cout);
212  if (!setupIfaces()) return false;
213  if (!setupLeafAddresses()) return false;
214  if (!setupQueues()) return false;
215  if (!checkTables()) return false;
216  if (!setAvailRates()) return false;
217  addLocalRoutes();
218 
219  return true;
220 }
221 
227  for (int iface = ift->firstIface(); iface != 0;
228  iface = ift->nextIface(iface)) {
229  if (iop->ready(iface)) continue;
230  if (!iop->setup(iface)) {
231  cerr << "RouterCore::setupIfaces: could not "
232  "setup interface " << iface << endl;
233  return false;
234  }
235  }
236  return true;
237 }
238 
239 
246  for (int lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
247  if (booting || lt->getPeerType(lnk) == Forest::ROUTER) continue;
248  if (!allocLeafAdr(lt->getPeerAdr(lnk)))
249  return false;
250  }
251  return true;
252 }
253 
261  // Set link rates in QuManager
262  for (int lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
263  qm->setLinkRates(lnk,lt->getRates(lnk));
264  }
267  int ctx;
268  for (ctx = ctt->firstComtIndex(); ctx != 0;
269  ctx = ctt->nextComtIndex(ctx)) {
270  set<int>& links = ctt->getLinks(ctx);
271  set<int>::iterator p;
272  for (p = links.begin(); p != links.end(); p++) {
273  int cLnk = *p; int lnk = ctt->getLink(cLnk);
274  int qid = qm->allocQ(lnk);
275  if (qid == 0) return false;
276  ctt->setLinkQ(cLnk,qid);
277  qm->setQRates(qid,rs);
278  if (lt->getPeerType(lnk) == Forest::ROUTER)
279  qm->setQLimits(qid,100,200000);
280  else
281  qm->setQLimits(qid,50,100000);
282  sm->clearQuStats(qid);
283  }
284  }
285  return true;
286 }
287 
295  bool success = true;
296 
297  // verify that the default interface is valid and
298  // that each interface has a non-zero IP address
299  if (!ift->valid(ift->getDefaultIface())) {
300  cerr << "RouterCore::checkTables: specified default iface "
301  << ift->getDefaultIface() << " is invalid\n";
302  success = false;
303  }
304  int iface;
305  for (iface = ift->firstIface(); iface != 0;
306  iface = ift->nextIface(iface)) {
307  if (ift->getIpAdr(iface) == 0) {
308  cerr << "RouterCore::checkTables: interface "
309  << iface << " has zero for IP address\n";
310  success = false;
311  }
312  }
313 
314  // verify that each link is assigned to a valid interface,
315  // that the peer IP address and port are non-zero and consistent,
316  // that the peer type is valid and that the peer address is a valid
317  // unicast forest address
318  int lnk;
319  for (lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
320  iface = lt->getIface(lnk);
321  if (!ift->valid(iface)) {
322  cerr << "RouterCore::checkTables: interface " << iface
323  << " for link " << lnk << " is not valid\n";
324  success = false;
325  }
326  if (lt->getPeerIpAdr(lnk) == 0 &&
327  lt->getPeerType(lnk) == Forest::ROUTER) {
328  cerr << "RouterCore::checkTables: invalid peer IP "
329  << "for link " << lnk << endl;
330  success = false;
331  }
332  if (!Forest::validUcastAdr(lt->getPeerAdr(lnk))) {
333  cerr << "RouterCore::checkTables: invalid peer address "
334  << "for link " << lnk << endl;
335  success = false;
336  }
337  }
338 
339  // verify that the links in each comtree are valid,
340  // that the router links and core links refer to peer routers,
341  // and that each comtree link is consistent
342  int ctx;
343  for (ctx = ctt->firstComtIndex(); ctx != 0;
344  ctx = ctt->nextComtIndex(ctx)) {
345  int comt = ctt->getComtree(ctx);
346  int plnk = ctt->getPlink(ctx);
347  int pcLnk = ctt->getPCLink(ctx);
348  if (plnk != ctt->getLink(pcLnk)) {
349  cerr << "RouterCore::checkTables: parent link "
350  << plnk << " not consistent with pcLnk\n";
351  success = false;
352  }
353  if (ctt->inCore(ctx) && plnk != 0 && !ctt->isCoreLink(pcLnk)) {
354  cerr << "RouterCore::checkTables: parent link "
355  << plnk << " of core node does not lead to "
356  << "another core node\n";
357  success = false;
358  }
359  set<int>& links = ctt->getLinks(ctx);
360  set<int>::iterator p;
361  for (p = links.begin(); p != links.end(); p++) {
362  int cLnk = *p; int lnk = ctt->getLink(cLnk);
363  if (!lt->valid(lnk)) {
364  cerr << "RouterCore::checkTables: link "
365  << lnk << " in comtree " << comt
366  << " not in link table" << endl;
367  success = false;
368  continue;
369  }
370  fAdr_t dest = ctt->getDest(cLnk);
371  if (dest != 0 && !Forest::validUcastAdr(dest)) {
372  cerr << "RouterCore::checkTables: dest addr "
373  << "for " << lnk << " in comtree " << comt
374  << " is not valid" << endl;
375  success = false;
376  }
377  int qid = ctt->getLinkQ(cLnk);
378  if (qid == 0) {
379  cerr << "RouterCore::checkTables: queue id "
380  << "for " << lnk << " in comtree " << comt
381  << " is zero" << endl;
382  success = false;
383  }
384  }
385  if (!success) break;
386  set<int>& rtrLinks = ctt->getRtrLinks(ctx);
387  for (p = rtrLinks.begin(); p != rtrLinks.end(); p++) {
388  int cLnk = *p; int lnk = ctt->getLink(cLnk);
389  if (!ctt->isLink(ctx,lnk)) {
390  cerr << "RouterCore::checkTables: router link "
391  << lnk << " is not valid in comtree "
392  << comt << endl;
393  success = false;
394  }
395  if (lt->getPeerType(lnk) != Forest::ROUTER) {
396  cerr << "RouterCore::checkTables: router link "
397  << lnk << " in comtree " << comt
398  << " connects to non-router peer\n";
399  success = false;
400  }
401  }
402  set<int>& coreLinks = ctt->getCoreLinks(ctx);
403  for (p = coreLinks.begin(); p != coreLinks.end(); p++) {
404  int cLnk = *p; int lnk = ctt->getLink(cLnk);
405  if (!ctt->isRtrLink(ctx,lnk)) {
406  cerr << "RouterCore::checkTables: core link "
407  << lnk << " is not a router link "
408  << comt << endl;
409  success = false;
410  }
411  }
412  }
413  // come back later and add checks for route table
414  return success;
415 }
416 
423  bool success = true;
428  int iface;
429  for (iface = ift->firstIface(); iface != 0;
430  iface = ift->nextIface(iface)) {
431  RateSpec ifRates = ift->getRates(iface);
432  if (!minRates.leq(ifRates) || !ifRates.leq(maxRates)) {
433  cerr << "RouterCore::setAvailRates: interface rates "
434  "outside allowed range\n";
435  success = false;
436  }
437  ift->getAvailRates(iface) = ifRates;
438  }
439  if (!success) return false;
440  int lnk;
441  for (lnk = lt->firstLink(); lnk != 0; lnk = lt->nextLink(lnk)) {
442  RateSpec lnkRates = lt->getRates(lnk);
443  if (!minRates.leq(lnkRates) || !lnkRates.leq(maxRates)) {
444  cerr << "RouterCore::setAvailRates: link rates "
445  "outside allowed range\n";
446  success = false;
447  }
448  iface = lt->getIface(lnk);
449  RateSpec ifAvail = ift->getRates(iface);
450  if (!lnkRates.leq(ifAvail)) {
451  cerr << "RouterCore::setAvailRates: oversubscribing "
452  "interface " << iface << endl;
453  success = false;
454  }
455  ift->getAvailRates(iface).subtract(lnkRates);
456  lnkRates.scale(.9); // allocate at most 90% of link
457  lt->getAvailRates(lnk) = lnkRates;
458  sm->clearLnkStats(lnk);
459  }
460  if (!success) return false;
461  int ctx;
462  for (ctx = ctt->firstComtIndex(); ctx != 0;
463  ctx = ctt->nextComtIndex(ctx)) {
464  set<int>& comtLinks = ctt->getLinks(ctx);
465  set<int>::iterator p;
466  for (p = comtLinks.begin(); p != comtLinks.end(); p++) {
467  int cLnk = *p; int lnk = ctt->getLink(cLnk);
468  RateSpec comtRates = ctt->getRates(cLnk);
469  if (!comtRates.leq(lt->getAvailRates(lnk))) {
470  cerr << "RouterCore::setAvailRates: "
471  "oversubscribing link "
472  << lnk << endl;
473  success = false;
474  }
475  lt->getAvailRates(lnk).subtract(comtRates);
476  }
477  }
478  return success;
479 }
480 
485  for (int ctx = ctt->firstComtIndex(); ctx != 0;
486  ctx = ctt->nextComtIndex(ctx)) {
487  int comt = ctt->getComtree(ctx);
488  set<int>& comtLinks = ctt->getLinks(ctx);
489  set<int>::iterator p;
490  for (p = comtLinks.begin(); p != comtLinks.end(); p++) {
491  int cLnk = *p; int lnk = ctt->getLink(cLnk);
492  fAdr_t peerAdr = lt->getPeerAdr(lnk);
493  if (lt->getPeerType(lnk) == Forest::ROUTER &&
494  Forest::zipCode(peerAdr)
496  continue;
497  if (rt->getRteIndex(comt,peerAdr) != 0)
498  continue;
499  rt->addEntry(comt,peerAdr,cLnk);
500  }
501  }
502 }
503 
507 void RouterCore::dump(ostream& out) {
508  string s;
509  out << "Interface Table\n\n" << ift->toString(s) << endl;
510  out << "Link Table\n\n" << lt->toString(s) << endl;
511  out << "Comtree Table\n\n" << ctt->toString(s) << endl;
512  out << "Routing Table\n\n" << rt->toString(s) << endl;
513  out << "Statistics\n\n" << sm->toString(s) << endl;
514 }
515 
544 void RouterCore::run(uint64_t finishTime) {
545  now = Misc::getTimeNs();
546  if (booting) {
547  if (!iop->setupBootSock(bootIp,nmIp))
548  fatal("RouterCore:run: could not setup boot socket\n");
549  string s1;
550  cout << "sending boot request to " << Np4d::ip2string(nmIp,s1)
551  << endl;
552  CtlPkt cp(CtlPkt::BOOT_ROUTER,CtlPkt::REQUEST,0);
553  if (!sendCpReq(cp,nmAdr))
554  fatal("RouterCore::run: could not send boot request\n");
555  }
556  // create packet log to record a sample of packets handled
557 
558  uint64_t statsTime = 0; // time statistics were last processed
559  bool didNothing;
560  int controlCount = 20; // used to limit overhead of control
561  // packet processing
562  queue<int> ctlQ; // queue for control packets
563 
564  now = Misc::getTimeNs();
565  finishTime *= 1000000000; // convert from seconds to ns
566  while (finishTime == 0 || now < finishTime) {
567  didNothing = true;
568 
569  // input processing
570  pktx px = iop->receive();
571  if (px != 0) {
572  didNothing = false;
573  Packet& p = ps->getPacket(px);
574  int ptype = p.type;
575  pktLog->log(px,p.inLink,false,now);
576  int ctx = ctt->getComtIndex(p.comtree);
577  if (!pktCheck(px,ctx)) {
578  ps->free(px);
579  } else if (booting) {
580  handleCtlPkt(px);
581  } else if (ptype == Forest::CLIENT_DATA) {
582  forward(px,ctx);
583  } else if (ptype == Forest::SUB_UNSUB) {
584  subUnsub(px,ctx);
585  } else if (ptype == Forest::RTE_REPLY) {
586  handleRteReply(px,ctx);
587  } else if (ptype == Forest::CONNECT ||
588  ptype == Forest::DISCONNECT) {
589  handleConnDisc(px);
590  } else if (p.dstAdr != myAdr) {
591  forward(px,ctx);
592  } else {
593  ctlQ.push(px);
594  }
595  }
596 
597  // output processing
598  int lnk;
599  while ((px = qm->deq(lnk, now)) != 0) {
600  didNothing = false;
601  pktLog->log(px,lnk,true,now);
602  iop->send(px,lnk);
603  }
604 
605  // control packet processing
606  if (!ctlQ.empty() && (didNothing || --controlCount <= 0)) {
607  handleCtlPkt(ctlQ.front());
608  ctlQ.pop();
609  controlCount = 20; // do one control packet for
610  // every 20 iterations when busy
611  }
612 
613  // every 300 ms, update statistics and check for un-acked
614  // control packets
615  if (now - statsTime > 300000000) {
616  sm->record(now);
617  statsTime = now;
618  resendCpReq();
619  didNothing = false;
620  }
621 
622  // if did nothing on that pass, sleep for a millisecond.
623  if (didNothing) { usleep(1000); }
624 
625  // update current time
626  now = Misc::getTimeNs();
627  }
628 
629  // write out recorded events
630  pktLog->write(cout);
631  cout << endl;
632  cout << sm->iPktCnt(0) << " packets received, "
633  << sm->oPktCnt(0) << " packets sent\n";
634  cout << sm->iPktCnt(-1) << " from routers, "
635  << sm->oPktCnt(-1) << " to routers\n";
636  cout << sm->iPktCnt(-2) << " from clients, "
637  << sm->oPktCnt(-2) << " to clients\n";
638 }
639 
// Perform validity checks on a received packet.
//
// px is the index of the packet to check; ctx is the comtree index that
// the caller looked up for the packet's comtree (run() obtains it with
// ctt->getComtIndex). Returns true if the packet passes every check that
// applies to it, false otherwise. The packet is not freed here.
645 bool RouterCore::pktCheck(pktx px, int ctx) {
646  Packet& p = ps->getPacket(px);
647  // check version and length
648  if (p.version != Forest::FOREST_VERSION) {
649  return false;
650  }
651  if (p.length != p.bufferLen || p.length < Forest::HDR_LENG) {
652  return false;
653  }
 // while booting, the visible part of the test accepts only NET_SIG
 // packets whose tunnel IP is the network manager's IP
654  if (booting) {
655  return p.tunIp == nmIp && p.type == Forest::NET_SIG
 // NOTE(review): listing line 656 is absent from this copy and the
 // return statement above has no terminating ';' -- the rest of the
 // expression is probably missing; restore it from the original source.
657  }
 // CONNECT/DISCONNECT packets are judged on length alone and return
 // before any of the comtree checks below
658  if (p.type == Forest::CONNECT || p.type == Forest::DISCONNECT)
659  return (p.length == Forest::OVERHEAD+8);
660 
661  if (!ctt->validComtIndex(ctx)) {
662  return false;
663  }
 // destination must be a valid unicast address or a multicast address
664  fAdr_t adr = p.dstAdr;
665  if (!Forest::validUcastAdr(adr) && !Forest::mcastAdr(adr)) {
666  return false;
667  }
668 
 // the packet must have arrived on a link that belongs to its comtree
669  int inLink = p.inLink;
670  if (inLink == 0) return false;
671  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),inLink);
672  if (cLnk == 0) {
673  return false;
674  }
675 
676  // extra checks for packets from untrusted peers
677  if (lt->getPeerType(inLink) < Forest::TRUSTED) {
678  // check for spoofed source address
679  if (lt->getPeerAdr(inLink) != p.srcAdr) return false;
680  // and that destination restrictions are respected
681  fAdr_t dest = ctt->getDest(cLnk);
682  if (dest!=0 && p.dstAdr != dest && p.dstAdr != myAdr)
683  return false;
684  // verify that type is valid
685  Forest::ptyp_t ptype = p.type;
686  if (ptype != Forest::CLIENT_DATA &&
687  ptype != Forest::CONNECT && ptype != Forest::DISCONNECT &&
688  ptype != Forest::SUB_UNSUB && ptype != Forest::CLIENT_SIG)
689  return false;
690  int comt = ctt->getComtree(ctx);
 // NOTE(review): CONNECT/DISCONNECT packets already returned at the
 // length check above, so this comtree test looks unreachable for
 // them -- confirm whether that is intended.
691  if ((ptype == Forest::CONNECT || ptype == Forest::DISCONNECT) &&
692  comt != (int) Forest::CONNECT_COMT)
693  return false;
 // client signalling packets are only accepted on the signalling comtree
694  if (ptype == Forest::CLIENT_SIG &&
695  comt != (int) Forest::CLIENT_SIG_COMT)
696  return false;
697  }
698  return true;
699 }
700 
// Forward a packet using the route table.
//
// px is the index of the packet; ctx is the comtree index for the
// packet's comtree. The route is looked up by (comtree, destination
// address). With a route, a unicast packet is placed on the queue of the
// route's link and any other packet is passed to multiSend. Without a
// route, the packet is passed to multiSend with rtx == 0.
710 void RouterCore::forward(pktx px, int ctx) {
711  Packet& p = ps->getPacket(px);
712  int rtx = rt->getRteIndex(p.comtree,p.dstAdr);
713  if (rtx != 0) { // valid route case
714  // reply to route request
 // the RTE_REQ flag is cleared after the reply is sent, and the
 // header is re-packed so the forwarded copy no longer carries it
715  if ((p.flags & Forest::RTE_REQ)) {
716  sendRteReply(px,ctx);
717  p.flags = (p.flags & (~Forest::RTE_REQ));
718  p.pack();
719  p.hdrErrUpdate();
720  }
721  if (Forest::validUcastAdr(p.dstAdr)) {
722  int rcLnk = rt->getLink(rtx);
723  int lnk = ctt->getLink(rcLnk);
724  int qid = ctt->getLinkQ(rcLnk);
 // discard rather than send back out the arrival link; also
 // discard if the queue manager refuses the packet
725  if (lnk == p.inLink || !qm->enq(px,qid,now)) {
726  ps->free(px);
727  }
728  return;
729  }
730  // multicast data packet
731  multiSend(px,ctx,rtx);
732  return;
733  }
734  // no valid route
735 // think about suppressing flooding, if address is in range for this router
736  if (Forest::validUcastAdr(p.dstAdr)) {
737  // send to neighboring routers in comtree
 // NOTE(review): listing line 738 is absent from this copy; a statement
 // that modifies the header before the re-pack below may be missing --
 // restore it from the original source.
739  p.pack(); p.hdrErrUpdate();
740  }
 // rtx is 0 here, so multiSend adds no subscriber links
741  multiSend(px,ctx,rtx);
742  return;
743 }
754 void RouterCore::multiSend(pktx px, int ctx, int rtx) {
755  int qvec[nLnks]; int n = 0;
756  Packet& p = ps->getPacket(px);
757 
758  int inLink = p.inLink;
759  if (Forest::validUcastAdr(p.dstAdr)) {
760  // flooding a unicast packet to neighboring routers
761  int myZip = Forest::zipCode(myAdr);
762  int pZip = Forest::zipCode(p.dstAdr);
763  set<int>& rtrLinks = ctt->getRtrLinks(ctx);
764  set<int>::iterator lp;
765  for (lp = rtrLinks.begin(); lp != rtrLinks.end(); lp++) {
766  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
767  int peerZip = Forest::zipCode(lt->getPeerAdr(lnk));
768  if (pZip == myZip && peerZip != myZip) continue;
769  if (lnk == inLink) continue;
770  qvec[n++] = ctt->getLinkQ(rcLnk);
771  }
772  } else {
773  // forwarding a multicast packet
774  // first identify neighboring core routers to get copies
775  int pLink = ctt->getPlink(ctx);
776  set<int>& coreLinks = ctt->getCoreLinks(ctx);
777  set<int>::iterator lp;
778  for (lp = coreLinks.begin(); lp != coreLinks.end(); lp++) {
779  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
780  if (lnk == inLink || lnk == pLink) continue;
781  qvec[n++] = ctt->getLinkQ(rcLnk);
782  }
783  // now copy for parent
784  if (pLink != 0 && pLink != inLink) {
785  qvec[n++] = ctt->getLinkQ(ctt->getPCLink(ctx));
786  }
787  // now, copies for subscribers if any
788  if (rtx != 0) {
789  set<int>& subLinks = rt->getSubLinks(rtx);
790  for (lp = subLinks.begin(); lp !=subLinks.end(); lp++) {
791  int rcLnk = *lp; int lnk = ctt->getLink(rcLnk);
792  if (lnk == inLink) continue;
793  qvec[n++] = ctt->getLinkQ(rcLnk);
794  }
795  }
796  }
797 
798  if (n == 0) { ps->free(px); return; }
799 
800  // make copies and queue them
801  pktx px1 = px;
802  for (int i = 0; i < n-1; i++) { // process first n-1 copies
803  if (qm->enq(px1,qvec[i],now)) {
804  px1 = ps->clone(px);
805  }
806  }
807  // process last copy
808  if (!qm->enq(px1,qvec[n-1],now)) {
809  ps->free(px1);
810  }
811 }
812 
819 void RouterCore::sendRteReply(pktx px, int ctx) {
820  Packet& p = ps->getPacket(px);
821 
822  pktx px1 = ps->alloc();
823  Packet& p1 = ps->getPacket(px1);
824  p1.length = Forest::HDR_LENG + 8;
825  p1.type = Forest::RTE_REPLY;
826  p1.flags = 0;
827  p1.comtree = p.comtree;
828  p1.srcAdr = myAdr;
829  p1.dstAdr = p.srcAdr;
830 
831  p1.pack();
832  (p1.payload())[0] = htonl(p.dstAdr);
833  p1.hdrErrUpdate(); p.payErrUpdate();
834 
835  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),p.inLink);
836  qm->enq(px1,ctt->getLinkQ(cLnk),now);
837 }
838 
// Handle a route reply packet.
//
// px is the index of the packet; ctx is the comtree index for the
// packet's comtree. The address carried in the first payload word is
// added to the route table (via the arrival comtree link) if it is a
// valid unicast address with no route yet. The reply is then passed on:
// to multiSend if there is no route to its destination, otherwise
// towards the route's link.
847 void RouterCore::handleRteReply(pktx px, int ctx) {
848  Packet& p = ps->getPacket(px);
849  int rtx = rt->getRteIndex(p.comtree, p.dstAdr);
850  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),p.inLink);
 // answer a route request carried by this packet if we have a route
851  if ((p.flags & Forest::RTE_REQ) && rtx != 0)
852  sendRteReply(px,ctx);
 // learn a route to the address in the payload
853  int adr = ntohl((p.payload())[0]);
854  if (Forest::validUcastAdr(adr) &&
855  rt->getRteIndex(p.comtree,adr) == 0) {
856  rt->addEntry(p.comtree,adr,cLnk);
857  }
858  if (rtx == 0) {
859  // send to neighboring routers in comtree
 // NOTE(review): listing line 860 is absent from this copy; a statement
 // that modifies the header before the re-pack below may be missing --
 // restore it from the original source.
861  p.pack(); p.hdrErrUpdate();
862  multiSend(px,ctx,rtx);
863  return;
864  }
 // pass the reply on only if the route leads to a router
865  int dcLnk = rt->getLink(rtx); int dLnk = ctt->getLink(dcLnk);
 // NOTE(review): enq is given the link number dLnk here, while other
 // callers in this file pass a queue id from ctt->getLinkQ -- looks
 // like it should be ctt->getLinkQ(dcLnk); confirm.
866  if (lt->getPeerType(dLnk) != Forest::ROUTER || !qm->enq(px,dLnk,now))
867  ps->free(px);
868  return;
869 }
870 
878 void RouterCore::subUnsub(pktx px, int ctx) {
879  Packet& p = ps->getPacket(px);
880  uint32_t *pp = p.payload();
881  // add/remove branches from routes
882  // if non-core node, also propagate requests upward as
883  // appropriate
884  int comt = ctt->getComtree(ctx);
885  int inLink = p.inLink;
886  int cLnk = ctt->getComtLink(comt,inLink);
887  uint64_t seq = ntohl(pp[0]);
888  seq <<= 32;
889  seq |= ntohl(pp[1]);
890  //process ack
891  if ((p.flags & Forest::ACK_FLAG) != 0) {
892  map<uint64_t,CpInfo>::iterator it = pending->find(seq);
893  if (it == pending->end()) {
894  // this is a reply to a request we never sent, or
895  // possibly, a reply to a request we gave up on
896  ps->free(px);
897  return;
898  }
899  pending->erase(it); ps->free(px); return;
900  }
901  // ignore subscriptions from the parent or core neighbors
902  if (inLink == ctt->getPlink(ctx) || ctt->isCoreLink(cLnk)) {
903  ps->free(px); return;
904  }
905  bool propagate = false;
906  int rtx; fAdr_t addr;
907 
908  // add subscriptions
909  int addcnt = ntohl(pp[2]);
910  if (addcnt < 0 || addcnt > 350 ||
911  Forest::OVERHEAD + (addcnt + 4)*4 > p.length) {
912  ps->free(px); return;
913  }
914  for (int i = 3; i <= addcnt + 2; i++) {
915  addr = ntohl(pp[i]);
916  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
917  rtx = rt->getRteIndex(comt,addr);
918  if (rtx == 0) {
919  rtx = rt->addEntry(comt,addr,cLnk);
920  propagate = true;
921  } else if (!rt->isLink(rtx,cLnk)) {
922  rt->addLink(rtx,cLnk);
923  pp[i] = 0; // so, parent will ignore
924  }
925  }
926  // remove subscriptions
927  int dropcnt = ntohl(pp[addcnt+3]);
928  if (dropcnt < 0 || addcnt + dropcnt > 350 ||
929  Forest::OVERHEAD + (addcnt + dropcnt + 4)*4 > p.length) {
930  ps->free(px); return;
931  }
932  for (int i = addcnt + 4; i <= addcnt + dropcnt + 3; i++) {
933  addr = ntohl(pp[i]);
934  if (!Forest::mcastAdr(addr)) continue; // ignore unicast or 0
935  rtx = rt->getRteIndex(comt,addr);
936  if (rtx == 0) continue;
937  rt->removeLink(rtx,cLnk);
938  if (rt->noLinks(rtx)) {
939  rt->removeEntry(rtx);
940  propagate = true;
941  } else {
942  pp[i] = 0;
943  }
944  }
945  pktx copy = ps->fullCopy(px);
946  Packet& p1 = ps->getPacket(copy);
947  // propagate subscription packet to parent if not a core node
948  if (propagate && !ctt->inCore(ctx) && ctt->getPlink(ctx) != 0) {
949  pair<uint64_t,CpInfo> cpp;
950  cpp.first = seqNum;
951  pp[0] = htonl((uint32_t) (seqNum >> 32));
952  pp[1] = htonl((uint32_t) (seqNum & 0xffffffff));
953  seqNum++;
954  //change srcAdr of outgoing subscription packet
955  p.srcAdr = myAdr;
956  //better to change desAdr too
957  p.pack();
958  cpp.second.px = px; cpp.second.nSent = 1; cpp.second.timestamp = now;
959  pending->insert(cpp);
960  //propagate
961  p.payErrUpdate();
962  int qid = ctt->getLinkQ(ctt->getPCLink(ctx));
963  qm->enq(px,qid,now);
964  }
965  p1.flags |= Forest::ACK_FLAG; p1.dstAdr = p1.srcAdr; p1.srcAdr = myAdr;
966  p1.pack();
967  int qid = ctt->getLinkQ(cLnk);
968  qm->enq(copy,qid,now);
969  return;
970 }
971 
975 void RouterCore::handleConnDisc(pktx px) {
976  Packet& p = ps->getPacket(px);
977  int inLnk = p.inLink;
978 
979  if (p.srcAdr != lt->getPeerAdr(inLnk) ||
980  p.length != Forest::OVERHEAD + 8) {
981  ps->free(px); return;
982  }
983  uint64_t nonce = ntohl(p.payload()[0]);
984  nonce <<= 32;
985  nonce |= ntohl(p.payload()[1]);
986  if (nonce != lt->getNonce(inLnk)) { ps->free(px); return; }
987  if ((p.flags & Forest::ACK_FLAG) != 0) {
988  uint64_t x = 1; x <<= 63; x |= nonce;
989  pending->erase(x); ps->free(px); return;
990  }
991  if (p.type == Forest::CONNECT) {
992  if (lt->isConnected(inLnk) && !lt->revertEntry(inLnk)) {
993  ps->free(px); return;
994  }
995  if (!lt->remapEntry(inLnk,p.tunIp,p.tunPort)) {
996  ps->free(px); return;
997  }
998  lt->setConnectStatus(inLnk,true);
999  if (nmAdr != 0 && lt->getPeerType(inLnk) == Forest::CLIENT) {
1000  CtlPkt cp(CtlPkt::CLIENT_CONNECT,CtlPkt::REQUEST,0);
1001  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
1002  sendCpReq(cp,nmAdr);
1003  }
1004  } else if (p.type == Forest::DISCONNECT) {
1005  lt->setConnectStatus(inLnk,false);
1006  lt->revertEntry(inLnk);
1007  if (nmAdr != 0 && lt->getPeerType(inLnk) == Forest::CLIENT) {
1008  dropLink(inLnk);
1009  CtlPkt cp(CtlPkt::CLIENT_DISCONNECT,CtlPkt::REQUEST,0);
1010  cp.adr1 = p.srcAdr; cp.adr2 = myAdr;
1011  sendCpReq(cp,nmAdr);
1012  }
1013  }
1014  // send ack back to sender
1015  p.flags |= Forest::ACK_FLAG; p.dstAdr = p.srcAdr; p.srcAdr = myAdr;
1016  p.pack();
1017  iop->send(px,inLnk);
1018  return;
1019 }
1020 
1025 void RouterCore::handleCtlPkt(int px) {
1026  Packet& p = ps->getPacket(px);
1027  CtlPkt cp(p.payload(), p.length - Packet::OVERHEAD);
1028 
1029  if (p.type != Forest::NET_SIG || p.comtree != Forest::NET_SIG_COMT) {
1030  ps->free(px); return;
1031  }
1032  if (!cp.unpack()) {
1033  string s;
1034  cerr << "RouterCore::handleCtlPkt: misformatted control "
1035  " packet\n" << p.toString(s);
1036  cp.reset(cp.type,CtlPkt::NEG_REPLY,cp.seqNum);
1037  cp.mode = CtlPkt::NEG_REPLY;
1038  cp.errMsg = "misformatted control packet";
1039  returnToSender(px,cp);
1040  return;
1041  }
1042  if (cp.mode != CtlPkt::REQUEST) {
1043  handleCpReply(px,cp); return;
1044  }
1045 
1046  // Prepare positive reply packet for use where appropriate
1047  CtlPkt reply(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
1048 
1049  switch (cp.type) {
1050 
1051  // configuring logical interfaces
1052  case CtlPkt::ADD_IFACE: addIface(cp,reply); break;
1053  case CtlPkt::DROP_IFACE: dropIface(cp,reply); break;
1054  case CtlPkt::GET_IFACE: getIface(cp,reply); break;
1055  case CtlPkt::MOD_IFACE: modIface(cp,reply); break;
1056 
1057  // configuring links
1058  case CtlPkt::ADD_LINK: addLink(cp,reply); break;
1059  case CtlPkt::DROP_LINK: dropLink(cp,reply); break;
1060  case CtlPkt::GET_LINK: getLink(cp,reply); break;
1061  case CtlPkt::MOD_LINK: modLink(cp,reply); break;
1062 
1063  // configuring comtrees
1064  case CtlPkt::ADD_COMTREE: addComtree(cp,reply); break;
1065  case CtlPkt::DROP_COMTREE: dropComtree(cp,reply); break;
1066  case CtlPkt::GET_COMTREE: getComtree(cp,reply); break;
1067  case CtlPkt::MOD_COMTREE: modComtree(cp,reply); break;
1068  case CtlPkt::ADD_COMTREE_LINK: addComtreeLink(cp,reply); break;
1069  case CtlPkt::DROP_COMTREE_LINK: dropComtreeLink(cp,reply); break;
1070  case CtlPkt::GET_COMTREE_LINK: getComtreeLink(cp,reply); break;
1071  case CtlPkt::MOD_COMTREE_LINK: modComtreeLink(cp,reply); break;
1072 
1073  // configuring routes
1074  case CtlPkt::ADD_ROUTE: addRoute(cp,reply); break;
1075  case CtlPkt::DROP_ROUTE: dropRoute(cp,reply); break;
1076  case CtlPkt::GET_ROUTE: getRoute(cp,reply); break;
1077  case CtlPkt::MOD_ROUTE: modRoute(cp,reply); break;
1078 
1079  // setting parameters
1080  case CtlPkt::SET_LEAF_RANGE: setLeafRange(cp,reply); break;
1081  //feng get link table info
1082  case CtlPkt::GET_LINK_SET: getLinkSet(px,reply); break;
1083  default:
1084  cerr << "unrecognized control packet type " << cp.type
1085  << endl;
1086  reply.errMsg = "invalid control packet for router";
1087  reply.mode = CtlPkt::NEG_REPLY;
1088  break;
1089  }
1090 
1091  returnToSender(px,reply);
1092 
1093  return;
1094 }
1095 
// Handle an ADD_IFACE control packet.
//
// cp is the request; reply is the prepared reply, which receives the
// interface's IP address and port on success, or an error message and
// NEG_REPLY mode on failure. Returns true on success, false on failure.
// If the interface already exists with matching settings, the request is
// treated as a repeat of an earlier one and answered positively.
1107 bool RouterCore::addIface(CtlPkt& cp, CtlPkt& reply) {
1108  int iface = cp.iface;
 // NOTE(review): listing lines 1110, 1112, 1114 and 1116 are absent from
 // this copy, so each max(min(...), ...) below lacks its second argument
 // and the RateSpec constructor call is unterminated; presumably the
 // missing lines hold lower bounds for the rates -- restore them from
 // the original source.
1109  RateSpec rs(max(min(cp.rspec1.bitRateUp, Forest::MAXBITRATE),
1111  max(min(cp.rspec1.bitRateDown,Forest::MAXBITRATE),
1113  max(min(cp.rspec1.pktRateUp, Forest::MAXPKTRATE),
1115  max(min(cp.rspec1.pktRateDown,Forest::MAXPKTRATE),
1117  if (ift->valid(iface)) {
 // NOTE(review): this compares the interface number cp.iface with the
 // interface's IP address; cp.ip1 may have been intended -- confirm.
1118  if (cp.iface != ift->getIpAdr(iface) ||
1119  !rs.equals(ift->getRates(iface))) {
1120  reply.errMsg = "add iface: requested interface "
1121  "conflicts with existing interface";
1122  reply.mode = CtlPkt::NEG_REPLY;
1123  return false;
1124  }
1125  // means reply to earlier add iface was lost
1126  reply.ip1 = ift->getIpAdr(iface);
1127  reply.port1 = ift->getPort(iface);
1128  return true;
 // new interface: add the table entry, then set the interface up
1129  } else if (!ift->addEntry(iface, cp.ip1, 0, rs)) {
1130  reply.errMsg = "add iface: cannot add interface";
1131  reply.mode = CtlPkt::NEG_REPLY;
1132  return false;
1133  } else if (!iop->setup(iface)) {
1134  reply.errMsg = "add iface: could not setup interface";
1135  reply.mode = CtlPkt::NEG_REPLY;
1136  return false;
1137  }
1138  reply.ip1 = ift->getIpAdr(iface);
1139  reply.port1 = ift->getPort(iface);
1140  return true;
1141 }
1142 
1143 bool RouterCore::dropIface(CtlPkt& cp, CtlPkt& reply) {
1144  ift->removeEntry(cp.iface);
1145  return true;
1146 }
1147 
1148 bool RouterCore::getIface(CtlPkt& cp, CtlPkt& reply) {
1149  int iface = cp.iface;
1150  if (ift->valid(iface)) {
1151  reply.iface = iface;
1152  reply.ip1 = ift->getIpAdr(iface);
1153  reply.port1 = ift->getPort(iface);
1154  reply.rspec1 = ift->getRates(iface);
1155  reply.rspec2 = ift->getAvailRates(iface);
1156  return true;
1157  }
1158  reply.errMsg = "get iface: invalid interface";
1159  reply.mode = CtlPkt::NEG_REPLY;
1160  return false;
1161 }
1162 
1163 bool RouterCore::modIface(CtlPkt& cp, CtlPkt& reply) {
1164  int iface = cp.iface;
1165  if (ift->valid(iface)) {
1166  ift->getRates(iface) = cp.rspec1;
1167  return true;
1168  }
1169  reply.errMsg = "mod iface: invalid interface";
1170  reply.mode = CtlPkt::NEG_REPLY;
1171  return false;
1172 }
1173 
/** Handle an ADD_LINK control packet.
 *  Creates a link table entry for a new link on interface cp.iface,
 *  assigns the peer's forest address, and charges the link's initial
 *  rates against the interface's available rates. A request that
 *  matches an existing link is answered as a repeat of an earlier
 *  request whose reply was lost.
 *  @param cp is the request; nodeType, adr1, iface, link, ip1, port1 and
 *  nonce are read
 *  @param reply is the reply under construction; on success link and adr1
 *  are filled in (and ip1 for a repeated request), on failure errMsg is
 *  set and mode becomes NEG_REPLY
 *  @return true on success, false on failure
 */
1174 bool RouterCore::addLink(CtlPkt& cp, CtlPkt& reply) {
1175  Forest::ntyp_t peerType = cp.nodeType;
 // a router peer must come with an explicit forest address
1176  if (peerType == Forest::ROUTER && cp.adr1 == 0) {
1177  reply.errMsg = "add link: adding link to router, but no peer "
1178  "address supplied";
1179  reply.mode = CtlPkt::NEG_REPLY;
1180  return false;
1181  }
1182  int iface = cp.iface;
1183 
 // look for an existing link with the same peer (ip,port) or link number
1184  int xlnk = lt->lookup(cp.ip1,cp.port1);
1185  if (xlnk != 0 || (cp.link != 0 && lt->valid(cp.link))) {
 // NOTE(review): the last two tests compare the peer's ip/port from
 // the request with the local interface's ip/port, while the lookup
 // above treats (ip1,port1) as the peer's - confirm the intent
1186  if (cp.link != xlnk ||
1187  (peerType != lt->getPeerType(xlnk)) ||
1188  (cp.iface != lt->getIface(xlnk)) ||
1189  (cp.adr1 != 0 && cp.adr1 != lt->getPeerAdr(xlnk)) ||
1190  (cp.ip1 != 0 && cp.ip1 != ift->getIpAdr(iface)) ||
1191  (cp.port1 != 0 && cp.port1 != ift->getPort(iface))) {
1192  reply.errMsg = "add link: new link conflicts "
1193  "with existing link";
1194  reply.mode = CtlPkt::NEG_REPLY;
1195  return false;
1196  }
1197  // assume response to previous add link was lost
1198  reply.link = xlnk;
1199  reply.adr1 = lt->getPeerAdr(xlnk);
1200  reply.ip1 = lt->getPeerIpAdr(xlnk);
1201  return true;
1202  }
1203 
1204  // first ensure that the interface has enough
1205  // capacity to support a new link of minimum capacity
 // NOTE(review): listing lines 1206-1207 are absent from this extract;
 // presumably they declare the RateSpec rs used below - confirm against
 // the real source file
1208  RateSpec& availRates = ift->getAvailRates(iface);
1209  if (!rs.leq(availRates)) {
1210  reply.errMsg = "add link: requested link "
1211  "exceeds interface capacity";
1212  reply.mode = CtlPkt::NEG_REPLY;
1213  return false;
1214  }
1215 
1216  // setting up link
1217  // case 1: adding link to leaf; peer (ip,port) not known, use nonce
1218  // subcase - lnk, peer forest address is known
1219  // (for preconfigured leaf)
1220  // subcase - lnk, peer address to be assigned by router
1221  // case 2: adding link to router not yet up; port not known, use nonce
1222  // lnk, peer address specified
1223  // case 3: adding link to a router that is already up
1224  // lnk, peer address specified, peer (ip,port) specified
1225 
1226  // add table entry with (ip,port) or nonce
1227  // note: when lt->addEntry succeeds, link rates are
1228  // initialized to Forest minimum rates
1229  int lnk = lt->addEntry(cp.link,cp.ip1,cp.port1,cp.nonce);
1230  if (lnk == 0) {
1231  reply.errMsg = "add link: cannot add requested link";
1232  reply.mode = CtlPkt::NEG_REPLY;
1233  return false;
1234  }
1235 
1236  if (peerType == Forest::ROUTER) {
1237  lt->setPeerAdr(lnk,cp.adr1);
1238  } else { // case 1
 // use the requested leaf address if one was given, otherwise
 // allocate one; undo the table entry if neither works
1239  fAdr_t peerAdr = 0;
1240  if (cp.adr1 == 0) peerAdr = allocLeafAdr();
1241  else if (allocLeafAdr(cp.adr1)) peerAdr = cp.adr1;
1242  if (peerAdr == 0) {
1243  lt->removeEntry(lnk);
1244  reply.errMsg = "add link: cannot add link using "
1245  "specified address";
1246  reply.mode = CtlPkt::NEG_REPLY;
1247  return false;
1248  }
1249  lt->setPeerAdr(lnk,peerAdr);
1250  }
1251 
 // charge the new link against the interface and finish the entry
1252  availRates.subtract(rs);
1253  lt->setIface(lnk,iface);
1254  lt->setPeerType(lnk,peerType);
1255  lt->setConnectStatus(lnk,false);
1256  sm->clearLnkStats(lnk);
1257  if (peerType == Forest::ROUTER && cp.ip1 != 0 && cp.port1 != 0) {
1258  // link to a router that's already up, so connect
 // NOTE(review): listing line 1259 is absent from this extract;
 // presumably it issues the connect request - confirm
1260  }
1261 
1262  reply.link = lnk;
1263  reply.adr1 = lt->getPeerAdr(lnk);
1264  return true;
1265 }
1266 
1267 bool RouterCore::dropLink(CtlPkt& cp, CtlPkt& reply) {
1268  dropLink(cp.link);
1269  return true;
1270 }
1271 
1272 void RouterCore::dropLink(int lnk) {
1273  int *comtVec = new int[lt->getComtSet(lnk).size()];
1274  int i = 0;
1275  for (int comt : lt->getComtSet(lnk)) comtVec[i++] = comt;
1276  while (--i >= 0) {
1277  int ctx = comtVec[i];
1278  int cLnk = ctt->getComtLink(ctt->getComtree(ctx),lnk);
1279  dropComtreeLink(ctx,lnk,cLnk);
1280  }
1281  int iface = lt->getIface(lnk);
1282  ift->getAvailRates(iface).add(lt->getRates(lnk));
1283  lt->removeEntry(lnk);
1284  freeLeafAdr(lt->getPeerAdr(lnk));
1285 }
1286 
1287 bool RouterCore::getLink(CtlPkt& cp, CtlPkt& reply) {
1288  int link = cp.link;
1289  if (lt->valid(link)) {
1290  reply.link = link;
1291  reply.iface = lt->getIface(link);
1292  reply.ip1 = lt->getPeerIpAdr(link);
1293  reply.nodeType = lt->getPeerType(link);
1294  reply.port1 = lt->getPeerPort(link);
1295  reply.adr1 = lt->getPeerAdr(link);
1296  reply.rspec1 = lt->getRates(link);
1297  reply.rspec2 = lt->getAvailRates(link);
1298  return true;
1299  }
1300  reply.errMsg = "get link: invalid link number";
1301  reply.mode = CtlPkt::NEG_REPLY;
1302  return false;
1303 }
1304 
1305 bool RouterCore::modLink(CtlPkt& cp, CtlPkt& reply) {
1306  int link = cp.link;
1307  if (!lt->valid(link)) {
1308  reply.errMsg = "get link: invalid link number";
1309  reply.mode = CtlPkt::NEG_REPLY;
1310  return false;
1311  }
1312  reply.link = link;
1313  int iface = lt->getIface(link);
1314  if (cp.rspec1.isSet()) {
1315  RateSpec& linkRates = lt->getRates(link);
1316  RateSpec& ifAvail = ift->getAvailRates(iface);
1317  RateSpec delta = cp.rspec1;
1318  delta.subtract(linkRates);
1319  if (!delta.leq(ifAvail)) {
1320  string s;
1321  reply.errMsg = "mod link: request "
1322  + cp.rspec1.toString(s) +
1323  "exceeds interface capacity";
1324  reply.mode = CtlPkt::NEG_REPLY;
1325  return false;
1326  }
1327  ifAvail.subtract(delta);
1328  linkRates = cp.rspec1;
1329  qm->setLinkRates(link,cp.rspec1);
1330  cp.rspec1.scale(.9); // allocate at most 90% of link
1331  lt->getAvailRates(link) = cp.rspec1;
1332  }
1333  return true;
1334 }
1335 
1336 bool RouterCore::addComtree(CtlPkt& cp, CtlPkt& reply) {
1337  int comt = cp.comtree;
1338  if(ctt->validComtree(comt) || ctt->addEntry(comt) != 0)
1339  return true;
1340  reply.errMsg = "add comtree: cannot add comtree";
1341  reply.mode = CtlPkt::NEG_REPLY;
1342  return false;
1343 }
1344 
1345 bool RouterCore::dropComtree(CtlPkt& cp, CtlPkt& reply) {
1346  int comt = cp.comtree;
1347  int ctx = ctt->getComtIndex(comt);
1348  if (!ctt->validComtIndex(ctx))
1349  return true; // so dropComtree op is idempotent
1350 
1351  // remove all routes involving this comtree
1352  // also degisters each route in the comtree table
1353  rt->purgeRoutes(comt);
1354 
1355  // remove all the comtree links
1356  // first, copy out the comtree links, then remove them
1357  // two step process is needed because dropComtreeLink modifies linkSet
1358  set<int>& linkSet = ctt->getLinks(ctx);
1359  set<int>::iterator pp;
1360  int *clnks = new int[linkSet.size()]; int i = 0;
1361  for (pp = linkSet.begin(); pp != linkSet.end(); pp++) clnks[i++] = *pp;
1362  while (--i >= 0) {
1363  dropComtreeLink(ctx,ctt->getLink(clnks[i]),clnks[i]);
1364  }
1365  delete [] clnks;
1366 
1367  ctt->removeEntry(ctx); // and finally drop entry in comtree table
1368  return true;
1369 }
1370 
1371 bool RouterCore::getComtree(CtlPkt& cp, CtlPkt& reply) {
1372  comt_t comt = cp.comtree;
1373  int ctx = ctt->getComtIndex(comt);
1374  if (ctx == 0) {
1375  reply.errMsg = "get comtree: invalid comtree";
1376  reply.mode = CtlPkt::NEG_REPLY;
1377  return false;
1378  }
1379  reply.comtree = comt;
1380  reply.coreFlag = ctt->inCore(ctx);
1381  reply.link = ctt->getPlink(ctx);
1382  reply.count = ctt->getLinkCount(ctx);
1383  return true;
1384 }
1385 
1386 bool RouterCore::modComtree(CtlPkt& cp, CtlPkt& reply) {
1387  comt_t comt = cp.comtree;
1388  int ctx = ctt->getComtIndex(comt);
1389  if (ctx != 0) {
1390  if (cp.coreFlag >= 0)
1391  ctt->setCoreFlag(ctx,cp.coreFlag);
1392  if (cp.link != 0) {
1393  int plnk = cp.link;
1394  if (plnk != 0 && !ctt->isLink(ctx,plnk)) {
1395  reply.errMsg = "specified link does "
1396  "not belong to comtree";
1397  reply.mode = CtlPkt::NEG_REPLY;
1398  return false;
1399  }
1400  if (plnk != 0 && !ctt->isRtrLink(ctx,plnk)) {
1401  reply.errMsg = "specified link does "
1402  "not connect to a router";
1403  reply.mode = CtlPkt::NEG_REPLY;
1404  return false;
1405  }
1406  ctt->setPlink(ctx,plnk);
1407  }
1408  return true;
1409  }
1410  reply.errMsg = "modify comtree: invalid comtree";
1411  reply.mode = CtlPkt::NEG_REPLY;
1412  return false;
1413 }
1414 
/** Handle an ADD_COMTREE_LINK control packet.
 *  Adds a link to comtree cp.comtree. The link is identified by cp.link,
 *  or by the peer's (ip1,port1), or by the peer's forest address adr1,
 *  tried in that order. A unicast route, a queue and an initial rate
 *  allocation are set up for the new comtree link. A request matching
 *  an existing comtree link with the same router/core attributes
 *  succeeds without change.
 *  @param cp is the request; comtree, link, ip1, port1, adr1 and
 *  coreFlag are read
 *  @param reply is the reply under construction; on success link is
 *  filled in, on failure errMsg is set and mode becomes NEG_REPLY
 *  @return true on success, false on failure
 */
1415 bool RouterCore::addComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1416  comt_t comt = cp.comtree;
1417  int ctx = ctt->getComtIndex(comt);
1418  if (ctx == 0) {
1419  reply.errMsg = "add comtree link: invalid comtree";
1420  reply.mode = CtlPkt::NEG_REPLY;
1421  return false;
1422  }
 // resolve the link number from whichever identifier was supplied
1423  int lnk = 0;
1424  if (cp.link != 0) {
1425  lnk = cp.link;
1426  } else if (cp.ip1 != 0 && cp.port1 != 0) {
1427  lnk = lt->lookup(cp.ip1, cp.port1);
1428  } else if (cp.adr1 != 0) {
1429  lnk = lt->lookup(cp.adr1);
1430  }
1431  if (!lt->valid(lnk)) {
1432  reply.errMsg = "add comtree link: invalid link or "
1433  "peer IP and port";
1434  reply.mode = CtlPkt::NEG_REPLY;
1435  return false;
1436  }
 // the core flag is only meaningful (and is required) for router peers
1437  bool isRtr = (lt->getPeerType(lnk) == Forest::ROUTER);
1438  bool isCore = false;
1439  if (isRtr) {
1440  if (cp.coreFlag < 0) {
1441  reply.errMsg = "add comtree link: must specify "
1442  "core flag on links to routers";
1443  reply.mode = CtlPkt::NEG_REPLY;
1444  return false;
1445  }
1446  isCore = cp.coreFlag;
1447  }
 // if the link is already in the comtree, accept only an identical request
1448  int cLnk = ctt->getComtLink(comt,lnk);
1449  if (cLnk != 0) {
1450  if (ctt->isRtrLink(cLnk) == isRtr &&
1451  ctt->isCoreLink(cLnk) == isCore) {
1452  reply.link = lnk;
1453  return true;
1454  } else {
1455  reply.errMsg = "add comtree link: specified "
1456  "link already in comtree";
1457  reply.mode = CtlPkt::NEG_REPLY;
1458  return false;
1459  }
1460  }
1461  // define new comtree link
1462  if (!ctt->addLink(ctx,lnk,isRtr,isCore)) {
1463  reply.errMsg = "add comtree link: cannot add "
1464  "requested comtree link";
1465  reply.mode = CtlPkt::NEG_REPLY;
1466  return false;
1467  }
1468  cLnk = ctt->getComtLink(comt,lnk);
1469 
1470  // add unicast route to cLnk if peer is a leaf or a router
1471  // in a different zip code
1472  fAdr_t peerAdr = lt->getPeerAdr(lnk);
1473  if (lt->getPeerType(lnk) != Forest::ROUTER) {
1474  int rtx = rt->getRteIndex(comt,peerAdr);
1475  if (rtx == 0) rt->addEntry(comt,peerAdr,cLnk);
1476  } else {
1477  int zipPeer = Forest::zipCode(peerAdr);
1478  if (zipPeer != Forest::zipCode(myAdr)) {
1479  fAdr_t dest = Forest::forestAdr(zipPeer,0);
1480  int rtx = rt->getRteIndex(comt,dest);
1481  if (rtx == 0) rt->addEntry(comt,dest,cLnk);
1482  }
1483  }
1484 
1485  // allocate queue and bind it to lnk and comtree link
1486  int qid = qm->allocQ(lnk);
1487  if (qid == 0) {
 // NOTE(review): the comtree link is removed here, but any route
 // added just above is not explicitly removed - confirm whether
 // removeLink takes care of it
1488  ctt->removeLink(ctx,cLnk);
1489  reply.errMsg = "add comtree link: no queues "
1490  "available for link";
1491  reply.mode = CtlPkt::NEG_REPLY;
1492  return false;
1493  }
1494  ctt->setLinkQ(cLnk,qid);
1495 
1496  // adjust rates for link comtree and queue
 // NOTE(review): listing lines 1497-1498 are absent from this extract;
 // presumably they declare the RateSpec minRates used below - confirm
 // against the real source file
1499  if (!minRates.leq(lt->getAvailRates(lnk))) {
 // NOTE(review): this failure path returns without releasing the
 // queue or removing the comtree link created above - confirm
1500  reply.errMsg = "add comtree link: request "
1501  "exceeds link capacity";
1502  reply.mode = CtlPkt::NEG_REPLY;
1503  return false;
1504  }
1505  lt->getAvailRates(lnk).subtract(minRates);
1506  ctt->getRates(cLnk) = minRates;
1507 
1508  qm->setQRates(qid,minRates);
 // both branches currently apply the same queue limits
1509  if (isRtr) qm->setQLimits(qid,500,1000000);
1510  else qm->setQLimits(qid,500,1000000);
1511  sm->clearQuStats(qid);
1512  reply.link = lnk;
1513  return true;
1514 }
1515 
1516 bool RouterCore::dropComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1517  comt_t comt = cp.comtree;
1518  int ctx = ctt->getComtIndex(comt);
1519  if (ctx == 0) {
1520  reply.errMsg = "drop comtree link: invalid comtree";
1521  reply.mode = CtlPkt::NEG_REPLY;
1522  return false;
1523  }
1524  int lnk = 0;
1525  if (cp.link != 0) {
1526  lnk = cp.link;
1527  } else if (cp.ip1 != 0 && cp.port1 != 0) {
1528  lnk = lt->lookup(cp.ip1, cp.port1);
1529  } else if (cp.adr1 != 0) {
1530  lnk = lt->lookup(cp.adr1);
1531  }
1532  if (!lt->valid(lnk)) {
1533  reply.errMsg = "drop comtree link: invalid link "
1534  "or peer IP and port";
1535  reply.mode = CtlPkt::NEG_REPLY;
1536  return false;
1537  }
1538  int cLnk = ctt->getComtLink(comt,lnk);
1539  if (cLnk != 0) {
1540  dropComtreeLink(ctx,lnk,cLnk);
1541  }
1542  return true;
1543 }
1544 
/** Remove a comtree link and everything that depends on it.
 *  Returns the comtree link's rates to the link, removes the associated
 *  unicast route, removes the comtree link from the routes registered
 *  for it, frees its queue and finally removes it from the comtree.
 *  @param ctx is the comtree's index in the comtree table
 *  @param lnk is the link number
 *  @param cLnk is the comtree link number for lnk within the comtree
 */
1545 void RouterCore::dropComtreeLink(int ctx, int lnk, int cLnk) {
1546  // release the link bandwidth used by comtree link
1547  lt->getAvailRates(lnk).add(ctt->getRates(cLnk));
1548 
1549  // remove unicast route for this comtree
 // (mirrors the route that addComtreeLink creates: the peer's address
 // for a non-router peer, or the peer's zip code for a router in a
 // different zip code)
1550  fAdr_t peerAdr = lt->getPeerAdr(lnk);
1551  int comt = ctt->getComtree(ctx);
1552  if (lt->getPeerType(lnk) != Forest::ROUTER) {
1553  int rtx = rt->getRteIndex(comt,peerAdr);
1554  if (rtx != 0) rt->removeEntry(rtx);
1555  } else {
1556  int zipPeer = Forest::zipCode(peerAdr);
1557  if (zipPeer != Forest::zipCode(myAdr)) {
1558  fAdr_t dest = Forest::forestAdr(zipPeer,0);
1559  int rtx = rt->getRteIndex(comt,dest);
1560  if (rtx != 0) rt->removeEntry(rtx);
1561  }
1562  }
1563  // remove cLnk from multicast routes for this comtree
1564  // must first copy out the routes that are registered for
1565  // cLnk, then remove cLnk from all these routes;
1566  // two step process is needed because the removal of the link from
1567  // the route, also deregisters the route in the comtree table
1568  // causing the rteSet to change
1569  set<int>& rteSet = ctt->getRteSet(cLnk);
1570  set<int>::iterator rp;
1571  int *routes = new int[rteSet.size()];
1572  int i = 0;
1573  for (rp = rteSet.begin(); rp != rteSet.end(); rp++) routes[i++] = *rp;
1574  while (--i >= 0) rt->removeLink(routes[i],cLnk);
1575  delete [] routes;
1576 
1577  // release queue and remove link from comtree
1578  // (which also deregisters comtree with lnk)
1579  int qid = ctt->getLinkQ(cLnk);
1580  qm->freeQ(qid);
1581  if (!ctt->removeLink(ctx,cLnk)) {
 // failure is only reported; there is no further recovery here
1582  cerr << "dropComtreeLink: internal error detected "
1583  "final removeLink failed\n";
1584  }
1585 }
1586 
1587 bool RouterCore::modComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1588  comt_t comt = cp.comtree;
1589  int ctx = ctt->getComtIndex(comt);
1590  if (ctx == 0) {
1591  reply.errMsg = "modify comtree link: invalid comtree";
1592  reply.mode = CtlPkt::NEG_REPLY;
1593  return false;
1594  }
1595  int lnk = cp.link;
1596  if (!lt->valid(lnk)) {
1597  reply.errMsg = "modify comtree link: invalid link number";
1598  reply.mode = CtlPkt::NEG_REPLY;
1599  return false;
1600  }
1601  int cLnk = ctt->getComtLink(comt,lnk);
1602  if (cLnk == 0) {
1603  reply.errMsg = "modify comtree link: specified link "
1604  "not defined in specified comtree";
1605  reply.mode = CtlPkt::NEG_REPLY;
1606  return false;
1607  }
1608 
1609  RateSpec rs = cp.rspec1;
1610  if (!rs.isSet()) return true;
1611  RateSpec diff = rs; diff.subtract(ctt->getRates(cLnk));
1612  if (!diff.leq(lt->getAvailRates(lnk))) {
1613  reply.errMsg = "modify comtree link: new rate spec "
1614  "exceeds available link capacity";
1615  return false;
1616  }
1617  lt->getAvailRates(lnk).subtract(diff);
1618  ctt->getRates(cLnk) = rs;
1619  return true;
1620 }
1621 
1622 bool RouterCore::getComtreeLink(CtlPkt& cp, CtlPkt& reply) {
1623  comt_t comt = cp.comtree;
1624  int ctx = ctt->getComtIndex(comt);
1625  if (ctx == 0) {
1626  reply.errMsg = "get comtree link: invalid comtree";
1627  reply.mode = CtlPkt::NEG_REPLY;
1628  return false;
1629  }
1630  int lnk = cp.link;
1631  if (!lt->valid(lnk)) {
1632  reply.errMsg = "get comtree link: invalid link number";
1633  reply.mode = CtlPkt::NEG_REPLY;
1634  return false;
1635  }
1636  int cLnk = ctt->getComtLink(comt,lnk);
1637  if (cLnk == 0) {
1638  reply.errMsg = "get comtree link: specified link "
1639  "not defined in specified comtree";
1640  reply.mode = CtlPkt::NEG_REPLY;
1641  return false;
1642  }
1643  reply.comtree = comt;
1644  reply.link = lnk;
1645  reply.queue = ctt->getLinkQ(cLnk);
1646  reply.adr1 = ctt->getDest(cLnk);
1647  reply.rspec1 = ctt->getRates(cLnk);
1648 
1649  return true;
1650 }
1651 
1652 bool RouterCore::addRoute(CtlPkt& cp, CtlPkt& reply) {
1653  comt_t comt = cp.comtree;
1654  if (!ctt->validComtree(comt)) {
1655  reply.errMsg = "comtree not defined at this router\n";
1656  reply.mode = CtlPkt::NEG_REPLY;
1657  return false;
1658  }
1659  fAdr_t dest = cp.adr1;
1660  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1661  reply.errMsg = "invalid address\n";
1662  reply.mode = CtlPkt::NEG_REPLY;
1663  return false;
1664  }
1665  int lnk = cp.link;
1666  int cLnk = ctt->getComtLink(comt,lnk);
1667  int rtx = rt->getRteIndex(comt,dest);
1668  if (rtx != 0) {
1669  if ((Forest::validUcastAdr(dest) && rt->getLink(rtx) == cLnk) ||
1670  (Forest::mcastAdr(dest) && rt->isLink(rtx,cLnk))) {
1671  return true;
1672  } else {
1673  reply.errMsg = "add route: requested route "
1674  "conflicts with existing route";
1675  reply.mode = CtlPkt::NEG_REPLY;
1676  return false;
1677  }
1678  } else if (rt->addEntry(comt, dest, lnk)) {
1679  return true;
1680  }
1681  reply.errMsg = "add route: cannot add route";
1682  reply.mode = CtlPkt::NEG_REPLY;
1683  return false;
1684 }
1685 
1686 bool RouterCore::dropRoute(CtlPkt& cp, CtlPkt& reply) {
1687  comt_t comt = cp.comtree;
1688  if (!ctt->validComtree(comt)) {
1689  reply.errMsg = "comtree not defined at this router\n";
1690  reply.mode = CtlPkt::NEG_REPLY;
1691  return false;
1692  }
1693  fAdr_t dest = cp.adr1;
1694  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1695  reply.errMsg = "invalid address\n";
1696  reply.mode = CtlPkt::NEG_REPLY;
1697  return false;
1698  }
1699  int rtx = rt->getRteIndex(comt,dest);
1700  rt->removeEntry(rtx);
1701  return true;
1702 }
1703 
1704 bool RouterCore::getRoute(CtlPkt& cp, CtlPkt& reply) {
1705  comt_t comt = cp.comtree;
1706  if (!ctt->validComtree(comt)) {
1707  reply.errMsg = "comtree not defined at this router\n";
1708  reply.mode = CtlPkt::NEG_REPLY;
1709  return false;
1710  }
1711  fAdr_t dest = cp.adr1;
1712  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1713  reply.errMsg = "invalid address\n";
1714  reply.mode = CtlPkt::NEG_REPLY;
1715  return false;
1716  }
1717  int rtx = rt->getRteIndex(comt,dest);
1718  if (rtx != 0) {
1719  reply.comtree = comt;
1720  reply.adr1 = dest;
1721  if (Forest::validUcastAdr(dest)) {
1722  int lnk = ctt->getLink(rt->getLink(rtx));
1723  reply.link = lnk;
1724  } else {
1725  reply.link = 0;
1726  }
1727  return true;
1728  }
1729  reply.errMsg = "get route: no route for specified address";
1730  reply.mode = CtlPkt::NEG_REPLY;
1731  return false;
1732 }
1733 //feng
/** Handle a GET_LINK_SET control packet.
 *  Stores this router's link table handle (lt) in the reply.
 *  @param px is the packet index of the request; it is not used here
 *  @param reply is the reply under construction; its lt field is set
 *  @return true, always
 */
1734 bool RouterCore::getLinkSet(pktx px, CtlPkt& reply) {
 // NOTE(review): reply.lt refers to the live link table rather than a
 // copy - confirm how CtlPkt packs this field
1735  reply.lt = lt;
1736  return true;
1737 }
1738 
1739 bool RouterCore::modRoute(CtlPkt& cp, CtlPkt& reply) {
1740  comt_t comt = cp.comtree;
1741  if (!ctt->validComtree(comt)) {
1742  reply.errMsg = "comtree not defined at this router\n";
1743  reply.mode = CtlPkt::NEG_REPLY;
1744  return false;
1745  }
1746  fAdr_t dest = cp.adr1;
1747  if (!Forest::validUcastAdr(dest) && !Forest::mcastAdr(dest)) {
1748  reply.errMsg = "invalid address\n";
1749  reply.mode = CtlPkt::NEG_REPLY;
1750  return false;
1751  }
1752  int rtx = rt->getRteIndex(comt,dest);
1753  if (rtx != 0) {
1754  if (cp.link != 0) {
1755  if (Forest::mcastAdr(dest)) {
1756  reply.errMsg = "modify route: cannot "
1757  "set link in multicast route";
1758  reply.mode = CtlPkt::NEG_REPLY;
1759  return false;
1760  }
1761  rt->setLink(rtx,cp.link);
1762  }
1763  return true;
1764  }
1765  reply.errMsg = "modify route: invalid route";
1766  reply.mode = CtlPkt::NEG_REPLY;
1767  return false;
1768 }
1769 
/** Handle a SET_LEAF_RANGE control packet.
 *  Records the range of leaf addresses [cp.adr1, cp.adr2] and creates
 *  the UiSetPair used to track them. Only permitted while booting.
 *  @param cp is the request; adr1 and adr2 are read
 *  @param reply is the reply under construction; on failure errMsg is set
 *  and mode becomes NEG_REPLY
 *  @return true on success, false on failure
 */
1778 bool RouterCore::setLeafRange(CtlPkt& cp, CtlPkt& reply) {
1779  if (!booting) {
1780  reply.errMsg = "attempting to set leaf address range when "
1781  "not booting";
1782  reply.mode = CtlPkt::NEG_REPLY;
1783  return false;
1784  }
 // NOTE(review): firstLeafAdr is overwritten before the range is
 // validated, so a rejected request still changes it - confirm
1785  firstLeafAdr = cp.adr1;
 // NOTE(review): lastLeafAdr is declared as a local here; if the class
 // also has a member of that name, the member is not updated - confirm
1786  fAdr_t lastLeafAdr = cp.adr2;
1787  if (firstLeafAdr > lastLeafAdr) {
1788  reply.errMsg = "request contained empty leaf address range";
1789  reply.mode = CtlPkt::NEG_REPLY;
1790  return false;
1791  }
 // one slot per address in the inclusive range
 // NOTE(review): any previously allocated leafAdr is not released here
1792  leafAdr = new UiSetPair((lastLeafAdr - firstLeafAdr)+1);
1793  return true;
1794 }
1795 
1799 void RouterCore::sendConnDisc(int lnk, Forest::ptyp_t type) {
1800  pktx px = ps->alloc();
1801  Packet& p = ps->getPacket(px);
1802 
1803  uint64_t nonce = lt->getNonce(lnk);
1804  p.length = Forest::OVERHEAD + 8; p.type = type; p.flags = 0;
1805  p.comtree = Forest::CONNECT_COMT;
1806  p.srcAdr = myAdr; p.dstAdr = lt->getPeerAdr(lnk);
1807  p.payload()[0] = htonl((int) (nonce >> 32));
1808  p.payload()[1] = htonl((int) (nonce & 0xffffffff));
1809  p.inLink = lnk; // hack to force consistent routing of conn/disc
1810  p.pack();
1811 
1812  // save a record of the packet in pending map
1813  pair<uint64_t,CpInfo> cpp;
1814  cpp.first = 1; cpp.first <<= 63; cpp.first |= nonce;
1815  cpp.second.px = px; cpp.second.nSent = 1; cpp.second.timestamp = now;
1816  pending->insert(cpp);
1817 
1818  // now, make copy of packet and send the copy
1819  int copy = ps->fullCopy(px);
1820  if (copy == 0) {
1821  cerr << "RouterCore::sendConnect: no packets left in packet "
1822  "store\n";
1823  return;
1824  }
1825  iop->send(copy,lnk);
1826 }
1827 
/** Send a control packet request.
 *  Packs cp into a newly allocated packet as a REQUEST with the next
 *  sequence number, records the packet in the pending map so that it
 *  can be resent, and sends a copy.
 *  @param cp is the control packet to send; its mode, seqNum and payload
 *  fields are overwritten
 *  @param dest is the destination forest address
 *  @return true if a copy of the request was sent, else false
 */
1836 bool RouterCore::sendCpReq(CtlPkt& cp, fAdr_t dest) {
1837  pktx px = ps->alloc();
1838  if (px == 0) {
1839  cerr << "RouterCore::sendCpReq: no packets left in packet "
1840  "store\n";
1841  return false;
1842  }
1843  Packet& p = ps->getPacket(px);
1844 
1845  // pack cp into p, setting mode and seq number
1846  cp.mode = CtlPkt::REQUEST;
1847  cp.seqNum = seqNum;
1848  cp.payload = p.payload();
1849  if (cp.pack() == 0) {
 // NOTE(review): px is not freed on this failure path - confirm
1850  cerr << "RouterCore::sendCpReq: control packet packing error\n";
1851  return false;
1852  }
1853  p.length = Forest::OVERHEAD + cp.paylen;
1854  p.type = Forest::NET_SIG; p.flags = 0;
 // NOTE(review): listing line 1855 is absent from this extract;
 // presumably it sets p.comtree, which is read further down - confirm
 // against the real source file
1856  p.srcAdr = myAdr; p.dstAdr = dest;
1857  p.inLink = 0;
1858  p.pack();
1859 
1860  // save a record of the packet in pending map
1861  pair<uint64_t,CpInfo> cpp;
1862  cpp.first = seqNum;
1863  cpp.second.px = px; cpp.second.nSent = 1; cpp.second.timestamp = now;
1864  pending->insert(cpp);
1865  seqNum++;
1866 
1867  // now, make copy of packet and send the copy
1868  int copy = ps->fullCopy(px);
1869  if (copy == 0) {
1870  cerr << "RouterCore::sendCpReq: no packets left in packet "
1871  "store\n";
1872  return false;
1873  }
 // while booting, the copy is sent directly with link argument 0 and
 // logged; otherwise it goes through normal forwarding
1874  if (booting) {
1875  iop->send(copy,0);
1876  pktLog->log(copy,0,true,now);
1877  } else
1878  forward(copy,ctt->getComtIndex(p.comtree));
1879 
1880  return true;
1881 }
1882 
1889 void RouterCore::resendCpReq() {
1890  map<uint64_t,CpInfo>::iterator pp;
1891  for (pp = pending->begin(); pp != pending->end(); pp++) {
1892  if (now < pp->second.timestamp + 1000000000) continue;
1893  pktx px = pp->second.px;
1894  Packet& p = ps->getPacket(px);
1895  if (pp->second.nSent >= 3) { // give up on this packet
1896  string s;
1897  cerr << "RouterCore::resendCpReq: received no reply to "
1898  "control packet after three attempts\n"
1899  << p.toString(s);
1900  ps->free(px);
1901  pending->erase(pp->first);
1902  continue;
1903  }
1904  string s1;
1905  cout << "resending control packet\n" << p.toString(s1);
1906  // make copy of packet and send the copy
1907  pp->second.timestamp = now;
1908  pp->second.nSent++;
1909  pktx copy = ps->fullCopy(px);
1910  if (copy == 0) {
1911  cerr << "RouterCore::resendCpReq: no packets left in "
1912  "packet store\n";
1913  return;
1914  }
1915  if (p.type == Forest::CONNECT || p.type == Forest::DISCONNECT) {
1916  iop->send(copy,p.inLink);
1917  } else if (booting) {
1918  pktLog->log(copy,0,true,now);
1919  iop->send(copy,0);
1920  } else {
1921  forward(copy,ctt->getComtIndex(p.comtree));
1922  }
1923  }
1924 }
1925 
1935 void RouterCore::handleCpReply(pktx reply, CtlPkt& cpr) {
1936  map<uint64_t,CpInfo>::iterator pp = pending->find(cpr.seqNum);
1937  if (pp == pending->end()) {
1938  // this is a reply to a request we never sent, or
1939  // possibly, a reply to a request we gave up on
1940  ps->free(reply);
1941  return;
1942  }
1943  // an expected reply, so remove it from the map of pending requests
1944  ps->free(pp->second.px);
1945  pending->erase(pp);
1946 
1947  // and then handle it
1948  switch (cpr.type) {
1949  case CtlPkt::CLIENT_CONNECT: case CtlPkt::CLIENT_DISCONNECT:
1950  if (cpr.mode == CtlPkt::NEG_REPLY) {
1951  cerr << "RouterCore::handleCpReply: got negative reply "
1952  "to a connect or disconnect request: "
1953  << cpr.errMsg << endl;
1954  break;
1955  }
1956  // otherwise, nothing to do
1957  break;
1958  case CtlPkt::BOOT_ROUTER: {
1959  if (cpr.mode == CtlPkt::NEG_REPLY) {
1960  cerr << "RouterCore::handleCpReply: got "
1961  "negative reply to a boot request: "
1962  << cpr.errMsg << endl;
1963  break;
1964  }
1965  if (booting && !setup()) {
1966  cerr << "RouterCore::handleCpReply: setup failed after "
1967  "completion of boot phase\n";
1968  perror("");
1969  pktLog->write(cout);
1970  exit(1);
1971  }
1972  iop->closeBootSock();
1973  booting = false;
1974  break;
1975  }
1976  default:
1977  cerr << "RouterCore::handleCpReply: unexpected control packet "
1978  "type\n";
1979  break;
1980  }
1981  ps->free(reply);
1982 }
1983 
1989 void RouterCore::returnToSender(pktx px, CtlPkt& cp) {
1990  Packet& p = ps->getPacket(px);
1991  cp.payload = p.payload();
1992  int paylen = cp.pack();
1993  if (paylen == 0) {
1994  cerr << "RouterCore::returnToSender: control packet formatting "
1995  "error, zero payload length\n";
1996  ps->free(px);
1997  }
1998  p.length = (Packet::OVERHEAD + paylen);
1999  p.flags = 0;
2000  p.dstAdr = p.srcAdr;
2001  p.srcAdr = myAdr;
2002  p.pack();
2003 
2004  if (booting) {
2005  pktLog->log(px,0,true,now);
2006  iop->send(px,0);
2007  return;
2008  }
2009 
2010  int cLnk = ctt->getComtLink(p.comtree,p.inLink);
2011  int qn = ctt->getLinkQ(cLnk);
2012  if (!qm->enq(px,qn,now)) { ps->free(px); }
2013 }
2014 
2015 } // ends namespace