Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "ifmap/ifmap_update_queue.h"
6 :
7 : #include <boost/checked_delete.hpp>
8 : #include <boost/assign/list_of.hpp>
9 :
10 : #include <sandesh/request_pipeline.h>
11 :
12 : #include "ifmap/ifmap_exporter.h"
13 : #include "ifmap/ifmap_link.h"
14 : #include "ifmap/ifmap_sandesh_context.h"
15 : #include "ifmap/ifmap_server.h"
16 : #include "ifmap/ifmap_server_show_types.h"
17 :
18 : // Convention for SetSequence():
19 : // If we are inserting the item in the middle of the Q, we set its sequence to
20 : // the same value as its successor. If its the last item in the Q, it gets the
21 : // next available sequence number.
22 1686 : void IFMapUpdateQueue::SetSequence(IFMapListEntry *item) {
23 1686 : IFMapListEntry *next = Next(item);
24 1686 : item->set_sequence(next ? next->get_sequence(): ++sequence_);
25 1686 : }
26 :
27 : // Insert 'item' at the end of the list.
28 1664 : void IFMapUpdateQueue::PushbackIntoList(IFMapListEntry *item) {
29 1664 : list_.push_back(*item);
30 1664 : SetSequence(item);
31 1664 : item->set_queue_insert_at_to_now();
32 1664 : }
33 :
34 : // Insert 'item' before 'ptr'.
35 6 : void IFMapUpdateQueue::InsertIntoListBefore(IFMapListEntry *ptr,
36 : IFMapListEntry *item) {
37 12 : list_.insert(list_.iterator_to(*ptr), *item);
38 6 : SetSequence(item);
39 6 : item->set_queue_insert_at_to_now();
40 6 : }
41 :
42 : // Insert 'item' after 'ptr'.
43 16 : void IFMapUpdateQueue::InsertIntoListAfter(IFMapListEntry *ptr,
44 : IFMapListEntry *item) {
45 48 : list_.insert(++list_.iterator_to(*ptr), *item);
46 16 : SetSequence(item);
47 16 : item->set_queue_insert_at_to_now();
48 16 : }
49 :
50 1686 : void IFMapUpdateQueue::EraseFromList(IFMapListEntry *item) {
51 3372 : list_.erase(list_.iterator_to(*item));
52 1686 : item->set_sequence(NULL_SEQUENCE);
53 1686 : }
54 :
55 398 : IFMapUpdateQueue::IFMapUpdateQueue(IFMapServer *server) : server_(server),
56 398 : sequence_(0) {
57 199 : PushbackIntoList(&tail_marker_);
58 199 : }
59 :
60 : struct IFMapListEntryDisposer {
61 0 : void operator()(IFMapListEntry *ptr) {
62 0 : boost::checked_delete(ptr);
63 0 : }
64 : };
65 :
66 199 : void IFMapUpdateQueue::ClearAndDisposeList() {
67 199 : list_.clear_and_dispose(IFMapListEntryDisposer());
68 199 : }
69 :
70 199 : IFMapUpdateQueue::~IFMapUpdateQueue() {
71 199 : EraseFromList(&tail_marker_);
72 199 : ClearAndDisposeList();
73 199 : }
74 :
75 1465 : bool IFMapUpdateQueue::Enqueue(IFMapUpdate *update) {
76 1465 : assert(!update->advertise().empty());
77 1465 : bool tm_last = false;
78 1465 : if (GetLast() == tail_marker()) {
79 91 : tm_last = true;
80 : }
81 1465 : PushbackIntoList(update);
82 1465 : return tm_last;
83 : }
84 :
85 1465 : void IFMapUpdateQueue::Dequeue(IFMapUpdate *update) {
86 1465 : EraseFromList(update);
87 1465 : }
88 :
89 76 : IFMapMarker *IFMapUpdateQueue::GetMarker(int bit) {
90 76 : MarkerMap::iterator loc = marker_map_.find(bit);
91 76 : if (loc == marker_map_.end()) {
92 0 : return NULL;
93 : }
94 76 : return loc->second;
95 : }
96 :
97 100 : void IFMapUpdateQueue::Join(int bit) {
98 100 : IFMapMarker *marker = &tail_marker_;
99 100 : marker->mask.set(bit);
100 100 : marker_map_.insert(std::make_pair(bit, marker));
101 100 : }
102 :
103 32 : void IFMapUpdateQueue::Leave(int bit) {
104 32 : MarkerMap::iterator loc = marker_map_.find(bit);
105 32 : assert(loc != marker_map_.end());
106 32 : IFMapMarker *marker = loc->second;
107 :
108 32 : BitSet reset_bs;
109 32 : reset_bs.set(bit);
110 :
111 : // Start with the first element after the client's marker
112 64 : for (List::iterator iter = list_.iterator_to(*marker), next;
113 132 : iter != list_.end(); iter = next) {
114 34 : IFMapListEntry *item = iter.operator->();
115 34 : next = ++iter;
116 34 : if (item->IsMarker()) {
117 34 : continue;
118 : }
119 0 : IFMapUpdate *update = static_cast<IFMapUpdate *>(item);
120 0 : update->AdvertiseReset(reset_bs);
121 0 : if (update->advertise().empty()) {
122 0 : Dequeue(update);
123 : }
124 :
125 : // Update may be freed.
126 0 : server_->exporter()->StateUpdateOnDequeue(update, reset_bs, true);
127 : }
128 :
129 32 : marker_map_.erase(loc);
130 32 : marker->mask.reset(bit);
131 32 : if ((marker != &tail_marker_) && (marker->mask.empty())) {
132 6 : EraseFromList(marker);
133 6 : delete marker;
134 : }
135 32 : }
136 :
137 8 : void IFMapUpdateQueue::MarkerMerge(IFMapMarker *dst, IFMapMarker *src,
138 : const BitSet &mmove) {
139 : //
140 : // Set the bits in dst and update the MarkerMap. Be sure to set the dst
141 : // before we reset the src since bitset maybe a reference to src->mask.
142 : // Call to operator|=()
143 : //
144 8 : dst->mask |= mmove;
145 8 : for (size_t i = mmove.find_first();
146 20 : i != BitSet::npos; i = mmove.find_next(i)) {
147 12 : MarkerMap::iterator loc = marker_map_.find(i);
148 12 : assert(loc != marker_map_.end());
149 12 : loc->second = dst;
150 : }
151 : // Reset the bits in the src and get rid of it in case it's now empty.
152 8 : src->mask.Reset(mmove);
153 8 : if (src->mask.empty()) {
154 8 : assert(src != &tail_marker_);
155 8 : EraseFromList(src);
156 8 : delete src;
157 : }
158 8 : }
159 :
160 14 : IFMapMarker* IFMapUpdateQueue::MarkerSplit(IFMapMarker *marker,
161 : IFMapListEntry *current,
162 : const BitSet &msplit, bool before) {
163 14 : assert(!msplit.empty());
164 14 : IFMapMarker *new_marker = new IFMapMarker();
165 :
166 : // call to operator=()
167 14 : new_marker->mask = msplit;
168 14 : marker->mask.Reset(msplit);
169 14 : assert(!marker->mask.empty());
170 :
171 14 : for (size_t i = msplit.find_first();
172 28 : i != BitSet::npos; i = msplit.find_next(i)) {
173 14 : MarkerMap::iterator loc = marker_map_.find(i);
174 14 : assert(loc != marker_map_.end());
175 14 : loc->second = new_marker;
176 : }
177 14 : if (before) {
178 : // Insert new_marker before current
179 4 : InsertIntoListBefore(current, new_marker);
180 : } else {
181 : // Insert new_marker after current
182 10 : InsertIntoListAfter(current, new_marker);
183 : }
184 14 : return new_marker;
185 : }
186 :
187 4 : IFMapMarker* IFMapUpdateQueue::MarkerSplitBefore(IFMapMarker *marker,
188 : IFMapListEntry *current,
189 : const BitSet &msplit) {
190 4 : bool before = true;
191 4 : IFMapMarker *ret_marker = MarkerSplit(marker, current, msplit, before);
192 4 : return ret_marker;
193 : }
194 :
195 10 : IFMapMarker* IFMapUpdateQueue::MarkerSplitAfter(IFMapMarker *marker,
196 : IFMapListEntry *current,
197 : const BitSet &msplit) {
198 10 : bool before = false;
199 10 : IFMapMarker *ret_marker = MarkerSplit(marker, current, msplit, before);
200 10 : return ret_marker;
201 : }
202 :
203 : // Insert marker before current
204 2 : void IFMapUpdateQueue::MoveMarkerBefore(IFMapMarker *marker,
205 : IFMapListEntry *current) {
206 2 : if (marker != current) {
207 2 : EraseFromList(marker);
208 2 : InsertIntoListBefore(current, marker);
209 : }
210 2 : }
211 :
212 : // Insert marker after current
213 6 : void IFMapUpdateQueue::MoveMarkerAfter(IFMapMarker *marker,
214 : IFMapListEntry *current) {
215 6 : if (marker != current) {
216 6 : EraseFromList(marker);
217 6 : InsertIntoListAfter(current, marker);
218 : }
219 6 : }
220 :
221 4 : IFMapListEntry *IFMapUpdateQueue::Previous(IFMapListEntry *current) {
222 4 : List::iterator iter = list_.iterator_to(*current);
223 8 : if (iter == list_.begin()) {
224 0 : return NULL;
225 : }
226 : --iter;
227 4 : return iter.operator->();
228 : }
229 :
230 1520 : IFMapListEntry *IFMapUpdateQueue::GetLast() {
231 : // the list must always have the tail_marker
232 1520 : assert(!list_.empty());
233 1520 : List::reverse_iterator riter;
234 1520 : riter = list_.rbegin();
235 3040 : return riter.operator->();
236 : }
237 :
238 2954 : IFMapListEntry * IFMapUpdateQueue::Next(IFMapListEntry *current) {
239 2954 : List::iterator iter = list_.iterator_to(*current);
240 8862 : if (++iter == list_.end()) {
241 1739 : return NULL;
242 : }
243 1215 : return iter.operator->();
244 : }
245 :
246 10 : bool IFMapUpdateQueue::empty() const {
247 30 : return (list_.begin().operator->() == &tail_marker_) &&
248 20 : (list_.rbegin().operator->() == &tail_marker_);
249 : }
250 :
251 50 : int IFMapUpdateQueue::size() const {
252 50 : return (int)list_.size();
253 : }
254 :
255 4 : void IFMapUpdateQueue::PrintQueue() {
256 4 : int i = 0;
257 : IFMapListEntry *item;
258 4 : List::iterator iter = list_.iterator_to(list_.front());
259 44 : while (iter != list_.end()) {
260 18 : item = iter.operator->();
261 18 : if (item->IsMarker()) {
262 10 : IFMapMarker *marker = static_cast<IFMapMarker *>(item);
263 10 : if (marker == &tail_marker_) {
264 4 : std::cout << i << ". Tail Marker: " << item;
265 : } else {
266 6 : std::cout << i << ". Marker: " << item;
267 : }
268 10 : std::cout << " clients:";
269 10 : for (size_t j = marker->mask.find_first();
270 26 : j != BitSet::npos; j = marker->mask.find_next(j)) {
271 16 : std::cout << " " << j;
272 : }
273 10 : std::cout << std::endl;
274 : }
275 18 : if (item->IsUpdate()) {
276 8 : std::cout << i << ". Update: " << item << " ";
277 : }
278 18 : if (item->IsDelete()) {
279 0 : std::cout << i << ". Delete: " << item << " ";
280 : }
281 18 : if (item->IsUpdate() || item->IsDelete()) {
282 8 : IFMapUpdate *update = static_cast<IFMapUpdate *>(item);
283 8 : const IFMapObjectPtr ref = update->data();
284 8 : if (ref.type == IFMapObjectPtr::NODE) {
285 8 : std::cout << "node <";
286 8 : std::cout << ref.u.node->name() << ">" << std::endl;
287 0 : } else if (ref.type == IFMapObjectPtr::LINK) {
288 0 : std::cout << ref.u.link->ToString() << std::endl;
289 : }
290 : }
291 :
292 18 : iter++;
293 18 : i++;
294 : }
295 4 : std::cout << "**End of queue**" << std::endl;
296 4 : }
297 :
298 : // almost everything in this class is static since we dont really want to
299 : // intantiate this class
300 : class ShowIFMapUpdateQueue {
301 : public:
302 : static const int kMaxElementsPerRound = 50;
303 :
304 : struct ShowData : public RequestPipeline::InstData {
305 : std::vector<UpdateQueueShowEntry> send_buffer;
306 : };
307 :
308 0 : static RequestPipeline::InstData *AllocBuffer(int stage) {
309 0 : return static_cast<RequestPipeline::InstData *>(new ShowData);
310 : }
311 :
312 : struct TrackerData : public RequestPipeline::InstData {
313 : // init as 1 indicates we need to init 'first' to begin() since there is
314 : // no way to initialize an iterator here.
315 0 : TrackerData() : init(1) { }
316 : int init;
317 : std::vector<UpdateQueueShowEntry>::const_iterator first;
318 : };
319 :
320 0 : static RequestPipeline::InstData *AllocTracker(int stage) {
321 0 : return static_cast<RequestPipeline::InstData *>(new TrackerData);
322 : }
323 :
324 : static void CopyNode(UpdateQueueShowEntry *dest, IFMapListEntry *src,
325 : IFMapUpdateQueue *queue);
326 : static bool BufferStage(const Sandesh *sr,
327 : const RequestPipeline::PipeSpec ps, int stage,
328 : int instNum, RequestPipeline::InstData *data);
329 : static bool SendStage(const Sandesh *sr, const RequestPipeline::PipeSpec ps,
330 : int stage, int instNum,
331 : RequestPipeline::InstData *data);
332 : };
333 :
334 0 : void ShowIFMapUpdateQueue::CopyNode(UpdateQueueShowEntry *dest,
335 : IFMapListEntry *src,
336 : IFMapUpdateQueue *queue) {
337 0 : if (src->IsUpdate() || src->IsDelete()) {
338 0 : IFMapUpdate *update = static_cast<IFMapUpdate *>(src);
339 0 : const IFMapObjectPtr ref = update->data();
340 0 : if (ref.type == IFMapObjectPtr::NODE) {
341 0 : dest->node_name = "<![CDATA[" + ref.u.node->name() + "]]>";
342 0 : } else if (ref.type == IFMapObjectPtr::LINK) {
343 0 : dest->node_name = "<![CDATA[" + ref.u.link->ToString() + "]]>";
344 : }
345 0 : if (src->IsUpdate()) {
346 0 : dest->qe_type = "Update";
347 : }
348 0 : if (src->IsDelete()) {
349 0 : dest->qe_type = "Delete";
350 : }
351 0 : dest->qe_bitset = update->advertise().ToNumberedString();
352 : }
353 0 : if (src->IsMarker()) {
354 0 : IFMapMarker *marker = static_cast<IFMapMarker *>(src);
355 0 : dest->node_name = "Marker";
356 0 : if (marker == queue->tail_marker()) {
357 0 : dest->qe_type = "Tail-Marker";
358 : } else {
359 0 : dest->qe_type = "Marker";
360 : }
361 0 : dest->qe_bitset = marker->mask.ToNumberedString();
362 : }
363 0 : dest->queue_insert_ago = src->queue_insert_ago_str();
364 0 : dest->sequence = src->get_sequence();
365 0 : }
366 :
367 0 : bool ShowIFMapUpdateQueue::BufferStage(const Sandesh *sr,
368 : const RequestPipeline::PipeSpec ps,
369 : int stage, int instNum,
370 : RequestPipeline::InstData *data) {
371 : const IFMapUpdateQueueShowReq *request =
372 0 : static_cast<const IFMapUpdateQueueShowReq *>(ps.snhRequest_.get());
373 : IFMapSandeshContext *sctx =
374 0 : static_cast<IFMapSandeshContext *>(request->module_context("IFMap"));
375 0 : ShowData *show_data = static_cast<ShowData *>(data);
376 :
377 0 : IFMapUpdateQueue *queue = sctx->ifmap_server()->queue();
378 0 : assert(queue);
379 0 : show_data->send_buffer.reserve(queue->list_.size());
380 :
381 : IFMapUpdateQueue::List::iterator iter =
382 0 : queue->list_.iterator_to(queue->list_.front());
383 0 : while (iter != queue->list_.end()) {
384 0 : IFMapListEntry *item = iter.operator->();
385 :
386 0 : UpdateQueueShowEntry dest;
387 0 : CopyNode(&dest, item, queue);
388 0 : show_data->send_buffer.push_back(dest);
389 :
390 0 : iter++;
391 0 : }
392 :
393 0 : return true;
394 : }
395 :
396 : // Can be called multiple times i.e. approx total/kMaxElementsPerRound
397 0 : bool ShowIFMapUpdateQueue::SendStage(const Sandesh *sr,
398 : const RequestPipeline::PipeSpec ps,
399 : int stage, int instNum,
400 : RequestPipeline::InstData *data) {
401 0 : const RequestPipeline::StageData *prev_stage_data = ps.GetStageData(0);
402 : const ShowIFMapUpdateQueue::ShowData &show_data =
403 : static_cast<const ShowIFMapUpdateQueue::ShowData &>
404 0 : (prev_stage_data->at(0));
405 : // Data for this stage
406 0 : TrackerData *tracker_data = static_cast<TrackerData *>(data);
407 :
408 0 : std::vector<UpdateQueueShowEntry> dest_buffer;
409 0 : std::vector<UpdateQueueShowEntry>::const_iterator first, last;
410 0 : bool more = false;
411 :
412 0 : if (tracker_data->init) {
413 0 : first = show_data.send_buffer.begin();
414 0 : tracker_data->init = 0;
415 : } else {
416 0 : first = tracker_data->first;
417 : }
418 0 : int rem_num = show_data.send_buffer.end() - first;
419 0 : int send_num = (rem_num < kMaxElementsPerRound) ? rem_num :
420 : kMaxElementsPerRound;
421 0 : last = first + send_num;
422 0 : copy(first, last, back_inserter(dest_buffer));
423 : // Decide if we want to be called again.
424 0 : if ((rem_num - send_num) > 0) {
425 0 : more = true;
426 : } else {
427 0 : more = false;
428 : }
429 : const IFMapUpdateQueueShowReq *request =
430 0 : static_cast<const IFMapUpdateQueueShowReq *>(ps.snhRequest_.get());
431 0 : IFMapUpdateQueueShowResp *response = new IFMapUpdateQueueShowResp();
432 0 : response->set_queue(dest_buffer);
433 0 : response->set_context(request->context());
434 0 : response->set_more(more);
435 0 : response->Response();
436 0 : tracker_data->first = first + send_num;
437 :
438 : // Return 'false' to be called again
439 0 : return (!more);
440 0 : }
441 :
442 0 : void IFMapUpdateQueueShowReq::HandleRequest() const {
443 :
444 0 : RequestPipeline::StageSpec s0, s1;
445 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
446 :
447 : // 2 stages - first: gather/read, second: send
448 :
449 0 : s0.taskId_ = scheduler->GetTaskId("db::IFMapTable");
450 0 : s0.allocFn_ = ShowIFMapUpdateQueue::AllocBuffer;
451 0 : s0.cbFn_ = ShowIFMapUpdateQueue::BufferStage;
452 0 : s0.instances_.push_back(0);
453 :
454 : // control-node ifmap show command task
455 0 : s1.taskId_ = scheduler->GetTaskId("ifmap::ShowCommandSendStage");
456 0 : s1.allocFn_ = ShowIFMapUpdateQueue::AllocTracker;
457 0 : s1.cbFn_ = ShowIFMapUpdateQueue::SendStage;
458 0 : s1.instances_.push_back(0);
459 :
460 0 : RequestPipeline::PipeSpec ps(this);
461 0 : ps.stages_= boost::assign::list_of(s0)(s1)
462 0 : .convert_to_container<vector<RequestPipeline::StageSpec> >();
463 0 : RequestPipeline rp(ps);
464 0 : }
|