Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #ifndef ctrlplane_ksync_sock_h
6 : #define ctrlplane_ksync_sock_h
7 :
8 : #include <atomic>
9 : #include <vector>
10 : #include <mutex>
11 :
12 : #include <boost/asio.hpp>
13 : #include <boost/asio/buffer.hpp>
14 :
15 : #include <boost/asio/netlink_protocol.hpp>
16 : #include <boost/asio/netlink_endpoint.hpp>
17 :
18 : #include <base/queue_task.h>
19 : #include <sandesh/common/vns_constants.h>
20 : #include <sandesh/common/vns_types.h>
21 : #include <io/tcp_session.h>
22 : #include <vr_types.h>
23 : #include <nl_util.h>
24 : #include "ksync_entry.h"
25 : #include "ksync_tx_queue.h"
26 :
27 : #define KSYNC_DEFAULT_MSG_SIZE 4096 // not referenced in this header; presumably a default message buffer size in bytes - TODO confirm
28 : #define KSYNC_DEFAULT_Q_ID_SEQ 0x00000001 // not referenced in this header - TODO confirm meaning at the use site
29 : #define KSYNC_ACK_WAIT_THRESHOLD 200 // KSyncSock::SendAsyncStart() returns true only while wait_tree_.size() <= this value
30 : #define KSYNC_SOCK_RECV_BUFF_SIZE (256 * 1024) // 256 KiB; not referenced in this header
31 : #define KSYNC_BMC_ARR_SIZE 1024 // number of slots in KSyncSock::bulk_mctx_arr_
32 :
33 : class KSyncEntry; // forward declarations
34 : class KSyncIoContext;
35 : class KSyncSockTcpSession;
36 : struct nl_client;
37 : class KSyncBulkSandeshContext;
38 :
39 : typedef std::vector<boost::asio::mutable_buffers_1> KSyncBufferList; // buffer list type taken by KSyncBulkMsgContext::Data() and by the SendTo()/AsyncSendTo() socket methods
40 :
41 : uint32_t GetNetlinkSeqno(char *data); // netlink helper prototypes; the definitions are outside this header, so behaviour notes below are assumptions - TODO confirm
42 : bool NetlinkMsgDone(char *data); // presumably reports whether data holds the terminating message of a multi-part reply - TODO confirm
43 : bool ValidateNetlink(char *data);
44 : void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len); // buf and buf_len are out-parameters (pointer-to-pointer / pointer-to-length)
45 : void InitNetlink(nl_client *client);
46 : void ResetNetlink(nl_client *client);
47 : void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no);
48 : void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, // presumably decodes the Sandesh messages in buf and dispatches them through sandesh_context - TODO confirm
49 : uint32_t alignment);
50 :
51 : /* Base class to hold sandesh context information which is passed to
52 : * Sandesh decode. Declares one virtual handler per vrouter message type.
53 : */
54 : class AgentSandeshContext : public SandeshContext {
55 : public:
56 1968 : AgentSandeshContext() : errno_(0), ksync_io_ctx_(NULL) { } // starts with no error and no associated KSyncIoContext
57 83765 : virtual ~AgentSandeshContext() { }
58 :
59 : virtual void IfMsgHandler(vr_interface_req *req) = 0; // pure-virtual handlers: every concrete subclass must implement these
60 : virtual void NHMsgHandler(vr_nexthop_req *req) = 0;
61 : virtual void RouteMsgHandler(vr_route_req *req) = 0;
62 : virtual void MplsMsgHandler(vr_mpls_req *req) = 0;
63 : virtual int VrResponseMsgHandler(vr_response *resp) = 0; // the only handler that returns a value
64 : virtual void MirrorMsgHandler(vr_mirror_req *req) = 0;
65 : virtual void FlowMsgHandler(vr_flow_req *req) = 0;
66 0 : virtual void FlowResponseHandler(vr_flow_response *req) { assert(0); } // optional handler: the default hits assert(0) if it is ever called
67 0 : virtual void FlowTableInfoHandler(vr_flow_table_data *r) { assert(0); } // optional handler: the default hits assert(0) if it is ever called
68 0 : virtual void BridgeTableInfoHandler(vr_bridge_table_data *r) { assert(0);} // optional handler: the default hits assert(0) if it is ever called
69 : virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req) = 0;
70 : virtual void VrfMsgHandler(vr_vrf_req *req) = 0;
71 : virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req) = 0;
72 : virtual void DropStatsMsgHandler(vr_drop_stats_req *req) = 0;
73 : virtual void VxLanMsgHandler(vr_vxlan_req *req) = 0;
74 0 : virtual void VrouterHugePageHandler(vr_hugepage_config *req) {} // optional handler: the default is a no-op
75 : virtual void VrouterOpsMsgHandler(vrouter_ops *req) = 0;
76 : virtual void QosConfigMsgHandler(vr_qos_map_req *req) = 0;
77 : virtual void ForwardingClassMsgHandler(vr_fc_map_req *req) = 0;
78 42727 : virtual void SetErrno(int err) {errno_ = err;} // virtual: KSyncBulkSandeshContext declares its own SetErrno()
79 :
80 1166 : int GetErrno() const {return errno_;}
81 2040 : void set_ksync_io_ctx(const KSyncIoContext *ioc) {ksync_io_ctx_ = ioc;} // stores the pointer only; nothing in this class deletes it
82 144 : const KSyncIoContext *ksync_io_ctx() const {return ksync_io_ctx_;}
83 : private:
84 : int errno_; // last value passed to SetErrno(); 0 after construction
85 : const KSyncIoContext *ksync_io_ctx_; // NULL until set_ksync_io_ctx() is called
86 : };
87 :
88 :
89 : /* Base class for context management. Used while sending and
90 : * receiving data via ksync socket. Holds the message buffer, its sequence number and the work-queue selectors (type, index).
91 : */
92 : class IoContext {
93 : public:
94 : // Type of IoContext. The work-queue used for processing response is based
95 : // on this
96 : // IOC_UVE : Used for UVEs
97 : // IOC_KSYNC : Used for KSync objects
98 : enum Type {
99 : IOC_UVE,
100 : IOC_KSYNC,
101 : MAX_WORK_QUEUES // This should always be last
102 : };
103 : // Work-queue names used for the ksync receive work-queues
104 : static const char *io_wq_names[MAX_WORK_QUEUES];
105 :
106 : IoContext() : // default: no message, no sandesh context, type IOC_KSYNC, index 0
107 : sandesh_context_(NULL), msg_(NULL), msg_len_(0), seqno_(0),
108 : type_(IOC_KSYNC), index_(0), rx_buffer1_(NULL), rx_buffer2_(NULL) {
109 : }
110 0 : IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, // same as the overload below with index fixed to 0
111 0 : Type type) :
112 0 : sandesh_context_(ctx), msg_(msg), msg_len_(len), seqno_(seq),
113 0 : type_(type), index_(0), rx_buffer1_(NULL), rx_buffer2_(NULL) {
114 0 : }
115 1020 : IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, // msg is released with free() in the destructor, so this object takes ownership of it
116 1020 : Type type, uint32_t index) :
117 1020 : sandesh_context_(ctx), msg_(msg), msg_len_(len), seqno_(seq),
118 1020 : type_(type), index_(index), rx_buffer1_(NULL), rx_buffer2_(NULL) {
119 1020 : }
120 1020 : virtual ~IoContext() { // NOTE(review): no copy constructor/assignment is declared; a copy would free() the same msg_ twice
121 1020 : if (msg_ != NULL)
122 1020 : free(msg_);
123 1020 : assert(rx_buffer1_ == NULL); // both rx buffer pointers must have been reset before destruction
124 1020 : assert(rx_buffer2_ == NULL);
125 1020 : }
126 :
127 : bool operator<(const IoContext &rhs) const { // orders IoContexts by sequence number only
128 : return seqno_ < rhs.seqno_;
129 : }
130 :
131 0 : virtual void Handler() {} // no-op by default; KSyncIoContext declares an override
132 0 : virtual void ErrorHandler(int err) {} // no-op by default; KSyncIoContext declares an override
133 :
134 4224 : AgentSandeshContext *GetSandeshContext() { return sandesh_context_; }
135 3060 : Type type() { return type_; }
136 :
137 1020 : void SetSeqno(uint32_t seqno) {seqno_ = seqno;}
138 1020 : uint32_t GetSeqno() const {return seqno_;}
139 1020 : char *GetMsg() const { return msg_; }
140 2040 : uint32_t GetMsgLen() const { return msg_len_; }
141 1164 : char *rx_buffer1() const { return rx_buffer1_; }
142 144 : void reset_rx_buffer1() { rx_buffer1_ = NULL; } // clears the pointer only; the buffer itself is not freed here
143 1164 : char *rx_buffer2() { return rx_buffer2_; } // NOTE(review): non-const, unlike rx_buffer1()
144 144 : void reset_rx_buffer2() { rx_buffer2_ = NULL; } // clears the pointer only; the buffer itself is not freed here
145 3060 : uint32_t index() const { return index_; }
146 :
147 : boost::intrusive::list_member_hook<> node_; // hook used by the IoContextList intrusive list
148 :
149 : protected:
150 : AgentSandeshContext *sandesh_context_; // not deleted by this class
151 :
152 : private:
153 : char *msg_; // message buffer; free()'d in the destructor
154 : uint32_t msg_len_;
155 : uint32_t seqno_;
156 : Type type_;
157 : uint32_t index_;
158 : // Buffers allocated to read the ksync responses for this IoContext.
159 : // As an optimization, KSync Tx Queue will use these buffers to minimize
160 : // computation in KSync Tx Queue context.
161 : char *rx_buffer1_;
162 : char *rx_buffer2_;
163 :
164 : friend class KSyncSock; // grants KSyncSock access to the private members above
165 : };
166 :
167 : /* IoContext tied to KSyncEntry: records the entry and the KSyncEvent associated with the message */
168 : class KSyncIoContext : public IoContext {
169 : public:
170 : KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, // defined outside this header
171 : char *msg, KSyncEntry::KSyncEvent event);
172 2040 : virtual ~KSyncIoContext() {
173 2040 : }
174 :
175 : virtual void Handler();
176 :
177 : void ErrorHandler(int err); // overrides the virtual IoContext::ErrorHandler(int) even without the 'virtual' keyword
178 144 : KSyncEntry *GetKSyncEntry() const {return entry_;}
179 144 : KSyncEntry::KSyncEvent event() const {return event_;}
180 : private:
181 : KSyncEntry *entry_; // not deleted by this class (the destructor is empty)
182 : KSyncEntry::KSyncEvent event_;
183 : AgentSandeshContext *agent_sandesh_ctx_; // NOTE(review): separate from the inherited IoContext::sandesh_context_ - confirm which one the implementation uses
184 : KSyncSock *sock_; // not deleted by this class (the destructor is empty)
185 : };
186 :
187 : /*
188 : * KSync implementation of bunching messages.
189 : *
190 : * Message bunching has two parts,
191 : * Encoding messages
192 : * KSyncTxQueue is responsible for bunching KSync requests into a single message
193 : *
194 : * KSyncTxQueue uses KSyncBulkMsgContext to bunch KSync events.
195 : * The IoContexts for bunched KSync events are stored as a list inside
196 : * KSyncBulkMsgContext
197 : *
198 : * KSync events are bunched if they pass the following constraints,
199 : * - Number of KSync events is less than max_bulk_msg_count_
200 : * - Total size of buffer for events is less than max_bulk_buf_size_
201 : * - Type of IoContext is same.
202 : * When type of IoContext is same, it also ensures that KSync Responses
203 : * are processed in same WorkQueue.
204 : * - Each IoContext type has multiple work-queues. Flows are sprayed across
205 : * the work-queues based on flow-table index. Entries are bunched only if
206 : * they belong to same work-queue.
207 : * This also ensures that flows from a flow-table partition are processed
208 : * in single KSync Response Work-Queue
209 : * - The input queue for KSyncTxQueue is not empty.
210 : *
211 : * Decoding messages
212 : * KSync responses are processed in context of the KSyncSock receive work-queues (uve_rx_queue / ksync_rx_queue).
213 : * Each IoContext type has its own set of receive work-queues.
214 : *
215 : * Bulk decoder works on the following assumptions,
216 : * - Expects KSync Responses for each IoContext in KSyncBulkMsgContext
217 : * - Response for each IoContext can itself be more than one Sandesh Response.
218 : * - The sequence of responses is the same as the sequence of IoContext entries
219 : * - Response for each IoContext starts with VrResponseMsg followed
220 : * optionally by other responses. Hence, decoder moves to next IoContext
221 : * after getting a VrResponse message
222 : *
223 : * class KSyncBulkSandeshContext is used to decode the Sandesh Responses
224 : * and move the IoContext on getting VrResponse
225 : */
226 : typedef boost::intrusive::member_hook<IoContext, // hook option: elements are linked through IoContext::node_
227 : boost::intrusive::list_member_hook<>,
228 : &IoContext::node_> KSyncSockNode;
229 : typedef boost::intrusive::list<IoContext, KSyncSockNode> IoContextList; // intrusive list of IoContext objects
230 :
231 : class KSyncBulkMsgContext { // One bunch of IoContexts sent as a single message, plus the rx buffers and decode state for its response
232 : public:
233 : const static unsigned kMaxRxBufferCount = 64; // capacity of rx_buffers_
234 : KSyncBulkMsgContext(IoContext::Type type, uint32_t index);
235 : KSyncBulkMsgContext(const KSyncBulkMsgContext &rhs); // user-declared copy constructor; defined outside this header
236 : ~KSyncBulkMsgContext();
237 :
238 : void Insert(IoContext *ioc);
239 : void Data(KSyncBufferList *iovec); // iovec is an out-parameter; presumably filled with the message buffers of the listed IoContexts - TODO confirm
240 2086 : IoContext::Type io_context_type() const {
241 2086 : return io_context_type_;
242 : }
243 : void AddReceiveBuffer(char *buff);
244 : char *GetReceiveBuffer();
245 2086 : uint32_t work_queue_index() const { return work_queue_index_; }
246 0 : void set_seqno(uint32_t seq) { seqno_ = seq; }
247 0 : uint32_t seqno() { return seqno_; }
248 : private:
249 : friend class KSyncBulkSandeshContext; // the decoder class accesses the private decode state below
250 : // List of IoContext to be processed in this context
251 : IoContextList io_context_list_;
252 : // Type of message
253 : IoContext::Type io_context_type_;
254 : // Index of work-queue
255 : uint32_t work_queue_index_;
256 : // List of rx-buffers
257 : // The buffers are taken from IoContext added to the list above
258 : // If IoContext does not have buffer, then it dynamically allocates
259 : // buffer
260 : char *rx_buffers_[kMaxRxBufferCount];
261 : // Index of next buffer to process
262 : uint32_t rx_buffer_index_;
263 :
264 : ///////////////////////////////////////////////////////////////////////
265 : // Following fields are used for decode processing
266 : ///////////////////////////////////////////////////////////////////////
267 : // Number of responses already processed
268 : uint32_t vr_response_count_;
269 : // Iterator to IoContext being processed
270 : IoContextList::iterator io_context_list_it_;
271 : uint32_t seqno_; // sequence number accessed through set_seqno()/seqno()
272 : };
273 :
274 : class KSyncBulkSandeshContext : public AgentSandeshContext { // AgentSandeshContext used to decode the responses of one KSyncBulkMsgContext
275 : public:
276 : KSyncBulkSandeshContext();
277 : virtual ~KSyncBulkSandeshContext();
278 :
279 : void IfMsgHandler(vr_interface_req *req); // the handlers below override the AgentSandeshContext virtuals; definitions are outside this header
280 : void NHMsgHandler(vr_nexthop_req *req);
281 : void RouteMsgHandler(vr_route_req *req);
282 : void MplsMsgHandler(vr_mpls_req *req);
283 : int VrResponseMsgHandler(vr_response *resp);
284 : void MirrorMsgHandler(vr_mirror_req *req);
285 : void FlowMsgHandler(vr_flow_req *req);
286 : void FlowResponseHandler(vr_flow_response *req); // replaces the asserting default in AgentSandeshContext
287 : void VrfAssignMsgHandler(vr_vrf_assign_req *req);
288 : void VrfMsgHandler(vr_vrf_req *req);
289 : void VrfStatsMsgHandler(vr_vrf_stats_req *req);
290 : void DropStatsMsgHandler(vr_drop_stats_req *req);
291 : void VxLanMsgHandler(vr_vxlan_req *req);
292 : void VrouterOpsMsgHandler(vrouter_ops *req);
293 : void QosConfigMsgHandler(vr_qos_map_req *req);
294 : void ForwardingClassMsgHandler(vr_fc_map_req *req);
295 : void SetErrno(int err); // overrides the virtual AgentSandeshContext::SetErrno()
296 :
297 : bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more);
298 : AgentSandeshContext *GetSandeshContext();
299 1066 : void set_bulk_message_context(KSyncBulkMsgContext *bulk_context) { // stores the pointer only; ownership is not visible from this header
300 1066 : bulk_msg_context_ = bulk_context;
301 1066 : }
302 :
303 : void IoContextStart();
304 : void IoContextDone();
305 :
306 : private:
307 : KSyncBulkMsgContext *bulk_msg_context_; // bulk context currently being decoded; set through set_bulk_message_context()
308 : DISALLOW_COPY_AND_ASSIGN(KSyncBulkSandeshContext);
309 : };
310 :
311 : class KSyncSock { // Abstract base for the KSync socket; the transport-specific send/receive operations are the pure-virtual methods implemented by subclasses
312 : public:
313 : // Number of flow receive queues
314 : const static int kRxWorkQueueCount = 2;
315 : const static int kMsgGrowSize = 16;
316 : const static unsigned kBufLen = (4*1024);
317 :
318 : // Number of messages that can be bunched together
319 : const static unsigned kMaxBulkMsgCount = 16;
320 : // Sequence number to denote invalid bulk-context
321 : const static unsigned kInvalidBulkSeqNo = 0xFFFFFFFF;
322 :
323 : typedef std::map<uint32_t, KSyncBulkMsgContext> WaitTree; // keyed by uint32_t; per the bulk_seq_no_ comment below, entries are added by sequence number
324 : typedef std::pair<uint32_t, KSyncBulkMsgContext> WaitTreePair;
325 : typedef boost::function<void(const boost::system::error_code &, size_t)> // completion callback signature: (error code, size_t)
326 : HandlerCb;
327 :
328 : // Request structure in the KSync Response Queue
329 : struct KSyncRxData {
330 : // buffer having KSync response
331 : char *buff_;
332 : // bulk context for decoding response
333 : KSyncBulkMsgContext *bulk_msg_context_;
334 :
335 357 : KSyncRxData() : buff_(NULL), bulk_msg_context_(NULL) { }
336 4264 : KSyncRxData(const KSyncRxData &rhs) : // copies the two pointers only; neither pointee is duplicated
337 4264 : buff_(rhs.buff_), bulk_msg_context_(rhs.bulk_msg_context_) {
338 4264 : }
339 1066 : KSyncRxData(char *buff, KSyncBulkMsgContext *ctxt) :
340 1066 : buff_(buff), bulk_msg_context_(ctxt) {
341 1066 : }
342 : };
343 : typedef WorkQueue<KSyncRxData> KSyncReceiveQueue;
344 : // structure for ksync rx process queue
345 : struct KSyncRxQueueData {
346 : KSyncEntry *entry_;
347 : KSyncEntry::KSyncEvent event_;
348 357 : KSyncRxQueueData():entry_(NULL),event_(KSyncEntry::INVALID) {}
349 1020 : KSyncRxQueueData(KSyncEntry *entry, KSyncEntry::KSyncEvent event) :
350 1020 : entry_(entry), event_(event) {
351 1020 : }
352 : };
353 : typedef WorkQueue<KSyncRxQueueData> KSyncRxWorkQueue;
354 :
355 : KSyncSock();
356 : virtual ~KSyncSock();
357 :
358 : // Virtual methods
359 : virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt) = 0;
360 : virtual bool Decoder(char *data, AgentSandeshContext *ctxt) = 0;
361 :
362 : // Write a KSyncEntry to kernel
363 : void SendAsync(KSyncEntry *entry, int msg_len, char *msg,
364 : KSyncEntry::KSyncEvent event);
365 : std::size_t BlockingSend(char *msg, int msg_len);
366 : bool BlockingRecv();
367 : void GenericSend(IoContext *ctx);
368 : uint32_t AllocSeqNo(IoContext::Type type);
369 : uint32_t AllocSeqNo(IoContext::Type type, uint32_t instance);
370 : KSyncReceiveQueue *GetReceiveQueue(IoContext::Type type, uint32_t instance);
371 : KSyncReceiveQueue *GetReceiveQueue(uint32_t seqno);
372 :
373 : // Bulk Messaging methods
374 : KSyncBulkMsgContext *LocateBulkContext(uint32_t seqno,
375 : IoContext::Type io_context_type,
376 : uint32_t work_queue_index);
377 : int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno);
378 : bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc);
379 : void OnEmptyQueue(bool done);
380 2 : int tx_count() const { return tx_count_; }
381 :
382 : // Start Ksync Asio operations
383 : static void Start(bool read_inline);
384 : static void Shutdown();
385 :
386 : // Partition to KSyncSock mapping
387 : static KSyncSock *Get(DBTablePartBase *partition);
388 : static KSyncSock *Get(int partition_id);
389 :
390 0 : static uint32_t GetPid() {return pid_;}; // pid_ is a pid_t; it is converted to uint32_t on return
391 2046 : static int GetNetlinkFamilyId() {return vnsw_netlink_family_id_;}
392 : static void SetNetlinkFamilyId(int id);
393 :
394 1026 : static AgentSandeshContext *GetAgentSandeshContext(uint32_t type) { // the index wraps modulo kRxWorkQueueCount, so any value is in range
395 1026 : return agent_sandesh_ctx_[type % kRxWorkQueueCount];
396 : }
397 8 : static void SetAgentSandeshContext(AgentSandeshContext *ctx, uint32_t idx) { // NOTE(review): unlike the getter, idx is used unchecked; callers must pass idx < kRxWorkQueueCount
398 8 : agent_sandesh_ctx_[idx] = ctx;
399 8 : }
400 :
401 0 : const KSyncTxQueue *send_queue() const { return &send_queue_; }
402 0 : const KSyncReceiveQueue *get_receive_work_queue(uint16_t index) const { // returns from ksync_rx_queue only; index is not range-checked against kRxWorkQueueCount
403 0 : return ksync_rx_queue[index];
404 : }
405 : // Allocate a receive work-queue
406 :
407 : KSyncReceiveQueue *AllocQueue(KSyncBulkSandeshContext ctxt[],
408 : uint32_t task_id, uint32_t instance,
409 : const char *name);
410 :
411 : uint32_t WaitTreeSize() const;
412 : void SetSeqno(uint32_t seq);
413 : void SetMeasureQueueDelay(bool val);
414 0 : void reset_use_wait_tree() { use_wait_tree_ = false; } // one-way switch: this header offers no method to set the flag back to true
415 0 : void set_process_data_inline() { process_data_inline_ = true; } // one-way switch: this header offers no method to clear the flag
416 : // API to enqueue ksync events to rx process work queue
417 : void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event);
418 : protected:
419 : static void Init(bool use_work_queue, const std::string &cpu_pin_policy);
420 : static void SetSockTableEntry(KSyncSock *sock);
421 : bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context);
422 : KSyncBulkSandeshContext *GetBulkSandeshContext(uint32_t seqno);
423 : void ProcessDataInline(char *data);
424 :
425 : std::mutex mutex_; // held by SendAsyncStart() while reading wait_tree_
426 : nl_client *nl_client_;
427 : // Tree of all IoContext pending ksync response
428 : WaitTree wait_tree_;
429 : KSyncTxQueue send_queue_;
430 : KSyncReceiveQueue *uve_rx_queue[kRxWorkQueueCount];
431 : KSyncReceiveQueue *ksync_rx_queue[kRxWorkQueueCount];
432 :
433 : // Information maintained for bulk processing
434 :
435 : // Max messages in one bulk context
436 : uint32_t max_bulk_msg_count_;
437 : // Max buffer size in one bulk context
438 : uint32_t max_bulk_buf_size_;
439 :
440 : // Sequence number of first message in bulk context. Entry in WaitTree is
441 : // added based on this sequence number
442 : uint32_t bulk_seq_no_;
443 : // Current buffer size in bulk context
444 : uint32_t bulk_buf_size_;
445 : // Current message count in bulk context
446 : uint32_t bulk_msg_count_;
447 :
448 : uint32_t bmca_prod_; // presumably the producer index into bulk_mctx_arr_ - TODO confirm
449 : uint32_t bmca_cons_; // presumably the consumer index into bulk_mctx_arr_ - TODO confirm
450 : KSyncBulkMsgContext *bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE];
451 :
452 : private:
453 : friend class KSyncTxQueue;
454 : virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb) = 0; // transport hooks: each socket subclass below implements these
455 : virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
456 : HandlerCb cb) = 0;
457 : virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no) = 0;
458 : virtual void Receive(boost::asio::mutable_buffers_1) = 0;
459 : virtual uint32_t GetSeqno(char *data) = 0;
460 : virtual bool IsMoreData(char *data) = 0;
461 : virtual bool Validate(char *data) = 0;
462 :
463 : // Read handler registered with boost::asio. Demux done based on seqno_
464 : void ReadHandler(const boost::system::error_code& error,
465 : size_t bytes_transferred);
466 :
467 : // Write handler registered with boost::asio. Demux done based on seqno_
468 : void WriteHandler(const boost::system::error_code& error,
469 : size_t bytes_transferred);
470 :
471 : bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context,
472 : const KSyncRxData &data);
473 : bool ProcessRxData(KSyncRxQueueData data);
474 : bool SendAsyncImpl(IoContext *ioc);
475 : bool SendAsyncStart() { // returns true while wait_tree_ holds at most KSYNC_ACK_WAIT_THRESHOLD entries
476 : std::scoped_lock lock(mutex_); // wait_tree_ is read under mutex_
477 : return (wait_tree_.size() <= KSYNC_ACK_WAIT_THRESHOLD);
478 : }
479 :
480 : private:
481 : char *rx_buff_; // private to KSyncSock; KSyncSockUds and KSyncSockTcp declare their own members with the same name
482 : std::atomic<uint32_t> seqno_;
483 : std::atomic<uint32_t> uve_seqno_;
484 : // Read ksync responses inline
485 : // The IoContext WaitTree is not used when response is read-inline
486 : bool read_inline_;
487 : KSyncBulkMsgContext *bulk_msg_context_;
488 : bool use_wait_tree_;
489 : bool process_data_inline_;
490 : KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]; // one decode context per work-queue index
491 : KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]; // one decode context per work-queue index
492 :
493 : // Debug stats
494 : int tx_count_;
495 : int ack_count_;
496 : int err_count_;
497 :
498 : // IO context can defer ksync event processing
499 : // by deferring them to this work queue, this queue gets
500 : // processed in Agent::KSync context
501 : KSyncRxWorkQueue rx_process_queue_;
502 : static std::unique_ptr<KSyncSock> sock_;
503 : static pid_t pid_;
504 : static int vnsw_netlink_family_id_;
505 : // AgentSandeshContext used for KSync response handling
506 : // AgentSandeshContext used for decode is picked based on work-queue index
507 : // Picking AgentSandeshContext based on work-queue index also makes it
508 : // thread safe
509 : static AgentSandeshContext *agent_sandesh_ctx_[kRxWorkQueueCount];
510 : static std::atomic<bool> shutdown_;
511 :
512 : DISALLOW_COPY_AND_ASSIGN(KSyncSock);
513 : };
514 :
515 : // Netlink socket class for interacting with kernel; implements the KSyncSock transport hooks over a boost::asio netlink raw socket
516 : class KSyncSockNetlink : public KSyncSock {
517 : public:
518 : KSyncSockNetlink(boost::asio::io_context &ios, int protocol);
519 : virtual ~KSyncSockNetlink();
520 :
521 : virtual uint32_t GetSeqno(char *data); // overrides of the KSyncSock pure virtuals; definitions are outside this header
522 : virtual bool IsMoreData(char *data);
523 : virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
524 : virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
525 : virtual bool Validate(char *data);
526 : virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
527 : virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
528 : HandlerCb cb);
529 : virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
530 : virtual void Receive(boost::asio::mutable_buffers_1);
531 :
532 : static void NetlinkDecoder(char *data, SandeshContext *ctxt); // static decode helpers; they take the base SandeshContext type
533 : static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more);
534 : static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue,
535 : const std::string &cpu_pin_policy);
536 : private:
537 : boost::asio::netlink::raw::socket sock_; // instance socket; a different member from the private static KSyncSock::sock_
538 : };
539 :
540 : // UDP socket class for interacting with user vrouter; implements the KSyncSock transport hooks over a boost::asio UDP socket
541 : class KSyncSockUdp : public KSyncSock {
542 : public:
543 : KSyncSockUdp(boost::asio::io_context &ios, int port);
544 0 : virtual ~KSyncSockUdp() { }
545 :
546 : virtual uint32_t GetSeqno(char *data); // overrides of the KSyncSock pure virtuals; definitions are outside this header
547 : virtual bool IsMoreData(char *data);
548 : virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
549 : virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
550 : virtual bool Validate(char *data);
551 : virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
552 : virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
553 : HandlerCb cb);
554 : virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
555 : virtual void Receive(boost::asio::mutable_buffers_1);
556 :
557 : static void Init(boost::asio::io_context &ios, int port,
558 : const std::string &cpu_pin_policy);
559 : private:
560 : boost::asio::ip::udp::socket sock_; // instance socket; a different member from the private static KSyncSock::sock_
561 : boost::asio::ip::udp::endpoint server_ep_; // presumably the vrouter endpoint that messages are sent to - TODO confirm
562 : };
563 :
564 : // Unix domain socket class for interacting with user vrouter
565 : #define KSYNC_AGENT_VROUTER_SOCK_PATH "/var/run/vrouter/dpdk_netlink"
566 : class KSyncSockUds : public KSyncSock {
567 : public:
568 : KSyncSockUds(boost::asio::io_context &ios);
569 0 : virtual ~KSyncSockUds() { // NOTE(review): scalar delete on char* buffers; if they are allocated with new[] (or malloc) this must be delete[] (or free) - confirm against the constructor
570 0 : delete rx_buff_;
571 0 : delete rx_buff_q_;
572 0 : }
573 :
574 : virtual uint32_t GetSeqno(char *data); // overrides of the KSyncSock pure virtuals; definitions are outside this header
575 : virtual bool IsMoreData(char *data);
576 : virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
577 : virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
578 : virtual bool Validate(char *data);
579 : virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
580 : virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
581 : HandlerCb cb);
582 : virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
583 : virtual void Receive(boost::asio::mutable_buffers_1);
584 : virtual bool Run(void); // not declared in KSyncSock; new virtual introduced by this class
585 :
586 : static void Init(boost::asio::io_context &ios,
587 : const std::string &cpu_pin_policy,
588 : const std::string &sockpathvr=""); // sockpathvr defaults to an empty string
589 : private:
590 : boost::asio::local::stream_protocol::socket sock_; // instance socket; a different member from the private static KSyncSock::sock_
591 : boost::asio::local::stream_protocol::endpoint server_ep_;
592 : char *rx_buff_; // deleted in the destructor; distinct from the private KSyncSock::rx_buff_
593 : char *rx_buff_q_; // deleted in the destructor
594 : size_t remain_;
595 : int socket_;
596 : int connected_;
597 : static string sockpath_; // NOTE(review): unqualified 'string' relies on a using-declaration from an included header
598 : };
599 :
600 : class KSyncSockTcpSessionReader : public TcpMessageReader { // TcpMessageReader specialisation whose header length is the size of a netlink message header
601 : public:
602 : KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback);
603 0 : virtual ~KSyncSockTcpSessionReader() { }
604 :
605 : protected:
606 : virtual int MsgLength(Buffer buffer, int offset); // defined outside this header
607 :
608 0 : virtual const int GetHeaderLenSize() { // header length is sizeof(struct nlmsghdr)
609 0 : return sizeof(struct nlmsghdr);
610 : }
611 :
612 0 : virtual const int GetMaxMessageSize() { // upper bound is kMaxMessageSize (4096)
613 0 : return kMaxMessageSize;
614 : }
615 :
616 : private:
617 : static const int kMaxMessageSize = 4096;
618 : };
619 :
620 : class KSyncSockTcpSession : public TcpSession { // TcpSession subclass that holds a KSyncSockTcpSessionReader
621 : public:
622 : KSyncSockTcpSession(TcpServer *server, Socket *sock,
623 : bool async_ready = false); // async_ready defaults to false
624 : virtual ~KSyncSockTcpSession();
625 : protected:
626 : virtual void OnRead(Buffer buffer); // defined outside this header
627 : private:
628 : KSyncSockTcpSessionReader *reader_; // raw pointer; presumably created in the constructor and released in the destructor - TODO confirm
629 : };
630 :
631 : class KSyncSockTcp : public KSyncSock, public TcpServer { // KSync socket over TCP: implements the KSyncSock transport hooks and is also a TcpServer
632 : public:
633 : KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr,
634 : int port);
635 0 : virtual ~KSyncSockTcp() { } // empty: rx_buff_ and rx_buff_rem_ are not released here
636 :
637 : virtual uint32_t GetSeqno(char *data); // overrides of the KSyncSock pure virtuals; definitions are outside this header
638 : virtual bool IsMoreData(char *data);
639 : virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt);
640 : virtual bool Decoder(char *data, AgentSandeshContext *ctxt);
641 : virtual bool Validate(char *data);
642 : virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb);
643 : virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
644 : HandlerCb cb);
645 : virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no);
646 : virtual void Receive(boost::asio::mutable_buffers_1);
647 : virtual TcpSession *AllocSession(Socket *socket); // not declared in KSyncSock; presumably the TcpServer session factory hook - TODO confirm
648 : virtual bool Run(void);
649 :
650 : bool ReceiveMsg(const u_int8_t *msg, size_t size);
651 : void OnSessionEvent(TcpSession *session, TcpSession::Event event);
652 0 : bool connect_complete() const {
653 0 : return connect_complete_;
654 : }
655 : void AsyncReadStart();
656 :
657 : static void Init(EventManager *evm,
658 : boost::asio::ip::address ip_addr, int port,
659 : const std::string &cpu_pin_policy);
660 : private:
661 : EventManager *evm_;
662 : TcpSession *session_;
663 : boost::asio::ip::tcp::endpoint server_ep_;
664 : boost::asio::ip::tcp::socket *tcp_socket_;
665 : bool connect_complete_; // value returned by connect_complete()
666 : char *rx_buff_; // distinct from the private KSyncSock::rx_buff_
667 : char *rx_buff_rem_;
668 : size_t remain_;
669 : };
670 : #endif // ctrlplane_ksync_sock_h
|