forest-net
an overlay network for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
ComtCtl.cpp
Go to the documentation of this file.
1 
9 #include "ComtCtl.h"
10 
11 using namespace forest;
12 
24 int main(int argc, char *argv[]) {
25  ipa_t nmIp, myIp; int firstComt, lastComt; uint32_t finTime;
26 
27  if (argc != 7 ||
28  (nmIp = Np4d::ipAddress(argv[1])) == 0 ||
29  (myIp = Np4d::ipAddress(argv[2])) == 0 ||
30  (sscanf(argv[3],"%d", &firstComt)) != 1 ||
31  (sscanf(argv[4],"%d", &lastComt)) != 1 ||
32  sscanf(argv[6],"%d", &finTime) != 1) {
33  fatal("usage: ComtCtl nmIp myIp firstComt lastComt topoFile "
34  "finTime");
35  }
36  if (!init(nmIp, myIp, firstComt, lastComt, argv[5])) {
37  fatal("ComtCtl: initialization failure");
38  }
39  sub->run(finTime);
40  cleanup();
41  exit(0);
42 }
43 
44 namespace forest {
45 
49 bool init(ipa_t nmIp1, ipa_t myIp1, int firstComt1, int lastComt1,
50  const char *topoFile) {
51  nmIp = nmIp1; myIp = myIp1;
52  firstComt = firstComt1; lastComt = lastComt1;
53 
54  int nPkts = 10000;
55  ps = new PacketStoreTs(nPkts+1);
56  pool = new ThreadInfo[TPSIZE+1];
57  threads = new UiSetPair(TPSIZE);
58  logger = new Logger();
59 
60  if (firstComt < 1 || lastComt < 1 || firstComt > lastComt) {
61  logger->log("init: invalid comtree range\n",2);
62  return false;
63  }
64 
65  // read NetInfo/ComtInfo from topology file
66  int maxNode = 5000; int maxLink = 10000;
67  int maxRtr = 4500; int maxCtl = 400;
68  maxComtree = 100000; // declared in header file
69  net = new NetInfo(maxNode, maxLink, maxRtr, maxCtl);
71  if (!comtrees->init()) {
72  logger->log("init: cannot initialize ComtInfo object",2);
73  return false;
74  }
75  ifstream fs; fs.open(topoFile);
76  if (fs.fail() || !net->read(fs) || !comtrees->read(fs)) {
77  logger->log("ComtCtl::init: could not read topology file, or "
78  "error in topology file",2);
79  return false;
80  }
81  fs.close();
82 
83  // Mark all pre-configured comtrees in the range this controller
84  // is responsible for as in-use".
85  comtSet = new UiSetPair((lastComt-firstComt)+1);
86  for (int ctx = comtrees->firstComtree(); ctx != 0;
87  ctx = comtrees->nextComtree(ctx)) {
88  int comt = comtrees->getComtree(ctx);
89  if (firstComt <= comt && comt <= lastComt) {
90  comtSet->swap((comt-firstComt)+1);
91  }
92  }
93 
94  // boot from net manager
95  fAdr_t rtrAdr; ipa_t rtrIp; ipp_t rtrPort; uint64_t nonce;
96  if (!bootMe(nmIp, myIp, nmAdr, myAdr, rtrAdr, rtrIp, rtrPort, nonce)) {
97  return false;
98  }
99 
100  // configure substrate
101  sub = new Substrate(myAdr,myIp,rtrAdr,rtrIp,rtrPort,nonce,
102  500, handler, 0, Forest::CC_PORT, ps, logger);
103  if (!sub->init()) {
104  logger->log("init: can't initialize substrate",2);
105  return false;
106  }
107  sub->setRtrReady(true);
108 
109  // initialize comtSetLock
110  if (pthread_mutex_init(&comtSetLock,NULL) != 0) {
111  logger->log("init: could not initialize comtree set lock",2);
112  return false;
113  }
114  return true;
115 }
116 
117 bool bootMe(ipa_t nmIp, ipa_t myIp, fAdr_t& nmAdr, fAdr_t& myAdr,
118  fAdr_t& rtrAdr, ipa_t& rtrIp, ipp_t& rtrPort, uint64_t& nonce) {
119 
120  // open boot socket
121  int bootSock = Np4d::datagramSocket();
122  if (bootSock < 0) return false;
123  if (!Np4d::bind4d(bootSock,myIp,0) || !Np4d::nonblock(bootSock)) {
124  close(bootSock);
125  return false;
126  }
127 
128  // setup boot leaf packet to net manager
129  Packet p; buffer_t buf1; p.buffer = &buf1;
130  CtlPkt cp(CtlPkt::BOOT_LEAF,CtlPkt::REQUEST,1,p.payload());
131  int plen = cp.pack();
132  if (plen == 0) { close(bootSock); return false; }
134  p.flags = 0; p.srcAdr = p.dstAdr = 0; p.comtree = Forest::NET_SIG_COMT;
135  p.pack();
136 
137  // setup reply packet
138  Packet reply; buffer_t buf2; reply.buffer = &buf2;
139  CtlPkt repCp;
140 
141  int resendTime = Misc::getTime();
142  ipa_t srcIp; ipp_t srcPort;
143  while (true) {
144  int now = Misc::getTime();
145  if (now > resendTime) {
146  if (Np4d::sendto4d(bootSock,(void *) p.buffer, p.length,
147  nmIp,Forest::NM_PORT) == -1) {
148  close(bootSock); return false;
149  }
150  resendTime += 1000000; // retry after 1 second
151  }
152  usleep(10000);
153  int nbytes = Np4d::recvfrom4d(bootSock,reply.buffer,1500,
154  srcIp, srcPort);
155  if (nbytes < 0) continue;
156  reply.unpack();
157 
158  // do some checking
159  if (srcIp != nmIp || reply.type != Forest::NET_SIG) {
160  logger->log("unexpected response to boot request",
161  2,reply);
162  close(bootSock); return false;
163  }
164  repCp.reset(reply);
165  if (repCp.type != CtlPkt::CONFIG_LEAF ||
166  repCp.mode != CtlPkt::REQUEST) {
167  logger->log("unexpected response from NetMgr",2,reply);
168  close(bootSock); return false;
169  }
170 
171  // unpack data from packet
172  myAdr = repCp.adr1; rtrAdr = repCp.adr2;
173  rtrIp = repCp.ip1; rtrPort = repCp.port1;
174  nonce = repCp.nonce;
175  nmAdr = reply.srcAdr;
176 
177  // send positive reply
178  repCp.reset(CtlPkt::CONFIG_LEAF,CtlPkt::POS_REPLY,repCp.seqNum,
179  repCp.payload);
180  plen = repCp.pack();
181  if (plen == 0) { close(bootSock); return false; }
182  reply.length = Forest::OVERHEAD + plen;
183  reply.srcAdr = myAdr; reply.dstAdr = nmAdr;
184  reply.pack();
185  if (Np4d::sendto4d(bootSock,(void *) reply.buffer, reply.length,
186  nmIp,Forest::NM_PORT) == -1) {
187  close(bootSock); return false;
188  }
189  break; // proceed to next step
190  }
191  // we have the configuration information, now just wait for
192  // final ack
193  while (true) {
194  int now = Misc::getTime();
195  if (now > resendTime) {
196  if (Np4d::sendto4d(bootSock,(void *) p.buffer, p.length,
197  nmIp,Forest::NM_PORT) == -1) {
198  close(bootSock); return false;
199  }
200  resendTime += 1000000; // retry after 1 second
201  }
202  int nbytes = Np4d::recvfrom4d(bootSock,reply.buffer,1500,
203  srcIp, srcPort);
204  if (nbytes < 0) { usleep(100000); continue; }
205  reply.unpack();
206  if (srcIp != nmIp || reply.type != Forest::NET_SIG) {
207  logger->log("unexpected response to boot request",
208  2,reply);
209  close(bootSock); return false;
210  }
211  // do some checking
212  repCp.reset(reply);
213  if (repCp.type == CtlPkt::CONFIG_LEAF &&
214  repCp.mode == CtlPkt::REQUEST) {
215  // our reply must have been lost, send it again
216  repCp.reset(CtlPkt::CONFIG_LEAF,CtlPkt::POS_REPLY,
217  repCp.seqNum);
218  plen = repCp.pack();
219  if (plen == 0) { close(bootSock); return false; }
220  reply.length = Forest::OVERHEAD + plen;
221  reply.srcAdr = myAdr; reply.dstAdr = nmAdr;
222  reply.pack();
223  if (Np4d::sendto4d(bootSock,(void *) reply.buffer,
224  reply.length,nmIp,Forest::NM_PORT)
225  == -1) {
226  close(bootSock); return false;
227  }
228  } else if (repCp.type != CtlPkt::BOOT_LEAF ||
229  repCp.mode != CtlPkt::POS_REPLY) {
230  logger->log("unexpected response from NetMgr",
231  2,reply);
232  close(bootSock); return false;
233  }
234  break; // success
235  }
236  close(bootSock); return true;
237 }
238 
239 void cleanup() {
240  pthread_mutex_destroy(&comtSetLock);
241  delete ps; delete comtrees; delete net; delete sub;
242 }
243 
261 void* handler(void *qp) {
262  Queue& inq = ((QueuePair *) qp)->in;
263  Queue& outq = ((QueuePair *) qp)->out;
264  CpHandler cph(&inq, &outq, myAdr, logger, ps);
265 
266  while (true) {
267  pktx px = inq.deq();
268  bool success = false;
269  if (px < 0) {
270  // in this case, p is really a negated socket number
271  // for a remote comtree display module
272  int sock = -px;
273  success = handleComtreeDisplay(sock);
274  } else {
275  Packet& p = ps->getPacket(px);
276  CtlPkt cp(p);
277  switch (cp.type) {
278  case CtlPkt::CLIENT_ADD_COMTREE:
279  success = handleAddComtReq(px,cp,cph);
280  break;
281  case CtlPkt::CLIENT_DROP_COMTREE:
282  success = handleDropComtReq(px,cp,cph);
283  break;
284  case CtlPkt::CLIENT_JOIN_COMTREE:
285  success = handleJoinComtReq(px,cp,cph);
286  break;
287  case CtlPkt::CLIENT_LEAVE_COMTREE:
288  success = handleLeaveComtReq(px,cp,cph);
289  break;
290  case CtlPkt::COMTREE_PATH:
291  success = handleComtPath(px,cp,cph);
292  break;
293  case CtlPkt::COMTREE_NEW_LEAF:
294  success = handleComtNewLeaf(px,cp,cph);
295  break;
296  case CtlPkt::COMTREE_PRUNE:
297  success = handleComtPrune(px,cp,cph);
298  break;
299  default:
300  cph.errReply(px,cp,"invalid control packet "
301  "type for ComtCtl");
302  break;
303  }
304  if (!success) {
305  string s;
306  logger->log("handler: operation failed",2,p);
307  }
308  }
309  ps->free(px); // release px now that we're done
310  outq.enq(0); // signal completion to main thread
311  }
312 }
313 
320 bool handleComtreeDisplay(int sock) {
321  NetBuffer buf(sock,1024);
322  //const int BLEN = 100;
323  //char buf[BLEN+1]; int i;
324  while (true) {
325  // wait for a request from comtreeDisplay
326  // these take three forms
327  //
328  // getNet
329  // getComtSet
330  // getComtree 1234
331  //
332  // each appears on a line by itself; the first
333  // requests the network topology, the second, a list
334  // of comtrees, and the third the comtree status for
335  // the specified comtree
336 
337  // read characters into buffer until a newline;
338  // expect remote display to send single comtree status request
339  // and wait for reply before sending another
340  string word;
341  if (!buf.readAlphas(word)) {
342  logger->log("handleComtreeDisplay: could not read "
343  "request from remote display",2);
344  return true;
345  }
346  if (word == "getNet") {
347  net->lock();
348  string netString; net->toString(netString);
349  net->unlock();
350  if (Np4d::sendString(sock,netString) < 0) {
351  logger->log("handleComtreeDisplay: unable to "
352  "send network topology to display",2);
353  return false;
354  }
355  } else if (word == "getComtSet") {
356  stringstream comtSet;
357  comtSet << "comtSet(";
358  int count = 0;
359  for (int ctx = comtrees->firstComtree(); ctx != 0;
360  ctx = comtrees->nextComtree(ctx)) {
361  comtSet << comtrees->getComtree(ctx) << ",";
362  count++;
363  }
364  string s = comtSet.str();
365  if (count > 0) s.erase(s.end()-1); // remove last comma
366  s += ")\n";
367  if (Np4d::sendString(sock,s) < 0) {
368  logger->log("handleComtreeDisplay: unable to "
369  "send comtree set to display",2);
370  return false;
371  }
372  } else if (word.compare("getComtree") == 0) {
373  int comt; string s;
374  if (!buf.readInt(comt)) {
375  s = "invalid comtree request\n";
376  }
377  int ctx = comtrees->getComtIndex(comt);
378  if (ctx == 0) {
379  // send back error message
380  s = "invalid comtree request\n";
381  } else {
382  comtrees->comtStatus2string(ctx,s);
383  }
384  comtrees->releaseComtree(ctx);
385  if (Np4d::sendString(sock,s) < 0) {
386  logger->log("handleComtreeDisplay: unable to "
387  "send comtree status update to "
388  "display",2);
389  return false;
390  }
391  } else {
392  logger->log("handleComtreeDisplay: unrecognized request "
393  + word + " from comtreeDisplay",2);
394  return false;
395  }
396  }
397 }
398 
409 bool handleAddComtReq(pktx px, CtlPkt& cp, CpHandler& cph) {
410  Packet& p = ps->getPacket(px);
411  if (cp.zipCode != 0) {
412  cph.errReply(px,cp,"missing required attribute");
413  return true;
414  }
415  int rootZip = cp.zipCode;
416 
417  int comt = newComtreeNum();
418  if (comt == 0) {
419  cph.errReply(px,cp,"no comtrees available to satisfy request");
420  return true;
421  }
422  int ctx = comtrees->addComtree(comt);
423  if (ctx == 0) {
424  releaseComtreeNum(comt);
425  cph.errReply(px,cp,"internal error prevents adding new "
426  "comtree");
427  logger->log("handleAddComt: addComtree() failed due to program "
428  "error\n",3);
429  return false;
430  }
431  // comtree ctx is now locked in comtrees
432 
433  // find router in specified zipCode
434  // if more than one, choose randomly (for now)
435  // ultimately, we may want to select the one with the
436  // most available capacity on its router-to-router links
437  vector<int> matches(100,0); int cnt = 0;
438  net->lock();
439  for (int rtr = net->firstRouter(); rtr != 0;
440  rtr = net->nextRouter(rtr)) {
441  if (Forest::zipCode(net->getNodeAdr(rtr)) == rootZip) {
442  matches[cnt++] = rtr;
443  }
444  }
445  net->unlock();
446  if (cnt == 0) {
447  releaseComtreeNum(comt);
448  comtrees->removeComtree(ctx); // releases lock on ctx
449  cph.errReply(px,cp,"network contains no router with specified "
450  "zip code");
451  return true;
452  }
453  int rootRtr = matches[randint(0,cnt-1)];
454  fAdr_t rootAdr = net->getNodeAdr(rootRtr);
455 
456  // configure root router to add comtree
457  CtlPkt repCp;
458  int reply = cph.addComtree(rtrAdr,comt,repCp);
459  if (reply == 0 || repCp.mode != CtlPkt::POS_REPLY) {
460  releaseComtreeNum(comt);
461  comtrees->removeComtree(ctx); // releases lock on ctx
462  cph.errReply(px,cp,(reply == 0 ?
463  "root router never replied" :
464  "root router could not add comtree"));
465  if (reply != 0) ps->free(reply);
466  return false;
467  }
468 
469  // modify comtree
470  reply = cph.modComtree(rtrAdr,comt,0,1,repCp);
471  if (reply == 0 || repCp.mode != CtlPkt::POS_REPLY) {
472  releaseComtreeNum(comt);
473  comtrees->removeComtree(ctx);
474  cph.errReply(px,cp,(reply == 0 ?
475  "root router never replied" :
476  "root router could not modify comtree"));
477  if (reply != 0) ps->free(reply);
478  return false;
479  }
480 
481  // now update local data structure to reflect addition
482  comtrees->addNode(ctx,rootAdr);
483  comtrees->addCoreNode(ctx,rootAdr);
484  comtrees->setRoot(ctx,rootAdr);
485  fAdr_t cliAdr = p.srcAdr;
486  comtrees->setOwner(ctx,cliAdr);
487  comtrees->releaseComtree(ctx);
488 
489  // send positive reply back to client
490  repCp.reset(cp.type, CtlPkt::POS_REPLY, cp.seqNum,repCp.payload);
491  cph.sendReply(repCp,p.srcAdr);
492 
493  return true;
494 }
495 
500 int newComtreeNum() {
501  pthread_mutex_lock(&comtSetLock);
502  int comt = comtSet->firstOut();
503  if (comt == 0) {
504  pthread_mutex_unlock(&comtSetLock);
505  return 0;
506  }
507  comtSet->swap(comt);
508  pthread_mutex_unlock(&comtSetLock);
509  comt += (firstComt - 1);
510  return comt;
511 }
512 
517 void releaseComtreeNum(int comt) {
518  comt -= (firstComt - 1);
519  pthread_mutex_lock(&comtSetLock);
520  if (comtSet->isIn(comt)) comtSet->swap(comt);
521  pthread_mutex_unlock(&comtSetLock);
522 }
523 
524 /* future refinement
525 add a refresh thread that periodically sends a refresh message to
526 each router saying, "I think comtree x exists at your location".
527 routers would record time of last refresh and drop any comtrees
528 for which refresh has not been received; can be a fairly long refresh
529 time, say ten minutes or more.
530 */
531 
541 bool handleDropComtReq(pktx px, CtlPkt& cp, CpHandler& cph) {
542  Packet& p = ps->getPacket(px);
543  if (cp.comtree == 0) {
544  cph.errReply(px,cp,"missing required attribute");
545  return true;
546  }
547  int comt = cp.comtree;
548  fAdr_t cliAdr = p.srcAdr;
549 
550  int ctx = comtrees->getComtIndex(comt);
551  if (ctx == 0) { // treat this case as a success, already removed
552  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
553  cph.sendReply(cp,cliAdr);
554  return true;
555  }
556  if (cliAdr != comtrees->getOwner(ctx)) {
557  // eventually should base this decision on actual owner
558  // name not just address
559  comtrees->releaseComtree(ctx);
560  cph.errReply(px,cp,"only the owner can drop a comtree");
561  return true;
562  }
563 
564  // first, teardown comtrees at all routers
565  for (fAdr_t rtr = comtrees->firstRouter(ctx); rtr != 0;
566  rtr = comtrees->nextRouter(ctx,rtr)) {
567  teardownComtNode(ctx,rtr,cph);
568  }
569 
570  // next, unprovision and remove
571  net->lock();
572  comtrees->unprovision(ctx);
573  net->unlock();
574  comtrees->removeComtree(ctx);
575  releaseComtreeNum(comt);
576 
577  // send positive reply to client
578  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
579  cph.sendReply(cp,p.srcAdr);
580  return true;
581 }
582 
595 bool handleComtPath(pktx px, CtlPkt& cp, CpHandler& cph) {
596  Packet& p = ps->getPacket(px);
597  fAdr_t cliRtrAdr = p.srcAdr;
598  comt_t comt = cp.comtree;
599 
600  net->lock();
601  int cliRtr = net->getNodeNum(cliRtrAdr);
602  if (cliRtr == 0) {
603  net->unlock();
604  cph.errReply(px,cp,"no such router");
605  return true;
606  }
607 
608  // acquire lock on comtree
609  // hold this lock as long as this operation is in progress
610  int ctx = comtrees->getComtIndex(comt);
611  if (ctx == 0) {
612  net->unlock();
613  cph.errReply(px,cp,"no such comtree");
614  return true;
615  }
616 
617  RateSpec leafDefRates = comtrees->getDefLeafRates(ctx);
618  RateSpec bbDefRates = comtrees->getDefBbRates(ctx);
619 
620  bool autoConfig = comtrees->getConfigMode(ctx);
621  RateSpec pathRates = (autoConfig ? leafDefRates : bbDefRates);
622  vector<int> path;
623  if (!comtrees->findRootPath(ctx,cliRtr,pathRates,path)) {
624  net->unlock(); comtrees->releaseComtree(ctx);
625  cph.errReply(px,cp,"cannot find path to comtree");
626  return true;
627  }
628  net->unlock(); comtrees->releaseComtree(ctx);
629 
630  // send positive reply to client router and return
631  CtlPkt repCp(cp.type, CtlPkt::POS_REPLY, cp.seqNum);
632  repCp.rspec1 = pathRates; repCp.rspec2 = leafDefRates;
633  repCp.ivec = path;
634  cph.sendReply(repCp,cliRtrAdr);
635  return true;
636 }
637 
/** Handle a COMTREE_NEW_LEAF request from a router.
 *
 *  The request names a comtree (cp.comtree), a new leaf (cp.adr1), a
 *  branch router (cp.adr2) and a path (cp.ivec). The path is added to
 *  the comtree, link rates are recorded and subtracted from the
 *  available rates of the links used, and the leaf is added as a child
 *  of the requesting router.
 *
 *  NOTE(review): this listing is damaged inside the "find the branch
 *  router" loop (listing line 701 is truncated and the braces closing
 *  the else-if/for appear to be missing). Restore that part from the
 *  original file before modifying this function.
 *
 *  NOTE(review): the net lock is taken before the comtree lock here,
 *  whereas handleJoinComtReq takes the comtree lock first -- check for
 *  lock-order problems.
 *
 *  @param px is the index of the request packet
 *  @param cp is the unpacked control packet for px
 *  @param cph is the control packet handler used to send replies
 *  @return true on all paths visible here
 */
647 bool handleComtNewLeaf(pktx px, CtlPkt& cp, CpHandler& cph) {
648  Packet& p = ps->getPacket(px);
649  fAdr_t cliRtrAdr = p.srcAdr;
650  comt_t comt = cp.comtree;
651  fAdr_t cliAdr = cp.adr1;
652  fAdr_t branchRtrAdr = cp.adr2;
653  vector<int>& path = cp.ivec;
654 
655  net->lock();
656  int cliRtr = net->getNodeNum(cliRtrAdr);
657  if (cliRtr == 0) {
658  net->unlock();
659  cph.errReply(px,cp,"no such router");
660  return true;
661  }
662 
663  // acquire lock on comtree
664  // hold this lock as long as this operation is in progress
665  int ctx = comtrees->getComtIndex(comt);
666  if (ctx == 0) {
667  net->unlock();
668  cph.errReply(px,cp,"no such comtree");
669  return true;
670  }
671 
672  if (comtrees->isComtLeaf(ctx,cliAdr)) {
673  // if client already in comtree, this is probably a
674  // second attempt, caused by a lost acknowledgment
675  net->unlock(); comtrees->releaseComtree(ctx);
676  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
677  cph.sendReply(repCp,cliAdr);
678  return true;
679  }
680 
681  int len = path.size();
682  int top = -1; int topRtr = 0; // router just below branch router
683  if (len == 0) {
// empty path: the requesting router must itself be the branch router
// and must already be a router of the comtree
684  if (branchRtrAdr != cliRtrAdr ||
685  !comtrees->isComtRtr(ctx,cliRtrAdr)) {
686  net->unlock(); comtrees->releaseComtree(ctx);
687  cph.errReply(px,cp,"specified client and branch router "
688  "not consistent with comtree topology");
689  return true;
690  }
691  } else {
692  // find the branch router
693  fAdr_t radr = cliRtrAdr;
694  int r = net->getNodeNum(cliRtrAdr);
695  for (int i = 0; i < len; i++) {
696  int lnk = net->getLinkNum(r,path[i]);
697  if (radr == branchRtrAdr &&
698  comtrees->isComtRtr(ctx,radr)) {
699  top = i-1; break;
700  } else if (i == len-1) {
// NOTE(review): the next listing line is truncated in this extract
701  top = i; topRtr ...
702  topRtr = r;
703  r = net->getPeer(r,lnk);
704  radr = net->getNodeAdr(r);
705  }
706  if (top == -1) {
707  net->unlock(); comtrees->releaseComtree(ctx);
708  cph.errReply(px,cp,"specified branch router not "
709  "in path");
710  return true;
711  }
712  }
// rspec2 with its directions flipped; used for the router links below
713  RateSpec flipped = cp.rspec2; flipped.flip();
714  // now, go down the path adding new routers and their links
715  // note: some of these routers may already be in comtree;
716  // addNode just returns normally in this case; we may change
717  // comtree topology as a result of this process
718  int r = topRtr; fAdr_t radr = net->getNodeAdr(r);
719  for (int i = top; i >= 0; i--) {
720  int lnk = net->getLinkNum(r,path[i]);
721  comtrees->addNode(ctx, radr);
722  comtrees->setPlink(ctx, radr, lnk);
723  comtrees->getLinkRates(ctx, radr) = flipped;
// which rate spec is subtracted depends on whether r is the link's
// "left" endpoint
724  if (r == net->getLeft(lnk)) {
725  net->getAvailRates(lnk).subtract(flipped);
726  } else {
727  net->getAvailRates(lnk).subtract(cp.rspec1);
728  }
729  if (i > 0) {
730  r = net->getPeer(r,net->getLinkNum(r,path[i-1]));
731  radr = net->getNodeAdr(r);
732  }
733  }
734  // finally, add the new leaf
735  comtrees->addNode(ctx, cliAdr);
736  comtrees->setParent(ctx, cliAdr, cliRtrAdr, cp.link);
737  comtrees->getLinkRates(ctx, cliAdr) = cp.rspec1;
738 
739  net->unlock(); comtrees->releaseComtree(ctx);
740 
741  // send positive reply to router and return
742  CtlPkt repCp(cp.type, CtlPkt::POS_REPLY, cp.seqNum);
743  cph.sendReply(repCp,cliRtrAdr);
744  return true;
745 }
746 
755 bool handleComtPrune(pktx px, CtlPkt& cp, CpHandler& cph) {
756  Packet& p = ps->getPacket(px);
757  fAdr_t rtrAdr = p.srcAdr;
758  comt_t comt = cp.comtree;
759  fAdr_t pruneAdr = cp.adr1;
760 
761  net->lock();
762  int rtr = net->getNodeNum(rtrAdr);
763  if (rtr == 0) {
764  net->unlock();
765  cph.errReply(px,cp,"no such router");
766  return true;
767  }
768 
769  // acquire lock on comtree
770  // hold this lock as long as this operation is in progress
771  int ctx = comtrees->getComtIndex(comt);
772  if (ctx == 0) {
773  net->unlock();
774  cph.errReply(px,cp,"no such comtree");
775  return true;
776  }
777 
778  if (comtrees->isComtLeaf(ctx,pruneAdr)) {
779  // if pruneAdr is a leaf, remove it
780  comtrees->removeNode(ctx,pruneAdr);
781  } else if (comtrees->isComtRtr(ctx,pruneAdr)) {
782  if (pruneAdr != rtrAdr) {
783  net->unlock(); comtrees->releaseComtree(ctx);
784  cph.errReply(px,cp,"cannot prune a different router");
785  return true;
786  }
787  removeSubtree(ctx,rtrAdr);
788  }
789  net->unlock(); comtrees->releaseComtree(ctx);
790 
791  // send positive reply to router and return
792  CtlPkt repCp(cp.type, CtlPkt::POS_REPLY, cp.seqNum);
793  cph.sendReply(repCp,rtrAdr);
794  return true;
795 }
796 
805 void removeSubtree(int ctx, fAdr_t rtrAdr) {
806  int rtr = net->getNodeNum(rtrAdr);
807  if (comtrees->getLinkCnt(ctx,rtrAdr) > 1) {
808  // remove all children that are leaf nodes
809  vector<fAdr_t> dropVec;
810  for (fAdr_t ladr = comtrees->firstLeaf(ctx); ladr != 0;
811  ladr = comtrees->nextLeaf(ctx,ladr)) {
812  if (comtrees->getParent(ctx,ladr) == rtrAdr)
813  dropVec.push_back(ladr);
814  }
815  for (fAdr_t ladr : dropVec) comtrees->removeNode(ctx,ladr);
816  if (comtrees->getLinkCnt(ctx,rtrAdr) > 1) {
817  // if still have children, find and remove
818  for (fAdr_t cadr = comtrees->firstRouter(ctx); cadr !=0;
819  cadr = comtrees->nextRouter(ctx,cadr)) {
820  if (comtrees->getParent(ctx,cadr) == rtrAdr)
821  removeSubtree(ctx,cadr);
822  }
823  }
824  }
825  int lnk = comtrees->getPlink(ctx,rtrAdr);
826  RateSpec rs = comtrees->getLinkRates(ctx,rtrAdr);
827  if (rtr != net->getLeft(lnk)) rs.flip();
828  net->getAvailRates(lnk).add(rs);
829  comtrees->removeNode(ctx,rtrAdr);
830 }
831 
842 bool handleJoinComtReq(pktx px, CtlPkt& cp, CpHandler& cph) {
843  Packet& p = ps->getPacket(px);
844  fAdr_t cliAdr = p.srcAdr;
845  comt_t comt = cp.comtree;
846 
847  // find the client's router, based on its forest address
848  int cliRtr;
849  net->lock();
850  for (cliRtr = net->firstRouter(); cliRtr != 0;
851  cliRtr = net->nextRouter(cliRtr)) {
852  pair<fAdr_t,fAdr_t> leafRange;
853  net->getLeafRange(cliRtr,leafRange);
854  if (cliAdr >= leafRange.first && cliAdr <= leafRange.second)
855  break;
856  // replace this with an interval tree when we start using
857  // configurations with more than 50 routers
858  }
859  if (cliRtr == 0) {
860  net->unlock();
861  cph.errReply(px,cp,"can't find client's access router");
862  return false;
863  }
864  fAdr_t cliRtrAdr = net->getNodeAdr(cliRtr);
865  net->unlock();
866 
867  // acquire lock on comtree
868  // hold this lock as long as this operation is in progress
869  int ctx = comtrees->getComtIndex(comt);
870  if (ctx == 0) {
871  cph.errReply(px,cp,"no such comtree");
872  return true;
873  }
874 
875  if (comtrees->isComtLeaf(ctx,cliAdr)) {
876  // if client already in comtree, this is probably a
877  // second attempt, caused by a lost acknowledgment
878  comtrees->releaseComtree(ctx);
879  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
880  cph.sendReply(repCp,cliAdr);
881  return true;
882  }
883 
884  list<LinkMod> path; // for new path added to comtree
885  list<LinkMod> modList; // for link modifications in rest of comtree
886  RateSpec bbDefRates, leafDefRates;
887  leafDefRates = comtrees->getDefLeafRates(ctx);
888  bbDefRates = comtrees->getDefBbRates(ctx);
889  bool autoConfig = comtrees->getConfigMode(ctx);
890  RateSpec pathRates = (autoConfig ? leafDefRates : bbDefRates);
891  int tryCount = 1; int branchRtr = 0;
892  while (true) {
893  net->lock(); // lock net while searching for path
894  branchRtr = comtrees->findPath(ctx,cliRtr,pathRates,path);
895  if (branchRtr == 0 || tryCount++ > 3) {
896  net->unlock();
897  comtrees->releaseComtree(ctx);
898  cph.errReply(px,cp,"cannot find path to comtree");
899  return true;
900  }
901  comtrees->addPath(ctx,path);
902  comtrees->adjustSubtreeRates(ctx,cliRtrAdr,leafDefRates);
903  if (autoConfig) {
904  if (comtrees->computeMods(ctx,modList)) {
905  comtrees->provision(ctx,modList);
906  } else {
907  leafDefRates.negate();
909  ctx,cliRtrAdr,leafDefRates);
910  leafDefRates.negate();
911  comtrees->removePath(ctx,path);
912  net->unlock();
913  comtrees->releaseComtree(ctx);
914  cph.errReply(px,cp,"cannot add required "
915  "capacity to comtree backbone");
916  return true;
917  }
918  }
919  net->unlock();
920  // we now have the path reserved in the internal data structure,
921  // so no other thread can interfere with completion
922 
923  // now configure routers on path and exit loop if successful
924  if (!setupPath(ctx,path,cph)) {
925  // could not configure path at some router
926  teardownPath(ctx,path,cph);
927  net->lock();
928  if (autoConfig) comtrees->unprovision(ctx,modList);
929  leafDefRates.negate();
931  ctx,cliRtrAdr,leafDefRates);
932  leafDefRates.negate();
933  comtrees->removePath(ctx,path);
934  net->unlock();
935  } else if (autoConfig &&
936  !modComtRates(ctx,modList,false,cph)) {
937  net->lock();
938  comtrees->unprovision(ctx,modList);
939  modComtRates(ctx,modList,true,cph);
940  leafDefRates.negate();
942  ctx,cliRtrAdr,leafDefRates);
943  leafDefRates.negate();
944  comtrees->removePath(ctx,path);
945  net->unlock();
946  teardownPath(ctx,path,cph);
947  } else { // all routers successfully configured
948  break;
949  }
950  path.clear(); modList.clear();
951  }
952 
953  // now add client to comtree
954  int llnk = setupClientLink(ctx,cliAdr,cliRtr,cph);
955  comtrees->addNode(ctx,cliAdr);
956  comtrees->setParent(ctx,cliAdr,cliRtrAdr,llnk);
957  comtrees->getLinkRates(ctx,cliAdr) = leafDefRates;
958 
959  if (llnk == 0 || !setComtLeafRates(ctx,cliAdr,cph)) {
960  // tear it all down and fail
961  comtrees->removeNode(ctx,cliAdr);
962  net->lock();
963  comtrees->unprovision(ctx,modList);
964  modComtRates(ctx,modList,true,cph);
965  leafDefRates.negate();
967  ctx,cliRtrAdr,leafDefRates);
968  comtrees->removePath(ctx,path);
969  net->unlock();
970  leafDefRates.negate();
971  teardownPath(ctx,path,cph);
972  comtrees->releaseComtree(ctx);
973  cph.errReply(px,cp,"cannot configure leaf node");
974  return true;
975  }
976  comtrees->releaseComtree(ctx);
977 
978  // finally, send positive reply to client and return
979  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY, cp.seqNum);
980  cph.sendReply(repCp,cliAdr);
981  return true;
982 }
983 
990 bool handleLeaveComtReq(pktx px, CtlPkt& cp, CpHandler& cph) {
991  Packet& p = ps->getPacket(px);
992  fAdr_t cliAdr = p.srcAdr;
993  comt_t comt = cp.comtree;
994 
995  // find the client's router, based on its forest address
996  int cliRtr;
997  net->lock();
998  for (cliRtr = net->firstRouter(); cliRtr != 0;
999  cliRtr = net->nextRouter(cliRtr)) {
1000  pair<int,int> leafRange; net->getLeafRange(cliRtr,leafRange);
1001  if (cliAdr >= leafRange.first && cliAdr <= leafRange.second)
1002  break;
1003  }
1004  if (cliRtr == 0) {
1005  cph.errReply(px,cp,"can't find client's access router");
1006  logger->log("handleLeaveComt: cannot find client's access "
1007  "router in network topology\n",2,p);
1008  return false;
1009  }
1010  fAdr_t cliRtrAdr = net->getNodeAdr(cliRtr);
1011  net->unlock();
1012 
1013  // acquire the comtree lock
1014  int ctx = comtrees->getComtIndex(comt);
1015  if (ctx == 0) {
1016  cph.errReply(px,cp,"no such comtree");
1017  return true;
1018  }
1019 
1020  if (!comtrees->isComtLeaf(ctx,cliAdr)) {
1021  // if client is not in comtree, this is probably a
1022  // second attempt, caused by a lost acknowledgment
1023  comtrees->releaseComtree(ctx);
1024  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
1025  cph.sendReply(repCp,cliAdr);
1026  return true;
1027  }
1028  teardownClientLink(ctx,cliAdr,cliRtr,cph);
1029 
1030  // reduce subtree rates in response to dropped client
1031  // then remove client from comtrees
1032  RateSpec rs;
1033  rs = comtrees->getLinkRates(ctx,cliAdr); rs.negate();
1034  net->lock();
1035  comtrees->adjustSubtreeRates(ctx,cliRtrAdr,rs);
1036  comtrees->removeNode(ctx,cliAdr);
1037 
1038  // for autoConfig case, modify backbone rates
1039  if (comtrees->getConfigMode(ctx)) {
1040  list<LinkMod> modList;
1041  // note, since we've adjusted subtree rates, computeMods
1042  // returns negative rates, so provisioning these rates
1043  // effectively reduces allocation
1044  comtrees->computeMods(ctx,modList);
1045  comtrees->provision(ctx,modList);
1046  modComtRates(ctx,modList,true,cph);
1047  }
1048 
1049  // now, find path used only to support this client
1050  list<LinkMod> path;
1051  fAdr_t rtrAdr = cliRtrAdr;
1052  int rtr = net->getNodeNum(rtrAdr);
1053  while (true) {
1054  int plnk = comtrees->getPlink(ctx,rtrAdr);
1055  int lnkCnt = comtrees->getLinkCnt(ctx,rtrAdr);
1056  if (plnk == 0 || (rtrAdr == cliRtrAdr && lnkCnt > 1) ||
1057  (rtrAdr != cliRtrAdr && lnkCnt > 2))
1058  break;
1059  RateSpec rs = comtrees->getLinkRates(ctx,rtrAdr);
1060  path.push_back(LinkMod(plnk,rtr,rs));
1061  rtr = net->getPeer(rtr,plnk);
1062  rtrAdr = net->getNodeAdr(rtr);
1063  }
1064 
1065  // drop path from comtrees object, release allocated capacity
1066  // and remove comtree at routers on path
1067  comtrees->removePath(ctx,path);
1068  net->unlock();
1069  teardownPath(ctx,path,cph);
1070  comtrees->releaseComtree(ctx);
1071 
1072  // send positive reply and return
1073  CtlPkt repCp(cp.type,CtlPkt::POS_REPLY,cp.seqNum);
1074  cph.sendReply(repCp,cliAdr);
1075  return true;
1076 }
1077 
1089 bool setupPath(int ctx, list<LinkMod>& path, CpHandler& cph) {
1090  // First, configure all routers on the path to add the comtree
1091  for (LinkMod& lm : path) {
1092  if (!setupComtNode(ctx,lm.child,cph)) {
1093  return false;
1094  }
1095  }
1096 
1097  // Next, add link at all routers and configure comtree attributes
1098  for (LinkMod& lm : path) {
1099  int parent = net->getPeer(lm.child,lm.lnk);
1100  if (!setupComtLink(ctx,lm.lnk,lm.child,cph)) {
1101  return false;
1102  }
1103  if (!setupComtLink(ctx,lm.lnk,parent,cph)) {
1104  return false;
1105  }
1106  if (!setupComtAttrs(ctx,lm.child,cph)) {
1107  return false;
1108  }
1109  if (!setComtLinkRates(ctx,lm.lnk,lm.child,cph)) {
1110  return false;
1111  }
1112  if (!setComtLinkRates(ctx,lm.lnk,parent,cph)) {
1113  return false;
1114  }
1115  }
1116  return true;
1117 }
1118 
1128 bool teardownPath(int ctx, list<LinkMod>& path, CpHandler& cph) {
1129  bool status = true;
1130  for (LinkMod& lm : path) {
1131  if (!teardownComtNode(ctx,lm.child,cph))
1132  status = false;
1133  }
1134  return status;
1135 }
1136 
1142 bool setupComtNode(int ctx, int rtr, CpHandler& cph) {
1143  CtlPkt repCp;
1144  int reply = cph.addComtree(net->getNodeAdr(rtr),
1145  comtrees->getComtree(ctx),repCp);
1146  if (reply == 0) return false;
1147  ps->free(reply);
1148  return repCp.mode == CtlPkt::POS_REPLY;
1149 }
1150 
1156 bool teardownComtNode(int ctx, int rtr, CpHandler& cph) {
1157  CtlPkt repCp;
1158  int reply = cph.dropComtree(net->getNodeAdr(rtr),
1159  comtrees->getComtree(ctx),repCp);
1160  if (reply == 0) return false;
1161  ps->free(reply);
1162  return repCp.mode == CtlPkt::POS_REPLY;
1163 }
1164 
1171 bool setupComtLink(int ctx, int lnk, int rtr, CpHandler& cph) {
1172  int parent = net->getPeer(rtr,lnk);
1173  CtlPkt repCp;
1174  int reply = cph.addComtreeLink(net->getNodeAdr(rtr),
1175  comtrees->getComtree(ctx),
1176  net->getLLnum(lnk,rtr),
1177  comtrees->isCoreNode(ctx,parent),repCp);
1178  if (reply == 0) return false;
1179  ps->free(reply);
1180  return repCp.mode == CtlPkt::POS_REPLY;
1181 }
1182 
1190 int setupClientLink(int ctx, fAdr_t cliAdr, int rtr, CpHandler& cph) {
1191  CtlPkt repCp;
1192  pktx reply = cph.addComtreeLink(net->getNodeAdr(rtr),
1193  comtrees->getComtree(ctx),
1194  cliAdr, repCp);
1195  if (reply == 0) return 0;
1196  ps->free(reply);
1197  return (repCp.mode == CtlPkt::POS_REPLY ? repCp.link : 0);
1198 }
1199 
1206 bool teardownClientLink(int ctx, fAdr_t cliAdr, int rtr, CpHandler& cph) {
1207  CtlPkt repCp;
1208  int reply = cph.dropComtreeLink(net->getNodeAdr(rtr),
1209  comtrees->getComtree(ctx),
1210  0, cliAdr, repCp);
1211  if (reply == 0) return false;
1212  ps->free(reply);
1213  return repCp.mode == CtlPkt::POS_REPLY;
1214 }
1215 
1221 bool setupComtAttrs(int ctx, int rtr, CpHandler& cph) {
1222  fAdr_t rtrAdr = net->getNodeAdr(rtr);
1223  int llnk = net->getLLnum(comtrees->getPlink(ctx,rtrAdr),rtr);
1224  CtlPkt repCp;
1225  int reply = cph.modComtree(rtrAdr, comtrees->getComtree(ctx), llnk,
1226  comtrees->isCoreNode(ctx,rtrAdr),repCp);
1227  if (reply == 0) return false;
1228  ps->free(reply);
1229  return repCp.mode == CtlPkt::POS_REPLY;
1230 }
1231 
1239 bool setComtLinkRates(int ctx, int lnk, int rtr, CpHandler& cph) {
1240  fAdr_t rtrAdr = net->getNodeAdr(rtr);
1241  fAdr_t peerAdr = net->getNodeAdr(net->getPeer(rtr,lnk));
1242  RateSpec rs;
1243  if (rtrAdr == comtrees->getChild(ctx,lnk)) {
1244  rs = comtrees->getLinkRates(ctx,rtrAdr);
1245  rs.flip(); // make up in comtree ratespec match input at rtr
1246  } else {
1247  rs = comtrees->getLinkRates(ctx,peerAdr);
1248  }
1249  CtlPkt repCp;
1250  int reply = cph.modComtreeLink( rtrAdr,
1251  comtrees->getComtree(ctx),
1252  net->getLLnum(lnk,rtr),rs,repCp);
1253  if (reply == 0) return false;
1254  ps->free(reply);
1255  if (repCp.mode == CtlPkt::POS_REPLY) return true;
1256 
1257  // router rejected, probably because available rate at router
1258  // is different from information stored in comtrees object;
1259  // such differences can occur, when we have multiple comtree
1260  // controllers; request available rate at router and use it
1261  // to update local information
1262  reply = cph.getLink(rtrAdr,net->getLLnum(lnk,rtr),repCp);
1263  if (reply == 0) return false;
1264  ps->free(reply);
1265  if (repCp.mode != CtlPkt::POS_REPLY) return false;
1266  if (repCp.rspec2.isSet()) {
1267  if (rtr == net->getLeft(lnk)) repCp.rspec2.flip();
1268  net->getAvailRates(lnk) = repCp.rspec2;
1269  }
1270  return false;
1271 }
1272 
1279 bool setComtLeafRates(int ctx, fAdr_t leafAdr, CpHandler& cph) {
1280  CtlPkt repCp;
1281  int reply = cph.modComtreeLink( comtrees->getParent(ctx, leafAdr),
1282  comtrees->getComtree(ctx),
1283  comtrees->getPlink(ctx,leafAdr),
1284  comtrees->getLinkRates(ctx,leafAdr),
1285  repCp);
1286  if (reply == 0) return false;
1287  ps->free(reply);
1288  return repCp.mode == CtlPkt::POS_REPLY;
1289 }
1290 
1300 bool modComtRates(int ctx, list<LinkMod>& modList, bool nostop,CpHandler& cph) {
1301  for (LinkMod& lm : modList) {
1302  if (!nostop && !setComtLinkRates(ctx,lm.lnk,lm.child,cph))
1303  return false;
1304  int parent = net->getPeer(lm.child,lm.lnk);
1305  if (!nostop && !setComtLinkRates(ctx,lm.lnk,parent,cph))
1306  return false;
1307  }
1308  return true;
1309 }
1310 
1311 } // ends namespace