LCOV - code coverage report
Current view: top level - ksync - ksync_sock_uds.cc (source / functions) Hit Total Coverage
Test: OpenSDN C/C++ coverage (all TARGET_SET jobs) Lines: 0 158 0.0 %
Date: 2026-06-22 02:21:21 Functions: 0 17 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
       3             :  */
       4             : 
       5             : #include "ksync_sock.h"
       6             : 
       7             : #include <boost/asio.hpp>
       8             : #include <boost/bind/bind.hpp>
       9             : #include <errno.h>
      10             : 
      11             : class AgentParam;
      12             : 
      13             : using namespace boost::asio;
      14             : using namespace boost::placeholders;
      15             : 
      16             : /////////////////////////////////////////////////////////////////////////////
      17             : // KSyncSockUds routines
      18             : /////////////////////////////////////////////////////////////////////////////
      19             : //Unix domain socket class for interacting with dpdk vrouter
      20             : 
      21             : class KSyncSockUdsReadTask : public Task {
      22             : public:
      23           0 :     KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue) :
      24           0 :         Task(scheduler->GetTaskId("Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
      25           0 :     }
      26           0 :     ~KSyncSockUdsReadTask() {
      27           0 :     }
      28             : 
      29           0 :     bool Run() {
      30           0 :         queue_->Run();
      31           0 :         return true;
      32             :     }
      33           0 :     std::string Description() const { return "KSyncSockUdsRead"; }
      34             : private:
      35             :     KSyncSockUds *queue_;
      36             : 
      37             : };
      38             : 
      39             : 
      40             : 
      41             : string KSyncSockUds::sockpath_= KSYNC_AGENT_VROUTER_SOCK_PATH;
      42             : 
      43           0 : KSyncSockUds::KSyncSockUds(boost::asio::io_context &ios) :
      44           0 :     sock_(ios),
      45           0 :     server_ep_(sockpath_),
      46           0 :     rx_buff_(NULL),
      47           0 :     rx_buff_q_(NULL),
      48           0 :     remain_(0),
      49           0 :     socket_(0),
      50           0 :     connected_(false) {
      51           0 :     boost::system::error_code ec;
      52           0 :     reset_use_wait_tree();
      53           0 :     set_process_data_inline();
      54           0 : retry:;
      55           0 :     sock_.connect(server_ep_, ec);
      56           0 :     if (ec) {
      57           0 :         sleep(1);
      58           0 :         goto retry;
      59             :     }
      60           0 :     socket_ = sock_.native_handle();
      61           0 :     connected_ = true;
      62           0 :     rx_buff_   = new char[10*kBufLen];
      63           0 :     rx_buff_q_ = new char[10*kBufLen];
      64           0 : }
      65             : 
      66           0 : bool KSyncSockUds::Run() {
      67           0 :     AgentSandeshContext *ctxt = KSyncSock::GetAgentSandeshContext(0);
      68           0 :     char *ret_buff = new char[1024*kBufLen];
      69           0 :     boost::system::error_code ec;
      70             : 
      71             :     // Read data from the socket and append it to the existing
      72             :     // unprocessed data in the local buffer.
      73             :     while (1) {
      74           0 :         char *bufp = rx_buff_;
      75           0 :         struct nlmsghdr *nlh = NULL;
      76             :         struct nlmsghdr tnlh;
      77           0 :         size_t offset = 0;
      78             :         int ret_val;
      79           0 :         size_t bytes_transferred = 0;
      80           0 :         bytes_transferred = ret_val = recv(socket_, rx_buff_, 10*kBufLen, 0);
      81           0 :         if (ret_val == 0) {
      82             :             // connection reset by peer
      83             :             // close socket and exit
      84           0 :             sock_.close(ec);
      85           0 :             LOG(INFO, " dpdk vrouter is down, exiting.. errno:" << errno);
      86           0 :             exit(0);
      87             :         }
      88           0 :         if (ret_val < 0) {
      89           0 :             if (errno != EAGAIN) {
      90           0 :                 sock_.close(ec);
      91           0 :                 connected_ = false;
      92           0 : retry:;
      93           0 :                 sock_.connect(server_ep_, ec);
      94           0 :                 if (ec) {
      95           0 :                     sleep(1);
      96           0 :                     goto retry;
      97             :                 }
      98           0 :                 socket_ = sock_.native_handle();
      99           0 :                 connected_ = true;
     100             :             }
     101           0 :             continue;
     102             :         }
     103             : 
     104           0 :         if (remain_ != 0) {
     105           0 :             if (remain_ < sizeof(struct nlmsghdr)) {
     106           0 :                 memcpy((char *)&tnlh, rx_buff_q_, remain_);
     107           0 :                 memcpy(((char *)&tnlh) + remain_, rx_buff_,
     108           0 :                         (sizeof(struct nlmsghdr) - remain_));
     109           0 :                 nlh =  &tnlh;
     110             :             } else {
     111           0 :                 nlh =  (struct nlmsghdr *)rx_buff_q_;
     112             :             }
     113           0 :             if (remain_ > nlh->nlmsg_len)
     114           0 :                 assert(0);
     115           0 :             memcpy(ret_buff, rx_buff_q_, remain_);
     116           0 :             memcpy(ret_buff+remain_, rx_buff_, nlh->nlmsg_len - remain_);
     117           0 :             bufp += (nlh->nlmsg_len - remain_);
     118           0 :             ctxt->SetErrno(0);
     119           0 :             ProcessDataInline(ret_buff);
     120           0 :             offset = nlh->nlmsg_len - remain_;
     121             :         }
     122           0 :         while (offset < bytes_transferred) {
     123           0 :             if ((bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
     124           0 :                 nlh =  (struct nlmsghdr *)(rx_buff_ + offset);
     125           0 :                 if ((bytes_transferred - offset) > nlh->nlmsg_len) {
     126           0 :                     memcpy(ret_buff, rx_buff_ + offset, nlh->nlmsg_len);
     127           0 :                     ctxt->SetErrno(0);
     128           0 :                     ProcessDataInline(ret_buff);
     129           0 :                     offset += nlh->nlmsg_len;
     130             :                 } else {
     131           0 :                     break;
     132             :                 }
     133             :             } else {
     134           0 :                 break;
     135             :             }
     136             :         }
     137           0 :         memcpy(rx_buff_q_, rx_buff_ + offset, bytes_transferred - offset);
     138           0 :         remain_ = bytes_transferred - offset;
     139           0 :     }
     140             :     return true;
     141             : }
     142             : 
     143           0 : void KSyncSockUds::Init(io_service &ios, const std::string &cpu_pin_policy,
     144             :     const std::string &sockpathvr) {
     145           0 :     KSyncSock::SetSockTableEntry(new KSyncSockUds(ios));
     146           0 :     SetNetlinkFamilyId(10);
     147           0 :     KSyncSock::Init(false, cpu_pin_policy);
     148           0 :     sockpath_ = sockpathvr;
     149           0 : }
     150             : 
     151           0 : uint32_t KSyncSockUds::GetSeqno(char *data) {
     152           0 :     return GetNetlinkSeqno(data);
     153             : }
     154             : 
     155           0 : bool KSyncSockUds::IsMoreData(char *data) {
     156           0 :     return NetlinkMsgDone(data);
     157             : }
     158             : 
     159           0 : bool KSyncSockUds::Decoder(char *data, AgentSandeshContext *context) {
     160           0 :     KSyncSockNetlink::NetlinkDecoder(data, context);
     161           0 :     return true;
     162             : }
     163             : 
     164           0 : bool KSyncSockUds::BulkDecoder(char *data,
     165             :                                KSyncBulkSandeshContext *bulk_sandesh_context) {
     166             :     // Get sandesh buffer and buffer-length
     167           0 :     uint32_t buf_len = 0;
     168           0 :     char *buf = NULL;
     169           0 :     GetNetlinkPayload(data, &buf, &buf_len);
     170           0 :     return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
     171             : }
     172             : 
     173           0 : void KSyncSockUds::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
     174             :                                HandlerCb cb) {
     175           0 :     if (connected_ == true)
     176           0 :         SendTo(iovec, seq_no);
     177           0 : }
     178             : 
     179           0 : size_t KSyncSockUds::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
     180           0 :     size_t len = 0, ret;
     181             :     struct msghdr msg;
     182           0 :     struct iovec iov[max_bulk_msg_count_*2];
     183             :     int i;
     184             : 
     185           0 :     memset(&msg, 0, sizeof(msg));
     186           0 :     msg.msg_iov = iov;
     187             : 
     188           0 :     ResetNetlink(nl_client_);
     189           0 :     int offset = nl_client_->cl_buf_offset;
     190           0 :     UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
     191             : 
     192           0 :     KSyncBufferList::iterator it = iovec->begin();
     193           0 :     iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
     194             : 
     195           0 :     int count = iovec->size();
     196           0 :     for(i = 0; i < count; i++) {
     197           0 :         mutable_buffers_1 buf = iovec->at(i);
     198           0 :         size_t buf_size = boost::asio::buffer_size(buf);
     199           0 :         void* cbuf = boost::asio::buffer_cast<void*>(buf);
     200           0 :         len += buf_size;
     201           0 :         iov[i].iov_base = cbuf;
     202           0 :         iov[i].iov_len = buf_size;
     203             :     }
     204             : 
     205           0 :     msg.msg_iovlen = i;
     206           0 :     ret = sendmsg(socket_, &msg, 0);
     207           0 :     if (ret != len) {
     208           0 :         LOG(ERROR, "sendmsg failure " << ret << "len " << len);
     209             :     }
     210           0 :     return len;
     211           0 : }
     212             : 
     213           0 : bool KSyncSockUds::Validate(char *data) {
     214           0 :     return true;
     215             : }
     216             : 
     217           0 : void KSyncSockUds::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
     218             :     static int started = 0;
     219           0 :     if (!started) {
     220           0 :         TaskScheduler *scheduler = TaskScheduler::GetInstance();
     221             :         //Receive is handled in a separate Run() thread
     222           0 :         KSyncSockUdsReadTask *task = new KSyncSockUdsReadTask(scheduler, this);
     223           0 :         scheduler->Enqueue(task);
     224           0 :         started = 1;
     225             :     }
     226           0 : }
     227             : 
     228           0 : void KSyncSockUds::Receive(mutable_buffers_1 buf) {
     229           0 :     boost::system::error_code ec;
     230           0 :     uint32_t bytes_read = 0;
     231           0 :     const struct nlmsghdr *nlh = NULL;
     232             : 
     233           0 :     char *netlink_header(buffer_cast<char *>(buf));
     234             : 
     235           0 :     while (bytes_read < sizeof(struct nlmsghdr)) {
     236           0 :         char *buffer = netlink_header + bytes_read;
     237           0 :         bytes_read += recv(socket_, buffer, sizeof(struct nlmsghdr) - bytes_read, 0);
     238             :         //Data read is lesser than netlink header
     239             :         //continue reading
     240           0 :         if (bytes_read == sizeof(struct nlmsghdr)) {
     241           0 :             nlh =  buffer_cast<struct nlmsghdr *>(buf);
     242             :         }
     243             :     }
     244             : 
     245           0 :     if (nlh->nlmsg_type == NLMSG_ERROR) {
     246           0 :         LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
     247             :                 << " len " << nlh->nlmsg_len);
     248           0 :         assert(0);
     249             :     }
     250             : 
     251           0 :     bytes_read = 0;
     252           0 :     uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
     253           0 :     char *data(buffer_cast<char *>(buf + sizeof(struct nlmsghdr)));
     254             : 
     255           0 :     while (bytes_read < payload_size) {
     256           0 :         char *buffer = data + bytes_read;
     257           0 :         bytes_read += recv(socket_, buffer, payload_size - bytes_read, 0);
     258             :     }
     259           0 : }

Generated by: LCOV version 1.14