Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "bgp/bgp_update_sender.h"
6 :
7 : #include <boost/bind/bind.hpp>
8 : #include <boost/foreach.hpp>
9 :
10 : #include <map>
11 : #include <string>
12 : #include <atomic>
13 :
14 : #include "base/task_annotations.h"
15 : #include "bgp/ipeer.h"
16 : #include "bgp/bgp_ribout.h"
17 : #include "bgp/bgp_ribout_updates.h"
18 : #include "db/db.h"
19 :
20 : using std::unique_ptr;
21 : using std::make_pair;
22 : using std::map;
23 : using std::string;
24 : using std::vector;
25 : using namespace boost::placeholders;
26 :
27 : //
28 : // This struct represents RibOut specific state for a PeerState. There's one
29 : // instance of this for each RibOut that an IPeerUpdate has joined.
30 : //
31 : // The PeerRibState contains a bit mask to keep track of the QueueIds that are
32 : // currently active for the RibOut for the IPeerUpdate.
33 : //
34 : struct BgpSenderPartition::PeerRibState {
35 367505 : PeerRibState() : qactive(0) { }
36 : uint8_t qactive;
37 : };
38 :
39 : //
40 : // This nested class represents IPeerUpdate related state that's specific to
41 : // the BgpSenderPartition.
42 : //
43 : // A PeerState contains a Map of the index for a RibState to a PeerRibState.
44 : // Each entry in the map logically represents the state of the peer for the
45 : // ribout.
46 : //
47 : // The Map is used in conjunction with the RibStateMap in BgpSenderPartition
48 : // to implement regular and circular iterator classes that are used to walk
49 : // through all the RibState entries for a peer.
50 : //
51 : // A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
52 : //
// The PeerState is considered to be send_ready when the underlying socket is
// writable. Note that the send_ready state in the PeerState may be out of
// date with the actual socket state because the socket could have got blocked
// when writing from another partition. Hence IPeerUpdate::send_ready() is the
// more authoritative source.
58 : //
// The PeerState is considered to be in_sync if it's send_ready and the marker
// for the IPeerUpdate has merged with the tail marker for all QueueIds in
// all RibOuts to which the IPeerUpdate is subscribed.
62 : //
63 : // The PeerState keeps count of the number of active RibOuts for each QueueId.
64 : // A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
65 : // send_ready and there's RouteUpdates for the pair.
66 : //
67 : class BgpSenderPartition::PeerState {
68 : public:
69 : typedef map<size_t, PeerRibState> Map;
70 :
71 : class iterator : public boost::iterator_facade<
72 : iterator, RibOut, boost::forward_traversal_tag> {
73 : public:
74 33570 : explicit iterator(const RibStateMap &indexmap, Map *map, size_t index)
75 33570 : : indexmap_(indexmap), map_(map), index_(index) {
76 33570 : }
77 : size_t index() const { return index_; }
78 11426 : RibState *rib_state() { return indexmap_.At(index_); }
79 : const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
80 :
81 : private:
82 : friend class boost::iterator_core_access;
83 11426 : void increment() {
84 11426 : Map::const_iterator loc = map_->upper_bound(index_);
85 11426 : if (loc == map_->end()) {
86 11072 : index_ = -1;
87 : } else {
88 354 : index_ = loc->first;
89 : }
90 11426 : }
91 22498 : bool equal(const iterator &rhs) const {
92 22498 : return index_ == rhs.index_;
93 : }
94 : RibOut &dereference() const;
95 :
96 : const RibStateMap &indexmap_;
97 : Map *map_;
98 : size_t index_;
99 : };
100 :
101 : class circular_iterator : public boost::iterator_facade<
102 : circular_iterator, RibOut, boost::forward_traversal_tag> {
103 : public:
104 1196 : explicit circular_iterator(const RibStateMap &indexmap, Map *map,
105 : int start, bool is_valid)
106 1196 : : indexmap_(indexmap), map_(map), index_(-1), match_(true) {
107 1196 : if (map_->empty()) {
108 0 : return;
109 : }
110 1196 : Map::const_iterator loc = map_->lower_bound(start);
111 1196 : if (loc == map_->end()) {
112 1053 : loc = map_->begin();
113 : }
114 1196 : index_ = loc->first;
115 1196 : if (is_valid) match_ = false;
116 : }
117 37 : int index() const { return index_; }
118 446 : RibState *rib_state() { return indexmap_.At(index_); }
119 485 : const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
120 :
121 : private:
122 : friend class boost::iterator_core_access;
123 448 : void increment() {
124 448 : match_ = true;
125 448 : assert(!map_->empty());
126 448 : Map::const_iterator loc = map_->upper_bound(index_);
127 448 : if (loc == map_->end()) {
128 337 : loc = map_->begin();
129 : }
130 448 : index_ = loc->first;
131 448 : }
132 822 : bool equal(const circular_iterator &rhs) const {
133 822 : return ((match_ == rhs.match_) && (index_ == rhs.index_));
134 : }
135 : RibOut &dereference() const;
136 :
137 : const RibStateMap &indexmap_;
138 : Map *map_;
139 : int index_;
140 : bool match_;
141 : };
142 :
143 81305 : explicit PeerState(IPeerUpdate *peer)
144 81305 : : key_(peer), index_(-1),
145 81305 : qactive_cnt_(RibOutUpdates::QCOUNT),
146 81305 : in_sync_(true), rib_iterator_(BitSet::npos) {
147 81305 : send_ready_ = true;
148 243915 : for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
149 162610 : qactive_cnt_[i] = 0;
150 : }
151 81305 : }
152 :
153 : void Add(RibState *rs);
154 :
155 : void Remove(RibState *rs);
156 :
157 576 : bool IsMember(size_t index) const {
158 576 : return rib_bitset_.test(index);
159 : }
160 :
161 11072 : iterator begin(const RibStateMap &indexmap) {
162 11072 : Map::const_iterator it = rib_set_.begin();
163 11072 : size_t index = (it != rib_set_.end() ? it->first : -1);
164 11072 : return iterator(indexmap, &rib_set_, index);
165 : }
166 22498 : iterator end(const RibStateMap &indexmap) {
167 22498 : return iterator(indexmap, &rib_set_, -1);
168 : }
169 :
170 374 : circular_iterator circular_begin(const RibStateMap &indexmap) {
171 374 : return circular_iterator(indexmap, &rib_set_, rib_iterator_, true);
172 : }
173 822 : circular_iterator circular_end(const RibStateMap &indexmap) {
174 822 : return circular_iterator(indexmap, &rib_set_, rib_iterator_, false);
175 : }
176 :
177 37 : void SetIteratorStart(size_t start) { rib_iterator_ = start; }
178 :
179 23467 : void SetQueueActive(size_t rib_index, int queue_id) {
180 23467 : CHECK_CONCURRENCY("bgp::SendUpdate");
181 23476 : Map::iterator loc = rib_set_.find(rib_index);
182 23469 : assert(loc != rib_set_.end());
183 23468 : if (!BitIsSet(loc->second.qactive, queue_id)) {
184 21393 : SetBit(loc->second.qactive, queue_id);
185 21393 : qactive_cnt_[queue_id]++;
186 : }
187 23465 : }
188 :
189 735419 : void SetQueueInactive(size_t rib_index, int queue_id) {
190 735419 : CHECK_CONCURRENCY("bgp::SendUpdate", "bgp::PeerMembership");
191 735419 : Map::iterator loc = rib_set_.find(rib_index);
192 735419 : assert(loc != rib_set_.end());
193 735419 : if (BitIsSet(loc->second.qactive, queue_id)) {
194 21412 : ClearBit(loc->second.qactive, queue_id);
195 21412 : qactive_cnt_[queue_id]--;
196 : }
197 735419 : }
198 :
199 0 : bool IsQueueActive(size_t rib_index, int queue_id) {
200 0 : CHECK_CONCURRENCY("bgp::SendUpdate");
201 0 : Map::iterator loc = rib_set_.find(rib_index);
202 0 : assert(loc != rib_set_.end());
203 0 : return BitIsSet(loc->second.qactive, queue_id);
204 : }
205 :
206 10701 : int QueueCount(int queue_id) { return qactive_cnt_[queue_id]; }
207 :
208 3792504 : IPeerUpdate *peer() const { return key_; }
209 81305 : void set_index(size_t index) { index_ = index; }
210 816735 : size_t index() const { return index_; }
211 :
212 1942035 : bool in_sync() const { return in_sync_; }
213 4069 : void clear_sync() { in_sync_ = false; }
214 : void SetSync();
215 :
216 19228 : bool send_ready() const { return send_ready_; }
217 4458 : void set_send_ready(bool toggle) { send_ready_ = toggle; }
218 :
219 367505 : bool empty() const { return rib_set_.empty(); }
220 :
221 420 : bool CheckInvariants() const {
222 1260 : for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
223 840 : CHECK_INVARIANT(qactive_cnt_[i] <= (int) rib_set_.size());
224 : }
225 420 : return true;
226 : }
227 :
228 : private:
229 : IPeerUpdate *key_;
230 : size_t index_; // assigned from PeerStateMap
231 : Map rib_set_; // list of RibOuts advertised by the peer.
232 : BitSet rib_bitset_; // bitset of RibOuts advertised by the peer
233 : vector<int> qactive_cnt_;
234 : bool in_sync_; // whether the peer may dequeue tail markers.
235 : std::atomic<bool> send_ready_; // whether the peer may send updates.
236 : size_t rib_iterator_; // index of last processed rib.
237 :
238 : DISALLOW_COPY_AND_ASSIGN(PeerState);
239 : };
240 :
241 : //
242 : // This nested class represents RibOut related state that's specific to the
243 : // BgpSenderPartition.
244 : //
245 : // A RibState contains a BitSet of all the peers that are advertising the
246 : // RibOut associated with the RibState.
247 : //
248 : // The BitSet is used in conjunction with PeerStateMap in BgpSenderPartition
249 : // to implement an iterator that is used to walk through all the PeerState
250 : // entries for the ribout.
251 : //
252 : class BgpSenderPartition::RibState {
253 : public:
254 : class iterator : public boost::iterator_facade<
255 : iterator, PeerState, boost::forward_traversal_tag> {
256 : public:
257 3166364 : explicit iterator(const PeerStateMap &indexmap,
258 : const BitSet &set, size_t index)
259 3166364 : : indexmap_(indexmap), set_(set), index_(index) {
260 3166364 : }
261 : size_t index() const { return index_; }
262 :
263 : private:
264 : friend class boost::iterator_core_access;
265 1935443 : void increment() {
266 1935443 : index_ = set_.find_next(index_);
267 1935086 : }
268 2550808 : bool equal(const iterator &rhs) const {
269 2550808 : return index_ == rhs.index_;
270 : }
271 1935588 : PeerState &dereference() const {
272 1935588 : return *indexmap_.At(index_);
273 : }
274 : const PeerStateMap &indexmap_;
275 : const BitSet &set_;
276 : size_t index_;
277 : };
278 :
279 135388 : explicit RibState(RibOut *ribout)
280 135388 : : key_(ribout), index_(-1), in_sync_(RibOutUpdates::QCOUNT, true) {
281 135388 : }
282 :
283 : void Add(PeerState *ps);
284 : void Remove(PeerState *ps);
285 :
286 : bool QueueSync(int queue_id);
287 : void SetQueueSync(int queue_id);
288 : void SetQueueUnsync(int queue_id);
289 :
290 518 : RibOut *ribout() { return key_; }
291 :
292 615924 : iterator begin(const PeerStateMap &indexmap) {
293 615924 : return iterator(indexmap, peer_set_, peer_set_.find_first());
294 : }
295 :
296 2550840 : iterator end(const PeerStateMap &indexmap) {
297 2550840 : return iterator(indexmap, peer_set_, BitSet::npos);
298 : }
299 :
300 135388 : void set_index(size_t index) { index_ = index; }
301 2364438 : size_t index() const { return index_; }
302 :
303 367505 : bool empty() const { return peer_set_.none(); }
304 :
305 576 : const BitSet &peer_set() const { return peer_set_; }
306 :
307 : private:
308 : RibOut *key_;
309 : size_t index_;
310 : BitSet peer_set_;
311 : vector<bool> in_sync_;
312 :
313 : DISALLOW_COPY_AND_ASSIGN(RibState);
314 : };
315 :
316 :
317 367505 : void BgpSenderPartition::RibState::Add(BgpSenderPartition::PeerState *ps) {
318 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
319 367505 : peer_set_.set(ps->index());
320 367505 : }
321 :
322 367505 : void BgpSenderPartition::RibState::Remove(BgpSenderPartition::PeerState *ps) {
323 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
324 367505 : peer_set_.reset(ps->index());
325 367505 : }
326 :
327 10850 : bool BgpSenderPartition::RibState::QueueSync(int queue_id) {
328 10850 : CHECK_CONCURRENCY("bgp::SendUpdate");
329 10850 : return (in_sync_[queue_id]);
330 : }
331 :
332 72 : void BgpSenderPartition::RibState::SetQueueSync(int queue_id) {
333 72 : CHECK_CONCURRENCY("bgp::SendUpdate");
334 72 : in_sync_[queue_id] = true;
335 72 : }
336 :
337 6015 : void BgpSenderPartition::RibState::SetQueueUnsync(int queue_id) {
338 6015 : CHECK_CONCURRENCY("bgp::SendUpdate");
339 6013 : in_sync_[queue_id] = false;
340 6015 : }
341 :
342 367505 : void BgpSenderPartition::PeerState::Add(RibState *rs) {
343 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
344 367505 : PeerRibState init;
345 367505 : rib_set_.insert(make_pair(rs->index(), init));
346 367505 : rib_bitset_.set(rs->index());
347 367505 : }
348 :
349 367505 : void BgpSenderPartition::PeerState::Remove(RibState *rs) {
350 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
351 1102515 : for (int queue_id = 0; queue_id < RibOutUpdates::QCOUNT; queue_id++) {
352 735010 : SetQueueInactive(rs->index(), queue_id);
353 : }
354 367505 : rib_set_.erase(rs->index());
355 367505 : rib_bitset_.reset(rs->index());
356 367505 : }
357 :
358 5326 : void BgpSenderPartition::PeerState::SetSync() {
359 5326 : CHECK_CONCURRENCY("bgp::SendUpdate");
360 10751 : for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
361 5425 : assert(it->second.qactive == 0);
362 : }
363 15978 : for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
364 10652 : assert(qactive_cnt_[i] == 0);
365 : }
366 5326 : in_sync_ = true;
367 5326 : }
368 :
369 72 : RibOut &BgpSenderPartition::PeerState::iterator::dereference() const {
370 72 : return *indexmap_.At(index_)->ribout();
371 : }
372 :
373 446 : RibOut &BgpSenderPartition::PeerState::circular_iterator::dereference() const {
374 446 : return *indexmap_.At(index_)->ribout();
375 : }
376 :
377 : class BgpSenderPartition::Worker : public Task {
378 : public:
379 218950 : explicit Worker(BgpSenderPartition *partition)
380 218950 : : Task(partition->task_id(), partition->index()),
381 218950 : partition_(partition) {
382 218952 : }
383 :
384 219033 : virtual bool Run() {
385 219033 : CHECK_CONCURRENCY("bgp::SendUpdate");
386 :
387 : while (true) {
388 834830 : unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
389 834673 : if (!wentry.get())
390 218995 : break;
391 615708 : if (!wentry->valid)
392 1 : continue;
393 615734 : switch (wentry->type) {
394 615341 : case WorkBase::WRibOut: {
395 615341 : WorkRibOut *workrib = static_cast<WorkRibOut *>(wentry.get());
396 615328 : partition_->UpdateRibOut(workrib->ribout, workrib->queue_id);
397 615433 : break;
398 : }
399 366 : case WorkBase::WPeer: {
400 366 : WorkPeer *workpeer = static_cast<WorkPeer *>(wentry.get());
401 389 : partition_->UpdatePeer(workpeer->peer);
402 389 : break;
403 : }
404 : }
405 1450651 : }
406 :
407 218993 : return true;
408 : }
409 0 : string Description() const { return "BgpSenderPartition::Worker"; }
410 :
411 : private:
412 : BgpSenderPartition *partition_;
413 : };
414 :
415 38479 : BgpSenderPartition::BgpSenderPartition(BgpUpdateSender *sender, int index)
416 38479 : : sender_(sender),
417 38479 : index_(index),
418 38479 : running_(false),
419 38479 : disabled_(false),
420 38479 : worker_task_(NULL) {
421 38479 : }
422 :
423 38479 : BgpSenderPartition::~BgpSenderPartition() {
424 38479 : if (worker_task_) {
425 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
426 0 : scheduler->Cancel(worker_task_);
427 : }
428 38479 : assert(peer_state_imap_.empty());
429 38479 : assert(rib_state_imap_.empty());
430 38479 : }
431 :
432 218947 : int BgpSenderPartition::task_id() const {
433 218947 : return sender_->task_id();
434 : }
435 :
436 : //
437 : // Add the (RibOut, IPeerUpdate) combo to the BgpSenderPartition.
438 : // Find or create the corresponding RibState and PeerState and sets up the
439 : // cross-linkage.
440 : //
441 367505 : void BgpSenderPartition::Add(RibOut *ribout, IPeerUpdate *peer) {
442 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
443 :
444 367505 : RibState *rs = rib_state_imap_.Locate(ribout);
445 367505 : PeerState *ps = peer_state_imap_.Locate(peer);
446 367505 : rs->Add(ps);
447 367505 : ps->Add(rs);
448 367505 : }
449 :
450 : //
451 : // Remove the (RibOut, IPeerUpdate) combo from the BgpSenderPartition.
452 : // Decouple cross linkage between the corresponding RibState and PeerState and
453 : // get rid of the RibState and/or PeerState if they are no longer needed.
454 : //
455 367505 : void BgpSenderPartition::Remove(RibOut *ribout, IPeerUpdate *peer) {
456 367505 : CHECK_CONCURRENCY("bgp::PeerMembership");
457 :
458 367505 : RibState *rs = rib_state_imap_.Find(ribout);
459 367505 : PeerState *ps = peer_state_imap_.Find(peer);
460 367505 : assert(rs != NULL);
461 367505 : assert(ps != NULL);
462 367505 : rs->Remove(ps);
463 367505 : ps->Remove(rs);
464 367505 : if (rs->empty()) {
465 135388 : WorkRibOutInvalidate(ribout);
466 135388 : rib_state_imap_.Remove(ribout, rs->index());
467 : }
468 367505 : if (ps->empty()) {
469 81305 : WorkPeerInvalidate(peer);
470 81305 : peer_state_imap_.Remove(peer, ps->index());
471 : }
472 367505 : }
473 :
474 : //
475 : // Create and enqueue new WorkRibOut entry since the RibOut is now
476 : // active.
477 : //
478 615254 : void BgpSenderPartition::RibOutActive(RibOut *ribout, int queue_id) {
479 615254 : CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
480 :
481 615276 : WorkRibOutEnqueue(ribout, queue_id);
482 615400 : }
483 :
484 : //
485 : // Mark an IPeerUpdate to be send ready.
486 : //
487 5146 : void BgpSenderPartition::PeerSendReady(IPeerUpdate *peer) {
488 5146 : CHECK_CONCURRENCY("bgp::SendReadyTask");
489 :
490 : // The IPeerUpdate may not be registered if it has not reached Established
491 : // state or it may already have been unregistered by the time we get around
492 : // to processing the notification.
493 5146 : PeerState *ps = peer_state_imap_.Find(peer);
494 5146 : if (!ps)
495 0 : return;
496 :
497 : // Nothing to do if the IPeerUpdate's already in that state.
498 5146 : if (ps->send_ready())
499 4757 : return;
500 :
501 : // Create and enqueue new WorkPeer entry.
502 389 : ps->set_send_ready(true);
503 389 : WorkPeerEnqueue(peer);
504 : }
505 :
506 : //
507 : // Return true if the IPeer is send ready.
508 : //
509 2905 : bool BgpSenderPartition::PeerIsSendReady(IPeerUpdate *peer) const {
510 2905 : CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
511 :
512 2905 : PeerState *ps = peer_state_imap_.Find(peer);
513 2905 : return ps->send_ready();
514 : }
515 :
516 : //
517 : // Return true if the IPeer is registered.
518 : //
519 9329 : bool BgpSenderPartition::PeerIsRegistered(IPeerUpdate *peer) const {
520 9329 : CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
521 :
522 9329 : return (peer_state_imap_.Find(peer) != NULL);
523 : }
524 :
525 : //
526 : // Return true if the IPeer is in sync.
527 : //
528 7122 : bool BgpSenderPartition::PeerInSync(IPeerUpdate *peer) const {
529 7122 : CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
530 :
531 7122 : PeerState *ps = peer_state_imap_.Find(peer);
532 7122 : return (ps ? ps->in_sync() : false);
533 : }
534 :
535 : //
536 : // Create a Worker if warranted and enqueue it to the TaskScheduler.
537 : // Assumes that the caller holds the BgpSenderPartition mutex.
538 : //
539 615591 : void BgpSenderPartition::MaybeStartWorker() {
540 615591 : if (!running_ && !disabled_) {
541 218946 : worker_task_ = new Worker(this);
542 218951 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
543 218948 : scheduler->Enqueue(worker_task_);
544 219038 : running_ = true;
545 : }
546 615683 : }
547 :
548 : //
// Dequeue the first WorkBase item from the work queue and return a
// unique_ptr to it. Clear out Worker related state if the work queue
// is empty.
552 : //
553 834842 : unique_ptr<BgpSenderPartition::WorkBase> BgpSenderPartition::WorkDequeue() {
554 834842 : CHECK_CONCURRENCY("bgp::SendUpdate");
555 :
556 834702 : unique_ptr<WorkBase> wentry;
557 834702 : if (work_queue_.empty()) {
558 218991 : worker_task_ = NULL;
559 218991 : running_ = false;
560 : } else {
561 615707 : wentry.reset(work_queue_.pop_front().release());
562 : }
563 834713 : return wentry;
564 0 : }
565 :
566 : //
// Enqueue a WorkBase entry into the work queue and start a new Worker
// task if required.
569 : //
570 615678 : void BgpSenderPartition::WorkEnqueue(WorkBase *wentry) {
571 615678 : CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::SendReadyTask",
572 : "bgp::PeerMembership");
573 :
574 615717 : work_queue_.push_back(wentry);
575 615562 : MaybeStartWorker();
576 615790 : }
577 :
578 : //
579 : // Disable or enable the worker.
580 : // For unit testing.
581 : //
582 34 : void BgpSenderPartition::set_disabled(bool disabled) {
583 34 : disabled_ = disabled;
584 34 : MaybeStartWorker();
585 34 : }
586 :
587 : //
588 : // Enqueue a WorkPeer to the work queue.
589 : //
590 389 : void BgpSenderPartition::WorkPeerEnqueue(IPeerUpdate *peer) {
591 389 : CHECK_CONCURRENCY("bgp::SendReadyTask");
592 :
593 389 : WorkBase *wentry = new WorkPeer(peer);
594 389 : WorkEnqueue(wentry);
595 389 : }
596 :
597 : //
598 : // Invalidate all WorkBases for the given IPeerUpdate.
599 : // Used when a IPeerUpdate is removed.
600 : //
601 81305 : void BgpSenderPartition::WorkPeerInvalidate(IPeerUpdate *peer) {
602 81305 : CHECK_CONCURRENCY("bgp::PeerMembership");
603 :
604 81305 : for (WorkQueue::iterator it = work_queue_.begin();
605 81315 : it != work_queue_.end(); ++it) {
606 10 : WorkBase *wentry = it.operator->();
607 10 : if (wentry->type != WorkBase::WPeer)
608 10 : continue;
609 0 : WorkPeer *wpeer = static_cast<WorkPeer *>(wentry);
610 0 : if (wpeer->peer != peer)
611 0 : continue;
612 0 : wpeer->valid = false;
613 : }
614 81305 : }
615 :
616 : //
617 : // Enqueue a WorkRibOut to the work queue.
618 : //
619 615258 : void BgpSenderPartition::WorkRibOutEnqueue(RibOut *ribout, int queue_id) {
620 615258 : CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
621 :
622 615272 : WorkBase *wentry = new WorkRibOut(ribout, queue_id);
623 615302 : WorkEnqueue(wentry);
624 615399 : }
625 :
626 : //
627 : // Invalidate all WorkBases for the given RibOut.
628 : // Used when a RibOut is removed.
629 : //
630 135388 : void BgpSenderPartition::WorkRibOutInvalidate(RibOut *ribout) {
631 135388 : CHECK_CONCURRENCY("bgp::PeerMembership");
632 :
633 135388 : for (WorkQueue::iterator it = work_queue_.begin();
634 135718 : it != work_queue_.end(); ++it) {
635 330 : WorkBase *wentry = it.operator->();
636 330 : if (wentry->type != WorkBase::WRibOut)
637 0 : continue;
638 330 : WorkRibOut *wribout = static_cast<WorkRibOut *>(wentry);
639 330 : if (wribout->ribout != ribout)
640 329 : continue;
641 1 : wribout->valid = false;
642 : }
643 135388 : }
644 :
645 : //
646 : // Build the RibPeerSet of IPeers for the RibOut that are in sync. Note that
647 : // we need to use bit indices that are specific to the RibOut, not the ones
648 : // from the BgpSenderPartition.
649 : //
650 615673 : void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,
651 : RibPeerSet *msync) {
652 615673 : CHECK_CONCURRENCY("bgp::SendUpdate");
653 :
654 615790 : for (RibState::iterator it = rs->begin(peer_state_imap_);
655 2550695 : it != rs->end(peer_state_imap_); ++it) {
656 1935030 : PeerState *ps = it.operator->();
657 :
658 : // If the PeerState is in sync but the IPeerUpdate is not send ready
659 : // then update the sync and send ready state in the PeerState. Note
660 : // that the RibOut queue for the PeerState will get marked active via
661 : // the call the SetQueueActive from UpdateRibOut.
662 1934942 : if (ps->in_sync()) {
663 1896555 : if (ps->peer()->send_ready()) {
664 1896475 : int rix = ribout->GetPeerIndex(ps->peer());
665 1896138 : msync->set(rix);
666 : } else {
667 51 : ps->clear_sync();
668 51 : ps->set_send_ready(false);
669 : }
670 : }
671 : }
672 615653 : }
673 :
674 : //
675 : // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
// Note that bit indices in the RibPeerSet are specific to the RibOut.
677 : //
678 616275 : void BgpSenderPartition::SetSendBlocked(const RibOut *ribout, RibState *rs,
679 : int queue_id, const RibPeerSet &blocked) {
680 616275 : CHECK_CONCURRENCY("bgp::SendUpdate");
681 :
682 620234 : for (size_t bit = blocked.find_first(); bit != RibPeerSet::npos;
683 4021 : bit = blocked.find_next(bit)) {
684 4020 : IPeerUpdate *peer = ribout->GetPeer(bit);
685 4020 : PeerState *ps = peer_state_imap_.Find(peer);
686 4019 : ps->SetQueueActive(rs->index(), queue_id);
687 4018 : ps->clear_sync();
688 4018 : ps->set_send_ready(false);
689 : }
690 616217 : }
691 :
692 : //
693 : // For unit testing only.
694 : // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
695 : //
696 8 : void BgpSenderPartition::SetSendBlocked(RibOut *ribout,
697 : int queue_id, const RibPeerSet &blocked) {
698 8 : CHECK_CONCURRENCY("bgp::SendUpdate");
699 :
700 8 : RibState *rs = rib_state_imap_.Find(ribout);
701 8 : assert(rs);
702 8 : SetSendBlocked(ribout, rs, queue_id, blocked);
703 8 : }
704 :
705 : //
706 : // Concurrency: called from bgp send task.
707 : //
708 : // Take the RibPeerSet of unsync IPeers and update the relevant PeerStates.
// Note that bit indices in the RibPeerSet are specific to the RibOut.
710 : //
711 615757 : void BgpSenderPartition::SetQueueActive(const RibOut *ribout, RibState *rs,
712 : int queue_id, const RibPeerSet &munsync) {
713 615757 : CHECK_CONCURRENCY("bgp::SendUpdate");
714 :
715 635175 : for (size_t bit = munsync.find_first(); bit != RibPeerSet::npos;
716 19454 : bit = munsync.find_next(bit)) {
717 19455 : IPeerUpdate *peer = ribout->GetPeer(bit);
718 19455 : PeerState *ps = peer_state_imap_.Find(peer);
719 19453 : ps->SetQueueActive(rs->index(), queue_id);
720 : }
721 615719 : }
722 :
723 : //
724 : // Concurrency: called from bgp send task.
725 : //
726 : // Mark the PeerRibState corresponding to the given IPeerUpdate and RibOut
727 : // as active.
728 : //
729 : // Used by unit test code.
730 : //
731 0 : void BgpSenderPartition::SetQueueActive(RibOut *ribout, int queue_id,
732 : IPeerUpdate *peer) {
733 0 : CHECK_CONCURRENCY("bgp::SendUpdate");
734 :
735 0 : PeerState *ps = peer_state_imap_.Find(peer);
736 0 : RibState *rs = rib_state_imap_.Find(ribout);
737 0 : ps->SetQueueActive(rs->index(), queue_id);
738 0 : }
739 :
740 : //
741 : // Concurrency: called from bgp send task.
742 : //
743 : // Check if the queue corresponding to IPeerUpdate, Ribout and queue id is
744 : // active.
745 : //
746 : // Used by unit test code.
747 : //
748 0 : bool BgpSenderPartition::IsQueueActive(RibOut *ribout, int queue_id,
749 : IPeerUpdate *peer) {
750 0 : CHECK_CONCURRENCY("bgp::SendUpdate");
751 :
752 0 : PeerState *ps = peer_state_imap_.Find(peer);
753 0 : RibState *rs = rib_state_imap_.Find(ribout);
754 0 : return ps->IsQueueActive(rs->index(), queue_id);
755 : }
756 :
757 : //
758 : // Concurrency: called from bgp send task.
759 : //
760 : // Mark all the RibStates for the given peer and queue id as being in sync
761 : // and trigger a tail dequeue.
762 : //
763 10652 : void BgpSenderPartition::SetQueueSync(PeerState *ps, int queue_id) {
764 10652 : CHECK_CONCURRENCY("bgp::SendUpdate");
765 :
766 10652 : for (PeerState::iterator it = ps->begin(rib_state_imap_);
767 21502 : it != ps->end(rib_state_imap_); ++it) {
768 10850 : RibState *rs = it.rib_state();
769 10850 : if (!rs->QueueSync(queue_id)) {
770 72 : RibOut *ribout = it.operator->();
771 72 : RibOutActive(ribout, queue_id);
772 72 : rs->SetQueueSync(queue_id);
773 : }
774 : }
775 10652 : }
776 :
777 : //
778 : // Drain the queue until there are no more updates or all the members become
779 : // blocked.
780 : //
781 615714 : void BgpSenderPartition::UpdateRibOut(RibOut *ribout, int queue_id) {
782 615714 : CHECK_CONCURRENCY("bgp::SendUpdate");
783 :
784 615763 : RibOutUpdates *updates = ribout->updates(index_);
785 615778 : RibState *rs = rib_state_imap_.Find(ribout);
786 615672 : RibPeerSet msync;
787 :
788 : // Convert group in-sync list to rib specific bitset.
789 615679 : BuildSyncBitSet(ribout, rs, &msync);
790 :
791 : // Drain the queue till we can do no more.
792 615655 : RibPeerSet blocked, munsync;
793 615655 : bool done = updates->TailDequeue(queue_id, msync, &blocked, &munsync);
794 615737 : assert(msync.Contains(blocked));
795 :
796 : // Mark peers as send blocked.
797 615847 : SetSendBlocked(ribout, rs, queue_id, blocked);
798 :
799 : // Set the queue to be active for any unsync peers. If we don't do this,
800 : // we will forget to mark the (RibOut,QueueId) as active for these peers
801 : // since the blocked RibPeerSet does not contain peers that are already
802 : // out of sync. Note that the unsync peers would have been split from
803 : // the tail marker in TailDequeue.
804 615765 : SetQueueActive(ribout, rs, queue_id, munsync);
805 :
806 : // If all peers are blocked, mark the queue as unsync in the RibState. We
807 : // will trigger tail dequeue for the (RibOut,QueueId) when any peer that
808 : // is interested in the RibOut becomes in sync.
809 615722 : if (!done)
810 6015 : rs->SetQueueUnsync(queue_id);
811 615722 : }
812 :
813 : //
814 : // Go through all RibOuts for the IPeerUpdate and drain the given queue till it
815 : // is up-to date or it becomes blocked. If it's blocked, select the next RibOut
816 : // to be processed when the IPeerUpdate becomes send ready.
817 : //
818 : // Return false if the IPeerUpdate got blocked.
819 : //
//
// Returns true when every active (RibOut, queue_id) for the peer was
// drained while the peer stayed send ready, and false as soon as the peer
// becomes blocked. In the latter case the position is saved with
// SetIteratorStart() so the next call resumes from that RibOut.
//
bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
                                         int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    // Visit each RibOut joined by the peer once, starting at the position
    // saved by SetIteratorStart() (initially BitSet::npos, which wraps to the
    // lowest index) and wrapping around the end of the map.
    for (PeerState::circular_iterator it = ps->circular_begin(rib_state_imap_);
         it != ps->circular_end(rib_state_imap_); ++it) {
        // Skip if this queue is not active in the PeerRibState.
        if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
            continue;

        // Drain the queue till we can do no more.
        RibOut *ribout = it.operator->();
        RibOutUpdates *updates = ribout->updates(index_);
        RibPeerSet blocked;
        bool done = updates->PeerDequeue(queue_id, peer, &blocked);

        // Process blocked mask.
        RibState *rs = it.rib_state();
        SetSendBlocked(ribout, rs, queue_id, blocked);

        // If the peer is still send_ready, mark the queue as inactive for
        // the peer. Need to check send_ready because the return value of
        // PeerDequeue only tells that *some* peer was merged with the tail
        // marker.
        // If the peer got blocked, remember where to start next time and
        // stop processing. We don't want to continue processing for other
        // merged peers if the lead peer is blocked. Processing for other
        // peers will continue when their own WorkPeer items are processed.
        if (ps->send_ready()) {
            assert(done);
            ps->SetQueueInactive(rs->index(), queue_id);
        } else {
            ps->SetIteratorStart(it.index());
            return false;
        }
    }

    return true;
}
859 :
860 : //
861 : // Drain the queue of all updates for this IPeerUpdate, until it is up-to date
862 : // or it becomes blocked.
863 : //
864 5368 : void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
865 5368 : CHECK_CONCURRENCY("bgp::SendUpdate");
866 :
867 : // Bail if the PeerState is not send ready.
868 5368 : PeerState *ps = peer_state_imap_.Find(peer);
869 5368 : if (!ps->send_ready()) {
870 5 : return;
871 : }
872 :
873 : // Update the PeerState and bail if the IPeerUpdate is not send ready.
874 : // This happens if the IPeerUpdate gets blocked while processing some
875 : // other partition.
876 5363 : if (!peer->send_ready()) {
877 0 : ps->set_send_ready(false);
878 0 : return;
879 : }
880 :
881 : // Go through all queues and drain them if there's anything on them.
882 16027 : for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
883 10701 : if (ps->QueueCount(queue_id) == 0) {
884 10327 : continue;
885 : }
886 374 : if (!UpdatePeerQueue(peer, ps, queue_id)) {
887 37 : assert(!ps->send_ready());
888 37 : return;
889 : }
890 : }
891 :
892 : // Checking the return value of UpdatePeerQueue above is not sufficient as
893 : // that only tells us that *some* peer(s) got merged with the tail marker.
894 : // Need to make sure that the IPeerUpdate that we are processing is still
895 : // send ready.
896 5326 : if (!ps->send_ready()) {
897 0 : return;
898 : }
899 :
900 : // Mark the peer as being in sync across all tables.
901 5326 : ps->SetSync();
902 :
903 : // Mark all RibStates for the peer as being in sync. This triggers a tail
904 : // dequeue for the corresponding (RibOut, QueueId) if necessary. This in
905 : // turn ensures that we do not get stuck in the case where all peers get
906 : // blocked and then get back in sync.
907 15978 : for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
908 10652 : SetQueueSync(ps, queue_id);
909 : }
910 : }
911 :
912 : //
913 : // Check invariants for the BgpSenderPartition.
914 : //
915 249 : bool BgpSenderPartition::CheckInvariants() const {
916 249 : int grp_peer_count = 0;
917 249 : int peer_grp_count = 0;
918 393 : for (size_t i = 0; i < rib_state_imap_.size(); i++) {
919 144 : if (!rib_state_imap_.bits().test(i)) {
920 0 : continue;
921 : }
922 144 : RibState *rs = rib_state_imap_.At(i);
923 144 : assert(rs != NULL);
924 144 : CHECK_INVARIANT(rs->index() == i);
925 144 : for (RibState::iterator it = rs->begin(peer_state_imap_);
926 720 : it != rs->end(peer_state_imap_); ++it) {
927 576 : PeerState *ps = it.operator->();
928 576 : CHECK_INVARIANT(ps->IsMember(i));
929 576 : grp_peer_count++;
930 : }
931 : }
932 669 : for (size_t i = 0; i < peer_state_imap_.size(); i++) {
933 420 : if (!peer_state_imap_.bits().test(i)) {
934 0 : continue;
935 : }
936 420 : PeerState *ps = peer_state_imap_.At(i);
937 420 : assert(ps != NULL);
938 420 : CHECK_INVARIANT(ps->index() == i);
939 420 : if (!ps->CheckInvariants()) {
940 0 : return false;
941 : }
942 420 : for (PeerState::iterator it = ps->begin(rib_state_imap_);
943 996 : it != ps->end(rib_state_imap_); ++it) {
944 576 : RibState *rs = it.rib_state();
945 576 : CHECK_INVARIANT(rs->peer_set().test(i));
946 576 : peer_grp_count++;
947 : }
948 : }
949 :
950 249 : CHECK_INVARIANT(grp_peer_count == peer_grp_count);
951 249 : return true;
952 : }
953 :
954 : //
955 : // Constructor for BgpUpdateSender.
956 : // Initialize send ready WorkQueue and allocate BgpSenderPartitions.
957 : //
958 9751 : BgpUpdateSender::BgpUpdateSender(BgpServer *server)
959 9751 : : server_(server),
960 9751 : task_id_(TaskScheduler::GetInstance()->GetTaskId("bgp::SendUpdate")),
961 9751 : send_ready_queue_(
962 : TaskScheduler::GetInstance()->GetTaskId("bgp::SendReadyTask"), 0,
963 9751 : boost::bind(&BgpUpdateSender::SendReadyCallback, this, _1)) {
964 48230 : for (int idx = 0; idx < DB::PartitionCount(); ++idx) {
965 38479 : partitions_.push_back(new BgpSenderPartition(this, idx));
966 : }
967 9751 : }
968 :
969 : //
970 : // Destructor for BgpUpdateSender.
971 : // Shutdown the WorkQueue and delete all BgpSenderPartitions.
972 : //
973 9751 : BgpUpdateSender::~BgpUpdateSender() {
974 9751 : send_ready_queue_.Shutdown(false);
975 9751 : STLDeleteValues(&partitions_);
976 9751 : }
977 :
978 : //
979 : // Handle the join of an IPeerUpdate to a RibOut.
980 : //
981 93056 : void BgpUpdateSender::Join(RibOut *ribout, IPeerUpdate *peer) {
982 93056 : CHECK_CONCURRENCY("bgp::PeerMembership");
983 :
984 828066 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
985 367505 : partition->Add(ribout, peer);
986 : }
987 93056 : }
988 :
989 : //
990 : // Handle the leave of an IPeerUpdate from a RibOut.
991 : //
992 93056 : void BgpUpdateSender::Leave(RibOut *ribout, IPeerUpdate *peer) {
993 93056 : CHECK_CONCURRENCY("bgp::PeerMembership");
994 :
995 828066 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
996 367505 : partition->Remove(ribout, peer);
997 : }
998 93056 : }
999 :
1000 : //
1001 : // Inform the specified BgpSenderPartition that it needs to schedule a tail
1002 : // dequeue for the given RibOut queue.
1003 : //
1004 615191 : void BgpUpdateSender::RibOutActive(int index, RibOut *ribout, int queue_id) {
1005 615191 : CHECK_CONCURRENCY("db::DBTable", "bgp::PeerMembership");
1006 :
1007 615185 : partitions_[index]->RibOutActive(ribout, queue_id);
1008 615325 : }
1009 :
1010 : //
1011 : // Concurrency: called from arbitrary task.
1012 : //
1013 : // Enqueue the IPeerUpdate to the send ready processing work queue.
1014 : // The callback is invoked in the context of bgp::SendReadyTask.
1015 : //
1016 13 : void BgpUpdateSender::PeerSendReady(IPeerUpdate *peer) {
1017 13 : send_ready_queue_.Enqueue(peer);
1018 13 : }
1019 :
1020 : //
1021 : // Return true if the IPeer is registered.
1022 : //
1023 543 : bool BgpUpdateSender::PeerIsRegistered(IPeerUpdate *peer) const {
1024 655 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1025 585 : if (partition->PeerIsRegistered(peer))
1026 529 : return true;
1027 : }
1028 14 : return false;
1029 : }
1030 :
1031 : //
1032 : // Return true if the IPeer is in sync.
1033 : //
1034 529 : bool BgpUpdateSender::PeerInSync(IPeerUpdate *peer) const {
1035 4761 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1036 2116 : if (!partition->PeerInSync(peer))
1037 0 : return false;
1038 : }
1039 529 : return true;
1040 : }
1041 :
1042 : //
1043 : // Callback to handle send ready notification for IPeerUpdate. Processing it
1044 : // in the context of bgp::SendeReadyTask ensures that there are no concurrency
1045 : // issues w.r.t. the BgpSenderPartition working on the IPeerUpdate while we are
1046 : // processing the notification.
1047 : //
1048 13 : bool BgpUpdateSender::SendReadyCallback(IPeerUpdate *peer) {
1049 13 : CHECK_CONCURRENCY("bgp::SendReadyTask");
1050 :
1051 111 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1052 49 : partition->PeerSendReady(peer);
1053 : }
1054 13 : return true;
1055 : }
1056 :
1057 : //
1058 : // Check invariants for the BgpUpdateSender.
1059 : //
1060 59 : bool BgpUpdateSender::CheckInvariants() const {
1061 531 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1062 236 : if (!partition->CheckInvariants())
1063 0 : return false;
1064 : }
1065 59 : return true;
1066 : }
1067 :
1068 : //
1069 : // Disable all BgpSenderPartitions.
1070 : //
1071 17 : void BgpUpdateSender::DisableProcessing() {
1072 51 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1073 17 : partition->set_disabled(true);
1074 : }
1075 17 : }
1076 :
1077 : //
1078 : // Enable all BgpSenderPartitions.
1079 : //
1080 17 : void BgpUpdateSender::EnableProcessing() {
1081 51 : BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1082 17 : partition->set_disabled(false);
1083 : }
1084 17 : }
|