LCOV - code coverage report
Current view: top level - vnsw/agent/pkt - flow_proto.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 261 543 48.1 %
Date: 2026-06-11 01:56:02 Functions: 29 51 56.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : #include <base/address_util.h>
       5             : #include <boost/functional/hash.hpp>
       6             : #include <init/agent_param.h>
       7             : #include <cmn/agent_stats.h>
       8             : #include <oper/agent_profile.h>
       9             : #include <vrouter/ksync/flowtable_ksync.h>
      10             : #include <vrouter/ksync/ksync_init.h>
      11             : #include <vrouter/ksync/ksync_flow_index_manager.h>
      12             : #include "vrouter/flow_stats/flow_stats_collector.h"
      13             : #include "flow_proto.h"
      14             : #include "flow_mgmt/flow_mgmt_dbclient.h"
      15             : #include "flow_mgmt.h"
      16             : #include "flow_event.h"
      17             : #include <strings.h>
      18             : #include "flow_entry.h"
      19             : 
      20             : static void UpdateStats(FlowEvent *event, FlowStats *stats);
      21             : 
      22           2 : FlowProto::FlowProto(Agent *agent, boost::asio::io_context &io) :
      23             :     Proto(agent, kTaskFlowEvent, PktHandler::FLOW, io),
      24           2 :     add_tokens_("Add Tokens", this, agent->flow_add_tokens()),
      25           2 :     ksync_tokens_("KSync` Tokens", this, agent->flow_ksync_tokens()),
      26           2 :     del_tokens_("Delete Tokens", this, agent->flow_del_tokens()),
      27           2 :     update_tokens_("Update Tokens", this, agent->flow_update_tokens()),
      28           2 :     flow_update_queue_(agent, this, &update_tokens_,
      29           2 :                        agent->params()->flow_task_latency_limit(), 16),
      30           2 :     use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
      31           2 :     stats_(),
      32           2 :     port_table_manager_(agent, agent->params()->fabric_snat_hash_table_size()),
      33           2 :     stats_update_timer_(TimerManager::CreateTimer
      34           2 :         (*(agent->event_manager())->io_service(), "FlowStatsUpdateTimer",
      35           4 :          TaskScheduler::GetInstance()->GetTaskId(kTaskFlowStatsUpdate), 0)) {
      36           2 :     linklocal_flow_count_ = 0;
      37           2 :     agent->SetFlowProto(this);
      38           2 :     set_trace(false);
      39           2 :     uint16_t table_count = agent->flow_thread_count();
      40           2 :     assert(table_count >= kMinTableCount && table_count <= kMaxTableCount);
      41           4 :     for (uint8_t i = 0; i < table_count; i++) {
      42           2 :         flow_table_list_.push_back(new FlowTable(agent_, i));
      43             :     }
      44             : 
      45           4 :     for (uint32_t i = 0; i < table_count; i++) {
      46           2 :         uint16_t latency = agent->params()->flow_task_latency_limit();
      47             :         flow_event_queue_.push_back
      48           4 :             (new FlowEventQueue(agent, this, flow_table_list_[i],
      49             :                                 &add_tokens_, latency,
      50           2 :                                 FlowEventQueue::Queue::kMaxIterations));
      51             : 
      52             :         flow_tokenless_queue_.push_back
      53           4 :             (new FlowEventQueue(agent, this, flow_table_list_[i],
      54             :                                 NULL, latency,
      55           2 :                                 2 * FlowEventQueue::Queue::kMaxIterations));
      56             : 
      57             :         flow_delete_queue_.push_back
      58           4 :             (new DeleteFlowEventQueue(agent, this, flow_table_list_[i],
      59             :                                       &del_tokens_, latency,
      60           2 :                                       FlowEventQueue::Queue::kMaxIterations));
      61             : 
      62             :         flow_ksync_queue_.push_back
      63           4 :             (new KSyncFlowEventQueue(agent, this, flow_table_list_[i],
      64             :                                      &ksync_tokens_, latency,
      65           2 :                                      FlowEventQueue::Queue::kMaxIterations));
      66             :     }
      67           2 :     if (::getenv("USE_VROUTER_HASH") != NULL) {
      68           0 :         string opt = ::getenv("USE_VROUTER_HASH");
      69           0 :         if (opt == "" || strcasecmp(opt.c_str(), "false"))
      70           0 :             use_vrouter_hash_ = false;
      71             :         else
      72           0 :             use_vrouter_hash_ = true;
      73           0 :     }
      74           2 : }
      75             : 
      76           4 : FlowProto::~FlowProto() {
      77           2 :     STLDeleteValues(&flow_event_queue_);
      78           2 :     STLDeleteValues(&flow_tokenless_queue_);
      79           2 :     STLDeleteValues(&flow_delete_queue_);
      80           2 :     STLDeleteValues(&flow_ksync_queue_);
      81           2 :     STLDeleteValues(&flow_table_list_);
      82           4 : }
      83             : 
      84           2 : void FlowProto::Init() {
      85           2 :     agent_->stats()->RegisterFlowCountFn(boost::bind(&FlowProto::FlowCount,
      86             :                                                      this));
      87           4 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
      88           2 :         flow_table_list_[i]->Init();
      89             :     }
      90             : 
      91           2 :     AgentProfile *profile = agent_->oper_db()->agent_profile();
      92           2 :     profile->RegisterPktFlowStatsCb(boost::bind(&FlowProto::SetProfileData,
      93             :                                                 this, _1));
      94             : 
      95           2 :     ipv4_trace_filter_.Init(agent_->flow_trace_enable(), Address::INET);
      96           2 :     ipv6_trace_filter_.Init(agent_->flow_trace_enable(), Address::INET6);
      97           2 : }
      98             : 
      99           2 : void FlowProto::InitDone() {
     100           4 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
     101           2 :         flow_table_list_[i]->InitDone();
     102             :     }
     103           2 :     stats_update_timer_->Start(agent_->stats()->flow_stats_update_timeout(),
     104             :         boost::bind(&FlowProto::FlowStatsUpdate, this));
     105           2 : }
     106             : 
     107           2 : void FlowProto::Shutdown() {
     108           4 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
     109           2 :         flow_table_list_[i]->Shutdown();
     110             :     }
     111           4 :     for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
     112           2 :         flow_event_queue_[i]->Shutdown();
     113           2 :         flow_tokenless_queue_[i]->Shutdown();
     114           2 :         flow_delete_queue_[i]->Shutdown();
     115           2 :         flow_ksync_queue_[i]->Shutdown();
     116             :     }
     117           2 :     flow_update_queue_.Shutdown();
     118           2 :     if (stats_update_timer_) {
     119           2 :         stats_update_timer_->Cancel();
     120           2 :         TimerManager::DeleteTimer(stats_update_timer_);
     121             :     }
     122           2 : }
     123             : 
     124         220 : static std::size_t HashCombine(std::size_t hash, uint64_t val) {
     125         220 :     boost::hash_combine(hash, val);
     126         220 :     return hash;
     127             : }
     128             : 
     129          88 : static std::size_t HashIp(std::size_t hash, const IpAddress &ip) {
     130          88 :     if (ip.is_v6()) {
     131             :         uint64_t val[2];
     132           0 :         Ip6AddressToU64Array(ip.to_v6(), val, 2);
     133           0 :         hash = HashCombine(hash, val[0]);
     134           0 :         hash = HashCombine(hash, val[1]);
     135          88 :     } else if (ip.is_v4()) {
     136          88 :         hash = HashCombine(hash, ip.to_v4().to_ulong());
     137             :     } else {
     138           0 :         assert(0);
     139             :     }
     140          88 :     return hash;
     141             : }
     142             : 
     143             : // Get the thread to be used for the flow. We *try* to map forward and reverse
     144             : // flow to same thread with following,
     145             : //  if (sip < dip)
     146             : //      ip1 = sip
     147             : //      ip2 = dip
     148             : //  else
     149             : //      ip1 = dip
     150             : //      ip2 = sip
     151             : //  if (sport < dport)
     152             : //      port1 = sport
     153             : //      port2 = dport
     154             : //  else
     155             : //      port1 = dport
     156             : //      port2 = sport
     157             : //  field5 = proto
     158             : //  hash = HASH(ip1, ip2, port1, port2, proto)
     159             : //
     160             : // The algorithm above cannot ensure NAT flows belong to same thread.
     161          44 : uint16_t FlowProto::FlowTableIndex(const IpAddress &sip, const IpAddress &dip,
     162             :                                    uint8_t proto, uint16_t sport,
     163             :                                    uint16_t dport, uint32_t flow_handle) const {
     164          44 :     if (use_vrouter_hash_) {
     165           0 :         return (flow_handle/flow_table_list_.size()) % flow_table_list_.size();
     166             :     }
     167             : 
     168          44 :     std::size_t hash = 0;
     169          44 :     if (sip < dip) {
     170          22 :         hash = HashIp(hash, sip);
     171          22 :         hash = HashIp(hash, dip);
     172             :     } else {
     173          22 :         hash = HashIp(hash, dip);
     174          22 :         hash = HashIp(hash, sip);
     175             :     }
     176             : 
     177          44 :     if (sport < dport) {
     178          32 :         hash = HashCombine(hash, sport);
     179          32 :         hash = HashCombine(hash, dport);
     180             :     } else {
     181          12 :         hash = HashCombine(hash, dport);
     182          12 :         hash = HashCombine(hash, sport);
     183             :     }
     184          44 :     hash = HashCombine(hash, proto);
     185          44 :     return (hash % (flow_event_queue_.size()));
     186             : }
     187             : 
     188          22 : FlowHandler *FlowProto::AllocProtoHandler(PktInfoPtr info,
     189             :                                           boost::asio::io_context &io) {
     190          22 :     uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
     191          22 :                                     info->ip_proto, info->sport, info->dport,
     192          22 :                                     info->agent_hdr.cmd_param);
     193          22 :     return new FlowHandler(agent(), info, io, this, index);
     194             : }
     195             : 
     196          22 : bool FlowProto::Validate(PktInfo *msg) {
     197          22 :     if (msg->ip == NULL && msg->ip6 == NULL && msg->type != PktType::MESSAGE) {
     198           0 :         if (msg->family == Address::INET || msg->family == Address::INET6) {
     199           0 :             FLOW_TRACE(DetailErr, msg->agent_hdr.cmd_param,
     200             :                        msg->agent_hdr.ifindex,
     201             :                        msg->agent_hdr.vrf,
     202             :                        msg->ip_saddr.to_string(),
     203             :                        msg->ip_daddr.to_string(),
     204             :                        "Flow : Non-IP packet. Dropping", false);
     205             :         } else {
     206           0 :             assert(0);
     207             :         }
     208           0 :         return false;
     209             :     }
     210          22 :     return true;
     211             : }
     212             : 
     213           0 : FlowTable *FlowProto::GetFlowTable(const FlowKey &key,
     214             :                                    uint32_t flow_handle) const {
     215           0 :     uint32_t index = FlowTableIndex(key.src_addr, key.dst_addr, key.protocol,
     216           0 :                                     key.src_port, key.dst_port, flow_handle);
     217           0 :     return flow_table_list_[index];
     218             : }
     219             : 
     220          22 : bool FlowProto::Enqueue(PktInfoPtr msg) {
     221          22 :     if (Validate(msg.get()) == false) {
     222           0 :         return true;
     223             :     }
     224             : 
     225          22 :     FreeBuffer(msg.get());
     226          22 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::VROUTER_FLOW_MSG, msg, NULL, 0));
     227          22 :     return true;
     228             : }
     229             : 
     230           0 : void FlowProto::DisableFlowEventQueue(uint32_t index, bool disabled) {
     231           0 :     flow_event_queue_[index]->set_disable(disabled);
     232           0 :     flow_tokenless_queue_[index]->set_disable(disabled);
     233           0 :     flow_delete_queue_[index]->set_disable(disabled);
     234           0 : }
     235             : 
     236           0 : void FlowProto::DisableFlowUpdateQueue(bool disabled) {
     237           0 :     flow_update_queue_.set_disable(disabled);
     238           0 : }
     239             : 
     240           0 : void FlowProto::DisableFlowKSyncQueue(uint32_t index, bool disabled) {
     241           0 :     flow_ksync_queue_[index]->set_disable(disabled);
     242           0 : }
     243             : 
     244           0 : size_t FlowProto::FlowUpdateQueueLength() {
     245           0 :     return flow_update_queue_.Length();
     246             : }
     247             : 
     248           0 : void FlowProto::DisableFlowDeleteQueue(uint32_t index, bool disabled) {
     249           0 :     flow_delete_queue_[index]->set_disable(disabled);
     250           0 : }
     251             : 
     252             : /////////////////////////////////////////////////////////////////////////////
     253             : // FlowTable related routines
     254             : /////////////////////////////////////////////////////////////////////////////
     255           2 : void FlowProto::FlushFlows() {
     256           4 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
     257           2 :         flow_table_list_[i]->DeleteAll();
     258             :     }
     259           2 : }
     260             : 
     261          91 : FlowTable *FlowProto::GetTable(uint16_t index) const {
     262          91 :     return flow_table_list_[index];
     263             : }
     264             : 
     265          12 : uint32_t FlowProto::FlowCount() const {
     266          12 :     uint32_t count = 0;
     267          24 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
     268          12 :         count += flow_table_list_[i]->Size();
     269             :     }
     270          12 :     return count;
     271             : }
     272             : 
     273           0 : void FlowProto::VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
     274             :                                uint32_t *out_count) {
     275           0 :     *in_count = 0;
     276           0 :     *out_count = 0;
     277           0 :     if (vn == NULL)
     278           0 :         return;
     279             : 
     280             :     std::vector<FlowMgmtManager *>::const_iterator it =
     281           0 :         agent_->pkt()->flow_mgmt_manager_iterator_begin();
     282           0 :     while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
     283           0 :         (*it)->VnFlowCounters(vn, in_count, out_count);
     284           0 :         it++;
     285             :     }
     286             : }
     287             : 
     288          42 : FlowEntry *FlowProto::Find(const FlowKey &key, uint32_t table_index) const {
     289          42 :     return GetTable(table_index)->Find(key);
     290             : }
     291             : 
     292          22 : bool FlowProto::AddFlow(FlowEntry *flow) {
     293          22 :     FlowTable *table = flow->flow_table();
     294          22 :     table->Add(flow, flow->reverse_flow_entry());
     295          22 :     return true;
     296             : }
     297             : 
     298          25 : bool FlowProto::UpdateFlow(FlowEntry *flow) {
     299          25 :     FlowTable *table = flow->flow_table();
     300          25 :     table->Update(flow, flow->reverse_flow_entry());
     301          25 :     return true;
     302             : }
     303             : 
     304             : /////////////////////////////////////////////////////////////////////////////
     305             : // Flow Control Event routines
     306             : /////////////////////////////////////////////////////////////////////////////
     307         694 : void FlowProto::EnqueueFlowEvent(FlowEvent *event) {
     308         694 :     FlowEventQueueBase *queue = NULL;
     309         694 :     switch (event->event()) {
     310          22 :     case FlowEvent::VROUTER_FLOW_MSG: {
     311          22 :         PktInfo *info = event->pkt_info().get();
     312          44 :         uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
     313          22 :                                         info->ip_proto, info->sport,
     314          22 :                                         info->dport,
     315          22 :                                         info->agent_hdr.cmd_param);
     316          22 :         queue = flow_event_queue_[index];
     317          22 :         break;
     318             :     }
     319             : 
     320          27 :     case FlowEvent::FLOW_MESSAGE: {
     321          27 :         FlowEntry *flow = event->flow();
     322          27 :         FlowTable *table = flow->flow_table();
     323          27 :         queue = flow_event_queue_[table->table_index()];
     324          27 :         break;
     325             :     }
     326             : 
     327           0 :     case FlowEvent::EVICT_FLOW: {
     328           0 :         FlowEntry *flow = event->flow();
     329           0 :         FlowTable *table = flow->flow_table();
     330           0 :         queue = flow_ksync_queue_[table->table_index()];
     331           0 :         break;
     332             :     }
     333             : 
     334          44 :     case FlowEvent::FREE_FLOW_REF: {
     335          44 :         FlowEntry *flow = event->flow();
     336          44 :         FlowTable *table = flow->flow_table();
     337          44 :         queue = flow_tokenless_queue_[table->table_index()];
     338          44 :         break;
     339             :     }
     340             : 
     341           0 :     case FlowEvent::AUDIT_FLOW: {
     342           0 :         FlowTable *table = GetFlowTable(event->get_flow_key(),
     343             :                                         event->flow_handle());
     344           0 :         queue = flow_event_queue_[table->table_index()];
     345           0 :         break;
     346             :     }
     347             : 
     348           0 :     case FlowEvent::GROW_FREE_LIST: {
     349           0 :         queue = flow_tokenless_queue_[event->table_index()];
     350           0 :         break;
     351             :     }
     352             : 
     353         144 :     case FlowEvent::KSYNC_EVENT: {
     354         144 :         FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(event);
     355             :         FlowTableKSyncEntry *ksync_entry =
     356         144 :             (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
     357         144 :         FlowEntry *flow = ksync_entry->flow_entry().get();
     358         144 :         FlowTable *table = flow->flow_table();
     359         144 :         queue = flow_ksync_queue_[table->table_index()];
     360         144 :         break;
     361             :     }
     362             : 
     363           0 :     case FlowEvent::REENTRANT: {
     364           0 :         queue = flow_event_queue_[event->table_index()];
     365           0 :         break;
     366             :     }
     367             : 
     368           0 :     case FlowEvent::DELETE_FLOW: {
     369           0 :         FlowEntry *flow = event->flow();
     370           0 :         queue = flow_delete_queue_[flow->flow_table()->table_index()];
     371           0 :         break;
     372             :     }
     373             : 
     374         313 :     case FlowEvent::FREE_DBENTRY: {
     375         313 :         queue = flow_tokenless_queue_[0];
     376         313 :         break;
     377             :     }
     378             : 
     379         144 :     case FlowEvent::DELETE_DBENTRY:
     380             :     case FlowEvent::RECOMPUTE_FLOW:
     381             :     case FlowEvent::REVALUATE_DBENTRY: {
     382         144 :         queue = &flow_update_queue_;
     383         144 :         break;
     384             :     }
     385             : 
     386           0 :     case FlowEvent::UNRESOLVED_FLOW_ENTRY: {
     387           0 :         FlowTable *table = event->flow()->flow_table();
     388           0 :         queue = flow_event_queue_[table->table_index()];
     389           0 :         break;
     390             :     }
     391           0 :     default:
     392           0 :         assert(0);
     393             :         break;
     394             :     }
     395             : 
     396         694 :     UpdateStats(event, &stats_);
     397         694 :     queue->Enqueue(event);
     398         694 :     return;
     399             : }
     400             : 
     401         404 : bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
     402             :     // concurrency check to ensure all request are in right partitions
     403         404 :     assert(table->ConcurrencyCheck(table->flow_task_id()) == true);
     404             : 
     405         404 :     switch (req->event()) {
     406          22 :     case FlowEvent::VROUTER_FLOW_MSG: {
     407          22 :         ProcessProto(req->pkt_info());
     408          22 :         break;
     409             :     }
     410             : 
     411           0 :     case FlowEvent::REENTRANT: {
     412           0 :         FlowHandler *handler = new FlowHandler(agent(), req->pkt_info(), io_,
     413           0 :                                                this, table->table_index());
     414           0 :         RunProtoHandler(handler);
     415           0 :         break;
     416             :     }
     417             : 
     418          25 :     case FlowEvent::FLOW_MESSAGE: {
     419          25 :         FlowEntry *flow = req->flow();
     420             :         // process event only for forward flow with same gen_id
     421             :         // it may happen that after enqueued for recompute,
     422             :         // flow become reverse flow when the following sequence of
     423             :         // events occur.
     424             :         // 1. route is changed , flow is enqueued for recompute
     425             :         // 2. flow get evicted in vrouter
     426             :         // 3. traffic is received for reverse flow and get the same flow handle
     427             :         // 4. since flow handle is same , existing flow entries in agent won't
     428             :         //    be deleted but forward flow become reverse and vice versa
     429             :         //   added check to process events only if gen id matches,
     430             :         //   otherwise ignore it. added assertion not to process reverseflow
     431             :         //   at this stage as we only enqueue forward flows.
     432             : 
     433          50 :         if ((flow->flow_handle() == req->flow_handle()) &&
     434          50 :             (flow->gen_id() == req->gen_id()) &&
     435          50 :             (flow->is_flags_set(FlowEntry::ReverseFlow) == false)) {
     436          25 :             FlowTaskMsg *flow_msg = new FlowTaskMsg(flow);
     437          25 :             PktInfoPtr pkt_info(new PktInfo(PktHandler::FLOW, flow_msg));
     438          25 :             FlowHandler *handler = new FlowHandler(agent(), pkt_info, io_,
     439          25 :                                                this, table->table_index());
     440          25 :             RunProtoHandler(handler);
     441          25 :          }
     442          25 :         break;
     443             :     }
     444             : 
     445          44 :     case FlowEvent::FREE_FLOW_REF:
     446          44 :         break;
     447             : 
     448           0 :     case FlowEvent::GROW_FREE_LIST: {
     449           0 :         table->GrowFreeList();
     450           0 :         break;
     451             :     }
     452             : 
     453           0 :     case FlowEvent::AUDIT_FLOW: {
     454           0 :         FlowEntryPtr flow_ref = table->Find(req->get_flow_key());
     455           0 :         FlowEntry *flow  = flow_ref.get();
     456           0 :         if (flow == NULL) {
     457           0 :             FlowEntryPtr new_flow = FlowEntry::Allocate(req->get_flow_key(), table);
     458           0 :             new_flow->InitAuditFlow(req->flow_handle(), req->gen_id());
     459           0 :             new_flow->flow_table()->Add(new_flow.get(), NULL);
     460           0 :         } else {
     461             :             // scenario: forward flow trap is received , before installing
     462             :             // reverse flow, traffic received for reverse flow and trap is
     463             :             // dropped and not received in agent. vrouter returns
     464             :             // EEXIST error for reverse flow. flow entry is present
     465             :             // in flow table but it is in hold state.
     466             :             // take lock in forward and reverse flow order to avoid
     467             :             // deadlock.
     468             :             // EEXIST is seen only for reverse flows,
     469           0 :             if (flow && flow->is_flags_set(FlowEntry::ReverseFlow)) {
     470           0 :                 FLOW_LOCK(flow->reverse_flow_entry(), flow, req->event());
     471           0 :                 if (!(flow->deleted()) &&
     472           0 :                     flow->ksync_entry() &&
     473           0 :                     flow->ksync_entry()->ksync_response_error() == EEXIST) {
     474           0 :                     flow->MakeShortFlow(FlowEntry::SHORT_AUDIT_ENTRY);
     475             :                 }
     476           0 :             }
     477             :         }
     478           0 :         break;
     479           0 :     }
     480             : 
     481             :     // Check if flow-handle changed. This can happen if vrouter tries to
     482             :     // setup the flow which was evicted earlier
     483           0 :     case FlowEvent::UNRESOLVED_FLOW_ENTRY: {
     484           0 :         FlowEntry *flow = req->flow();
     485           0 :         flow->flow_table()->ProcessFlowEvent(req, flow,
     486             :                                              flow->reverse_flow_entry());
     487           0 :         break;
     488             :     }
     489             : 
     490           0 :     case FlowEvent::KSYNC_EVENT: {
     491           0 :         return FlowKSyncMsgHandler(req, table);
     492             :     }
     493             : 
     494         313 :     case FlowEvent::FREE_DBENTRY: {
     495         626 :         FlowMgmtManager *mgr = agent()->pkt()->flow_mgmt_manager(
     496         313 :                                    req->table_index());
     497         313 :         mgr->flow_mgmt_dbclient()->FreeDBState(req->db_entry(), req->gen_id());
     498         313 :         break;
     499             :     }
     500             : 
     501           0 :     default: {
     502           0 :         assert(0);
     503             :         break;
     504             :     }
     505             :     }
     506             : 
     507         404 :     return true;
     508             : }
     509             : 
     510         144 : bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) {
     511         144 :     FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
     512             : 
     513             :     // concurrency check to ensure all request are in right partitions
     514         144 :     assert((table->ConcurrencyCheck(table->flow_ksync_task_id()) == true) ||
     515             :            (table->ConcurrencyCheck(table->flow_task_id()) == true));
     516             : 
     517         144 :     switch (req->event()) {
     518             :     // Flow was waiting for an index. Index is available now. Retry acquiring
     519             :     // the index
     520         144 :     case FlowEvent::KSYNC_EVENT: {
     521             :         FlowTableKSyncEntry *ksync_entry =
     522         144 :             (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
     523         144 :         FlowEntry *flow = ksync_entry->flow_entry().get();
     524         144 :         flow->flow_table()->ProcessFlowEvent(req, flow,
     525             :                                              flow->reverse_flow_entry());
     526         144 :         break;
     527             :     }
     528             : 
     529           0 :     case FlowEvent::EVICT_FLOW: {
     530           0 :         FlowEntry *flow = req->flow();
     531           0 :         flow->flow_table()->ProcessFlowEvent(req, flow,
     532             :                                              flow->reverse_flow_entry());
     533           0 :         break;
     534             :     }
     535             : 
     536           0 :     default: {
     537           0 :         assert(0);
     538             :         break;
     539             :     }
     540             :     }
     541             : 
     542         144 :     return true;
     543             : }
     544             : 
     545          73 : bool FlowProto::FlowUpdateHandler(FlowEvent *req) {
     546          73 :     switch (req->event()) {
     547          40 :     case FlowEvent::DELETE_DBENTRY:
     548             :     case FlowEvent::REVALUATE_DBENTRY: {
     549          40 :         FlowEntry *flow = req->flow();
     550          40 :         flow->flow_table()->ProcessFlowEvent(req, flow,
     551             :                                              flow->reverse_flow_entry());
     552          40 :         break;
     553             :     }
     554             : 
     555          33 :     case FlowEvent::RECOMPUTE_FLOW: {
     556          33 :         FlowEntry *flow = req->flow();
     557          33 :         flow->flow_table()->ProcessFlowEvent(req, flow,
     558             :                                              flow->reverse_flow_entry());
     559          33 :         break;
     560             :     }
     561             : 
     562           0 :     default: {
     563           0 :         assert(0);
     564             :         break;
     565             :     }
     566             :     }
     567             : 
     568          73 :     return true;
     569             : }
     570             : 
     571           0 : bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) {
     572             :     // concurrency check to ensure all request are in right partitions
     573             :     // flow-update-queue doenst happen table pointer. Skip concurrency check
     574             :     // for flow-update-queue
     575           0 :     if (table) {
     576           0 :         assert(table->ConcurrencyCheck(table->flow_delete_task_id()) == true);
     577             :     }
     578             : 
     579           0 :     switch (req->event()) {
     580           0 :     case FlowEvent::DELETE_FLOW: {
     581           0 :         FlowEntry *flow = req->flow();
     582           0 :         table->ProcessFlowEvent(req, flow, flow->reverse_flow_entry());
     583           0 :         break;
     584             :     }
     585             : 
     586           0 :     default: {
     587           0 :         assert(0);
     588             :         break;
     589             :     }
     590             :     }
     591             : 
     592           0 :     return true;
     593             : }
     594             : 
     595             : //////////////////////////////////////////////////////////////////////////////
     596             : // Utility methods to generate events
     597             : //////////////////////////////////////////////////////////////////////////////
     598           0 : void FlowProto::DeleteFlowRequest(FlowEntry *flow) {
     599           0 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow));
     600           0 :     return;
     601             : }
     602             : 
     603           0 : void FlowProto::DeleteFlowRequest(const FlowKey &key) {
     604           0 :     CHECK_CONCURRENCY(kTaskFlowEvent);
     605           0 :     FlowEntry *flow = Find(key, 0);
     606           0 :     if (flow) {
     607           0 :         EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow));
     608             :     }
     609           0 : }
     610             : 
     611           0 : void FlowProto::EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle,
     612             :                                  uint8_t gen_id, uint8_t evict_gen_id) {
     613             :     FlowEvent *event = new FlowEvent(FlowEvent::EVICT_FLOW, flow,
     614           0 :                                      flow_handle, gen_id, evict_gen_id);
     615           0 :     EnqueueFlowEvent(event);
     616           0 :    return;
     617             : }
     618             : 
     619           0 : void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle,
     620             :                                  uint8_t gen_id) {
     621           0 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::AUDIT_FLOW, key, flow_handle,
     622           0 :                                    gen_id));
     623           0 :     return;
     624             : }
     625             : 
     626             : 
     627           0 : void FlowProto::GrowFreeListRequest(FlowTable *table) {
     628           0 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::GROW_FREE_LIST,
     629           0 :                                    table->table_index()));
     630           0 :     return;
     631             : }
     632             : 
     633         144 : void FlowProto::KSyncEventRequest(KSyncEntry *ksync_entry,
     634             :                                   KSyncEntry::KSyncEvent event,
     635             :                                   uint32_t flow_handle, uint8_t gen_id,
     636             :                                   int ksync_error, uint64_t evict_flow_bytes,
     637             :                                   uint64_t evict_flow_packets,
     638             :                                   int32_t evict_flow_oflow,
     639             :                                   uint32_t transaction_id) {
     640         288 :     EnqueueFlowEvent(new FlowEventKSync(ksync_entry, event, flow_handle,
     641             :                                         gen_id, ksync_error, evict_flow_bytes,
     642             :                                         evict_flow_packets, evict_flow_oflow,
     643         144 :                                         transaction_id));
     644         144 : }
     645             : 
     646          27 : void FlowProto::MessageRequest(FlowEntry *flow) {
     647          27 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::FLOW_MESSAGE, flow,
     648          27 :                             flow->flow_handle(), flow->gen_id()));
     649          27 :     return;
     650             : }
     651             : 
     652             : // Flow management runs in parallel to flow processing. As a result,
     653             : // we need to ensure that last reference for flow will go away from
     654             : // kTaskFlowEvent context only. This is ensured by following 2 actions
     655             : //
     656             : // 1. On return from here reference to the flow is removed which can
     657             : //    potentially be last reference. So, enqueue a dummy request to
     658             : //    flow-table queue.
     659             : // 2. Due to OS scheduling, its possible that the request we are
     660             : //    enqueuing completes even before this function is returned. So,
     661             : //    drop the reference immediately after allocating the event
     662          44 : void FlowProto::ForceEnqueueFreeFlowReference(FlowEntryPtr &flow) {
     663             :     FlowEvent *event = new FlowEvent(FlowEvent::FREE_FLOW_REF,
     664          44 :                                      flow.get());
     665          44 :     flow.reset();
     666          44 :     EnqueueFlowEvent(event);
     667          44 : }
     668             : 
     669           0 : bool FlowProto::EnqueueReentrant(PktInfoPtr msg, uint8_t table_index) {
     670           0 :     EnqueueFlowEvent(new FlowEvent(FlowEvent::REENTRANT,
     671           0 :                                    msg, NULL, table_index));
     672           0 :     return true;
     673             : }
     674             : 
     675             : // Enqueue event to force revaluation of KSync entry
     676           0 : void FlowProto::EnqueueUnResolvedFlowEntry(FlowEntry *flow) {
     677           0 :     FlowEvent *event = new FlowEvent(FlowEvent::UNRESOLVED_FLOW_ENTRY, flow);
     678           0 :     EnqueueFlowEvent(event);
     679           0 : }
     680             : 
     681             : // Apply trace-filter for flow. Will not allow true-false transistions.
     682             : // That is, if flows are already marked for tracing, action is retained
     683          47 : bool FlowProto::ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow) {
     684             :     // Handle case where flow is NULL. It can happen if Update is called
     685             :     // and flow is deleted between event-processing and calling
     686             :     // FlowTable::Update
     687          47 :     if (flow == NULL)
     688           0 :         return false;
     689             : 
     690          47 :     bool trace = flow->trace();
     691          47 :     if (rflow)
     692          47 :         trace |= rflow->trace();
     693             : 
     694          47 :     if (trace == false) {
     695             :         FlowTraceFilter *filter;
     696          40 :         if (flow->key().family == Address::INET) {
     697          40 :             filter = &ipv4_trace_filter_;
     698             :         } else {
     699           0 :             filter = &ipv6_trace_filter_;
     700             :         }
     701             : 
     702          40 :         trace = filter->Match(&flow->key());
     703          40 :         if (rflow && trace == false) {
     704           0 :             trace = filter->Match(&rflow->key());
     705             :         }
     706             :     }
     707             : 
     708          47 :     return trace;
     709             : }
     710             : 
     711             : //////////////////////////////////////////////////////////////////////////////
     712             : // Token Management routines
     713             : //////////////////////////////////////////////////////////////////////////////
     714         144 : TokenPtr FlowProto::GetToken(FlowEvent::Event event) {
     715         144 :     switch (event) {
     716          44 :     case FlowEvent::VROUTER_FLOW_MSG:
     717             :     case FlowEvent::AUDIT_FLOW:
     718             :     case FlowEvent::REENTRANT:
     719          44 :         return add_tokens_.GetToken(NULL);
     720             :         break;
     721             : 
     722           0 :     case FlowEvent::KSYNC_EVENT:
     723           0 :         return ksync_tokens_.GetToken(NULL);
     724             :         break;
     725             : 
     726         100 :     case FlowEvent::FLOW_MESSAGE:
     727             :     case FlowEvent::DELETE_DBENTRY:
     728             :     case FlowEvent::REVALUATE_DBENTRY:
     729             :     case FlowEvent::RECOMPUTE_FLOW:
     730         100 :         return update_tokens_.GetToken(NULL);
     731             :         break;
     732             : 
     733           0 :     case FlowEvent::DELETE_FLOW:
     734           0 :         return del_tokens_.GetToken(NULL);
     735             :         break;
     736             : 
     737           0 :     case FlowEvent::EVICT_FLOW:
     738             :     case FlowEvent::INVALID:
     739           0 :         break;
     740             : 
     741           0 :     default:
     742           0 :         assert(0);
     743             :         break;
     744             :     }
     745             : 
     746           0 :     return add_tokens_.GetToken(NULL);
     747             : }
     748             : 
     749         286 : bool FlowProto::TokenCheck(const FlowTokenPool *pool) const {
     750         286 :     return pool->TokenCheck();
     751             : }
     752             : 
     753           0 : void FlowProto::TokenAvailable(TokenPool *pool_base) {
     754           0 :     FlowTokenPool *pool = dynamic_cast<FlowTokenPool *>(pool_base);
     755           0 :     if (pool_base == NULL)
     756           0 :         return;
     757             : 
     758           0 :     pool->IncrementRestarts();
     759           0 :     if (pool == &add_tokens_) {
     760           0 :         for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
     761           0 :             flow_event_queue_[i]->MayBeStartRunner();
     762             :         }
     763             :     }
     764             : 
     765           0 :     if (pool == &ksync_tokens_) {
     766           0 :         for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
     767           0 :             flow_ksync_queue_[i]->MayBeStartRunner();
     768             :         }
     769             :     }
     770             : 
     771           0 :     if (pool == &del_tokens_) {
     772           0 :         for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
     773           0 :             flow_delete_queue_[i]->MayBeStartRunner();
     774             :         }
     775             :     }
     776             : 
     777           0 :     if (pool == &update_tokens_) {
     778           0 :         flow_update_queue_.MayBeStartRunner();
     779             :     }
     780             : }
     781             : 
     782             : //////////////////////////////////////////////////////////////////////////////
     783             : // Set profile information
     784             : //////////////////////////////////////////////////////////////////////////////
     785         694 : void UpdateStats(FlowEvent *req, FlowStats *stats) {
     786         694 :     switch (req->event()) {
     787          22 :     case FlowEvent::VROUTER_FLOW_MSG:
     788          22 :         stats->add_count_++;
     789          22 :         break;
     790          27 :     case FlowEvent::FLOW_MESSAGE:
     791          27 :         stats->flow_messages_++;
     792          27 :         break;
     793           0 :     case FlowEvent::DELETE_FLOW:
     794           0 :         stats->delete_count_++;
     795           0 :         break;
     796           0 :     case FlowEvent::AUDIT_FLOW:
     797           0 :         stats->audit_count_++;
     798           0 :         break;
     799          71 :     case FlowEvent::RECOMPUTE_FLOW:
     800          71 :         stats->recompute_count_++;
     801          71 :         break;
     802          15 :     case FlowEvent::REVALUATE_DBENTRY:
     803          15 :         stats->revaluate_count_++;
     804          15 :         break;
     805           0 :     case FlowEvent::EVICT_FLOW:
     806           0 :         stats->evict_count_++;
     807           0 :         break;
     808         144 :     case FlowEvent::KSYNC_EVENT: {
     809         144 :         stats->vrouter_responses_++;
     810         144 :         FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
     811         144 :         if (ksync_event->ksync_error())
     812           0 :             stats->vrouter_error_++;
     813         144 :         break;
     814             :     }
     815         415 :     default:
     816         415 :         break;
     817             :     }
     818         694 : }
     819             : 
     820           0 : static void SetFlowEventQueueStats(Agent *agent,
     821             :                                    const FlowEventQueueBase::Queue *queue,
     822             :                                    ProfileData::WorkQueueStats *stats) {
     823           0 :     stats->name_ = queue->Description();
     824           0 :     stats->queue_count_ = queue->Length();
     825           0 :     stats->enqueue_count_ = queue->NumEnqueues();
     826           0 :     stats->dequeue_count_ = queue->NumDequeues();
     827           0 :     stats->max_queue_count_ = queue->max_queue_len();
     828           0 :     stats->start_count_ = queue->task_starts();
     829           0 :     stats->busy_time_ = queue->busy_time();
     830           0 :     queue->set_measure_busy_time(agent->MeasureQueueDelay());
     831           0 :     if (agent->MeasureQueueDelay()) {
     832           0 :         queue->ClearStats();
     833             :     }
     834           0 : }
     835             : 
     836           0 : static void SetFlowMgmtQueueStats(Agent *agent,
     837             :                                   const FlowMgmtManager::FlowMgmtQueue *queue,
     838             :                                   ProfileData::WorkQueueStats *stats) {
     839           0 :     stats->name_ = queue->Description();
     840           0 :     stats->queue_count_ = queue->Length();
     841           0 :     stats->enqueue_count_ = queue->NumEnqueues();
     842           0 :     stats->dequeue_count_ = queue->NumDequeues();
     843           0 :     stats->max_queue_count_ = queue->max_queue_len();
     844           0 :     stats->start_count_ = queue->task_starts();
     845           0 :     stats->busy_time_ = queue->busy_time();
     846           0 :     queue->set_measure_busy_time(agent->MeasureQueueDelay());
     847           0 :     if (agent->MeasureQueueDelay())
     848           0 :         queue->ClearStats();
     849           0 : }
     850             : 
     851           0 : static void SetPktHandlerQueueStats(Agent *agent,
     852             :                                     const PktHandler::PktHandlerQueue *queue,
     853             :                                     ProfileData::WorkQueueStats *stats) {
     854           0 :     stats->name_ = queue->Description();
     855           0 :     stats->queue_count_ = queue->Length();
     856           0 :     stats->enqueue_count_ = queue->NumEnqueues();
     857           0 :     stats->dequeue_count_ = queue->NumDequeues();
     858           0 :     stats->max_queue_count_ = queue->max_queue_len();
     859           0 :     stats->start_count_ = queue->task_starts();
     860           0 :     stats->busy_time_ = queue->busy_time();
     861           0 :     queue->set_measure_busy_time(agent->MeasureQueueDelay());
     862           0 :     if (agent->MeasureQueueDelay())
     863           0 :         queue->ClearStats();
     864           0 : }
     865             : 
     866           0 : void FlowProto::SetProfileData(ProfileData *data) {
     867           0 :     data->flow_.flow_count_ = FlowCount();
     868           0 :     data->flow_.add_count_ = stats_.add_count_;
     869           0 :     data->flow_.del_count_ = stats_.delete_count_;
     870           0 :     data->flow_.audit_count_ = stats_.audit_count_;
     871           0 :     data->flow_.reval_count_ = stats_.revaluate_count_;
     872           0 :     data->flow_.recompute_count_ = stats_.recompute_count_;
     873           0 :     data->flow_.vrouter_responses_ = stats_.vrouter_responses_;
     874           0 :     data->flow_.vrouter_error_ = stats_.vrouter_error_;
     875           0 :     data->flow_.evict_count_ = stats_.evict_count_;
     876             : 
     877           0 :     PktModule *pkt = agent()->pkt();
     878           0 :     std::vector<FlowMgmtManager *> mgr_list = pkt->flow_mgmt_manager_list();
     879             : 
     880           0 :     data->flow_.flow_event_queue_.resize(flow_table_list_.size());
     881           0 :     data->flow_.flow_delete_queue_.resize(flow_table_list_.size());
     882           0 :     data->flow_.flow_tokenless_queue_.resize(flow_table_list_.size());
     883           0 :     data->flow_.flow_ksync_queue_.resize(flow_table_list_.size());
     884           0 :     for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
     885           0 :         SetFlowMgmtQueueStats(agent(), mgr_list[i]->request_queue(),
     886             :                               &data->flow_.flow_mgmt_queue_);
     887           0 :         SetFlowEventQueueStats(agent(), flow_event_queue_[i]->queue(),
     888           0 :                                &data->flow_.flow_event_queue_[i]);
     889           0 :         SetFlowEventQueueStats(agent(), flow_delete_queue_[i]->queue(),
     890           0 :                                &data->flow_.flow_delete_queue_[i]);
     891           0 :         SetFlowEventQueueStats(agent(), flow_tokenless_queue_[i]->queue(),
     892           0 :                                &data->flow_.flow_tokenless_queue_[i]);
     893           0 :         SetFlowEventQueueStats(agent(), flow_ksync_queue_[i]->queue(),
     894           0 :                                &data->flow_.flow_ksync_queue_[i]);
     895             :     }
     896           0 :     SetFlowEventQueueStats(agent(), flow_update_queue_.queue(),
     897             :                            &data->flow_.flow_update_queue_);
     898             :     const PktHandler::PktHandlerQueue *pkt_queue =
     899           0 :         pkt->pkt_handler()->work_queue();
     900           0 :     SetPktHandlerQueueStats(agent(), pkt_queue,
     901             :                             &data->flow_.pkt_handler_queue_);
     902             : 
     903           0 :     data->flow_.token_stats_.add_tokens_ = add_tokens_.token_count();
     904           0 :     data->flow_.token_stats_.add_failures_ = add_tokens_.failures();
     905           0 :     data->flow_.token_stats_.add_restarts_ = add_tokens_.restarts();
     906           0 :     data->flow_.token_stats_.ksync_tokens_ = ksync_tokens_.token_count();
     907           0 :     data->flow_.token_stats_.ksync_failures_ = ksync_tokens_.failures();
     908           0 :     data->flow_.token_stats_.ksync_restarts_ = ksync_tokens_.restarts();
     909           0 :     data->flow_.token_stats_.update_tokens_ = update_tokens_.token_count();
     910           0 :     data->flow_.token_stats_.update_failures_ = update_tokens_.failures();
     911           0 :     data->flow_.token_stats_.update_restarts_ = update_tokens_.restarts();
     912           0 :     data->flow_.token_stats_.del_tokens_ = del_tokens_.token_count();
     913           0 :     data->flow_.token_stats_.del_failures_ = del_tokens_.failures();
     914           0 :     data->flow_.token_stats_.del_restarts_ = del_tokens_.restarts();
     915           0 : }
     916             : 
     917           0 : bool FlowProto::FlowStatsUpdate() const {
     918           0 :     agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_created(),
     919           0 :                                            agent_->stats()->added());
     920           0 :     agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_aged(),
     921           0 :                                            agent_->stats()->deleted());
     922           0 :     return true;
     923             : }
     924             : 
     925           0 : void FlowProto::InterfaceFlowCount(const Interface *intf, uint64_t *created,
     926             :                                    uint64_t *aged,
     927             :                                    uint32_t *active_flows) const {
     928           0 :     *created = 0;
     929           0 :     *aged = 0;
     930           0 :     *active_flows = 0;
     931           0 :     if (intf == NULL)
     932           0 :         return;
     933             :     std::vector<FlowMgmtManager *>::const_iterator it =
     934           0 :         agent_->pkt()->flow_mgmt_manager_iterator_begin();
     935           0 :     while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
     936           0 :         (*it)->InterfaceFlowCount(intf, created, aged, active_flows);
     937           0 :         it++;
     938             :     }
     939             : }

Generated by: LCOV version 1.14