00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include <netinet/in.h>
00018 #include "bundling/Bundle.h"
00019 #include "bundling/BundleRef.h"
00020 #include "bundling/BundleList.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "ProphetController.h"
00023 #include <oasys/thread/Lock.h>
00024 #include <oasys/util/Random.h>
00025 #include <oasys/util/ScratchBuffer.h>
00026
00027 #include <queue>
00028
00029 namespace dtn {
00030
00031 template<>
00032 ProphetController* oasys::Singleton<ProphetController,false>::instance_ = NULL;
00033
00034 void
00035 ProphetController::do_init(ProphetParams* params,
00036 const BundleList* list,
00037 BundleActions* actions,
00038 const char* logpath)
00039 {
00040 ASSERT(params != NULL);
00041 ASSERT(actions != NULL);
00042 ASSERT(list != NULL);
00043
00044 params_ = params;
00045 actions_ = actions;
00046
00047 bundles_ = new ProphetBundleQueue(list, actions, params,
00048 *(QueueComp::queuecomp(params->qp_,
00049 &pstats_,
00050 &nodes_)));
00051
00052 node_age_timer_ = new ProphetTableAgeTimer(&nodes_,
00053 params_->age_period_,
00054 params_->epsilon_);
00055 ack_age_timer_ = new ProphetAckAgeTimer(&acks_,params_->age_period_);
00056
00057 lock_ = new oasys::SpinLock();
00058
00059 set_logpath(logpath);
00060 }
00061
00062 ProphetController::ProphetController()
00063 : oasys::Logger("ProphetController","/dtn/route/prophet/controller"),
00064 params_(NULL),
00065 node_age_timer_(NULL),
00066 ack_age_timer_(NULL),
00067 actions_(NULL),
00068 bundles_(NULL)
00069 {
00070 Prophet::UniqueID::init();
00071 encounters_.clear();
00072 prophet_eid_.assign(BundleDaemon::instance()->local_eid());
00073 ASSERT(prophet_eid_.append_service_tag("prophet"));
00074 }
00075
00076 ProphetController::~ProphetController()
00077 {
00078 delete node_age_timer_;
00079 delete ack_age_timer_;
00080 delete bundles_;
00081 delete lock_;
00082 }
00083
00084 void
00085 ProphetController::shutdown()
00086 {
00087 {
00088 oasys::ScopeLock l(lock_,"destructor");
00089 enc_set::iterator it = encounters_.begin();
00090 while( it != encounters_.end() )
00091 {
00092 ProphetEncounter* pe = *it;
00093 pe->neighbor_gone();
00094 it++;
00095 }
00096 encounters_.clear();
00097 }
00098 node_age_timer_->cancel();
00099 ack_age_timer_->cancel();
00100 }
00101
00102
00103 void
00104 ProphetController::dump_state(oasys::StringBuffer* buf)
00105 {
00106 oasys::ScopeLock l(lock_,"dump_state");
00107 buf->appendf("\n"
00108 "ProphetRouter [%s] [%s] (%zu active, %zu known)\n"
00109 "-------------\n",
00110 Prophet::fs_to_str(params_->fs_),
00111 Prophet::qp_to_str(params_->qp_),
00112 encounters_.size(), nodes_.size());
00113
00114
00115 for (enc_set::iterator it = encounters_.begin();
00116 it != encounters_.end();
00117 it++)
00118 {
00119 ProphetEncounter* pe = *it;
00120 pe->dump_state(buf);
00121 }
00122
00123
00124 buf->appendf("\n"
00125 "Known peers\n"
00126 "-----------\n");
00127 oasys::ScopeLock n(nodes_.lock(),"ProphetController::dump_state");
00128 for (ProphetTable::iterator i = nodes_.begin();
00129 i != nodes_.end();
00130 i++)
00131 {
00132 EndpointID eid = (*i).first;
00133 ProphetNode* node = (*i).second;
00134 buf->appendf("%02.2f %c%c%c %s\n",
00135 node->p_value(),
00136 node->relay() ? 'R' : ' ',
00137 node->custody() ? 'C' : ' ',
00138 node->internet_gw() ? 'I' : ' ',
00139 eid.c_str());
00140 }
00141
00142 buf->appendf("\n R - relay C - custody I - internet gateway\n\n");
00143 }
00144
00145 ProphetEncounter*
00146 ProphetController::find_instance(Link* link)
00147 {
00148 oasys::ScopeLock l(lock_,"find_instance");
00149 enc_set::iterator it = encounters_.begin();
00150 while( it != encounters_.end() )
00151 {
00152 if((*it)->next_hop()->remote_eid().equals(link->remote_eid()))
00153 return (ProphetEncounter*) (*it);
00154 else
00155 log_debug("find_instance: %s != %s",
00156 (*it)->next_hop()->remote_eid().c_str(),
00157 link->remote_eid().c_str());
00158 it++;
00159 }
00160 return NULL;
00161 }
00162
00163 void
00164 ProphetController::new_neighbor(const ContactRef& contact)
00165 {
00166 log_info("NEW_NEIGHBOR signal from *%p",contact.object());
00167 Link* link = contact.object()->link();
00168 ProphetEncounter* pe = find_instance(link);
00169 if (pe == NULL && !link->remote_eid().equals(EndpointID::NULL_EID()))
00170 {
00171 pe = new ProphetEncounter(link, this);
00172 if (!reg(pe))
00173 {
00174 delete pe;
00175 return;
00176 }
00177 pe->start();
00178 }
00179 }
00180
00181 void
00182 ProphetController::neighbor_gone(const ContactRef& contact)
00183 {
00184 Link* link = contact.object()->link();
00185 log_info("NEIGHBOR_GONE signal from *%p",contact.object());
00186 ProphetEncounter* pe = NULL;
00187 if((pe = find_instance(link)) != NULL)
00188 {
00189 pe->neighbor_gone();
00190 log_info("found and stopped ProphetEncounter instance");
00191 }
00192 else
00193 {
00194 log_info("did not find ProphetEncounter instance");
00195 }
00196 }
00197
00198 void
00199 ProphetController::handle_bundle_received(Bundle* bundle,const ContactRef& contact)
00200 {
00201 log_debug("handle_bundle_received, *%p from *%p",bundle,contact.object());
00202
00203
00204 EndpointID routeid = Prophet::eid_to_routeid(bundle->dest_);
00205 ProphetNode* node = nodes_.find(routeid);
00206 if (node == NULL && !routeid.equals(BundleDaemon::instance()->local_eid()))
00207 {
00208 node = new ProphetNode(params_);
00209 node->set_eid(Prophet::eid_to_routeid(bundle->dest_));
00210 nodes_.update(node);
00211 }
00212
00213 if (prophet_eid_.equals(bundle->dest_))
00214 {
00215
00216 ProphetTLV* pt = ProphetTLV::deserialize(bundle);
00217 if (pt != NULL)
00218 {
00219 log_debug("handle_bundle_received, got TLV size %d",pt->length());
00220
00221 ProphetEncounter *pe = find_instance(contact->link());
00222 if (pe == NULL)
00223 {
00224
00225 new_neighbor(contact);
00226 if ((pe = find_instance(contact->link())) == NULL)
00227 {
00228 log_err("Unable to find or create ProphetEncounter to "
00229 "handle Prophet control message *%p",bundle);
00230 delete pt;
00231 }
00232 }
00233
00234 if (pe != NULL)
00235 {
00236
00237 log_debug("handle_bundle_received, dispatching TLV to instance %d",
00238 pe->local_instance());
00239 pe->receive_tlv(pt);
00240 }
00241
00242
00243 actions_->delete_bundle(bundle,BundleProtocol::REASON_NO_ADDTL_INFO);
00244 }
00245 }
00246
00247
00248 else
00249 {
00250 oasys::ScopeLock l(lock_,"handle_bundle_received");
00251
00252 enc_set::iterator it = encounters_.begin();
00253 while( it != encounters_.end() )
00254 {
00255 ProphetEncounter* pe = *it;
00256 pe->handle_bundle_received(bundle);
00257 it++;
00258 }
00259
00260
00261 bundles_->push(bundle);
00262 }
00263 }
00264
00265 void
00266 ProphetController::handle_bundle_delivered(Bundle* b)
00267 {
00268 BundleRef bundle("handle_bundle_delivered");
00269 bundle = b;
00270 if (bundle.object() == NULL) return;
00271
00272
00273 acks_.insert(bundle.object());
00274
00275
00276 bundle = NULL;
00277 bundles_->drop_bundle(b);
00278 }
00279
00280 void
00281 ProphetController::handle_bundle_expired(Bundle* b)
00282 {
00283 BundleRef bundle("handle_bundle_expired");
00284 bundle = b;
00285 if (bundle.object() == NULL) return;
00286
00287
00288 pstats_.drop_bundle(bundle.object());
00289
00290
00291 bundle = NULL;
00292 bundles_->drop_bundle(b);
00293 }
00294
00295 void
00296 ProphetController::handle_link_state_change_request(const ContactRef& c)
00297 {
00298
00299 ProphetEncounter* pe = find_instance(c.object()->link());
00300 if (pe != NULL)
00301 {
00302
00303 pe->flush_pending();
00304 }
00305 }
00306
00307 }