Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #ifndef SRC_BGP_BGP_UPDATE_SENDER_H_ 6 : #define SRC_BGP_BGP_UPDATE_SENDER_H_ 7 : 8 : #include <boost/ptr_container/ptr_list.hpp> 9 : 10 : #include <vector> 11 : 12 : #include "base/bitset.h" 13 : #include "base/index_map.h" 14 : #include "base/queue_task.h" 15 : 16 : class BgpServer; 17 : class BgpUpdateSender; 18 : class IPeerUpdate; 19 : class RibOut; 20 : class RibPeerSet; 21 : 22 : // 23 : // This class maintains state to generate updates for a DB partition for all 24 : // RibOuts and IPeerUpdates in a BgpServer. All BgpSenderPartitions work in 25 : // parallel and maintain their own view of whether an IPeerUpdate is blocked, 26 : // in sync etc. 27 : // 28 : // A BgpSenderPartition maintains two indexed maps for internal bookkeeping. 29 : // 30 : // o PeerStateMap allocates an index per IPeerUpdate to allow direct access to 31 : // the corresponding PeerState using the index. The PeerState nested class 32 : // is described elsewhere. 33 : // o RibStateMap allocates a bit index per RibOut and allows direct access to 34 : // the corresponding RibState using the index. The RibState nested class is 35 : // described elsewhere. 36 : // 37 : // BgpSenderPartition contains a WorkQueue of WorkBase entries to represent 38 : // pending work. A WorkBase can either be a WorkRibOut, which corresponds 39 : // to a tail dequeue for a (RibOut, QueueId) or a WorkPeer which corresponds 40 : // to a peer dequeue. 41 : // 42 : // A mutex is used to control access to the WorkQueue between producers that 43 : // need to enqueue WorkBase entries and the Worker which dequeues the entries 44 : // and processes them. The producers are the BgpExport class which creates a 45 : // WorkRibOut entry after adding a RouteUpdate to an empty UpdateQueue, and 46 : // IPeerUpdate class which creates a WorkPeer entry when it becomes unblocked. 
47 : // 48 : class BgpSenderPartition { 49 : public: 50 : BgpSenderPartition(BgpUpdateSender *sender, int index); 51 : ~BgpSenderPartition(); 52 : 53 : void Add(RibOut *ribout, IPeerUpdate *peer); 54 : void Remove(RibOut *ribout, IPeerUpdate *peer); 55 : 56 : void RibOutActive(RibOut *ribout, int queue_id); 57 : 58 : void PeerSendReady(IPeerUpdate *peer); 59 : bool PeerIsSendReady(IPeerUpdate *peer) const; 60 : bool PeerIsRegistered(IPeerUpdate *peer) const; 61 : bool PeerInSync(IPeerUpdate *peer) const; 62 : 63 : bool CheckInvariants() const; 64 : 65 : int task_id() const; 66 220637 : int index() const { return index_; } 67 : 68 : // For unit testing. 69 : void set_disabled(bool disabled); 70 : 71 : private: 72 : friend class BgpUpdateSenderTest; 73 : friend class RibOutUpdatesTest; 74 : 75 : struct WorkBase { 76 : enum Type { 77 : WPeer, 78 : WRibOut 79 : }; 80 619858 : explicit WorkBase(Type type) 81 619858 : : type(type), valid(true) { 82 619858 : } 83 : Type type; 84 : bool valid; 85 : }; 86 : 87 : struct WorkRibOut : public WorkBase { 88 619409 : WorkRibOut(RibOut *ribout, int queue_id) 89 619409 : : WorkBase(WRibOut), ribout(ribout), queue_id(queue_id) { 90 619469 : } 91 : RibOut *ribout; 92 : int queue_id; 93 : }; 94 : 95 : struct WorkPeer : public WorkBase { 96 389 : explicit WorkPeer(IPeerUpdate *peer) 97 389 : : WorkBase(WPeer), peer(peer) { 98 389 : } 99 : IPeerUpdate *peer; 100 : }; 101 : 102 : class PeerState; 103 : struct PeerRibState; 104 : class RibState; 105 : class Worker; 106 : 107 : typedef boost::ptr_list<WorkBase> WorkQueue; 108 : typedef IndexMap<IPeerUpdate *, PeerState> PeerStateMap; 109 : typedef IndexMap<RibOut *, RibState> RibStateMap; 110 : 111 : void MaybeStartWorker(); 112 : std::unique_ptr<WorkBase> WorkDequeue(); 113 : void WorkEnqueue(WorkBase *wentry); 114 : void WorkPeerEnqueue(IPeerUpdate *peer); 115 : void WorkPeerInvalidate(IPeerUpdate *peer); 116 : void WorkRibOutEnqueue(RibOut *ribout, int queue_id); 117 : void 
WorkRibOutInvalidate(RibOut *ribout); 118 : 119 : void UpdateRibOut(RibOut *ribout, int queue_id); 120 : void UpdatePeer(IPeerUpdate *peer); 121 : 122 : bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id); 123 : 124 : void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync); 125 : 126 : void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id, 127 : const RibPeerSet &munsync); 128 : void SetQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer); 129 : bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer); 130 : void SetSendBlocked(RibOut *ribout, int queue_id, 131 : const RibPeerSet &blocked); 132 : void SetSendBlocked(const RibOut *ribout, RibState *rs, int queue_id, 133 : const RibPeerSet &blocked); 134 : void SetQueueSync(PeerState *ps, int queue_id); 135 : 136 : BgpUpdateSender *sender_; 137 : int index_; 138 : bool running_; 139 : bool disabled_; 140 : WorkQueue work_queue_; 141 : Worker *worker_task_; 142 : PeerStateMap peer_state_imap_; 143 : RibStateMap rib_state_imap_; 144 : 145 : DISALLOW_COPY_AND_ASSIGN(BgpSenderPartition); 146 : }; 147 : 148 : // 149 : // This is a wrapper that hides the existence of multiple BgpSenderPartitions 150 : // from client classes. It relays APIs to appropriate/all BgpSenderPartition 151 : // instance(s). 152 : // 153 : // The send ready WorkQueue is needed to process send ready notifications for 154 : // IPeerUpdates in the context of bgp::SendReadyTask. This ensures that there 155 : // are no concurrency issues in case the IPeerUpdate gets unblocked while we 156 : // are still processing the previous WorkBase which caused it to get blocked 157 : // in the first place. 
158 : // 159 : class BgpUpdateSender { 160 : public: 161 : explicit BgpUpdateSender(BgpServer *server); 162 : ~BgpUpdateSender(); 163 : 164 : void Join(RibOut *ribout, IPeerUpdate *peer); 165 : void Leave(RibOut *ribout, IPeerUpdate *peer); 166 : 167 : void RibOutActive(int index, RibOut *ribout, int queue_id); 168 : void PeerSendReady(IPeerUpdate *peer); 169 : bool PeerIsRegistered(IPeerUpdate *peer) const; 170 : bool PeerInSync(IPeerUpdate *peer) const; 171 : 172 220508 : int task_id() const { return task_id_; } 173 : bool CheckInvariants() const; 174 : 175 : // For unit testing. 176 : void DisableProcessing(); 177 : void EnableProcessing(); 178 : 179 : private: 180 : friend class BgpTestPeer; 181 : friend class BgpUpdateSenderTest; 182 : friend class RibOutUpdatesTest; 183 : 184 : bool SendReadyCallback(IPeerUpdate *peer); 185 99 : BgpSenderPartition *partition(int index) { return partitions_[index]; } 186 : 187 : BgpServer *server_; 188 : int task_id_; 189 : std::vector<BgpSenderPartition *> partitions_; 190 : WorkQueue<IPeerUpdate *> send_ready_queue_; 191 : 192 : DISALLOW_COPY_AND_ASSIGN(BgpUpdateSender); 193 : }; 194 : 195 : #endif // SRC_BGP_BGP_UPDATE_SENDER_H_