LCOV - code coverage report
Current view: top level - db - db_partition.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 141 158 89.2 %
Date: 2026-06-18 01:51:13 Functions: 28 35 80.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "db/db_partition.h"
       6             : 
       7             : #include <list>
       8             : #include <mutex>
       9             : #include <atomic>
      10             : 
      11             : #include <tbb/concurrent_queue.h>
      12             : 
      13             : #include "base/task.h"
      14             : #include "db/db.h"
      15             : #include "db/db_client.h"
      16             : #include "db/db_entry.h"
      17             : 
      18             : using tbb::concurrent_queue;
      19             : 
      20             : struct RequestQueueEntry {
      21             :     // Constructor takes ownership of DBRequest key, data.
      22     1131790 :     RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
      23     1131790 :         : tpart(tpart), client(client) {
      24     1131780 :         request.Swap(req);
      25     1131758 :     }
      26             :     DBTablePartBase *tpart;
      27             :     DBClient *client;
      28             :     DBRequest request;
      29             : };
      30             : 
      31             : struct RemoveQueueEntry {
      32      513654 :     RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry)
      33      513654 :         : tpart(tpart), db_entry(db_entry) {
      34      513654 :     }
      35             :     DBTablePartBase *tpart;
      36             :     DBEntryBase *db_entry;
      37             : };
      38             : 
      39             : class DBPartition::WorkQueue {
      40             : public:
      41             :     static const int kThreshold = 1024;
      42             :     typedef concurrent_queue<RequestQueueEntry *> RequestQueue;
      43             :     typedef concurrent_queue<RemoveQueueEntry *> RemoveQueue;
      44             :     typedef std::list<DBTablePartBase *> TablePartList;
      45             : 
      46      109306 :     explicit WorkQueue(DBPartition *partition, int partition_id)
      47      109306 :         : db_partition_(partition),
      48      109306 :           db_partition_id_(partition_id),
      49      109306 :           disable_(false),
      50      109306 :           running_(false) {
      51      109306 :         request_count_ = 0;
      52      109306 :         max_request_queue_len_ = 0;
      53      109306 :         total_request_count_ = 0;
      54      109306 :     }
      55      109242 :     ~WorkQueue() {
      56      109242 :         for (RequestQueue::iterator iter = request_queue_.unsafe_begin();
      57      109242 :              iter != request_queue_.unsafe_end();) {
      58           0 :             RequestQueueEntry *req_entry = *iter;
      59           0 :             ++iter;
      60           0 :             delete req_entry;
      61      109242 :         }
      62      109242 :         request_queue_.clear();
      63      109242 :     }
      64             : 
      65     1131760 :     bool EnqueueRequest(RequestQueueEntry *req_entry) {
      66     1131760 :         request_queue_.push(req_entry);
      67     1131830 :         MaybeStartRunner();
      68     1131841 :         uint32_t max = request_count_.fetch_add(1);
      69     1131841 :         if (max > max_request_queue_len_)
      70      156101 :             max_request_queue_len_ = max;
      71     1131841 :         total_request_count_++;
      72     1131841 :         return max < (kThreshold - 1);
      73             : 
      74             :     }
      75             : 
      76     1968090 :     bool DequeueRequest(RequestQueueEntry **req_entry) {
      77     1968090 :         bool success = request_queue_.try_pop(*req_entry);
      78     1967936 :         if (success) {
      79     1131835 :             request_count_.fetch_sub(1);
      80             :         }
      81     1967936 :         return success;
      82             :     }
      83             : 
      84      513745 :     void EnqueueRemove(RemoveQueueEntry *rm_entry) {
      85      513745 :         remove_queue_.push(rm_entry);
      86      513772 :         MaybeStartRunner();
      87      513776 :     }
      88             : 
      89     1354282 :     bool DequeueRemove(RemoveQueueEntry **rm_entry) {
      90     1354282 :         bool success = remove_queue_.try_pop(*rm_entry);
      91     1354060 :         return success;
      92             :     }
      93             : 
      94             :     void MaybeStartRunnerUnlocked();
      95             :     void MaybeStartRunner();
      96             :     bool RunnerDone();
      97             : 
      98             :     // Normally called from single task that either runs in DB context or is
      99             :     // exclusive with DB task, but can be called concurrently from multiple
     100             :     // bgp::ConfigHelper tasks.
     101     1676879 :     void SetActive(DBTablePartBase *tpart) {
     102     1676879 :         std::scoped_lock lock(mutex_);
     103     1677049 :         change_list_.push_back(tpart);
     104     1676801 :         MaybeStartRunnerUnlocked();
     105     1676875 :     }
     106             : 
     107     2513166 :     DBTablePartBase *GetActiveTable() {
     108     2513166 :         DBTablePartBase *tpart = NULL;
     109     2513166 :         if (!change_list_.empty()) {
     110     1676978 :             tpart = change_list_.front();
     111     1676940 :             change_list_.pop_front();
     112             :         }
     113     2513233 :         return tpart;
     114             :     }
     115             : 
     116      793353 :     int db_partition_id() {
     117      793353 :         return db_partition_id_;
     118             :     }
     119             : 
     120      793345 :     int db_task_id() const { return db_partition_->task_id(); }
     121             : 
     122     6254922 :     bool IsDBQueueEmpty() const {
     123     6254922 :         return (request_queue_.empty() && change_list_.empty());
     124             :     }
     125             : 
     126      843918 :     bool disable() { return disable_; }
     127          90 :     void set_disable(bool disable) { disable_ = disable; }
     128             : 
     129           0 :     long request_queue_len() const {
     130           0 :         return request_count_;
     131             :     }
     132             : 
     133           0 :     uint64_t total_request_count() const {
     134           0 :         return total_request_count_;
     135             :     }
     136             : 
     137           0 :     uint64_t max_request_queue_len() const {
     138           0 :         return max_request_queue_len_;
     139             :     }
     140             : 
     141             : private:
     142             :     DBPartition *db_partition_;
     143             :     RequestQueue request_queue_;
     144             :     TablePartList change_list_;
     145             :     std::atomic<long> request_count_;
     146             :     uint64_t total_request_count_;
     147             :     uint64_t max_request_queue_len_;
     148             :     RemoveQueue remove_queue_;
     149             :     std::mutex mutex_;
     150             :     int db_partition_id_;
     151             :     bool disable_;
     152             :     bool running_;
     153             : 
     154             :     DISALLOW_COPY_AND_ASSIGN(WorkQueue);
     155             : };
     156             : 
     157     6254922 : bool DBPartition::IsDBQueueEmpty() const {
     158     6254922 :     return work_queue_->IsDBQueueEmpty();
     159             : }
     160             : 
     161          90 : void DBPartition::SetQueueDisable(bool disable) {
     162          90 :     if (disable) {
     163          45 :         work_queue_->set_disable(true);
     164             :     } else {
     165          45 :         work_queue_->set_disable(false);
     166          45 :         work_queue_->MaybeStartRunner();
     167             :     }
     168          90 : }
     169             : 
     170             : class DBPartition::QueueRunner : public Task {
     171             : public:
     172             :     static const int kMaxIterations = 32;
     173      793356 :     QueueRunner(WorkQueue *queue)
     174      793356 :         : Task(queue->db_task_id(), queue->db_partition_id()),
     175      793356 :           queue_(queue) {
     176      793343 :     }
     177             : 
     178      843915 :     virtual bool Run() {
     179      843915 :         int count = 0;
     180             : 
     181             :         // Skip if the queue is disabled.
     182      843915 :         if (queue_->disable())
     183        1130 :             return queue_->RunnerDone();
     184             : 
     185      842790 :         RemoveQueueEntry *rm_entry = NULL;
     186     1354314 :         while (queue_->DequeueRemove(&rm_entry)) {
     187      513656 :             DBEntryBase *db_entry = rm_entry->db_entry;
     188             :             {
     189             :                 tbb::spin_rw_mutex::scoped_lock
     190      513656 :                     lock(rm_entry->tpart->dbstate_mutex(), false);
     191     1027482 :                 if (!db_entry->IsDeleted() || db_entry->is_onlist() ||
     192      513715 :                     !db_entry->is_state_empty_unlocked(rm_entry->tpart)) {
     193          43 :                     db_entry->ClearOnRemoveQ();
     194          43 :                     db_entry = NULL;
     195             :                 }
     196      513752 :             }
     197      513809 :             if (db_entry) {
     198      513766 :                 rm_entry->tpart->Remove(db_entry);
     199             :             }
     200      513780 :             delete rm_entry;
     201      513798 :             if (++count == kMaxIterations) {
     202        2274 :                 return false;
     203             :             }
     204             :         }
     205             : 
     206      840401 :         RequestQueueEntry *req_entry = NULL;
     207     1968014 :         while (queue_->DequeueRequest(&req_entry)) {
     208     1131823 :             req_entry->tpart->Process(req_entry->client, &req_entry->request);
     209     1131789 :             delete req_entry;
     210     1131819 :             if (++count == kMaxIterations) {
     211        4206 :                 return false;
     212             :             }
     213             :         }
     214             : 
     215             :         while (true) {
     216     2513153 :             DBTablePartBase *tpart = queue_->GetActiveTable();
     217     2513224 :             if (tpart == NULL) {
     218      836283 :                 break;
     219             :             }
     220     1676941 :             bool done = tpart->RunNotify();
     221     1676984 :             if (!done) {
     222          46 :                 return false;
     223             :             }
     224     1676938 :         }
     225             : 
     226             :         // Running is done only if queue_ is empty. It's possible that more
     227             :         // entries are added into in the input or remove queues during the
     228             :         // time we were processing those queues.
     229      836283 :         return queue_->RunnerDone();
     230             :     }
     231             : 
     232           0 :     std::string Description() const {
     233           0 :         return "DBPartition QueueRunner";
     234             :     }
     235             : private:
     236             :     WorkQueue *queue_;
     237             : };
     238             : 
     239     3322324 : void DBPartition::WorkQueue::MaybeStartRunnerUnlocked() {
     240     3322324 :     if (running_) {
     241     2529108 :         return;
     242             :     }
     243      793216 :     running_ = true;
     244      793216 :     QueueRunner *runner = new QueueRunner(this);
     245      793339 :     TaskScheduler *scheduler = TaskScheduler::GetInstance();
     246      793329 :     scheduler->Enqueue(runner);
     247             : }
     248             : 
     249     1645638 : void DBPartition::WorkQueue::MaybeStartRunner() {
     250     1645638 :     std::scoped_lock lock(mutex_);
     251     1645633 :     MaybeStartRunnerUnlocked();
     252     1645661 : }
     253             : 
     254      837412 : bool DBPartition::WorkQueue::RunnerDone() {
     255      837412 :     std::scoped_lock lock(mutex_);
     256      837448 :     if (disable_ || (request_queue_.empty() && remove_queue_.empty())) {
     257      793348 :         running_ = false;
     258      793348 :         return true;
     259             :     }
     260             : 
     261       44032 :     running_ = true;
     262       44032 :     return false;
     263      837380 : }
     264             : 
     265      109306 : DBPartition::DBPartition(DB *db, int partition_id)
     266      109306 :     : db_(db), work_queue_(new WorkQueue(this, partition_id)) {
     267      109306 : }
     268             : 
     269             : // The DBPartition destructor needs to be defined after WorkQueue has
     270             : // been declared.
     271      109242 : DBPartition::~DBPartition() {
     272      109242 : }
     273             : 
     274     1131781 : bool DBPartition::EnqueueRequest(DBTablePartBase *tpart, DBClient *client,
     275             :                                  DBRequest *req) {
     276     1131781 :     RequestQueueEntry *entry = new RequestQueueEntry(tpart, client, req);
     277     1131758 :     return work_queue_->EnqueueRequest(entry);
     278             : }
     279             : 
     280      513644 : void DBPartition::EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry) {
     281      513644 :     RemoveQueueEntry *entry = new RemoveQueueEntry(tpart, db_entry);
     282      513655 :     db_entry->SetOnRemoveQ();
     283      513808 :     work_queue_->EnqueueRemove(entry);
     284      513775 : }
     285             : 
     286             : // concurrency: called from DBPartition task.
     287     1676908 : void DBPartition::OnTableChange(DBTablePartBase *tablepart) {
     288     1676908 :     work_queue_->SetActive(tablepart);
     289     1677029 : }
     290             : 
     291           0 : long DBPartition::request_queue_len() const {
     292           0 :     return work_queue_->request_queue_len();
     293             : }
     294             : 
     295           0 : uint64_t DBPartition::total_request_count() const {
     296           0 :     return work_queue_->total_request_count();
     297             : }
     298             : 
     299           0 : uint64_t DBPartition::max_request_queue_len() const {
     300           0 :     return work_queue_->max_request_queue_len();
     301             : }
     302             : 
     303      793345 : int DBPartition::task_id() const {
     304      793345 :     return db_->task_id();
     305             : }

Generated by: LCOV version 1.14