Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "xmpp/xmpp_channel_mux.h" 6 : 7 : #include <boost/foreach.hpp> 8 : 9 : #include "base/task_annotations.h" 10 : #include "xmpp/xmpp_init.h" 11 : #include "xmpp/xmpp_connection.h" 12 : 13 : using namespace std; 14 : using namespace xmsm; 15 : 16 11615 : XmppChannelMux::XmppChannelMux(XmppConnection *connection) 17 11615 : : connection_(connection), rx_message_trace_cb_(NULL), 18 23230 : tx_message_trace_cb_(NULL) { 19 11615 : last_received_ = 0; 20 11615 : last_sent_ = 0; 21 11615 : } 22 : 23 11725 : XmppChannelMux::~XmppChannelMux() { 24 11615 : assert(map_.empty()); 25 11725 : } 26 : 27 547 : void XmppChannelMux::Close() { 28 547 : connection_->Clear(); 29 547 : } 30 : 31 8860 : bool XmppChannelMux::LastReceived(time_t duration) const { 32 8860 : return (UTCTimestamp() - last_received_) <= duration; 33 : } 34 : 35 2772 : bool XmppChannelMux::LastSent(time_t duration) const { 36 2772 : return (UTCTimestamp() - last_sent_) <= duration; 37 : } 38 : 39 3587559 : xmps::PeerState XmppChannelMux::GetPeerState() const { 40 3587559 : xmsm::XmState st = connection_->GetStateMcState(); 41 3587536 : return (st == xmsm::ESTABLISHED) ? xmps::READY : 42 3587536 : xmps::NOT_READY; 43 : } 44 : 45 1 : void XmppChannelMux::WriteReady(const boost::system::error_code &ec) { 46 1 : std::scoped_lock lock(mutex_); 47 : 48 1 : WriteReadyCbMap::iterator iter = map_.begin(); 49 1 : WriteReadyCbMap::iterator next = iter; 50 2 : for (; iter != map_.end(); iter = next) { 51 1 : ++next; 52 1 : SendReadyCb cb = iter->second; 53 1 : cb(ec); 54 1 : map_.erase(iter); 55 1 : } 56 1 : } 57 : 58 1652698 : bool XmppChannelMux::Send(const uint8_t *msg, size_t msgsize, 59 : const string *msg_str, xmps::PeerId id, 60 : SendReadyCb cb) { 61 1652698 : if (!connection_) return false; 62 : 63 1652698 : std::scoped_lock lock(mutex_); 64 1653156 : last_sent_ = UTCTimestamp(); 65 1653283 : bool res = connection_->Send(msg, msgsize, msg_str); 66 1653085 : if (res == false) { 67 78 : RegisterWriteReady(id, cb); 68 : } 69 1653085 : return res; 70 1653085 : } 71 : 72 22378 : int XmppChannelMux::GetTaskInstance() const { 73 22378 : return connection_->GetTaskInstance(); 74 : } 75 : 76 10003 : void XmppChannelMux::RegisterReferer(xmps::PeerId id) { 77 10003 : referers_.insert(id); 78 10003 : } 79 : 80 10003 : void XmppChannelMux::UnRegisterReferer(xmps::PeerId id) { 81 10003 : referers_.erase(id); 82 10003 : } 83 : 84 13964 : void XmppChannelMux::RegisterReceive(xmps::PeerId id, ReceiveCb cb) { 85 13964 : rxmap_.insert(make_pair(id, cb)); 86 13964 : } 87 : 88 14084 : void XmppChannelMux::UnRegisterReceive(xmps::PeerId id) { 89 14084 : ReceiveCbMap::iterator it = rxmap_.find(id); 90 14084 : if (it != rxmap_.end()) { 91 13956 : rxmap_.erase(it); 92 : } 93 : 94 14084 : if (ReceiverCount()) 95 4079 : return; 96 : 97 : XmppServerConnection *server_connection = 98 10258 : dynamic_cast<XmppServerConnection *>(connection_); 99 : 100 : // If GracefulRestart helper mode close process is complete, restart the 101 : // state machine to form new session with the client. 102 10536 : if (!connection_->IsDeleted() && server_connection && 103 278 : server_connection->server()->IsGRHelperModeEnabled()) { 104 253 : server_connection->state_machine()->Initialize(); 105 253 : return; 106 : } 107 : 108 10005 : connection_->RetryDelete(); 109 : } 110 : 111 11676 : size_t XmppChannelMux::RefererCount() const { 112 11676 : return referers_.size(); 113 : } 114 : 115 31929 : size_t XmppChannelMux::ReceiverCount() const { 116 31929 : return rxmap_.size(); 117 : } 118 : 119 72 : vector<string> XmppChannelMux::GetReceiverList() const { 120 72 : vector<string> receivers; 121 96 : for (const auto& value : rxmap_) { 122 24 : receivers.push_back(xmps::PeerIdToName(value.first)); 123 : } 124 72 : return receivers; 125 0 : } 126 : 127 : // 128 : // To be called after acquiring mutex 129 : // 130 79 : void XmppChannelMux::RegisterWriteReady(xmps::PeerId id, SendReadyCb cb) { 131 79 : map_.insert(make_pair(id, cb)); 132 79 : } 133 : 134 : // 135 : // To be called after acquiring mutex 136 : // 137 13860 : void XmppChannelMux::UnRegisterWriteReady(xmps::PeerId id) { 138 13860 : map_.erase(id); 139 13860 : } 140 : 141 1708441 : const std::string &XmppChannelMux::ToString() const { 142 1708441 : return connection_->ToString(); 143 : } 144 : 145 0 : const std::string &XmppChannelMux::FromString() const { 146 0 : return connection_->FromString(); 147 : } 148 : 149 974 : std::string XmppChannelMux::StateName() const { 150 974 : return connection_->StateName(); 151 : } 152 : 153 0 : std::string XmppChannelMux::AuthType() const { 154 0 : return connection_->GetXmppAuthenticationType(); 155 : } 156 : 157 0 : std::string XmppChannelMux::PeerAddress() const { 158 0 : return connection_->endpoint_string(); 159 : } 160 : 161 3228212 : inline bool MatchCallback(string to, xmps::PeerId peer) { 162 3228212 : if ((to.find(XmppInit::kBgpPeer) != string::npos) && 163 : (peer == xmps::BGP)) { 164 1642534 : return true; 165 : } 166 1585684 : if ((to.find(XmppInit::kConfigPeer) != string::npos) && 167 : (peer == xmps::CONFIG)) { 168 0 : return true; 169 : } 170 1585684 : if ((to.find(XmppInit::kDnsPeer) != string::npos) && 171 : (peer == xmps::DNS)) { 172 0 : return true; 173 : } 174 1585684 : if ((to.find(XmppInit::kOtherPeer) != string::npos) && 175 : (peer == xmps::OTHER)) { 176 4 : return true; 177 : } 178 1585680 : return false; 179 : } 180 : 181 1642709 : void XmppChannelMux::ProcessXmppMessage(const XmppStanza::XmppMessage *msg) { 182 1642709 : last_received_ = UTCTimestamp(); 183 1642710 : ReceiveCbMap::iterator iter = rxmap_.begin(); 184 4870925 : for (; iter != rxmap_.end(); ++iter) { 185 3228212 : if (MatchCallback(msg->to, iter->first)) { 186 1642538 : ReceiveCb cb = iter->second; 187 1642537 : cb(msg, GetPeerState()); 188 1642536 : } 189 : } 190 1642708 : } 191 : 192 25216 : void XmppChannelMux::HandleStateEvent(xmsm::XmState state) { 193 25216 : CHECK_CONCURRENCY("xmpp::StateMachine"); 194 25210 : xmps::PeerState st = xmps::NOT_READY; 195 25210 : if (state == xmsm::ESTABLISHED) { 196 12621 : st = xmps::READY; 197 12589 : } else if (state == xmsm::ACTIVE) { 198 234 : st = xmps::TIMEDOUT; 199 : } 200 : 201 25210 : if (connection_->IsClient()) { 202 12302 : XmppClient *client = static_cast<XmppClient *>(connection_->server()); 203 12302 : client->NotifyConnectionEvent(this, st); 204 : } else { 205 : // Event to create the peer on server 206 12908 : XmppServer *server = static_cast<XmppServer *>(connection_->server()); 207 12915 : server->NotifyConnectionEvent(this, st); 208 : } 209 25219 : } 210 : 211 493 : std::string XmppChannelMux::LastStateName() const { 212 493 : return connection_->LastStateName(); 213 : } 214 487 : std::string XmppChannelMux::LastStateChangeAt() const { 215 487 : return connection_->LastStateChangeAt(); 216 : } 217 487 : std::string XmppChannelMux::LastEvent() const { 218 487 : return connection_->LastEvent(); 219 : } 220 590 : uint32_t XmppChannelMux::rx_open() const { 221 590 : return connection_->rx_open(); 222 : } 223 590 : uint32_t XmppChannelMux::rx_close() const { 224 590 : return connection_->rx_close(); 225 : } 226 590 : uint32_t XmppChannelMux::rx_update() const { 227 590 : return connection_->rx_update(); 228 : } 229 590 : uint32_t XmppChannelMux::rx_keepalive() const { 230 590 : return connection_->rx_keepalive(); 231 : } 232 590 : uint32_t XmppChannelMux::tx_open() const { 233 590 : return connection_->tx_open(); 234 : } 235 590 : uint32_t XmppChannelMux::tx_close() const { 236 590 : return connection_->tx_close(); 237 : } 238 590 : uint32_t XmppChannelMux::tx_update() const { 239 590 : return connection_->tx_update(); 240 : } 241 590 : uint32_t XmppChannelMux::tx_keepalive() const { 242 590 : return connection_->tx_keepalive(); 243 : } 244 1461 : uint32_t XmppChannelMux::FlapCount() const { 245 1461 : return connection_->flap_count(); 246 : } 247 1461 : std::string XmppChannelMux::LastFlap() const { 248 1461 : return connection_->last_flap_at(); 249 : } 250 : 251 2 : void XmppChannelMux::RegisterRxMessageTraceCallback(RxMessageTraceCb cb) { 252 2 : rx_message_trace_cb_ = cb; 253 2 : } 254 2 : void XmppChannelMux::RegisterTxMessageTraceCallback(TxMessageTraceCb cb) { 255 2 : tx_message_trace_cb_ = cb; 256 2 : } 257 : 258 1662911 : bool XmppChannelMux::RxMessageTrace(const std::string &to_address, 259 : int port, 260 : int msg_size, 261 : const std::string &msg, 262 : const XmppStanza::XmppMessage *xmpp_msg) { 263 1662911 : if (rx_message_trace_cb_) { 264 0 : return rx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 265 : } 266 1662910 : return false; 267 : } 268 : 269 1652824 : bool XmppChannelMux::TxMessageTrace(const std::string &to_address, 270 : int port, 271 : int msg_size, 272 : const std::string &msg, 273 : const XmppStanza::XmppMessage *xmpp_msg) { 274 1652824 : if (tx_message_trace_cb_) { 275 0 : return tx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 276 : } 277 1652806 : return false; 278 : }