Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "db/db_table_walker.h"
6 :
7 : #include <atomic>
8 :
9 : #include "base/logging.h"
10 : #include "base/task.h"
11 : #include "db/db.h"
12 : #include "db/db_partition.h"
13 : #include "db/db_table.h"
14 : #include "db/db_table_partition.h"
15 :
16 : int DBTableWalker::max_iteration_to_yield_ = kIterationToYield;
17 :
18 : class DBTableWalker::Walker {
19 : public:
20 : Walker(WalkId id, DBTableWalker *wkmgr, DBTable *table,
21 : const DBRequestKey *key, WalkFn walker,
22 : WalkCompleteFn walk_done, bool postpone_walk);
23 :
24 0 : void StopWalk() {
25 0 : assert(workers_.empty());
26 0 : should_stop_.exchange(true);
27 0 : }
28 :
29 : // Test only - resume walk that was postponed at creation.
30 0 : void ResumeWalk() {
31 0 : assert(!workers_.empty());
32 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
33 0 : for (std::vector<Task *>::iterator it = workers_.begin();
34 0 : it != workers_.end(); ++it) {
35 0 : scheduler->Enqueue(*it);
36 : }
37 0 : workers_.clear();
38 0 : }
39 :
40 : WalkId id_;
41 :
42 : // Parent walker manager
43 : DBTableWalker *wkmgr_;
44 :
45 : // Table on which walk is done
46 : DBTable *table_;
47 :
48 : // Take the ownership of key passed
49 : std::unique_ptr<DBRequestKey> key_start_;
50 :
51 : WalkFn walker_fn_;
52 : WalkCompleteFn done_fn_;
53 :
54 : // Will be true if Table walk is cancelled
55 : std::atomic<bool> should_stop_;
56 :
57 : // check whether iteration is completed on all Table Partition
58 : std::atomic<long> status_;
59 :
60 : std::vector<Task *> workers_;
61 1800 : int task_id() const { return wkmgr_->task_id(); }
62 : };
63 :
64 : class DBTableWalker::Worker : public Task {
65 : public:
66 1800 : Worker(Walker *walker, int db_partition_id, const DBRequestKey *key)
67 3600 : : Task(walker->task_id(), db_partition_id), walker_(walker),
68 1800 : key_start_(key) {
69 1800 : tbl_partition_ = static_cast<DBTablePartition *>(
70 1800 : walker_->table_->GetTablePartition(db_partition_id));
71 1800 : }
72 :
73 : virtual bool Run();
74 0 : std::string Description() const { return "DBTableWalker::Worker"; }
75 :
76 : private:
77 : DBTableWalker::Walker *walker_;
78 :
79 : // Store the last visited node to continue walk
80 : std::unique_ptr<DBRequestKey> walk_ctx_;
81 :
82 : // This is where the walk started
83 : const DBRequestKey *key_start_;
84 :
85 : // Table partition for which this worker was created
86 : DBTablePartition *tbl_partition_;
87 : };
88 :
// Converts a DB_WALKER_WAIT_USECS value into a delay in microseconds.
// NULL (variable unset) means no delay. Otherwise the value is parsed with
// strtoul (base auto-detected: decimal, 0x hex, leading-0 octal) and capped
// at one second. The cap is applied before narrowing to unsigned int, so
// oversized values cannot wrap around to a small delay.
static unsigned int db_walker_parse_wait_usecs(const char *value) {
    const unsigned long kMaxWaitUsecs = 1000000UL;
    if (value == NULL) {
        return 0;
    }
    unsigned long usecs = strtoul(value, NULL, 0);
    if (usecs > kMaxWaitUsecs) {
        usecs = kMaxWaitUsecs;
    }
    return static_cast<unsigned int>(usecs);
}

// Optional delay after each entry visited by a Worker, controlled by the
// DB_WALKER_WAIT_USECS environment variable (read once per process).
static void db_walker_wait() {
    // Function-local static initialization is thread-safe since C++11. The
    // previous unsynchronized "once" flag raced when workers for different
    // partitions called this concurrently.
    static const unsigned int walk_sleep_usecs =
        db_walker_parse_wait_usecs(getenv("DB_WALKER_WAIT_USECS"));

    if (walk_sleep_usecs != 0) {
        usleep(walk_sleep_usecs);
    }
}
108 :
109 1845 : bool DBTableWalker::Worker::Run() {
110 1845 : int count = 0;
111 : DBRequestKey *key_resume;
112 :
113 : // Check whether Walker was requested to be cancelled
114 1845 : if (walker_->should_stop_) {
115 0 : goto walk_done;
116 : }
117 :
118 : // Check where we left in last iteration
119 1845 : if ((key_resume = walk_ctx_.get()) == NULL) {
120 : // First time invoke of worker thread, start from key_start_
121 1800 : key_resume = const_cast <DBRequestKey *>(key_start_);
122 : }
123 :
124 : DBEntry *entry;
125 1845 : if (key_resume != NULL) {
126 45 : DBTable *table = walker_->table_;
127 45 : std::unique_ptr<const DBEntryBase> start;
128 45 : start = table->AllocEntry(key_resume);
129 : // Find matching or next in sort order
130 45 : entry = tbl_partition_->lower_bound(start.get());
131 45 : } else {
132 1800 : entry = tbl_partition_->GetFirst();
133 : }
134 1845 : if (entry == NULL) {
135 1683 : goto walk_done;
136 : }
137 :
138 324 : for (DBEntry *next = NULL; entry; entry = next) {
139 207 : next = tbl_partition_->GetNext(entry);
140 : // Check whether Walker was requested to be cancelled
141 207 : if (walker_->should_stop_) {
142 0 : break;
143 : }
144 207 : if (count == GetIterationToYield()) {
145 : // store the context
146 45 : walk_ctx_ = entry->GetDBRequestKey();
147 45 : return false;
148 : }
149 :
150 : // Invoke walker function
151 162 : bool more = walker_->walker_fn_(tbl_partition_, entry);
152 162 : if (!more) {
153 0 : break;
154 : }
155 :
156 162 : db_walker_wait();
157 162 : count++;
158 : }
159 :
160 117 : walk_done:
161 : // Check whether all other walks on the table is completed
162 1800 : long num_walkers_on_tpart = walker_->status_.fetch_sub(1);
163 1800 : if (num_walkers_on_tpart == 1) {
164 450 : if (walker_->should_stop_) {
165 0 : walker_->table_->incr_walk_cancel_count();
166 : } else {
167 450 : walker_->table_->incr_walk_complete_count();
168 : // Invoke Walker_Complete callback
169 450 : if (walker_->done_fn_ != NULL) {
170 450 : walker_->done_fn_(walker_->table_);
171 : }
172 : }
173 :
174 : // Release the memory for walker and bitmap
175 450 : walker_->wkmgr_->PurgeWalker(walker_->id_);
176 : }
177 1800 : return true;
178 : }
179 :
180 450 : DBTableWalker::Walker::Walker(WalkId id, DBTableWalker *wkmgr,
181 : DBTable *table, const DBRequestKey *key,
182 : WalkFn walker, WalkCompleteFn walk_done,
183 450 : bool postpone_walk)
184 450 : : id_(id), wkmgr_(wkmgr), table_(table),
185 450 : key_start_(const_cast<DBRequestKey *>(key)),
186 450 : walker_fn_(walker), done_fn_(walk_done) {
187 450 : int num_worker = table->PartitionCount();
188 450 : should_stop_ = false;
189 450 : status_ = num_worker;
190 2250 : for (int i = 0; i < num_worker; i++) {
191 1800 : Worker *task = new Worker(this, i, key);
192 1800 : if (postpone_walk) {
193 0 : workers_.push_back(task);
194 : } else {
195 1800 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
196 1800 : scheduler->Enqueue(task);
197 : }
198 : }
199 450 : }
200 :
201 27401 : DBTableWalker::DBTableWalker(int task_id) : task_id_(task_id) {
202 27401 : if (task_id == -1) {
203 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
204 0 : task_id_ = scheduler->GetTaskId("db::DBTable");
205 : }
206 27401 : }
207 :
208 450 : DBTableWalker::WalkId DBTableWalker::WalkTable(DBTable *table,
209 : const DBRequestKey *key_start,
210 : WalkFn walkerfn ,
211 : WalkCompleteFn walk_complete,
212 : bool postpone_walk) {
213 450 : table->incr_walk_request_count();
214 450 : std::scoped_lock lock(walkers_mutex_);
215 450 : size_t i = walker_map_.find_first();
216 450 : if (i == walker_map_.npos) {
217 447 : i = walkers_.size();
218 : Walker *walker = new Walker(i, this, table, key_start,
219 447 : walkerfn, walk_complete, postpone_walk);
220 447 : walkers_.push_back(walker);
221 : } else {
222 3 : walker_map_.reset(i);
223 3 : if (walker_map_.none()) {
224 3 : walker_map_.clear();
225 : }
226 : Walker *walker = new Walker(i, this, table, key_start,
227 3 : walkerfn, walk_complete, postpone_walk);
228 3 : walkers_[i] = walker;
229 : }
230 450 : table->incr_walker_count();
231 450 : return i;
232 450 : }
233 :
234 0 : void DBTableWalker::WalkCancel(WalkId id) {
235 0 : std::scoped_lock lock(walkers_mutex_);
236 0 : walkers_[id]->StopWalk();
237 : // Purge to be called after task has stopped
238 0 : }
239 :
240 0 : void DBTableWalker::WalkResume(WalkId id) {
241 0 : std::scoped_lock lock(walkers_mutex_);
242 0 : walkers_[id]->ResumeWalk();
243 0 : }
244 :
245 450 : void DBTableWalker::PurgeWalker(WalkId id) {
246 450 : std::scoped_lock lock(walkers_mutex_);
247 450 : Walker *walker = walkers_[id];
248 450 : DBTable *table = walker->table_;
249 450 : delete walker;
250 450 : walkers_[id] = NULL;
251 450 : if ((size_t) id == walkers_.size() - 1) {
252 461 : while (!walkers_.empty() && walkers_.back() == NULL) {
253 447 : walkers_.pop_back();
254 : }
255 14 : if (walker_map_.size() > walkers_.size()) {
256 14 : walker_map_.resize(walkers_.size());
257 : }
258 : } else {
259 436 : if ((size_t) id >= walker_map_.size()) {
260 429 : walker_map_.resize(id + 1);
261 : }
262 436 : walker_map_.set(id);
263 : }
264 :
265 : // Retry table deletion when the last walker is purged.
266 450 : if (table->decr_walker_count() == 0) {
267 317 : table->RetryDelete();
268 : }
269 450 : }
|