// Constructor: copies the configuration arguments into the member fields named in the
// initializer list and allocates the per-thread bookkeeping objects.
// NOTE(review): the embedded listing numbers jump from 16 to 18, so the start of the
// initializer list is not visible here; the first four parameters are presumably
// stored there -- confirm against the full file.
13 Substrate::Substrate(
fAdr_t myAdr1, ipa_t myIp1,
fAdr_t rtrAdr1, ipa_t rtrIp1,
14 ipp_t rtrPort1, uint64_t nonce1,
int threadCount1,
// handler1 is stored in `handler`, which is later handed to pthread_create as the
// start routine of every pool thread.
15 void* (*handler1)(
void*),
int dgPort1,
int listenPort1,
16 PacketStoreTs *ps1, Logger *logger1)
18 rtrPort(rtrPort1), nonce(nonce1), threadCount(threadCount1),
19 handler(handler1), dgPort(dgPort1),
20 listenPort(listenPort1),
ps(ps1),
logger(logger1) {
// One extra slot is allocated because the thread loop in this file indexes
// pool[] from 1 through threadCount.
21 pool =
new ThreadInfo[threadCount+1];
// The three structures below are all sized by threadCount.
22 threads =
new UiSetPair(threadCount);
23 inReqMap =
new IdMap(threadCount);
24 outReqMap =
new IdMap(threadCount);
// Destructor: closes the datagram and listening sockets when their descriptors are
// positive, then frees pool, threads, inReqMap and outReqMap (the objects the
// constructor allocates with new).
29 Substrate::~Substrate() {
// NOTE(review): the `> 0` tests mean a descriptor value of 0 is never closed here.
30 if (dgSock > 0) close(dgSock);
31 if (listenSock > 0) close(listenSock);
// pool was allocated with new[], hence delete[].
32 delete []
pool;
delete threads;
delete inReqMap;
delete outReqMap;
// Body of the initialization routine: starts the pool threads and opens the two sockets.
// NOTE(review): the function header is not visible in this listing; the log strings
// below name it Substrate::init. Several error-branch bodies are also not visible.
//
// Request a thread stack of 4*PTHREAD_STACK_MIN bytes for the threads created below.
38 pthread_attr_init(&attr);
39 pthread_attr_setstacksize(&attr,4*PTHREAD_STACK_MIN);
// Read the attribute back and log a message if the requested size was not applied.
41 pthread_attr_getstacksize(&attr,&stacksize);
42 if (stacksize != 4*PTHREAD_STACK_MIN)
43 logger->
log(
"Substrate:init: can't set stack size",4);
// For each pool slot 1..threadCount: initialize its in/out queues, then start a
// thread running `handler` with a pointer to that slot's queue pair as its argument.
44 for (
int t = 1; t <= threadCount; t++) {
45 if (!
pool[t].qp.in.init() || !
pool[t].
qp.out.init()) {
46 logger->
log(
"Substrate::init: cannot initialize "
// pthread_create returns non-zero on failure; the failure is logged.
50 if (pthread_create(&
pool[t].thid,&attr,handler,
51 (
void *) &
pool[t].qp) != 0) {
52 logger->
log(
"Substrate::init: cannot create "
// Datagram socket: created, bound to (myIp, dgPort) and made non-blocking.
// Any failure enters the branch below (its body is not visible here).
59 dgSock = Np4d::datagramSocket();
60 if (dgSock < 0 || !Np4d::bind4d(dgSock,myIp,dgPort) ||
61 !Np4d::nonblock(dgSock)) {
// Stream socket: bound to INADDR_ANY on listenPort, put into listening state and
// made non-blocking. The return value is true only if all three steps succeed.
64 listenSock = Np4d::streamSocket();
65 if (listenSock < 0)
return false;
66 return Np4d::bind4d(listenSock,INADDR_ANY,listenPort) &&
67 Np4d::listen4d(listenSock) && Np4d::nonblock(listenSock);
// Main processing loop.
// finTimeSec: run-time limit in seconds; it is converted to nanoseconds below, and a
//             value of 0 makes the loop condition always true (no time limit).
// Returns the result of disconnect() if `connected` is set when the loop ends; the
// other return path is not visible in this listing.
// NOTE(review): many lines of the loop body are missing from this listing.
74 bool Substrate::run(
int finTimeSec) {
76 now = Misc::getTimeNs();
// Widen to 64 bits before multiplying so that seconds -> nanoseconds does not
// overflow an int.
77 uint64_t finishTime = finTimeSec;
78 finishTime *= 1000000000;
80 bool nothing2do;
bool connected =
false;
// NOTE(review): finishTime is a duration but is compared directly against
// Misc::getTimeNs(); this presumably relies on that clock counting from program
// start -- confirm.
81 while (finishTime == 0 || now <= finishTime) {
// Taken only while not yet connected and once rtrReady is set (body not visible).
83 if (!connected && rtrReady) {
// Accept a pending stream connection on the listening socket, if any.
93 if ((connSock = Np4d::accept4d(listenSock)) > 0) {
// Fetch a packet from the datagram socket and hand it to inbound().
105 pktx px = recvFromForest();
107 inbound(px); nothing2do =
false;
// A thread with a non-empty output queue has its next packet passed to outbound().
114 if (!
pool[t].qp.out.empty()) {
115 outbound(
pool[t].qp.out.deq(),t);
// Walk the threads reachable from threads->firstIn(); for one that has a non-zero
// seqNum and whose ts (set in outbound() to now + 2000000000) has passed, drop its
// outReqMap entry.
123 for (
int t =
threads->firstIn(); t != 0;
125 if (
pool[t].seqNum != 0 &&
pool[t].ts < now) {
126 outReqMap->dropPair(
pool[t].seqNum);
// Sleep for 1000 microseconds when no work was done and firstIn() reports no thread.
130 if (nothing2do &&
threads->firstIn() == 0) usleep(1000);
132 now = Misc::getTimeNs();
134 if (connected)
return disconnect();
// Handles packet px that arrived from outside (run() passes it the result of
// recvFromForest()).
// NOTE(review): only fragments of this function are visible; the declarations of
// p, cp, key and t are among the missing lines.
141 void Substrate::inbound(pktx px) {
// Signalling packets (CLIENT_SIG or NET_SIG) are examined as control packets.
143 if (p.
type == Forest::CLIENT_SIG || p.
type == Forest::NET_SIG) {
145 if (cp.
mode == CtlPkt::REQUEST) {
// The lookup key carries the control packet's sequence number in its low 32 bits;
// the value shifted into the high bits is set on a line not visible here.
148 key <<= 32; key += cp.
seqNum;
// The key is already present in inReqMap (branch body not visible).
149 if (inReqMap->validKey(key)) {
// Record that thread t is associated with this key.
154 inReqMap->addPair(key,t);
// Look up the thread stored in outReqMap under this sequence number and remove the
// entry. Presumably this is the path for a reply to an earlier outgoing request --
// confirm.
165 int t = outReqMap->getId(cp.
seqNum);
167 outReqMap->dropPair(cp.
seqNum);
// Handles packet px taken from the output queue of thread t (see the call in run()).
// NOTE(review): only fragments of this function are visible; the conditions guarding
// several of the statements below are among the missing lines.
181 void Substrate::outbound(pktx px,
int t) {
// Remove the inReqMap entry currently held for thread t.
183 inReqMap->dropPair(inReqMap->getKey(t));
// Anything that is not a request is sent straight out.
191 if (cp.mode != CtlPkt::REQUEST) {
193 sendToForest(px);
return;
// NOTE(review): a sequence number of 1 is treated specially; presumably it marks a
// repeat of the thread's previous request, since the existing outReqMap key for t is
// reused as the sequence number -- confirm.
195 if (cp.seqNum == 1) {
198 if (outReqMap->validId(t)) {
199 cp.seqNum = outReqMap->getKey(t);
// Replace any mapping thread t already has with one keyed by the current seqNum.
208 if (outReqMap->validId(t))
209 outReqMap->dropPair(outReqMap->getKey(t));
210 outReqMap->addPair(seqNum,t);
// The packet gets the current seqNum, which is then advanced.
212 cp.seqNum = seqNum++;
// run() sets `now` from Misc::getTimeNs(), so this deadline is 2,000,000,000 ns
// (2 seconds) after the current loop time; run() compares ts against now.
221 pool[t].
ts = now + 2000000000;
// Reads one datagram from dgSock into a packet buffer and returns its packet index.
// Returns 0 when px is 0. How px is obtained is not visible here (presumably allocated
// from the packet store -- confirm); the remaining return paths are also not visible.
229 pktx Substrate::recvFromForest() {
231 if (px == 0)
return 0;
// srcIp/srcPort are passed to recvfrom4d, presumably to receive the sender's address.
234 ipa_t srcIp; ipp_t srcPort;
// At most 1500 bytes are read into the packet's buffer.
235 int nbytes = Np4d::recvfrom4d(dgSock,p.
buffer,1500,srcIp,srcPort);
// Datagrams shorter than Forest::OVERHEAD, or that fail unpack(), take this branch
// (body not visible). A negative nbytes also satisfies the first test.
236 if (nbytes < Forest::OVERHEAD || !p.
unpack()) {
// Sends packet px out on the datagram socket.
// NOTE(review): the lines that choose the destination ip/port are not visible here.
// Both error conditions below call fatal() rather than returning an error.
254 void Substrate::sendToForest(pktx px) {
256 ipa_t ip; ipp_t port;
// A zero destination port is treated as an unrecoverable error.
267 if (port == 0) fatal(
"Substrate::sendToForest: zero port number");
// Transmit p.length bytes from the packet's buffer to (ip, port).
268 int rv = Np4d::sendto4d(dgSock,(
void *) p.
buffer, p.
length,ip,port);
269 if (rv == -1) fatal(
"Substrate::sendToForest: failure in sendto");
// Builds a CONNECT packet carrying the 64-bit nonce and waits for an acknowledgement.
// Returns false once a resend is due and resendCount already exceeds 3; the success
// path is not visible in this listing.
// NOTE(review): the loop structure, the send call and the resendCount updates are
// among the missing lines.
276 bool Substrate::connect() {
// Payload words 0 and 1 hold the high and low 32 bits of the nonce, each converted
// with htonl.
280 p.payload()[0] = htonl((uint32_t) (nonce >> 32));
281 p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
// The length is the fixed overhead plus the 8 payload bytes written above.
282 p.
length = Forest::OVERHEAD + 8; p.
type = Forest::CONNECT; p.
flags = 0;
283 p.
comtree = Forest::CONNECT_COMT;
286 int resendTime = Misc::getTime();
// NOTE(review): this local is named `now`, the same name run() assigns from
// Misc::getTimeNs(); here it holds Misc::getTime() instead.
289 int now = Misc::getTime();
290 if (now > resendTime) {
// Give up: release the packet and report failure.
291 if (resendCount > 3) {
ps->
free(px);
return false; }
// Schedule the next resend 1000000 time units later (presumably microseconds,
// i.e. 1 second -- confirm the unit of Misc::getTime()).
293 resendTime += 1000000;
297 int rx = recvFromForest();
// A reply of type CONNECT whose flags equal ACK_FLAG is the expected acknowledgement.
300 if (reply.
type == Forest::CONNECT &&
301 reply.
flags == Forest::ACK_FLAG) {
312 bool Substrate::disconnect() {
316 p.payload()[0] = htonl((uint32_t) (nonce >> 32));
317 p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
318 p.
length = Forest::OVERHEAD + 8; p.
type = Forest::DISCONNECT;
322 int resendTime = Misc::getTime();
325 int now = Misc::getTime();
326 if (now > resendTime) {
327 if (resendCount > 3) {
ps->
free(px);
return false; }
329 resendTime += 1000000;
333 int rx = recvFromForest();
336 if (reply.
type == Forest::DISCONNECT &&
337 reply.
flags == Forest::ACK_FLAG) {