Line data Source code
1 : /*
2 : * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include <sandesh/sandesh_types.h>
6 : #include <sandesh/sandesh.h>
7 : #include <cmn/agent_cmn.h>
8 : #include <init/agent_init.h>
9 : #include <oper/interface.h>
10 : #include <oper/vm_interface.h>
11 : #include <oper/metadata_ip.h>
12 : #include <pkt/proto_handler.h>
13 : #include "pkt/pkt_init.h"
14 : #include <services/bfd_proto.h>
15 : #include <services/bfd_handler.h>
16 : #include <services/services_types.h>
17 : #include <services/services_init.h>
18 : #include "base/logging.h"
19 :
20 1 : BfdProto::BfdProto(Agent *agent, boost::asio::io_context &io) :
21 : Proto(agent, "Agent::Services", PktHandler::BFD, io),
22 1 : msg_(new PktInfo(agent, BFD_TX_BUFF_LEN, PktHandler::BFD, 0)),
23 1 : communicator_(this),
24 1 : server_(new BFD::Server(agent->event_manager(), &communicator_)),
25 2 : client_(new BFD::Client(&communicator_)), handler_(agent, msg_, io) {
26 :
27 : // limit the number of entries in the workqueue
28 1 : work_queue_.SetSize(agent->params()->services_queue_limit());
29 1 : work_queue_.SetBounded(true);
30 :
31 1 : agent->health_check_table()->RegisterHealthCheckCallback(
32 : boost::bind(&BfdProto::BfdHealthCheckSessionControl, this, _1, _2),
33 : HealthCheckService::BFD);
34 1 : }
35 :
36 2 : BfdProto::~BfdProto() {
37 2 : }
38 :
39 0 : ProtoHandler *BfdProto::AllocProtoHandler(boost::shared_ptr<PktInfo> info,
40 : boost::asio::io_context &io) {
41 0 : return new BfdHandler(agent(), info, io);
42 : }
43 :
44 0 : bool BfdSessionsKey::IsLess(const BfdSessionsKey &rhs) const {
45 0 : if (id_ != rhs.id_) {
46 0 : return id_ < rhs.id_;
47 : }
48 0 : return ip_ < rhs.ip_;
49 : }
50 :
51 0 : bool BfdSessionsKey::IsEqual(const BfdSessionsKey &rhs) const {
52 0 : if (id_ != rhs.id_) {
53 0 : return false;
54 : }
55 0 : if (ip_ != rhs.ip_) {
56 0 : return false;
57 : }
58 0 : return true;
59 : }
60 :
61 0 : bool BfdProto::BfdHealthCheckSessionControl(
62 : HealthCheckTable::HealthCheckServiceAction action,
63 : HealthCheckInstanceService *service) {
64 :
65 0 : uint16_t remote_port = service->is_multi_hop() ?
66 0 : BFD::kMultiHop : BFD::kSingleHop;
67 0 : IpAddress source_ip = service->update_source_ip();
68 0 : IpAddress destination_ip = service->destination_ip();
69 0 : if (source_ip.is_unspecified()) {
70 0 : BFD_TRACE(Trace, "Ignore",
71 : destination_ip.to_string(),
72 : source_ip.to_string(), service->interface()->id(),
73 : 0, 0, 0);
74 0 : return false;
75 : }
76 : BFD::SessionKey key(destination_ip,
77 0 : BFD::SessionIndex(service->interface()->id()),
78 : remote_port,
79 0 : source_ip);
80 :
81 0 : std::scoped_lock lock(mutex_);
82 0 : switch (action) {
83 0 : case HealthCheckTable::CREATE_SERVICE:
84 : case HealthCheckTable::UPDATE_SERVICE:
85 : {
86 0 : if (source_ip.is_v4() &&
87 0 : source_ip.to_v4() == Ip4Address(METADATA_IP_ADDR)) {
88 0 : return false;
89 : }
90 :
91 0 : uint32_t tx_interval = service->service()->delay() * 1000000 +
92 0 : service->service()->delay_usecs();
93 0 : if (!tx_interval) {
94 0 : tx_interval = kMinTxInterval;
95 : }
96 0 : uint32_t rx_interval = service->service()->timeout() * 1000000 +
97 0 : service->service()->timeout_usecs();
98 0 : if (!rx_interval) {
99 0 : rx_interval = kMinRxInterval;
100 : }
101 0 : uint32_t multiplier = service->service()->max_retries();
102 0 : if (!multiplier) {
103 0 : multiplier = kMultiplier;
104 : }
105 :
106 0 : BFD::SessionConfig session_config;
107 0 : session_config.desiredMinTxInterval =
108 0 : boost::posix_time::microseconds(tx_interval);
109 0 : session_config.requiredMinRxInterval =
110 0 : boost::posix_time::microseconds(rx_interval);
111 0 : session_config.detectionTimeMultiplier = multiplier;
112 :
113 0 : client_->AddSession(key, session_config);
114 0 : BfdSessionsKey skey(service->interface()->id(),
115 0 : service->destination_ip());
116 0 : sessions_[skey] = service;
117 0 : BFD_TRACE(Trace, "Add / Update",
118 : destination_ip.to_string(),
119 : source_ip.to_string(), service->interface()->id(),
120 : tx_interval, rx_interval, multiplier);
121 0 : break;
122 0 : }
123 :
124 0 : case HealthCheckTable::DELETE_SERVICE:
125 : {
126 0 : client_->DeleteSession(key);
127 0 : BfdSessionsKey skey(service->interface()->id(),
128 0 : service->destination_ip());
129 0 : Sessions::iterator it = sessions_.find(skey);
130 0 : if (it != sessions_.end()) {
131 0 : sessions_.erase(it);
132 0 : BFD_TRACE(Trace, "Delete",
133 : destination_ip.to_string(),
134 : source_ip.to_string(), service->interface()->id(),
135 : 0, 0, 0);
136 : } else {
137 : // TODO: replace log with trace
138 0 : std::string str("BFD delete session ");
139 0 : str += " interface-id: " + service->interface()->id();
140 0 : str += " source-ip: " + source_ip.to_string();
141 0 : str += " destination-ip: " + destination_ip.to_string();
142 :
143 0 : LOG(ERROR, "Duplicate request: " << str);
144 0 : }
145 0 : break;
146 0 : }
147 :
148 0 : case HealthCheckTable::RUN_SERVICE:
149 0 : break;
150 :
151 0 : case HealthCheckTable::STOP_SERVICE:
152 0 : break;
153 :
154 0 : default:
155 0 : assert(0);
156 : }
157 :
158 0 : return true;
159 0 : }
160 :
161 0 : void BfdProto::NotifyHealthCheckInstanceService(uint32_t interface,
162 : IpAddress address,
163 : std::string &data) {
164 0 : std::scoped_lock lock(mutex_);
165 0 : BfdSessionsKey key(interface, address);
166 0 : Sessions::iterator it = sessions_.find(key);
167 0 : if (it == sessions_.end()) {
168 0 : return;
169 : }
170 0 : it->second->OnRead(data);
171 :
172 : // log the BFD up/down event
173 0 : std::string str("BFD session ");
174 0 : if (data.find("success") != std::string::npos) {
175 0 : str += "Up,";
176 : }
177 0 : if (data.find("failure") != std::string::npos) {
178 0 : str += "Down,";
179 : }
180 0 : if (it->second->service()) {
181 0 : str += " service-health-check: " + it->second->service()->name();
182 : } else {
183 0 : str += " service: null";
184 : }
185 0 : if (it->second->interface()) {
186 0 : str += " interface: " + it->second->interface()->name();
187 0 : if (it->second->interface()->vrf()) {
188 0 : str += " vrf: " + it->second->interface()->vrf()->GetName();
189 : } else {
190 0 : str += " vrf: null";
191 : }
192 : } else {
193 0 : str += " interface: null";
194 : }
195 0 : if (!it->second->destination_ip().is_unspecified()) {
196 0 : str += " destination-ip: " + it->second->destination_ip().to_string();
197 : } else {
198 0 : str += " destination-ip: null";
199 : }
200 0 : LOG(WARN, "SYS_NOTICE " << str);
201 :
202 0 : }
203 :
204 0 : void BfdProto::HandleReceiveSafe(
205 : boost::asio::const_buffer packet,
206 : const boost::asio::ip::udp::endpoint &local_endpoint,
207 : const boost::asio::ip::udp::endpoint &remote_endpoint,
208 : const BFD::SessionIndex &session_index,
209 : uint8_t len,
210 : boost::system::error_code ec) {
211 :
212 0 : std::scoped_lock lock(rx_mutex_);
213 0 : bfd_communicator().HandleReceive(
214 : packet, local_endpoint, remote_endpoint,
215 : session_index,
216 : len, ec);
217 :
218 0 : IncrementReceived();
219 0 : }
220 :
221 0 : void BfdProto::BfdCommunicator::SendPacket(
222 : const boost::asio::ip::udp::endpoint &local_endpoint,
223 : const boost::asio::ip::udp::endpoint &remote_endpoint,
224 : const BFD::SessionIndex &session_index,
225 : const boost::asio::mutable_buffer &packet, int pktSize) {
226 0 : bfd_proto_->handler_.SendPacket(local_endpoint, remote_endpoint,
227 0 : session_index.if_index, packet, pktSize);
228 0 : bfd_proto_->IncrementSent();
229 0 : }
230 :
231 0 : void BfdProto::BfdCommunicator::NotifyStateChange(const BFD::SessionKey &key,
232 : const bool &up) {
233 0 : std::string data = up ? "success" : "failure";
234 0 : bfd_proto_->NotifyHealthCheckInstanceService(key.index.if_index,
235 0 : key.remote_address ,data);
236 0 : }
237 :
238 0 : bool BfdProto::Enqueue(boost::shared_ptr<PktInfo> msg) {
239 :
240 0 : if (msg->is_bfd_keepalive) {
241 : // Handle BFD keepalive packets
242 0 : return ProcessBfdKeepAlive(msg);
243 : }
244 : // Handle BFD control packets
245 0 : return Proto::Enqueue(msg);
246 : }
247 :
248 : // Handle BFD packet stats
249 0 : void BfdProto::ProcessStats(PktStatsType::Type type) {
250 0 : switch(type) {
251 0 : case PktStatsType::PKT_RX_DROP_STATS:
252 0 : IncrementReceiveDropCount();
253 0 : break;
254 0 : case PktStatsType::PKT_RX_ENQUEUE:
255 0 : IncrementKaEnqueueCount();
256 0 : break;
257 0 : default:
258 0 : return;
259 : }
260 : }
261 :
262 0 : bool BfdProto::ProcessBfdKeepAlive(boost::shared_ptr<PktInfo> msg_info) {
263 0 : PktHandler *pkt_handler = Agent::GetInstance()->pkt()->pkt_handler();
264 :
265 0 : BfdHandler *handler = new BfdHandler(agent(), msg_info, get_io_service());
266 0 : if (handler->HandleReceive())
267 0 : delete handler;
268 :
269 0 : return true;;
270 : }
|