forest-net
an overlay network for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
Substrate.cpp
Go to the documentation of this file.
1 
9 #include "Substrate.h"
10 
11 namespace forest {
12 
13 Substrate::Substrate(fAdr_t myAdr1, ipa_t myIp1, fAdr_t rtrAdr1, ipa_t rtrIp1,
14  ipp_t rtrPort1, uint64_t nonce1, int threadCount1,
15  void* (*handler1)(void*), int dgPort1, int listenPort1,
16  PacketStoreTs *ps1, Logger *logger1)
17  : myAdr(myAdr1), myIp(myIp1), rtrAdr(rtrAdr1), rtrIp(rtrIp1),
18  rtrPort(rtrPort1), nonce(nonce1), threadCount(threadCount1),
19  handler(handler1), dgPort(dgPort1),
20  listenPort(listenPort1), ps(ps1), logger(logger1) {
21  pool = new ThreadInfo[threadCount+1];
22  threads = new UiSetPair(threadCount);
23  inReqMap = new IdMap(threadCount);
24  outReqMap = new IdMap(threadCount);
25 
26  rtrReady = false;
27 }
28 
29 Substrate::~Substrate() {
30  if (dgSock > 0) close(dgSock);
31  if (listenSock > 0) close(listenSock);
32  delete [] pool; delete threads; delete inReqMap; delete outReqMap;
33 }
34 
35 bool Substrate::init() {
36  // setup thread pool for handling control packets
37  pthread_attr_t attr;
38  pthread_attr_init(&attr);
39  pthread_attr_setstacksize(&attr,4*PTHREAD_STACK_MIN);
40  size_t stacksize;
41  pthread_attr_getstacksize(&attr,&stacksize);
42  if (stacksize != 4*PTHREAD_STACK_MIN)
43  logger->log("Substrate:init: can't set stack size",4);
44  for (int t = 1; t <= threadCount; t++) {
45  if (!pool[t].qp.in.init() || !pool[t].qp.out.init()) {
46  logger->log("Substrate::init: cannot initialize "
47  "thread queues",2);
48  return false;
49  }
50  if (pthread_create(&pool[t].thid,&attr,handler,
51  (void *) &pool[t].qp) != 0) {
52  logger->log("Substrate::init: cannot create "
53  "thread pool",2);
54  return false;
55  }
56  }
57 
58  // setup sockets
59  dgSock = Np4d::datagramSocket();
60  if (dgSock < 0 || !Np4d::bind4d(dgSock,myIp,dgPort) ||
61  !Np4d::nonblock(dgSock)) {
62  return false;
63  }
64  listenSock = Np4d::streamSocket();
65  if (listenSock < 0) return false;
66  return Np4d::bind4d(listenSock,INADDR_ANY,listenPort) &&
67  Np4d::listen4d(listenSock) && Np4d::nonblock(listenSock);
68 }
69 
74 bool Substrate::run(int finTimeSec) {
75  seqNum = 1;
76  now = Misc::getTimeNs();
77  uint64_t finishTime = finTimeSec;
78  finishTime *= 1000000000; // convert to ns
79 
80  bool nothing2do; bool connected = false;
81  while (finishTime == 0 || now <= finishTime) {
82  nothing2do = true;
83  if (!connected && rtrReady) {
84  // this allows substrate to run before its
85  // access router has booted; used by NetMgr
86  if (!connect()) {
87  return false;
88  }
89  connected = true;
90  }
91  // check for connection requests from remote clients
92  int connSock = -1;
93  if ((connSock = Np4d::accept4d(listenSock)) > 0) {
94  // let handler know this is socket# for remote host
95  int t = threads->firstOut();
96  if (t != 0) {
97  threads->swap(t);
98  pool[t].qp.in.enq(-connSock);
99  } else {
100  logger->log("Substrate: thread pool is "
101  "exhausted",4);
102  }
103  }
104  // check for packets from the Forest net
105  pktx px = recvFromForest();
106  if (px != 0) {
107  inbound(px); nothing2do = false;
108  }
109 
110  // now handle outgoing packets from the thread pool
111 // can we avoid iteration? Simplest way to do this is to have threads
112 // share the outgoing queue, and include thread number with packet number.
113  for (int t = threads->firstIn(); t!=0; t = threads->nextIn(t)) {
114  if (!pool[t].qp.out.empty()) {
115  outbound(pool[t].qp.out.deq(),t);
116  nothing2do = false;
117  }
118  }
119 
120  // check for expired timeouts
121 // a heap based on timestamp would be better, although
122 // this is ok for the common case of a few worker threads
123  for (int t = threads->firstIn(); t != 0;
124  t = threads->nextIn(t)) {
125  if (pool[t].seqNum != 0 && pool[t].ts < now) {
126  outReqMap->dropPair(pool[t].seqNum);
127  pool[t].seqNum = 0;
128  }
129  }
130  if (nothing2do && threads->firstIn() == 0) usleep(1000);
131  sched_yield();
132  now = Misc::getTimeNs();
133  }
134  if (connected) return disconnect();
135  return true;
136 }
137 
141 void Substrate::inbound(pktx px) {
142  Packet& p = ps->getPacket(px);
143  if (p.type == Forest::CLIENT_SIG || p.type == Forest::NET_SIG) {
144  CtlPkt cp(p);
145  if (cp.mode == CtlPkt::REQUEST) {
146  int t = threads->firstOut();
147  uint64_t key = p.srcAdr;
148  key <<= 32; key += cp.seqNum;
149  if (inReqMap->validKey(key)) {
150  // in this case, we've got an active thread
151  // handling this request, so ignore duplicate
152  } else if (t != 0) { // assign thread
153  threads->swap(t);
154  inReqMap->addPair(key,t);
155  pool[t].seqNum = 0;
156  pool[t].qp.in.enq(px);
157  return;
158  } else {
159  logger->log("Substrate: thread pool is "
160  "exhausted",4);
161  }
162  } else {
163  // replies are returned to the thread that
164  // sent the request - outReqMap contains thread index
165  int t = outReqMap->getId(cp.seqNum);
166  if (t != 0) {
167  outReqMap->dropPair(cp.seqNum);
168  pool[t].seqNum = 0;
169  pool[t].qp.in.enq(px);
170  return;
171  }
172  }
173  }
174  ps->free(px);
175 }
176 
181 void Substrate::outbound(pktx px, int t) {
182  if (px == 0) { // means worker thread completed task
183  inReqMap->dropPair(inReqMap->getKey(t));
184  pool[t].qp.in.reset();
185  threads->swap(t);
186  return;
187  }
188  Packet& p = ps->getPacket(px);
189  CtlPkt cp(p.payload(),p.length-Forest::OVERHEAD);
190  cp.unpack();
191  if (cp.mode != CtlPkt::REQUEST) {
192  // just send it and return
193  sendToForest(px); return;
194  }
195  if (cp.seqNum == 1) {
196  // means this is a repeat of a pending
197  // outgoing request
198  if (outReqMap->validId(t)) {
199  cp.seqNum = outReqMap->getKey(t);
200  } else {
201  // in this case, reply has arrived but was not yet
202  // seen by thread so, suppress duplicate request
203  ps->free(px); return;
204  }
205  } else {
206  // first time for this request, assign seq number and
207  // remember it so we can route reply to correct thread
208  if (outReqMap->validId(t))
209  outReqMap->dropPair(outReqMap->getKey(t));
210  outReqMap->addPair(seqNum,t);
211  pool[t].seqNum = seqNum;
212  cp.seqNum = seqNum++;
213 // consider using timestamp to repeat outgoing requests,
214 // so worker thread does not need to
215 // requires heap to keep track of pending requests
216 // will need to make Dheap auto-expandable
217  }
218  cp.pack();
219  p.payErrUpdate();
220  // timeout used to purge old entries
221  pool[t].ts = now + 2000000000; // 2 sec timeout
222  sendToForest(px);
223 }
224 
229 pktx Substrate::recvFromForest() {
230  pktx px = ps->alloc();
231  if (px == 0) return 0;
232  Packet& p = ps->getPacket(px);
233 
234  ipa_t srcIp; ipp_t srcPort;
235  int nbytes = Np4d::recvfrom4d(dgSock,p.buffer,1500,srcIp,srcPort);
236  if (nbytes < Forest::OVERHEAD || !p.unpack()) {
237  ps->free(px); return 0;
238  }
239  p.tunIp = srcIp; p.tunPort = srcPort;
240 /*
241 string s;
242 cerr << "got packet from " << Np4d::ip2string(srcIp,s) << " " << srcPort << endl;
243 cerr << p.toString(s) << endl;
244 */
245 
246  return px;
247 }
248 
254 void Substrate::sendToForest(pktx px) {
255  Packet p = ps->getPacket(px); p.pack();
256  ipa_t ip; ipp_t port;
257  if (p.dstAdr == 0) { // send to tunnel, not router
258  ip = p.tunIp; port = p.tunPort;
259  } else {
260  ip = rtrIp; port = rtrPort;
261  }
262 /*
263 string s;
264 cerr << "substrate sending to " << Np4d::ip2string(ip,s) << " " << port << endl;
265 cerr << p.toString(s) << endl;
266 */
267  if (port == 0) fatal("Substrate::sendToForest: zero port number");
268  int rv = Np4d::sendto4d(dgSock,(void *) p.buffer, p.length,ip,port);
269  if (rv == -1) fatal("Substrate::sendToForest: failure in sendto");
270  ps->free(px);
271 }
272 
276 bool Substrate::connect() {
277  pktx px = ps->alloc();
278  Packet& p = ps->getPacket(px);
279 
280  p.payload()[0] = htonl((uint32_t) (nonce >> 32));
281  p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
282  p.length = Forest::OVERHEAD + 8; p.type = Forest::CONNECT; p.flags = 0;
283  p.comtree = Forest::CONNECT_COMT;
284  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
285 
286  int resendTime = Misc::getTime();
287  int resendCount = 1;
288  while (true) {
289  int now = Misc::getTime();
290  if (now > resendTime) {
291  if (resendCount > 3) { ps->free(px); return false; }
292  sendToForest(px);
293  resendTime += 1000000;
294  resendCount++;
295  }
296  usleep(10000);
297  int rx = recvFromForest();
298  if (rx != 0) {
299  Packet& reply = ps->getPacket(rx);
300  if (reply.type == Forest::CONNECT &&
301  reply.flags == Forest::ACK_FLAG) {
302  ps->free(px); ps->free(rx);
303  return true;
304  }
305  }
306  }
307 }
308 
309 
312 bool Substrate::disconnect() {
313  pktx px = ps->alloc();
314  Packet& p = ps->getPacket(px);
315 
316  p.payload()[0] = htonl((uint32_t) (nonce >> 32));
317  p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
318  p.length = Forest::OVERHEAD + 8; p.type = Forest::DISCONNECT;
319  p.flags = 0; p.comtree = Forest::CONNECT_COMT;
320  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
321 
322  int resendTime = Misc::getTime();
323  int resendCount = 1;
324  while (true) {
325  int now = Misc::getTime();
326  if (now > resendTime) {
327  if (resendCount > 3) { ps->free(px); return false; }
328  sendToForest(px);
329  resendTime += 1000000;
330  resendCount++;
331  }
332  usleep(10000);
333  int rx = recvFromForest();
334  if (rx != 0) {
335  Packet& reply = ps->getPacket(rx);
336  if (reply.type == Forest::DISCONNECT &&
337  reply.flags == Forest::ACK_FLAG) {
338  ps->free(px); ps->free(rx);
339  return true;
340  }
341  }
342  }
343 }
344 
345 } // ends namespace