forest-net
an overlay networks for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
Monitor.cpp
Go to the documentation of this file.
1 
9 #include "Monitor.h"
10 
11 using namespace forest;
12 
39 int main(int argc, char *argv[]) {
40  ipa_t cmIp, myIp; int finTime, worldSize;
41 
42  if (argc != 7 ||
43  (cmIp = Np4d::ipAddress(argv[1])) == 0 ||
44  (myIp = Np4d::ipAddress(argv[2])) == 0 ||
45  (sscanf(argv[3],"%d", &worldSize) != 1) ||
46  sscanf(argv[6],"%d", &finTime) != 1)
47  fatal("usage: Monitor myIp worldSize uname pword finTime");
48 
49  Monitor mon(cmIp, myIp,worldSize);
50  if (!mon.init(argv[4],argv[5])) {
51  fatal("Monitor: initialization failure");
52  }
53  mon.run(finTime);
54  exit(0);
55 }
56 
57 namespace forest {
58 
59 // Constructor for Monitor, allocates space and initializes private data
60 Monitor::Monitor(ipa_t cmIp1, ipa_t myIp1, int worldSize1) : cmIp(cmIp1),
61  myIp(myIp1), worldSize(min(worldSize1,MAX_WORLD)) {
62  int nPkts = 10000;
63  ps = new PacketStore(nPkts+1, nPkts+1);
64  mySubs = new set<int>;
65 
66  cornerX = 0; cornerY = 0;
67  viewSize = min(10,worldSize);
68  comt = 0;
69  switchState = IDLE;
70  seqNum = 0;
71 }
72 
73 Monitor::~Monitor() {
74  cout.flush();
75  if (listenSock > 0) close(listenSock);
76  delete ps; delete mySubs;
77 }
78 
82 bool Monitor::init(const string& uname, const string& pword) {
84  if (dgSock < 0 ||
87  return false;
88 
89  // login through client manager
90  if (!login(uname, pword)) return false;
91 
92  // setup external TCP socket for use by remote GUI
94  if (listenSock < 0) return false;
95  if (!Np4d::bind4d(listenSock,myIp,MON_PORT)) return false;
97 }
98 
105 bool Monitor::login(const string& uname, const string& pword) {
106  // open socket to client manager
107  int loginSock = Np4d::streamSocket();
108  if (loginSock < 0 ||
109  !Np4d::bind4d(loginSock, myIp, 0) ||
110  !Np4d::connect4d(loginSock,cmIp,Forest::CM_PORT)) {
111  cerr << "Avatar::login: cannot open/configure socket to "
112  "ClientMgr\n";
113  return false;
114  }
115  // start login dialog
116  string s = "login: " + uname + "\npassword: " + pword + "\nover\n";
117  Np4d::sendString(loginSock,s);
118  NetBuffer buf(loginSock,1024);
119  string s0, s1, s2;
120  if (!buf.readLine(s0) || s0 != "login successful" ||
121  !buf.readLine(s1) || s1 != "over")
122  return false;
123 
124  // proceed to new session dialog
125  s = "newSession\nover\n";
126  Np4d::sendString(loginSock,s);
127 
128  // process reply - expecting my forest address
129  if (!buf.readAlphas(s0) || s0 != "yourAddress" || !buf.verify(':') ||
130  !buf.readForestAddress(s1) || !buf.nextLine())
131  return false;
132  myAdr = Forest::forestAdr(s1.c_str());
133 
134  // continuing - expecting info for my forest access router
135  int port;
136  if (!buf.readAlphas(s0) || s0 != "yourRouter" || !buf.verify(':') ||
137  !buf.verify('(') || !buf.readIpAddress(s1) || !buf.verify(',') ||
138  !buf.readInt(port) || !buf.verify(',') ||
139  !buf.readForestAddress(s2) || !buf.verify(')') || !buf.nextLine())
140  return false;
141  rtrIp = Np4d::getIpAdr(s1.c_str());
142  rtrPort = (ipp_t) port;
143  rtrAdr = Forest::forestAdr(s2.c_str());
144 
145  // continuing - expecting address of comtree controller
146  if (!buf.readAlphas(s0) || s0 != "comtCtlAddress" || !buf.verify(':') ||
147  !buf.readForestAddress(s1) || !buf.nextLine())
148  return false;
149  ccAdr = Forest::forestAdr(s1.c_str());
150 
151  // continuing - expecting connection nonce
152  if (!buf.readAlphas(s0) || s0 != "connectNonce" || !buf.verify(':') ||
153  !buf.readInt(nonce) || !buf.nextLine())
154  return false;
155  if (!buf.readLine(s0) || s0 != "overAndOut")
156  return false;
157 
158  close(loginSock);
159 
160  cout << "avatar address=" << Forest::fAdr2string(myAdr,s) << endl;
161  cout << "router info= (" << Np4d::ip2string(rtrIp,s) << ",";
162  cout << rtrPort << "," << Forest::fAdr2string(rtrAdr,s) << ")\n";
163  cout << "comtCtl address=" << Forest::fAdr2string(ccAdr,s) << "\n";
164  cout << "nonce=" << nonce << endl;
165  return true;
166 }
167 
174 void Monitor::run(uint32_t finishTime) {
175  uint32_t now; // free-running microsecond time
176  uint32_t nextTime; // time of next operational cycle
177  now = nextTime = Misc::getTime();
178 
179  connSock = -1;
180  bool waiting4switch = false;
181  finishTime *= 1000000; // from seconds to microseconds
182  while (now <= finishTime) {
183  comt_t newComt = check4command();
184  if (newComt != 0 && newComt != comt) {
185  startComtSwitch(newComt,now);
186  waiting4switch = true;
187  }
188  pktx px;
189  while ((px = receiveFromRouter()) != 0) {
190  Packet& p = ps->getPacket(px);
191  if (!waiting4switch) {
192  forwardReport(px,now);
193  ps->free(px); continue;
194  }
195  // discard non-signalling packets and pass
196  // signalling packets to completeComtSwitch
197  if (p.type == Forest::CLIENT_DATA) {
198  ps->free(px); continue;
199  }
200  waiting4switch = !completeComtSwitch(px,now);
201  ps->free(px);
202  }
203  // check for timeout
204  waiting4switch = !completeComtSwitch(0,now);
205 
206  nextTime += 1000*UPDATE_PERIOD;
207  useconds_t delay = nextTime - Misc::getTime();
208  if (0 < delay && delay <= 1000*UPDATE_PERIOD) usleep(delay);
209  now = Misc::getTime();
210  }
211 
212  unsubscribeAll();
213  disconnect(); // send final disconnect packet
214 }
215 
222 void Monitor::startComtSwitch(comt_t newComt, uint32_t now) {
223  nextComt = newComt;
224  if (comt != 0) {
225  unsubscribeAll();
226  send2comtCtl(CtlPkt::CLIENT_LEAVE_COMTREE);
227  switchState = LEAVING;
228  switchTimer = now; switchCnt = 1;
229  } else {
230  comt = nextComt;
231  send2comtCtl(CtlPkt::CLIENT_JOIN_COMTREE);
232  switchState = JOINING;
233  switchTimer = now; switchCnt = 1;
234  }
235 }
236 
248 bool Monitor::completeComtSwitch(pktx px, uint32_t now) {
249  if (switchState == IDLE) return true;
250  if (px == 0 && now - switchTimer < SWITCH_TIMEOUT)
251  return false; // nothing to do in this case
252  switch (switchState) {
253  case LEAVING: {
254  if (px == 0) {
255  if (switchCnt > 3) { // give up
256  switchState = IDLE; return true;
257  }
258  // try again
259  send2comtCtl(CtlPkt::CLIENT_LEAVE_COMTREE,RETRY);
260  switchTimer = now; switchCnt++;
261  return false;
262  }
263  Packet& p = ps->getPacket(px);
264  CtlPkt cp(p.payload(),p.length - Forest::OVERHEAD);
265  cp.unpack();
266  if (cp.type == CtlPkt::CLIENT_LEAVE_COMTREE) {
267  if (cp.mode == CtlPkt::POS_REPLY) {
268  comt = nextComt;
269  send2comtCtl(CtlPkt::CLIENT_JOIN_COMTREE);
270  switchState = JOINING;
271  switchTimer = now; switchCnt = 1;
272  return false;
273  } else if (cp.mode == CtlPkt::NEG_REPLY) { // give up
274  switchState = IDLE; return true;
275  }
276  }
277  return false; // ignore anything else
278  }
279  case JOINING: {
280  if (px == 0) {
281  if (switchCnt > 3) { // give up
282  switchState = IDLE; return true;
283  }
284  // try again
285  send2comtCtl(CtlPkt::CLIENT_JOIN_COMTREE,RETRY);
286  switchTimer = now; switchCnt++;
287  return false;
288  }
289  Packet& p = ps->getPacket(px);
290  CtlPkt cp(p.payload(),p.length - Forest::OVERHEAD);
291  cp.unpack();
292  if (cp.type == CtlPkt::CLIENT_JOIN_COMTREE) {
293  if (cp.mode == CtlPkt::POS_REPLY) {
294  subscribeAll();
295  switchState = IDLE; return true;
296  } else if (cp.mode == CtlPkt::NEG_REPLY) { // give up
297  switchState = IDLE; return true;
298  }
299  }
300  return false; // ignore anything else
301  }
302  default: break;
303  }
304  return false;
305 }
306 
311 void Monitor::send2comtCtl(CtlPkt::CpType joinLeave, bool retry) {
312  pktx px = ps->alloc();
313  if (px == 0)
314  fatal("Monitor::send2comtCtl: no packets left to allocate");
315  if (!retry) seqNum++;
316  Packet& p = ps->getPacket(px);
317  CtlPkt cp(joinLeave,CtlPkt::REQUEST,seqNum,p.payload());
318  cp.comtree = comt;
319  cp.ip1 = myIp;
320  cp.port1 = Np4d::getSockPort(dgSock);
321  int len = cp.pack();
322  if (len == 0)
323  fatal("Monitor::send2comtCtl: control packet packing error");
324  p.length = Forest::OVERHEAD + len;
325  p.type = Forest::CLIENT_SIG; p.flags = 0;
327  p.srcAdr = myAdr; p.dstAdr = ccAdr;
328  p.pack();
329  sendToRouter(px);
330 }
331 
334 void Monitor::sendToRouter(pktx px) {
335  Packet& p = ps->getPacket(px);
336  p.pack();
337  int rv = Np4d::sendto4d(dgSock,(void *) p.buffer,p.length,
338  rtrIp,rtrPort);
339  if (rv == -1) fatal("Monitor::sendToRouter: failure in sendto");
340 }
341 
346  pktx px = ps->alloc();
347  if (px == 0) return 0;
348  Packet& p = ps->getPacket(px);
349 
350  int nbytes = Np4d::recv4d(dgSock,p.buffer,1500);
351  if (nbytes < 0) { ps->free(px); return 0; }
352 
353  p.unpack();
354  return px;
355 }
356 
381  if (connSock < 0) {
383  if (connSock < 0) return 0;
384  if (!Np4d::nonblock(connSock))
385  fatal("can't make connection socket nonblocking");
386  bool status; int ndVal = 1;
387  status = setsockopt(listenSock,IPPROTO_TCP,TCP_NODELAY,
388  (void *) &ndVal,sizeof(int));
389  if (status != 0) {
390  cerr << "setsockopt for no-delay failed\n";
391  perror("");
392  exit(1);
393  }
394  }
395  char buf[5];
396  int nbytes = read(connSock, buf, 5);
397  if (nbytes < 0) {
398  if (errno == EAGAIN) return 0;
399  fatal("Monitor::check4command: error in read call");
400  } else if (nbytes == 0) { // remote display closed connection
401  close(connSock); connSock = -1;
402  unsubscribeAll();
403  return 0;
404  } else if (nbytes < 5) {
405  fatal("Monitor::check4command: incomplete command");
406  }
407  char cmd = buf[0];
408  uint32_t param = ntohl(*((int*) &buf[1]));
409  switch (cmd) {
410  case 'x': cornerX = max(0,min(worldSize-viewSize,param)); break;
411  case 'y': cornerY = max(0,min(worldSize-viewSize,param)); break;
412  case 'v': viewSize = param;
413  if (viewSize < 1) viewSize = 1;
416  if (viewSize+cornerX > worldSize)
418  if (viewSize+cornerY > worldSize)
419  viewSize = worldSize-cornerY;
420  break;
421  case 'c': return param;
422  default: fatal("unrecognized command from remote display");
423  }
424  updateSubs();
425  return 0;
426 }
427 
428 
434 int Monitor::groupNum(int x1, int y1) {
435  return 1 + (x1/GRID) + (y1/GRID)*worldSize;
436 }
437 
443 void Monitor::switchComtrees(int newComt) {
444  unsubscribeAll(); comt = newComt; subscribeAll();
445 }
446 
451  list<int> glist;
452  for (int xi = cornerX; xi < cornerX + viewSize; xi++) {
453  //for (int xi = 0; xi < worldSize; xi++) {
454  for (int yi = cornerY; yi < cornerY + viewSize; yi++) {
455  //for (int yi = 0; yi < worldSize; yi++) {
456  int g = groupNum(xi*GRID, yi*GRID);
457  if (mySubs->find(g) == mySubs->end()) {
458  mySubs->insert(g); glist.push_back(g);
459  }
460  }
461  }
462  subscribe(glist);
463 }
464 
468  list<int> glist;
469  set<int>::iterator gp;
470  for (gp = mySubs->begin(); gp != mySubs->end(); gp++)
471  glist.push_back(*gp);
472  unsubscribe(glist);
473  mySubs->clear();
474 }
475 
479 void Monitor::subscribe(list<int>& glist) {
480  if (glist.size() == 0) return;
481 
482  pktx px = ps->alloc();
483  Packet& p = ps->getPacket(px);
484  uint32_t *pp = p.payload();
485 
486  int nsub = 0;
487  list<int>::iterator gp;
488  for (gp = glist.begin(); gp != glist.end(); gp++) {
489  int g = *gp; nsub++;
490  if (nsub > 350) {
491  pp[0] = htonl(nsub-1); pp[nsub] = 0;
492  p.length = Forest::OVERHEAD + 4*(2+nsub);
493  p.type = Forest::SUB_UNSUB; p.flags = 0;
494  p.comtree = comt;
495  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
496  sendToRouter(px);
497  nsub = 1;
498  }
499  pp[nsub] = htonl(-g);
500  }
501  pp[0] = htonl(nsub); pp[nsub+1] = 0;
502 
503  p.length = Forest::OVERHEAD + 4*(2+nsub);
504  p.type = Forest::SUB_UNSUB; p.flags = 0; p.comtree = comt;
505  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
506  sendToRouter(px);
507 
508  ps->free(px);
509 }
510 
514 void Monitor::unsubscribe(list<int>& glist) {
515  if (glist.size() == 0) return;
516 
517  pktx px = ps->alloc();
518  Packet& p = ps->getPacket(px);
519  uint32_t *pp = p.payload();
520 
521  int nunsub = 0;
522  list<int>::iterator gp;
523  for (gp = glist.begin(); gp != glist.end(); gp++) {
524  int g = *gp; nunsub++;
525  if (nunsub > 350) {
526  pp[0] = 0; pp[1] = htonl(nunsub-1);
527  p.length = Forest::OVERHEAD + 4*(2+nunsub);
528  p.type = Forest::SUB_UNSUB; p.flags = 0;
529  p.comtree = comt;
530  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
531  sendToRouter(px);
532  nunsub = 1;
533  }
534  pp[nunsub+1] = htonl(-g);
535  }
536  pp[0] = 0; pp[1] = htonl(nunsub);
537 
538  p.length = Forest::OVERHEAD + 4*(2+nunsub);
539  p.type = Forest::SUB_UNSUB; p.flags = 0; p.comtree = comt;
540  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
541  sendToRouter(px);
542 
543  ps->free(px);
544 }
545 
551  if (comt == 0) return;
552  // identify subscriptions that are not in current view
553  list<int> glist;
554  set<int>::iterator gp;
555  for (gp = mySubs->begin(); gp != mySubs->end(); gp++) {
556  int g = *gp;
557  int xi = (g-1) % worldSize;
558  int yi = (g-1) / worldSize;
559  if (xi >= cornerX && xi < cornerX + viewSize &&
560  yi >= cornerY && yi < cornerY + viewSize)
561  continue; // keep this subscription
562  glist.push_back(g);
563  }
564  // remove them from subscription set and unsubscribe
565  list<int>::iterator p;
566  for (p = glist.begin(); p != glist.end(); p++)
567  mySubs->erase(*p);
568  unsubscribe(glist);
569 
570  // identify subscriptions that are now in our current view,
571  // add them to subscription set and subscribe
572  glist.clear();
573  for (int xi = cornerX; xi < cornerX + viewSize; xi++) {
574  for (int yi = cornerY; yi < cornerY + viewSize; yi++) {
575  int g = groupNum(xi*GRID, yi*GRID);
576  if (mySubs->find(g) == mySubs->end()) {
577  mySubs->insert(g); glist.push_back(g);
578  }
579  }
580  }
581  subscribe(glist);
582 }
583 
590 void Monitor::forwardReport(pktx px, int now) {
591  if (comt == 0 || connSock == -1) return;
592 
593  Packet& p = ps->getPacket(px);
594  uint32_t *pp = p.payload();
595 
596  if (p.comtree != comt || p.type != Forest::CLIENT_DATA ||
597  ntohl(pp[0]) != (uint32_t) Avatar::STATUS_REPORT) {
598  // ignore packets for other comtrees, or that are not
599  // avatar status reports
600  ps->free(px); return;
601  }
602  uint32_t buf[NUMITEMS];
603  for (int i = 0; i < NUMITEMS; i++) buf[i] = pp[i];
604  buf[0] = htonl(now); buf[1] = htonl(p.srcAdr);
605  buf[8] = htonl(comt);
606 
607  int nbytes = NUMITEMS*sizeof(uint32_t);
608  char* bp = (char *) buf;
609  while (nbytes > 0) {
610  int n = write(connSock, (void *) bp, nbytes);
611  if (n < 0) break;
612  bp += n; nbytes -= n;
613  }
614  ps->free(px); return;
615 }
616 
621  pktx px = ps->alloc();
622  Packet& p = ps->getPacket(px);
623 
624  p.payload()[0] = htonl((uint32_t) (nonce >> 32));
625  p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
626  p.length = Forest::OVERHEAD + 8; p.type = Forest::CONNECT; p.flags = 0;
627  p.comtree = Forest::CONNECT_COMT;
628  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
629 
630  int resendTime = Misc::getTime();
631  int resendCount = 1;
632  while (true) {
633  int now = Misc::getTime();
634  if (now > resendTime) {
635  if (resendCount > 3) { ps->free(px); return false; }
636  sendToRouter(px);
637  resendTime += 1000000;
638  resendCount++;
639  }
640  int rx = receiveFromRouter();
641  if (rx == 0) { sleep(100000); continue; }
642  Packet& reply = ps->getPacket(rx);
643  bool status = (reply.type == Forest::CONNECT &&
644  reply.flags == Forest::ACK_FLAG);
645  ps->free(px); ps->free(rx);
646  return status;
647  }
648 }
649 
653  pktx px = ps->alloc();
654  Packet& p = ps->getPacket(px);
655 
656  p.payload()[0] = htonl((uint32_t) (nonce >> 32));
657  p.payload()[1] = htonl((uint32_t) (nonce & 0xffffffff));
659  p.flags = 0; p.comtree = Forest::CONNECT_COMT;
660  p.srcAdr = myAdr; p.dstAdr = rtrAdr;
661 
662  int resendTime = Misc::getTime();
663  int resendCount = 1;
664  while (true) {
665  int now = Misc::getTime();
666  if (now > resendTime) {
667  if (resendCount > 3) { ps->free(px); return false; }
668  sendToRouter(px);
669  resendTime += 1000000;
670  resendCount++;
671  }
672  int rx = receiveFromRouter();
673  if (rx == 0) { sleep(100000); continue; }
674  Packet& reply = ps->getPacket(rx);
675  bool status = (reply.type == Forest::DISCONNECT &&
676  reply.flags == Forest::ACK_FLAG);
677  ps->free(px); ps->free(rx);
678  return status;
679  }
680 }
681 
682 } // ends namespace
683