LCOV - code coverage report
Current view: top level - bgp - bgp_ribout_updates.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 284 290 97.9 %
Date: 2026-06-22 02:21:21 Functions: 24 24 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "bgp/bgp_ribout_updates.h"
       6             : 
       7             : #include <string>
       8             : 
       9             : #include "sandesh/sandesh_trace.h"
      10             : #include "base/task_annotations.h"
      11             : #include "bgp/bgp_log.h"
      12             : #include "bgp/bgp_peer_types.h"
      13             : #include "bgp/bgp_ribout.h"
      14             : #include "bgp/bgp_route.h"
      15             : #include "bgp/bgp_update_queue.h"
      16             : #include "bgp/bgp_update_monitor.h"
      17             : #include "bgp/bgp_update_sender.h"
      18             : #include "bgp/message_builder.h"
      19             : 
      20             : using std::unique_ptr;
      21             : using std::vector;
      22             : 
      23             : vector<Message *> RibOutUpdates::bgp_messages_;
      24             : vector<Message *> RibOutUpdates::xmpp_messages_;
      25             : 
      26             : //
      27             : // Create a new RibOutUpdates.  Also create the necessary UpdateQueue and
      28             : // add them to the vector.
      29             : //
      30      135680 : RibOutUpdates::RibOutUpdates(RibOut *ribout, int index)
      31      135680 :     : ribout_(ribout),
      32      135680 :       index_(index) {
      33      407040 :     for (int i = 0; i < QCOUNT; i++) {
      34      271360 :         UpdateQueue *queue = new UpdateQueue(ribout, i);
      35      271360 :         queue_vec_.push_back(queue);
      36             :     }
      37      135680 :     monitor_.reset(new RibUpdateMonitor(ribout, &queue_vec_));
      38      135680 :     memset(&stats_, 0, sizeof(stats_));
      39      135680 : }
      40             : 
      41             : //
      42             : // Destructor.  Get rid of all the UpdateQueues.
      43             : //
      44      271060 : RibOutUpdates::~RibOutUpdates() {
      45      135680 :     STLDeleteValues(&queue_vec_);
      46      271060 : }
      47             : 
      48             : //
      49             : // Initialize static vectors of bgp/xmpp message pointers to NULL.
      50             : //
      51         158 : void RibOutUpdates::Initialize() {
      52         158 :     bgp_messages_.resize(DB::PartitionCount(), NULL);
      53         158 :     xmpp_messages_.resize(DB::PartitionCount(), NULL);
      54         158 : }
      55             : 
      56             : //
      57             : // Free any memory allocated for bgp/xmpp messages.
      58             : //
      59          62 : void RibOutUpdates::Terminate() {
      60          62 :     STLDeleteValues(&bgp_messages_);
      61          62 :     STLDeleteValues(&xmpp_messages_);
      62          62 : }
      63             : 
      64             : //
      65             : // Create if needed, and return the bgp/xmpp message for this RibOutUpdates.
      66             : // Note that we use static vectors of bgp/xmpp messages, one per partition,
      67             : // so that we don't need to allocate and free messages repeatedly.
      68             : //
      69      616133 : Message *RibOutUpdates::GetMessage() const {
      70      616133 :     if (ribout_->IsEncodingBgp()) {
      71      159015 :         if (!bgp_messages_[index_]) {
      72             :             MessageBuilder *builder =
      73         154 :                 MessageBuilder::GetInstance(RibExportPolicy::BGP);
      74         154 :             Message *message = builder->Create();
      75         154 :             bgp_messages_[index_] = message;
      76             :         }
      77      159015 :         return bgp_messages_[index_];
      78             :     }
      79      457126 :     if (ribout_->IsEncodingXmpp()) {
      80      457175 :         if (!xmpp_messages_[index_]) {
      81             :             MessageBuilder *builder =
      82         144 :                 MessageBuilder::GetInstance(RibExportPolicy::XMPP);
      83         144 :             Message *message = builder->Create();
      84         144 :             xmpp_messages_[index_] = message;
      85             :         }
      86      457172 :         return xmpp_messages_[index_];
      87             :     }
      88           0 :     return NULL;
      89             : }
      90             : 
      91             : //
      92             : // Concurrency: Called in the context of the routing table partition task.
      93             : //
      94             : // Enqueue the RouteUpdate corresponding to the DBEntryBase into the queue.
      95             : // This is called in the context of the routing table partition task. All
      96             : // the concurrency issues are handled by going through the monitor.
      97             : //
      98             : // If the UpdateQueue corresponding to the RouteUpdate previously had no
      99             : // updates after the tail marker, we kick the BgpUpdateSender to perform
     100             : // a tail dequeue for the RibOut.
     101             : //
     102      900148 : void RibOutUpdates::Enqueue(DBEntryBase *db_entry, RouteUpdate *rt_update) {
     103      900148 :     CHECK_CONCURRENCY("db::DBTable");
     104             : 
     105      900083 :     bool need_tail_dequeue = monitor_->EnqueueUpdate(db_entry, rt_update);
     106      900370 :     if (need_tail_dequeue) {
     107      536209 :         ribout_->sender()->RibOutActive(index_, ribout_, rt_update->queue_id());
     108             :     }
     109      900448 : }
     110             : 
     111             : //
     112             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     113             : //
     114             : // Common dequeue routine invoked by tail dequeue and peer dequeue. It builds
     115             : // and sends updates for each UpdateInfo element in the list hanging off the
     116             : // RouteUpdate. For each update that it builds, it also includes prefixes for
     117             : // other UpdateInfo elements that share the same attributes, provided that
     118             : // the associated RouteUpdate was enqueued after the original one.
     119             : 
     120             : // Each update is targeted at the peers in the RibPeerSet of the UpdateMarker
     121             : // passed in to us.  This set of peers is subsequently culled based on the
     122             : // RibPeerSet in each UpdateInfo.  IOW, the update is sent only to the set
     123             : // of peers in the intersection of the UpdateMarker and the UpdateInfo. Note
     124             : // that the UpdateMarker could specify a single peer if we are called from
     125             : // peer dequeue.
     126             : //
     127             : // Return false if all the peers in the marker get blocked.  In any case, the
     128             : // blocked parameter is populated with the set of peers that are send blocked.
     129             : //
     130      627406 : bool RibOutUpdates::DequeueCommon(UpdateQueue *queue, UpdateMarker *marker,
     131             :         RouteUpdate *rt_update, RibPeerSet *blocked) {
     132      627406 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     133             : 
     134             :     // Pass a hint to the Message telling it whether it needs to cache the
     135             :     // formatted version of each route. This is used only for xmpp messages.
     136             :     // Heuristic is to cache if there's markers other than the tail marker.
     137             :     // The reasoning is that the cached version can be used later when the
     138             :     // markers in question are being processed.  Put another way - there's
     139             :     // no need to cache if route is not going to be advertised to any peers
     140             :     // other than the ones in the given UpdateMarker.
     141      627444 :     bool cache_routes = queue->marker_count() != 0;
     142             : 
     143             :     // Go through all UpdateInfo elements for the RouteUpdate.
     144      627445 :     int queue_id = rt_update->queue_id();
     145      627441 :     RibPeerSet rt_blocked;
     146      627442 :     for (UpdateInfoSList::List::iterator iter = rt_update->Updates()->begin();
     147     2518825 :          iter != rt_update->Updates()->end();) {
     148             :         // Get the UpdateInfo and move the iterator to next one before doing
     149             :         // any processing, since we may delete the UpdateInfo further down.
     150      631901 :         UpdateInfo *uinfo = iter.operator->();
     151             :         ++iter;
     152             : 
     153             :         // Skip if there's no overlap between the UpdateMarker and the targets
     154             :         // for the UpdateInfo.  The intersection is the set of peers to which
     155             :         // the message we are about to build will be sent.
     156      631901 :         RibPeerSet msgset;
     157      631926 :         msgset.BuildIntersection(uinfo->target, marker->members);
     158      631807 :         if (msgset.empty()) {
     159       15665 :             continue;
     160             :         }
     161             : 
     162             :         // Generate the update, merge additional updates into that message and
     163             :         // send it message to the target RibPeerSet.
     164             :         //
     165             :         // In the rare case that the first route and it's attributes don't fit
     166             :         // into the message, clear the target bits in the UpdateInfo to ensure
     167             :         // that the UpdateQueue doesn't get wedged. However, don't update the
     168             :         // history bits in the RouteUpdate since the message did not get sent.
     169             :         //
     170             :         // The Create routine has the responsibility of logging an error and
     171             :         // incrementing any counters.
     172      616131 :         RibPeerSet msg_blocked;
     173      616127 :         stats_[queue_id].messages_built_count_++;
     174      616127 :         Message *message = GetMessage();
     175      616110 :         assert(message);
     176     1848510 :         bool msg_built = message->Start(
     177      616110 :             ribout_, cache_routes, &uinfo->roattr, rt_update->route());
     178      616290 :         if (msg_built) {
     179      616186 :             UpdatePack(queue_id, message, uinfo, msgset);
     180      616221 :             message->Finish();
     181      616213 :             UpdateSend(queue_id, message, msgset, &msg_blocked);
     182             :         }
     183             : 
     184             :         // Reset bits in the UpdateInfo.  Note that this has already been done
     185             :         // via UpdatePack for all the other UpdateInfo elements that we packed
     186             :         // into this message.
     187      616376 :         bool empty = ClearAdvertisedBits(rt_update, uinfo, msgset, msg_built);
     188      616369 :         if (empty) {
     189      615539 :             rt_update->RemoveUpdateInfo(uinfo);
     190             :         }
     191             : 
     192             :         // Update RibPeerSet of peers that got blocked while processing this
     193             :         // RouteUpdate. Since there's no overlap of peers between UpdateInfos
     194             :         // for the same RouteUpdate, we can update the markers for all blocked
     195             :         // peers in one shot i.e. outside this loop.
     196      616358 :         rt_blocked |= msg_blocked;
     197      632010 :     }
     198             : 
     199             :     // Update the markers for any peers that got blocked while processing this
     200             :     // RouteUpdate. If all peers in the UpdateMarker got blocked, we shouldn't
     201             :     // build any more update messages.  Return false to let the callers know
     202             :     // that this has happened.
     203      627451 :     if (rt_blocked.empty()) {
     204      625789 :         return true;
     205             :     } else {
     206        1753 :         *blocked |= rt_blocked;
     207        1755 :         return !UpdateMarkersOnBlocked(marker, rt_update, &rt_blocked);
     208             :     }
     209      627545 : }
     210             : 
     211             : //
     212             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     213             : //
     214             : // Dequeue and build updates for the in-sync peers in the RibPeerSet of the
     215             : // tail marker for the given queue id.
     216             : //
     217             : // Return false if all the peers in the marker get blocked.  In any case, the
     218             : // blocked parameter is populated with the set of peers that are send blocked
     219             : // and the unsync parameter is populated with the set of peers from the tail
     220             : // marker that are not in the msync set passed in to the method.
     221             : //
     222      615499 : bool RibOutUpdates::TailDequeue(int queue_id, const RibPeerSet &msync,
     223             :         RibPeerSet *blocked, RibPeerSet *unsync) {
     224      615499 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     225             : 
     226      615587 :     stats_[queue_id].tail_dequeue_count_++;
     227      615587 :     UpdateQueue *queue = queue_vec_[queue_id];
     228      615584 :     UpdateMarker *start_marker = queue->tail_marker();
     229      615583 :     RouteUpdatePtr update = monitor_->GetNextUpdate(queue_id, start_marker);
     230             : 
     231      615621 :     if (update.get() == NULL) {
     232        1476 :         return true;
     233             :     }
     234             : 
     235             :     // Intersect marker membership and in-sync peers to come up with the
     236             :     // unsync peers. If all the peers are unsync return right away. The
     237             :     // BgpUpdateSender will take care of triggering a TailDequeue again
     238             :     // when at least one peer becomes in-sync.
     239      614145 :     unsync->BuildComplement(start_marker->members, msync);
     240      614097 :     if (*unsync == start_marker->members) {
     241        4907 :         return false;
     242             :     }
     243             : 
     244             :     // Split the unsync peers from the tail marker. Note that this updates
     245             :     // the RibPeerSet in the tail marker.
     246      609186 :     if (!unsync->empty()) {
     247        2655 :         stats_[queue_id].marker_split_count_++;
     248        2655 :         queue->MarkerSplit(start_marker, *unsync);
     249             :     }
     250             : 
     251             :     // Update send loop. Select next update to send, format a message.
     252             :     // Add other updates with the same attributes and replicate the
     253             :     // packet.
     254      609186 :     RibPeerSet members = start_marker->members;
     255      609179 :     RouteUpdatePtr next_update;
     256     1235160 :     for (; update.get() != NULL; update = next_update) {
     257      626987 :         if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
     258             :             // Be sure to get rid of the RouteUpdate if it's empty.
     259        1036 :             if (update->empty()) {
     260         921 :                 ClearUpdate(&update);
     261             :             }
     262             : 
     263        1036 :             return false;
     264             :         }
     265             : 
     266             :         // Iterate to the next update before we potentially delete the
     267             :         // current one. If there are no more updates in the queue, the
     268             :         // marker will get moved so that it's after the current update.
     269      626069 :         next_update = monitor_->GetNextUpdate(queue_id, update.get());
     270             : 
     271             :         // Be sure to get rid of the RouteUpdate if it's empty.
     272      626050 :         if (update->empty()) {
     273      609879 :             ClearUpdate(&update);
     274             :         }
     275             :     }
     276             : 
     277             :     // Request peers to flush accumulated update messages.
     278             :     // Return false if all peers got blocked.
     279      608102 :     UpdateFlush(members, blocked);
     280      608232 :     return (members != *blocked);
     281      615650 : }
     282             : 
     283             : //
     284             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     285             : //
     286             : // Dequeue and build updates for all the peers that share the same marker as
     287             : // the specified peer.  This routine has some extra intelligence beyond the
     288             : // TailDequeue. As it encounters update markers, it merges in any send ready
     289             : // peers from those with the marker being processed for dequeue. This is done
     290             : // to reduce the number of times we build an update message containing the
     291             : // the same information.
     292             : //
     293             : // Return false if all the peers in the marker get blocked.  In any case, the
     294             : // blocked parameter is populated with the set of peers that are send blocked.
     295             : //
     296         299 : bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
     297             :         RibPeerSet *blocked) {
     298         299 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     299             : 
     300         299 :     stats_[queue_id].peer_dequeue_count_++;
     301         299 :     UpdateQueue *queue = queue_vec_[queue_id];
     302         299 :     int peer_idx = ribout_->GetPeerIndex(peer);
     303         299 :     UpdateMarker *start_marker = queue->GetMarker(peer_idx);
     304             : 
     305             :     // We're done if this is the same as the tail marker.  Updates will be
     306             :     // built subsequently via TailDequeue.
     307         299 :     assert(start_marker);
     308         299 :     if (start_marker == queue->tail_marker()) {
     309         209 :         return true;
     310             :     }
     311             : 
     312             :     // We're done if the lead peer is not send ready. This can happen if
     313             :     // the peer got blocked when processing updates in another partition.
     314          90 :     RibPeerSet mready;
     315          90 :     ribout_->BuildSendReadyBitSet(start_marker->members, &mready);
     316          90 :     if (!mready.test(peer_idx)) {
     317           0 :         blocked->set(peer_idx);
     318           0 :         return false;
     319             :     }
     320             : 
     321             :     // Split out any peers from the marker that are not send ready. Note that
     322             :     // this updates the RibPeerSet in the marker.
     323          90 :     RibPeerSet notready;
     324          90 :     notready.BuildComplement(start_marker->members, mready);
     325          90 :     if (!notready.empty()) {
     326           3 :         stats_[queue_id].marker_split_count_++;
     327           3 :         queue->MarkerSplit(start_marker, notready);
     328             :     }
     329             : 
     330             :     // Get the encapsulator for the first RouteUpdate.  Even if there's no
     331             :     // RouteUpdate, we should find another marker or the tail marker.
     332             :     UpdateEntry *upentry;
     333             :     RouteUpdatePtr update =
     334          90 :         monitor_->GetNextEntry(queue_id, start_marker, &upentry);
     335          90 :     assert(upentry);
     336             : 
     337             :     // Update loop.  Keep going till we reach the tail marker or till all the
     338             :     // peers get blocked.
     339          90 :     RibPeerSet members = start_marker->members;
     340          90 :     RouteUpdatePtr next_update;
     341             :     UpdateEntry *next_upentry;
     342         507 :     for (; upentry != NULL; upentry = next_upentry, update = next_update) {
     343         507 :         UpdateMarker *marker = NULL;
     344         507 :         if (upentry->IsMarker()) {
     345             :             // The queue entry is a marker.  We're done if we've reached the
     346             :             // tail marker.  Updates will be built later via TailDequeue.
     347          88 :             marker = static_cast<UpdateMarker *>(upentry);
     348          88 :             if (marker == queue->tail_marker()) {
     349          75 :                 stats_[queue_id].marker_merge_count_++;
     350          75 :                 queue->MarkerMerge(queue->tail_marker(), start_marker,
     351          75 :                         start_marker->members);
     352          75 :                 break;
     353             :             }
     354             :         } else {
     355             :             // The queue entry is a RouteUpdate. Go ahead and build an update
     356             :             // message.  Bail if all the peers in the marker get blocked.
     357         419 :             if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
     358             :                 // Be sure to get rid of the RouteUpdate if it's empty.
     359          15 :                 if (update->empty()) {
     360          14 :                     ClearUpdate(&update);
     361             :                 }
     362          15 :                 break;
     363             :             }
     364             :         }
     365             : 
     366             :         // Iterate to the next element before we potentially delete the
     367             :         // current one.
     368             :         next_update =
     369         417 :             monitor_->GetNextEntry(queue_id, upentry, &next_upentry);
     370             : 
     371         417 :         if (upentry->IsMarker()) {
     372             :             // As the entry is a marker, merge send-ready peers from it
     373             :             // with the marker that is being processed for dequeue.  Note
     374             :             // that this updates the RibPeerSet in the marker.
     375          13 :             RibPeerSet mmove;
     376          13 :             ribout_->BuildSendReadyBitSet(marker->members, &mmove);
     377          13 :             if  (!mmove.empty()) {
     378           9 :                 stats_[queue_id].marker_merge_count_++;
     379           9 :                 queue->MarkerMerge(start_marker, marker, mmove);
     380           9 :                 members |= mmove;
     381             :             }
     382         417 :         } else if (update->empty()) {
     383             :             // Be sure to get rid of the RouteUpdate since it's empty.
     384         359 :             ClearUpdate(&update);
     385             :         }
     386             :     }
     387             : 
     388             :     // Request peers to flush accumulated update messages.
     389             :     // Return false if all peers got blocked.
     390          90 :     UpdateFlush(members, blocked);
     391          90 :     return (members != *blocked);
     392          90 : }
     393             : 
     394             : //
     395             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     396             : //
     397             : // Go through all the UpdateInfo elements that have the same attribute as
     398             : // the start parameter and pack the corresponding prefixes into the Message.
     399             : // The attributes and the prefix associated with start are already in the
     400             : // Message when this method is invoked.
     401             : //
     402             : // The set of peers for which this update is being built is represented by
     403             : // the msgset parameter.  As the msgset has already been determined by the
     404             : // caller, we should only add prefixes that need to go to all the peers in
     405             : // the msgset.
     406             : //
     407      616167 : void RibOutUpdates::UpdatePack(int queue_id, Message *message,
     408             :         UpdateInfo *start_uinfo, const RibPeerSet &msgset) {
     409      616167 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     410             : 
     411             :     UpdateInfo *uinfo, *next_uinfo;
     412      616121 :     RouteUpdatePtr next_update;
     413             : 
     414             :     // Walk through all the UpdateInfo elements with the same attribute in
     415             :     // enqueue order.
     416             :     RouteUpdatePtr update =
     417      616117 :         monitor_->GetAttrNext(queue_id, start_uinfo, &uinfo);
     418      927211 :     for (; update.get() != NULL; update = next_update, uinfo = next_uinfo) {
     419             :         // Iterate to the next element before we potentially delete the
     420             :         // current one.
     421      311247 :         next_update = monitor_->GetAttrNext(queue_id, uinfo, &next_uinfo);
     422             : 
     423             :         // Skip if the msgset RibPeerSet is not a subset of the target in
     424             :         // UpdateInfo.
     425      311194 :         if (!uinfo->target.Contains(msgset))
     426        3817 :             continue;
     427             : 
     428             :         // Go ahead and add the route to the message.  Terminate the loop
     429             :         // if the message doesn't have room for the route.  The route will
     430             :         // get included in another update message.
     431      307361 :         bool success = message->AddRoute(update->route(), &uinfo->roattr);
     432      307430 :         if (!success) {
     433         255 :             break;
     434             :         }
     435             : 
     436             :         // First clear the advertised bits as represented by msgset from
     437             :         // the target RibPeerSet in the UpdateInfo. If the target is now
     438             :         // empty, remove the UpdateInfo from the list container in the
     439             :         // underlying RouteUpdate.
     440             :         //
     441             :         // If the RouteUpdate itself is now empty i.e. there are no more
     442             :         // UpdateInfo elements associated with it, we can get rid of it.
     443      307175 :         bool empty = ClearAdvertisedBits(update.get(), uinfo, msgset, true);
     444      307152 :         if (empty && update->RemoveUpdateInfo(uinfo)) {
     445      306385 :             ClearUpdate(&update);
     446             :         }
     447             :     }
     448      616180 : }
     449             : 
     450             : //
     451             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     452             : //
     453             : // Go through all the peers in the specified RibPeerSet and send the given
     454             : // message to each of them.  Update the blocked RibPeerSet with peers that
     455             : // become blocked after sending the message.
     456             : //
     457      616184 : void RibOutUpdates::UpdateSend(int queue_id, Message *message,
     458             :         const RibPeerSet &dst, RibPeerSet *blocked) {
     459      616184 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     460             : 
     461      616143 :     RibOut::PeerIterator iter(ribout_, dst);
     462     2360372 :     while (iter.HasNext()) {
     463     1744036 :         int ix_current = iter.index();
     464     1744009 :         IPeerUpdate *peer = iter.Next();
     465     1743635 :         size_t msgsize = 0;
     466     1743635 :         const string *msg_str = NULL;
     467     1743635 :         string temp;
     468     1743624 :         const uint8_t *data = message->GetData(peer, &msgsize, &msg_str, &temp);
     469     1743731 :         if (Sandesh::LoggingLevel() >= Sandesh::LoggingUtLevel()) {
     470     2113210 :             BGP_LOG_PEER(Message, peer, Sandesh::LoggingUtLevel(),
     471             :                 BGP_LOG_FLAG_SYSLOG, BGP_PEER_DIR_OUT,
     472             :                 "Update size " << msgsize <<
     473             :                 " reach " << message->num_reach_routes() <<
     474             :                 " unreach " << message->num_unreach_routes());
     475             :         }
     476     1744117 :         stats_[queue_id].messages_sent_count_++;
     477     1744117 :         stats_[queue_id].reach_count_ += message->num_reach_routes();
     478     1744086 :         stats_[queue_id].unreach_count_ += message->num_unreach_routes();
     479     1744077 :         bool more = peer->SendUpdate(data, msgsize, msg_str);
     480     1744169 :         if (!more) {
     481        3857 :             blocked->set(ix_current);
     482             :         }
     483     1744205 :         IPeer *ipeer = dynamic_cast<IPeer *>(peer);
     484     1744205 :         if (!ipeer) {
     485       10472 :             continue;
     486             :         }
     487     1733733 :         IPeerDebugStats *stats = ipeer->peer_stats();
     488     1733708 :         if (stats) {
     489     1733725 :             stats->UpdateTxReachRoute(message->num_reach_routes());
     490     1733792 :             stats->UpdateTxUnreachRoute(message->num_unreach_routes());
     491             :         }
     492     1744272 :     }
     493      616187 : }
     494             : 
     495             : //
     496             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     497             : //
     498             : // Go through all the peers in the specified RibPeerSet and ask them to flush
     499             : // i.e. send immediately, any accumulated updates.  Update blocked RibPeerSet
     500             : // with peers that become blocked after flushing.
     501             : //
     502             : // Skip if the RibOut is XMPP.
     503             : //
     504      608251 : void RibOutUpdates::UpdateFlush(const RibPeerSet &dst, RibPeerSet *blocked) {
     505      608251 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     506             : 
     507      608288 :     if (ribout_->IsEncodingXmpp())
     508      465652 :         return;
     509             : 
     510      142637 :     RibOut::PeerIterator iter(ribout_, dst);
     511      300726 :     while (iter.HasNext()) {
     512      158048 :         int ix_current = iter.index();
     513      158050 :         IPeerUpdate *peer = iter.Next();
     514      158048 :         bool more = peer->FlushUpdate();
     515      158063 :         if (!more) {
     516          19 :             blocked->set(ix_current);
     517             :         }
     518             :     }
     519             : }
     520             : 
     521             : //
     522             : // Take the AdvertisedInfo history in the RouteUpdate and move it to a new
     523             : // RouteState. Go through the monitor to associate the new RouteState as the
     524             : // listener state for the Route.
     525             : //
     526      583696 : void RibOutUpdates::StoreHistory(RouteUpdate *rt_update) {
     527      583696 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     528             : 
     529      583670 :     BgpRoute *route = rt_update->route();
     530      583668 :     RouteState *rstate = new RouteState();
     531      583655 :     rt_update->MoveHistory(rstate);
     532      583671 :     monitor_->SetEntryState(route, rstate);
     533      583755 : }
     534             : 
     535             : //
     536             : // Go through the monitor to clear the listener state for the underlying Route
     537             : // of the RouteUpdate.
     538             : //
     539      333640 : void RibOutUpdates::ClearState(RouteUpdate *rt_update) {
     540      333640 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     541             : 
     542      333604 :     BgpRoute *route = rt_update->route();
     543      333601 :     monitor_->ClearEntryState(route);
     544      333762 : }
     545             : 
     546             : //
     547             : // Called when the RouteUpdate encapsulated by the RouteUpdatePtr has no more
     548             : // UpdateInfo elements. Releases ownership of the RouteUpdate and deletes the
     549             : // RouteUpdate, as well as any associated UpdateList if appropriate.
     550             : //
     551      917551 : void RibOutUpdates::ClearUpdate(RouteUpdatePtr *update) {
     552      917551 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     553             : 
     554             :     // Dequeue the route update.
     555      917405 :     RouteUpdate *rt_update = update->get();
     556      917398 :     monitor_->DequeueUpdate(rt_update);
     557             : 
     558      917511 :     if (rt_update->OnUpdateList()) {
     559             :         // Remove the route update from the update list and check if the list
     560             :         // can now be downgraded back to a route update.
     561         110 :         UpdateList *uplist = rt_update->GetUpdateList(ribout_);
     562         110 :         uplist->RemoveUpdate(rt_update);
     563         110 :         RouteUpdate *last_rt_update = uplist->MakeRouteUpdate();
     564             : 
     565             :         // If we were able to downgrade, set the DBEntry to point to the last
     566             :         // remaining route update and get rid of the current route update and
     567             :         // the update list.  Otherwise, just get rid of the route update.
     568         110 :         if (last_rt_update) {
     569         110 :             monitor_->SetEntryState(rt_update->route(), last_rt_update);
     570         110 :             update->release();
     571         110 :             delete rt_update;
     572         110 :             delete uplist;
     573             :         } else {
     574           0 :             update->release();
     575           0 :             delete rt_update;
     576             :         }
     577             :     } else {
     578             :         // Store the history from the route update or clear the state for the
     579             :         // DBEntry depending on whether we advertised the route.  In either
     580             :         // case, get rid of the route update.
     581      917396 :         if (rt_update->IsAdvertised()) {
     582      583704 :             StoreHistory(rt_update);
     583             :         } else {
     584      333623 :             ClearState(rt_update);
     585             :         }
     586      917506 :         update->release();
     587      917436 :         delete rt_update;
     588             :     }
     589      917437 : }
     590             : 
     591             : //
     592             : // Clear the advertised bits specified by isect from the target RibPeerSet in
     593             : // the UpdateInfo.  If the target is now empty, remove the UpdateInfo from the
     594             : // set container in the UpdateQueue.  Note that the UpdateInfo will still be
     595             : // on the SList in the RouteUpdate.
     596             : //
     597             : // Return true if the UpdateInfo was removed from the set container.
     598             : //
     599      923547 : bool RibOutUpdates::ClearAdvertisedBits(RouteUpdate *rt_update,
     600             :         UpdateInfo *uinfo, const RibPeerSet &isect, bool update_history) {
     601      923547 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     602             : 
     603      923408 :     if (update_history) {
     604      923296 :         rt_update->UpdateHistory(ribout_, &uinfo->roattr, isect);
     605             :     }
     606      923412 :     uinfo->target.Reset(isect);
     607      923350 :     bool empty = uinfo->target.empty();
     608      923339 :     if (empty) {
     609      922225 :         UpdateQueue *queue = queue_vec_[rt_update->queue_id()];
     610      922218 :         queue->AttrDequeue(uinfo);
     611             :     }
     612      923491 :     return empty;
     613             : }
     614             : 
     615             : //
     616             : // Concurrency: Called in the context of the bgp::SendUpdate task.
     617             : //
     618             : // Update the markers for all the peers in the blocked RibPeerSet.  In the
     619             : // general case we clear the blocked RibPeerSet from the UpdateMarker and
     620             : // create a new UpdateMarker for the blocked peers.
     621             : //
     622             : // Return true in the special case where all peers in the UpdateMarker have
     623             : // become blocked.
     624             : //
     625        1755 : bool RibOutUpdates::UpdateMarkersOnBlocked(UpdateMarker *marker,
     626             :         RouteUpdate *rt_update,
     627             :         const RibPeerSet *blocked) {
     628        1755 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     629             : 
     630        1756 :     assert(!blocked->empty());
     631        1756 :     int queue_id = rt_update->queue_id();
     632        1756 :     UpdateQueue *queue = queue_vec_[queue_id];
     633             : 
     634             :     // If all the peers in the UpdateMarker are blocked, we simply move the
     635             :     // marker after the RouteUpdate.
     636        1756 :     if (marker->members == *blocked) {
     637        1051 :         stats_[queue_id].marker_move_count_++;
     638        1051 :         queue->MoveMarker(marker, rt_update);
     639        1051 :         return true;
     640             :     }
     641             : 
     642             :     // Reset bits in the specified UpdateMarker, create a new one for the
     643             :     // blocked peers and insert the new one after the RouteUpdate.
     644         705 :     marker->members.Reset(*blocked);
     645         705 :     assert(!marker->members.empty());
     646         705 :     UpdateMarker *new_marker = new UpdateMarker();
     647         705 :     new_marker->members = *blocked;
     648         705 :     stats_[queue_id].marker_split_count_++;
     649         705 :     queue->AddMarker(new_marker, rt_update);
     650             : 
     651         705 :     return false;
     652             : }
     653             : 
     654           8 : bool RibOutUpdates::Empty() const {
     655          24 :     for (int i = 0; i < RibOutUpdates::QCOUNT; ++i) {
     656          16 :         UpdateQueue *queue = queue_vec_[i];
     657          16 :         if (!queue->empty()) {
     658           0 :             return false;
     659             :         }
     660             :     }
     661           8 :     return true;
     662             : }
     663             : 
     664       13494 : size_t RibOutUpdates::queue_size(int queue_id) const {
     665       13494 :     const UpdateQueue *queue = queue_vec_[queue_id];
     666       13494 :     return queue->size();
     667             : }
     668             : 
     669       13494 : size_t RibOutUpdates::queue_marker_count(int queue_id) const {
     670       13494 :     const UpdateQueue *queue = queue_vec_[queue_id];
     671       13494 :     return queue->marker_count();
     672             : }
     673             : 
     674      735010 : bool RibOutUpdates::QueueJoin(int queue_id, int bit) {
     675      735010 :     UpdateQueue *queue = queue_vec_[queue_id];
     676      735010 :     return queue->Join(bit);
     677             : }
     678             : 
     679      735010 : void RibOutUpdates::QueueLeave(int queue_id, int bit) {
     680      735010 :     UpdateQueue *queue = queue_vec_[queue_id];
     681      735010 :     queue->Leave(bit);
     682      735010 : }
     683             : 
     684             : //
     685             : // Add statistics information to the provided Stats structure.
     686             : //
     687          16 : void RibOutUpdates::AddStatisticsInfo(int queue_id, Stats *stats) const {
     688          16 :     stats->messages_built_count_ += stats_[queue_id].messages_built_count_;
     689          16 :     stats->messages_sent_count_  += stats_[queue_id].messages_sent_count_;
     690          16 :     stats->reach_count_          += stats_[queue_id].reach_count_;
     691          16 :     stats->unreach_count_        += stats_[queue_id].unreach_count_;
     692          16 :     stats->tail_dequeue_count_   += stats_[queue_id].tail_dequeue_count_;
     693          16 :     stats->peer_dequeue_count_   += stats_[queue_id].peer_dequeue_count_;
     694          16 :     stats->marker_split_count_   += stats_[queue_id].marker_split_count_;
     695          16 :     stats->marker_merge_count_   += stats_[queue_id].marker_merge_count_;
     696          16 :     stats->marker_move_count_    += stats_[queue_id].marker_move_count_;
     697          16 : }

Generated by: LCOV version 1.14