11 using namespace forest;
15 RouterInProc::RouterInProc(
Router *rtr1) : rtr(rtr1) {
16 nRdy = 0; maxSockNum = -1;
21 for (
int i = 1; i <= numThreads; i++) {
22 tpool[i].q.resize(100);
24 tpool[i].thred = thread(RouterControl::start,&tpool[i].rc);
27 comtSet =
new HashSet<comt_t>(Hash::hash_u32,numThreads,
false);
32 RouterInProc::~RouterInProc() {
45 nanoseconds temp = high_resolution_clock::now() -
rtr->
tZero;
50 Util::fatal(
"RouterInProc::run: could not initiate "
55 uint64_t statsTime = 0;
57 high_resolution_clock::time_point
59 while (finishTime.count == 0 ||
now < finishTime) {
60 temp = high_resolution_clock::now() -
rtr->
tZero;
68 pktx cx =
ps.clone(pp.first);
69 if (cx != 0) forward(cx);
71 }
else if (pp.first < 0) {
77 cp.
mode = CtlPkt::NO_REPLY;
90 ps->
free(px); didNothing =
false;
96 if (
now - statsTime > 300*1000000) {
103 this_thread::sleep_for(milliseconds(1));
108 bool RouterInProc::bootStart() {
112 cerr <<
"RouterInProc::bootStart: socket call failed\n";
117 cerr <<
"RouterInProc::bootStart: bind call failed, "
118 <<
"check boot IP address\n";
125 Util::fatal(
"RouterInProc::bootStart: no packets left");
129 cp.payoad = p.payload; cp.pack();
131 for (
int i = 0; i <= 3; i++) {
133 for (
int j = 0; j < 9; j++) {
134 this_thread::sleep_for(milliseconds(100));
136 if (rx == 0)
continue;
139 if (cpr.type == CtlPkt::BOOT_REQUEST &&
140 cpr.mode == CtlPkt::POS_REPLY)
156 if (px == 0)
return 0;
159 ipa_t sIpAdr; ipp_t sPort;
164 if (errno == EAGAIN) {
167 Util::fatal(
"RouterInProc::bootReceive:receive: error in "
195 }
while (rv == -1 && errno == EAGAIN && lim++ < 10);
197 Util::fatal(
"RouterInProc:: send: failure in sendto");
210 if (px == 0)
return false;
213 if (cp.
type == CtlPkt::BOOT_COMPLETE &&
214 cp.
mode == CtlPkt::REQUEST) {
215 cp.
mode = CtlPkt::POS_REPLY;
219 }
else if (cp.
type == CtlPkt::BOOT_ABORT &&
220 cp.
mode == CtlPkt::REQUEST) {
223 Util::fatal(
"RouterInProc::inBound: remote boot "
224 "aborted by NetMgr");
228 if (px == 0)
return false;
236 int ctx = ctt->getComtIndex(p.
comtree);
248 if (cp.
mode != CtlPkt::REQUEST) {
252 ps->
free(px);
return true;
260 if ((pktx sx = replyH->find(p.
srcAdr,cp.
seqNum)) != 0) {
265 if (scp.
mode != CtlPkt::REQUEST) {
267 pktx cx =
ps->clone(sx);
273 if (Forest::isSigComt(p.
comtree)) {
280 cp.
fmtError(
"to busy to handle request, "
291 pktx cx =
ps->clone(px);
307 cp.
fmtError(
"to busy to handle request,"
323 cerr <<
"RouterInProc::inBound: unrecognized packet "
335 pair<int,int> retp =
retQ.
deq();
337 int thx = retp.first();
338 pktx px = retp.second();
353 ps->
free(px);
return true;
356 forward(px);
return true;
359 if (cp.
mode == CtlPkt::REQUEST) {
364 pktx cx =
ps->clone(px);
372 pktx cx =
ps->clone(px);
375 if (sx != 0)
ps->
free(sx);
380 pktx RouterInProc::receive() {
383 for (
int i = ift->firstIface(); i != 0; i = ift->nextIface(i)) {
386 struct timeval zero; zero.tv_sec = zero.tv_usec = 0;
390 (fd_set *)NULL, (fd_set *)NULL, &zero);
391 }
while (
nRdy < 0 && cnt++ < 10);
393 cerr <<
"RouterInProc::receive: select failed "
394 << cnt-1 <<
" times\n";
397 fatal(
"RouterInProc::receive: select failed");
399 if (
nRdy == 0)
return 0;
417 cerr <<
"RouterInProc:receive: out of packets\n";
423 ipa_t sIpAdr; ipp_t sPort;
426 if (nbytes < 0) fatal(
"RouterInProc::receive: error in recvfrom call");
431 lnk = lt->lookup(sIpAdr, sPort);
434 uint64_t nonce = ntohl(p.payload()[2]); nonce <<= 32;
435 nonce |= ntohl(p.payload()[3]);
436 lnk = lt->lookup(nonce);
438 if (lnk == 0 ||
cIf != lt->getIface(lnk)) {
440 cerr <<
"RouterInProc::receive: bad packet: lnk=" << lnk <<
" "
479 if (inLink == 0)
return false;
481 int cLnk = ctt->getComtLink(ctt->getComtree(ctx),inLink);
496 if (lt->getPeerAdr(inLink) != p.
srcAdr)
return false;
500 fAdr_t dest = ctt->getDest(cLnk);
503 int comt = ctt->getComtree(ctx);
505 comt != (
int) Forest::CONNECT_COMT)
510 }
else if (ctx == 0) {