Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "xmpp/xmpp_server.h"
6 :
7 : #include <boost/foreach.hpp>
8 : #include <boost/tuple/tuple.hpp>
9 :
10 : #include "base/task_annotations.h"
11 : #include "xmpp/xmpp_connection.h"
12 : #include "xmpp/xmpp_factory.h"
13 : #include "xmpp/xmpp_lifetime.h"
14 : #include "xmpp/xmpp_log.h"
15 : #include "xmpp/xmpp_sandesh.h"
16 : #include "xmpp/xmpp_session.h"
17 :
18 : #include "sandesh/request_pipeline.h"
19 : #include "sandesh/common/vns_types.h"
20 : #include "sandesh/common/vns_constants.h"
21 : #include "sandesh/xmpp_server_types.h"
22 : #include "sandesh/xmpp_trace_sandesh_types.h"
23 : #include "sandesh/xmpp_client_server_sandesh_types.h"
24 :
25 : using namespace std;
26 : using namespace boost::asio;
27 : using boost::tie;
28 :
29 : #define DEFAULT_XMPP_HOLD_TIME 90
30 : class XmppServer::DeleteActor : public LifetimeActor {
31 : public:
32 2158 : DeleteActor(XmppServer *server)
33 2158 : : LifetimeActor(server->lifetime_manager()), server_(server) {
34 2158 : }
35 2156 : virtual bool MayDelete() const {
36 2156 : CHECK_CONCURRENCY("bgp::Config");
37 2156 : return server_->MayDelete();
38 : }
39 2156 : virtual void Shutdown() {
40 2156 : CHECK_CONCURRENCY("bgp::Config");
41 2156 : server_->SessionShutdown();
42 2156 : }
43 2156 : virtual void Destroy() {
44 2156 : CHECK_CONCURRENCY("bgp::Config");
45 2156 : server_->Terminate();
46 2156 : }
47 :
48 : private:
49 : XmppServer *server_;
50 : };
51 :
52 :
53 414 : XmppServer::XmppServer(EventManager *evm, const string &server_addr,
54 414 : const XmppChannelConfig *config)
55 : : XmppConnectionManager(
56 414 : evm, ssl::context::sslv23_server, config->auth_enabled, true),
57 414 : max_connections_(0),
58 414 : lifetime_manager_(XmppStaticObjectFactory::Create<XmppLifetimeManager>(
59 : TaskScheduler::GetInstance()->GetTaskId("bgp::Config"))),
60 414 : deleter_(new DeleteActor(this)),
61 414 : server_addr_(server_addr),
62 414 : log_uve_(false),
63 414 : auth_enabled_(config->auth_enabled),
64 414 : tcp_hold_time_(config->tcp_hold_time),
65 414 : gr_helper_disable_(config->gr_helper_disable),
66 414 : dscp_value_(0),
67 414 : connection_queue_(TaskScheduler::GetInstance()->GetTaskId("bgp::Config"),
68 1242 : 0, boost::bind(&XmppServer::DequeueConnection, this, _1)) {
69 :
70 414 : if (config->auth_enabled) {
71 :
72 : // Get SSL context from base class and update
73 284 : boost::asio::ssl::context *ctx = context();
74 284 : boost::system::error_code ec;
75 :
76 : // set mode
77 284 : ctx->set_options(ssl::context::default_workarounds |
78 : ssl::context::no_sslv3 | ssl::context::no_sslv2, ec);
79 284 : if (ec.value() != 0) {
80 0 : LOG(ERROR, "Error : " << ec.message() << ", setting ssl options");
81 0 : exit(EINVAL);
82 : }
83 :
84 : // CA certificate, used to verify if the peer certificate
85 : // is signed by a trusted CA
86 284 : std::string ca_cert_filename = config->path_to_ca_cert;
87 284 : if (!ca_cert_filename.empty()) {
88 :
89 : // Verify peer has CA signed certificate
90 0 : ctx->set_verify_mode(boost::asio::ssl::verify_peer, ec);
91 0 : if (ec.value() != 0) {
92 0 : LOG(ERROR, "Error : " << ec.message()
93 : << ", while setting ssl verification mode");
94 0 : exit(EINVAL);
95 : }
96 :
97 0 : ctx->load_verify_file(config->path_to_ca_cert, ec);
98 0 : if (ec.value() != 0) {
99 0 : LOG(ERROR, "Error : " << ec.message()
100 : << ", while using cacert file : "
101 : << config->path_to_ca_cert);
102 0 : exit(EINVAL);
103 : }
104 : }
105 :
106 : // server certificate
107 284 : ctx->use_certificate_file(config->path_to_server_cert,
108 : boost::asio::ssl::context::pem, ec);
109 284 : if (ec.value() != 0) {
110 0 : LOG(ERROR, "Error : " << ec.message()
111 : << ", while using server cert file : "
112 : << config->path_to_server_cert);
113 0 : exit(EINVAL);
114 : }
115 :
116 : // server private key
117 284 : ctx->use_private_key_file(config->path_to_server_priv_key,
118 : boost::asio::ssl::context::pem, ec);
119 284 : if (ec.value() != 0) {
120 0 : LOG(ERROR, "Error : " << ec.message()
121 : << ", while using privkey file : "
122 : << config->path_to_server_priv_key);
123 0 : exit(EINVAL);
124 : }
125 284 : }
126 414 : }
127 :
128 : class XmppConfigUpdater {
129 : public:
130 2078 : explicit XmppConfigUpdater(XmppServer *server,
131 2078 : BgpConfigManager *config_manager) :
132 2078 : server_(server) {
133 2078 : BgpConfigManager::Observers obs;
134 : obs.system= boost::bind(&XmppConfigUpdater::ProcessGlobalSystemConfig,
135 2078 : this, _1, _2);
136 : obs.protocol = boost::bind(&XmppConfigUpdater::ProcessProtocolConfig,
137 2078 : this, _1, _2);
138 2078 : config_manager->RegisterObservers(obs);
139 2078 : }
140 :
141 30126 : const BgpGlobalSystemConfig &config() const { return config_; }
142 :
143 2687 : void ProcessProtocolConfig(const BgpProtocolConfig *protocol_config,
144 : BgpConfigManager::EventType event) {
145 :
146 2687 : if (server_->subcluster_name() != protocol_config->subcluster_name()) {
147 8 : server_->set_subcluster_name(protocol_config->subcluster_name());
148 8 : server_->ClearAllConnections();
149 : }
150 2687 : }
151 :
152 472 : void ProcessGlobalSystemConfig(const BgpGlobalSystemConfig *system,
153 : BgpConfigManager::EventType event) {
154 : // Clear peers only if GR is or was enabled.
155 472 : bool clear_peers = config_.gr_enable() || system->gr_enable();
156 472 : bool update_peers = false;
157 472 : config_.set_gr_enable(system->gr_enable());
158 472 : config_.set_gr_time(system->gr_time());
159 472 : config_.set_llgr_time(system->llgr_time());
160 472 : config_.set_end_of_rib_timeout(system->end_of_rib_timeout());
161 472 : config_.set_gr_xmpp_helper(system->gr_xmpp_helper());
162 :
163 : // Process any change in xmpp-hold-time
164 936 : if (config_.fc_enabled() != system->fc_enabled() ||
165 464 : config_.xmpp_hold_time() != system->xmpp_hold_time()) {
166 16 : config_.set_xmpp_hold_time(system->xmpp_hold_time());
167 16 : config_.set_fc_enabled(system->fc_enabled());
168 : // if fc_enabled is not set, use default hold time
169 16 : if (!system->fc_enabled())
170 0 : config_.set_xmpp_hold_time(DEFAULT_XMPP_HOLD_TIME);
171 16 : update_peers = true;
172 : }
173 :
174 : // Process any change in rd-cluster-seed knob
175 472 : if (config_.rd_cluster_seed() != system->rd_cluster_seed()) {
176 22 : config_.set_rd_cluster_seed(system->rd_cluster_seed());
177 22 : clear_peers = true;
178 : }
179 :
180 472 : if (clear_peers)
181 452 : server_->ClearAllConnections();
182 20 : else if (update_peers)
183 16 : server_->UpdateAllConnections(config_.xmpp_hold_time());
184 472 : }
185 :
186 : const string subcluster_name() const { return server_->subcluster_name(); }
187 : void set_subcluster_name(const string& name) {
188 : server_->set_subcluster_name(name);
189 : }
190 :
191 6540 : uint8_t xmpp_hold_time() const { return config_.xmpp_hold_time(); }
192 : void set_xmpp_hold_time(int hold_time) {
193 : config_.set_xmpp_hold_time(hold_time);
194 : }
195 :
196 : private:
197 : XmppServer *server_;
198 : BgpGlobalSystemConfig config_;
199 : };
200 :
201 1737 : XmppServer::XmppServer(EventManager *evm, const string &server_addr)
202 : : XmppConnectionManager(evm, ssl::context::sslv23_server, false, false),
203 1737 : max_connections_(0),
204 1737 : lifetime_manager_(XmppStaticObjectFactory::Create<XmppLifetimeManager>(
205 : TaskScheduler::GetInstance()->GetTaskId("bgp::Config"))),
206 1737 : deleter_(new DeleteActor(this)),
207 1737 : server_addr_(server_addr),
208 1737 : log_uve_(false),
209 1737 : auth_enabled_(false),
210 1737 : tcp_hold_time_(XmppChannelConfig::kTcpHoldTime),
211 1737 : gr_helper_disable_(false),
212 1737 : xmpp_config_updater_(NULL),
213 1737 : dscp_value_(0),
214 1737 : connection_queue_(TaskScheduler::GetInstance()->GetTaskId("bgp::Config"),
215 5211 : 0, boost::bind(&XmppServer::DequeueConnection, this, _1)) {
216 1737 : }
217 :
218 :
219 7 : XmppServer::XmppServer(EventManager *evm)
220 : : XmppConnectionManager(evm, ssl::context::sslv23_server, false, false),
221 7 : max_connections_(0),
222 7 : lifetime_manager_(new LifetimeManager(
223 14 : TaskScheduler::GetInstance()->GetTaskId("bgp::Config"))),
224 7 : deleter_(new DeleteActor(this)),
225 7 : log_uve_(false),
226 7 : auth_enabled_(false),
227 7 : tcp_hold_time_(XmppChannelConfig::kTcpHoldTime),
228 7 : gr_helper_disable_(false),
229 7 : dscp_value_(0),
230 7 : connection_queue_(TaskScheduler::GetInstance()->GetTaskId("bgp::Config"),
231 14 : 0, boost::bind(&XmppServer::DequeueConnection, this, _1)) {
232 7 : }
233 :
234 2078 : void XmppServer::CreateConfigUpdater(BgpConfigManager *config_manager) {
235 2078 : xmpp_config_updater_.reset(new XmppConfigUpdater(this, config_manager));
236 2078 : }
237 :
238 195 : uint16_t XmppServer::GetGracefulRestartTime() const {
239 : // Check if GR is disabled..
240 195 : if (!xmpp_config_updater_ || !xmpp_config_updater_->config().gr_enable())
241 0 : return 0;
242 195 : return xmpp_config_updater_->config().gr_time();
243 : }
244 :
245 58 : uint32_t XmppServer::GetLongLivedGracefulRestartTime() const {
246 : // Check if GR is disabled..
247 58 : if (!xmpp_config_updater_ || !xmpp_config_updater_->config().gr_enable())
248 0 : return 0;
249 58 : return xmpp_config_updater_->config().llgr_time();
250 : }
251 :
252 6837 : uint32_t XmppServer::GetEndOfRibReceiveTime() const {
253 6837 : if (xmpp_config_updater_)
254 6837 : return xmpp_config_updater_->config().end_of_rib_timeout();
255 0 : return BgpGlobalSystemConfig::kEndOfRibTime;
256 : }
257 :
258 10958 : uint32_t XmppServer::GetEndOfRibSendTime() const {
259 10958 : if (xmpp_config_updater_)
260 10958 : xmpp_config_updater_->config().end_of_rib_timeout();
261 10959 : return BgpGlobalSystemConfig::kEndOfRibTime;
262 : }
263 :
264 10629 : bool XmppServer::IsGRHelperModeEnabled() const {
265 : // Check if disabled in .conf file.
266 10629 : if (gr_helper_disable_)
267 832 : return false;
268 :
269 : // Check from configuration.
270 9797 : if (!xmpp_config_updater_)
271 45 : return false;
272 :
273 : // Check if GR is disabled..
274 9752 : if (!xmpp_config_updater_->config().gr_enable())
275 8589 : return false;
276 :
277 1163 : return xmpp_config_updater_->config().gr_xmpp_helper();
278 : }
279 :
280 12080 : bool XmppServer::IsPeerCloseGraceful() const {
281 :
282 : // If the server is deleted, do not do graceful restart
283 12080 : if (deleter()->IsDeleted())
284 1721 : return false;
285 :
286 : // Check if GR helper mode is disabled.
287 10359 : if (!IsGRHelperModeEnabled())
288 9449 : return false;
289 :
290 : // Enable GR if either gr-time or llgr-time is configured.
291 910 : return (xmpp_config_updater_->config().gr_time() ||
292 910 : xmpp_config_updater_->config().llgr_time());
293 : }
294 :
295 3526 : XmppServer::~XmppServer() {
296 2158 : STLDeleteElements(&connection_endpoint_map_);
297 2158 : TcpServer::ClearSessions();
298 3526 : }
299 :
300 0 : bool XmppServer::Initialize(short port) {
301 0 : log_uve_ = false;
302 0 : return TcpServer::Initialize(port);
303 : }
304 :
305 2036 : bool XmppServer::Initialize(short port, bool logUVE) {
306 2036 : log_uve_ = logUVE;
307 2036 : return TcpServer::Initialize(port);
308 : }
309 :
310 120 : bool XmppServer::Initialize(short port, bool logUVE, const IpAddress& ip) {
311 120 : log_uve_ = logUVE;
312 120 : return TcpServer::Initialize(port, ip);
313 : }
314 :
315 : //
316 : // Can be removed after Shutdown is renamed to ManagedDelete.
317 : //
318 2156 : void XmppServer::SessionShutdown() {
319 2156 : XmppConnectionManager::Shutdown();
320 2156 : }
321 :
322 : //
323 : // Return true if it's possible to delete the XmppServer.
324 : //
325 : // No need to check the connection WorkQueue since XmppServerConnections on it
326 : // are dependents of the XmppServer.
327 : //
328 2156 : bool XmppServer::MayDelete() const {
329 2156 : return (GetSessionQueueSize() == 0);
330 : }
331 :
332 : //
333 : // Trigger deletion of the XmppServer.
334 : //
335 : // A mutex is used to ensure that we do not create new XmppServerConnections
336 : // after this point. Note that this routine and AcceptSession may be called
337 : // concurrently from 2 different threads in tests.
338 : //
339 2223 : void XmppServer::Shutdown() {
340 2223 : std::scoped_lock lock(deletion_mutex_);
341 2223 : deleter_->Delete();
342 2223 : }
343 :
344 : //
345 : // Called when the XmppServer delete actor is being destroyed.
346 : //
347 2156 : void XmppServer::Terminate() {
348 2156 : ClearSessions();
349 2156 : connection_queue_.Shutdown();
350 2156 : }
351 :
352 6626 : LifetimeActor *XmppServer::deleter() {
353 6626 : return deleter_.get();
354 : }
355 :
356 12080 : LifetimeActor *XmppServer::deleter() const {
357 12080 : return deleter_.get();
358 : }
359 :
360 8977 : LifetimeManager *XmppServer::lifetime_manager() {
361 8977 : return lifetime_manager_.get();
362 : }
363 :
364 22 : TcpSession *XmppServer::CreateSession() {
365 : typedef boost::asio::detail::socket_option::boolean<
366 : SOL_SOCKET, SO_REUSEADDR> reuse_addr_t;
367 22 : TcpSession *session = TcpServer::CreateSession();
368 22 : Socket *socket = session->socket();
369 :
370 22 : boost::system::error_code err;
371 22 : socket->open(ip::tcp::v4(), err);
372 22 : if (err) {
373 0 : XMPP_WARNING(ServerOpenFail, err.message());
374 : }
375 :
376 22 : socket->set_option(reuse_addr_t(true), err);
377 22 : if (err) {
378 0 : XMPP_WARNING(SetSockOptFail, "", XMPP_PEER_DIR_OUT, err.message());
379 : }
380 :
381 22 : socket->bind(LocalEndpoint(), err);
382 22 : if (err) {
383 22 : XMPP_WARNING(ServerBindFailure, err.message());
384 : }
385 :
386 22 : XmppSession *xmpps = static_cast<XmppSession *>(session);
387 22 : err = xmpps->EnableTcpKeepalive(tcp_hold_time_);
388 22 : if (err) {
389 0 : XMPP_WARNING(ServerKeepAliveFailure, session->ToUVEKey(),
390 : XMPP_PEER_DIR_OUT, err.message());
391 : }
392 :
393 22 : return session;
394 : }
395 :
396 0 : size_t XmppServer::ConnectionEventCount() const {
397 0 : return connection_event_map_.size();
398 : }
399 :
400 852 : size_t XmppServer::ConnectionMapSize() const {
401 852 : ReadLock lock(connection_map_mutex_);
402 1704 : return connection_map_.size();
403 852 : }
404 :
405 851 : size_t XmppServer::ConnectionCount() const {
406 851 : return ConnectionMapSize() + deleted_connection_set_.size();
407 : }
408 :
409 6529 : XmppServerConnection *XmppServer::FindConnection(Endpoint remote_endpoint) {
410 6529 : ReadLock lock(connection_map_mutex_);
411 6529 : ConnectionMap::iterator loc = connection_map_.find(remote_endpoint);
412 6529 : if (loc != connection_map_.end()) {
413 0 : return loc->second;
414 : }
415 6529 : return NULL;
416 6529 : }
417 :
418 639 : XmppServerConnection *XmppServer::FindConnection(const string &address) {
419 639 : ReadLock lock(connection_map_mutex_);
420 645 : for (auto& value : connection_map_) {
421 34 : if (value.second->ToString() == address)
422 28 : return value.second;
423 : }
424 611 : return NULL;
425 639 : }
426 :
427 56 : bool XmppServer::ClearConnection(const string &hostname) {
428 56 : ReadLock lock(connection_map_mutex_);
429 188 : for (auto& value : connection_map_) {
430 148 : if (value.second->GetComputeHostName() == hostname) {
431 16 : value.second->Clear();
432 16 : return true;
433 : }
434 : }
435 40 : return false;
436 56 : }
437 :
438 16 : void XmppServer::UpdateAllConnections(uint8_t time_out) {
439 16 : ReadLock lock(connection_map_mutex_);
440 40 : for (auto& value : connection_map_) {
441 24 : value.second->UpdateKeepAliveTimer(time_out);
442 : }
443 16 : }
444 :
445 5306 : void XmppServer::ClearAllConnections() {
446 5306 : ReadLock lock(connection_map_mutex_);
447 7416 : for (auto& value : connection_map_) {
448 2110 : value.second->Clear();
449 : }
450 5306 : }
451 :
452 2090 : void XmppServer::RegisterConnectionEvent(xmps::PeerId id,
453 : ConnectionEventCb cb) {
454 2090 : connection_event_map_.insert(make_pair(id, cb));
455 2090 : }
456 :
457 2078 : void XmppServer::UnRegisterConnectionEvent(xmps::PeerId id) {
458 2078 : ConnectionEventCbMap::iterator it = connection_event_map_.find(id);
459 2078 : if (it != connection_event_map_.end())
460 2078 : connection_event_map_.erase(it);
461 2078 : }
462 :
463 12916 : void XmppServer::NotifyConnectionEvent(XmppChannelMux *mux,
464 : xmps::PeerState state) {
465 12916 : ConnectionEventCbMap::iterator iter = connection_event_map_.begin();
466 25811 : for (; iter != connection_event_map_.end(); ++iter) {
467 12893 : ConnectionEventCb cb = iter->second;
468 12892 : cb(mux, state);
469 12895 : }
470 12918 : }
471 :
472 6575 : SslSession *XmppServer::AllocSession(SslSocket *socket) {
473 6575 : SslSession *session = new XmppSession(this, socket);
474 6575 : boost::system::error_code err;
475 6575 : XmppSession *xmpp_session = static_cast<XmppSession *>(session);
476 6575 : err = xmpp_session->EnableTcpKeepalive(tcp_hold_time_);
477 6575 : if (err) {
478 22 : XMPP_WARNING(ServerKeepAliveFailure, xmpp_session->ToUVEKey(),
479 : XMPP_PEER_DIR_OUT, err.message());
480 : }
481 6575 : return session;
482 : }
483 :
484 : //
485 : // Accept newly formed passive tcp session by creating necessary xmpp data
486 : // structures. We do so to make sure that if there is any error reported
487 : // over this tcp session, it can still be correctly handled, even though
488 : // the allocated xmpp data structures are not fully processed yet.
489 : //
490 6553 : bool XmppServer::AcceptSession(TcpSession *tcp_session) {
491 6553 : std::scoped_lock lock(deletion_mutex_);
492 6553 : if (deleter_->IsDeleted())
493 0 : return false;
494 :
495 6553 : XmppSession *session = dynamic_cast<XmppSession *>(tcp_session);
496 6553 : XmppServerConnection *connection = CreateConnection(session);
497 :
498 6553 : if (xmpp_config_updater_) {
499 13080 : connection->state_machine()->set_hold_time(
500 6540 : xmpp_config_updater_->xmpp_hold_time());
501 : }
502 : // Register event handler.
503 6553 : tcp_session->set_observer(boost::bind(&XmppStateMachine::OnSessionEvent,
504 : connection->state_machine(), _1, _2));
505 :
506 : // set async_read_ready as false
507 6553 : session->set_read_on_connect(false);
508 6553 : connection->set_session(session);
509 6553 : connection->state_machine()->set_session(session);
510 6553 : connection->set_on_work_queue();
511 6553 : connection_queue_.Enqueue(connection);
512 6553 : return true;
513 6553 : }
514 :
515 : //
516 : // Remove the given XmppServerConnection from the ConnectionMap.
517 : //
518 6553 : void XmppServer::RemoveConnection(XmppServerConnection *connection) {
519 6553 : CHECK_CONCURRENCY("bgp::Config");
520 :
521 6553 : assert(connection->IsDeleted());
522 6553 : Endpoint endpoint = connection->endpoint();
523 6553 : WriteLock lock(connection_map_mutex_);
524 6553 : ConnectionMap::iterator loc = connection_map_.find(endpoint);
525 6553 : assert(loc != connection_map_.end() && loc->second == connection);
526 6553 : connection_map_.erase(loc);
527 6553 : }
528 :
529 104 : void XmppServer::SwapXmppConnectionMapEntries(
530 : XmppConnection *connection1, XmppConnection *connection2) {
531 104 : WriteLock lock(connection_map_mutex_);
532 : ConnectionMap::iterator loc1 =
533 104 : connection_map_.find(connection1->endpoint());
534 104 : assert(loc1 != connection_map_.end());
535 : ConnectionMap::iterator loc2 =
536 104 : connection_map_.find(connection2->endpoint());
537 104 : assert(loc2 != connection_map_.end());
538 104 : swap(loc1->second, loc2->second);
539 104 : swap(loc1->second->endpoint(), loc2->second->endpoint());
540 104 : }
541 :
542 : //
543 : // Insert the given XmppServerConnection into the ConnectionMap.
544 : //
545 6553 : void XmppServer::InsertConnection(XmppServerConnection *connection) {
546 6553 : CHECK_CONCURRENCY("bgp::Config");
547 :
548 6553 : assert(!connection->IsDeleted());
549 6553 : connection->Initialize();
550 6553 : Endpoint endpoint = connection->endpoint();
551 6553 : ConnectionMap::iterator loc;
552 : bool result;
553 6553 : WriteLock lock(connection_map_mutex_);
554 6553 : tie(loc, result) = connection_map_.insert(make_pair(endpoint, connection));
555 6553 : assert(result);
556 6553 : max_connections_ = max(max_connections_, connection_map_.size());
557 6553 : }
558 :
559 : //
560 : // Create XmppConnnection and its associated data structures. This API is
561 : // only used to allocate data structures and initialize necessary fields.
562 : // The data structures are not populated to any maps in the XmppServer at
563 : // this point. However, the newly created XmppServerConnection does add
564 : // itself as a dependent of the XmppServer via LifetimeManager linkage.
565 : //
566 6553 : XmppServerConnection *XmppServer::CreateConnection(XmppSession *session) {
567 : XmppServerConnection *connection;
568 :
569 6553 : XmppChannelConfig cfg(false);
570 6553 : cfg.endpoint = session->remote_endpoint();
571 6553 : cfg.local_endpoint = session->local_endpoint();
572 6553 : cfg.FromAddr = server_addr_;
573 6553 : cfg.logUVE = log_uve_;
574 6553 : cfg.auth_enabled = auth_enabled_;
575 6553 : cfg.dscp_value = dscp_value_;
576 :
577 6553 : XMPP_DEBUG(XmppCreateConnection, session->ToUVEKey(), XMPP_PEER_DIR_OUT,
578 : session->ToString());
579 : connection = XmppStaticObjectFactory::Create<XmppServerConnection>
580 6553 : (this, static_cast<const XmppChannelConfig*>(&cfg));
581 :
582 6553 : return connection;
583 6553 : }
584 :
585 7 : void XmppServer::SetDscpValue(uint8_t value) {
586 7 : ReadLock lock(connection_map_mutex_);
587 7 : dscp_value_ = value;
588 18 : for (auto& value : connection_map_) {
589 11 : XmppServerConnection *connection = value.second;
590 11 : connection->SetDscpValue(dscp_value_);
591 : }
592 7 : }
593 : //
594 : // Handler for XmppServerConnections that are dequeued from the WorkQueue.
595 : //
596 : // Since the XmppServerConnections on the WorkQueue are dependents of the
597 : // XmppServer, we are guaranteed that the XmppServer won't get destroyed
598 : // before the connection WorkQueue is drained.
599 : //
600 6553 : bool XmppServer::DequeueConnection(XmppServerConnection *connection) {
601 6553 : CHECK_CONCURRENCY("bgp::Config");
602 6553 : connection->clear_on_work_queue();
603 :
604 : // This happens if the XmppServer got deleted while the XmppConnnection
605 : // was on the WorkQueue.
606 6553 : if (connection->IsDeleted()) {
607 24 : connection->RetryDelete();
608 24 : return true;
609 : }
610 :
611 6529 : XmppSession *session = connection->session();
612 6529 : connection->clear_session();
613 6529 : Endpoint remote_endpoint = session->remote_endpoint();
614 6529 : XmppServerConnection *old_connection = FindConnection(remote_endpoint);
615 :
616 : // Close as duplicate if we have a connection from the same Endpoint.
617 : // Otherwise go ahead and insert into the ConnectionMap. We may find
618 : // it has a conflicting Endpoint name and decide to terminate it when
619 : // we process the Open message.
620 6529 : if (old_connection) {
621 0 : XMPP_DEBUG(XmppCreateConnection, session->ToUVEKey(), XMPP_PEER_DIR_IN,
622 : "Close duplicate connection " + session->ToString());
623 : // Remove reference to the session from StateMachine directly. We don't
624 : // go through the entire normal cleanup pipeline for cleaning these
625 : // duplicate connections.
626 0 : assert(connection->state_machine()->session());
627 0 : assert(connection->state_machine()->session() == session);
628 0 : connection->state_machine()->RemoveSession();
629 0 : DeleteSession(session);
630 0 : connection->set_duplicate();
631 0 : connection->ManagedDelete();
632 0 : InsertDeletedConnection(connection);
633 : } else {
634 6529 : InsertConnection(connection);
635 6529 : connection->AcceptSession(session);
636 : }
637 :
638 6529 : return true;
639 : }
640 :
641 675 : size_t XmppServer::GetConnectionQueueSize() const {
642 675 : return connection_queue_.Length();
643 : }
644 :
645 48 : void XmppServer::SetConnectionQueueDisable(bool disabled) {
646 48 : connection_queue_.set_disable(disabled);
647 48 : }
648 :
649 : //
650 : // Connection is marked deleted, add it to the ConnectionSet.
651 : //
652 6577 : void XmppServer::InsertDeletedConnection(XmppServerConnection *connection) {
653 6577 : CHECK_CONCURRENCY("bgp::Config");
654 :
655 6577 : assert(connection->IsDeleted());
656 6577 : ConnectionSet::iterator it;
657 : bool result;
658 6577 : tie(it, result) = deleted_connection_set_.insert(connection);
659 6577 : assert(result);
660 6577 : }
661 :
662 : //
663 : // Connection is being destroyed, remove it from the ConnectionSet.
664 : //
665 6577 : void XmppServer::RemoveDeletedConnection(XmppServerConnection *connection) {
666 6577 : CHECK_CONCURRENCY("bgp::Config");
667 :
668 6577 : assert(connection->IsDeleted());
669 6577 : ConnectionSet::iterator it = deleted_connection_set_.find(connection);
670 6577 : assert(it != deleted_connection_set_.end());
671 6577 : deleted_connection_set_.erase(it);
672 6577 : ReleaseConnectionEndpoint(connection);
673 6577 : }
674 :
675 11797 : XmppConnectionEndpoint *XmppServer::FindConnectionEndpoint(
676 : const string &endpoint_name) {
677 11797 : std::scoped_lock lock(endpoint_map_mutex_);
678 : ConnectionEndpointMap::const_iterator loc =
679 11797 : connection_endpoint_map_.find(endpoint_name);
680 23593 : return (loc != connection_endpoint_map_.end() ? loc->second : NULL);
681 11796 : }
682 :
683 6311 : XmppConnectionEndpoint *XmppServer::LocateConnectionEndpoint(
684 : XmppServerConnection *connection, bool &created) {
685 6311 : created = false;
686 6311 : if (!connection)
687 0 : return NULL;
688 :
689 6311 : std::scoped_lock lock(endpoint_map_mutex_);
690 :
691 : ConnectionEndpointMap::const_iterator loc =
692 6311 : connection_endpoint_map_.find(connection->ToString());
693 : XmppConnectionEndpoint *conn_endpoint;
694 :
695 6310 : if (loc != connection_endpoint_map_.end()) {
696 1435 : conn_endpoint = loc->second;
697 1435 : if (!conn_endpoint->connection()) {
698 1331 : created = true;
699 1331 : conn_endpoint->set_connection(connection);
700 1331 : connection->set_conn_endpoint(conn_endpoint);
701 : }
702 1435 : return conn_endpoint;
703 : }
704 :
705 4875 : created = true;
706 4875 : conn_endpoint = new XmppConnectionEndpoint(connection->ToString());
707 : bool result;
708 4875 : tie(loc, result) = connection_endpoint_map_.insert(
709 9750 : make_pair(connection->ToString(), conn_endpoint));
710 4876 : assert(result);
711 4876 : conn_endpoint->set_connection(connection);
712 4876 : connection->set_conn_endpoint(conn_endpoint);
713 4876 : return conn_endpoint;
714 6311 : }
715 :
716 : //
717 : // Remove association of the given XmppConnectionEndpoint with XmppConnection.
718 : // This method is provided just to make things symmetrical - the caller could
719 : // simply have called XmppConnectionEndpoint::reset_connection directly.
720 : //
721 13130 : void XmppServer::ReleaseConnectionEndpoint(XmppServerConnection *connection) {
722 13130 : std::scoped_lock lock(endpoint_map_mutex_);
723 :
724 13130 : if (!connection->conn_endpoint())
725 6922 : return;
726 6208 : assert(connection->conn_endpoint()->connection() == connection);
727 6208 : connection->conn_endpoint()->reset_connection();
728 6208 : connection->set_conn_endpoint(NULL);
729 13130 : }
730 :
731 40 : void XmppServer::FillShowConnections(
732 : vector<ShowXmppConnection> *show_connection_list) const {
733 40 : ReadLock lock(connection_map_mutex_);
734 88 : for (const auto& value : connection_map_) {
735 48 : const XmppServerConnection *connection = value.second;
736 48 : ShowXmppConnection show_connection;
737 48 : connection->FillShowInfo(&show_connection);
738 48 : show_connection_list->push_back(show_connection);
739 48 : }
740 64 : for (const auto& connection : deleted_connection_set_) {
741 24 : ShowXmppConnection show_connection;
742 24 : connection->FillShowInfo(&show_connection);
743 24 : show_connection_list->push_back(show_connection);
744 24 : }
745 40 : }
746 :
747 1 : void XmppServer::FillShowServer(ShowXmppServerResp *resp) const {
748 1 : SocketIOStats peer_socket_stats;
749 1 : GetRxSocketStats(&peer_socket_stats);
750 1 : resp->set_rx_socket_stats(peer_socket_stats);
751 1 : GetTxSocketStats(&peer_socket_stats);
752 1 : resp->set_tx_socket_stats(peer_socket_stats);
753 1 : resp->set_current_connections(ConnectionMapSize());
754 1 : resp->set_max_connections(max_connections_);
755 1 : }
756 :
757 : class ShowXmppConnectionHandler {
758 : public:
759 40 : static bool CallbackS1(const Sandesh *sr,
760 : const RequestPipeline::PipeSpec ps, int stage, int instNum,
761 : RequestPipeline::InstData *data) {
762 : const ShowXmppConnectionReq *req =
763 40 : static_cast<const ShowXmppConnectionReq *>(ps.snhRequest_.get());
764 : XmppSandeshContext *xsc =
765 40 : dynamic_cast<XmppSandeshContext *>(req->module_context("XMPP"));
766 :
767 40 : ShowXmppConnectionResp *resp = new ShowXmppConnectionResp;
768 40 : vector<ShowXmppConnection> connections;
769 40 : if (xsc)
770 40 : xsc->xmpp_server->FillShowConnections(&connections);
771 40 : resp->set_connections(connections);
772 40 : resp->set_context(req->context());
773 40 : resp->Response();
774 40 : return true;
775 40 : }
776 : };
777 :
778 40 : void ShowXmppConnectionReq::HandleRequest() const {
779 40 : RequestPipeline::PipeSpec ps(this);
780 :
781 : // Request pipeline has single stage to collect connection info and
782 : // respond to the request.
783 40 : RequestPipeline::StageSpec s1;
784 40 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
785 40 : s1.taskId_ = scheduler->GetTaskId("bgp::ShowCommand");
786 40 : s1.cbFn_ = ShowXmppConnectionHandler::CallbackS1;
787 40 : s1.instances_.push_back(0);
788 40 : ps.stages_.push_back(s1);
789 40 : RequestPipeline rp(ps);
790 40 : }
791 :
792 : class ClearXmppConnectionHandler {
793 : public:
794 48 : static bool CallbackS1(const Sandesh *sr,
795 : const RequestPipeline::PipeSpec ps, int stage, int instNum,
796 : RequestPipeline::InstData *data) {
797 :
798 : const ClearXmppConnectionReq *req =
799 48 : static_cast<const ClearXmppConnectionReq *>(ps.snhRequest_.get());
800 : XmppSandeshContext *xsc =
801 48 : dynamic_cast<XmppSandeshContext *>(req->module_context("XMPP"));
802 :
803 48 : ClearXmppConnectionResp *resp = new ClearXmppConnectionResp;
804 48 : if (!xsc || !xsc->test_mode) {
805 24 : resp->set_success(false);
806 24 : } else if (req->get_hostname_or_all() != "all") {
807 16 : if (xsc->xmpp_server->ClearConnection(req->get_hostname_or_all())) {
808 8 : resp->set_success(true);
809 : } else {
810 8 : resp->set_success(false);
811 : }
812 : } else {
813 8 : if (xsc->xmpp_server->ConnectionCount()) {
814 8 : xsc->xmpp_server->ClearAllConnections();
815 8 : resp->set_success(true);
816 : } else {
817 0 : resp->set_success(false);
818 : }
819 : }
820 :
821 48 : resp->set_context(req->context());
822 48 : resp->Response();
823 48 : return true;
824 : }
825 : };
826 :
827 48 : void ClearXmppConnectionReq::HandleRequest() const {
828 :
829 : // config task is used to create and delete connection objects.
830 : // hence use the same task to find the connection
831 48 : RequestPipeline::StageSpec s1;
832 48 : s1.taskId_ = TaskScheduler::GetInstance()->GetTaskId("bgp::Config");
833 48 : s1.instances_.push_back(0);
834 48 : s1.cbFn_ = ClearXmppConnectionHandler::CallbackS1;
835 :
836 48 : RequestPipeline::PipeSpec ps(this);
837 48 : ps.stages_.push_back(s1);
838 48 : RequestPipeline rp(ps);
839 48 : }
840 :
841 : class ShowXmppServerHandler {
842 : public:
843 1 : static bool CallbackS1(const Sandesh *sr,
844 : const RequestPipeline::PipeSpec ps, int stage, int instNum,
845 : RequestPipeline::InstData *data) {
846 : const ShowXmppServerReq *req =
847 1 : static_cast<const ShowXmppServerReq *>(ps.snhRequest_.get());
848 : XmppSandeshContext *xsc =
849 1 : dynamic_cast<XmppSandeshContext *>(req->module_context("XMPP"));
850 :
851 1 : ShowXmppServerResp *resp = new ShowXmppServerResp;
852 1 : if (xsc)
853 1 : xsc->xmpp_server->FillShowServer(resp);
854 1 : resp->set_context(req->context());
855 1 : resp->Response();
856 1 : return true;
857 : }
858 : };
859 :
860 1 : void ShowXmppServerReq::HandleRequest() const {
861 1 : RequestPipeline::PipeSpec ps(this);
862 1 : RequestPipeline::StageSpec s1;
863 1 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
864 1 : s1.taskId_ = scheduler->GetTaskId("bgp::ShowCommand");
865 1 : s1.cbFn_ = ShowXmppServerHandler::CallbackS1;
866 1 : s1.instances_.push_back(0);
867 1 : ps.stages_.push_back(s1);
868 1 : RequestPipeline rp(ps);
869 1 : }
|