LCOV - code coverage report
Current view: top level - bgp - bgp_update_sender.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 460 495 92.9 %
Date: 2026-06-18 01:51:13 Functions: 93 97 95.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "bgp/bgp_update_sender.h"
       6             : 
       7             : #include <boost/bind/bind.hpp>
       8             : #include <boost/foreach.hpp>
       9             : 
      10             : #include <map>
      11             : #include <string>
      12             : #include <atomic>
      13             : 
      14             : #include "base/task_annotations.h"
      15             : #include "bgp/ipeer.h"
      16             : #include "bgp/bgp_ribout.h"
      17             : #include "bgp/bgp_ribout_updates.h"
      18             : #include "db/db.h"
      19             : 
      20             : using std::unique_ptr;
      21             : using std::make_pair;
      22             : using std::map;
      23             : using std::string;
      24             : using std::vector;
      25             : using namespace boost::placeholders;
      26             : 
      27             : //
      28             : // This struct represents RibOut specific state for a PeerState.  There's one
      29             : // instance of this for each RibOut that an IPeerUpdate has joined.
      30             : //
      31             : // The PeerRibState contains a bit mask to keep track of the QueueIds that are
      32             : // currently active for the RibOut for the IPeerUpdate.
      33             : //
      34             : struct BgpSenderPartition::PeerRibState {
      35      367405 :     PeerRibState() : qactive(0) { }
      36             :     uint8_t qactive;
      37             : };
      38             : 
      39             : //
      40             : // This nested class represents IPeerUpdate related state that's specific to
      41             : // the BgpSenderPartition.
      42             : //
      43             : // A PeerState contains a Map of the index for a RibState to a PeerRibState.
      44             : // Each entry in the map logically represents the state of the peer for the
      45             : // ribout.
      46             : //
      47             : // The Map is used in conjunction with the RibStateMap in BgpSenderPartition
      48             : // to implement regular and circular iterator classes that are used to walk
      49             : // through all the RibState entries for a peer.
      50             : //
      51             : // A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
      52             : //
      53             : // The PeerState is considered to be send_ready when the underlying socket is
      54             : // is writable.  Note that the send_ready state in the PeerState may be out of
      55             : // date with the actual socket state because the socket could have got blocked
      56             : // when writing from another partition. Hence IPeerUpdate::send_ready() is the
      57             : // more authoritative source.
      58             : //
      59             : // The PeerState is considered to be in_sync if it's send_ready and the marker
      60             : // IPeerUpdate the peer has merged with the tail marker for all QueueIds in
      61             : // all RiBOuts for which the IPeerUpdate is subscribed.
      62             : //
      63             : // The PeerState keeps count of the number of active RibOuts for each QueueId.
      64             : // A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
      65             : // send_ready and there's RouteUpdates for the pair.
      66             : //
      67             : class BgpSenderPartition::PeerState {
      68             : public:
      69             :     typedef map<size_t, PeerRibState> Map;
      70             : 
      71             :     class iterator : public boost::iterator_facade<
      72             :         iterator, RibOut, boost::forward_traversal_tag> {
      73             :     public:
      74       33570 :         explicit iterator(const RibStateMap &indexmap, Map *map, size_t index)
      75       33570 :             : indexmap_(indexmap), map_(map), index_(index) {
      76       33570 :         }
      77             :         size_t index() const { return index_; }
      78       11426 :         RibState *rib_state() { return indexmap_.At(index_); }
      79             :         const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
      80             : 
      81             :     private:
      82             :         friend class boost::iterator_core_access;
      83       11426 :         void increment() {
      84       11426 :             Map::const_iterator loc = map_->upper_bound(index_);
      85       11426 :             if (loc == map_->end()) {
      86       11072 :                 index_ = -1;
      87             :             } else {
      88         354 :                 index_ = loc->first;
      89             :             }
      90       11426 :         }
      91       22498 :         bool equal(const iterator &rhs) const {
      92       22498 :             return index_ == rhs.index_;
      93             :         }
      94             :         RibOut &dereference() const;
      95             : 
      96             :         const RibStateMap &indexmap_;
      97             :         Map *map_;
      98             :         size_t index_;
      99             :     };
     100             : 
     101             :     class circular_iterator : public boost::iterator_facade<
     102             :         circular_iterator, RibOut, boost::forward_traversal_tag> {
     103             :     public:
     104        1196 :         explicit circular_iterator(const RibStateMap &indexmap, Map *map,
     105             :                                    int start, bool is_valid)
     106        1196 :         : indexmap_(indexmap), map_(map), index_(-1), match_(true) {
     107        1196 :             if (map_->empty()) {
     108           0 :                 return;
     109             :             }
     110        1196 :             Map::const_iterator loc = map_->lower_bound(start);
     111        1196 :             if (loc == map_->end()) {
     112        1053 :                 loc = map_->begin();
     113             :             }
     114        1196 :             index_ = loc->first;
     115        1196 :             if (is_valid) match_ = false;
     116             :         }
     117          37 :         int index() const { return index_; }
     118         446 :         RibState *rib_state() { return indexmap_.At(index_); }
     119         485 :         const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
     120             : 
     121             :     private:
     122             :         friend class boost::iterator_core_access;
     123         448 :         void increment() {
     124         448 :             match_ = true;
     125         448 :             assert(!map_->empty());
     126         448 :             Map::const_iterator loc = map_->upper_bound(index_);
     127         448 :             if (loc == map_->end()) {
     128         337 :                 loc = map_->begin();
     129             :             }
     130         448 :             index_ = loc->first;
     131         448 :         }
     132         822 :         bool equal(const circular_iterator &rhs) const {
     133         822 :             return ((match_ == rhs.match_) && (index_ == rhs.index_));
     134             :         }
     135             :         RibOut &dereference() const;
     136             : 
     137             :         const RibStateMap &indexmap_;
     138             :         Map *map_;
     139             :         int index_;
     140             :         bool match_;
     141             :     };
     142             : 
     143       81317 :     explicit PeerState(IPeerUpdate *peer)
     144       81317 :         : key_(peer), index_(-1),
     145       81317 :         qactive_cnt_(RibOutUpdates::QCOUNT),
     146       81317 :         in_sync_(true), rib_iterator_(BitSet::npos) {
     147       81317 :         send_ready_ = true;
     148      243951 :         for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
     149      162634 :             qactive_cnt_[i] = 0;
     150             :         }
     151       81317 :     }
     152             : 
     153             :     void Add(RibState *rs);
     154             : 
     155             :     void Remove(RibState *rs);
     156             : 
     157         576 :     bool IsMember(size_t index) const {
     158         576 :         return rib_bitset_.test(index);
     159             :     }
     160             : 
     161       11072 :     iterator begin(const RibStateMap &indexmap) {
     162       11072 :         Map::const_iterator it = rib_set_.begin();
     163       11072 :         size_t index = (it != rib_set_.end() ? it->first : -1);
     164       11072 :         return iterator(indexmap, &rib_set_, index);
     165             :     }
     166       22498 :     iterator end(const RibStateMap &indexmap) {
     167       22498 :         return iterator(indexmap, &rib_set_, -1);
     168             :     }
     169             : 
     170         374 :     circular_iterator circular_begin(const RibStateMap &indexmap) {
     171         374 :         return circular_iterator(indexmap, &rib_set_, rib_iterator_, true);
     172             :     }
     173         822 :     circular_iterator circular_end(const RibStateMap &indexmap) {
     174         822 :         return circular_iterator(indexmap, &rib_set_, rib_iterator_, false);
     175             :     }
     176             : 
     177          37 :     void SetIteratorStart(size_t start) { rib_iterator_ = start; }
     178             : 
     179       23648 :     void SetQueueActive(size_t rib_index, int queue_id) {
     180       23648 :         CHECK_CONCURRENCY("bgp::SendUpdate");
     181       23650 :         Map::iterator loc = rib_set_.find(rib_index);
     182       23649 :         assert(loc != rib_set_.end());
     183       23647 :         if (!BitIsSet(loc->second.qactive, queue_id)) {
     184       21402 :             SetBit(loc->second.qactive, queue_id);
     185       21402 :             qactive_cnt_[queue_id]++;
     186             :         }
     187       23645 :     }
     188             : 
     189      735219 :     void SetQueueInactive(size_t rib_index, int queue_id) {
     190      735219 :         CHECK_CONCURRENCY("bgp::SendUpdate", "bgp::PeerMembership");
     191      735219 :         Map::iterator loc = rib_set_.find(rib_index);
     192      735219 :         assert(loc != rib_set_.end());
     193      735219 :         if (BitIsSet(loc->second.qactive, queue_id)) {
     194       21427 :             ClearBit(loc->second.qactive, queue_id);
     195       21427 :             qactive_cnt_[queue_id]--;
     196             :         }
     197      735219 :     }
     198             : 
     199           0 :     bool IsQueueActive(size_t rib_index, int queue_id) {
     200           0 :         CHECK_CONCURRENCY("bgp::SendUpdate");
     201           0 :         Map::iterator loc = rib_set_.find(rib_index);
     202           0 :         assert(loc != rib_set_.end());
     203           0 :         return BitIsSet(loc->second.qactive, queue_id);
     204             :     }
     205             : 
     206       10701 :     int QueueCount(int queue_id) { return qactive_cnt_[queue_id]; }
     207             : 
     208     3793432 :     IPeerUpdate *peer() const { return key_; }
     209       81317 :     void set_index(size_t index) { index_ = index; }
     210      816547 :     size_t index() const { return index_; }
     211             : 
     212     1943369 :     bool in_sync() const { return in_sync_; }
     213        4070 :     void clear_sync() { in_sync_ = false; }
     214             :     void SetSync();
     215             : 
     216       19228 :     bool send_ready() const { return send_ready_; }
     217        4459 :     void set_send_ready(bool toggle) { send_ready_ = toggle; }
     218             : 
     219      367405 :     bool empty() const { return rib_set_.empty(); }
     220             : 
     221         420 :     bool CheckInvariants() const {
     222        1260 :         for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
     223         840 :             CHECK_INVARIANT(qactive_cnt_[i] <= (int) rib_set_.size());
     224             :         }
     225         420 :         return true;
     226             :     }
     227             : 
     228             : private:
     229             :     IPeerUpdate *key_;
     230             :     size_t index_;          // assigned from PeerStateMap
     231             :     Map rib_set_;           // list of RibOuts advertised by the peer.
     232             :     BitSet rib_bitset_;     // bitset of RibOuts advertised by the peer
     233             :     vector<int> qactive_cnt_;
     234             :     bool in_sync_;          // whether the peer may dequeue tail markers.
     235             :     std::atomic<bool> send_ready_;    // whether the peer may send updates.
     236             :     size_t rib_iterator_;   // index of last processed rib.
     237             : 
     238             :     DISALLOW_COPY_AND_ASSIGN(PeerState);
     239             : };
     240             : 
     241             : //
     242             : // This nested class represents RibOut related state that's specific to the
     243             : // BgpSenderPartition.
     244             : //
     245             : // A RibState contains a BitSet of all the peers that are advertising the
     246             : // RibOut associated with the RibState.
     247             : //
     248             : // The BitSet is used in conjunction with PeerStateMap in BgpSenderPartition
     249             : // to implement an iterator that is used to walk through all the PeerState
     250             : // entries for the ribout.
     251             : //
     252             : class BgpSenderPartition::RibState {
     253             : public:
     254             :     class iterator : public boost::iterator_facade<
     255             :         iterator, PeerState, boost::forward_traversal_tag> {
     256             :     public:
     257     3167811 :         explicit iterator(const PeerStateMap &indexmap,
     258             :                           const BitSet &set, size_t index)
     259     3167811 :             : indexmap_(indexmap), set_(set), index_(index) {
     260     3167811 :         }
     261             :         size_t index() const { return index_; }
     262             : 
     263             :     private:
     264             :         friend class boost::iterator_core_access;
     265     1936741 :         void increment() {
     266     1936741 :             index_ = set_.find_next(index_);
     267     1936410 :         }
     268     2552083 :         bool equal(const iterator &rhs) const {
     269     2552083 :             return index_ == rhs.index_;
     270             :         }
     271     1936924 :         PeerState &dereference() const {
     272     1936924 :             return *indexmap_.At(index_);
     273             :         }
     274             :         const PeerStateMap &indexmap_;
     275             :         const BitSet &set_;
     276             :         size_t index_;
     277             :     };
     278             : 
     279      135288 :     explicit RibState(RibOut *ribout)
     280      135288 :         : key_(ribout), index_(-1), in_sync_(RibOutUpdates::QCOUNT, true) {
     281      135288 :     }
     282             : 
     283             :     void Add(PeerState *ps);
     284             :     void Remove(PeerState *ps);
     285             : 
     286             :     bool QueueSync(int queue_id);
     287             :     void SetQueueSync(int queue_id);
     288             :     void SetQueueUnsync(int queue_id);
     289             : 
     290         518 :     RibOut *ribout() { return key_; }
     291             : 
     292      615888 :     iterator begin(const PeerStateMap &indexmap) {
     293      615888 :         return iterator(indexmap, peer_set_, peer_set_.find_first());
     294             :     }
     295             : 
     296     2552015 :     iterator end(const PeerStateMap &indexmap) {
     297     2552015 :         return iterator(indexmap, peer_set_, BitSet::npos);
     298             :     }
     299             : 
     300      135288 :     void set_index(size_t index) { index_ = index; }
     301     2363919 :     size_t index() const { return index_; }
     302             : 
     303      367405 :     bool empty() const { return peer_set_.none(); }
     304             : 
     305         576 :     const BitSet &peer_set() const { return peer_set_; }
     306             : 
     307             : private:
     308             :     RibOut *key_;
     309             :     size_t index_;
     310             :     BitSet peer_set_;
     311             :     vector<bool> in_sync_;
     312             : 
     313             :     DISALLOW_COPY_AND_ASSIGN(RibState);
     314             : };
     315             : 
     316             : 
     317      367405 : void BgpSenderPartition::RibState::Add(BgpSenderPartition::PeerState *ps) {
     318      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     319      367405 :     peer_set_.set(ps->index());
     320      367405 : }
     321             : 
     322      367405 : void BgpSenderPartition::RibState::Remove(BgpSenderPartition::PeerState *ps) {
     323      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     324      367405 :     peer_set_.reset(ps->index());
     325      367405 : }
     326             : 
     327       10850 : bool BgpSenderPartition::RibState::QueueSync(int queue_id) {
     328       10850 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     329       10850 :     return (in_sync_[queue_id]);
     330             : }
     331             : 
     332          72 : void BgpSenderPartition::RibState::SetQueueSync(int queue_id) {
     333          72 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     334          72 :     in_sync_[queue_id] = true;
     335          72 : }
     336             : 
     337        6091 : void BgpSenderPartition::RibState::SetQueueUnsync(int queue_id) {
     338        6091 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     339        6092 :     in_sync_[queue_id] = false;
     340        6092 : }
     341             : 
     342      367405 : void BgpSenderPartition::PeerState::Add(RibState *rs) {
     343      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     344      367405 :     PeerRibState init;
     345      367405 :     rib_set_.insert(make_pair(rs->index(), init));
     346      367405 :     rib_bitset_.set(rs->index());
     347      367405 : }
     348             : 
     349      367405 : void BgpSenderPartition::PeerState::Remove(RibState *rs) {
     350      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     351     1102215 :     for (int queue_id = 0; queue_id < RibOutUpdates::QCOUNT; queue_id++) {
     352      734810 :         SetQueueInactive(rs->index(), queue_id);
     353             :     }
     354      367405 :     rib_set_.erase(rs->index());
     355      367405 :     rib_bitset_.reset(rs->index());
     356      367405 : }
     357             : 
     358        5326 : void BgpSenderPartition::PeerState::SetSync() {
     359        5326 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     360       10751 :     for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
     361        5425 :         assert(it->second.qactive == 0);
     362             :     }
     363       15978 :     for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
     364       10652 :         assert(qactive_cnt_[i] == 0);
     365             :     }
     366        5326 :     in_sync_ = true;
     367        5326 : }
     368             : 
     369          72 : RibOut &BgpSenderPartition::PeerState::iterator::dereference() const {
     370          72 :     return *indexmap_.At(index_)->ribout();
     371             : }
     372             : 
     373         446 : RibOut &BgpSenderPartition::PeerState::circular_iterator::dereference() const {
     374         446 :     return *indexmap_.At(index_)->ribout();
     375             : }
     376             : 
     377             : class BgpSenderPartition::Worker : public Task {
     378             : public:
     379      219665 :     explicit Worker(BgpSenderPartition *partition)
     380      219665 :         : Task(partition->task_id(), partition->index()),
     381      219665 :           partition_(partition) {
     382      219673 :     }
     383             : 
     384      219740 :     virtual bool Run() {
     385      219740 :         CHECK_CONCURRENCY("bgp::SendUpdate");
     386             : 
     387             :         while (true) {
     388      835516 :             unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
     389      835394 :             if (!wentry.get())
     390      219691 :                 break;
     391      615686 :             if (!wentry->valid)
     392           1 :                 continue;
     393      615713 :             switch (wentry->type) {
     394      615316 :             case WorkBase::WRibOut: {
     395      615316 :                 WorkRibOut *workrib = static_cast<WorkRibOut *>(wentry.get());
     396      615300 :                 partition_->UpdateRibOut(workrib->ribout, workrib->queue_id);
     397      615427 :                 break;
     398             :             }
     399         369 :             case WorkBase::WPeer: {
     400         369 :                 WorkPeer *workpeer = static_cast<WorkPeer *>(wentry.get());
     401         389 :                 partition_->UpdatePeer(workpeer->peer);
     402         389 :                 break;
     403             :             }
     404             :             }
     405     1451324 :         }
     406             : 
     407      219690 :         return true;
     408             :     }
     409           0 :     string Description() const { return "BgpSenderPartition::Worker"; }
     410             : 
     411             : private:
     412             :     BgpSenderPartition *partition_;
     413             : };
     414             : 
     415       38479 : BgpSenderPartition::BgpSenderPartition(BgpUpdateSender *sender, int index)
     416       38479 :     : sender_(sender),
     417       38479 :       index_(index),
     418       38479 :       running_(false),
     419       38479 :       disabled_(false),
     420       38479 :       worker_task_(NULL) {
     421       38479 : }
     422             : 
     423       38479 : BgpSenderPartition::~BgpSenderPartition() {
     424       38479 :     if (worker_task_) {
     425           0 :         TaskScheduler *scheduler = TaskScheduler::GetInstance();
     426           0 :         scheduler->Cancel(worker_task_);
     427             :     }
     428       38479 :     assert(peer_state_imap_.empty());
     429       38479 :     assert(rib_state_imap_.empty());
     430       38479 : }
     431             : 
     432      219657 : int BgpSenderPartition::task_id() const {
     433      219657 :     return sender_->task_id();
     434             : }
     435             : 
     436             : //
     437             : // Add the (RibOut, IPeerUpdate) combo to the BgpSenderPartition.
     438             : // Find or create the corresponding RibState and PeerState and sets up the
     439             : // cross-linkage.
     440             : //
     441      367405 : void BgpSenderPartition::Add(RibOut *ribout, IPeerUpdate *peer) {
     442      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     443             : 
     444      367405 :     RibState *rs = rib_state_imap_.Locate(ribout);
     445      367405 :     PeerState *ps = peer_state_imap_.Locate(peer);
     446      367405 :     rs->Add(ps);
     447      367405 :     ps->Add(rs);
     448      367405 : }
     449             : 
     450             : //
     451             : // Remove the (RibOut, IPeerUpdate) combo from the BgpSenderPartition.
     452             : // Decouple cross linkage between the corresponding RibState and PeerState and
     453             : // get rid of the RibState and/or PeerState if they are no longer needed.
     454             : //
     455      367405 : void BgpSenderPartition::Remove(RibOut *ribout, IPeerUpdate *peer) {
     456      367405 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     457             : 
     458      367405 :     RibState *rs = rib_state_imap_.Find(ribout);
     459      367405 :     PeerState *ps = peer_state_imap_.Find(peer);
     460      367405 :     assert(rs != NULL);
     461      367405 :     assert(ps != NULL);
     462      367405 :     rs->Remove(ps);
     463      367405 :     ps->Remove(rs);
     464      367405 :     if (rs->empty()) {
     465      135288 :         WorkRibOutInvalidate(ribout);
     466      135288 :         rib_state_imap_.Remove(ribout, rs->index());
     467             :     }
     468      367405 :     if (ps->empty())  {
     469       81317 :         WorkPeerInvalidate(peer);
     470       81317 :         peer_state_imap_.Remove(peer, ps->index());
     471             :     }
     472      367405 : }
     473             : 
     474             : //
     475             : // Create and enqueue new WorkRibOut entry since the RibOut is now
     476             : // active.
     477             : //
     478      615209 : void BgpSenderPartition::RibOutActive(RibOut *ribout, int queue_id) {
     479      615209 :     CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
     480             : 
     481      615296 :     WorkRibOutEnqueue(ribout, queue_id);
     482      615364 : }
     483             : 
     484             : //
     485             : // Mark an IPeerUpdate to be send ready.
     486             : //
     487        5146 : void BgpSenderPartition::PeerSendReady(IPeerUpdate *peer) {
     488        5146 :     CHECK_CONCURRENCY("bgp::SendReadyTask");
     489             : 
     490             :     // The IPeerUpdate may not be registered if it has not reached Established
     491             :     // state or it may already have been unregistered by the time we get around
     492             :     // to processing the notification.
     493        5146 :     PeerState *ps = peer_state_imap_.Find(peer);
     494        5146 :     if (!ps)
     495           0 :         return;
     496             : 
     497             :     // Nothing to do if the IPeerUpdate's already in that state.
     498        5146 :     if (ps->send_ready())
     499        4757 :         return;
     500             : 
     501             :     // Create and enqueue new WorkPeer entry.
     502         389 :     ps->set_send_ready(true);
     503         389 :     WorkPeerEnqueue(peer);
     504             : }
     505             : 
     506             : //
     507             : // Return true if the IPeer is send ready.
     508             : //
     509        2905 : bool BgpSenderPartition::PeerIsSendReady(IPeerUpdate *peer) const {
     510        2905 :     CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
     511             : 
     512        2905 :     PeerState *ps = peer_state_imap_.Find(peer);
     513        2905 :     return ps->send_ready();
     514             : }
     515             : 
     516             : //
     517             : // Return true if the IPeer is registered.
     518             : //
     519        9329 : bool BgpSenderPartition::PeerIsRegistered(IPeerUpdate *peer) const {
     520        9329 :     CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
     521             : 
     522        9329 :     return (peer_state_imap_.Find(peer) != NULL);
     523             : }
     524             : 
     525             : //
     526             : // Return true if the IPeer is in sync.
     527             : //
     528        7122 : bool BgpSenderPartition::PeerInSync(IPeerUpdate *peer) const {
     529        7122 :     CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
     530             : 
     531        7122 :     PeerState *ps = peer_state_imap_.Find(peer);
     532        7122 :     return (ps ? ps->in_sync() : false);
     533             : }
     534             : 
     535             : //
     536             : // Create a Worker if warranted and enqueue it to the TaskScheduler.
     537             : // Assumes that the caller holds the BgpSenderPartition mutex.
     538             : //
     539      615531 : void BgpSenderPartition::MaybeStartWorker() {
     540      615531 :     if (!running_ && !disabled_) {
     541      219661 :         worker_task_ = new Worker(this);
     542      219673 :         TaskScheduler *scheduler = TaskScheduler::GetInstance();
     543      219671 :         scheduler->Enqueue(worker_task_);
     544      219749 :         running_ = true;
     545             :     }
     546      615619 : }
     547             : 
     548             : //
     549             : // Dequeue the first WorkBase item from the work queue and return an
     550             : // unique_ptr to it.  Clear out Worker related state if the work queue
     551             : // is empty.
     552             : //
     553      835540 : unique_ptr<BgpSenderPartition::WorkBase> BgpSenderPartition::WorkDequeue() {
     554      835540 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     555             : 
     556      835427 :     unique_ptr<WorkBase> wentry;
     557      835427 :     if (work_queue_.empty()) {
     558      219697 :         worker_task_ = NULL;
     559      219697 :         running_ = false;
     560             :     } else {
     561      615726 :         wentry.reset(work_queue_.pop_front().release());
     562             :     }
     563      835410 :     return wentry;
     564           0 : }
     565             : 
     566             : //
     567             : // Enqueue a WorkBase entry into the the work queue and start a new Worker
     568             : // task if required.
     569             : //
     570      615691 : void BgpSenderPartition::WorkEnqueue(WorkBase *wentry) {
     571      615691 :     CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::SendReadyTask",
     572             :         "bgp::PeerMembership");
     573             : 
     574      615672 :     work_queue_.push_back(wentry);
     575      615494 :     MaybeStartWorker();
     576      615752 : }
     577             : 
     578             : //
     579             : // Disable or enable the worker.
     580             : // For unit testing.
     581             : //
     582          34 : void BgpSenderPartition::set_disabled(bool disabled) {
     583          34 :     disabled_ = disabled;
     584          34 :     MaybeStartWorker();
     585          34 : }
     586             : 
     587             : //
     588             : // Enqueue a WorkPeer to the work queue.
     589             : //
     590         389 : void BgpSenderPartition::WorkPeerEnqueue(IPeerUpdate *peer) {
     591         389 :     CHECK_CONCURRENCY("bgp::SendReadyTask");
     592             : 
     593         389 :     WorkBase *wentry = new WorkPeer(peer);
     594         389 :     WorkEnqueue(wentry);
     595         389 : }
     596             : 
     597             : //
     598             : // Invalidate all WorkBases for the given IPeerUpdate.
     599             : // Used when a IPeerUpdate is removed.
     600             : //
     601       81317 : void BgpSenderPartition::WorkPeerInvalidate(IPeerUpdate *peer) {
     602       81317 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     603             : 
     604       81317 :     for (WorkQueue::iterator it = work_queue_.begin();
     605       81342 :          it != work_queue_.end(); ++it) {
     606          25 :         WorkBase *wentry = it.operator->();
     607          25 :         if (wentry->type != WorkBase::WPeer)
     608          25 :             continue;
     609           0 :         WorkPeer *wpeer = static_cast<WorkPeer *>(wentry);
     610           0 :         if (wpeer->peer != peer)
     611           0 :             continue;
     612           0 :         wpeer->valid = false;
     613             :     }
     614       81317 : }
     615             : 
     616             : //
     617             : // Enqueue a WorkRibOut to the work queue.
     618             : //
     619      615290 : void BgpSenderPartition::WorkRibOutEnqueue(RibOut *ribout, int queue_id) {
     620      615290 :     CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
     621             : 
     622      615278 :     WorkBase *wentry = new WorkRibOut(ribout, queue_id);
     623      615306 :     WorkEnqueue(wentry);
     624      615364 : }
     625             : 
     626             : //
     627             : // Invalidate all WorkBases for the given RibOut.
     628             : // Used when a RibOut is removed.
     629             : //
     630      135288 : void BgpSenderPartition::WorkRibOutInvalidate(RibOut *ribout) {
     631      135288 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     632             : 
     633      135288 :     for (WorkQueue::iterator it = work_queue_.begin();
     634      135560 :          it != work_queue_.end(); ++it) {
     635         272 :         WorkBase *wentry = it.operator->();
     636         272 :         if (wentry->type != WorkBase::WRibOut)
     637           0 :             continue;
     638         272 :         WorkRibOut *wribout = static_cast<WorkRibOut *>(wentry);
     639         272 :         if (wribout->ribout != ribout)
     640         271 :             continue;
     641           1 :         wribout->valid = false;
     642             :     }
     643      135288 : }
     644             : 
     645             : //
     646             : // Build the RibPeerSet of IPeers for the RibOut that are in sync. Note that
     647             : // we need to use bit indices that are specific to the RibOut, not the ones
     648             : // from the BgpSenderPartition.
     649             : //
     650      615689 : void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,
     651             :     RibPeerSet *msync) {
     652      615689 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     653             : 
     654      615752 :     for (RibState::iterator it = rs->begin(peer_state_imap_);
     655     2551957 :          it != rs->end(peer_state_imap_); ++it) {
     656     1936354 :         PeerState *ps = it.operator->();
     657             : 
     658             :         // If the PeerState is in sync but the IPeerUpdate is not send ready
     659             :         // then update the sync and send ready state in the PeerState.  Note
     660             :         // that the RibOut queue for the PeerState will get marked active via
     661             :         // the call the SetQueueActive from UpdateRibOut.
     662     1936247 :         if (ps->in_sync()) {
     663     1897042 :             if (ps->peer()->send_ready()) {
     664     1896942 :                 int rix = ribout->GetPeerIndex(ps->peer());
     665     1896534 :                 msync->set(rix);
     666             :             } else {
     667          66 :                 ps->clear_sync();
     668          66 :                 ps->set_send_ready(false);
     669             :             }
     670             :         }
     671             :     }
     672      615619 : }
     673             : 
     674             : //
     675             : // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
     676             : // Note that bit indices in the RibPeerSet and are specific to the RibOut.
     677             : //
     678      616279 : void BgpSenderPartition::SetSendBlocked(const RibOut *ribout, RibState *rs,
     679             :     int queue_id, const RibPeerSet &blocked) {
     680      616279 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     681             : 
     682      620190 :     for (size_t bit = blocked.find_first(); bit != RibPeerSet::npos;
     683        4005 :          bit = blocked.find_next(bit)) {
     684        4005 :         IPeerUpdate *peer = ribout->GetPeer(bit);
     685        4005 :         PeerState *ps = peer_state_imap_.Find(peer);
     686        4004 :         ps->SetQueueActive(rs->index(), queue_id);
     687        4004 :         ps->clear_sync();
     688        4004 :         ps->set_send_ready(false);
     689             :     }
     690      616186 : }
     691             : 
     692             : //
     693             : // For unit testing only.
     694             : // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
     695             : //
     696           8 : void BgpSenderPartition::SetSendBlocked(RibOut *ribout,
     697             :     int queue_id, const RibPeerSet &blocked) {
     698           8 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     699             : 
     700           8 :     RibState *rs = rib_state_imap_.Find(ribout);
     701           8 :     assert(rs);
     702           8 :     SetSendBlocked(ribout, rs, queue_id, blocked);
     703           8 : }
     704             : 
     705             : //
     706             : // Concurrency: called from bgp send task.
     707             : //
     708             : // Take the RibPeerSet of unsync IPeers and update the relevant PeerStates.
     709             : // Note that bit indices in the RibPeerSet and are specific to the RibOut.
     710             : //
     711      615713 : void BgpSenderPartition::SetQueueActive(const RibOut *ribout, RibState *rs,
     712             :     int queue_id, const RibPeerSet &munsync) {
     713      615713 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     714             : 
     715      635361 :     for (size_t bit = munsync.find_first(); bit != RibPeerSet::npos;
     716       19642 :          bit = munsync.find_next(bit)) {
     717       19651 :         IPeerUpdate *peer = ribout->GetPeer(bit);
     718       19649 :         PeerState *ps = peer_state_imap_.Find(peer);
     719       19644 :         ps->SetQueueActive(rs->index(), queue_id);
     720             :     }
     721      615708 : }
     722             : 
     723             : //
     724             : // Concurrency: called from bgp send task.
     725             : //
     726             : // Mark the PeerRibState corresponding to the given IPeerUpdate and RibOut
     727             : // as active.
     728             : //
     729             : // Used by unit test code.
     730             : //
     731           0 : void BgpSenderPartition::SetQueueActive(RibOut *ribout, int queue_id,
     732             :     IPeerUpdate *peer) {
     733           0 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     734             : 
     735           0 :     PeerState *ps = peer_state_imap_.Find(peer);
     736           0 :     RibState *rs = rib_state_imap_.Find(ribout);
     737           0 :     ps->SetQueueActive(rs->index(), queue_id);
     738           0 : }
     739             : 
     740             : //
     741             : // Concurrency: called from bgp send task.
     742             : //
     743             : // Check if the queue corresponding to IPeerUpdate, Ribout and queue id is
     744             : // active.
     745             : //
     746             : // Used by unit test code.
     747             : //
     748           0 : bool BgpSenderPartition::IsQueueActive(RibOut *ribout, int queue_id,
     749             :     IPeerUpdate *peer) {
     750           0 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     751             : 
     752           0 :     PeerState *ps = peer_state_imap_.Find(peer);
     753           0 :     RibState *rs = rib_state_imap_.Find(ribout);
     754           0 :     return ps->IsQueueActive(rs->index(), queue_id);
     755             : }
     756             : 
     757             : //
     758             : // Concurrency: called from bgp send task.
     759             : //
     760             : // Mark all the RibStates for the given peer and queue id as being in sync
     761             : // and trigger a tail dequeue.
     762             : //
     763       10652 : void BgpSenderPartition::SetQueueSync(PeerState *ps, int queue_id) {
     764       10652 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     765             : 
     766       10652 :     for (PeerState::iterator it = ps->begin(rib_state_imap_);
     767       21502 :          it != ps->end(rib_state_imap_); ++it) {
     768       10850 :          RibState *rs = it.rib_state();
     769       10850 :          if (!rs->QueueSync(queue_id)) {
     770          72 :              RibOut *ribout = it.operator->();
     771          72 :              RibOutActive(ribout, queue_id);
     772          72 :              rs->SetQueueSync(queue_id);
     773             :          }
     774             :     }
     775       10652 : }
     776             : 
     777             : //
     778             : // Drain the queue until there are no more updates or all the members become
     779             : // blocked.
     780             : //
     781      615705 : void BgpSenderPartition::UpdateRibOut(RibOut *ribout, int queue_id) {
     782      615705 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     783             : 
     784      615790 :     RibOutUpdates *updates = ribout->updates(index_);
     785      615794 :     RibState *rs = rib_state_imap_.Find(ribout);
     786      615682 :     RibPeerSet msync;
     787             : 
     788             :     // Convert group in-sync list to rib specific bitset.
     789      615689 :     BuildSyncBitSet(ribout, rs, &msync);
     790             : 
     791             :     // Drain the queue till we can do no more.
     792      615621 :     RibPeerSet blocked, munsync;
     793      615624 :     bool done = updates->TailDequeue(queue_id, msync, &blocked, &munsync);
     794      615805 :     assert(msync.Contains(blocked));
     795             : 
     796             :     // Mark peers as send blocked.
     797      615838 :     SetSendBlocked(ribout, rs, queue_id, blocked);
     798             : 
     799             :     // Set the queue to be active for any unsync peers. If we don't do this,
     800             :     // we will forget to mark the (RibOut,QueueId) as active for these peers
     801             :     // since the blocked RibPeerSet does not contain peers that are already
     802             :     // out of sync.  Note that the unsync peers would have been split from
     803             :     // the tail marker in TailDequeue.
     804      615733 :     SetQueueActive(ribout, rs, queue_id, munsync);
     805             : 
     806             :     // If all peers are blocked, mark the queue as unsync in the RibState. We
     807             :     // will trigger tail dequeue for the (RibOut,QueueId) when any peer that
     808             :     // is interested in the RibOut becomes in sync.
     809      615719 :     if (!done)
     810        6091 :         rs->SetQueueUnsync(queue_id);
     811      615720 : }
     812             : 
     813             : //
     814             : // Go through all RibOuts for the IPeerUpdate and drain the given queue till it
     815             : // is up-to date or it becomes blocked. If it's blocked, select the next RibOut
     816             : // to be processed when the IPeerUpdate becomes send ready.
     817             : //
     818             : // Return false if the IPeerUpdate got blocked.
     819             : //
     820         374 : bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
     821             :     int queue_id) {
     822         374 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     823             : 
     824         374 :     for (PeerState::circular_iterator it = ps->circular_begin(rib_state_imap_);
     825         822 :          it != ps->circular_end(rib_state_imap_); ++it) {
     826             :         // Skip if this queue is not active in the PeerRibState.
     827         485 :         if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
     828          39 :             continue;
     829             : 
     830             :         // Drain the queue till we can do no more.
     831         446 :         RibOut *ribout = it.operator->();
     832         446 :         RibOutUpdates *updates = ribout->updates(index_);
     833         446 :         RibPeerSet blocked;
     834         446 :         bool done = updates->PeerDequeue(queue_id, peer, &blocked);
     835             : 
     836             :         // Process blocked mask.
     837         446 :         RibState *rs = it.rib_state();
     838         446 :         SetSendBlocked(ribout, rs, queue_id, blocked);
     839             : 
     840             :         // If the peer is still send_ready, mark the queue as inactive for
     841             :         // the peer.  Need to check send_ready because the return value of
     842             :         // PeerDequeue only tells that *some* peer was merged with the tail
     843             :         // marker.
     844             :         // If the peer got blocked, remember where to start next time and
     845             :         // stop processing. We don't want to continue processing for other
     846             :         // merged peers if the lead peer is blocked.  Processing for other
     847             :         // peers will continue when their own WorkPeer items are processed.
     848         446 :         if (ps->send_ready()) {
     849         409 :             assert(done);
     850         409 :             ps->SetQueueInactive(rs->index(), queue_id);
     851             :         } else {
     852          37 :             ps->SetIteratorStart(it.index());
     853          37 :             return false;
     854             :         }
     855         446 :     }
     856             : 
     857         337 :     return true;
     858             : }
     859             : 
     860             : //
     861             : // Drain the queue of all updates for this IPeerUpdate, until it is up-to date
     862             : // or it becomes blocked.
     863             : //
     864        5368 : void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
     865        5368 :     CHECK_CONCURRENCY("bgp::SendUpdate");
     866             : 
     867             :     // Bail if the PeerState is not send ready.
     868        5368 :     PeerState *ps = peer_state_imap_.Find(peer);
     869        5368 :     if (!ps->send_ready()) {
     870           5 :         return;
     871             :     }
     872             : 
     873             :     // Update the PeerState and bail if the IPeerUpdate is not send ready.
     874             :     // This happens if the IPeerUpdate gets blocked while processing some
     875             :     // other partition.
     876        5363 :     if (!peer->send_ready()) {
     877           0 :         ps->set_send_ready(false);
     878           0 :         return;
     879             :     }
     880             : 
     881             :     // Go through all queues and drain them if there's anything on them.
     882       16027 :     for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
     883       10701 :         if (ps->QueueCount(queue_id) == 0) {
     884       10327 :             continue;
     885             :         }
     886         374 :         if (!UpdatePeerQueue(peer, ps, queue_id)) {
     887          37 :             assert(!ps->send_ready());
     888          37 :             return;
     889             :         }
     890             :     }
     891             : 
     892             :     // Checking the return value of UpdatePeerQueue above is not sufficient as
     893             :     // that only tells us that *some* peer(s) got merged with the tail marker.
     894             :     // Need to make sure that the IPeerUpdate that we are processing is still
     895             :     // send ready.
     896        5326 :     if (!ps->send_ready()) {
     897           0 :         return;
     898             :     }
     899             : 
     900             :     // Mark the peer as being in sync across all tables.
     901        5326 :     ps->SetSync();
     902             : 
     903             :     // Mark all RibStates for the peer as being in sync. This triggers a tail
     904             :     // dequeue for the corresponding (RibOut, QueueId) if necessary. This in
     905             :     // turn ensures that we do not get stuck in the case where all peers get
     906             :     // blocked and then get back in sync.
     907       15978 :     for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
     908       10652 :         SetQueueSync(ps, queue_id);
     909             :     }
     910             : }
     911             : 
     912             : //
     913             : // Check invariants for the BgpSenderPartition.
     914             : //
     915         249 : bool BgpSenderPartition::CheckInvariants() const {
     916         249 :     int grp_peer_count = 0;
     917         249 :     int peer_grp_count = 0;
     918         393 :     for (size_t i = 0; i < rib_state_imap_.size(); i++) {
     919         144 :         if (!rib_state_imap_.bits().test(i)) {
     920           0 :             continue;
     921             :         }
     922         144 :         RibState *rs = rib_state_imap_.At(i);
     923         144 :         assert(rs != NULL);
     924         144 :         CHECK_INVARIANT(rs->index() == i);
     925         144 :         for (RibState::iterator it = rs->begin(peer_state_imap_);
     926         720 :              it != rs->end(peer_state_imap_); ++it) {
     927         576 :             PeerState *ps = it.operator->();
     928         576 :             CHECK_INVARIANT(ps->IsMember(i));
     929         576 :             grp_peer_count++;
     930             :         }
     931             :     }
     932         669 :     for (size_t i = 0; i < peer_state_imap_.size(); i++) {
     933         420 :         if (!peer_state_imap_.bits().test(i)) {
     934           0 :             continue;
     935             :         }
     936         420 :         PeerState *ps = peer_state_imap_.At(i);
     937         420 :         assert(ps != NULL);
     938         420 :         CHECK_INVARIANT(ps->index() == i);
     939         420 :         if (!ps->CheckInvariants()) {
     940           0 :             return false;
     941             :         }
     942         420 :         for (PeerState::iterator it = ps->begin(rib_state_imap_);
     943         996 :              it != ps->end(rib_state_imap_); ++it) {
     944         576 :             RibState *rs = it.rib_state();
     945         576 :             CHECK_INVARIANT(rs->peer_set().test(i));
     946         576 :             peer_grp_count++;
     947             :         }
     948             :     }
     949             : 
     950         249 :     CHECK_INVARIANT(grp_peer_count == peer_grp_count);
     951         249 :     return true;
     952             : }
     953             : 
     954             : //
     955             : // Constructor for BgpUpdateSender.
     956             : // Initialize send ready WorkQueue and allocate BgpSenderPartitions.
     957             : //
     958        9751 : BgpUpdateSender::BgpUpdateSender(BgpServer *server)
     959        9751 :     : server_(server),
     960        9751 :       task_id_(TaskScheduler::GetInstance()->GetTaskId("bgp::SendUpdate")),
     961        9751 :       send_ready_queue_(
     962             :           TaskScheduler::GetInstance()->GetTaskId("bgp::SendReadyTask"), 0,
     963        9751 :           boost::bind(&BgpUpdateSender::SendReadyCallback, this, _1)) {
     964       48230 :     for (int idx = 0; idx < DB::PartitionCount(); ++idx) {
     965       38479 :         partitions_.push_back(new BgpSenderPartition(this, idx));
     966             :     }
     967        9751 : }
     968             : 
     969             : //
     970             : // Destructor for BgpUpdateSender.
     971             : // Shutdown the WorkQueue and delete all BgpSenderPartitions.
     972             : //
     973        9751 : BgpUpdateSender::~BgpUpdateSender() {
     974        9751 :     send_ready_queue_.Shutdown(false);
     975        9751 :     STLDeleteValues(&partitions_);
     976        9751 : }
     977             : 
     978             : //
     979             : // Handle the join of an IPeerUpdate to a RibOut.
     980             : //
     981       93031 : void BgpUpdateSender::Join(RibOut *ribout, IPeerUpdate *peer) {
     982       93031 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     983             : 
     984      827841 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
     985      367405 :         partition->Add(ribout, peer);
     986             :     }
     987       93031 : }
     988             : 
     989             : //
     990             : // Handle the leave of an IPeerUpdate from a RibOut.
     991             : //
     992       93031 : void BgpUpdateSender::Leave(RibOut *ribout, IPeerUpdate *peer) {
     993       93031 :     CHECK_CONCURRENCY("bgp::PeerMembership");
     994             : 
     995      827841 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
     996      367405 :         partition->Remove(ribout, peer);
     997             :     }
     998       93031 : }
     999             : 
    1000             : //
    1001             : // Inform the specified BgpSenderPartition that it needs to schedule a tail
    1002             : // dequeue for the given RibOut queue.
    1003             : //
    1004      615128 : void BgpUpdateSender::RibOutActive(int index, RibOut *ribout, int queue_id) {
    1005      615128 :     CHECK_CONCURRENCY("db::DBTable", "bgp::PeerMembership");
    1006             : 
    1007      615154 :     partitions_[index]->RibOutActive(ribout, queue_id);
    1008      615290 : }
    1009             : 
    1010             : //
    1011             : // Concurrency: called from arbitrary task.
    1012             : //
    1013             : // Enqueue the IPeerUpdate to the send ready processing work queue.
    1014             : // The callback is invoked in the context of bgp::SendReadyTask.
    1015             : //
    1016          13 : void BgpUpdateSender::PeerSendReady(IPeerUpdate *peer) {
    1017          13 :     send_ready_queue_.Enqueue(peer);
    1018          13 : }
    1019             : 
    1020             : //
    1021             : // Return true if the IPeer is registered.
    1022             : //
    1023         543 : bool BgpUpdateSender::PeerIsRegistered(IPeerUpdate *peer) const {
    1024         655 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1025         585 :         if (partition->PeerIsRegistered(peer))
    1026         529 :             return true;
    1027             :     }
    1028          14 :     return false;
    1029             : }
    1030             : 
    1031             : //
    1032             : // Return true if the IPeer is in sync.
    1033             : //
    1034         529 : bool BgpUpdateSender::PeerInSync(IPeerUpdate *peer) const {
    1035        4761 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1036        2116 :         if (!partition->PeerInSync(peer))
    1037           0 :             return false;
    1038             :     }
    1039         529 :     return true;
    1040             : }
    1041             : 
    1042             : //
    1043             : // Callback to handle send ready notification for IPeerUpdate.  Processing it
    1044             : // in the context of bgp::SendeReadyTask ensures that there are no concurrency
    1045             : // issues w.r.t. the BgpSenderPartition working on the IPeerUpdate while we are
    1046             : // processing the notification.
    1047             : //
    1048          13 : bool BgpUpdateSender::SendReadyCallback(IPeerUpdate *peer) {
    1049          13 :     CHECK_CONCURRENCY("bgp::SendReadyTask");
    1050             : 
    1051         111 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1052          49 :         partition->PeerSendReady(peer);
    1053             :     }
    1054          13 :     return true;
    1055             : }
    1056             : 
    1057             : //
    1058             : // Check invariants for the BgpUpdateSender.
    1059             : //
    1060          59 : bool BgpUpdateSender::CheckInvariants() const {
    1061         531 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1062         236 :         if (!partition->CheckInvariants())
    1063           0 :             return false;
    1064             :     }
    1065          59 :     return true;
    1066             : }
    1067             : 
    1068             : //
    1069             : // Disable all BgpSenderPartitions.
    1070             : //
    1071          17 : void BgpUpdateSender::DisableProcessing() {
    1072          51 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1073          17 :         partition->set_disabled(true);
    1074             :     }
    1075          17 : }
    1076             : 
    1077             : //
    1078             : // Enable all BgpSenderPartitions.
    1079             : //
    1080          17 : void BgpUpdateSender::EnableProcessing() {
    1081          51 :     BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
    1082          17 :         partition->set_disabled(false);
    1083             :     }
    1084          17 : }

Generated by: LCOV version 1.14