CLConnection.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/util/OptParser.h>
00019 
00020 #include "CLConnection.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "bundling/BundlePayload.h"
00023 #include "contacts/ContactManager.h"
00024 
00025 namespace dtn {
00026 
00027 //----------------------------------------------------------------------
00028 CLConnection::CLConnection(const char*       classname,
00029                            const char*       logpath,
00030                            ConnectionConvergenceLayer* cl,
00031                            LinkParams*       params,
00032                            bool              active_connector)
00033     : Thread(classname),
00034       Logger(classname, logpath),
00035       contact_(classname),
00036       contact_up_(false),
00037       cmdqueue_(logpath),
00038       cl_(cl),
00039       params_(params),
00040       active_connector_(active_connector),
00041       num_pollfds_(0),
00042       poll_timeout_(-1),
00043       contact_broken_(false),
00044       num_pending_(0)
00045 {
00046     sendbuf_.reserve(params_->sendbuf_len_);
00047     recvbuf_.reserve(params_->recvbuf_len_);
00048 }
00049 
00050 //----------------------------------------------------------------------
00051 CLConnection::~CLConnection()
00052 {
00053 }
00054 
00055 //----------------------------------------------------------------------
00056 void
00057 CLConnection::run()
00058 {
00059     struct pollfd* cmdqueue_poll;
00060 
00061     initialize_pollfds();
00062 
00063     cmdqueue_poll         = &pollfds_[num_pollfds_];
00064     cmdqueue_poll->fd     = cmdqueue_.read_fd();
00065     cmdqueue_poll->events = POLLIN;
00066 
00067     // based on the parameter passed to the constructor, we either
00068     // initiate a connection or accept one, then move on to the main
00069     // run() loop. it is the responsibility of the underlying CL to
00070     // make sure that a contact_ structure is found / created
00071     if (active_connector_) {
00072         connect();
00073     } else {
00074         accept();
00075     }
00076 
00077     while (true) {
00078         if (contact_broken_) {
00079             log_debug("contact_broken set, exiting main loop");
00080             return;
00081         }
00082 
00083         // check the comand queue coming in from the bundle daemon
00084         // if any arrive, we continue to the top of the loop to check
00085         // contact_broken and then process any other commands before
00086         // checking for data to/from the remote side
00087         if (cmdqueue_.size() != 0) {
00088             process_command();
00089             continue;
00090         }
00091 
00092         // send any data there is to send, and if something was sent
00093         // out, we'll call poll() with a zero timeout so we can read
00094         // any data there is to consume, then return to send another
00095         // chunk.
00096         bool more_to_send = send_pending_data();
00097 
00098         // check again here for contact broken since we don't want to
00099         // poll if the socket's been closed
00100         if (contact_broken_) {
00101             log_debug("contact_broken set, exiting main loop");
00102             return;
00103         }
00104         
00105         // now we poll() to wait for a new command (indicated by the
00106         // notifier on the command queue), data arriving from the
00107         // remote side, or write-readiness on the socket indicating
00108         // that we can send more data.
00109         for (int i = 0; i < num_pollfds_ + 1; ++i) {
00110             pollfds_[i].revents = 0;
00111         }
00112 
00113         int timeout = more_to_send ? 0 : poll_timeout_;
00114 
00115         log_debug("calling poll on %d fds with timeout %d",
00116                   num_pollfds_ + 1, timeout);
00117                                                  
00118         int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
00119                                           timeout, NULL, logpath_);
00120 
00121         if (cc == oasys::IOTIMEOUT)
00122         {
00123             handle_poll_timeout();
00124         }
00125         else if (cc > 0)
00126         {
00127             if (cc == 1 && cmdqueue_poll->revents != 0) {
00128                 continue; // activity on the command queue only
00129             }
00130             handle_poll_activity();
00131         }
00132         else
00133         {
00134             log_err("unexpected return from poll_multiple: %d", cc);
00135             break_contact(ContactEvent::BROKEN);
00136             return;
00137         }
00138     }
00139 }
00140 
00141 //----------------------------------------------------------------------
00142 void
00143 CLConnection::queue_bundle(Bundle* bundle)
00144 {
00145     /*
00146      * Called by the main BundleDaemon thread... push a new CLMsg onto
00147      * the queue and potentially set the BUSY state on the link. Note
00148      * that it's important to update num_pending before pushing the
00149      * message onto the queue since the latter might trigger a context
00150      * switch.
00151      */
00152     LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00153     ASSERT(params != NULL);
00154     
00155     oasys::atomic_incr(&num_pending_);
00156     
00157     if (num_pending_.value >= params->busy_queue_depth_)
00158     {
00159         log_debug("%d bundles pending, setting BUSY state",
00160                   num_pending_.value);
00161         contact_->link()->set_state(Link::BUSY);
00162     }
00163     else
00164     {
00165         log_debug("%d bundles pending -- leaving state as-is",
00166                   num_pending_.value);
00167     }
00168 
00169     cmdqueue_.push_back(
00170         CLConnection::CLMsg(CLConnection::CLMSG_SEND_BUNDLE, bundle));
00171 }
00172 
00173 //----------------------------------------------------------------------
00174 void
00175 CLConnection::process_command()
00176 {
00177     CLMsg msg;
00178     bool ok = cmdqueue_.try_pop(&msg);
00179     ASSERT(ok); // shouldn't be called if the queue is empty
00180     
00181     switch(msg.type_) {
00182     case CLMSG_SEND_BUNDLE:
00183         log_debug("processing CLMSG_SEND_BUNDLE");
00184         handle_send_bundle(msg.bundle_.object());
00185         break;
00186         
00187     case CLMSG_CANCEL_BUNDLE:
00188         log_debug("processing CLMSG_CANCEL_BUNDLE");
00189         handle_cancel_bundle(msg.bundle_.object());
00190         break;
00191         
00192     case CLMSG_BREAK_CONTACT:
00193         log_debug("processing CLMSG_BREAK_CONTACT");
00194         break_contact(ContactEvent::USER);
00195         break;
00196     default:
00197         PANIC("invalid CLMsg typecode %d", msg.type_);
00198     }
00199 }
00200 
00201 //----------------------------------------------------------------------
00202 void
00203 CLConnection::check_unblock_link()
00204 {
00205     /*
00206      * Check if we need to unblock the link by clearing the BUSY
00207      * state. Note that we only do this when the link first goes back
00208      * below the threshold since otherwise we'll flood the router with
00209      * incorrect BUSY->AVAILABLE events.
00210      */
00211     LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00212     ASSERT(params != NULL);
00213 
00214     oasys::atomic_decr(&num_pending_);
00215     ASSERT((int)num_pending_.value >= 0);
00216 
00217     if (contact_->link()->state() == Link::BUSY)
00218     {
00219         if (num_pending_.value == (params->busy_queue_depth_ - 1))
00220         {
00221             log_debug("%d bundles pending, clearing BUSY state", num_pending_.value);
00222 
00223             // XXX/demmer post the AVAILABLE event at the head of the
00224             // event queue, since we want it to be processed quickly
00225             // in case there's a backlog of events. this whole issue
00226             // of backpressure needs to be worked through in a much
00227             // better way.
00228             BundleDaemon::post_at_head(
00229                 new LinkStateChangeRequest(contact_->link(),
00230                                            Link::AVAILABLE,
00231                                            ContactEvent::UNBLOCKED));
00232         }
00233         else
00234         {
00235             log_debug("%d bundles pending, leaving state as-is",
00236                       num_pending_.value);
00237         }
00238     }
00239 }
00240 
00241 //----------------------------------------------------------------------
00242 void
00243 CLConnection::contact_up()
00244 {
00245     log_debug("contact_up");
00246     ASSERT(contact_ != NULL);
00247     
00248     ASSERT(!contact_up_);
00249     contact_up_ = true;
00250     
00251     BundleDaemon::post(new ContactUpEvent(contact_));
00252 }
00253 
00254 //----------------------------------------------------------------------
00255 void
00256 CLConnection::break_contact(ContactEvent::reason_t reason)
00257 {
00258     log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
00259 
00260     if (reason != ContactEvent::BROKEN) {
00261         disconnect();
00262     }
00263 
00264     contact_broken_ = true;
00265     
00266     // if the connection isn't being closed by the user, we need to
00267     // notify the daemon that either the contact ended or the link
00268     // became unavailable before a contact began.
00269     //
00270     // we need to check that there is in fact a contact, since a
00271     // connection may be accepted and then break before establishing a
00272     // contact
00273     if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
00274         BundleDaemon::post(
00275             new LinkStateChangeRequest(contact_->link(),
00276                                        Link::CLOSED,
00277                                        reason));
00278     }
00279 }
00280 
00281 //----------------------------------------------------------------------
00282 void
00283 CLConnection::close_contact()
00284 {
00285     LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00286     ASSERT(params != NULL);
00287     
00288     // drain the inflight queue, posting transmitted or transmit
00289     // failed events
00290     while (! inflight_.empty()) {
00291         InFlightBundle* inflight = inflight_.front();
00292         u_int32_t sent_bytes  = inflight->sent_data_.num_contiguous();
00293         u_int32_t acked_bytes = inflight->ack_data_.num_contiguous();
00294         
00295         if ((! params->reactive_frag_enabled_) ||
00296             (sent_bytes == 0) ||
00297             (contact_->link()->is_reliable() && acked_bytes == 0))
00298         {
00299             log_debug("posting transmission failed event "
00300                       "(reactive fragmentation %s, %s link, acked_bytes %u)",
00301                       params->reactive_frag_enabled_ ? "enabled" : "disabled",
00302                       contact_->link()->is_reliable() ? "reliable" : "unreliable",
00303                       acked_bytes);
00304             
00305             BundleDaemon::post(
00306                 new BundleTransmitFailedEvent(inflight->bundle_.object(),
00307                                               contact_, contact_->link()));
00308             
00309         } else {
00310             BundleDaemon::post(
00311                 new BundleTransmittedEvent(inflight->bundle_.object(),
00312                                            contact_, contact_->link(),
00313                                            sent_bytes, acked_bytes));
00314         }
00315 
00316         inflight_.pop_front();
00317     }
00318 
00319     // check the tail of the incoming queue to see if there's a
00320     // partially-received bundle that we need to post a received event
00321     // for (if reactive fragmentation is enabled)
00322     if (! incoming_.empty()) {
00323         IncomingBundle* incoming = incoming_.back();
00324         if(!incoming->rcvd_data_.empty())
00325         {  
00326             size_t rcvd_len = incoming->rcvd_data_.last() + 1;
00327 
00328             size_t header_block_length =
00329                 BundleProtocol::payload_offset(&incoming->bundle_->recv_blocks_);
00330         
00331             if ((incoming->total_length_ == 0) && 
00332                 params->reactive_frag_enabled_ &&
00333                 (rcvd_len > header_block_length))
00334             {
00335                 log_debug("partial arrival of bundle: "
00336                           "got %zu bytes [hdr %zu payload %zu]",
00337                           rcvd_len, header_block_length,
00338                           incoming->bundle_->payload_.length());
00339              
00340                 BundleDaemon::post(
00341                     new BundleReceivedEvent(incoming->bundle_.object(),
00342                                             EVENTSRC_PEER, rcvd_len,
00343                                             contact_.object()));
00344             }
00345         }
00346     }
00347 
00348     // now drain the incoming queue
00349     while (!incoming_.empty()) {
00350         IncomingBundle* incoming = incoming_.back();
00351         incoming_.pop_back();
00352         delete incoming;
00353     }
00354     
00355     // finally, drain the message queue, posting transmit failed
00356     // events for any send bundle commands that may be in there
00357     // (though this is unlikely to happen)
00358     if (cmdqueue_.size() > 0) {
00359         log_warn("close_contact: %zu CL commands still in queue: ",
00360                  cmdqueue_.size());
00361         
00362         while (cmdqueue_.size() != 0) {
00363             CLMsg msg;
00364             bool ok = cmdqueue_.try_pop(&msg);
00365             ASSERT(ok);
00366 
00367             log_warn("close_contact: %s still in queue", clmsg_to_str(msg.type_));
00368             
00369             if (msg.type_ == CLMSG_SEND_BUNDLE) {
00370                 BundleDaemon::post(
00371                     new BundleTransmitFailedEvent(msg.bundle_.object(),
00372                                                   contact_,
00373                                                   contact_->link()));
00374 
00375             }
00376         }
00377     }
00378 }
00379 
00380 //----------------------------------------------------------------------
00381 void
00382 CLConnection::find_contact(const EndpointID& peer_eid)
00383 {
00384     /*
00385      * Now we may need to find or create an appropriate opportunistic
00386      * link for the connection.
00387      *
00388      * First, we check if there's an idle (i.e. UNAVAILABLE) link to
00389      * the remote eid. We explicitly ignore the nexthop address, since
00390      * that can change (due to things like TCP/UDP port number
00391      * assignment), but we pass in the remote eid to match for a link.
00392      *
00393      * If we can't find one, then we create a new opportunistic link
00394      * for the connection.
00395      */
00396     if (contact_ == NULL) {
00397 
00398         ASSERT(nexthop_ != ""); // the derived class must have set the
00399                                 // nexthop in the constructor
00400 
00401         ContactManager* cm = BundleDaemon::instance()->contactmgr();
00402         oasys::ScopeLock l(cm->lock(), "CLConnection::find_contact");
00403 
00404         Link* link = cm->find_link_to(cl_, "", peer_eid,
00405                                       Link::OPPORTUNISTIC,
00406                                       Link::AVAILABLE | Link::UNAVAILABLE);
00407 
00408         // XXX/demmer remove check for no contact
00409         if (link != NULL && (link->contact() == NULL)) {
00410             link->set_nexthop(nexthop_);
00411             log_debug("found idle opportunistic link *%p", link);
00412             
00413         } else {
00414             if (link != NULL) {
00415                 log_warn("in-use opportunistic link *%p returned from "
00416                          "ContactManager::find_link_to", link);
00417             }
00418             
00419             link = cm->new_opportunistic_link(cl_,
00420                                               nexthop_.c_str(),
00421                                               peer_eid);
00422             log_debug("created new opportunistic link *%p", link);
00423         }
00424         
00425         ASSERT(! link->isopen());
00426 
00427         contact_ = new Contact(link);
00428         contact_->set_cl_info(this);
00429         link->set_contact(contact_.object());
00430 
00431         /*
00432          * Now that the connection is established, we swing the
00433          * params_ pointer to those of the link, since there's a
00434          * chance they've been modified by the user in the past.
00435          */
00436         LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
00437         ASSERT(lparams != NULL);
00438         params_ = lparams;
00439     }
00440 }
00441 
00442 
00443 } // namespace dtn

Generated on Sat Sep 8 08:36:16 2007 for DTN Reference Implementation by  doxygen 1.5.3