Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include <atomic>
6 : #include <string>
7 :
8 : #include "base/os.h"
9 :
10 : #if defined(__linux__)
11 : #include <asm/types.h>
12 : #include <linux/netlink.h>
13 : #include <linux/rtnetlink.h>
14 : #include <linux/genetlink.h>
15 : #include <linux/sockios.h>
16 : #endif
17 :
18 : #include <sys/socket.h>
19 :
20 : #include <boost/bind/bind.hpp>
21 :
22 : #include <base/logging.h>
23 : #include <db/db.h>
24 : #include <db/db_entry.h>
25 : #include <db/db_table.h>
26 : #include <db/db_table_partition.h>
27 :
28 : #include "ksync_index.h"
29 : #include "ksync_entry.h"
30 : #include "ksync_object.h"
31 : #include "ksync_sock.h"
32 : #include "ksync_sock_user.h"
33 : #include "ksync_types.h"
34 :
35 : #include "nl_util.h"
36 : #include "udp_util.h"
37 : #include "vr_genetlink.h"
38 : #include "vr_types.h"
39 :
40 : using namespace boost::asio;
41 : using namespace boost::placeholders;
42 :
43 : /* Note SO_RCVBUFFORCE is supported only for linux version 2.6.14 and above */
44 : typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
45 : SO_RCVBUFFORCE> ReceiveBuffForceSize;
46 :
47 : int KSyncSock::vnsw_netlink_family_id_;
48 : AgentSandeshContext *KSyncSock::agent_sandesh_ctx_[kRxWorkQueueCount];
49 : std::unique_ptr<KSyncSock> KSyncSock::sock_;
50 : pid_t KSyncSock::pid_;
51 : std::atomic<bool> KSyncSock::shutdown_;
52 :
53 : // Name of task used in KSync Response work-queues
54 : const char* IoContext::io_wq_names[IoContext::MAX_WORK_QUEUES] =
55 : {
56 : "Agent::Uve",
57 : "Agent::KSync"
58 : };
59 : /////////////////////////////////////////////////////////////////////////////
60 : // Netlink utilities
61 : /////////////////////////////////////////////////////////////////////////////
62 0 : uint32_t GetNetlinkSeqno(char *data) {
63 0 : struct nlmsghdr *nlh = (struct nlmsghdr *)data;
64 0 : return nlh->nlmsg_seq;
65 : }
66 :
67 0 : bool NetlinkMsgDone(char *data) {
68 0 : struct nlmsghdr *nlh = (struct nlmsghdr *)data;
69 0 : return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
70 : }
71 :
72 : // Common validation for netlink messages
73 1068 : bool ValidateNetlink(char *data) {
74 1068 : struct nlmsghdr *nlh = (struct nlmsghdr *)data;
75 1068 : if (nlh->nlmsg_type == NLMSG_ERROR) {
76 0 : LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq << " len "
77 : << nlh->nlmsg_len);
78 0 : assert(0);
79 : return false;
80 : }
81 :
82 1068 : if (nlh->nlmsg_len > KSyncSock::kBufLen) {
83 0 : LOG(ERROR, "Length of " << nlh->nlmsg_len << " is more than expected "
84 : "length of " << KSyncSock::kBufLen);
85 0 : assert(0);
86 : return false;
87 : }
88 :
89 1068 : if (nlh->nlmsg_type == NLMSG_DONE) {
90 46 : return true;
91 : }
92 :
93 : // Sanity checks for generic-netlink message
94 1022 : if (nlh->nlmsg_type != KSyncSock::GetNetlinkFamilyId()) {
95 0 : LOG(ERROR, "Netlink unknown message type : " << nlh->nlmsg_type);
96 0 : assert(0);
97 : return false;
98 : }
99 :
100 1022 : struct genlmsghdr *genlh = (struct genlmsghdr *) (data + NLMSG_HDRLEN);
101 1022 : if (genlh->cmd != SANDESH_REQUEST) {
102 0 : LOG(ERROR, "Unknown generic netlink cmd : " << genlh->cmd);
103 0 : assert(0);
104 : return false;
105 : }
106 :
107 1022 : struct nlattr * attr = (struct nlattr *)(data + NLMSG_HDRLEN
108 : + GENL_HDRLEN);
109 1022 : if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
110 0 : LOG(ERROR, "Unknown generic netlink TLV type : " << attr->nla_type);
111 0 : assert(0);
112 : return false;
113 : }
114 :
115 1022 : return true;
116 : }
117 :
118 1068 : void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len) {
119 1068 : struct nlmsghdr *nlh = (struct nlmsghdr *)data;
120 1068 : int len = 0;
121 1068 : if (nlh->nlmsg_type == NLMSG_DONE) {
122 46 : len = NLMSG_HDRLEN;
123 : } else {
124 1022 : len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
125 : }
126 :
127 1068 : *buf = data + len;
128 1068 : *buf_len = nlh->nlmsg_len - len;
129 1068 : }
130 :
131 2 : void InitNetlink(nl_client *client) {
132 2 : nl_init_generic_client_req(client, KSyncSock::GetNetlinkFamilyId());
133 : unsigned char *nl_buf;
134 : uint32_t nl_buf_len;
135 2 : assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
136 2 : }
137 :
138 0 : void ResetNetlink(nl_client *client) {
139 : unsigned char *nl_buf;
140 : uint32_t nl_buf_len;
141 0 : client->cl_buf_offset = 0;
142 0 : nl_build_header(client, &nl_buf, &nl_buf_len);
143 0 : }
144 :
145 0 : void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no) {
146 0 : nl_update_header(client, len);
147 0 : struct nlmsghdr *nlh = (struct nlmsghdr *)client->cl_buf;
148 0 : nlh->nlmsg_pid = KSyncSock::GetPid();
149 0 : nlh->nlmsg_seq = seq_no;
150 0 : }
151 :
152 1068 : void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
153 : uint32_t alignment) {
154 2236 : while (buf_len > (alignment - 1)) {
155 : int error;
156 1168 : int decode_len = Sandesh::ReceiveBinaryMsgOne((uint8_t *)buf, buf_len,
157 : &error, sandesh_context);
158 1168 : if (decode_len < 0) {
159 0 : LOG(DEBUG, "Incorrect decode len " << decode_len);
160 0 : break;
161 : }
162 1168 : buf += decode_len;
163 1168 : buf_len -= decode_len;
164 : }
165 1068 : }
166 :
167 : /////////////////////////////////////////////////////////////////////////////
168 : // KSyncSock routines
169 : /////////////////////////////////////////////////////////////////////////////
170 2 : KSyncSock::KSyncSock() :
171 2 : nl_client_(NULL), wait_tree_(), send_queue_(this),
172 2 : max_bulk_msg_count_(kMaxBulkMsgCount),
173 2 : bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
174 2 : rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
175 2 : use_wait_tree_(true), process_data_inline_(false),
176 10 : ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
177 2 : tx_count_(0), ack_count_(0), err_count_(0),
178 2 : rx_process_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0,
179 2 : boost::bind(&KSyncSock::ProcessRxData, this, _1)) {
180 2 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
181 :
182 : uint32_t uve_task_id =
183 2 : scheduler->GetTaskId(IoContext::io_wq_names[IoContext::IOC_UVE]);
184 : uint32_t ksync_task_id =
185 2 : scheduler->GetTaskId(IoContext::io_wq_names[IoContext::IOC_KSYNC]);
186 6 : for(uint32_t i = 0; i < kRxWorkQueueCount; i++) {
187 4 : ksync_rx_queue[i] = AllocQueue(ksync_bulk_sandesh_context_,
188 : ksync_task_id, i, "KSync Receive Queue");
189 4 : uve_rx_queue[i] = AllocQueue(uve_bulk_sandesh_context_,
190 : uve_task_id, i, "KSync UVE Receive Queue");
191 : }
192 :
193 2 : nl_client_ = (nl_client *)malloc(sizeof(nl_client));
194 2 : memset(nl_client_, 0, sizeof(nl_client));
195 2 : rx_buff_ = NULL;
196 2 : seqno_ = 0;
197 2 : uve_seqno_ = 0;
198 :
199 2 : memset(bulk_mctx_arr_, 0, sizeof(bulk_mctx_arr_));
200 2 : bmca_prod_ = bmca_cons_ = 0;
201 2 : }
202 :
203 0 : KSyncSock::~KSyncSock() {
204 0 : assert(wait_tree_.size() == 0);
205 :
206 0 : if (rx_buff_) {
207 0 : delete [] rx_buff_;
208 0 : rx_buff_ = NULL;
209 : }
210 :
211 0 : for(int i = 0; i < kRxWorkQueueCount; i++) {
212 0 : ksync_rx_queue[i]->Shutdown();
213 0 : delete ksync_rx_queue[i];
214 :
215 0 : uve_rx_queue[i]->Shutdown();
216 0 : delete uve_rx_queue[i];
217 : }
218 :
219 0 : if (nl_client_->cl_buf) {
220 0 : free(nl_client_->cl_buf);
221 : }
222 0 : free(nl_client_);
223 0 : }
224 :
225 2 : void KSyncSock::Shutdown() {
226 2 : shutdown_ = true;
227 2 : sock_->send_queue_.Shutdown();
228 2 : sock_.release();
229 2 : }
230 :
231 2 : void KSyncSock::Init(bool use_work_queue, const std::string &cpu_pin_policy) {
232 2 : sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
233 2 : pid_ = getpid();
234 2 : shutdown_ = false;
235 2 : }
236 :
237 8 : KSyncSock::KSyncReceiveQueue *KSyncSock::AllocQueue
238 : (KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance,
239 : const char *name) {
240 : KSyncReceiveQueue *queue;
241 16 : queue = new KSyncReceiveQueue
242 : (task_id, instance, boost::bind(&KSyncSock::ProcessKernelData, this,
243 16 : &ctxt[instance], _1));
244 : char tmp[128];
245 8 : sprintf(tmp, "%s-%d", name, instance);
246 8 : queue->set_name(tmp);
247 8 : return queue;
248 : }
249 :
250 2 : void KSyncSock::SetMeasureQueueDelay(bool val) {
251 2 : sock_->send_queue_.set_measure_busy_time(val);
252 6 : for (int i = 0; i < kRxWorkQueueCount; i++) {
253 4 : ksync_rx_queue[i]->set_measure_busy_time(val);
254 : }
255 2 : }
256 :
257 2 : void KSyncSock::Start(bool read_inline) {
258 2 : sock_->read_inline_ = read_inline;
259 2 : if (sock_->read_inline_) {
260 2 : return;
261 : }
262 0 : sock_->rx_buff_ = new char[kBufLen];
263 0 : sock_->AsyncReceive(boost::asio::buffer(sock_->rx_buff_, kBufLen),
264 : boost::bind(&KSyncSock::ReadHandler, sock_.get(),
265 : placeholders::error,
266 : placeholders::bytes_transferred));
267 : }
268 :
269 2 : void KSyncSock::SetSockTableEntry(KSyncSock *sock) {
270 2 : assert(sock_.get() == NULL);
271 2 : sock_.reset(sock);
272 2 : }
273 :
274 2 : void KSyncSock::SetNetlinkFamilyId(int id) {
275 2 : vnsw_netlink_family_id_ = id;
276 2 : InitNetlink(sock_->nl_client_);
277 2 : }
278 :
279 0 : uint32_t KSyncSock::WaitTreeSize() const {
280 0 : return wait_tree_.size();
281 : }
282 :
283 0 : void KSyncSock::SetSeqno(uint32_t seq) {
284 0 : seqno_ = seq;
285 0 : uve_seqno_ = seq;
286 0 : }
287 :
288 1020 : uint32_t KSyncSock::AllocSeqNo(IoContext::Type type, uint32_t instance) {
289 : uint32_t seq;
290 1020 : if (type == IoContext::IOC_UVE) {
291 0 : seq = uve_seqno_.fetch_add(1);
292 0 : seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
293 : } else {
294 1020 : seq = seqno_.fetch_add(1);
295 1020 : seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
296 1020 : seq |= KSYNC_DEFAULT_Q_ID_SEQ;
297 : }
298 1020 : if (seq == kInvalidBulkSeqNo) {
299 0 : return AllocSeqNo(type, instance);
300 : }
301 1020 : return seq;
302 : }
303 :
304 0 : uint32_t KSyncSock::AllocSeqNo(IoContext::Type type) {
305 0 : return AllocSeqNo(type, 0);
306 : }
307 :
308 1066 : KSyncSock::KSyncReceiveQueue *KSyncSock::GetReceiveQueue(IoContext::Type type,
309 : uint32_t instance) {
310 1066 : if (type == IoContext::IOC_UVE) {
311 0 : return uve_rx_queue[instance % kRxWorkQueueCount];
312 : } else {
313 1066 : return ksync_rx_queue[instance % kRxWorkQueueCount];
314 : }
315 : }
316 :
317 0 : KSyncSock::KSyncReceiveQueue *KSyncSock::GetReceiveQueue(uint32_t seqno) {
318 : IoContext::Type type;
319 0 : if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
320 0 : type = IoContext::IOC_KSYNC;
321 : else
322 0 : type = IoContext::IOC_UVE;
323 :
324 0 : uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
325 0 : return GetReceiveQueue(type, instance);
326 : }
327 1020 : void KSyncSock::EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event) {
328 1020 : rx_process_queue_.Enqueue(KSyncRxQueueData(entry, event));
329 1020 : }
330 1020 : bool KSyncSock::ProcessRxData(KSyncRxQueueData data) {
331 1020 : assert(data.event_ != KSyncEntry::INVALID);
332 1020 : KSyncObject *object = data.entry_->GetObject();
333 1020 : object->NetlinkAck(data.entry_, data.event_);
334 1020 : return true;
335 : }
336 0 : KSyncBulkSandeshContext *KSyncSock::GetBulkSandeshContext(uint32_t seqno) {
337 :
338 0 : uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
339 0 : if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
340 0 : return &ksync_bulk_sandesh_context_[instance];
341 : else
342 0 : return &uve_bulk_sandesh_context_[instance];
343 : }
344 :
345 0 : KSyncSock *KSyncSock::Get(DBTablePartBase *partition) {
346 0 : return sock_.get();
347 : }
348 :
349 1024 : KSyncSock *KSyncSock::Get(int idx) {
350 1024 : assert(idx == 0);
351 1024 : return sock_.get();
352 : }
353 :
354 1066 : bool KSyncSock::ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context) {
355 1066 : Validate(data);
356 :
357 : KSyncReceiveQueue *queue;
358 1066 : if (context) {
359 1066 : queue = GetReceiveQueue(context->io_context_type(),
360 : context->work_queue_index());
361 : } else {
362 0 : queue = GetReceiveQueue(GetSeqno(data));
363 : }
364 1066 : queue->Enqueue(KSyncRxData(data, context));
365 1066 : return true;
366 : }
367 :
368 : // Read handler registered with boost::asio. Demux done based on seqno_
369 0 : void KSyncSock::ReadHandler(const boost::system::error_code& error,
370 : size_t bytes_transferred) {
371 0 : if (error) {
372 0 : LOG(ERROR, "Error reading from Ksync sock. Error : " <<
373 : boost::system::system_error(error).what());
374 0 : if (shutdown_ == false) {
375 0 : assert(0);
376 : }
377 0 : return;
378 : }
379 :
380 0 : ValidateAndEnqueue(rx_buff_, NULL);
381 :
382 0 : rx_buff_ = new char[kBufLen];
383 0 : AsyncReceive(boost::asio::buffer(rx_buff_, kBufLen),
384 : boost::bind(&KSyncSock::ReadHandler, this,
385 : placeholders::error,
386 : placeholders::bytes_transferred));
387 : }
388 :
389 : // Process kernel data - executes in the task specified by IoContext
390 : // Currently only Agent::KSync and Agent::Uve are possibilities
391 1066 : bool KSyncSock::ProcessKernelData(KSyncBulkSandeshContext *bulk_sandesh_context,
392 : const KSyncRxData &data) {
393 1066 : KSyncBulkMsgContext *bulk_message_context = data.bulk_msg_context_;
394 1066 : WaitTree::iterator it;
395 1066 : if (data.bulk_msg_context_ == NULL) {
396 0 : uint32_t seqno = GetSeqno(data.buff_);
397 : {
398 0 : std::scoped_lock lock(mutex_);
399 0 : it = wait_tree_.find(seqno);
400 0 : }
401 0 : if (it == wait_tree_.end()) {
402 0 : LOG(ERROR, "KSync error in finding for sequence number : "
403 : << seqno);
404 0 : assert(0);
405 : }
406 0 : bulk_message_context = &(it->second);
407 : }
408 :
409 1066 : bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
410 1066 : BulkDecoder(data.buff_, bulk_sandesh_context);
411 : // Remove the IoContext only on last netlink message
412 1066 : if (IsMoreData(data.buff_) == false) {
413 926 : if (data.bulk_msg_context_ != NULL) {
414 926 : delete data.bulk_msg_context_;
415 : } else {
416 0 : std::scoped_lock lock(mutex_);
417 0 : wait_tree_.erase(it);
418 0 : }
419 : }
420 1066 : delete[] data.buff_;
421 1066 : return true;
422 : }
423 :
424 2 : bool KSyncSock::BlockingRecv() {
425 : char data[kBufLen];
426 2 : bool ret = false;
427 :
428 : do {
429 2 : Receive(boost::asio::buffer(data, kBufLen));
430 2 : AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
431 2 : ctxt->SetErrno(0);
432 : // BlockingRecv used only during Init and doesnt support bulk messages
433 : // Use non-bulk version of decoder
434 2 : Decoder(data, ctxt);
435 2 : if (ctxt->GetErrno() != 0 && ctxt->GetErrno() != EEXIST) {
436 0 : KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <",
437 : ctxt->GetErrno(), ":",
438 : KSyncEntry::VrouterErrorToString(ctxt->GetErrno()),
439 : ">. Object <", "N/A", ">. State <", "N/A",
440 : ">. Message number :", 0);
441 0 : ret = true;
442 : }
443 2 : } while (IsMoreData(data));
444 :
445 2 : return ret;
446 : }
447 :
448 : // BlockingSend does not support bulk messages.
449 2 : size_t KSyncSock::BlockingSend(char *msg, int msg_len) {
450 2 : KSyncBufferList iovec;
451 2 : iovec.push_back(buffer(msg, msg_len));
452 2 : bulk_buf_size_ = msg_len;
453 4 : return SendTo(&iovec, 0);
454 2 : }
455 :
456 0 : void KSyncSock::GenericSend(IoContext *ioc) {
457 0 : send_queue_.Enqueue(ioc);
458 0 : }
459 :
460 1020 : void KSyncSock::SendAsync(KSyncEntry *entry, int msg_len, char *msg,
461 : KSyncEntry::KSyncEvent event) {
462 1020 : KSyncIoContext *ioc = new KSyncIoContext(this, entry, msg_len, msg, event);
463 : // Pre-allocate buffers to minimize processing in KSyncTxQueue context
464 1020 : if (read_inline_ && entry->pre_alloc_rx_buffer()) {
465 144 : ioc->rx_buffer1_ = new char [kBufLen];
466 144 : ioc->rx_buffer2_ = new char [kBufLen];
467 : } else {
468 876 : ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
469 : }
470 1020 : send_queue_.Enqueue(ioc);
471 1020 : }
472 :
473 : // Write handler registered with boost::asio
474 0 : void KSyncSock::WriteHandler(const boost::system::error_code& error,
475 : size_t bytes_transferred) {
476 0 : if (error) {
477 0 : LOG(ERROR, "Ksync sock write error : " <<
478 : boost::system::system_error(error).what());
479 0 : if (shutdown_ == false) {
480 0 : assert(0);
481 : }
482 : }
483 0 : }
484 :
485 : // End of messages in the work-queue. Send messages pending in bulk context
486 926 : void KSyncSock::OnEmptyQueue(bool done) {
487 926 : if (bulk_seq_no_ == kInvalidBulkSeqNo)
488 0 : return;
489 :
490 926 : KSyncBulkMsgContext *bulk_message_context = NULL;
491 926 : if (use_wait_tree_) {
492 926 : if (read_inline_ == false) {
493 0 : std::scoped_lock lock(mutex_);
494 0 : WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
495 0 : assert(it != wait_tree_.end());
496 0 : bulk_message_context = &it->second;
497 0 : } else {
498 926 : bulk_message_context = bulk_msg_context_;
499 : }
500 : } else {
501 0 : bulk_message_context = bulk_mctx_arr_[bmca_prod_];
502 : }
503 :
504 926 : SendBulkMessage(bulk_message_context, bulk_seq_no_);
505 : }
506 :
507 : // Send messages accumilated in bulk context
508 926 : int KSyncSock::SendBulkMessage(KSyncBulkMsgContext *bulk_message_context,
509 : uint32_t seqno) {
510 926 : KSyncBufferList iovec;
511 : // Get all buffers to send into single io-vector
512 926 : bulk_message_context->Data(&iovec);
513 926 : tx_count_++;
514 :
515 926 : if (!read_inline_) {
516 0 : if (!use_wait_tree_) {
517 0 : bmca_prod_++;
518 0 : if (bmca_prod_ >= KSYNC_BMC_ARR_SIZE) {
519 0 : bmca_prod_ = 0;
520 : }
521 : }
522 :
523 0 : AsyncSendTo(&iovec, seqno,
524 : boost::bind(&KSyncSock::WriteHandler, this,
525 : placeholders::error,
526 : placeholders::bytes_transferred));
527 : } else {
528 926 : SendTo(&iovec, seqno);
529 926 : bool more_data = false;
530 1066 : do {
531 1066 : char *rxbuf = bulk_message_context->GetReceiveBuffer();
532 1066 : Receive(boost::asio::buffer(rxbuf, kBufLen));
533 1066 : more_data = IsMoreData(rxbuf);
534 1066 : if (!process_data_inline_) {
535 1066 : ValidateAndEnqueue(rxbuf, bulk_message_context);
536 : } else {
537 0 : ProcessDataInline(rxbuf);
538 : }
539 : } while(more_data);
540 : }
541 :
542 926 : bulk_msg_context_ = NULL;
543 926 : bulk_seq_no_ = kInvalidBulkSeqNo;
544 926 : return true;
545 926 : }
546 :
547 : // Get the bulk-context for sequence-number
548 1020 : KSyncBulkMsgContext *KSyncSock::LocateBulkContext
549 : (uint32_t seqno, IoContext::Type io_context_type,
550 : uint32_t work_queue_index) {
551 1020 : if (read_inline_) {
552 1020 : if (bulk_seq_no_ == kInvalidBulkSeqNo) {
553 926 : assert(bulk_msg_context_ == NULL);
554 926 : bulk_seq_no_ = seqno;
555 926 : bulk_buf_size_ = 0;
556 926 : bulk_msg_count_ = 0;
557 926 : bulk_msg_context_ = new KSyncBulkMsgContext(io_context_type,
558 926 : work_queue_index);
559 : }
560 1020 : return bulk_msg_context_;
561 : }
562 :
563 0 : if (use_wait_tree_) {
564 0 : std::scoped_lock lock(mutex_);
565 0 : if (bulk_seq_no_ == kInvalidBulkSeqNo) {
566 0 : bulk_seq_no_ = seqno;
567 0 : bulk_buf_size_ = 0;
568 0 : bulk_msg_count_ = 0;
569 :
570 0 : wait_tree_.insert(WaitTreePair(seqno,
571 0 : KSyncBulkMsgContext(io_context_type,
572 : work_queue_index)));
573 : }
574 :
575 0 : WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
576 0 : assert(it != wait_tree_.end());
577 0 : return &it->second;
578 0 : } else {
579 0 : if (bulk_seq_no_ == kInvalidBulkSeqNo) {
580 0 : bulk_seq_no_ = seqno;
581 0 : bulk_buf_size_ = 0;
582 0 : bulk_msg_count_ = 0;
583 :
584 0 : bulk_mctx_arr_[bmca_prod_] = new KSyncBulkMsgContext(io_context_type,
585 0 : work_queue_index);
586 0 : bulk_mctx_arr_[bmca_prod_]->set_seqno(seqno);
587 : }
588 :
589 0 : return bulk_mctx_arr_[bmca_prod_];
590 : }
591 :
592 : return NULL;
593 : }
594 :
595 : // Try adding an io-context to bulk context. Returns
596 : // - true : if message can be added to bulk context
597 : // - false : if message cannot be added to bulk context
598 1020 : bool KSyncSock::TryAddToBulk(KSyncBulkMsgContext *bulk_message_context,
599 : IoContext *ioc) {
600 1020 : if (bulk_msg_count_ >= max_bulk_msg_count_)
601 0 : return false;
602 :
603 1020 : if (bulk_message_context->io_context_type() != ioc->type())
604 0 : return false;
605 :
606 1020 : if (use_wait_tree_) {
607 1020 : if (bulk_message_context->work_queue_index() != ioc->index())
608 0 : return false;
609 : }
610 :
611 1020 : bulk_buf_size_ += ioc->GetMsgLen();
612 1020 : bulk_msg_count_++;
613 :
614 1020 : bulk_message_context->Insert(ioc);
615 1020 : if (ioc->rx_buffer1()) {
616 144 : bulk_message_context->AddReceiveBuffer(ioc->rx_buffer1());
617 144 : ioc->reset_rx_buffer1();
618 :
619 : }
620 1020 : if (ioc->rx_buffer2()) {
621 144 : bulk_message_context->AddReceiveBuffer(ioc->rx_buffer2());
622 144 : ioc->reset_rx_buffer2();
623 : }
624 1020 : return true;
625 : }
626 :
627 1020 : bool KSyncSock::SendAsyncImpl(IoContext *ioc) {
628 : KSyncBulkMsgContext *bulk_message_context =
629 1020 : LocateBulkContext(ioc->GetSeqno(), ioc->type(), ioc->index());
630 : // Try adding message to bulk-message list
631 1020 : if (TryAddToBulk(bulk_message_context, ioc)) {
632 : // Message added to bulk-list. Nothing more to do
633 1020 : return true;
634 : }
635 :
636 : // Message cannot be added to bulk-list. Send the current list
637 0 : SendBulkMessage(bulk_message_context, bulk_seq_no_);
638 :
639 : // Allocate a new context and add message to it
640 0 : bulk_message_context = LocateBulkContext(ioc->GetSeqno(), ioc->type(),
641 : ioc->index());
642 0 : assert(TryAddToBulk(bulk_message_context, ioc));
643 0 : return true;
644 : }
645 :
646 :
647 : /////////////////////////////////////////////////////////////////////////////
648 : // KSyncSockNetlink routines
649 : /////////////////////////////////////////////////////////////////////////////
650 0 : KSyncSockNetlink::KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
651 0 : : sock_(ios, protocol) {
652 0 : ReceiveBuffForceSize set_rcv_buf;
653 0 : set_rcv_buf = KSYNC_SOCK_RECV_BUFF_SIZE;
654 0 : boost::system::error_code ec;
655 0 : sock_.set_option(set_rcv_buf, ec);
656 0 : if (ec.value() != 0) {
657 0 : LOG(ERROR, "Error Changing netlink receive sock buffer size to " <<
658 : set_rcv_buf.value() << " error = " <<
659 : boost::system::system_error(ec).what());
660 : }
661 0 : boost::asio::socket_base::receive_buffer_size rcv_buf_size;
662 0 : boost::system::error_code ec1;
663 0 : sock_.get_option(rcv_buf_size, ec);
664 0 : LOG(INFO, "Current receive sock buffer size is " << rcv_buf_size.value());
665 0 : }
666 :
667 0 : KSyncSockNetlink::~KSyncSockNetlink() {
668 0 : }
669 :
670 0 : void KSyncSockNetlink::Init(io_service &ios, int protocol, bool use_work_queue,
671 : const std::string &cpu_pin_policy) {
672 0 : KSyncSock::SetSockTableEntry(new KSyncSockNetlink(ios, protocol));
673 0 : KSyncSock::Init(use_work_queue, cpu_pin_policy);
674 0 : }
675 :
676 0 : uint32_t KSyncSockNetlink::GetSeqno(char *data) {
677 0 : return GetNetlinkSeqno(data);
678 : }
679 :
680 0 : bool KSyncSockNetlink::IsMoreData(char *data) {
681 0 : return NetlinkMsgDone(data);
682 : }
683 :
684 0 : bool KSyncSockNetlink::Validate(char *data) {
685 0 : return ValidateNetlink(data);
686 : }
687 :
688 : //netlink socket class for interacting with kernel
689 0 : void KSyncSockNetlink::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
690 : HandlerCb cb) {
691 0 : ResetNetlink(nl_client_);
692 0 : KSyncBufferList::iterator it = iovec->begin();
693 0 : iovec->insert(it, buffer((char *)nl_client_->cl_buf,
694 0 : nl_client_->cl_buf_offset));
695 0 : UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
696 :
697 0 : boost::asio::netlink::raw::endpoint ep;
698 0 : sock_.async_send_to(*iovec, ep, cb);
699 0 : }
700 :
701 0 : size_t KSyncSockNetlink::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
702 0 : ResetNetlink(nl_client_);
703 0 : KSyncBufferList::iterator it = iovec->begin();
704 0 : iovec->insert(it, buffer((char *)nl_client_->cl_buf,
705 0 : nl_client_->cl_buf_offset));
706 0 : UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
707 :
708 0 : boost::asio::netlink::raw::endpoint ep;
709 0 : return sock_.send_to(*iovec, ep);
710 : }
711 :
712 : // Static method to decode non-bulk message
713 2 : void KSyncSockNetlink::NetlinkDecoder(char *data, SandeshContext *ctxt) {
714 2 : assert(ValidateNetlink(data));
715 2 : char *buf = NULL;
716 2 : uint32_t buf_len = 0;
717 2 : GetNetlinkPayload(data, &buf, &buf_len);
718 2 : DecodeSandeshMessages(buf, buf_len, ctxt, NLA_ALIGNTO);
719 2 : }
720 :
721 0 : bool KSyncSockNetlink::Decoder(char *data, AgentSandeshContext *context) {
722 0 : NetlinkDecoder(data, context);
723 0 : return true;
724 : }
725 :
726 : // Static method used in ksync_sock_user only
727 1066 : void KSyncSockNetlink::NetlinkBulkDecoder(char *data, SandeshContext *ctxt,
728 : bool more) {
729 1066 : assert(ValidateNetlink(data));
730 1066 : char *buf = NULL;
731 1066 : uint32_t buf_len = 0;
732 1066 : GetNetlinkPayload(data, &buf, &buf_len);
733 : KSyncBulkSandeshContext *bulk_sandesh_context =
734 1066 : dynamic_cast<KSyncBulkSandeshContext *>(ctxt);
735 1066 : bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, more);
736 1066 : }
737 :
738 0 : bool KSyncSockNetlink::BulkDecoder(char *data,
739 : KSyncBulkSandeshContext *bulk_sandesh_context) {
740 : // Get sandesh buffer and buffer-length
741 0 : uint32_t buf_len = 0;
742 0 : char *buf = NULL;
743 0 : GetNetlinkPayload(data, &buf, &buf_len);
744 0 : return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
745 : }
746 :
747 0 : void KSyncSockNetlink::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
748 0 : sock_.async_receive(buf, cb);
749 0 : }
750 :
751 0 : void KSyncSockNetlink::Receive(mutable_buffers_1 buf) {
752 0 : sock_.receive(buf);
753 0 : struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
754 0 : if (nlh->nlmsg_type == NLMSG_ERROR) {
755 0 : LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
756 : << " len " << nlh->nlmsg_len);
757 0 : assert(0);
758 : }
759 0 : }
760 :
761 : /////////////////////////////////////////////////////////////////////////////
762 : // KSyncSockUdp routines
763 : /////////////////////////////////////////////////////////////////////////////
764 : //Udp socket class for interacting with kernel
765 0 : KSyncSockUdp::KSyncSockUdp(boost::asio::io_context &ios, int port) :
766 0 : sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
767 0 : server_ep_(boost::asio::ip::address::from_string("127.0.0.1"), port) {
768 0 : }
769 :
770 0 : void KSyncSockUdp::Init(io_service &ios, int port,
771 : const std::string &cpu_pin_policy) {
772 0 : KSyncSock::SetSockTableEntry(new KSyncSockUdp(ios, port));
773 0 : KSyncSock::Init(false, cpu_pin_policy);
774 0 : }
775 :
776 0 : uint32_t KSyncSockUdp::GetSeqno(char *data) {
777 0 : struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
778 0 : return hdr->seq_no;
779 : }
780 :
781 0 : bool KSyncSockUdp::IsMoreData(char *data) {
782 0 : struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
783 0 : return ((hdr->flags & UVR_MORE) == UVR_MORE);
784 : }
785 :
786 : // We dont expect any non-bulk operation on UDP
787 0 : bool KSyncSockUdp::Decoder(char *data, AgentSandeshContext *context) {
788 0 : assert(0);
789 : return false;
790 : }
791 :
792 0 : bool KSyncSockUdp::BulkDecoder(char *data,
793 : KSyncBulkSandeshContext *bulk_sandesh_context) {
794 0 : struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
795 0 : uint32_t buf_len = hdr->msg_len;
796 0 : char *buf = data + sizeof(struct uvr_msg_hdr);
797 0 : return bulk_sandesh_context->Decoder(buf, buf_len, 1, IsMoreData(data));
798 : }
799 :
800 0 : void KSyncSockUdp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
801 : HandlerCb cb) {
802 : struct uvr_msg_hdr hdr;
803 0 : hdr.seq_no = seq_no;
804 0 : hdr.flags = 0;
805 0 : hdr.msg_len = bulk_buf_size_;
806 :
807 0 : KSyncBufferList::iterator it = iovec->begin();
808 0 : iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
809 :
810 0 : sock_.async_send_to(*iovec, server_ep_, cb);
811 0 : }
812 :
813 0 : size_t KSyncSockUdp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
814 : struct uvr_msg_hdr hdr;
815 0 : hdr.seq_no = seq_no;
816 0 : hdr.flags = 0;
817 0 : hdr.msg_len = bulk_buf_size_;
818 :
819 0 : KSyncBufferList::iterator it = iovec->begin();
820 0 : iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
821 :
822 0 : return sock_.send_to(*iovec, server_ep_, MSG_DONTWAIT);
823 : }
824 :
825 0 : bool KSyncSockUdp::Validate(char *data) {
826 0 : return true;
827 : }
828 :
829 0 : void KSyncSockUdp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
830 0 : boost::asio::ip::udp::endpoint ep;
831 0 : sock_.async_receive_from(buf, ep, cb);
832 0 : }
833 :
834 0 : void KSyncSockUdp::Receive(mutable_buffers_1 buf) {
835 0 : boost::asio::ip::udp::endpoint ep;
836 0 : sock_.receive_from(buf, ep);
837 0 : }
838 :
839 0 : void KSyncSock::ProcessDataInline(char *data) {
840 0 : KSyncBulkMsgContext *bulk_message_context = NULL;
841 : KSyncBulkSandeshContext *bulk_sandesh_context;
842 0 : uint32_t seqno = GetSeqno(data);
843 :
844 0 : assert(!use_wait_tree_);
845 0 : Validate(data);
846 :
847 0 : bulk_sandesh_context = GetBulkSandeshContext(seqno);
848 0 : bulk_message_context = bulk_mctx_arr_[bmca_cons_];
849 0 : assert(bulk_message_context->seqno() == seqno);
850 :
851 0 : bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
852 0 : BulkDecoder(data, bulk_sandesh_context);
853 :
854 : // Remove the IoContext only on last netlink message
855 0 : if (IsMoreData(data) == false) {
856 0 : delete bulk_message_context;
857 0 : bmca_cons_++;
858 0 : if (bmca_cons_ >= KSYNC_BMC_ARR_SIZE) {
859 0 : bmca_cons_ = 0;
860 : }
861 : }
862 :
863 0 : return;
864 : }
865 :
866 : /////////////////////////////////////////////////////////////////////////////
867 : // KSyncIoContext routines
868 : /////////////////////////////////////////////////////////////////////////////
869 1020 : KSyncIoContext::KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry,
870 : int msg_len, char *msg,
871 1020 : KSyncEntry::KSyncEvent event) :
872 : IoContext(msg, msg_len, 0,
873 1020 : sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
874 1020 : IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
875 1020 : entry_(sync_entry), event_(event), sock_(sock) {
876 1020 : SetSeqno(sock->AllocSeqNo(type(), index()));
877 1020 : }
878 :
879 1020 : void KSyncIoContext::Handler() {
880 1020 : sock_->EnqueueRxProcessData(entry_, event_);
881 1020 : }
882 :
883 0 : void KSyncIoContext::ErrorHandler(int err) {
884 0 : entry_->ErrorHandler(err, GetSeqno(), event_);
885 0 : }
886 :
887 : /////////////////////////////////////////////////////////////////////////////
888 : // Routines for KSyncBulkSandeshContext
889 : /////////////////////////////////////////////////////////////////////////////
890 8 : KSyncBulkSandeshContext::KSyncBulkSandeshContext() :
891 8 : AgentSandeshContext(), bulk_msg_context_(NULL) { }
892 :
893 0 : KSyncBulkSandeshContext::~KSyncBulkSandeshContext() {
894 0 : }
895 :
896 : // Sandesh responses for old context are done. Check for any errors
897 1020 : void KSyncBulkSandeshContext::IoContextDone() {
898 1020 : IoContext *io_context = &(*bulk_msg_context_->io_context_list_it_);
899 1020 : AgentSandeshContext *sandesh_context = io_context->GetSandeshContext();
900 :
901 1020 : sandesh_context->set_ksync_io_ctx(NULL);
902 1020 : if (sandesh_context->GetErrno() != 0 &&
903 0 : sandesh_context->GetErrno() != EEXIST) {
904 0 : io_context->ErrorHandler(sandesh_context->GetErrno());
905 : }
906 1020 : io_context->Handler();
907 1020 : }
908 :
909 1020 : void KSyncBulkSandeshContext::IoContextStart() {
910 1020 : bulk_msg_context_->vr_response_count_++;
911 1020 : IoContext &io_context = *bulk_msg_context_->io_context_list_it_;
912 1020 : AgentSandeshContext *sandesh_context = io_context.GetSandeshContext();
913 : sandesh_context->set_ksync_io_ctx
914 1020 : (static_cast<KSyncIoContext *>(&io_context));
915 1020 : }
916 :
917 : // Process the sandesh messages
918 : // There can be more then one sandesh messages in the netlink buffer.
919 : // Iterate and process all of them
920 1066 : bool KSyncBulkSandeshContext::Decoder(char *data, uint32_t len,
921 : uint32_t alignment, bool more) {
922 1066 : DecodeSandeshMessages(data, len, this, alignment);
923 2132 : assert(bulk_msg_context_->io_context_list_it_ !=
924 : bulk_msg_context_->io_context_list_.end());
925 1066 : if (more == true)
926 140 : return false;
927 :
928 926 : IoContextDone();
929 :
930 : // No more netlink messages. Validate that iterator points to last element
931 : // in IoContextList
932 926 : bulk_msg_context_->io_context_list_it_++;
933 1852 : assert(bulk_msg_context_->io_context_list_it_ ==
934 : bulk_msg_context_->io_context_list_.end());
935 926 : return true;
936 : }
937 :
938 1020 : void KSyncBulkSandeshContext::SetErrno(int err) {
939 1020 : AgentSandeshContext *context = GetSandeshContext();
940 1020 : context->SetErrno(err);
941 1020 : }
942 :
943 1164 : AgentSandeshContext *KSyncBulkSandeshContext::GetSandeshContext() {
944 1164 : assert(bulk_msg_context_->vr_response_count_);
945 2328 : return bulk_msg_context_->io_context_list_it_->GetSandeshContext();
946 : }
947 :
948 0 : void KSyncBulkSandeshContext::IfMsgHandler(vr_interface_req *req) {
949 0 : AgentSandeshContext *context = GetSandeshContext();
950 0 : context->IfMsgHandler(req);
951 0 : }
952 :
953 0 : void KSyncBulkSandeshContext::NHMsgHandler(vr_nexthop_req *req) {
954 0 : AgentSandeshContext *context = GetSandeshContext();
955 0 : context->NHMsgHandler(req);
956 0 : }
957 :
958 0 : void KSyncBulkSandeshContext::RouteMsgHandler(vr_route_req *req) {
959 0 : AgentSandeshContext *context = GetSandeshContext();
960 0 : context->RouteMsgHandler(req);
961 0 : }
962 :
963 0 : void KSyncBulkSandeshContext::MplsMsgHandler(vr_mpls_req *req) {
964 0 : AgentSandeshContext *context = GetSandeshContext();
965 0 : context->MplsMsgHandler(req);
966 0 : }
967 :
968 0 : void KSyncBulkSandeshContext::QosConfigMsgHandler(vr_qos_map_req *req) {
969 0 : AgentSandeshContext *context = GetSandeshContext();
970 0 : context->QosConfigMsgHandler(req);
971 0 : }
972 :
973 0 : void KSyncBulkSandeshContext::ForwardingClassMsgHandler(vr_fc_map_req *req) {
974 0 : AgentSandeshContext *context = GetSandeshContext();
975 0 : context->ForwardingClassMsgHandler(req);
976 0 : }
977 :
978 : // vr_response message is treated as delimiter in a bulk-context. So, move to
979 : // next io-context within bulk-message context.
980 1020 : int KSyncBulkSandeshContext::VrResponseMsgHandler(vr_response *resp) {
981 1020 : AgentSandeshContext *sandesh_context = NULL;
982 : // If this is first vr_reponse received, move io-context to first entry in
983 : // bulk context
984 1020 : if (bulk_msg_context_->vr_response_count_ == 0) {
985 926 : bulk_msg_context_->io_context_list_it_ =
986 926 : bulk_msg_context_->io_context_list_.begin();
987 : sandesh_context =
988 1852 : bulk_msg_context_->io_context_list_it_->GetSandeshContext();
989 926 : IoContextStart();
990 : } else {
991 : // Sandesh responses for old io-context are done.
992 : // Check for any errors and trigger state-machine for old io-context
993 94 : IoContextDone();
994 : // Move to the next io-context
995 94 : bulk_msg_context_->io_context_list_it_++;
996 188 : assert(bulk_msg_context_->io_context_list_it_ !=
997 : bulk_msg_context_->io_context_list_.end());
998 : sandesh_context =
999 188 : bulk_msg_context_->io_context_list_it_->GetSandeshContext();
1000 94 : IoContextStart();
1001 : }
1002 1020 : return sandesh_context->VrResponseMsgHandler(resp);
1003 : }
1004 :
1005 0 : void KSyncBulkSandeshContext::MirrorMsgHandler(vr_mirror_req *req) {
1006 0 : AgentSandeshContext *context = GetSandeshContext();
1007 0 : context->MirrorMsgHandler(req);
1008 0 : }
1009 :
1010 0 : void KSyncBulkSandeshContext::FlowMsgHandler(vr_flow_req *req) {
1011 0 : AgentSandeshContext *context = GetSandeshContext();
1012 0 : context->FlowMsgHandler(req);
1013 0 : }
1014 :
1015 144 : void KSyncBulkSandeshContext::FlowResponseHandler(vr_flow_response *req) {
1016 144 : AgentSandeshContext *context = GetSandeshContext();
1017 144 : context->FlowResponseHandler(req);
1018 144 : }
1019 :
1020 0 : void KSyncBulkSandeshContext::VrfAssignMsgHandler(vr_vrf_assign_req *req) {
1021 0 : AgentSandeshContext *context = GetSandeshContext();
1022 0 : context->VrfAssignMsgHandler(req);
1023 0 : }
1024 :
1025 0 : void KSyncBulkSandeshContext::VrfMsgHandler(vr_vrf_req *req) {
1026 0 : AgentSandeshContext *context = GetSandeshContext();
1027 0 : context->VrfMsgHandler(req);
1028 0 : }
1029 :
1030 0 : void KSyncBulkSandeshContext::VrfStatsMsgHandler(vr_vrf_stats_req *req) {
1031 0 : AgentSandeshContext *context = GetSandeshContext();
1032 0 : context->VrfStatsMsgHandler(req);
1033 0 : }
1034 :
1035 0 : void KSyncBulkSandeshContext::DropStatsMsgHandler(vr_drop_stats_req *req) {
1036 0 : AgentSandeshContext *context = GetSandeshContext();
1037 0 : context->DropStatsMsgHandler(req);
1038 0 : }
1039 :
1040 0 : void KSyncBulkSandeshContext::VxLanMsgHandler(vr_vxlan_req *req) {
1041 0 : AgentSandeshContext *context = GetSandeshContext();
1042 0 : context->VxLanMsgHandler(req);
1043 0 : }
1044 :
1045 0 : void KSyncBulkSandeshContext::VrouterOpsMsgHandler(vrouter_ops *req) {
1046 0 : AgentSandeshContext *context = GetSandeshContext();
1047 0 : context->VrouterOpsMsgHandler(req);
1048 0 : }
1049 :
1050 : /////////////////////////////////////////////////////////////////////////////
1051 : // KSyncBulkMsgContext routines
1052 : /////////////////////////////////////////////////////////////////////////////
1053 926 : KSyncBulkMsgContext::KSyncBulkMsgContext(IoContext::Type type,
1054 926 : uint32_t index) :
1055 926 : io_context_list_(), io_context_type_(type), work_queue_index_(index),
1056 926 : rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1057 926 : }
1058 :
1059 0 : KSyncBulkMsgContext::KSyncBulkMsgContext(const KSyncBulkMsgContext &rhs) :
1060 0 : io_context_list_(), io_context_type_(rhs.io_context_type_),
1061 0 : work_queue_index_(rhs.work_queue_index_),
1062 0 : rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1063 0 : assert(rhs.vr_response_count_ == 0);
1064 0 : assert(rhs.rx_buffer_index_ == 0);
1065 0 : assert(rhs.io_context_list_.size() == 0);
1066 0 : }
1067 :
1068 : struct IoContextDisposer {
1069 1020 : void operator() (IoContext *io_context) { delete io_context; }
1070 : };
1071 :
1072 926 : KSyncBulkMsgContext::~KSyncBulkMsgContext() {
1073 926 : assert(vr_response_count_ == io_context_list_.size());
1074 926 : io_context_list_.clear_and_dispose(IoContextDisposer());
1075 1028 : for (uint32_t i = 0; i < rx_buffer_index_; i++) {
1076 102 : delete[] rx_buffers_[i];
1077 : }
1078 926 : }
1079 :
1080 1066 : char *KSyncBulkMsgContext::GetReceiveBuffer() {
1081 1066 : if (rx_buffer_index_ == 0)
1082 880 : return new char[KSyncSock::kBufLen];
1083 :
1084 186 : return rx_buffers_[--rx_buffer_index_];
1085 : }
1086 :
1087 288 : void KSyncBulkMsgContext::AddReceiveBuffer(char *buff) {
1088 288 : assert(rx_buffer_index_ < kMaxRxBufferCount);
1089 288 : rx_buffers_[rx_buffer_index_++] = buff;
1090 288 : }
1091 :
1092 1020 : void KSyncBulkMsgContext::Insert(IoContext *ioc) {
1093 1020 : io_context_list_.push_back(*ioc);
1094 1020 : return;
1095 : }
1096 :
1097 926 : void KSyncBulkMsgContext::Data(KSyncBufferList *iovec) {
1098 926 : IoContextList::iterator it = io_context_list_.begin();
1099 3892 : while (it != io_context_list_.end()) {
1100 3060 : iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
1101 1020 : it++;
1102 : }
1103 926 : }
|