Line data Source code
1 : /* 2 : * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_table_walk_mgr.h" 6 : 7 : #include <boost/bind/bind.hpp> 8 : #include <boost/foreach.hpp> 9 : 10 : #include "base/logging.h" 11 : #include "base/task.h" 12 : #include "base/task_annotations.h" 13 : #include "db/db.h" 14 : #include "db/db_partition.h" 15 : #include "db/db_table.h" 16 : #include "db/db_table_partition.h" 17 : 18 : using namespace boost::placeholders; 19 : 20 27398 : DBTableWalkMgr::DBTableWalkMgr() 21 54796 : : walk_request_trigger_(new TaskTrigger( 22 : boost::bind(&DBTableWalkMgr::ProcessWalkRequestList, this), 23 54796 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)), 24 54796 : walk_done_trigger_(new TaskTrigger( 25 : boost::bind(&DBTableWalkMgr::ProcessWalkDone, this), 26 54796 : TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)) { 27 27398 : } 28 : 29 328601 : bool DBTableWalkMgr::ProcessWalkRequestList() { 30 328601 : CHECK_CONCURRENCY("db::Walker"); 31 328601 : std::scoped_lock lock(mutex_); 32 328601 : if (!current_table_walk_.empty()) return true; 33 : while (true) { 34 312263 : if (walk_request_list_.empty()) break; 35 226255 : WalkRequestInfoPtr info = walk_request_list_.front(); 36 226255 : walk_request_set_.erase(info.get()); 37 226255 : walk_request_list_.pop_front(); 38 226255 : current_table_walk_.swap(info->pending_requests); 39 226255 : DBTable *table = info->table; 40 226255 : bool walk_table = false; 41 453998 : for (auto walker : current_table_walk_) { 42 227743 : if (walker->stopped()) continue; 43 227741 : walker->set_in_progress(); 44 227741 : walker->reset_walk_again(); 45 227741 : walk_table = true; 46 227743 : } 47 226255 : if (walk_table) { 48 : // start the walk 49 226254 : table->StartWalk(); 50 226254 : break; 51 : } else { 52 1 : current_table_walk_.clear(); 53 : } 54 226256 : } 55 312262 : return true; 56 328601 : } 57 : 58 226254 : bool DBTableWalkMgr::ProcessWalkDone() { 59 226254 : CHECK_CONCURRENCY("db::Walker"); 60 226254 : assert(!current_table_walk_.empty()); 61 453996 : for (auto walker : current_table_walk_) { 62 227742 : if (walker->walk_again()) 63 257 : walker->set_walk_requested(); 64 227485 : else if (!walker->stopped()) 65 227482 : walker->set_walk_done(); 66 227742 : if (walker->stopped() || walker->walk_again()) continue; 67 227482 : walker->walk_complete()(walker, walker->table()); 68 227742 : } 69 226254 : current_table_walk_.clear(); 70 226254 : walk_request_trigger_->Set(); 71 226254 : return true; 72 : } 73 : 74 219262 : DBTable::DBTableWalkRef DBTableWalkMgr::AllocWalker(DBTable *table, 75 : DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete) { 76 219262 : table->incr_walker_count(); 77 219262 : DBTableWalk *walker = new DBTableWalk(table, walk_fn, walk_complete); 78 219262 : return DBTable::DBTableWalkRef(walker); 79 : } 80 : 81 157707 : void DBTableWalkMgr::ReleaseWalker(DBTable::DBTableWalkRef &ref) { 82 157707 : ref->set_walk_stopped(); 83 157707 : ref.reset(); 84 157707 : } 85 : 86 29106 : void DBTableWalkMgr::WalkAgain(DBTable::DBTableWalkRef ref) { 87 29106 : WalkTable(ref); 88 29106 : } 89 : 90 255992 : void DBTableWalkMgr::WalkTable(DBTable::DBTableWalkRef walk) { 91 255992 : std::scoped_lock lock(mutex_); 92 255992 : DBTable *table = walk->table(); 93 : 94 255992 : if (walk->in_progress()) { 95 480 : table->incr_walk_again_count(); 96 480 : walk->set_walk_again(); 97 : } else { 98 255512 : table->incr_walk_request_count(); 99 255512 : walk->set_walk_requested(); 100 : } 101 : 102 255992 : WalkRequestInfo tmp_info = WalkRequestInfo(table); 103 255992 : WalkRequestInfoSet::iterator it = walk_request_set_.find(&tmp_info); 104 255992 : if (it != walk_request_set_.end()) { 105 29737 : (*it)->AppendWalkReq(walk); 106 29737 : return; 107 : } 108 : 109 226255 : WalkRequestInfo *new_info = new WalkRequestInfo(table); 110 226255 : new_info->AppendWalkReq(walk); 111 226255 : walk_request_list_.push_back(WalkRequestInfoPtr(new_info)); 112 226255 : walk_request_set_.insert(new_info); 113 226255 : walk_request_trigger_->Set(); 114 285729 : } 115 : 116 226254 : void DBTableWalkMgr::WalkDone() { 117 226254 : walk_done_trigger_->Set(); 118 226254 : } 119 : 120 792193 : bool DBTableWalkMgr::InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry) { 121 792193 : uint32_t skip_walk_count = 0; 122 1586683 : for (auto walker : current_table_walk_) { 123 794395 : if (walker->done() || walker->stopped() || walker->walk_again()) { 124 381 : skip_walk_count++; 125 381 : continue; 126 : } 127 793526 : bool more = walker->walk_fn()(part, entry); 128 793721 : if (!more) { 129 6 : skip_walk_count++; 130 6 : if (!walker->stopped()) walker->set_walk_done(); 131 : } 132 794101 : } 133 792186 : return (skip_walk_count < current_table_walk_.size()); 134 : }