LCOV - code coverage report
Current view: top level - ksync - ksync_sock_tcp.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 0 197 0.0 %
Date: 2026-06-04 02:06:09 Functions: 0 27 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "ksync_sock.h"
       6             : 
       7             : #include <boost/asio.hpp>
       8             : #include <boost/bind/bind.hpp>
       9             : 
      10             : using namespace boost::asio;
      11             : using namespace boost::placeholders;
      12             : 
      13             : /////////////////////////////////////////////////////////////////////////////
      14             : // KSyncSockTcp routines
      15             : /////////////////////////////////////////////////////////////////////////////
      16             : //TCP socket class for interacting with vrouter
      // Constructs the TCP flavour of the KSync socket and kicks off the
      // connection to the peer endpoint.
      //
      // evm        - event manager; passed to the TcpServer base and stored in evm_.
      // ip_address - peer address; combined with `port` to form server_ep_.
      // port       - peer TCP port.
      //
      // connect_complete_ starts out false here.
      17           0 : KSyncSockTcp::KSyncSockTcp(EventManager *evm,
      18           0 :     boost::asio::ip::address ip_address, int port) : TcpServer(evm), evm_(evm),
      19           0 :     session_(NULL), server_ep_(ip_address, port), connect_complete_(false) {
      20             : 
      21           0 :     reset_use_wait_tree();
      22           0 :     set_process_data_inline();
                           // rx_buff_ is not set in this constructor's initializer list, so the
                           // value tested here comes from elsewhere (presumably the KSyncSock
                           // base class) - TODO confirm.
      23           0 :     if (rx_buff_ == NULL) {
      24           0 :         rx_buff_ = new char[kBufLen];
      25             :     }
                           // Second kBufLen-sized buffer; Run() uses it to hold a message that was
                           // split across two reads, with remain_ counting the bytes saved so far.
                           // NOTE(review): both buffers are raw new[] allocations and no matching
                           // delete[] appears anywhere in this listing - confirm where they are freed.
      26           0 :     rx_buff_rem_ = new char[kBufLen];
      27           0 :     remain_ = 0;
      28             : 
      29           0 :     session_ = CreateSession();
      30           0 :     Connect(session_, server_ep_);
      31           0 : }
      32             : 
      // Creates the KSyncSockTcp instance, registers it through
      // KSyncSock::SetSockTableEntry() and then runs the common KSyncSock::Init().
      //
      // evm            - event manager forwarded to the KSyncSockTcp constructor.
      // ip_addr, port  - peer endpoint forwarded to the KSyncSockTcp constructor.
      // cpu_pin_policy - forwarded unchanged to KSyncSock::Init().
      33           0 : void KSyncSockTcp::Init(EventManager *evm, boost::asio::ip::address ip_addr,
      34             :                         int port, const std::string &cpu_pin_policy) {
      35           0 :     KSyncSock::SetSockTableEntry(new KSyncSockTcp(evm, ip_addr, port));
                           // NOTE(review): 10 is a hard-coded netlink family id; its origin is not
                           // visible in this file - confirm against the vrouter side.
      36           0 :     SetNetlinkFamilyId(10);
                           // The meaning of the `false` argument is defined by KSyncSock::Init(),
                           // which is not part of this file.
      37           0 :     KSyncSock::Init(false, cpu_pin_policy);
      38           0 : }
      39             : 
      // Allocates a KSyncSockTcpSession for `socket`, owned by this server, and
      // registers OnSessionEvent() as its observer so that session events are
      // delivered to this object.
      //
      // Returns the newly allocated session (raw pointer).
      40           0 : TcpSession* KSyncSockTcp::AllocSession(Socket *socket) {
                           // Third argument is the session constructor's `async_ready` flag.
      41           0 :     TcpSession *session = new KSyncSockTcpSession(this, socket, false);
      42           0 :     session->set_observer(boost::bind(&KSyncSockTcp::OnSessionEvent,
      43             :                                       this, _1, _2));
      44           0 :     return session;
      45             : }
      46             : 
      // Returns the sequence number of the message at `data`, as extracted by
      // GetNetlinkSeqno().
      47           0 : uint32_t KSyncSockTcp::GetSeqno(char *data) {
      48           0 :     return GetNetlinkSeqno(data);
      49             : }
      50             : 
      // Returns whatever NetlinkMsgDone() reports for the message at `data`.
      // NOTE(review): this function is named "is more data" while the helper is
      // named "msg done"; the helper's return polarity is not visible in this
      // file - confirm the two agree.
      51           0 : bool KSyncSockTcp::IsMoreData(char *data) {
      52           0 :     return NetlinkMsgDone(data);
      53             : }
      54             : 
      // Sends a bulk request to the peer with a single sendmsg() call.
      //
      // A buffer over nl_client_->cl_buf is inserted at the front of `iovec`
      // (so the caller's list is modified), then every buffer in the list is
      // mapped onto a struct iovec entry and written to the socket's native fd.
      //
      // iovec  - list of buffers to send; gains one extra leading element.
      // seq_no - sequence number handed to UpdateNetlink().
      //
      // Returns the total number of bytes that were queued for sending (the sum
      // of all buffer sizes), NOT the number of bytes sendmsg() reported.
      55           0 : size_t KSyncSockTcp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
      56           0 :     size_t len = 0, ret;
      57             :     struct msghdr msg;
                           // NOTE(review): the declaration of max_bulk_msg_count_ is not visible
                           // here; if it is not a compile-time constant this is a variable-length
                           // array (a compiler extension in C++). The loop below also does not check
                           // that iovec->size(), which includes the inserted header buffer, fits in
                           // this array.
      58           0 :     struct iovec iov[max_bulk_msg_count_*2];
      59             :     int i, fd;
      60             : 
      61           0 :     memset(&msg, 0, sizeof(msg));
      62           0 :     msg.msg_iov = iov;
      63             : 
                           // cl_buf_offset is sampled after ResetNetlink() and before
                           // UpdateNetlink(); that value becomes the length of the leading buffer.
                           // presumably this is the netlink header length - TODO confirm.
      64           0 :     ResetNetlink(nl_client_);
      65           0 :     int offset = nl_client_->cl_buf_offset;
      66           0 :     UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
      67             : 
      68           0 :     KSyncBufferList::iterator it = iovec->begin();
      69           0 :     iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
      70             : 
                           // Translate each asio buffer into an iovec entry and total the sizes.
      71           0 :     int count = iovec->size();
      72           0 :     for(i = 0; i < count; i++) {
      73           0 :         mutable_buffers_1 buf = iovec->at(i);
      74           0 :         size_t buf_size = boost::asio::buffer_size(buf);
      75           0 :         void* cbuf = boost::asio::buffer_cast<void*>(buf);
      76           0 :         len += buf_size;
      77           0 :         iov[i].iov_base = cbuf;
      78           0 :         iov[i].iov_len = buf_size;
      79             :     }
      80             : 
      81           0 :     msg.msg_iovlen = i;
      82           0 :     fd = tcp_socket_->native_handle();
                           // NOTE(review): sendmsg() returns ssize_t and -1 on error, but `ret` is
                           // size_t, so an error shows up as a very large value. It still differs
                           // from `len`, so it is logged, but errno is not reported, a short write
                           // is not retried, and `len` is returned to the caller either way.
      83           0 :     ret = sendmsg(fd, &msg, 0);
      84           0 :     if (ret != len) {
      85           0 :         LOG(ERROR, "sendmsg failure " << ret << "len " << len);
      86             :     }
      87           0 :     return len;
      88           0 : }
      89             : 
      // "Async" send entry point; in this implementation it simply performs the
      // send synchronously through SendTo().
      //
      // NOTE(review): `cb` is never invoked or stored and SendTo()'s return value
      // is discarded - confirm callers do not depend on the completion callback.
      90           0 : void KSyncSockTcp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
      91             :                                HandlerCb cb) {
      92           0 :     SendTo(iovec, seq_no);
      93           0 :     return;
      94             : }
      95             : 
      // Returns the result of ValidateNetlink() for the message at `data`.
      96           0 : bool KSyncSockTcp::Validate(char *data) {
      97           0 :     return ValidateNetlink(data);
      98             : }
      99             : 
     // Decodes the message at `data` into `context` using the shared
     // KSyncSockNetlink::NetlinkDecoder().
     //
     // Always returns true; nothing from NetlinkDecoder() is propagated to the
     // caller.
     100           0 : bool KSyncSockTcp::Decoder(char *data, AgentSandeshContext *context) {
     101           0 :     KSyncSockNetlink::NetlinkDecoder(data, context);
     102           0 :     return true;
     103             : }
     104             : 
     // Decodes a bulk response: locates the payload inside the message at `data`
     // with GetNetlinkPayload() and hands it to the bulk sandesh context.
     //
     // data                 - start of the received message.
     // bulk_sandesh_context - context whose Decoder() consumes the payload.
     //
     // Returns whatever bulk_sandesh_context->Decoder() returns.
     105           0 : bool KSyncSockTcp::BulkDecoder(char *data,
     106             :                                KSyncBulkSandeshContext *bulk_sandesh_context) {
     107             :     // Get sandesh buffer and buffer-length
     108           0 :     uint32_t buf_len = 0;
     109           0 :     char *buf = NULL;
     110           0 :     GetNetlinkPayload(data, &buf, &buf_len);
                          // NLA_ALIGNTO is passed as the third argument and the result of
                          // IsMoreData() as the fourth; their meaning is defined by
                          // KSyncBulkSandeshContext::Decoder(), which is not part of this file.
     111           0 :     return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
     112             : }
     113             : 
     // Intentionally a no-op: both `buf` and `cb` are ignored.
     114           0 : void KSyncSockTcp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
     115             :     //Data would be read from ksync tcp session
     116             :     //hence no socket operation would be required
     117           0 : }
     118             : 
     // Synchronously reads exactly one netlink message from the session socket
     // into `buf`: first the fixed-size struct nlmsghdr, then the
     // (nlmsg_len - sizeof(struct nlmsghdr)) payload bytes right behind it.
     //
     // The socket is switched to blocking mode for the duration of the call and
     // its previous mode is restored at the end.
     //
     // Any socket error, or a message of type NLMSG_ERROR, hits assert(0).
     // NOTE(review): if asserts are compiled out (NDEBUG) a failing receive is
     // not handled at all and the read loops would keep retrying - confirm the
     // build always has asserts enabled.
     119           0 : void KSyncSockTcp::Receive(mutable_buffers_1 buf) {
     120           0 :     uint32_t bytes_read = 0;
     121           0 :     boost::system::error_code ec;
     122           0 :     const struct nlmsghdr *nlh = NULL;
     123             : 
     124             :     //Create a buffer to read netlink header first
     125             :     mutable_buffers_1 netlink_header(buffer_cast<void *>(buf),
     126           0 :                                      sizeof(struct nlmsghdr));
     127             : 
                          // NOTE(review): non_blocking() returns the current *non-blocking* flag,
                          // so despite its name `blocking_socket` is true when the socket was
                          // non-blocking. The restore at the end is still correct because the
                          // saved value is passed straight back to non_blocking().
     128           0 :     bool blocking_socket = session_->socket()->non_blocking();
     129           0 :     session_->socket()->non_blocking(false, ec);
     130           0 :     while (bytes_read < sizeof(struct nlmsghdr)) {
                              // `netlink_header + bytes_read` is the not-yet-filled tail of the
                              // header buffer, so a receive can never read past the header.
     131             :         mutable_buffers_1 buffer =
     132           0 :             static_cast<mutable_buffers_1>(netlink_header + bytes_read);
     133           0 :         bytes_read += session_->socket()->receive(buffer, 0, ec);
     134           0 :         if (ec.failed()) {
     135           0 :             assert(0);
     136             :         }
     137             :         //Data read is lesser than netlink header
     138             :         //continue reading
                              // nlh is only set once the full header is in; the loop cannot exit
                              // before that happens.
     139           0 :         if (bytes_read == sizeof(struct nlmsghdr)) {
     140           0 :             nlh =  buffer_cast<struct nlmsghdr *>(buf);
     141             :         }
     142             :     }
     143             : 
     144           0 :     if (nlh->nlmsg_type == NLMSG_ERROR) {
     145           0 :         LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
     146             :                 << " len " << nlh->nlmsg_len);
     147           0 :         assert(0);
     148             :     }
     149             : 
     150           0 :     bytes_read = 0;
                          // NOTE(review): unsigned subtraction - an nlmsg_len smaller than the
                          // header size would wrap to a huge payload_size. There is also no check
                          // that nlmsg_len fits in the capacity of `buf`.
     151           0 :     uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
     152             :     //Read data
     153           0 :     mutable_buffers_1 data(buffer_cast<void *>(buf + sizeof(struct nlmsghdr)),
     154           0 :                            payload_size);
     155             : 
     156           0 :     while (bytes_read < payload_size) {
     157             :         mutable_buffers_1 buffer =
     158           0 :             static_cast<mutable_buffers_1>(data + bytes_read);
     159           0 :         bytes_read += session_->socket()->receive(buffer, 0, ec);
     160           0 :         if (ec.failed()) {
     161           0 :             assert(0);
     162             :         }
     163             :     }
                          // Put the socket back into the mode it had on entry.
     164           0 :     session_->socket()->non_blocking(blocking_socket, ec);
     165           0 : }
     166             : 
     // Callback bound into KSyncSockTcpSessionReader: handles one complete
     // message delivered by the TCP session reader.
     //
     // Clears the errno on sandesh context 0 and processes the message inline.
     // `size` is not used; ProcessDataInline() receives only the pointer.
     // Always returns true.
     167           0 : bool KSyncSockTcp::ReceiveMsg(const u_int8_t *msg, size_t size) {
     168           0 :     AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
     169           0 :     ctxt->SetErrno(0);
                          // The C-style cast drops the const qualifier of `msg`.
     170           0 :     ProcessDataInline((char *) msg);
     171           0 :     return true;
     172             : }
     173             : 
     // Blocking receive loop on the socket's native fd; it never returns
     // normally (the trailing `return true` is unreachable).
     //
     // Each iteration:
     //   1. recv()s up to kBufLen bytes into rx_buff_;
     //   2. if the previous iteration left a partial message (remain_ bytes held
     //      in rx_buff_rem_), completes it from the new data and processes it;
     //   3. processes the complete netlink messages that follow in rx_buff_;
     //   4. saves any unprocessed tail into rx_buff_rem_ and records its size in
     //      remain_ for the next iteration.
     //
     // If recv() returns <= 0 the process logs, sleeps 10 seconds and exits with
     // EXIT_FAILURE.
     174           0 : bool KSyncSockTcp::Run() {
     175           0 :     AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
     176           0 :     int fd = tcp_socket_->native_handle();
     177             : 
     178             :     while (1) {
                              // NOTE(review): bufp is advanced below but never read.
     179           0 :         char *bufp = rx_buff_;
     180           0 :         struct nlmsghdr *nlh = NULL;
     181             :         struct nlmsghdr tnlh;
     182           0 :         int offset = 0;
     183           0 :         int bytes_transferred = 0;
     184             : 
     185           0 :         bytes_transferred = recv(fd, rx_buff_, kBufLen, 0);
     186           0 :         if (bytes_transferred <= 0) {
     187           0 :              LOG(ERROR, "Connection to dpdk-vrouter lost.");
     188           0 :              sleep(10);
     189           0 :              exit(EXIT_FAILURE);
     190             :         }
     191             : 
                              // Finish the message that was cut off at the end of the previous read.
     192           0 :         if (remain_ != 0) {
     193           0 :             if (remain_ < sizeof(struct nlmsghdr)) {
                                      // Even the header was split: rebuild it in tnlh from the saved
                                      // bytes plus the start of the new data, only to learn nlmsg_len.
                                      // NOTE(review): assumes this read delivered at least
                                      // sizeof(struct nlmsghdr) - remain_ bytes; not checked.
     194           0 :                 memcpy((char *)&tnlh, rx_buff_rem_, remain_);
     195           0 :                 memcpy(((char *)&tnlh) + remain_, rx_buff_,
     196           0 :                         (sizeof(struct nlmsghdr) - remain_));
     197           0 :                 nlh =  &tnlh;
     198             :             } else {
     199           0 :                 nlh =  (struct nlmsghdr *)rx_buff_rem_;
     200             :             }
     201             : 
     202           0 :             if (remain_ > nlh->nlmsg_len)
     203           0 :                 assert(0);
     204             : 
                                  // Append the missing (nlmsg_len - remain_) bytes so rx_buff_rem_
                                  // holds the whole message, then process it from there.
                                  // NOTE(review): there is no check that this many bytes arrived in
                                  // this read (bytes_transferred) or that nlmsg_len fits in the
                                  // kBufLen-sized rx_buff_rem_; a message spanning more than two
                                  // reads would be completed from stale data - confirm the peer
                                  // guarantees this cannot happen.
     205           0 :             memcpy(rx_buff_rem_+remain_, rx_buff_, nlh->nlmsg_len - remain_);
     206           0 :             bufp += (nlh->nlmsg_len - remain_);
     207           0 :             ctxt->SetErrno(0);
     208           0 :             ProcessDataInline(rx_buff_rem_);
     209           0 :             offset = nlh->nlmsg_len - remain_;
     210             :         }
     211             : 
                              // Process the complete messages in the freshly received data.
                              // NOTE(review): both comparisons below are strict '>', so a message
                              // that ends exactly at the end of the received data is not processed
                              // here; it is saved to rx_buff_rem_ and only processed after the next
                              // recv() returns. Confirm whether '>=' was intended.
                              // NOTE(review): a message with nlmsg_len == 0 would never advance
                              // `offset`, so this loop would not terminate.
     212           0 :         while (offset < bytes_transferred) {
     213           0 :             if ((unsigned int)(bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
     214           0 :                 nlh =  (struct nlmsghdr *)(rx_buff_ + offset);
     215           0 :                 if ((unsigned int)(bytes_transferred - offset) > nlh->nlmsg_len) {
     216           0 :                     ctxt->SetErrno(0);
     217           0 :                     ProcessDataInline(rx_buff_ + offset);
     218           0 :                     offset += nlh->nlmsg_len;
     219             :                 } else {
     220           0 :                     break;
     221             :                 }
     222             :             } else {
     223           0 :                 break;
     224             :             }
     225             :         }
     226             : 
                              // Keep the unprocessed tail for the next iteration.
     227           0 :         remain_ = bytes_transferred - offset;
     228           0 :         if (remain_) {
     229           0 :             memcpy(rx_buff_rem_, rx_buff_ + offset, bytes_transferred - offset);
     230             :         }
     231           0 :     }
     232             : 
     233             :     return true;
     234             : }
     235             : 
     // Task wrapper that runs KSyncSockTcp::Run() (the blocking read loop) under
     // the task id registered as "Ksync::KSyncSockTcpRead".
     236             : class KSyncSockTcpReadTask : public Task {
     237             : public:
                          // scheduler - used only to look up the task id.
                          // sock      - socket whose Run() is executed; stored as a raw pointer.
                          // The second argument to the Task base is fixed at 0; its meaning is
                          // defined by the Task class, which is not part of this file.
     238           0 :     KSyncSockTcpReadTask(TaskScheduler *scheduler, KSyncSockTcp *sock) :
     239           0 :         Task(scheduler->GetTaskId("Ksync::KSyncSockTcpRead"), 0), sock_(sock) {
     240           0 :     }
     241           0 :     ~KSyncSockTcpReadTask() {
     242           0 :     }
     243             : 
                          // Delegates to sock_->Run(), ignores its result and returns true.
     244           0 :     bool Run() {
     245           0 :         sock_->Run();
     246           0 :         return true;
     247             :     }
     248           0 :     std::string Description() const { return "KSyncSockTcpRead"; }
     249             : private:
     250             :     KSyncSockTcp *sock_;
     251             : 
     252             : };
     253             : 
     // Starts the dedicated read task the first time it is called; later calls
     // do nothing.
     //
     // On the first call the socket is switched to blocking mode and a
     // KSyncSockTcpReadTask is enqueued on the task scheduler.
     //
     // NOTE(review): `started` is a function-local static, so the guard is shared
     // by every KSyncSockTcp instance in the process and is not synchronized -
     // confirm only one instance/thread ever calls this.
     254           0 : void KSyncSockTcp::AsyncReadStart() {
     255             :     static int started = 0;
     256           0 :     boost::system::error_code ec;
     257             : 
     258           0 :     if (!started) {
     259           0 :         TaskScheduler *scheduler = TaskScheduler::GetInstance();
     260           0 :         KSyncSockTcpReadTask *task = new KSyncSockTcpReadTask(scheduler, this);
                              // The read loop uses a blocking recv(); `ec` is not inspected.
     261           0 :         tcp_socket_->non_blocking(false, ec);
     262           0 :         scheduler->Enqueue(task);
     263           0 :         started = 1;
     264             :     }
     265           0 : }
     266             : 
     // Observer callback for events on the TCP session.
     //
     //   CONNECT_FAILED   - calls Connect() again right away.
     //   CLOSE            - logs, sleeps 10 seconds and exits the process with
     //                      EXIT_FAILURE.
     //   CONNECT_COMPLETE - caches the session's socket in tcp_socket_, sets
     //                      connect_complete_ and tunes the TCP options.
     //   anything else    - ignored.
     //
     // The `session` argument is not used; all operations go through session_.
     267           0 : void KSyncSockTcp::OnSessionEvent(TcpSession *session,
     268             :                                   TcpSession::Event event) {
     269           0 :     switch (event) {
     270           0 :     case TcpSession::CONNECT_FAILED:
     271             :         //Retry
     272           0 :         Connect(session_, server_ep_);
     273           0 :         break;
     274           0 :     case TcpSession::CLOSE:
     275           0 :         LOG(ERROR, "Connection to dpdk-vrouter lost.");
     276           0 :         sleep(10);
     277           0 :         exit(EXIT_FAILURE);
     278             :         break;
     279           0 :     case TcpSession::CONNECT_COMPLETE:
     280           0 :         tcp_socket_ = session_->socket();
     281           0 :         connect_complete_ = true;
     282           0 :         session_->SetTcpNoDelay();
                              // Both socket buffers are sized to 16 times max_bulk_buf_size_.
     283           0 :         session_->SetTcpSendBufSize(max_bulk_buf_size_*16);
     284           0 :         session_->SetTcpRecvBufSize(max_bulk_buf_size_*16);
                              // NOTE(review): no `break` here - control falls through into
                              // `default`, which only breaks, so behaviour is unaffected today.
     285           0 :     default:
     286           0 :         break;
     287             :     }
     288           0 : }
     289             : 
     290             : /////////////////////////////////////////////////////////////////////////////
     291             : // KSyncSockTcpSession routines
     292             : /////////////////////////////////////////////////////////////////////////////
     // Creates the session and its message reader. The reader is given a
     // callback bound to KSyncSockTcp::ReceiveMsg() on the owning server.
     //
     // server      - owning server; must actually be a KSyncSockTcp because it is
     //               downcast with static_cast (unchecked).
     // sock        - socket forwarded to the TcpSession base.
     // async_ready - forwarded to the TcpSession base.
     293           0 : KSyncSockTcpSession::KSyncSockTcpSession(TcpServer *server, Socket *sock,
     294           0 :     bool async_ready) : TcpSession(server, sock, async_ready) {
     295           0 :     KSyncSockTcp *tcp_ptr = static_cast<KSyncSockTcp *>(server);
     296           0 :     reader_ = new KSyncSockTcpSessionReader(this,
     297           0 :                        boost::bind(&KSyncSockTcp::ReceiveMsg, tcp_ptr, _1, _2));
     298           0 : }
     299             : 
     // Releases the message reader allocated in the constructor.
     300           0 : KSyncSockTcpSession::~KSyncSockTcpSession() {
                          // The null check is redundant: deleting a null pointer is a no-op.
     301           0 :     if (reader_) {
     302           0 :         delete reader_;
     303             :     }
     304           0 : }
     305             : 
     // Forwards every buffer read on this session to the message reader.
     306           0 : void KSyncSockTcpSession::OnRead(Buffer buffer) {
     307           0 :     reader_->OnRead(buffer);
     308           0 : }
     309             : 
     // Passes `session` and `callback` straight through to the TcpMessageReader
     // base; no additional state is set up here.
     310           0 : KSyncSockTcpSessionReader::KSyncSockTcpSessionReader(
     311           0 :     TcpSession *session, ReceiveCallback callback) :
     312           0 :     TcpMessageReader(session, callback) {
     313           0 : }
     314             : 
     // Reports the total length of the netlink message that starts at `offset`
     // inside `buffer`.
     //
     // Returns -1 when fewer than GetHeaderLenSize() bytes are available after
     // `offset`; otherwise returns the nlmsg_len field of the netlink header
     // found at that position.
     315           0 : int KSyncSockTcpSessionReader::MsgLength(Buffer buffer, int offset) {
     316           0 :     size_t size = TcpSession::BufferSize(buffer);
     317           0 :     int remain = size - offset;
     318           0 :     if (remain < GetHeaderLenSize()) {
     319           0 :         return -1;
     320             :     }
     321             : 
     322             :     //Byte ordering?
                          // nlmsg_len is read as stored (no byte-order conversion) and returned
                          // as int without validation.
                          // NOTE(review): GetHeaderLenSize() is defined outside this file -
                          // confirm it is large enough to cover the nlmsg_len field read here.
     323             :     const struct nlmsghdr *nlh =
     324           0 :         (const struct nlmsghdr *)(TcpSession::BufferData(buffer) + offset);
     325           0 :     return nlh->nlmsg_len;
     326             : }

Generated by: LCOV version 1.14