LCOV - code coverage report
Current view: top level - ksync - ksync_tx_queue.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 0 122 0.0 %
Date: 2026-06-22 02:21:21 Functions: 0 12 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #ifndef _GNU_SOURCE
       6             : #define _GNU_SOURCE
       7             : #endif
       8             : #include <sched.h>
       9             : 
      10             : #include <unistd.h>
      11             : #include <stdlib.h>
      12             : #include <sys/eventfd.h>
      13             : 
      14             : #include <algorithm>
      15             : #include <vector>
      16             : #include <set>
      17             : #include <boost/algorithm/string/case_conv.hpp>
      18             : #include <boost/thread.hpp>
      19             : 
      20             : #include <tbb/concurrent_queue.h>
      21             : 
      22             : #include "ksync_object.h"
      23             : #include "ksync_sock.h"
      24             : 
      25             : static bool ksync_tx_queue_task_done_ = false;
      26             : 
      27             : // Set CPU affinity for KSync Tx Thread based on cpu_pin_policy.
      28             : // By default CPU affinity is not set. cpu_pin_policy can change it,
      29             : //    "last"  : Last CPU-ID
      30             : //    "<num>" : Specifies CPU-ID to pin
      31           0 : static void set_thread_affinity(std::string cpu_pin_policy) {
      32           0 :     int num_cores = boost::thread::hardware_concurrency();
      33           0 :     if (!num_cores) {
      34           0 :         LOG(ERROR, "Failure in checking number of available threads");
      35           0 :         num_cores = 1;
      36             :     }
      37           0 :     char *p = NULL;
      38           0 :     int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
      39           0 :     if (*p || cpu_pin_policy.empty()) {
      40             :         // cpu_pin_policy is non-integer
      41             :         // Assume pinning disabled by default
      42           0 :         cpu_id = -1;
      43             :         // If policy is "last", pick last CPU-ID
      44           0 :         boost::algorithm::to_lower(cpu_pin_policy);
      45           0 :         if (cpu_pin_policy == "last") {
      46           0 :             cpu_id = num_cores - 1;
      47             :         }
      48             :     } else {
      49             :         // cpu_pin_policy is integer
      50             :         // Disable pinning if configured value out of range
      51           0 :         if (cpu_id >= num_cores)
      52           0 :             cpu_id = -1;
      53             :     }
      54             : 
      55           0 :     if (cpu_id >= 0) {
      56             :         cpu_set_t cpuset;
      57           0 :         CPU_ZERO(&cpuset);
      58           0 :         CPU_SET(cpu_id, &cpuset);
      59           0 :         LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
      60             :             << ">. KsyncTxQueue pinned to CPU " << cpu_id);
      61           0 :         sched_setaffinity(0, sizeof(cpuset), &cpuset);
      62             :     } else {
      63           0 :         LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
      64             :             << ">. KsyncTxQueuen not pinned to CPU");
      65             :     }
      66           0 : }
      67             : 
      68             : class KSyncTxQueueTask : public Task {
      69             : public:
      70           0 :     KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue) :
      71           0 :         Task(scheduler->GetTaskId("Ksync::KSyncTxQueue"), 0), queue_(queue) {
      72           0 :     }
      73           0 :     ~KSyncTxQueueTask() {
      74           0 :         ksync_tx_queue_task_done_ = true;
      75           0 :     }
      76             : 
      77           0 :     bool Run() {
      78           0 :         queue_->Run();
      79           0 :         return true;
      80             :     }
      81           0 :     std::string Description() const { return "KSyncTxQueue"; }
      82             : 
      83             : private:
      84             :     KSyncTxQueue *queue_;
      85             : };
      86             : 
      87           0 : KSyncTxQueue::KSyncTxQueue(KSyncSock *sock) :
      88           0 :     work_queue_(NULL),
      89           0 :     event_fd_(-1),
      90           0 :     cpu_pin_policy_(),
      91           0 :     sock_(sock),
      92           0 :     enqueues_(0),
      93           0 :     dequeues_(0),
      94           0 :     write_events_(0),
      95           0 :     read_events_(0),
      96           0 :     busy_time_(0),
      97           0 :     measure_busy_time_(false) {
      98           0 :     queue_len_ = 0;
      99           0 :     shutdown_ = false;
     100           0 :     ClearStats();
     101           0 : }
     102             : 
     103           0 : KSyncTxQueue::~KSyncTxQueue() {
     104           0 : }
     105             : 
     106           0 : void KSyncTxQueue::Init(bool use_work_queue,
     107             :                         const std::string &cpu_pin_policy) {
     108           0 :     cpu_pin_policy_ = cpu_pin_policy;
     109           0 :     TaskScheduler *scheduler = TaskScheduler::GetInstance();
     110           0 :     if (use_work_queue) {
     111           0 :         assert(work_queue_ == NULL);
     112           0 :         work_queue_ = new WorkQueue<IoContext *>
     113           0 :             (scheduler->GetTaskId("Ksync::AsyncSend"), 0,
     114           0 :              boost::bind(&KSyncSock::SendAsyncImpl, sock_, _1));
     115           0 :         work_queue_->SetExitCallback
     116           0 :             (boost::bind(&KSyncSock::OnEmptyQueue, sock_, _1));
     117           0 :         return;
     118             :     }
     119           0 :     assert((event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
     120             : 
     121           0 :     KSyncTxQueueTask *task = new KSyncTxQueueTask(scheduler, this);
     122           0 :     scheduler->Enqueue(task);
     123             : }
     124             : 
     125           0 : void KSyncTxQueue::Shutdown() {
     126           0 :     shutdown_ = true;
     127           0 :     if (work_queue_) {
     128           0 :         assert(work_queue_->Length() == 0);
     129           0 :         work_queue_->Shutdown();
     130           0 :         delete work_queue_;
     131           0 :         work_queue_ = NULL;
     132           0 :         return;
     133             :     }
     134             : 
     135           0 :     uint64_t u = 1;
     136           0 :     assert(write(event_fd_, &u, sizeof(u)) == sizeof(u));
     137           0 :     while (queue_len_ != 0) {
     138           0 :         usleep(1);
     139             :     }
     140             : 
     141           0 :     while(ksync_tx_queue_task_done_ != true) {
     142           0 :         usleep(1);
     143             :     }
     144           0 :     close(event_fd_);
     145             : }
     146             : 
     147           0 : bool KSyncTxQueue::EnqueueInternal(IoContext *io_context) {
     148           0 :     if (work_queue_) {
     149           0 :         work_queue_->Enqueue(io_context);
     150           0 :         return true;
     151             :     }
     152           0 :     queue_.push(io_context);
     153           0 :     enqueues_++;
     154           0 :     size_t ncount = queue_len_.fetch_add(1) + 1;
     155           0 :     if (ncount > max_queue_len_)
     156           0 :         max_queue_len_ = ncount;
     157           0 :     if (ncount == 1) {
     158           0 :         uint64_t u = 1;
     159           0 :         int res = 0;
     160           0 :         while ((res = write(event_fd_, &u, sizeof(u))) < (int)sizeof(u)) {
     161           0 :             int ec = errno;
     162           0 :             if (ec != EINTR && ec != EIO) {
     163           0 :                 LOG(ERROR, "KsyncTxQueue write failure : " << ec << " : "
     164             :                     << strerror(ec));
     165           0 :                 assert(0);
     166             :             }
     167             :         }
     168             : 
     169           0 :         write_events_++;
     170             :     }
     171           0 :     return true;
     172             : }
     173             : 
     174           0 : bool KSyncTxQueue::Run() {
     175           0 :     set_thread_affinity(cpu_pin_policy_);
     176             :     while (1) {
     177             :         while (1) {
     178           0 :             uint64_t u = 0;
     179           0 :             ssize_t num = read(event_fd_, &u, sizeof(u));
     180           0 :             if (num >= (int)sizeof(u)) {
     181           0 :                 break;
     182             :             }
     183           0 :             if (errno != EINTR && errno != EIO) {
     184           0 :                 LOG(ERROR, "KsyncTxQueue read failure : " << errno << " : "
     185             :                     << strerror(errno));
     186           0 :                 assert(0);
     187             :             }
     188           0 :         }
     189           0 :         read_events_++;
     190             : 
     191           0 :         uint64_t t1 = 0;
     192           0 :         if (measure_busy_time_)
     193           0 :             t1 = ClockMonotonicUsec();
     194           0 :         IoContext *io_context = NULL;
     195           0 :         while (queue_.try_pop(io_context)) {
     196           0 :             dequeues_++;
     197           0 :             queue_len_ -= 1;
     198           0 :             sock_->SendAsyncImpl(io_context);
     199             :         }
     200           0 :         sock_->OnEmptyQueue(false);
     201           0 :         if (shutdown_) {
     202           0 :             break;
     203             :         }
     204             : 
     205           0 :         if (t1)
     206           0 :             busy_time_ += (ClockMonotonicUsec() - t1);
     207           0 :     }
     208           0 :     return true;
     209             : }

Generated by: LCOV version 1.14