Line data Source code
1 : /*
2 : * Copyright (c) 2014 CodiLime, Inc. All rights reserved.
3 : */
4 :
5 : #include <atomic>
6 :
7 : #include "base/task_annotations.h"
8 : #include "bfd/bfd_server.h"
9 : #include "bfd/bfd_session.h"
10 : #include "bfd/bfd_connection.h"
11 : #include "bfd/bfd_control_packet.h"
12 : #include "bfd/bfd_state_machine.h"
13 : #include "bfd/bfd_common.h"
14 :
15 : #include <boost/foreach.hpp>
16 : #include <boost/random/mersenne_twister.hpp>
17 :
18 : #include "base/logging.h"
19 : #include "io/event_manager.h"
20 :
21 : namespace BFD {
22 :
23 21 : Server::Server(EventManager *evm, Connection *communicator) :
24 21 : evm_(evm),
25 21 : communicator_(communicator),
26 21 : session_manager_(evm),
27 42 : event_queue_(new WorkQueue<Event *>(
28 42 : TaskScheduler::GetInstance()->GetTaskId("BFD"), 0,
29 63 : boost::bind(&Server::EventCallback, this, _1))) {
30 21 : communicator->SetServer(this);
31 21 : }
32 :
33 22 : Server::~Server() {
34 22 : }
35 :
36 28 : void Server::AddSession(const SessionKey &key, const SessionConfig &config,
37 : ChangeCb cb) {
38 28 : EnqueueEvent(new Event(ADD_CONNECTION, key, config, cb));
39 28 : }
40 :
41 28 : void Server::AddSession(Event *event) {
42 28 : CHECK_CONCURRENCY("BFD");
43 : Discriminator discriminator;
44 28 : ConfigureSession(event->key, event->config, &discriminator);
45 28 : sessions_.insert(event->key);
46 28 : Session *session = SessionByKey(event->key);
47 28 : if (session) {
48 28 : session->RegisterChangeCallback(0, event->cb);
49 28 : event->cb(session->key(), session->local_state());
50 : }
51 28 : }
52 :
53 25 : void Server::DeleteSession(const SessionKey &key) {
54 25 : EnqueueEvent(new Event(DELETE_CONNECTION, key));
55 25 : }
56 :
57 25 : void Server::DeleteSession(Event *event) {
58 25 : CHECK_CONCURRENCY("BFD");
59 25 : int erase_size = sessions_.erase(event->key);
60 25 : if (erase_size) {
61 25 : RemoveSessionReference(event->key);
62 : } else {
63 0 : LOG(ERROR, __func__ << "Cannot find session: " <<
64 : event->key.to_string());
65 : }
66 25 : }
67 :
68 18 : void Server::DeleteClientSessions() {
69 18 : EnqueueEvent(new Event(DELETE_CLIENT_CONNECTIONS));
70 18 : }
71 :
72 18 : void Server::DeleteClientSessions(Event *event) {
73 18 : CHECK_CONCURRENCY("BFD");
74 18 : for (Sessions::iterator it = sessions_.begin(), next;
75 18 : it != sessions_.end(); it = next) {
76 0 : SessionKey key = *it;
77 0 : next = ++it;
78 0 : sessions_.erase(key);
79 0 : RemoveSessionReference(key);
80 : }
81 18 : }
82 :
83 186 : void Server::EnqueueEvent(Event *event) {
84 186 : event_queue_->Enqueue(event);
85 186 : }
86 :
87 186 : bool Server::EventCallback(Event *event) {
88 186 : switch (event->type) {
89 28 : case ADD_CONNECTION:
90 28 : AddSession(event);
91 28 : break;
92 25 : case DELETE_CONNECTION:
93 25 : DeleteSession(event);
94 25 : break;
95 18 : case DELETE_CLIENT_CONNECTIONS:
96 18 : DeleteClientSessions(event);
97 18 : break;
98 115 : case PROCESS_PACKET:
99 115 : ProcessControlPacket(event);
100 115 : break;
101 : }
102 186 : delete event;
103 186 : return true;
104 : }
105 :
106 115 : Session* Server::GetSession(const ControlPacket *packet) {
107 115 : CHECK_CONCURRENCY("BFD");
108 :
109 115 : SessionIndex session_index;
110 115 : if (packet->local_endpoint.port() == kSingleHop) {
111 106 : session_index.if_index = packet->session_index.if_index;
112 : } else {
113 9 : session_index.vrf_index = packet->session_index.vrf_index;
114 : }
115 :
116 115 : if (packet->receiver_discriminator) {
117 : Session *session_bydesc = session_manager_.SessionByDiscriminator
118 102 : (packet->receiver_discriminator);
119 102 : if (session_bydesc && session_bydesc->Stats().rx_count == 0) {
120 13 : Session *session_bykey = session_manager_.SessionByKey(
121 13 : SessionKey(packet->remote_endpoint.address(), session_index,
122 13 : packet->local_endpoint.port(),
123 13 : packet->local_endpoint.address()));
124 13 : if (!session_bykey) {
125 0 : if ((session_bydesc->key()).local_address ==
126 0 : packet->local_endpoint.address()) {
127 0 : LOG(ERROR, __func__ << " SRC-ADDR Changed, on Peer:" <<
128 : packet->remote_endpoint.address().to_string() <<
129 : "on Agent:" << (session_bydesc->key()).remote_address.to_string());
130 : //Accept pkt to support src-addr change at peer
131 : } else {
132 0 : LOG(ERROR, __func__ << " Session from Key NULL: " <<
133 : packet->remote_endpoint.address().to_string() <<
134 : " -> " <<
135 : packet->local_endpoint.address().to_string());
136 0 : return NULL;
137 : }
138 13 : } else if (session_bydesc->local_discriminator() !=
139 13 : session_bykey->local_discriminator()) {
140 0 : LOG(ERROR, __func__ << " DISC -> Session MISMATCH: " <<
141 : packet->remote_endpoint.address().to_string() <<
142 : " -> " <<
143 : packet->local_endpoint.address().to_string());
144 0 : LOG(ERROR, "PACKET Your_Discriminator mapped session: " <<
145 : session_bydesc->toString());
146 0 : return NULL;
147 : }
148 : }
149 102 : return session_bydesc;
150 : }
151 :
152 : // Use ifindex for single hop and vrfindex for multihop sessions.
153 13 : Session *session = session_manager_.SessionByKey(
154 13 : SessionKey(packet->remote_endpoint.address(), session_index,
155 13 : packet->local_endpoint.port(),
156 13 : packet->local_endpoint.address()));
157 :
158 : // Try with 0.0.0.0 local address
159 13 : if (!session) {
160 0 : session = session_manager_.SessionByKey(
161 0 : SessionKey(packet->remote_endpoint.address(), session_index,
162 0 : packet->local_endpoint.port()));
163 : }
164 13 : return session;
165 : }
166 :
167 0 : Session *Server::SessionByKey(const boost::asio::ip::address &address,
168 : const SessionIndex &index) {
169 0 : return session_manager_.SessionByKey(SessionKey(address, index));
170 : }
171 :
172 26 : Session *Server::SessionByKey(const SessionKey &key) const {
173 26 : return session_manager_.SessionByKey(key);
174 : }
175 :
176 10675 : Session *Server::SessionByKey(const SessionKey &key) {
177 10675 : return session_manager_.SessionByKey(key);
178 : }
179 :
180 115 : void Server::ProcessControlPacket(
181 : const boost::asio::ip::udp::endpoint &local_endpoint,
182 : const boost::asio::ip::udp::endpoint &remote_endpoint,
183 : const SessionIndex &session_index,
184 : const boost::asio::const_buffer &recv_buffer,
185 : std::size_t bytes_transferred, const boost::system::error_code& error) {
186 230 : EnqueueEvent(new Event(PROCESS_PACKET, local_endpoint, remote_endpoint,
187 115 : session_index, recv_buffer, bytes_transferred));
188 115 : }
189 :
190 115 : void Server::ProcessControlPacket(Event *event) {
191 115 : if (event->bytes_transferred != (std::size_t) kMinimalPacketLength) {
192 0 : LOG(ERROR, __func__ << "Wrong packet size: " <<
193 : event->bytes_transferred);
194 0 : return;
195 : }
196 :
197 : boost::scoped_ptr<ControlPacket> packet(ParseControlPacket(
198 115 : boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer),
199 115 : event->bytes_transferred));
200 115 : if (packet == NULL) {
201 0 : LOG(ERROR, __func__ << "Unable to parse packet");
202 0 : return;
203 : }
204 115 : packet->local_endpoint = event->local_endpoint;
205 115 : packet->remote_endpoint = event->remote_endpoint;
206 115 : packet->session_index = event->session_index;
207 115 : ProcessControlPacketActual(packet.get());
208 115 : delete[] boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer);
209 115 : }
210 :
211 115 : ResultCode Server::ProcessControlPacketActual(const ControlPacket *packet) {
212 : ResultCode result;
213 115 : result = packet->Verify();
214 115 : if (result != kResultCode_Ok) {
215 0 : LOG(ERROR, "Wrong packet: " << result);
216 0 : return result;
217 : }
218 115 : Session *session = NULL;
219 115 : session = GetSession(packet);
220 115 : if (session == NULL) {
221 8 : LOG(ERROR, "Unknown session: " <<
222 : packet->remote_endpoint.address().to_string() << "/" <<
223 : packet->receiver_discriminator);
224 8 : return kResultCode_UnknownSession;
225 : }
226 107 : session->Stats().rx_count++;
227 107 : result = session->ProcessControlPacket(packet);
228 107 : if (result != kResultCode_Ok) {
229 0 : LOG(ERROR, "Unable to process session: result " << result
230 : << ", session: " << session->toString());
231 0 : session->Stats().rx_error_count++;
232 0 : return result;
233 : }
234 107 : return kResultCode_Ok;
235 : }
236 :
237 28 : ResultCode Server::ConfigureSession(const SessionKey &key,
238 : const SessionConfig &config,
239 : Discriminator *assignedDiscriminator) {
240 28 : return session_manager_.ConfigureSession(key, config, communicator_,
241 28 : assignedDiscriminator);
242 : }
243 :
244 25 : ResultCode Server::RemoveSessionReference(const SessionKey &key) {
245 25 : return session_manager_.RemoveSessionReference(key);
246 : }
247 :
248 102 : Session *Server::SessionManager::SessionByDiscriminator(
249 : Discriminator discriminator) {
250 : DiscriminatorSessionMap::const_iterator it =
251 102 : by_discriminator_.find(discriminator);
252 102 : if (it == by_discriminator_.end())
253 8 : return NULL;
254 94 : return it->second;
255 : }
256 :
257 10754 : Session *Server::SessionManager::SessionByKey(const SessionKey &key) {
258 10754 : KeySessionMap::const_iterator it = by_key_.find(key);
259 10754 : return it != by_key_.end() ? it->second : NULL;
260 : }
261 :
262 26 : Session *Server::SessionManager::SessionByKey(const SessionKey &key) const {
263 26 : KeySessionMap::const_iterator it = by_key_.find(key);
264 26 : return it != by_key_.end() ? it->second : NULL;
265 : }
266 :
267 25 : ResultCode Server::SessionManager::RemoveSessionReference(
268 : const SessionKey &key) {
269 25 : Session *session = SessionByKey(key);
270 25 : if (session == NULL) {
271 0 : LOG(DEBUG, __FUNCTION__ << " No such session: " << key.to_string());
272 0 : return kResultCode_UnknownSession;
273 : }
274 :
275 25 : if (!--refcounts_[session]) {
276 25 : by_discriminator_.erase(session->local_discriminator());
277 25 : by_key_.erase(key);
278 25 : delete session;
279 : }
280 :
281 25 : return kResultCode_Ok;
282 : }
283 :
284 28 : ResultCode Server::SessionManager::ConfigureSession(const SessionKey &key,
285 : const SessionConfig &config, Connection *communicator,
286 : Discriminator *assignedDiscriminator) {
287 28 : Session *session = SessionByKey(key);
288 28 : if (session) {
289 3 : session->UpdateConfig(config);
290 :
291 3 : LOG(INFO, __func__ << ": UpdateConfig : "
292 : << session->key().to_string() << "/"
293 : << session->local_discriminator());
294 :
295 3 : return kResultCode_Ok;
296 : }
297 :
298 25 : *assignedDiscriminator = GenerateUniqueDiscriminator();
299 25 : session = new Session(*assignedDiscriminator, key, evm_, config,
300 25 : communicator);
301 :
302 25 : by_discriminator_[*assignedDiscriminator] = session;
303 25 : by_key_[key] = session;
304 25 : refcounts_[session] = 1;
305 :
306 25 : LOG(INFO, __func__ << ": New session configured: " << key.to_string() << "/"
307 : << *assignedDiscriminator);
308 :
309 25 : return kResultCode_Ok;
310 : }
311 :
312 25 : Discriminator Server::SessionManager::GenerateUniqueDiscriminator() {
313 : class DiscriminatorGenerator {
314 : public:
315 1 : DiscriminatorGenerator() {
316 1 : next_ = gen()%0x1000000 + 1;
317 1 : }
318 :
319 25 : Discriminator Next() {
320 50 : return next_.fetch_add(1);
321 : }
322 :
323 : private:
324 : std::atomic<Discriminator> next_;
325 : boost::random::mt19937 gen;
326 : };
327 :
328 25 : static DiscriminatorGenerator generator;
329 :
330 25 : return generator.Next();
331 : }
332 :
333 21 : Server::SessionManager::~SessionManager() {
334 21 : for (DiscriminatorSessionMap::iterator it = by_discriminator_.begin();
335 21 : it != by_discriminator_.end(); ++it) {
336 0 : it->second->Stop();
337 0 : delete it->second;
338 : }
339 21 : }
340 : } // namespace BFD
|