Line data Source code
1 : /* 2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_table_walk_mgr.h" 6 : 7 : #include <boost/bind/bind.hpp> 8 : #include <boost/foreach.hpp> 9 : 10 : #include "base/logging.h" 11 : #include "base/task.h" 12 : #include "base/task_annotations.h" 13 : #include "db/db.h" 14 : #include "db/db_partition.h" 15 : #include "db/db_table.h" 16 : #include "db/db_table_partition.h" 17 : 18 : using namespace boost::placeholders; 19 : 20 27401 : DBTableWalkMgr::DBTableWalkMgr() 21 54802 : : walk_request_trigger_(new TaskTrigger( 22 : boost::bind(&DBTableWalkMgr::ProcessWalkRequestList, this), 23 54802 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)), 24 54802 : walk_done_trigger_(new TaskTrigger( 25 : boost::bind(&DBTableWalkMgr::ProcessWalkDone, this), 26 54802 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)) { 27 27401 : } 28 : 29 330136 : bool DBTableWalkMgr::ProcessWalkRequestList() { 30 330136 : CHECK_CONCURRENCY("db::Walker"); 31 330136 : std::scoped_lock lock(mutex_); 32 330136 : if (!current_table_walk_.empty()) return true; 33 : while (true) { 34 313719 : if (walk_request_list_.empty()) break; 35 227480 : WalkRequestInfoPtr info = walk_request_list_.front(); 36 227480 : walk_request_set_.erase(info.get()); 37 227480 : walk_request_list_.pop_front(); 38 227480 : current_table_walk_.swap(info->pending_requests); 39 227480 : DBTable *table = info->table; 40 227480 : bool walk_table = false; 41 456649 : for (auto walker : current_table_walk_) { 42 229169 : if (walker->stopped()) continue; 43 229167 : walker->set_in_progress(); 44 229167 : walker->reset_walk_again(); 45 229167 : walk_table = true; 46 229169 : } 47 227480 : if (walk_table) { 48 : // start the walk 49 227479 : table->StartWalk(); 50 227479 : break; 51 : } else { 52 1 : current_table_walk_.clear(); 53 : } 54 227481 : } 55 313718 : return true; 56 330136 : } 57 : 58 227479 : bool DBTableWalkMgr::ProcessWalkDone() { 59 227479 : CHECK_CONCURRENCY("db::Walker"); 60 227479 : assert(!current_table_walk_.empty()); 61 456647 : for (auto walker : current_table_walk_) { 62 229168 : if (walker->walk_again()) 63 263 : walker->set_walk_requested(); 64 228905 : else if (!walker->stopped()) 65 228902 : walker->set_walk_done(); 66 229168 : if (walker->stopped() || walker->walk_again()) continue; 67 228902 : walker->walk_complete()(walker, walker->table()); 68 229168 : } 69 227479 : current_table_walk_.clear(); 70 227479 : walk_request_trigger_->Set(); 71 227479 : return true; 72 : } 73 : 74 220650 : DBTable::DBTableWalkRef DBTableWalkMgr::AllocWalker(DBTable *table, 75 : DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete) { 76 220650 : table->incr_walker_count(); 77 220650 : DBTableWalk *walker = new DBTableWalk(table, walk_fn, walk_complete); 78 220650 : return DBTable::DBTableWalkRef(walker); 79 : } 80 : 81 158467 : void DBTableWalkMgr::ReleaseWalker(DBTable::DBTableWalkRef &ref) { 82 158467 : ref->set_walk_stopped(); 83 158467 : ref.reset(); 84 158467 : } 85 : 86 30233 : void DBTableWalkMgr::WalkAgain(DBTable::DBTableWalkRef ref) { 87 30233 : WalkTable(ref); 88 30233 : } 89 : 90 257614 : void DBTableWalkMgr::WalkTable(DBTable::DBTableWalkRef walk) { 91 257614 : std::scoped_lock lock(mutex_); 92 257614 : DBTable *table = walk->table(); 93 : 94 257614 : if (walk->in_progress()) { 95 483 : table->incr_walk_again_count(); 96 483 : walk->set_walk_again(); 97 : } else { 98 257131 : table->incr_walk_request_count(); 99 257131 : walk->set_walk_requested(); 100 : } 101 : 102 257614 : WalkRequestInfo tmp_info = WalkRequestInfo(table); 103 257614 : WalkRequestInfoSet::iterator it = walk_request_set_.find(&tmp_info); 104 257614 : if (it != walk_request_set_.end()) { 105 30134 : (*it)->AppendWalkReq(walk); 106 30134 : return; 107 : } 108 : 109 227480 : WalkRequestInfo *new_info = new WalkRequestInfo(table); 110 227480 : new_info->AppendWalkReq(walk); 111 227480 : walk_request_list_.push_back(WalkRequestInfoPtr(new_info)); 112 227480 : walk_request_set_.insert(new_info); 113 227480 : walk_request_trigger_->Set(); 114 287748 : } 115 : 116 227479 : void DBTableWalkMgr::WalkDone() { 117 227479 : walk_done_trigger_->Set(); 118 227479 : } 119 : 120 797034 : bool DBTableWalkMgr::InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry) { 121 797034 : uint32_t skip_walk_count = 0; 122 1596513 : for (auto walker : current_table_walk_) { 123 799400 : if (walker->done() || walker->stopped() || walker->walk_again()) { 124 371 : skip_walk_count++; 125 371 : continue; 126 : } 127 798612 : bool more = walker->walk_fn()(part, entry); 128 798763 : if (!more) { 129 2 : skip_walk_count++; 130 2 : if (!walker->stopped()) walker->set_walk_done(); 131 : } 132 799136 : } 133 797028 : return (skip_walk_count < current_table_walk_.size()); 134 : }