forest-net
an overlay networks for large-scale virtual worlds
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
Receiver.cpp
1 
9 #include "IoProcessor.h"
10 
11 namespace forest {
12 
13 
22 IoProcessor::IoProcessor(int maxIface1, ipp_t portNum1, IfaceTable *ift1,
23  LinkTable *lt1, PacketStore *ps1, StatsModule *sm1)
24  : maxIface(maxIface1), portNum(portNum1), ift(ift1),
25  lt(lt1), ps(ps1), sm(sm1) {
26  nRdy = 0; maxSockNum = -1;
27  sockets = new fd_set;
28  sock = new int[maxIface+1];
29  for (int i = 0; i <= maxIface; i++) sock[i] = -1;
30  bootSock = -1;
31 }
32 
33 IoProcessor::~IoProcessor() {
34  int iface;
35  for (iface = ift->firstIface(); iface != 0;
36  iface = ift->nextIface(iface))
37  close(sock[iface]);
38  delete sockets;
39 }
40 
45 bool IoProcessor::setup(int i) {
46  // create datagram socket
48  if (sock[i] < 0) {
49  cerr << "IoProcessor::setup: socket call failed\n";
50  return false;
51  }
52  maxSockNum = max(maxSockNum, sock[i]);
53 
54  // bind it to an address
55  if (!Np4d::bind4d(sock[i], ift->getIpAdr(i), portNum)) {
56  string s;
57  cerr << "IoProcessor::setup: bind call failed for ("
58  << Np4d::ip2string(ift->getIpAdr(i),s)
59  << ", " << portNum << ") check interface's IP "
60  << "address and port\n";
61  return false;
62  }
63  ift->setPort(i,Np4d::getSockPort(sock[i]));
64 
65  // send dummy packet to NetMgr - hack to trigger NAT in SPP
66  int zero = 0;
67  Np4d::sendto4d(sock[i], (void *) &zero, sizeof(zero),
69  return true;
70 }
71 
72 bool IoProcessor::setupBootSock(ipa_t bootIp, ipa_t nmIp1) {
73  nmIp = nmIp1;
74  // open datagram socket
76  if (bootSock < 0) {
77  cerr << "IoProcessor::setupBootSock: socket call failed\n";
78  return false;
79  }
80  // bind it to the bootIp
81  if (!Np4d::bind4d(bootSock, bootIp, 0)) {
82  cerr << "IoProcessor::setupBootIp: bind call failed, "
83  << "check boot IP address\n";
84  return false;
85  }
86  return true;
87 }
88 
89 void IoProcessor::closeBootSock() {
90  close(bootSock); bootSock = -1;
91 }
92 
93 // Return next waiting packet or 0 if there is none.
94 pktx IoProcessor::receive() {
95  if (bootSock >= 0) {
96  int nbytes; // number of bytes in received packet
97 
98  pktx px = ps->alloc();
99  if (px == 0) { return 0; }
100  Packet& p = ps->getPacket(px);
101  buffer_t& b = *p.buffer;
102 
103  ipa_t sIpAdr; ipp_t sPort;
104  nbytes = Np4d::recvfrom4d(bootSock, (void *) &b[0], 1500,
105  sIpAdr, sPort);
106  p.bufferLen = nbytes;
107  if (nbytes < 0) {
108  if (errno == EAGAIN) {
109  ps->free(px); return 0;
110  }
111  fatal("IoProcessor::receive: error in recvfrom call");
112  }
113  if (sIpAdr != nmIp || sPort != Forest::NM_PORT) {
114  ps->free(px); return 0;
115  }
116  p.unpack();
117 /*
118 string s;
119 cerr << "iop received " << " nbytes=" << nbytes << endl;
120 cerr << p.toString(s) << endl;
121 if (p.type == Forest::CONNECT) {
122 uint64_t x = ntohl(p.payload()[0]); x <<= 32; x |= ntohl(p.payload()[1]);
123 cerr << x << endl;
124 }
125 */
126  if (!p.hdrErrCheck()) { ps->free(px); return 0; }
127  p.tunIp = sIpAdr; p.tunPort = sPort;
128  p.inLink = 0;
129  return px;
130  }
131  // proceed to normal case, when we're not booting
132  if (nRdy == 0) { // if no ready interface check for new arrivals
133  FD_ZERO(sockets);
134  for (int i = ift->firstIface(); i != 0; i = ift->nextIface(i)) {
135  FD_SET(sock[i],sockets);
136  }
137  struct timeval zero; zero.tv_sec = zero.tv_usec = 0;
138  int cnt = 0;
139  do {
140  nRdy = select(maxSockNum+1,sockets,
141  (fd_set *)NULL, (fd_set *)NULL, &zero);
142  } while (nRdy < 0 && cnt++ < 10);
143  if (cnt > 5) {
144  cerr << "IoProcessor::receive: select failed "
145  << cnt-1 << " times\n";
146  }
147  if (nRdy < 0) {
148  fatal("IoProcessor::receive: select failed");
149  }
150  if (nRdy == 0) return 0;
151  cIf = 0;
152  }
153  while (1) { // find next ready interface
154  cIf++;
155  if (cIf > Forest::MAXINTF) return 0; // should never reach here
156  if (ift->valid(cIf) && FD_ISSET(sock[cIf],sockets)) {
157  nRdy--; break;
158  }
159  }
160  // Now, read the packet from the interface
161  int nbytes; // number of bytes in received packet
162  int lnk; // # of link on which packet received
163 
164  pktx px = ps->alloc();
165  if (px == 0) {
166  cerr << "IoProcessor:receive: out of packets\n";
167  return 0;
168  }
169  Packet& p = ps->getPacket(px);
170  buffer_t& b = *p.buffer;
171 
172  ipa_t sIpAdr; ipp_t sPort;
173  nbytes = Np4d::recvfrom4d(sock[cIf], (void *) &b[0], 1500,
174  sIpAdr, sPort);
175  if (nbytes < 0) fatal("IoProcessor::receive: error in recvfrom call");
176 
177  p.unpack();
178 /*
179 string s;
180 cerr << "iop received from (" << Np4d::ip2string(sIpAdr,s) << ",";
181 cerr << sPort << ") nbytes=" << nbytes << endl;
182 cerr << p.toString(s);
183 if (p.type == Forest::CONNECT) {
184 uint64_t x = ntohl(p.payload()[0]); x <<= 32; x |= ntohl(p.payload()[1]);
185 cerr << x << endl;
186 }
187 */
188 
189  if (!p.hdrErrCheck()) { ps->free(px); return 0; }
190  lnk = lt->lookup(sIpAdr, sPort);
191  if (lnk == 0 && p.type == Forest::CONNECT
192  && p.length == Forest::OVERHEAD+2*sizeof(uint64_t)) {
193  uint64_t nonce = ntohl(p.payload()[2]); nonce <<= 32;
194  nonce |= ntohl(p.payload()[3]);
195  lnk = lt->lookup(nonce); // check for "startup" entry
196  }
197  if (lnk == 0 || cIf != lt->getIface(lnk)) {
198  string s;
199  cerr << "IoProcessor::receive: bad packet: lnk=" << lnk << " "
200  << p.toString(s);
201  cerr << "sender=(" << Np4d::ip2string(sIpAdr,s) << ","
202  << sPort << ")\n";
203  ps->free(px); return 0;
204  }
205 
206  p.inLink = lnk; p.bufferLen = nbytes;
207  p.tunIp = sIpAdr; p.tunPort = sPort;
208 
209  sm->cntInLink(lnk,Forest::truPktLeng(nbytes),
210  (lt->getPeerType(lnk) == Forest::ROUTER));
211  return px;
212 }
213 
214 void IoProcessor::send(pktx px, int lnk) {
215 // Send packet on specified link and recycle storage.
216  Packet p = ps->getPacket(px);
217 string s;
218  if (lnk == 0) { // means we're booting and this is going to NetMgr
219  int rv, lim = 0;
220 /*
221 cerr << "iop sending to (" << Np4d::ip2string(nmIp,s) << ",";
222 cerr << Forest::NM_PORT << ") \n" << p.toString(s);
223 if (p.type == Forest::CONNECT) {
224 uint64_t x = ntohl(p.payload()[0]); x <<= 32; x |= ntohl(p.payload()[1]);
225 cerr << x << endl;
226 }
227 */
228  do {
230  (void *) p.buffer, p.length,
232  } while (rv == -1 && errno == EAGAIN && lim++ < 10);
233  if (rv == -1) {
234  fatal("IoProcessor:: send: failure in sendto");
235  }
236  ps->free(px);
237  return;
238  }
239  ipa_t farIp = lt->getPeerIpAdr(lnk);
240  ipp_t farPort = lt->getPeerPort(lnk);
241  if (farIp == 0 || farPort == 0) { ps->free(px); return; }
242 
243  int rv, lim = 0;
244 /*
245 {
246 cerr << "iop sending to (" << Np4d::ip2string(farIp,s) << ",";
247 cerr << farPort << ") \n" << p.toString(s);
248 if (p.type == Forest::CONNECT) {
249 uint64_t x = ntohl(p.payload()[0]); x <<= 32; x |= ntohl(p.payload()[1]);
250 cerr << x << endl;
251 }
252 }
253 */
254  do {
255  rv = Np4d::sendto4d(sock[lt->getIface(lnk)],
256  (void *) p.buffer, p.length,
257  farIp, farPort);
258  } while (rv == -1 && errno == EAGAIN && lim++ < 10);
259  if (rv == -1) {
260  cerr << "IoProcessor:: send: failure in sendto (errno="
261  << errno << ")\n";
262  exit(1);
263  }
264  sm->cntOutLink(lnk,Forest::truPktLeng(p.length),
265  (lt->getPeerType(lnk) == Forest::ROUTER));
266  ps->free(px);
267 }
268 
269 } // ends namespace
270