Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 : #include <base/address_util.h>
5 : #include <boost/functional/hash.hpp>
6 : #include <init/agent_param.h>
7 : #include <cmn/agent_stats.h>
8 : #include <oper/agent_profile.h>
9 : #include <vrouter/ksync/flowtable_ksync.h>
10 : #include <vrouter/ksync/ksync_init.h>
11 : #include <vrouter/ksync/ksync_flow_index_manager.h>
12 : #include "vrouter/flow_stats/flow_stats_collector.h"
13 : #include "flow_proto.h"
14 : #include "flow_mgmt/flow_mgmt_dbclient.h"
15 : #include "flow_mgmt.h"
16 : #include "flow_event.h"
17 : #include <strings.h>
18 : #include "flow_entry.h"
19 :
20 : static void UpdateStats(FlowEvent *event, FlowStats *stats);
21 :
22 2 : FlowProto::FlowProto(Agent *agent, boost::asio::io_context &io) :
23 : Proto(agent, kTaskFlowEvent, PktHandler::FLOW, io),
24 2 : add_tokens_("Add Tokens", this, agent->flow_add_tokens()),
25 2 : ksync_tokens_("KSync` Tokens", this, agent->flow_ksync_tokens()),
26 2 : del_tokens_("Delete Tokens", this, agent->flow_del_tokens()),
27 2 : update_tokens_("Update Tokens", this, agent->flow_update_tokens()),
28 2 : flow_update_queue_(agent, this, &update_tokens_,
29 2 : agent->params()->flow_task_latency_limit(), 16),
30 2 : use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
31 2 : stats_(),
32 2 : port_table_manager_(agent, agent->params()->fabric_snat_hash_table_size()),
33 2 : stats_update_timer_(TimerManager::CreateTimer
34 2 : (*(agent->event_manager())->io_service(), "FlowStatsUpdateTimer",
35 4 : TaskScheduler::GetInstance()->GetTaskId(kTaskFlowStatsUpdate), 0)) {
36 2 : linklocal_flow_count_ = 0;
37 2 : agent->SetFlowProto(this);
38 2 : set_trace(false);
39 2 : uint16_t table_count = agent->flow_thread_count();
40 2 : assert(table_count >= kMinTableCount && table_count <= kMaxTableCount);
41 4 : for (uint8_t i = 0; i < table_count; i++) {
42 2 : flow_table_list_.push_back(new FlowTable(agent_, i));
43 : }
44 :
45 4 : for (uint32_t i = 0; i < table_count; i++) {
46 2 : uint16_t latency = agent->params()->flow_task_latency_limit();
47 : flow_event_queue_.push_back
48 4 : (new FlowEventQueue(agent, this, flow_table_list_[i],
49 : &add_tokens_, latency,
50 2 : FlowEventQueue::Queue::kMaxIterations));
51 :
52 : flow_tokenless_queue_.push_back
53 4 : (new FlowEventQueue(agent, this, flow_table_list_[i],
54 : NULL, latency,
55 2 : 2 * FlowEventQueue::Queue::kMaxIterations));
56 :
57 : flow_delete_queue_.push_back
58 4 : (new DeleteFlowEventQueue(agent, this, flow_table_list_[i],
59 : &del_tokens_, latency,
60 2 : FlowEventQueue::Queue::kMaxIterations));
61 :
62 : flow_ksync_queue_.push_back
63 4 : (new KSyncFlowEventQueue(agent, this, flow_table_list_[i],
64 : &ksync_tokens_, latency,
65 2 : FlowEventQueue::Queue::kMaxIterations));
66 : }
67 2 : if (::getenv("USE_VROUTER_HASH") != NULL) {
68 0 : string opt = ::getenv("USE_VROUTER_HASH");
69 0 : if (opt == "" || strcasecmp(opt.c_str(), "false"))
70 0 : use_vrouter_hash_ = false;
71 : else
72 0 : use_vrouter_hash_ = true;
73 0 : }
74 2 : }
75 :
76 4 : FlowProto::~FlowProto() {
77 2 : STLDeleteValues(&flow_event_queue_);
78 2 : STLDeleteValues(&flow_tokenless_queue_);
79 2 : STLDeleteValues(&flow_delete_queue_);
80 2 : STLDeleteValues(&flow_ksync_queue_);
81 2 : STLDeleteValues(&flow_table_list_);
82 4 : }
83 :
84 2 : void FlowProto::Init() {
85 2 : agent_->stats()->RegisterFlowCountFn(boost::bind(&FlowProto::FlowCount,
86 : this));
87 4 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
88 2 : flow_table_list_[i]->Init();
89 : }
90 :
91 2 : AgentProfile *profile = agent_->oper_db()->agent_profile();
92 2 : profile->RegisterPktFlowStatsCb(boost::bind(&FlowProto::SetProfileData,
93 : this, _1));
94 :
95 2 : ipv4_trace_filter_.Init(agent_->flow_trace_enable(), Address::INET);
96 2 : ipv6_trace_filter_.Init(agent_->flow_trace_enable(), Address::INET6);
97 2 : }
98 :
99 2 : void FlowProto::InitDone() {
100 4 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
101 2 : flow_table_list_[i]->InitDone();
102 : }
103 2 : stats_update_timer_->Start(agent_->stats()->flow_stats_update_timeout(),
104 : boost::bind(&FlowProto::FlowStatsUpdate, this));
105 2 : }
106 :
107 2 : void FlowProto::Shutdown() {
108 4 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
109 2 : flow_table_list_[i]->Shutdown();
110 : }
111 4 : for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
112 2 : flow_event_queue_[i]->Shutdown();
113 2 : flow_tokenless_queue_[i]->Shutdown();
114 2 : flow_delete_queue_[i]->Shutdown();
115 2 : flow_ksync_queue_[i]->Shutdown();
116 : }
117 2 : flow_update_queue_.Shutdown();
118 2 : if (stats_update_timer_) {
119 2 : stats_update_timer_->Cancel();
120 2 : TimerManager::DeleteTimer(stats_update_timer_);
121 : }
122 2 : }
123 :
124 220 : static std::size_t HashCombine(std::size_t hash, uint64_t val) {
125 220 : boost::hash_combine(hash, val);
126 220 : return hash;
127 : }
128 :
129 88 : static std::size_t HashIp(std::size_t hash, const IpAddress &ip) {
130 88 : if (ip.is_v6()) {
131 : uint64_t val[2];
132 0 : Ip6AddressToU64Array(ip.to_v6(), val, 2);
133 0 : hash = HashCombine(hash, val[0]);
134 0 : hash = HashCombine(hash, val[1]);
135 88 : } else if (ip.is_v4()) {
136 88 : hash = HashCombine(hash, ip.to_v4().to_ulong());
137 : } else {
138 0 : assert(0);
139 : }
140 88 : return hash;
141 : }
142 :
143 : // Get the thread to be used for the flow. We *try* to map forward and reverse
144 : // flow to same thread with following,
145 : // if (sip < dip)
146 : // ip1 = sip
147 : // ip2 = dip
148 : // else
149 : // ip1 = dip
150 : // ip2 = sip
151 : // if (sport < dport)
152 : // port1 = sport
153 : // port2 = dport
154 : // else
155 : // port1 = dport
156 : // port2 = sport
157 : // field5 = proto
158 : // hash = HASH(ip1, ip2, port1, port2, proto)
159 : //
160 : // The algorithm above cannot ensure NAT flows belong to same thread.
161 44 : uint16_t FlowProto::FlowTableIndex(const IpAddress &sip, const IpAddress &dip,
162 : uint8_t proto, uint16_t sport,
163 : uint16_t dport, uint32_t flow_handle) const {
164 44 : if (use_vrouter_hash_) {
165 0 : return (flow_handle/flow_table_list_.size()) % flow_table_list_.size();
166 : }
167 :
168 44 : std::size_t hash = 0;
169 44 : if (sip < dip) {
170 22 : hash = HashIp(hash, sip);
171 22 : hash = HashIp(hash, dip);
172 : } else {
173 22 : hash = HashIp(hash, dip);
174 22 : hash = HashIp(hash, sip);
175 : }
176 :
177 44 : if (sport < dport) {
178 32 : hash = HashCombine(hash, sport);
179 32 : hash = HashCombine(hash, dport);
180 : } else {
181 12 : hash = HashCombine(hash, dport);
182 12 : hash = HashCombine(hash, sport);
183 : }
184 44 : hash = HashCombine(hash, proto);
185 44 : return (hash % (flow_event_queue_.size()));
186 : }
187 :
188 22 : FlowHandler *FlowProto::AllocProtoHandler(PktInfoPtr info,
189 : boost::asio::io_context &io) {
190 22 : uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
191 22 : info->ip_proto, info->sport, info->dport,
192 22 : info->agent_hdr.cmd_param);
193 22 : return new FlowHandler(agent(), info, io, this, index);
194 : }
195 :
196 22 : bool FlowProto::Validate(PktInfo *msg) {
197 22 : if (msg->ip == NULL && msg->ip6 == NULL && msg->type != PktType::MESSAGE) {
198 0 : if (msg->family == Address::INET || msg->family == Address::INET6) {
199 0 : FLOW_TRACE(DetailErr, msg->agent_hdr.cmd_param,
200 : msg->agent_hdr.ifindex,
201 : msg->agent_hdr.vrf,
202 : msg->ip_saddr.to_string(),
203 : msg->ip_daddr.to_string(),
204 : "Flow : Non-IP packet. Dropping", false);
205 : } else {
206 0 : assert(0);
207 : }
208 0 : return false;
209 : }
210 22 : return true;
211 : }
212 :
213 0 : FlowTable *FlowProto::GetFlowTable(const FlowKey &key,
214 : uint32_t flow_handle) const {
215 0 : uint32_t index = FlowTableIndex(key.src_addr, key.dst_addr, key.protocol,
216 0 : key.src_port, key.dst_port, flow_handle);
217 0 : return flow_table_list_[index];
218 : }
219 :
220 22 : bool FlowProto::Enqueue(PktInfoPtr msg) {
221 22 : if (Validate(msg.get()) == false) {
222 0 : return true;
223 : }
224 :
225 22 : FreeBuffer(msg.get());
226 22 : EnqueueFlowEvent(new FlowEvent(FlowEvent::VROUTER_FLOW_MSG, msg, NULL, 0));
227 22 : return true;
228 : }
229 :
230 0 : void FlowProto::DisableFlowEventQueue(uint32_t index, bool disabled) {
231 0 : flow_event_queue_[index]->set_disable(disabled);
232 0 : flow_tokenless_queue_[index]->set_disable(disabled);
233 0 : flow_delete_queue_[index]->set_disable(disabled);
234 0 : }
235 :
236 0 : void FlowProto::DisableFlowUpdateQueue(bool disabled) {
237 0 : flow_update_queue_.set_disable(disabled);
238 0 : }
239 :
240 0 : void FlowProto::DisableFlowKSyncQueue(uint32_t index, bool disabled) {
241 0 : flow_ksync_queue_[index]->set_disable(disabled);
242 0 : }
243 :
244 0 : size_t FlowProto::FlowUpdateQueueLength() {
245 0 : return flow_update_queue_.Length();
246 : }
247 :
248 0 : void FlowProto::DisableFlowDeleteQueue(uint32_t index, bool disabled) {
249 0 : flow_delete_queue_[index]->set_disable(disabled);
250 0 : }
251 :
252 : /////////////////////////////////////////////////////////////////////////////
253 : // FlowTable related routines
254 : /////////////////////////////////////////////////////////////////////////////
255 2 : void FlowProto::FlushFlows() {
256 4 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
257 2 : flow_table_list_[i]->DeleteAll();
258 : }
259 2 : }
260 :
261 91 : FlowTable *FlowProto::GetTable(uint16_t index) const {
262 91 : return flow_table_list_[index];
263 : }
264 :
265 12 : uint32_t FlowProto::FlowCount() const {
266 12 : uint32_t count = 0;
267 24 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
268 12 : count += flow_table_list_[i]->Size();
269 : }
270 12 : return count;
271 : }
272 :
273 0 : void FlowProto::VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
274 : uint32_t *out_count) {
275 0 : *in_count = 0;
276 0 : *out_count = 0;
277 0 : if (vn == NULL)
278 0 : return;
279 :
280 : std::vector<FlowMgmtManager *>::const_iterator it =
281 0 : agent_->pkt()->flow_mgmt_manager_iterator_begin();
282 0 : while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
283 0 : (*it)->VnFlowCounters(vn, in_count, out_count);
284 0 : it++;
285 : }
286 : }
287 :
288 42 : FlowEntry *FlowProto::Find(const FlowKey &key, uint32_t table_index) const {
289 42 : return GetTable(table_index)->Find(key);
290 : }
291 :
292 22 : bool FlowProto::AddFlow(FlowEntry *flow) {
293 22 : FlowTable *table = flow->flow_table();
294 22 : table->Add(flow, flow->reverse_flow_entry());
295 22 : return true;
296 : }
297 :
298 25 : bool FlowProto::UpdateFlow(FlowEntry *flow) {
299 25 : FlowTable *table = flow->flow_table();
300 25 : table->Update(flow, flow->reverse_flow_entry());
301 25 : return true;
302 : }
303 :
304 : /////////////////////////////////////////////////////////////////////////////
305 : // Flow Control Event routines
306 : /////////////////////////////////////////////////////////////////////////////
307 694 : void FlowProto::EnqueueFlowEvent(FlowEvent *event) {
308 694 : FlowEventQueueBase *queue = NULL;
309 694 : switch (event->event()) {
310 22 : case FlowEvent::VROUTER_FLOW_MSG: {
311 22 : PktInfo *info = event->pkt_info().get();
312 44 : uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
313 22 : info->ip_proto, info->sport,
314 22 : info->dport,
315 22 : info->agent_hdr.cmd_param);
316 22 : queue = flow_event_queue_[index];
317 22 : break;
318 : }
319 :
320 27 : case FlowEvent::FLOW_MESSAGE: {
321 27 : FlowEntry *flow = event->flow();
322 27 : FlowTable *table = flow->flow_table();
323 27 : queue = flow_event_queue_[table->table_index()];
324 27 : break;
325 : }
326 :
327 0 : case FlowEvent::EVICT_FLOW: {
328 0 : FlowEntry *flow = event->flow();
329 0 : FlowTable *table = flow->flow_table();
330 0 : queue = flow_ksync_queue_[table->table_index()];
331 0 : break;
332 : }
333 :
334 44 : case FlowEvent::FREE_FLOW_REF: {
335 44 : FlowEntry *flow = event->flow();
336 44 : FlowTable *table = flow->flow_table();
337 44 : queue = flow_tokenless_queue_[table->table_index()];
338 44 : break;
339 : }
340 :
341 0 : case FlowEvent::AUDIT_FLOW: {
342 0 : FlowTable *table = GetFlowTable(event->get_flow_key(),
343 : event->flow_handle());
344 0 : queue = flow_event_queue_[table->table_index()];
345 0 : break;
346 : }
347 :
348 0 : case FlowEvent::GROW_FREE_LIST: {
349 0 : queue = flow_tokenless_queue_[event->table_index()];
350 0 : break;
351 : }
352 :
353 144 : case FlowEvent::KSYNC_EVENT: {
354 144 : FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(event);
355 : FlowTableKSyncEntry *ksync_entry =
356 144 : (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
357 144 : FlowEntry *flow = ksync_entry->flow_entry().get();
358 144 : FlowTable *table = flow->flow_table();
359 144 : queue = flow_ksync_queue_[table->table_index()];
360 144 : break;
361 : }
362 :
363 0 : case FlowEvent::REENTRANT: {
364 0 : queue = flow_event_queue_[event->table_index()];
365 0 : break;
366 : }
367 :
368 0 : case FlowEvent::DELETE_FLOW: {
369 0 : FlowEntry *flow = event->flow();
370 0 : queue = flow_delete_queue_[flow->flow_table()->table_index()];
371 0 : break;
372 : }
373 :
374 313 : case FlowEvent::FREE_DBENTRY: {
375 313 : queue = flow_tokenless_queue_[0];
376 313 : break;
377 : }
378 :
379 144 : case FlowEvent::DELETE_DBENTRY:
380 : case FlowEvent::RECOMPUTE_FLOW:
381 : case FlowEvent::REVALUATE_DBENTRY: {
382 144 : queue = &flow_update_queue_;
383 144 : break;
384 : }
385 :
386 0 : case FlowEvent::UNRESOLVED_FLOW_ENTRY: {
387 0 : FlowTable *table = event->flow()->flow_table();
388 0 : queue = flow_event_queue_[table->table_index()];
389 0 : break;
390 : }
391 0 : default:
392 0 : assert(0);
393 : break;
394 : }
395 :
396 694 : UpdateStats(event, &stats_);
397 694 : queue->Enqueue(event);
398 694 : return;
399 : }
400 :
401 404 : bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
402 : // concurrency check to ensure all request are in right partitions
403 404 : assert(table->ConcurrencyCheck(table->flow_task_id()) == true);
404 :
405 404 : switch (req->event()) {
406 22 : case FlowEvent::VROUTER_FLOW_MSG: {
407 22 : ProcessProto(req->pkt_info());
408 22 : break;
409 : }
410 :
411 0 : case FlowEvent::REENTRANT: {
412 0 : FlowHandler *handler = new FlowHandler(agent(), req->pkt_info(), io_,
413 0 : this, table->table_index());
414 0 : RunProtoHandler(handler);
415 0 : break;
416 : }
417 :
418 25 : case FlowEvent::FLOW_MESSAGE: {
419 25 : FlowEntry *flow = req->flow();
420 : // process event only for forward flow with same gen_id
421 : // it may happen that after enqueued for recompute,
422 : // flow become reverse flow when the following sequence of
423 : // events occur.
424 : // 1. route is changed , flow is enqueued for recompute
425 : // 2. flow get evicted in vrouter
426 : // 3. traffic is received for reverse flow and get the same flow handle
427 : // 4. since flow handle is same , existing flow entries in agent won't
428 : // be deleted but forward flow become reverse and vice versa
429 : // added check to process events only if gen id matches,
430 : // otherwise ignore it. added assertion not to process reverseflow
431 : // at this stage as we only enqueue forward flows.
432 :
433 50 : if ((flow->flow_handle() == req->flow_handle()) &&
434 50 : (flow->gen_id() == req->gen_id()) &&
435 50 : (flow->is_flags_set(FlowEntry::ReverseFlow) == false)) {
436 25 : FlowTaskMsg *flow_msg = new FlowTaskMsg(flow);
437 25 : PktInfoPtr pkt_info(new PktInfo(PktHandler::FLOW, flow_msg));
438 25 : FlowHandler *handler = new FlowHandler(agent(), pkt_info, io_,
439 25 : this, table->table_index());
440 25 : RunProtoHandler(handler);
441 25 : }
442 25 : break;
443 : }
444 :
445 44 : case FlowEvent::FREE_FLOW_REF:
446 44 : break;
447 :
448 0 : case FlowEvent::GROW_FREE_LIST: {
449 0 : table->GrowFreeList();
450 0 : break;
451 : }
452 :
453 0 : case FlowEvent::AUDIT_FLOW: {
454 0 : FlowEntryPtr flow_ref = table->Find(req->get_flow_key());
455 0 : FlowEntry *flow = flow_ref.get();
456 0 : if (flow == NULL) {
457 0 : FlowEntryPtr new_flow = FlowEntry::Allocate(req->get_flow_key(), table);
458 0 : new_flow->InitAuditFlow(req->flow_handle(), req->gen_id());
459 0 : new_flow->flow_table()->Add(new_flow.get(), NULL);
460 0 : } else {
461 : // scenario: forward flow trap is received , before installing
462 : // reverse flow, traffic received for reverse flow and trap is
463 : // dropped and not received in agent. vrouter returns
464 : // EEXIST error for reverse flow. flow entry is present
465 : // in flow table but it is in hold state.
466 : // take lock in forward and reverse flow order to avoid
467 : // deadlock.
468 : // EEXIST is seen only for reverse flows,
469 0 : if (flow && flow->is_flags_set(FlowEntry::ReverseFlow)) {
470 0 : FLOW_LOCK(flow->reverse_flow_entry(), flow, req->event());
471 0 : if (!(flow->deleted()) &&
472 0 : flow->ksync_entry() &&
473 0 : flow->ksync_entry()->ksync_response_error() == EEXIST) {
474 0 : flow->MakeShortFlow(FlowEntry::SHORT_AUDIT_ENTRY);
475 : }
476 0 : }
477 : }
478 0 : break;
479 0 : }
480 :
481 : // Check if flow-handle changed. This can happen if vrouter tries to
482 : // setup the flow which was evicted earlier
483 0 : case FlowEvent::UNRESOLVED_FLOW_ENTRY: {
484 0 : FlowEntry *flow = req->flow();
485 0 : flow->flow_table()->ProcessFlowEvent(req, flow,
486 : flow->reverse_flow_entry());
487 0 : break;
488 : }
489 :
490 0 : case FlowEvent::KSYNC_EVENT: {
491 0 : return FlowKSyncMsgHandler(req, table);
492 : }
493 :
494 313 : case FlowEvent::FREE_DBENTRY: {
495 626 : FlowMgmtManager *mgr = agent()->pkt()->flow_mgmt_manager(
496 313 : req->table_index());
497 313 : mgr->flow_mgmt_dbclient()->FreeDBState(req->db_entry(), req->gen_id());
498 313 : break;
499 : }
500 :
501 0 : default: {
502 0 : assert(0);
503 : break;
504 : }
505 : }
506 :
507 404 : return true;
508 : }
509 :
510 144 : bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) {
511 144 : FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
512 :
513 : // concurrency check to ensure all request are in right partitions
514 144 : assert((table->ConcurrencyCheck(table->flow_ksync_task_id()) == true) ||
515 : (table->ConcurrencyCheck(table->flow_task_id()) == true));
516 :
517 144 : switch (req->event()) {
518 : // Flow was waiting for an index. Index is available now. Retry acquiring
519 : // the index
520 144 : case FlowEvent::KSYNC_EVENT: {
521 : FlowTableKSyncEntry *ksync_entry =
522 144 : (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
523 144 : FlowEntry *flow = ksync_entry->flow_entry().get();
524 144 : flow->flow_table()->ProcessFlowEvent(req, flow,
525 : flow->reverse_flow_entry());
526 144 : break;
527 : }
528 :
529 0 : case FlowEvent::EVICT_FLOW: {
530 0 : FlowEntry *flow = req->flow();
531 0 : flow->flow_table()->ProcessFlowEvent(req, flow,
532 : flow->reverse_flow_entry());
533 0 : break;
534 : }
535 :
536 0 : default: {
537 0 : assert(0);
538 : break;
539 : }
540 : }
541 :
542 144 : return true;
543 : }
544 :
545 73 : bool FlowProto::FlowUpdateHandler(FlowEvent *req) {
546 73 : switch (req->event()) {
547 40 : case FlowEvent::DELETE_DBENTRY:
548 : case FlowEvent::REVALUATE_DBENTRY: {
549 40 : FlowEntry *flow = req->flow();
550 40 : flow->flow_table()->ProcessFlowEvent(req, flow,
551 : flow->reverse_flow_entry());
552 40 : break;
553 : }
554 :
555 33 : case FlowEvent::RECOMPUTE_FLOW: {
556 33 : FlowEntry *flow = req->flow();
557 33 : flow->flow_table()->ProcessFlowEvent(req, flow,
558 : flow->reverse_flow_entry());
559 33 : break;
560 : }
561 :
562 0 : default: {
563 0 : assert(0);
564 : break;
565 : }
566 : }
567 :
568 73 : return true;
569 : }
570 :
571 0 : bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) {
572 : // concurrency check to ensure all request are in right partitions
573 : // flow-update-queue doenst happen table pointer. Skip concurrency check
574 : // for flow-update-queue
575 0 : if (table) {
576 0 : assert(table->ConcurrencyCheck(table->flow_delete_task_id()) == true);
577 : }
578 :
579 0 : switch (req->event()) {
580 0 : case FlowEvent::DELETE_FLOW: {
581 0 : FlowEntry *flow = req->flow();
582 0 : table->ProcessFlowEvent(req, flow, flow->reverse_flow_entry());
583 0 : break;
584 : }
585 :
586 0 : default: {
587 0 : assert(0);
588 : break;
589 : }
590 : }
591 :
592 0 : return true;
593 : }
594 :
595 : //////////////////////////////////////////////////////////////////////////////
596 : // Utility methods to generate events
597 : //////////////////////////////////////////////////////////////////////////////
598 0 : void FlowProto::DeleteFlowRequest(FlowEntry *flow) {
599 0 : EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow));
600 0 : return;
601 : }
602 :
603 0 : void FlowProto::DeleteFlowRequest(const FlowKey &key) {
604 0 : CHECK_CONCURRENCY(kTaskFlowEvent);
605 0 : FlowEntry *flow = Find(key, 0);
606 0 : if (flow) {
607 0 : EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow));
608 : }
609 0 : }
610 :
611 0 : void FlowProto::EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle,
612 : uint8_t gen_id, uint8_t evict_gen_id) {
613 : FlowEvent *event = new FlowEvent(FlowEvent::EVICT_FLOW, flow,
614 0 : flow_handle, gen_id, evict_gen_id);
615 0 : EnqueueFlowEvent(event);
616 0 : return;
617 : }
618 :
619 0 : void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle,
620 : uint8_t gen_id) {
621 0 : EnqueueFlowEvent(new FlowEvent(FlowEvent::AUDIT_FLOW, key, flow_handle,
622 0 : gen_id));
623 0 : return;
624 : }
625 :
626 :
627 0 : void FlowProto::GrowFreeListRequest(FlowTable *table) {
628 0 : EnqueueFlowEvent(new FlowEvent(FlowEvent::GROW_FREE_LIST,
629 0 : table->table_index()));
630 0 : return;
631 : }
632 :
633 144 : void FlowProto::KSyncEventRequest(KSyncEntry *ksync_entry,
634 : KSyncEntry::KSyncEvent event,
635 : uint32_t flow_handle, uint8_t gen_id,
636 : int ksync_error, uint64_t evict_flow_bytes,
637 : uint64_t evict_flow_packets,
638 : int32_t evict_flow_oflow,
639 : uint32_t transaction_id) {
640 288 : EnqueueFlowEvent(new FlowEventKSync(ksync_entry, event, flow_handle,
641 : gen_id, ksync_error, evict_flow_bytes,
642 : evict_flow_packets, evict_flow_oflow,
643 144 : transaction_id));
644 144 : }
645 :
646 27 : void FlowProto::MessageRequest(FlowEntry *flow) {
647 27 : EnqueueFlowEvent(new FlowEvent(FlowEvent::FLOW_MESSAGE, flow,
648 27 : flow->flow_handle(), flow->gen_id()));
649 27 : return;
650 : }
651 :
652 : // Flow management runs in parallel to flow processing. As a result,
653 : // we need to ensure that last reference for flow will go away from
654 : // kTaskFlowEvent context only. This is ensured by following 2 actions
655 : //
656 : // 1. On return from here reference to the flow is removed which can
657 : // potentially be last reference. So, enqueue a dummy request to
658 : // flow-table queue.
659 : // 2. Due to OS scheduling, its possible that the request we are
660 : // enqueuing completes even before this function is returned. So,
661 : // drop the reference immediately after allocating the event
662 44 : void FlowProto::ForceEnqueueFreeFlowReference(FlowEntryPtr &flow) {
663 : FlowEvent *event = new FlowEvent(FlowEvent::FREE_FLOW_REF,
664 44 : flow.get());
665 44 : flow.reset();
666 44 : EnqueueFlowEvent(event);
667 44 : }
668 :
669 0 : bool FlowProto::EnqueueReentrant(PktInfoPtr msg, uint8_t table_index) {
670 0 : EnqueueFlowEvent(new FlowEvent(FlowEvent::REENTRANT,
671 0 : msg, NULL, table_index));
672 0 : return true;
673 : }
674 :
675 : // Enqueue event to force revaluation of KSync entry
676 0 : void FlowProto::EnqueueUnResolvedFlowEntry(FlowEntry *flow) {
677 0 : FlowEvent *event = new FlowEvent(FlowEvent::UNRESOLVED_FLOW_ENTRY, flow);
678 0 : EnqueueFlowEvent(event);
679 0 : }
680 :
681 : // Apply trace-filter for flow. Will not allow true-false transistions.
682 : // That is, if flows are already marked for tracing, action is retained
683 47 : bool FlowProto::ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow) {
684 : // Handle case where flow is NULL. It can happen if Update is called
685 : // and flow is deleted between event-processing and calling
686 : // FlowTable::Update
687 47 : if (flow == NULL)
688 0 : return false;
689 :
690 47 : bool trace = flow->trace();
691 47 : if (rflow)
692 47 : trace |= rflow->trace();
693 :
694 47 : if (trace == false) {
695 : FlowTraceFilter *filter;
696 40 : if (flow->key().family == Address::INET) {
697 40 : filter = &ipv4_trace_filter_;
698 : } else {
699 0 : filter = &ipv6_trace_filter_;
700 : }
701 :
702 40 : trace = filter->Match(&flow->key());
703 40 : if (rflow && trace == false) {
704 0 : trace = filter->Match(&rflow->key());
705 : }
706 : }
707 :
708 47 : return trace;
709 : }
710 :
711 : //////////////////////////////////////////////////////////////////////////////
712 : // Token Management routines
713 : //////////////////////////////////////////////////////////////////////////////
714 144 : TokenPtr FlowProto::GetToken(FlowEvent::Event event) {
715 144 : switch (event) {
716 44 : case FlowEvent::VROUTER_FLOW_MSG:
717 : case FlowEvent::AUDIT_FLOW:
718 : case FlowEvent::REENTRANT:
719 44 : return add_tokens_.GetToken(NULL);
720 : break;
721 :
722 0 : case FlowEvent::KSYNC_EVENT:
723 0 : return ksync_tokens_.GetToken(NULL);
724 : break;
725 :
726 100 : case FlowEvent::FLOW_MESSAGE:
727 : case FlowEvent::DELETE_DBENTRY:
728 : case FlowEvent::REVALUATE_DBENTRY:
729 : case FlowEvent::RECOMPUTE_FLOW:
730 100 : return update_tokens_.GetToken(NULL);
731 : break;
732 :
733 0 : case FlowEvent::DELETE_FLOW:
734 0 : return del_tokens_.GetToken(NULL);
735 : break;
736 :
737 0 : case FlowEvent::EVICT_FLOW:
738 : case FlowEvent::INVALID:
739 0 : break;
740 :
741 0 : default:
742 0 : assert(0);
743 : break;
744 : }
745 :
746 0 : return add_tokens_.GetToken(NULL);
747 : }
748 :
749 286 : bool FlowProto::TokenCheck(const FlowTokenPool *pool) const {
750 286 : return pool->TokenCheck();
751 : }
752 :
753 0 : void FlowProto::TokenAvailable(TokenPool *pool_base) {
754 0 : FlowTokenPool *pool = dynamic_cast<FlowTokenPool *>(pool_base);
755 0 : if (pool_base == NULL)
756 0 : return;
757 :
758 0 : pool->IncrementRestarts();
759 0 : if (pool == &add_tokens_) {
760 0 : for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
761 0 : flow_event_queue_[i]->MayBeStartRunner();
762 : }
763 : }
764 :
765 0 : if (pool == &ksync_tokens_) {
766 0 : for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
767 0 : flow_ksync_queue_[i]->MayBeStartRunner();
768 : }
769 : }
770 :
771 0 : if (pool == &del_tokens_) {
772 0 : for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
773 0 : flow_delete_queue_[i]->MayBeStartRunner();
774 : }
775 : }
776 :
777 0 : if (pool == &update_tokens_) {
778 0 : flow_update_queue_.MayBeStartRunner();
779 : }
780 : }
781 :
782 : //////////////////////////////////////////////////////////////////////////////
783 : // Set profile information
784 : //////////////////////////////////////////////////////////////////////////////
785 694 : void UpdateStats(FlowEvent *req, FlowStats *stats) {
786 694 : switch (req->event()) {
787 22 : case FlowEvent::VROUTER_FLOW_MSG:
788 22 : stats->add_count_++;
789 22 : break;
790 27 : case FlowEvent::FLOW_MESSAGE:
791 27 : stats->flow_messages_++;
792 27 : break;
793 0 : case FlowEvent::DELETE_FLOW:
794 0 : stats->delete_count_++;
795 0 : break;
796 0 : case FlowEvent::AUDIT_FLOW:
797 0 : stats->audit_count_++;
798 0 : break;
799 71 : case FlowEvent::RECOMPUTE_FLOW:
800 71 : stats->recompute_count_++;
801 71 : break;
802 15 : case FlowEvent::REVALUATE_DBENTRY:
803 15 : stats->revaluate_count_++;
804 15 : break;
805 0 : case FlowEvent::EVICT_FLOW:
806 0 : stats->evict_count_++;
807 0 : break;
808 144 : case FlowEvent::KSYNC_EVENT: {
809 144 : stats->vrouter_responses_++;
810 144 : FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
811 144 : if (ksync_event->ksync_error())
812 0 : stats->vrouter_error_++;
813 144 : break;
814 : }
815 415 : default:
816 415 : break;
817 : }
818 694 : }
819 :
820 0 : static void SetFlowEventQueueStats(Agent *agent,
821 : const FlowEventQueueBase::Queue *queue,
822 : ProfileData::WorkQueueStats *stats) {
823 0 : stats->name_ = queue->Description();
824 0 : stats->queue_count_ = queue->Length();
825 0 : stats->enqueue_count_ = queue->NumEnqueues();
826 0 : stats->dequeue_count_ = queue->NumDequeues();
827 0 : stats->max_queue_count_ = queue->max_queue_len();
828 0 : stats->start_count_ = queue->task_starts();
829 0 : stats->busy_time_ = queue->busy_time();
830 0 : queue->set_measure_busy_time(agent->MeasureQueueDelay());
831 0 : if (agent->MeasureQueueDelay()) {
832 0 : queue->ClearStats();
833 : }
834 0 : }
835 :
836 0 : static void SetFlowMgmtQueueStats(Agent *agent,
837 : const FlowMgmtManager::FlowMgmtQueue *queue,
838 : ProfileData::WorkQueueStats *stats) {
839 0 : stats->name_ = queue->Description();
840 0 : stats->queue_count_ = queue->Length();
841 0 : stats->enqueue_count_ = queue->NumEnqueues();
842 0 : stats->dequeue_count_ = queue->NumDequeues();
843 0 : stats->max_queue_count_ = queue->max_queue_len();
844 0 : stats->start_count_ = queue->task_starts();
845 0 : stats->busy_time_ = queue->busy_time();
846 0 : queue->set_measure_busy_time(agent->MeasureQueueDelay());
847 0 : if (agent->MeasureQueueDelay())
848 0 : queue->ClearStats();
849 0 : }
850 :
851 0 : static void SetPktHandlerQueueStats(Agent *agent,
852 : const PktHandler::PktHandlerQueue *queue,
853 : ProfileData::WorkQueueStats *stats) {
854 0 : stats->name_ = queue->Description();
855 0 : stats->queue_count_ = queue->Length();
856 0 : stats->enqueue_count_ = queue->NumEnqueues();
857 0 : stats->dequeue_count_ = queue->NumDequeues();
858 0 : stats->max_queue_count_ = queue->max_queue_len();
859 0 : stats->start_count_ = queue->task_starts();
860 0 : stats->busy_time_ = queue->busy_time();
861 0 : queue->set_measure_busy_time(agent->MeasureQueueDelay());
862 0 : if (agent->MeasureQueueDelay())
863 0 : queue->ClearStats();
864 0 : }
865 :
866 0 : void FlowProto::SetProfileData(ProfileData *data) {
867 0 : data->flow_.flow_count_ = FlowCount();
868 0 : data->flow_.add_count_ = stats_.add_count_;
869 0 : data->flow_.del_count_ = stats_.delete_count_;
870 0 : data->flow_.audit_count_ = stats_.audit_count_;
871 0 : data->flow_.reval_count_ = stats_.revaluate_count_;
872 0 : data->flow_.recompute_count_ = stats_.recompute_count_;
873 0 : data->flow_.vrouter_responses_ = stats_.vrouter_responses_;
874 0 : data->flow_.vrouter_error_ = stats_.vrouter_error_;
875 0 : data->flow_.evict_count_ = stats_.evict_count_;
876 :
877 0 : PktModule *pkt = agent()->pkt();
878 0 : std::vector<FlowMgmtManager *> mgr_list = pkt->flow_mgmt_manager_list();
879 :
880 0 : data->flow_.flow_event_queue_.resize(flow_table_list_.size());
881 0 : data->flow_.flow_delete_queue_.resize(flow_table_list_.size());
882 0 : data->flow_.flow_tokenless_queue_.resize(flow_table_list_.size());
883 0 : data->flow_.flow_ksync_queue_.resize(flow_table_list_.size());
884 0 : for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
885 0 : SetFlowMgmtQueueStats(agent(), mgr_list[i]->request_queue(),
886 : &data->flow_.flow_mgmt_queue_);
887 0 : SetFlowEventQueueStats(agent(), flow_event_queue_[i]->queue(),
888 0 : &data->flow_.flow_event_queue_[i]);
889 0 : SetFlowEventQueueStats(agent(), flow_delete_queue_[i]->queue(),
890 0 : &data->flow_.flow_delete_queue_[i]);
891 0 : SetFlowEventQueueStats(agent(), flow_tokenless_queue_[i]->queue(),
892 0 : &data->flow_.flow_tokenless_queue_[i]);
893 0 : SetFlowEventQueueStats(agent(), flow_ksync_queue_[i]->queue(),
894 0 : &data->flow_.flow_ksync_queue_[i]);
895 : }
896 0 : SetFlowEventQueueStats(agent(), flow_update_queue_.queue(),
897 : &data->flow_.flow_update_queue_);
898 : const PktHandler::PktHandlerQueue *pkt_queue =
899 0 : pkt->pkt_handler()->work_queue();
900 0 : SetPktHandlerQueueStats(agent(), pkt_queue,
901 : &data->flow_.pkt_handler_queue_);
902 :
903 0 : data->flow_.token_stats_.add_tokens_ = add_tokens_.token_count();
904 0 : data->flow_.token_stats_.add_failures_ = add_tokens_.failures();
905 0 : data->flow_.token_stats_.add_restarts_ = add_tokens_.restarts();
906 0 : data->flow_.token_stats_.ksync_tokens_ = ksync_tokens_.token_count();
907 0 : data->flow_.token_stats_.ksync_failures_ = ksync_tokens_.failures();
908 0 : data->flow_.token_stats_.ksync_restarts_ = ksync_tokens_.restarts();
909 0 : data->flow_.token_stats_.update_tokens_ = update_tokens_.token_count();
910 0 : data->flow_.token_stats_.update_failures_ = update_tokens_.failures();
911 0 : data->flow_.token_stats_.update_restarts_ = update_tokens_.restarts();
912 0 : data->flow_.token_stats_.del_tokens_ = del_tokens_.token_count();
913 0 : data->flow_.token_stats_.del_failures_ = del_tokens_.failures();
914 0 : data->flow_.token_stats_.del_restarts_ = del_tokens_.restarts();
915 0 : }
916 :
917 0 : bool FlowProto::FlowStatsUpdate() const {
918 0 : agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_created(),
919 0 : agent_->stats()->added());
920 0 : agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_aged(),
921 0 : agent_->stats()->deleted());
922 0 : return true;
923 : }
924 :
925 0 : void FlowProto::InterfaceFlowCount(const Interface *intf, uint64_t *created,
926 : uint64_t *aged,
927 : uint32_t *active_flows) const {
928 0 : *created = 0;
929 0 : *aged = 0;
930 0 : *active_flows = 0;
931 0 : if (intf == NULL)
932 0 : return;
933 : std::vector<FlowMgmtManager *>::const_iterator it =
934 0 : agent_->pkt()->flow_mgmt_manager_iterator_begin();
935 0 : while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
936 0 : (*it)->InterfaceFlowCount(intf, created, aged, active_flows);
937 0 : it++;
938 : }
939 : }
|