forest-net
an overlay networks for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
ControlSender.cpp
1 
9 #include "CpHandler.h"
10 
11 namespace forest {
12 
20 pktx CpHandler::clientAddComtree(fAdr_t dest, int zipCode, CtlPkt& repCp) {
21  CtlPkt reqCp(CtlPkt::CLIENT_ADD_COMTREE,CtlPkt::REQUEST,0);
22  reqCp.zipCode = zipCode;
23  return sendRequest(reqCp,dest,repCp);
24 }
25 
34  CtlPkt reqCp(CtlPkt::CLIENT_DROP_COMTREE,CtlPkt::REQUEST,0);
35  reqCp.comtree = comt;
36  return sendRequest(reqCp,dest,repCp);
37 }
38 
49  ipa_t clientIp, ipp_t clientPort,CtlPkt& repCp) {
50  CtlPkt reqCp(CtlPkt::CLIENT_JOIN_COMTREE,CtlPkt::REQUEST,0);
51  reqCp.comtree = comt;
52  reqCp.ip1 = clientIp; reqCp.port1 = clientPort;
53  return sendRequest(reqCp,dest,repCp);
54 }
55 
66  ipa_t clientIp, ipp_t clientPort,CtlPkt& repCp) {
67  CtlPkt reqCp(CtlPkt::CLIENT_LEAVE_COMTREE,CtlPkt::REQUEST,0);
68  reqCp.comtree = comt;
69  reqCp.ip1 = clientIp; reqCp.port1 = clientPort;
70  return sendRequest(reqCp,dest,repCp);
71 }
72 
82 pktx CpHandler::addIface(fAdr_t dest, int iface, ipa_t ifip, RateSpec& rates,CtlPkt& repCp) {
83  CtlPkt reqCp(CtlPkt::ADD_IFACE,CtlPkt::REQUEST,0);
84  reqCp.iface = iface;
85  reqCp.ip1 = ifip;
86  reqCp.rspec1 = rates;
87  return sendRequest(reqCp,dest,repCp);
88 }
89 
97 pktx CpHandler::dropIface(fAdr_t dest, int iface,CtlPkt& repCp) {
98  CtlPkt reqCp(CtlPkt::DROP_IFACE,CtlPkt::REQUEST,0);
99  reqCp.iface = iface;
100  return sendRequest(reqCp,dest,repCp);
101 }
102 
112 pktx CpHandler::modIface(fAdr_t dest, int iface, ipa_t ifip, RateSpec& rates,CtlPkt& repCp) {
113  CtlPkt reqCp(CtlPkt::MOD_IFACE,CtlPkt::REQUEST,0);
114  reqCp.iface = iface;
115  reqCp.ip1 = ifip;
116  reqCp.rspec1 = rates;
117  return sendRequest(reqCp,dest,repCp);
118 }
119 
127 pktx CpHandler::getIface(fAdr_t dest, int iface,CtlPkt& repCp) {
128  CtlPkt reqCp(CtlPkt::GET_IFACE,CtlPkt::REQUEST,0);
129  reqCp.iface = iface;
130  return sendRequest(reqCp,dest,repCp);
131 }
132 
147 pktx CpHandler::addLink(fAdr_t dest,Forest::ntyp_t peerType, int iface, int lnk,
148  ipa_t peerIp, ipp_t peerPort, fAdr_t peerAdr,
149  uint64_t nonce, CtlPkt& repCp) {
150  CtlPkt reqCp(CtlPkt::ADD_LINK,CtlPkt::REQUEST,0);
151  reqCp.nodeType = peerType; reqCp.iface = iface; reqCp.link = lnk;
152  reqCp.ip1 = peerIp; reqCp.port1 = peerPort; reqCp.adr1 = peerAdr;
153  reqCp.nonce = nonce;
154  return sendRequest(reqCp,dest,repCp);
155 }
156 
168 pktx CpHandler::addLink(fAdr_t dest, Forest::ntyp_t peerType, int iface,
169  uint64_t nonce, CtlPkt& repCp) {
170  CtlPkt reqCp(CtlPkt::ADD_LINK,CtlPkt::REQUEST,0);
171  reqCp.nodeType = peerType; reqCp.iface = iface; reqCp.nonce = nonce;
172  return sendRequest(reqCp,dest,repCp);
173 }
174 
183 pktx CpHandler::dropLink(fAdr_t dest, int link, fAdr_t peerAdr, CtlPkt& repCp) {
184  CtlPkt reqCp(CtlPkt::DROP_LINK,CtlPkt::REQUEST,0);
185  if (link != 0) reqCp.link = link;
186  if (peerAdr != 0) reqCp.adr1 = peerAdr;
187  return sendRequest(reqCp,dest,repCp);
188 }
189 
198 pktx CpHandler::modLink(fAdr_t dest, int link, RateSpec& rates,CtlPkt& repCp) {
199  CtlPkt reqCp(CtlPkt::MOD_LINK,CtlPkt::REQUEST,0);
200  reqCp.link = link; reqCp.rspec1 = rates;
201  return sendRequest(reqCp,dest,repCp);
202 }
203 
211 pktx CpHandler::getLink(fAdr_t dest, int link,CtlPkt& repCp) {
212  CtlPkt reqCp(CtlPkt::GET_LINK,CtlPkt::REQUEST,0);
213  reqCp.link = link;
214  return sendRequest(reqCp,dest,repCp);
215 }
216 
226 pktx CpHandler::getLinkSet(fAdr_t dest, int link, int count, CtlPkt& repCp) {
227  CtlPkt reqCp(CtlPkt::GET_LINK_SET,CtlPkt::REQUEST,0);
228  reqCp.index1 = link; reqCp.count = count;
229  return sendRequest(reqCp,dest,repCp);
230 }
231 
241 pktx CpHandler::getComtreeSet(fAdr_t dest, int link, int count, CtlPkt& repCp) {
242  CtlPkt reqCp(CtlPkt::GET_COMTREE_SET,CtlPkt::REQUEST,0);
243  reqCp.index1 = link; reqCp.count = count;
244  return sendRequest(reqCp,dest,repCp);
245 }
246 
256 pktx CpHandler::getIfaceSet(fAdr_t dest, int iface, int count, CtlPkt& repCp) {
257  CtlPkt reqCp(CtlPkt::GET_IFACE_SET,CtlPkt::REQUEST,0);
258  reqCp.index1 = iface; reqCp.count = count;
259  return sendRequest(reqCp,dest,repCp);
260 }
261 
271 pktx CpHandler::getRouteSet(fAdr_t dest, int route, int count, CtlPkt& repCp) {
272  CtlPkt reqCp(CtlPkt::GET_ROUTE_SET,CtlPkt::REQUEST,0);
273  reqCp.index1 = route; reqCp.count = count;
274  return sendRequest(reqCp,dest,repCp);
275 }
276 
284 pktx CpHandler::addComtree(fAdr_t dest, comt_t comtree,CtlPkt& repCp) {
285  CtlPkt reqCp(CtlPkt::ADD_COMTREE,CtlPkt::REQUEST,0);
286  reqCp.comtree = comtree;
287  return sendRequest(reqCp,dest,repCp);
288 }
289 
297 pktx CpHandler::dropComtree(fAdr_t dest, comt_t comtree,CtlPkt& repCp) {
298  CtlPkt reqCp(CtlPkt::DROP_COMTREE,CtlPkt::REQUEST,0);
299  reqCp.comtree = comtree;
300  return sendRequest(reqCp,dest,repCp);
301 }
302 
312 pktx CpHandler::modComtree(fAdr_t dest, comt_t comtree, int pLink,
313  int coreFlag,CtlPkt& repCp) {
314  CtlPkt reqCp(CtlPkt::MOD_COMTREE,CtlPkt::REQUEST,0);
315  reqCp.comtree = comtree;
316  reqCp.link = pLink; reqCp.coreFlag = coreFlag;
317  return sendRequest(reqCp,dest,repCp);
318 }
319 
327 pktx CpHandler::getComtree(fAdr_t dest, comt_t comtree,CtlPkt& repCp) {
328  CtlPkt reqCp(CtlPkt::GET_COMTREE,CtlPkt::REQUEST,0);
329  reqCp.comtree = comtree;
330  return sendRequest(reqCp,dest,repCp);
331 }
332 
342 pktx CpHandler::addComtreeLink(fAdr_t dest, comt_t comtree, int link,
343  int peerCoreFlag,CtlPkt& repCp) {
344  CtlPkt reqCp(CtlPkt::ADD_COMTREE_LINK,CtlPkt::REQUEST,0);
345  reqCp.comtree = comtree; reqCp.link = link;
346  reqCp.coreFlag = peerCoreFlag;
347  return sendRequest(reqCp,dest,repCp);
348 }
349 
359 pktx CpHandler::addComtreeLink(fAdr_t dest, comt_t comtree, fAdr_t peerAdr,
360  CtlPkt& repCp) {
361  CtlPkt reqCp(CtlPkt::ADD_COMTREE_LINK,CtlPkt::REQUEST,0);
362  reqCp.comtree = comtree; reqCp.adr1 = peerAdr;
363  return sendRequest(reqCp,dest,repCp);
364 }
365 
393 pktx CpHandler::dropComtreeLink(fAdr_t dest, comt_t comtree, int link,
394  fAdr_t peerAdr, CtlPkt& repCp) {
395  CtlPkt reqCp(CtlPkt::DROP_COMTREE_LINK,CtlPkt::REQUEST,0);
396  reqCp.comtree = comtree;
397  if (link == 0 && peerAdr == 0) {
398  logger->log("CpHandler::dropLink: link, peerAdr both 0", 2);
399  return false;
400  }
401  if (link != 0) reqCp.link = link;
402  else if (peerAdr != 0) reqCp.adr1 = peerAdr;
403  return sendRequest(reqCp,dest,repCp);
404 }
405 
432 pktx CpHandler::modComtreeLink( fAdr_t dest, comt_t comtree, int link,
433  RateSpec& rates,CtlPkt& repCp) {
434  CtlPkt reqCp(CtlPkt::MOD_COMTREE_LINK,CtlPkt::REQUEST,0);
435  reqCp.comtree = comtree;
436  reqCp.link = link; reqCp.rspec1 = rates;
437  return sendRequest(reqCp,dest,repCp);
438 }
439 
448 pktx CpHandler::getComtreeLink( fAdr_t dest, comt_t comtree, int link,CtlPkt& repCp) {
449  CtlPkt reqCp(CtlPkt::GET_COMTREE_LINK,CtlPkt::REQUEST,0);
450  reqCp.comtree = comtree; reqCp.link = link;
451  return sendRequest(reqCp,dest,repCp);
452 }
453 
462 pktx CpHandler::addFilter(fAdr_t dest, CtlPkt& repCp) {
463  CtlPkt reqCp(CtlPkt::ADD_FILTER,CtlPkt::REQUEST,0);
464  return sendRequest(reqCp,dest,repCp);
465 }
466 
474 pktx CpHandler::dropFilter(fAdr_t dest, int filter, CtlPkt& repCp) {
475  CtlPkt reqCp(CtlPkt::DROP_FILTER,CtlPkt::REQUEST,0);
476  reqCp.index1 = filter;
477  return sendRequest(reqCp,dest,repCp);
478 }
479 
488 pktx CpHandler::modFilter(fAdr_t dest, int filter, string& filterString,
489  CtlPkt& repCp) {
490  CtlPkt reqCp(CtlPkt::MOD_FILTER,CtlPkt::REQUEST,0);
491  reqCp.index1 = filter; reqCp.stringData.assign(filterString);
492  return sendRequest(reqCp,dest,repCp);
493 }
494 
502 pktx CpHandler::getFilter(fAdr_t dest, int filter, CtlPkt& repCp) {
503  CtlPkt reqCp(CtlPkt::GET_FILTER,CtlPkt::REQUEST,0);
504  reqCp.index1 = filter;
505  return sendRequest(reqCp,dest,repCp);
506 }
507 
517 pktx CpHandler::getFilterSet(fAdr_t dest, int firstFilter, int count, CtlPkt& repCp) {
518  CtlPkt reqCp(CtlPkt::GET_FILTER_SET,CtlPkt::REQUEST,0);
519  reqCp.index1 = firstFilter; reqCp.count = count;
520  return sendRequest(reqCp,dest,repCp);
521 }
522 
530  CtlPkt reqCp(CtlPkt::GET_LOGGED_PACKETS,CtlPkt::REQUEST,0);
531  return sendRequest(reqCp,dest,repCp);
532 }
533 
542 pktx CpHandler::enablePacketLog(fAdr_t dest,bool on,bool local,CtlPkt& repCp) {
543  CtlPkt reqCp(CtlPkt::ENABLE_PACKET_LOG,CtlPkt::REQUEST,0);
544  reqCp.index1 = (on ? 1 : 0); reqCp.index2 = (local ? 1 : 0);
545  return sendRequest(reqCp,dest,repCp);
546 }
547 
556 pktx CpHandler::newSession(fAdr_t dest, ipa_t clientIp, RateSpec& rates, CtlPkt& repCp) {
557  CtlPkt reqCp(CtlPkt::NEW_SESSION,CtlPkt::REQUEST,0);
558  reqCp.ip1 = clientIp; reqCp.rspec1 = rates;
559  return sendRequest(reqCp,dest,repCp);
560 }
561 
571  CtlPkt& repCp) {
572  CtlPkt reqCp(CtlPkt::CANCEL_SESSION,CtlPkt::REQUEST,0);
573  reqCp.adr1 = clientAdr; reqCp.adr2 = rtrAdr;
574  return sendRequest(reqCp,dest,repCp);
575 }
576 
585 pktx CpHandler::clientConnect(fAdr_t dest, fAdr_t clientAdr, fAdr_t rtrAdr,CtlPkt& repCp) {
586  CtlPkt reqCp(CtlPkt::CLIENT_CONNECT,CtlPkt::REQUEST,0);
587  reqCp.adr1 = clientAdr; reqCp.adr2 = rtrAdr;
588  return sendRequest(reqCp,dest,repCp);
589 }
590 
600  CtlPkt reqCp(CtlPkt::CLIENT_DISCONNECT,CtlPkt::REQUEST,0);
601  reqCp.adr1 = clientAdr; reqCp.adr2 = rtrAdr;
602  return sendRequest(reqCp,dest,repCp);
603 }
604 
612  CtlPkt reqCp(CtlPkt::BOOT_ROUTER,CtlPkt::REQUEST,0);
613  return sendRequest(reqCp,dest,repCp);
614 }
615 
628  ipa_t rtrIp, ipp_t rtrPort, uint64_t nonce,
629  CtlPkt& repCp) {
630  CtlPkt reqCp(CtlPkt::CONFIG_LEAF,CtlPkt::REQUEST,0);
631  reqCp.adr1 = leafAdr; reqCp.adr2 = rtrAdr;
632  reqCp.ip1 = rtrIp; reqCp.port1 = rtrPort;
633  reqCp.nonce = nonce;
634  return sendRequest(reqCp,dest,repCp);
635 }
636 
646  CtlPkt& repCp) {
647  CtlPkt reqCp(CtlPkt::SET_LEAF_RANGE,CtlPkt::REQUEST,0);
648  reqCp.adr1 = first; reqCp.adr2 = last;
649  return sendRequest(reqCp,dest,repCp);
650 }
651 
658 pktx CpHandler::bootLeaf(fAdr_t dest, CtlPkt& repCp) {
659  CtlPkt reqCp(CtlPkt::BOOT_LEAF,CtlPkt::REQUEST,0);
660  return sendRequest(reqCp,dest,repCp);
661 }
662 
669  CtlPkt reqCp(CtlPkt::BOOT_COMPLETE,CtlPkt::REQUEST,0);
670  return sendRequest(reqCp,dest,repCp);
671 }
672 
679 pktx CpHandler::bootAbort(fAdr_t dest,CtlPkt& repCp) {
680  CtlPkt reqCp(CtlPkt::BOOT_ABORT,CtlPkt::REQUEST,0);
681  return sendRequest(reqCp,dest,repCp);
682 }
683 
696 pktx CpHandler::sendRequest(CtlPkt& cp, fAdr_t dest, CtlPkt& repCp) {
697  pktx px = ps->alloc();
698  if (px == 0) {
699  logger->log("CpHandler::sendRequest: no packets "
700  "left in packet store\n",4,cp);
701  // terminates
702  }
703  Packet& p = ps->getPacket(px);
704  // set default seq# for requests - tells main thread to use "next" seq#
705  if (cp.mode == CtlPkt::REQUEST) cp.seqNum = 0;
706  cp.payload = p.payload();
707  int plen = cp.pack();
708  if (plen == 0) {
709  logger->log("CpHandler::sendRequest: packing error\n",4,cp);
710  // terminates
711  }
712  p.length = plen + Forest::OVERHEAD;
713  if (cp.type < CtlPkt::CLIENT_NET_SIG_SEP) {
715  } else {
717  }
718  p.flags = 0; p.dstAdr = dest; p.srcAdr = myAdr;
719  p.tunIp = tunIp; p.tunPort = tunPort;
720  p.pack();
721 
722  if (cp.mode != CtlPkt::REQUEST) {
723  outq->enq(px); return 0;
724  }
725  pktx reply = sendAndWait(px,cp);
726  if (reply != 0) repCp.reset(ps->getPacket(reply));
727  ps->free(px);
728  return reply;
729 }
730 
741 int CpHandler::sendAndWait(pktx px, CtlPkt& cp) {
742  Packet& p = ps->getPacket(px);
743  p.srcAdr = myAdr; p.pack();
744 
745  // make copy of packet and send the copy
746  pktx copy = ps->fullCopy(px);
747  if (copy == 0) {
748  logger->log("CpHandler::sendAndWait: no packets "
749  "left in packet store\n",4,p);
750  // terminates
751  }
752 
753  outq->enq(copy);
754 
755  for (int i = 1; i < 3; i++) {
756  pktx reply = inq->deq(1000000000); // 1 sec timeout
757  if (reply == Queue::TIMEOUT) {
758  // no reply, make new copy and send
759  pktx retry = ps->fullCopy(px);
760  if (retry == 0) {
761  logger->log("CpHandler::sendAndWait: no "
762  "packets left in packet store\n",
763  4,p);
764  // terminates
765  }
766  Packet& pr = ps->getPacket(retry);
767  cp.payload = pr.payload();
768  cp.seqNum = 1; // tag retry as a repeat
769  cp.pack();
770  pr.payErrUpdate();
771  outq->enq(retry);
772  } else {
773  Packet& pr = ps->getPacket(reply);
774  CtlPkt repCp(pr.payload(),pr.length-Forest::OVERHEAD);
775  repCp.unpack();
776  if (repCp.mode == CtlPkt::NEG_REPLY) {
777  logger->log("CpHandler::sendAndWait: negative "
778  "reply (" + repCp.errMsg +
779  ") to control packet",1,pr);
780  }
781  return reply;
782  }
783  }
784  logger->log("CpHandler::sendAndWait: no response to control packet",
785  2,p);
786 
787  return 0;
788 }
789 
790 
791 
798  pktx px = ps->alloc();
799  if (px == 0) {
800  logger->log("CpHandler::sendRequest: no packets "
801  "left in packet store\n",4,cp);
802  // terminates
803  }
804  Packet& p = ps->getPacket(px);
805  cp.payload = p.payload();
806  int plen = cp.pack();
807  if (plen == 0) {
808  logger->log("CpHandler::sendRequest: packing error\n",4,cp);
809  // terminates
810  }
811  p.length = plen + Forest::OVERHEAD;
812  if (cp.type < CtlPkt::CLIENT_NET_SIG_SEP) {
814  } else {
816  }
817  p.flags = 0; p.dstAdr = dest; p.srcAdr = myAdr;
818  p.tunIp = tunIp; p.tunPort = tunPort;
819  p.pack();
820  outq->enq(px);
821 }
822 
828 void CpHandler::errReply(pktx px, CtlPkt& cp, const string& msg) {
829  Packet& p = ps->getPacket(px);
830 
831  pktx px1 = ps->alloc();
832  Packet& p1 = ps->getPacket(px1);
833  CtlPkt cp1(cp.type,CtlPkt::NEG_REPLY,cp.seqNum);
834  cp1.errMsg = msg;
835  cp1.payload = p1.payload();
836 
837  p1.length = Forest::OVERHEAD + cp1.pack();
838  p1.type = p.type; p1.flags = 0; p1.comtree = p.comtree;
839  p1.dstAdr = p.srcAdr; p1.srcAdr = myAdr;
840  p1.pack();
841 
842  outq->enq(px1);
843 }
844 
845 } // ends namespace
846