Line data Source code
1 : /* 2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include "xmpp/xmpp_channel_mux.h" 6 : 7 : #include <boost/foreach.hpp> 8 : 9 : #include "base/task_annotations.h" 10 : #include "xmpp/xmpp_init.h" 11 : #include "xmpp/xmpp_connection.h" 12 : 13 : using namespace std; 14 : using namespace xmsm; 15 : 16 11600 : XmppChannelMux::XmppChannelMux(XmppConnection *connection) 17 11600 : : connection_(connection), rx_message_trace_cb_(NULL), 18 23200 : tx_message_trace_cb_(NULL) { 19 11600 : last_received_ = 0; 20 11600 : last_sent_ = 0; 21 11600 : } 22 : 23 11695 : XmppChannelMux::~XmppChannelMux() { 24 11600 : assert(map_.empty()); 25 11695 : } 26 : 27 545 : void XmppChannelMux::Close() { 28 545 : connection_->Clear(); 29 545 : } 30 : 31 9132 : bool XmppChannelMux::LastReceived(time_t duration) const { 32 9132 : return (UTCTimestamp() - last_received_) <= duration; 33 : } 34 : 35 2945 : bool XmppChannelMux::LastSent(time_t duration) const { 36 2945 : return (UTCTimestamp() - last_sent_) <= duration; 37 : } 38 : 39 3534661 : xmps::PeerState XmppChannelMux::GetPeerState() const { 40 3534661 : xmsm::XmState st = connection_->GetStateMcState(); 41 3534611 : return (st == xmsm::ESTABLISHED) ? xmps::READY : 42 3534611 : xmps::NOT_READY; 43 : } 44 : 45 1 : void XmppChannelMux::WriteReady(const boost::system::error_code &ec) { 46 1 : std::scoped_lock lock(mutex_); 47 : 48 1 : WriteReadyCbMap::iterator iter = map_.begin(); 49 1 : WriteReadyCbMap::iterator next = iter; 50 2 : for (; iter != map_.end(); iter = next) { 51 1 : ++next; 52 1 : SendReadyCb cb = iter->second; 53 1 : cb(ec); 54 1 : map_.erase(iter); 55 1 : } 56 1 : } 57 : 58 1626872 : bool XmppChannelMux::Send(const uint8_t *msg, size_t msgsize, 59 : const string *msg_str, xmps::PeerId id, 60 : SendReadyCb cb) { 61 1626872 : if (!connection_) return false; 62 : 63 1626872 : std::scoped_lock lock(mutex_); 64 1627293 : last_sent_ = UTCTimestamp(); 65 1627366 : bool res = connection_->Send(msg, msgsize, msg_str); 66 1627262 : if (res == false) { 67 72 : RegisterWriteReady(id, cb); 68 : } 69 1627262 : return res; 70 1627262 : } 71 : 72 22361 : int XmppChannelMux::GetTaskInstance() const { 73 22361 : return connection_->GetTaskInstance(); 74 : } 75 : 76 9997 : void XmppChannelMux::RegisterReferer(xmps::PeerId id) { 77 9997 : referers_.insert(id); 78 9997 : } 79 : 80 9997 : void XmppChannelMux::UnRegisterReferer(xmps::PeerId id) { 81 9997 : referers_.erase(id); 82 9997 : } 83 : 84 13942 : void XmppChannelMux::RegisterReceive(xmps::PeerId id, ReceiveCb cb) { 85 13942 : rxmap_.insert(make_pair(id, cb)); 86 13942 : } 87 : 88 14062 : void XmppChannelMux::UnRegisterReceive(xmps::PeerId id) { 89 14062 : ReceiveCbMap::iterator it = rxmap_.find(id); 90 14062 : if (it != rxmap_.end()) { 91 13938 : rxmap_.erase(it); 92 : } 93 : 94 14062 : if (ReceiverCount()) 95 4075 : return; 96 : 97 : XmppServerConnection *server_connection = 98 10240 : dynamic_cast<XmppServerConnection *>(connection_); 99 : 100 : // If GracefulRestart helper mode close process is complete, restart the 101 : // state machine to form new session with the client. 102 10510 : if (!connection_->IsDeleted() && server_connection && 103 270 : server_connection->server()->IsGRHelperModeEnabled()) { 104 253 : server_connection->state_machine()->Initialize(); 105 253 : return; 106 : } 107 : 108 9987 : connection_->RetryDelete(); 109 : } 110 : 111 11661 : size_t XmppChannelMux::RefererCount() const { 112 11661 : return referers_.size(); 113 : } 114 : 115 31883 : size_t XmppChannelMux::ReceiverCount() const { 116 31883 : return rxmap_.size(); 117 : } 118 : 119 72 : vector<string> XmppChannelMux::GetReceiverList() const { 120 72 : vector<string> receivers; 121 96 : for (const auto& value : rxmap_) { 122 24 : receivers.push_back(xmps::PeerIdToName(value.first)); 123 : } 124 72 : return receivers; 125 0 : } 126 : 127 : // 128 : // To be called after acquiring mutex 129 : // 130 73 : void XmppChannelMux::RegisterWriteReady(xmps::PeerId id, SendReadyCb cb) { 131 73 : map_.insert(make_pair(id, cb)); 132 73 : } 133 : 134 : // 135 : // To be called after acquiring mutex 136 : // 137 13838 : void XmppChannelMux::UnRegisterWriteReady(xmps::PeerId id) { 138 13838 : map_.erase(id); 139 13838 : } 140 : 141 1682719 : const std::string &XmppChannelMux::ToString() const { 142 1682719 : return connection_->ToString(); 143 : } 144 : 145 0 : const std::string &XmppChannelMux::FromString() const { 146 0 : return connection_->FromString(); 147 : } 148 : 149 974 : std::string XmppChannelMux::StateName() const { 150 974 : return connection_->StateName(); 151 : } 152 : 153 0 : std::string XmppChannelMux::AuthType() const { 154 0 : return connection_->GetXmppAuthenticationType(); 155 : } 156 : 157 0 : std::string XmppChannelMux::PeerAddress() const { 158 0 : return connection_->endpoint_string(); 159 : } 160 : 161 3176680 : inline bool MatchCallback(string to, xmps::PeerId peer) { 162 3176680 : if ((to.find(XmppInit::kBgpPeer) != string::npos) && 163 : (peer == xmps::BGP)) { 164 1616763 : return true; 165 : } 166 1559924 : if ((to.find(XmppInit::kConfigPeer) != string::npos) && 167 : (peer == xmps::CONFIG)) { 168 0 : return true; 169 : } 170 1559925 : if ((to.find(XmppInit::kDnsPeer) != string::npos) && 171 : (peer == xmps::DNS)) { 172 0 : return true; 173 : } 174 1559925 : if ((to.find(XmppInit::kOtherPeer) != string::npos) && 175 : (peer == xmps::OTHER)) { 176 4 : return true; 177 : } 178 1559921 : return false; 179 : } 180 : 181 1616948 : void XmppChannelMux::ProcessXmppMessage(const XmppStanza::XmppMessage *msg) { 182 1616948 : last_received_ = UTCTimestamp(); 183 1616949 : ReceiveCbMap::iterator iter = rxmap_.begin(); 184 4793634 : for (; iter != rxmap_.end(); ++iter) { 185 3176684 : if (MatchCallback(msg->to, iter->first)) { 186 1616766 : ReceiveCb cb = iter->second; 187 1616766 : cb(msg, GetPeerState()); 188 1616768 : } 189 : } 190 1616945 : } 191 : 192 25181 : void XmppChannelMux::HandleStateEvent(xmsm::XmState state) { 193 25181 : CHECK_CONCURRENCY("xmpp::StateMachine"); 194 25169 : xmps::PeerState st = xmps::NOT_READY; 195 25169 : if (state == xmsm::ESTABLISHED) { 196 12605 : st = xmps::READY; 197 12564 : } else if (state == xmsm::ACTIVE) { 198 220 : st = xmps::TIMEDOUT; 199 : } 200 : 201 25169 : if (connection_->IsClient()) { 202 12281 : XmppClient *client = static_cast<XmppClient *>(connection_->server()); 203 12281 : client->NotifyConnectionEvent(this, st); 204 : } else { 205 : // Event to create the peer on server 206 12888 : XmppServer *server = static_cast<XmppServer *>(connection_->server()); 207 12898 : server->NotifyConnectionEvent(this, st); 208 : } 209 25183 : } 210 : 211 487 : std::string XmppChannelMux::LastStateName() const { 212 487 : return connection_->LastStateName(); 213 : } 214 487 : std::string XmppChannelMux::LastStateChangeAt() const { 215 487 : return connection_->LastStateChangeAt(); 216 : } 217 487 : std::string XmppChannelMux::LastEvent() const { 218 487 : return connection_->LastEvent(); 219 : } 220 554 : uint32_t XmppChannelMux::rx_open() const { 221 554 : return connection_->rx_open(); 222 : } 223 554 : uint32_t XmppChannelMux::rx_close() const { 224 554 : return connection_->rx_close(); 225 : } 226 554 : uint32_t XmppChannelMux::rx_update() const { 227 554 : return connection_->rx_update(); 228 : } 229 554 : uint32_t XmppChannelMux::rx_keepalive() const { 230 554 : return connection_->rx_keepalive(); 231 : } 232 554 : uint32_t XmppChannelMux::tx_open() const { 233 554 : return connection_->tx_open(); 234 : } 235 554 : uint32_t XmppChannelMux::tx_close() const { 236 554 : return connection_->tx_close(); 237 : } 238 554 : uint32_t XmppChannelMux::tx_update() const { 239 554 : return connection_->tx_update(); 240 : } 241 554 : uint32_t XmppChannelMux::tx_keepalive() const { 242 554 : return connection_->tx_keepalive(); 243 : } 244 1461 : uint32_t XmppChannelMux::FlapCount() const { 245 1461 : return connection_->flap_count(); 246 : } 247 1461 : std::string XmppChannelMux::LastFlap() const { 248 1461 : return connection_->last_flap_at(); 249 : } 250 : 251 0 : void XmppChannelMux::RegisterRxMessageTraceCallback(RxMessageTraceCb cb) { 252 0 : rx_message_trace_cb_ = cb; 253 0 : } 254 0 : void XmppChannelMux::RegisterTxMessageTraceCallback(TxMessageTraceCb cb) { 255 0 : tx_message_trace_cb_ = cb; 256 0 : } 257 : 258 1637166 : bool XmppChannelMux::RxMessageTrace(const std::string &to_address, 259 : int port, 260 : int msg_size, 261 : const std::string &msg, 262 : const XmppStanza::XmppMessage *xmpp_msg) { 263 1637166 : if (rx_message_trace_cb_) { 264 0 : return rx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 265 : } 266 1637166 : return false; 267 : } 268 : 269 1626943 : bool XmppChannelMux::TxMessageTrace(const std::string &to_address, 270 : int port, 271 : int msg_size, 272 : const std::string &msg, 273 : const XmppStanza::XmppMessage *xmpp_msg) { 274 1626943 : if (tx_message_trace_cb_) { 275 0 : return tx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg); 276 : } 277 1626910 : return false; 278 : }