9 #include "RouterOutProc.h"
11 using namespace forest;
19 RouterOutProc::RouterOutProc(
Router *rtr1) : rtr(rtr1) {
22 RouterOutProc::~RouterOutProc() {
30 void RouterOutProc::run(seconds runTime) {
32 int controlCount = 20;
34 unique_lock ltLock(
rtr->
ltMtx,defer_lock);
35 unique_lock cttLock(
rtr->
cttMtx,defer_lock);
36 unique_lock rtLock(
rtr->
rtMtx, defer_lock);
37 lock(ltLock, cttLock, rtLock);
39 nanoseconds temp = high_resolution_clock::now() -
rtr->
tZero;
41 int64_t startTime =
now;
42 int64_t statsTime =
now;
43 int64_t finishTime = runTime.count();
44 finishTime *= 1000000000; finishTime +=
now;
45 while (runTime.count() == 0 ||
now < finishTime) {
47 nanoseconds temp = high_resolution_clock::now() -
rtr->
tZero;
57 lock(ltLock, cttLock, rtLock);
72 int64_t seqNum =htohl((p.payload())[0]);
74 seqNum |= ntohl((p.payload())[1]);
75 int rx = rptr->deleteMatch(seqNum);
80 handleRteReply(px,ctx);
97 pktLog->log(px,lnk,
true,
now);
103 if (
now > statsTime + 300*1000000) {
104 sm->record(
now); statsTime =
now; resendControl();
110 this_thread::sleep_for(chrono::milliseconds(1));
116 cout << sm->iPktCnt(0) <<
" packets received, "
117 << sm->oPktCnt(0) <<
" packets sent\n";
118 cout << sm->iPktCnt(-1) <<
" from routers, "
119 << sm->oPktCnt(-1) <<
" to routers\n";
120 cout << sm->iPktCnt(-2) <<
" from clients, "
121 << sm->oPktCnt(-2) <<
" to clients\n";
128 void RouterOutProc::forward(pktx px,
int ctx) {
134 sendRteReply(px,ctx);
148 multiSend(px,ctx,rtx);
157 (p.payload())[0] = htonl(p.
dstAdr);
171 multiSend(px,ctx,rtx);
185 void RouterOutProc::multiSend(pktx px,
int ctx,
int rtx) {
186 int qvec[nLnks];
int n = 0;
198 if (pZip == myZip && peerZip != myZip)
continue;
199 if (lnk == inLink)
continue;
209 if (lnk == inLink || lnk == pLink)
continue;
213 if (pLink != 0 && pLink != inLink) {
221 if (lnk == inLink)
continue;
227 if (n == 0) {
rtr->
ps->
free(px);
return; }
231 for (
int i = 0; i < n-1; i++) {
248 void RouterOutProc::sendRteReply(pktx px,
int ctx) {
261 (p1.payload())[0] = htonl(p.
dstAdr);
276 void RouterOutProc::handleRteReply(pktx px,
int ctx) {
281 sendRteReply(px,ctx);
282 int adr = ntohl((p.payload())[0]);
291 multiSend(px,ctx,rtx);
308 void RouterOutProc::subUnsub(pktx px,
int ctx) {
310 uint32_t *pp = p.payload();
328 bool propagate =
false;
332 int addcnt = ntohl(pp[2]);
333 if (addcnt < 0 || addcnt > 350 ||
337 for (
int i = 3; i <= addcnt + 2; i++) {
340 rtx =
rtr->
rt->getRteIndex(comt,addr);
344 }
else if (!rt->isLink(rtx,cLnk)) {
350 int dropcnt = ntohl(pp[addcnt+3]);
351 if (dropcnt < 0 || addcnt + dropcnt > 350 ||
355 for (
int i = addcnt + 4; i <= addcnt + dropcnt + 3; i++) {
359 if (rtx == 0)
continue;
369 if (propagate && !ctt->inCore(ctx) &&
rtr->
ctt->
getPlink(ctx) != 0) {
370 pp[0] = htonl((uint32_t) (seqNum >> 32));
371 pp[1] = htonl((uint32_t) (seqNum & 0xffffffff));
375 sendControl(px,seqNum++,ctt->getPlink(ctx));
393 void RouterOutProc::handleConnDisc(pktx px) {
402 uint64_t nonce = ntohl(p.payload()[0]);
403 nonce <<= 32; nonce |= ntohl(p.payload()[1]);
404 if (nonce !=
rtr->
lt->getNonce(inLnk)) {
rtr->
ps->
free(px);
return; }
406 if (
rtr->
lt->isConnected(inLnk) &&
413 lt->setConnectStatus(inLnk,
true);
415 CtlPkt cp(CtlPkt::CLIENT_CONNECT,CtlPkt::REQUEST,0);
420 rtr->
lt->setConnectStatus(inLnk,
false);
424 CtlPkt cp(CtlPkt::CLIENT_DISCONNECT,CtlPkt::REQUEST,0);
433 pktLog->log(px,inLnk,
true,
now); send(px,inLnk);
445 uint64_t nonce =
rtr->
lt->getNonce(lnk);
447 p.
comtree = Forest::CONNECT_COMT;
449 p.payload()[2] = htonl((uint32_t) (nonce >> 32));
450 p.payload()[3] = htonl((uint32_t) (nonce & 0xffffffff));
452 sendControl(px,nonce,lnk);
457 bool RouterOutProc::sendControl(pktx px,
int lnk) {
460 p.payload()[0] = htonl((uint32_t) (seqNum >> 32));
461 p.payload()[1] = htonl((uint32_t) (seqNum & 0xffffffff));
467 cerr <<
"RouterOutProc::sendControl: no packets left in packet "
478 / send a conn/disc packet
481 pktLog->log(cx,lnk,
true,
now); bootSend(cx,0);
482 }
else if (lnk != 0) {
483 int ctx;
int clnk;
int qid;
502 void RouterOutProc::resendControl() {
503 map<uint64_t,ControlInfo>::iterator pp;
504 list<uint64_t> dropList;
505 for (pp = pending->begin(); pp != pending->end(); pp++) {
506 if (now < pp->second.timestamp + 1000000000)
continue;
507 pktx px = pp->second.px;
510 if (pp->second.nSent >= 3) {
511 cerr <<
"RouterOutProc::resendControl: received no "
512 "reply to control packet after 3 attempts\n"
514 rtr->
ps->
free(px); dropList.push_front(pp->first);
518 pp->second.timestamp =
now;
522 cerr <<
"RouterOutProc::resendControl: no packets left in "
528 pktLog->log(cx,lnk,
true,
now); iop->send(cx,lnk);
529 }
else if (lnk != 0) {
530 int ctx;
int clnk;
int qid;
537 forward(cx,ctt->getComtIndex(comt));
541 for (uint64_t pid : dropList) pending->erase(pid);
552 void RouterOutProc::handleControlReply(pktx rx) {
559 pid = ntohl(reply.payload()[0]); pid <<= 32;
560 pid |= ntohl(reply.payload()[1]);
565 cerr <<
"RouterOutProc::handleControlReply: unexpected reply "
570 map<uint64_t,ControlInfo>::iterator pp = pending->find(pid);
571 if (pp == pending->end()) {
575 cerr <<
"RouterOutProc::handleControlReply: unexpected reply "
581 if (cpr.
mode == CtlPkt::NEG_REPLY) {
583 cerr <<
"RouterOutProc::handleControlReply: got negative "
586 cerr <<
"reply=" << reply.
toString(s);
587 }
else if (cpr.
type == CtlPkt::BOOT_ROUTER) {
588 if (booting && !setup()) {
589 cerr <<
"RouterOutProc::handleControlReply: "
590 "setup failed after completion of boot "
596 iop->closeBootSock();
610 void RouterOutProc::returnToSender(pktx px,
CtlPkt& cp) {
613 int paylen = cp.
pack();
615 cerr <<
"RouterOutProc::returnToSender: control packet formatting "
616 "error, zero payload length\n";
619 p.
length = (Packet::OVERHEAD + paylen);
626 pktLog->log(px,0,
true,
now);