LCOV - code coverage report
Current view: top level - vnsw/agent/pkt - flow_event.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 163 185 88.1 %
Date: 2026-06-04 02:06:09 Functions: 23 28 82.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : #include <base/address_util.h>
       5             : #include <boost/functional/hash.hpp>
       6             : #include <init/agent_param.h>
       7             : #include <cmn/agent_stats.h>
       8             : #include <oper/agent_profile.h>
       9             : #include <vrouter/ksync/flowtable_ksync.h>
      10             : #include <vrouter/ksync/ksync_init.h>
      11             : #include <vrouter/ksync/ksync_flow_index_manager.h>
      12             : #include "vrouter/flow_stats/flow_stats_collector.h"
      13             : #include "flow_proto.h"
      14             : #include "flow_mgmt.h"
      15             : #include "flow_event.h"
      16             : 
      17             : //////////////////////////////////////////////////////////////////////////////
      18             : // FlowEventQueue routines
      19             : //////////////////////////////////////////////////////////////////////////////
      20          10 : FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
      21             :                                        const std::string &name,
      22             :                                        uint32_t task_id, int task_instance,
      23             :                                        FlowTokenPool *pool,
      24             :                                        uint16_t latency_limit,
      25          10 :                                        uint32_t max_iterations) :
      26          10 :     flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
      27          10 :     events_processed_(0), latency_limit_(latency_limit) {
      28          20 :     queue_ = new Queue(task_id, task_instance,
      29             :                        boost::bind(&FlowEventQueueBase::Handler, this, _1),
      30          10 :                        Queue::kMaxSize, max_iterations);
      31             :     char buff[100];
      32          10 :     sprintf(buff, "%s-%d", name.c_str(), task_instance);
      33          10 :     queue_->set_name(buff);
      34          10 :     if (token_pool_)
      35           8 :         queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck,
      36             :                                                this));
      37          10 :     queue_->set_measure_busy_time(proto->agent()->MeasureQueueDelay());
      38          10 :     if (latency_limit_) {
      39           0 :         queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry,
      40             :                                              this));
      41           0 :         queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit,
      42             :                                             this, _1));
      43             :     }
      44          10 : }
      45             : 
      46          10 : FlowEventQueueBase::~FlowEventQueueBase() {
      47          10 :     delete queue_;
      48          10 : }
      49             : 
      50          10 : void FlowEventQueueBase::Shutdown() {
      51          10 :     queue_->Shutdown();
      52          10 : }
      53             : 
      54        1046 : void FlowEventQueueBase::Enqueue(FlowEvent *event) {
      55        1046 :     if (CanEnqueue(event) == false) {
      56          47 :         delete event;
      57          47 :         return;
      58             :     }
      59         999 :     queue_->Enqueue(event);
      60             : }
      61             : 
      62         286 : bool FlowEventQueueBase::TokenCheck() {
      63         286 :     return flow_proto_->TokenCheck(token_pool_);
      64             : }
      65             : 
      66           0 : bool FlowEventQueueBase::TaskEntry() {
      67           0 :     count_ = 0;
      68           0 :     task_start_ = ClockMonotonicUsec();
      69           0 :     getrusage(RUSAGE_THREAD, &rusage_);
      70           0 :     return true;
      71             : }
      72             : 
      73           0 : void FlowEventQueueBase::TaskExit(bool done) {
      74           0 :     if (task_start_ == 0)
      75           0 :         return;
      76             : 
      77           0 :     uint64_t t = ClockMonotonicUsec();
      78           0 :     if (((t - task_start_) / 1000) >= latency_limit_) {
      79             :         struct rusage r;
      80           0 :         getrusage(RUSAGE_THREAD, &r);
      81             : 
      82           0 :         uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000;
      83           0 :         user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000);
      84             : 
      85           0 :         uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000;
      86           0 :         sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000);
      87             : 
      88           0 :         LOG(ERROR, queue_->Description()
      89             :             << " Time exceeded " << ((t - task_start_) / 1000)
      90             :             << " Count " << count_
      91             :             << " User " << user << " Sys " << sys);
      92             :     }
      93           0 :     return;
      94             : }
      95             : 
      96         998 : bool FlowEventQueueBase::Handler(FlowEvent *event) {
      97         998 :     std::unique_ptr<FlowEvent> event_ptr(event);
      98         997 :     count_++;
      99         997 :     if (CanProcess(event) == false) {
     100          26 :         ProcessDone(event, false);
     101          26 :         return true;
     102             :     }
     103             : 
     104         972 :     HandleEvent(event);
     105             : 
     106         973 :     ProcessDone(event, true);
     107         973 :     return true;
     108         999 : }
     109             : 
     110        1046 : bool FlowEventQueueBase::CanEnqueue(FlowEvent *event) {
     111        1046 :     FlowEntry *flow = event->flow();
     112        1046 :     bool ret = true;
     113        1046 :     switch (event->event()) {
     114             : 
     115          58 :     case FlowEvent::DELETE_DBENTRY:
     116             :     case FlowEvent::DELETE_FLOW: {
     117          58 :         std::scoped_lock mutext(flow->mutex());
     118          58 :         ret = flow->GetPendingAction()->SetDelete();
     119          58 :         break;
     120          58 :     }
     121             : 
     122             :     // lock already token for the flow
     123          27 :     case FlowEvent::FLOW_MESSAGE: {
     124          27 :         ret = flow->GetPendingAction()->SetRecompute();
     125          27 :         break;
     126             :     }
     127             : 
     128          71 :     case FlowEvent::RECOMPUTE_FLOW: {
     129          71 :         std::scoped_lock mutext(flow->mutex());
     130          71 :         ret = flow->GetPendingAction()->SetRecomputeDBEntry();
     131          71 :         break;
     132          71 :     }
     133             : 
     134          15 :     case FlowEvent::REVALUATE_DBENTRY: {
     135          15 :         std::scoped_lock mutext(flow->mutex());
     136          15 :         ret = flow->GetPendingAction()->SetRevaluate();
     137          15 :         break;
     138          15 :     }
     139             : 
     140         875 :     default:
     141         875 :         break;
     142             :     }
     143             : 
     144        1046 :     return ret;
     145             : }
     146             : 
     147         997 : bool FlowEventQueueBase::CanProcess(FlowEvent *event) {
     148         997 :     FlowEntry *flow = event->flow();
     149         997 :     bool ret = true;
     150         997 :     switch (event->event()) {
     151             : 
     152          37 :     case FlowEvent::DELETE_DBENTRY:
     153             :     case FlowEvent::DELETE_FLOW: {
     154          37 :         std::scoped_lock mutext(flow->mutex());
     155          37 :         events_processed_++;
     156          37 :         ret = flow->GetPendingAction()->CanDelete();
     157          37 :         break;
     158          37 :     }
     159             : 
     160          25 :     case FlowEvent::FLOW_MESSAGE: {
     161          25 :         std::scoped_lock mutext(flow->mutex());
     162          25 :         events_processed_++;
     163          25 :         ret = flow->GetPendingAction()->CanRecompute();
     164          25 :         break;
     165          25 :     }
     166             : 
     167          59 :     case FlowEvent::RECOMPUTE_FLOW: {
     168          59 :         std::scoped_lock mutext(flow->mutex());
     169          59 :         events_processed_++;
     170          59 :         ret = flow->GetPendingAction()->CanRecomputeDBEntry();
     171          59 :         break;
     172          59 :     }
     173             : 
     174           3 :     case FlowEvent::REVALUATE_DBENTRY: {
     175           3 :         events_processed_++;
     176           3 :         std::scoped_lock mutext(flow->mutex());
     177           3 :         ret = flow->GetPendingAction()->CanRevaluate();
     178           3 :         break;
     179           3 :     }
     180             : 
     181         874 :     default:
     182         874 :         break;
     183             :     }
     184             : 
     185         998 :     return ret;
     186             : }
     187             : 
     188         999 : void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) {
     189         999 :     FlowEntry *flow = event->flow();
     190         999 :     FlowEntry *rflow = NULL;
     191         999 :     if (flow && update_rev_flow)
     192         142 :         rflow = flow->reverse_flow_entry();
     193             : 
     194         999 :     switch (event->event()) {
     195             : 
     196          37 :     case FlowEvent::DELETE_DBENTRY:
     197             :     case FlowEvent::DELETE_FLOW: {
     198          74 :         FLOW_LOCK(flow, rflow, event->event());
     199          37 :         flow->GetPendingAction()->ResetDelete();
     200          37 :         if (rflow)
     201           0 :             rflow->GetPendingAction()->ResetDelete();
     202          37 :         break;
     203          37 :     }
     204             : 
     205          25 :     case FlowEvent::FLOW_MESSAGE: {
     206          25 :         FLOW_LOCK(flow, rflow, event->event());
     207          25 :         flow->GetPendingAction()->ResetRecompute();
     208          25 :         if (rflow)
     209          25 :             rflow->GetPendingAction()->ResetRecompute();
     210          25 :         break;
     211          25 :     }
     212             : 
     213          59 :     case FlowEvent::RECOMPUTE_FLOW: {
     214          59 :         std::scoped_lock mutext(flow->mutex());
     215          59 :         flow->GetPendingAction()->ResetRecomputeDBEntry();
     216          59 :         break;
     217          59 :     }
     218             : 
     219           3 :     case FlowEvent::REVALUATE_DBENTRY: {
     220           3 :         FLOW_LOCK(flow, rflow, event->event());
     221           3 :         flow->GetPendingAction()->ResetRevaluate();
     222           3 :         if (rflow)
     223           3 :             rflow->GetPendingAction()->ResetRevaluate();
     224           3 :         break;
     225           3 :     }
     226             : 
     227         875 :     default:
     228         875 :         break;
     229             :     }
     230             : 
     231         999 :     return;
     232             : }
     233             : 
     234           4 : FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
     235             :                                FlowTable *table, FlowTokenPool *pool,
     236             :                                uint16_t latency_limit,
     237           4 :                                uint32_t max_iterations) :
     238             :     FlowEventQueueBase(proto, "Flow Event Queue",
     239           8 :                        agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
     240           4 :                        table->table_index(), pool, latency_limit,
     241             :                        max_iterations),
     242           8 :     flow_table_(table) {
     243           4 : }
     244             : 
     245           8 : FlowEventQueue::~FlowEventQueue() {
     246           8 : }
     247             : 
     248         756 : bool FlowEventQueue::HandleEvent(FlowEvent *event) {
     249         756 :     return flow_proto_->FlowEventHandler(event, flow_table_);
     250             : }
     251             : 
     252           2 : DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
     253             :                                            FlowTable *table,
     254             :                                            FlowTokenPool *pool,
     255             :                                            uint16_t latency_limit,
     256           2 :                                            uint32_t max_iterations) :
     257             :     FlowEventQueueBase(proto, "Flow Delete Queue",
     258           4 :                        agent->task_scheduler()->GetTaskId(kTaskFlowDelete),
     259           2 :                        table->table_index(), pool, latency_limit,
     260             :                        max_iterations),
     261           4 :     flow_table_(table) {
     262           2 : }
     263             : 
     264           4 : DeleteFlowEventQueue::~DeleteFlowEventQueue() {
     265           4 : }
     266             : 
     267           0 : bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) {
     268           0 :     return flow_proto_->FlowDeleteHandler(event, flow_table_);
     269             : }
     270             : 
     271           2 : KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
     272             :                                          FlowTable *table,
     273             :                                          FlowTokenPool *pool,
     274             :                                          uint16_t latency_limit,
     275           2 :                                          uint32_t max_iterations) :
     276             :     FlowEventQueueBase(proto, "Flow KSync Queue",
     277           4 :                        agent->task_scheduler()->GetTaskId(kTaskFlowKSync),
     278           2 :                        table->table_index(), pool, latency_limit,
     279             :                        max_iterations),
     280           4 :     flow_table_(table) {
     281           2 : }
     282             : 
     283           4 : KSyncFlowEventQueue::~KSyncFlowEventQueue() {
     284           4 : }
     285             : 
     286         144 : bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) {
     287         144 :     return flow_proto_->FlowKSyncMsgHandler(event, flow_table_);
     288             : }
     289             : 
     290           2 : UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
     291             :                                            FlowTokenPool *pool,
     292             :                                            uint16_t latency_limit,
     293           2 :                                            uint32_t max_iterations) :
     294             :     FlowEventQueueBase(proto, "Flow Update Queue",
     295           4 :                        agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
     296           4 :                        pool, latency_limit, max_iterations) {
     297           2 : }
     298             : 
     299           2 : UpdateFlowEventQueue::~UpdateFlowEventQueue() {
     300           2 : }
     301             : 
     302          73 : bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) {
     303          73 :     return flow_proto_->FlowUpdateHandler(event);
     304             : }

Generated by: LCOV version 1.14