Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "db/db_partition.h" 6 : 7 : #include <list> 8 : #include <mutex> 9 : #include <atomic> 10 : 11 : #include <tbb/concurrent_queue.h> 12 : 13 : #include "base/task.h" 14 : #include "db/db.h" 15 : #include "db/db_client.h" 16 : #include "db/db_entry.h" 17 : 18 : using tbb::concurrent_queue; 19 : 20 : struct RequestQueueEntry { 21 : // Constructor takes ownership of DBRequest key, data. 22 1131790 : RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req) 23 1131790 : : tpart(tpart), client(client) { 24 1131780 : request.Swap(req); 25 1131758 : } 26 : DBTablePartBase *tpart; 27 : DBClient *client; 28 : DBRequest request; 29 : }; 30 : 31 : struct RemoveQueueEntry { 32 513654 : RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry) 33 513654 : : tpart(tpart), db_entry(db_entry) { 34 513654 : } 35 : DBTablePartBase *tpart; 36 : DBEntryBase *db_entry; 37 : }; 38 : 39 : class DBPartition::WorkQueue { 40 : public: 41 : static const int kThreshold = 1024; 42 : typedef concurrent_queue<RequestQueueEntry *> RequestQueue; 43 : typedef concurrent_queue<RemoveQueueEntry *> RemoveQueue; 44 : typedef std::list<DBTablePartBase *> TablePartList; 45 : 46 109306 : explicit WorkQueue(DBPartition *partition, int partition_id) 47 109306 : : db_partition_(partition), 48 109306 : db_partition_id_(partition_id), 49 109306 : disable_(false), 50 109306 : running_(false) { 51 109306 : request_count_ = 0; 52 109306 : max_request_queue_len_ = 0; 53 109306 : total_request_count_ = 0; 54 109306 : } 55 109242 : ~WorkQueue() { 56 109242 : for (RequestQueue::iterator iter = request_queue_.unsafe_begin(); 57 109242 : iter != request_queue_.unsafe_end();) { 58 0 : RequestQueueEntry *req_entry = *iter; 59 0 : ++iter; 60 0 : delete req_entry; 61 109242 : } 62 109242 : request_queue_.clear(); 63 109242 : } 64 : 65 1131760 : bool EnqueueRequest(RequestQueueEntry *req_entry) { 66 1131760 : request_queue_.push(req_entry); 67 1131830 : MaybeStartRunner(); 68 1131841 : uint32_t max = request_count_.fetch_add(1); 69 1131841 : if (max > max_request_queue_len_) 70 156101 : max_request_queue_len_ = max; 71 1131841 : total_request_count_++; 72 1131841 : return max < (kThreshold - 1); 73 : 74 : } 75 : 76 1968090 : bool DequeueRequest(RequestQueueEntry **req_entry) { 77 1968090 : bool success = request_queue_.try_pop(*req_entry); 78 1967936 : if (success) { 79 1131835 : request_count_.fetch_sub(1); 80 : } 81 1967936 : return success; 82 : } 83 : 84 513745 : void EnqueueRemove(RemoveQueueEntry *rm_entry) { 85 513745 : remove_queue_.push(rm_entry); 86 513772 : MaybeStartRunner(); 87 513776 : } 88 : 89 1354282 : bool DequeueRemove(RemoveQueueEntry **rm_entry) { 90 1354282 : bool success = remove_queue_.try_pop(*rm_entry); 91 1354060 : return success; 92 : } 93 : 94 : void MaybeStartRunnerUnlocked(); 95 : void MaybeStartRunner(); 96 : bool RunnerDone(); 97 : 98 : // Normally called from single task that either runs in DB context or is 99 : // exclusive with DB task, but can be called concurrently from multiple 100 : // bgp::ConfigHelper tasks. 101 1676879 : void SetActive(DBTablePartBase *tpart) { 102 1676879 : std::scoped_lock lock(mutex_); 103 1677049 : change_list_.push_back(tpart); 104 1676801 : MaybeStartRunnerUnlocked(); 105 1676875 : } 106 : 107 2513166 : DBTablePartBase *GetActiveTable() { 108 2513166 : DBTablePartBase *tpart = NULL; 109 2513166 : if (!change_list_.empty()) { 110 1676978 : tpart = change_list_.front(); 111 1676940 : change_list_.pop_front(); 112 : } 113 2513233 : return tpart; 114 : } 115 : 116 793353 : int db_partition_id() { 117 793353 : return db_partition_id_; 118 : } 119 : 120 793345 : int db_task_id() const { return db_partition_->task_id(); } 121 : 122 6254922 : bool IsDBQueueEmpty() const { 123 6254922 : return (request_queue_.empty() && change_list_.empty()); 124 : } 125 : 126 843918 : bool disable() { return disable_; } 127 90 : void set_disable(bool disable) { disable_ = disable; } 128 : 129 0 : long request_queue_len() const { 130 0 : return request_count_; 131 : } 132 : 133 0 : uint64_t total_request_count() const { 134 0 : return total_request_count_; 135 : } 136 : 137 0 : uint64_t max_request_queue_len() const { 138 0 : return max_request_queue_len_; 139 : } 140 : 141 : private: 142 : DBPartition *db_partition_; 143 : RequestQueue request_queue_; 144 : TablePartList change_list_; 145 : std::atomic<long> request_count_; 146 : uint64_t total_request_count_; 147 : uint64_t max_request_queue_len_; 148 : RemoveQueue remove_queue_; 149 : std::mutex mutex_; 150 : int db_partition_id_; 151 : bool disable_; 152 : bool running_; 153 : 154 : DISALLOW_COPY_AND_ASSIGN(WorkQueue); 155 : }; 156 : 157 6254922 : bool DBPartition::IsDBQueueEmpty() const { 158 6254922 : return work_queue_->IsDBQueueEmpty(); 159 : } 160 : 161 90 : void DBPartition::SetQueueDisable(bool disable) { 162 90 : if (disable) { 163 45 : work_queue_->set_disable(true); 164 : } else { 165 45 : work_queue_->set_disable(false); 166 45 : work_queue_->MaybeStartRunner(); 167 : } 168 90 : } 169 : 170 : class DBPartition::QueueRunner : public Task { 171 : public: 172 : static const int kMaxIterations = 32; 173 793356 : QueueRunner(WorkQueue *queue) 174 793356 : : Task(queue->db_task_id(), queue->db_partition_id()), 175 793356 : queue_(queue) { 176 793343 : } 177 : 178 843915 : virtual bool Run() { 179 843915 : int count = 0; 180 : 181 : // Skip if the queue is disabled. 182 843915 : if (queue_->disable()) 183 1130 : return queue_->RunnerDone(); 184 : 185 842790 : RemoveQueueEntry *rm_entry = NULL; 186 1354314 : while (queue_->DequeueRemove(&rm_entry)) { 187 513656 : DBEntryBase *db_entry = rm_entry->db_entry; 188 : { 189 : tbb::spin_rw_mutex::scoped_lock 190 513656 : lock(rm_entry->tpart->dbstate_mutex(), false); 191 1027482 : if (!db_entry->IsDeleted() || db_entry->is_onlist() || 192 513715 : !db_entry->is_state_empty_unlocked(rm_entry->tpart)) { 193 43 : db_entry->ClearOnRemoveQ(); 194 43 : db_entry = NULL; 195 : } 196 513752 : } 197 513809 : if (db_entry) { 198 513766 : rm_entry->tpart->Remove(db_entry); 199 : } 200 513780 : delete rm_entry; 201 513798 : if (++count == kMaxIterations) { 202 2274 : return false; 203 : } 204 : } 205 : 206 840401 : RequestQueueEntry *req_entry = NULL; 207 1968014 : while (queue_->DequeueRequest(&req_entry)) { 208 1131823 : req_entry->tpart->Process(req_entry->client, &req_entry->request); 209 1131789 : delete req_entry; 210 1131819 : if (++count == kMaxIterations) { 211 4206 : return false; 212 : } 213 : } 214 : 215 : while (true) { 216 2513153 : DBTablePartBase *tpart = queue_->GetActiveTable(); 217 2513224 : if (tpart == NULL) { 218 836283 : break; 219 : } 220 1676941 : bool done = tpart->RunNotify(); 221 1676984 : if (!done) { 222 46 : return false; 223 : } 224 1676938 : } 225 : 226 : // Running is done only if queue_ is empty. It's possible that more 227 : // entries are added into in the input or remove queues during the 228 : // time we were processing those queues. 229 836283 : return queue_->RunnerDone(); 230 : } 231 : 232 0 : std::string Description() const { 233 0 : return "DBPartition QueueRunner"; 234 : } 235 : private: 236 : WorkQueue *queue_; 237 : }; 238 : 239 3322324 : void DBPartition::WorkQueue::MaybeStartRunnerUnlocked() { 240 3322324 : if (running_) { 241 2529108 : return; 242 : } 243 793216 : running_ = true; 244 793216 : QueueRunner *runner = new QueueRunner(this); 245 793339 : TaskScheduler *scheduler = TaskScheduler::GetInstance(); 246 793329 : scheduler->Enqueue(runner); 247 : } 248 : 249 1645638 : void DBPartition::WorkQueue::MaybeStartRunner() { 250 1645638 : std::scoped_lock lock(mutex_); 251 1645633 : MaybeStartRunnerUnlocked(); 252 1645661 : } 253 : 254 837412 : bool DBPartition::WorkQueue::RunnerDone() { 255 837412 : std::scoped_lock lock(mutex_); 256 837448 : if (disable_ || (request_queue_.empty() && remove_queue_.empty())) { 257 793348 : running_ = false; 258 793348 : return true; 259 : } 260 : 261 44032 : running_ = true; 262 44032 : return false; 263 837380 : } 264 : 265 109306 : DBPartition::DBPartition(DB *db, int partition_id) 266 109306 : : db_(db), work_queue_(new WorkQueue(this, partition_id)) { 267 109306 : } 268 : 269 : // The DBPartition destructor needs to be defined after WorkQueue has 270 : // been declared. 271 109242 : DBPartition::~DBPartition() { 272 109242 : } 273 : 274 1131781 : bool DBPartition::EnqueueRequest(DBTablePartBase *tpart, DBClient *client, 275 : DBRequest *req) { 276 1131781 : RequestQueueEntry *entry = new RequestQueueEntry(tpart, client, req); 277 1131758 : return work_queue_->EnqueueRequest(entry); 278 : } 279 : 280 513644 : void DBPartition::EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry) { 281 513644 : RemoveQueueEntry *entry = new RemoveQueueEntry(tpart, db_entry); 282 513655 : db_entry->SetOnRemoveQ(); 283 513808 : work_queue_->EnqueueRemove(entry); 284 513775 : } 285 : 286 : // concurrency: called from DBPartition task. 287 1676908 : void DBPartition::OnTableChange(DBTablePartBase *tablepart) { 288 1676908 : work_queue_->SetActive(tablepart); 289 1677029 : } 290 : 291 0 : long DBPartition::request_queue_len() const { 292 0 : return work_queue_->request_queue_len(); 293 : } 294 : 295 0 : uint64_t DBPartition::total_request_count() const { 296 0 : return work_queue_->total_request_count(); 297 : } 298 : 299 0 : uint64_t DBPartition::max_request_queue_len() const { 300 0 : return work_queue_->max_request_queue_len(); 301 : } 302 : 303 793345 : int DBPartition::task_id() const { 304 793345 : return db_->task_id(); 305 : }