LCOV - code coverage report
Current view: top level - db - db_partition.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 141 158 89.2 %
Date: 2026-06-11 01:56:02 Functions: 28 35 80.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "db/db_partition.h"
       6             : 
       7             : #include <list>
       8             : #include <mutex>
       9             : #include <atomic>
      10             : 
      11             : #include <tbb/concurrent_queue.h>
      12             : 
      13             : #include "base/task.h"
      14             : #include "db/db.h"
      15             : #include "db/db_client.h"
      16             : #include "db/db_entry.h"
      17             : 
      18             : using tbb::concurrent_queue;
      19             : 
      20             : struct RequestQueueEntry {
      21             :     // Constructor takes ownership of DBRequest key, data.
      22     1133585 :     RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
      23     1133585 :         : tpart(tpart), client(client) {
      24     1133573 :         request.Swap(req);
      25     1133536 :     }
      26             :     DBTablePartBase *tpart;
      27             :     DBClient *client;
      28             :     DBRequest request;
      29             : };
      30             : 
      31             : struct RemoveQueueEntry {
      32      517425 :     RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry)
      33      517425 :         : tpart(tpart), db_entry(db_entry) {
      34      517425 :     }
      35             :     DBTablePartBase *tpart;
      36             :     DBEntryBase *db_entry;
      37             : };
      38             : 
      39             : class DBPartition::WorkQueue {
      40             : public:
      41             :     static const int kThreshold = 1024;
      42             :     typedef concurrent_queue<RequestQueueEntry *> RequestQueue;
      43             :     typedef concurrent_queue<RemoveQueueEntry *> RemoveQueue;
      44             :     typedef std::list<DBTablePartBase *> TablePartList;
      45             : 
      46      109318 :     explicit WorkQueue(DBPartition *partition, int partition_id)
      47      109318 :         : db_partition_(partition),
      48      109318 :           db_partition_id_(partition_id),
      49      109318 :           disable_(false),
      50      109318 :           running_(false) {
      51      109318 :         request_count_ = 0;
      52      109318 :         max_request_queue_len_ = 0;
      53      109318 :         total_request_count_ = 0;
      54      109318 :     }
      55      109254 :     ~WorkQueue() {
      56      109254 :         for (RequestQueue::iterator iter = request_queue_.unsafe_begin();
      57      109254 :              iter != request_queue_.unsafe_end();) {
      58           0 :             RequestQueueEntry *req_entry = *iter;
      59           0 :             ++iter;
      60           0 :             delete req_entry;
      61      109254 :         }
      62      109254 :         request_queue_.clear();
      63      109254 :     }
      64             : 
      65     1133538 :     bool EnqueueRequest(RequestQueueEntry *req_entry) {
      66     1133538 :         request_queue_.push(req_entry);
      67     1133607 :         MaybeStartRunner();
      68     1133625 :         uint32_t max = request_count_.fetch_add(1);
      69     1133625 :         if (max > max_request_queue_len_)
      70      152545 :             max_request_queue_len_ = max;
      71     1133625 :         total_request_count_++;
      72     1133625 :         return max < (kThreshold - 1);
      73             : 
      74             :     }
      75             : 
      76     1973072 :     bool DequeueRequest(RequestQueueEntry **req_entry) {
      77     1973072 :         bool success = request_queue_.try_pop(*req_entry);
      78     1972834 :         if (success) {
      79     1133620 :             request_count_.fetch_sub(1);
      80             :         }
      81     1972834 :         return success;
      82             :     }
      83             : 
      84      517532 :     void EnqueueRemove(RemoveQueueEntry *rm_entry) {
      85      517532 :         remove_queue_.push(rm_entry);
      86      517534 :         MaybeStartRunner();
      87      517584 :     }
      88             : 
      89     1361393 :     bool DequeueRemove(RemoveQueueEntry **rm_entry) {
      90     1361393 :         bool success = remove_queue_.try_pop(*rm_entry);
      91     1361144 :         return success;
      92             :     }
      93             : 
      94             :     void MaybeStartRunnerUnlocked();
      95             :     void MaybeStartRunner();
      96             :     bool RunnerDone();
      97             : 
      98             :     // Normally called from single task that either runs in DB context or is
      99             :     // exclusive with DB task, but can be called concurrently from multiple
     100             :     // bgp::ConfigHelper tasks.
     101     1678917 :     void SetActive(DBTablePartBase *tpart) {
     102     1678917 :         std::scoped_lock lock(mutex_);
     103     1679143 :         change_list_.push_back(tpart);
     104     1678793 :         MaybeStartRunnerUnlocked();
     105     1678909 :     }
     106             : 
     107     2518410 :     DBTablePartBase *GetActiveTable() {
     108     2518410 :         DBTablePartBase *tpart = NULL;
     109     2518410 :         if (!change_list_.empty()) {
     110     1679022 :             tpart = change_list_.front();
     111     1678984 :             change_list_.pop_front();
     112             :         }
     113     2518523 :         return tpart;
     114             :     }
     115             : 
     116      797293 :     int db_partition_id() {
     117      797293 :         return db_partition_id_;
     118             :     }
     119             : 
     120      797278 :     int db_task_id() const { return db_partition_->task_id(); }
     121             : 
     122     6256065 :     bool IsDBQueueEmpty() const {
     123     6256065 :         return (request_queue_.empty() && change_list_.empty());
     124             :     }
     125             : 
     126      847112 :     bool disable() { return disable_; }
     127          90 :     void set_disable(bool disable) { disable_ = disable; }
     128             : 
     129           0 :     long request_queue_len() const {
     130           0 :         return request_count_;
     131             :     }
     132             : 
     133           0 :     uint64_t total_request_count() const {
     134           0 :         return total_request_count_;
     135             :     }
     136             : 
     137           0 :     uint64_t max_request_queue_len() const {
     138           0 :         return max_request_queue_len_;
     139             :     }
     140             : 
     141             : private:
     142             :     DBPartition *db_partition_;
     143             :     RequestQueue request_queue_;
     144             :     TablePartList change_list_;
     145             :     std::atomic<long> request_count_;
     146             :     uint64_t total_request_count_;
     147             :     uint64_t max_request_queue_len_;
     148             :     RemoveQueue remove_queue_;
     149             :     std::mutex mutex_;
     150             :     int db_partition_id_;
     151             :     bool disable_;
     152             :     bool running_;
     153             : 
     154             :     DISALLOW_COPY_AND_ASSIGN(WorkQueue);
     155             : };
     156             : 
     157     6256065 : bool DBPartition::IsDBQueueEmpty() const {
     158     6256065 :     return work_queue_->IsDBQueueEmpty();
     159             : }
     160             : 
     161          90 : void DBPartition::SetQueueDisable(bool disable) {
     162          90 :     if (disable) {
     163          45 :         work_queue_->set_disable(true);
     164             :     } else {
     165          45 :         work_queue_->set_disable(false);
     166          45 :         work_queue_->MaybeStartRunner();
     167             :     }
     168          90 : }
     169             : 
     170             : class DBPartition::QueueRunner : public Task {
     171             : public:
     172             :     static const int kMaxIterations = 32;
     173      797305 :     QueueRunner(WorkQueue *queue)
     174      797305 :         : Task(queue->db_task_id(), queue->db_partition_id()),
     175      797305 :           queue_(queue) {
     176      797273 :     }
     177             : 
     178      847111 :     virtual bool Run() {
     179      847111 :         int count = 0;
     180             : 
     181             :         // Skip if the queue is disabled.
     182      847111 :         if (queue_->disable())
     183        1083 :             return queue_->RunnerDone();
     184             : 
     185      846031 :         RemoveQueueEntry *rm_entry = NULL;
     186     1361442 :         while (queue_->DequeueRemove(&rm_entry)) {
     187      517413 :             DBEntryBase *db_entry = rm_entry->db_entry;
     188             :             {
     189             :                 tbb::spin_rw_mutex::scoped_lock
     190      517413 :                     lock(rm_entry->tpart->dbstate_mutex(), false);
     191     1035047 :                 if (!db_entry->IsDeleted() || db_entry->is_onlist() ||
     192      517506 :                     !db_entry->is_state_empty_unlocked(rm_entry->tpart)) {
     193          41 :                     db_entry->ClearOnRemoveQ();
     194          41 :                     db_entry = NULL;
     195             :                 }
     196      517524 :             }
     197      517604 :             if (db_entry) {
     198      517563 :                 rm_entry->tpart->Remove(db_entry);
     199             :             }
     200      517567 :             delete rm_entry;
     201      517596 :             if (++count == kMaxIterations) {
     202        2185 :                 return false;
     203             :             }
     204             :         }
     205             : 
     206      843680 :         RequestQueueEntry *req_entry = NULL;
     207     1972967 :         while (queue_->DequeueRequest(&req_entry)) {
     208     1133596 :             req_entry->tpart->Process(req_entry->client, &req_entry->request);
     209     1133518 :             delete req_entry;
     210     1133589 :             if (++count == kMaxIterations) {
     211        4302 :                 return false;
     212             :             }
     213             :         }
     214             : 
     215             :         while (true) {
     216     2518411 :             DBTablePartBase *tpart = queue_->GetActiveTable();
     217     2518490 :             if (tpart == NULL) {
     218      839514 :                 break;
     219             :             }
     220     1678976 :             bool done = tpart->RunNotify();
     221     1679066 :             if (!done) {
     222          44 :                 return false;
     223             :             }
     224     1679022 :         }
     225             : 
     226             :         // Running is done only if queue_ is empty. It's possible that more
     227             :         // entries are added into in the input or remove queues during the
     228             :         // time we were processing those queues.
     229      839514 :         return queue_->RunnerDone();
     230             :     }
     231             : 
     232           0 :     std::string Description() const {
     233           0 :         return "DBPartition QueueRunner";
     234             :     }
     235             : private:
     236             :     WorkQueue *queue_;
     237             : };
     238             : 
     239     3329917 : void DBPartition::WorkQueue::MaybeStartRunnerUnlocked() {
     240     3329917 :     if (running_) {
     241     2532722 :         return;
     242             :     }
     243      797195 :     running_ = true;
     244      797195 :     QueueRunner *runner = new QueueRunner(this);
     245      797263 :     TaskScheduler *scheduler = TaskScheduler::GetInstance();
     246      797250 :     scheduler->Enqueue(runner);
     247             : }
     248             : 
     249     1651167 : void DBPartition::WorkQueue::MaybeStartRunner() {
     250     1651167 :     std::scoped_lock lock(mutex_);
     251     1651201 :     MaybeStartRunnerUnlocked();
     252     1651227 : }
     253             : 
     254      840594 : bool DBPartition::WorkQueue::RunnerDone() {
     255      840594 :     std::scoped_lock lock(mutex_);
     256      840653 :     if (disable_ || (request_queue_.empty() && remove_queue_.empty())) {
     257      797310 :         running_ = false;
     258      797310 :         return true;
     259             :     }
     260             : 
     261       43261 :     running_ = true;
     262       43261 :     return false;
     263      840571 : }
     264             : 
     265      109318 : DBPartition::DBPartition(DB *db, int partition_id)
     266      109318 :     : db_(db), work_queue_(new WorkQueue(this, partition_id)) {
     267      109318 : }
     268             : 
     269             : // The DBPartition destructor needs to be defined after WorkQueue has
     270             : // been declared.
     271      109254 : DBPartition::~DBPartition() {
     272      109254 : }
     273             : 
     274     1133580 : bool DBPartition::EnqueueRequest(DBTablePartBase *tpart, DBClient *client,
     275             :                                  DBRequest *req) {
     276     1133580 :     RequestQueueEntry *entry = new RequestQueueEntry(tpart, client, req);
     277     1133536 :     return work_queue_->EnqueueRequest(entry);
     278             : }
     279             : 
     280      517408 : void DBPartition::EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry) {
     281      517408 :     RemoveQueueEntry *entry = new RemoveQueueEntry(tpart, db_entry);
     282      517420 :     db_entry->SetOnRemoveQ();
     283      517612 :     work_queue_->EnqueueRemove(entry);
     284      517574 : }
     285             : 
     286             : // concurrency: called from DBPartition task.
     287     1678958 : void DBPartition::OnTableChange(DBTablePartBase *tablepart) {
     288     1678958 :     work_queue_->SetActive(tablepart);
     289     1679121 : }
     290             : 
     291           0 : long DBPartition::request_queue_len() const {
     292           0 :     return work_queue_->request_queue_len();
     293             : }
     294             : 
     295           0 : uint64_t DBPartition::total_request_count() const {
     296           0 :     return work_queue_->total_request_count();
     297             : }
     298             : 
     299           0 : uint64_t DBPartition::max_request_queue_len() const {
     300           0 :     return work_queue_->max_request_queue_len();
     301             : }
     302             : 
     303      797275 : int DBPartition::task_id() const {
     304      797275 :     return db_->task_id();
     305             : }

Generated by: LCOV version 1.14