Line data Source code
1 : /*
2 : * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #include "xmpp/xmpp_state_machine.h"
6 :
7 : #include <atomic>
8 : #include <typeinfo>
9 : #include <boost/bind/bind.hpp>
10 : #include <boost/date_time/posix_time/posix_time.hpp>
11 : #include <boost/statechart/custom_reaction.hpp>
12 : #include <boost/statechart/event.hpp>
13 : #include <boost/statechart/simple_state.hpp>
14 : #include <boost/statechart/state.hpp>
15 : #include <boost/statechart/state_machine.hpp>
16 : #include <boost/statechart/transition.hpp>
17 :
18 : #include "base/logging.h"
19 : #include "base/task_annotations.h"
20 : #include "io/event_manager.h"
21 : #include "io/ssl_session.h"
22 : #include "sandesh/sandesh_trace.h"
23 : #include "sandesh/common/vns_types.h"
24 : #include "sandesh/common/vns_constants.h"
25 : #include "sandesh/xmpp_client_server_sandesh_types.h"
26 : #include "sandesh/xmpp_peer_info_types.h"
27 : #include "sandesh/xmpp_state_machine_sandesh_types.h"
28 : #include "sandesh/xmpp_trace_sandesh_types.h"
29 : #include "xmpp/xmpp_connection.h"
30 : #include "xmpp/xmpp_factory.h"
31 : #include "xmpp/xmpp_log.h"
32 : #include "xmpp/xmpp_server.h"
33 : #include "xmpp/xmpp_session.h"
34 :
35 : using namespace std;
36 : using namespace boost::placeholders;
37 :
38 : namespace mpl = boost::mpl;
39 : namespace sc = boost::statechart;
40 :
41 : #define SM_LOG(_sm, _msg) do { \
42 : XMPP_UTDEBUG(XmppStateMachineDebug, \
43 : (_sm)->connection() ? (_sm)->connection()->ToUVEKey() : "", \
44 : XMPP_PEER_DIR_NA, (_sm)->ChannelType(), _msg); \
45 : } while (false)
46 :
47 : namespace xmsm {
48 : // events
49 :
50 : struct EvStart : sc::event<EvStart> {
51 : static const char *Name() {
52 : return "EvStart";
53 : }
54 : };
55 :
56 : struct EvStop : sc::event<EvStop> {
57 2967 : static const char *Name() {
58 2967 : return "EvStop";
59 : }
60 : };
61 :
62 : struct EvAdminDown : sc::event<EvAdminDown> {
63 3355 : static const char *Name() {
64 3355 : return "EvAdminDown";
65 : }
66 : };
67 :
68 : struct EvConnectTimerExpired : sc::event<EvConnectTimerExpired> {
69 7485 : static const char *Name() {
70 7485 : return "EvConnectTimerExpired";
71 : }
72 : };
73 :
74 : struct EvOpenTimerExpired : sc::event<EvOpenTimerExpired> {
75 1 : static const char *Name() {
76 1 : return "EvOpenTimerExpired";
77 : }
78 : };
79 :
80 : struct EvHoldTimerExpired : sc::event<EvHoldTimerExpired> {
81 13 : static const char *Name() {
82 13 : return "EvHoldTimerExpired";
83 : }
84 : };
85 :
86 : struct EvTcpConnected : sc::event<EvTcpConnected> {
87 6580 : EvTcpConnected(XmppSession *session) : session(session) { };
88 6579 : static const char *Name() {
89 6579 : return "EvTcpConnected";
90 : }
91 : XmppSession *session;
92 : } ;
93 :
94 : struct EvTcpConnectFail : sc::event<EvTcpConnectFail> {
95 900 : EvTcpConnectFail(XmppSession *session) : session(session) { };
96 900 : static const char *Name() {
97 900 : return "EvTcpConnectFail";
98 : }
99 : XmppSession *session;
100 : } ;
101 :
102 : struct EvTcpPassiveOpen : sc::event<EvTcpPassiveOpen> {
103 6518 : EvTcpPassiveOpen(XmppSession *session) : session(session) { };
104 6518 : static const char *Name() {
105 6518 : return "EvTcpPassiveOpen";
106 : }
107 : XmppSession *session;
108 : };
109 :
110 : struct EvTcpClose : sc::event<EvTcpClose> {
111 6605 : EvTcpClose(XmppSession *session) : session(session) { };
112 6377 : static const char *Name() {
113 6377 : return "EvTcpClose";
114 : }
115 : XmppSession *session;
116 : };
117 :
118 : struct EvTcpDeleteSession : sc::event<EvTcpDeleteSession> {
119 12008 : explicit EvTcpDeleteSession(TcpSession *session) : session(session) { }
120 : static const char *Name() {
121 : return "EvTcpDeleteSession";
122 : }
123 : TcpSession *session;
124 : };
125 :
126 : struct EvXmppMessage : sc::event<EvXmppMessage> {
127 4123313 : explicit EvXmppMessage(XmppSession *session,
128 4123313 : const XmppStanza::XmppMessage *msg) : session(session), msg(msg) { }
129 : static const char *Name() {
130 : return "EvXmppMessage";
131 : }
132 : XmppSession *session;
133 : const XmppStanza::XmppMessage *msg;
134 : };
135 :
136 : struct EvXmppOpen : public sc::event<EvXmppOpen> {
137 15590 : EvXmppOpen(XmppSession *session, const XmppStanza::XmppMessage *msg) :
138 15589 : session(session),
139 15590 : msg(static_cast<const XmppStanza::XmppStreamMessage *>(msg)) {
140 15589 : }
141 15589 : static const char *Name() {
142 15589 : return "EvXmppOpen";
143 : }
144 : XmppSession *session;
145 : boost::shared_ptr<const XmppStanza::XmppStreamMessage> msg;
146 : };
147 :
148 : struct EvStreamFeatureRequest : sc::event<EvStreamFeatureRequest> {
149 1469 : EvStreamFeatureRequest(XmppSession *session,
150 1469 : const XmppStanza::XmppMessage *msg) :
151 1469 : session(session),
152 1469 : msg(static_cast<const XmppStanza::XmppStreamMessage *>(msg)) {
153 1469 : }
154 1469 : static const char *Name() {
155 1469 : return "EvStreamFeatureRequest";
156 : }
157 : XmppSession *session;
158 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
159 : };
160 :
161 : struct EvStartTls : sc::event<EvStartTls> {
162 1464 : EvStartTls(XmppSession *session,
163 1464 : const XmppStanza::XmppMessage *msg) :
164 1464 : session(session),
165 1464 : msg(static_cast<const XmppStanza::XmppStreamMessage *>(msg)) {
166 1464 : }
167 1464 : static const char *Name() {
168 1464 : return "EvStartTls";
169 : }
170 : XmppSession *session;
171 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
172 : };
173 :
174 : struct EvTlsProceed : sc::event<EvTlsProceed> {
175 1467 : EvTlsProceed(XmppSession *session,
176 1467 : const XmppStanza::XmppMessage *msg) :
177 1467 : session(session),
178 1467 : msg(static_cast<const XmppStanza::XmppStreamMessage *>(msg)) {
179 1467 : }
180 1467 : static const char *Name() {
181 1467 : return "EvTlsProceed";
182 : }
183 : XmppSession *session;
184 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
185 : };
186 :
187 : struct EvTlsHandShakeSuccess : sc::event<EvTlsHandShakeSuccess> {
188 2924 : explicit EvTlsHandShakeSuccess(XmppSession *session) :
189 2924 : session(session) { }
190 2924 : static const char *Name() {
191 2924 : return "EvTlsHandShakeSuccess";
192 : }
193 : XmppSession *session;
194 : };
195 :
196 : struct EvTlsHandShakeFailure : sc::event<EvTlsHandShakeFailure> {
197 2 : explicit EvTlsHandShakeFailure(XmppSession *session) :
198 2 : session(session) { }
199 2 : static const char *Name() {
200 2 : return "EvTlsHandShakeFailure";
201 : }
202 : XmppSession *session;
203 : };
204 :
205 : struct EvXmppKeepalive : sc::event<EvXmppKeepalive> {
206 2485894 : EvXmppKeepalive(XmppSession *session,
207 2485894 : const XmppStanza::XmppMessage *msg) :
208 2485894 : session(session), msg(msg) {
209 2485894 : }
210 : static const char *Name() {
211 : return "EvXmppKeepalive";
212 : }
213 : XmppSession *session;
214 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
215 : };
216 :
217 : struct EvXmppMessageStanza : sc::event<EvXmppMessageStanza> {
218 1560058 : EvXmppMessageStanza(XmppSession *session,
219 1560058 : const XmppStanza::XmppMessage *msg) :
220 1560058 : session(session), msg(msg) {
221 1560058 : }
222 : static const char *Name() {
223 : return "EvXmppMessageStanza";
224 : }
225 : XmppSession *session;
226 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
227 : };
228 :
229 : struct EvXmppIqStanza : sc::event<EvXmppIqStanza> {
230 56891 : EvXmppIqStanza(XmppSession *session,
231 : const XmppStanza::XmppMessage *msg)
232 56891 : : session(session), msg(msg) {
233 56891 : }
234 : static const char *Name() {
235 : return "EvXmppIqStanza";
236 : }
237 : XmppSession *session;
238 : boost::shared_ptr<const XmppStanza::XmppMessage> msg;
239 : };
240 :
241 : struct EvXmppOpenReceive : sc::event<EvXmppOpenReceive> {
242 : EvXmppOpenReceive(XmppSession *session) : session(session) {
243 : }
244 : static const char *Name() {
245 : return "EvXmppOpenReceive";
246 : }
247 : XmppSession *session;
248 : };
249 :
250 : struct Idle : public sc::state<Idle, XmppStateMachine> {
251 : typedef sc::transition<EvStart, Active,
252 : XmppStateMachine, &XmppStateMachine::OnStart> reactions;
253 21777 : Idle(my_context ctx) : my_base(ctx) {
254 21774 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
255 21776 : bool flap = (state_machine->get_state() == ESTABLISHED);
256 21776 : state_machine->set_state(IDLE);
257 21779 : state_machine->SendConnectionInfo("Start", "Active");
258 21776 : if (flap) {
259 9486 : state_machine->connection()->increment_flap_count();
260 : }
261 21778 : }
262 : };
263 :
264 : struct Active : public sc::state<Active, XmppStateMachine> {
265 : typedef mpl::list<
266 : sc::custom_reaction<EvAdminDown>,
267 : sc::custom_reaction<EvConnectTimerExpired>,
268 : sc::custom_reaction<EvOpenTimerExpired>,
269 : sc::custom_reaction<EvTcpPassiveOpen>,
270 : sc::custom_reaction<EvTcpClose>,
271 : sc::custom_reaction<EvXmppOpen>,
272 : sc::custom_reaction<EvStop>
273 : > reactions;
274 :
275 15712 : Active(my_context ctx) : my_base(ctx) {
276 15711 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
277 15710 : state_machine->keepalive_count_clear();
278 15710 : bool flap = (state_machine->get_state() == ESTABLISHED);
279 15710 : state_machine->set_state(ACTIVE);
280 15711 : if (flap) {
281 2577 : state_machine->connection()->increment_flap_count();
282 : }
283 15711 : if (state_machine->IsActiveChannel() ) {
284 8917 : if (state_machine->get_connect_attempts() >=
285 : XmppStateMachine::kMaxAttempts) {
286 220 : XmppConnection *connection = state_machine->connection();
287 220 : if (connection) {
288 220 : state_machine->SendConnectionInfo(
289 : "Connect failed after retries");
290 : // Notify clients if any action to be taken
291 220 : connection->ChannelMux()->HandleStateEvent(xmsm::ACTIVE);
292 : }
293 : }
294 8917 : state_machine->StartConnectTimer(state_machine->GetConnectTime());
295 : }
296 15709 : }
297 31421 : ~Active() {
298 15711 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
299 15711 : state_machine->CancelConnectTimer();
300 15712 : state_machine->CancelOpenTimer();
301 31422 : }
302 :
303 : //event on client only
304 7484 : sc::result react(const EvConnectTimerExpired &event) {
305 7484 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
306 7484 : if (state_machine->ConnectTimerCancelled()) {
307 0 : SM_LOG(state_machine, "Discard EvConnectTimerExpired in (Active) State");
308 0 : return discard_event();
309 : } else {
310 7484 : state_machine->SendConnectionInfo(event.Name(), "Connect");
311 7484 : SM_LOG(state_machine, "EvConnectTimerExpired in (Active) State");
312 7484 : return transit<Connect>();
313 : }
314 : }
315 :
316 : // event on server only
317 6518 : sc::result react(const EvTcpPassiveOpen &event) {
318 6518 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
319 6518 : event.session->AsyncReadStart();
320 6517 : assert(state_machine->session() == event.session);
321 6517 : event.session->set_observer(
322 : boost::bind(&XmppStateMachine::OnSessionEvent,
323 : state_machine, _1, _2));
324 6518 : state_machine->StartOpenTimer(XmppStateMachine::kOpenTime);
325 6518 : XmppConnectionInfo info;
326 6518 : info.set_local_port(event.session->local_port());
327 6518 : info.set_remote_port(event.session->remote_port());
328 6518 : state_machine->SendConnectionInfo(&info, event.Name());
329 13036 : return discard_event();
330 6518 : }
331 :
332 : // event on server only
333 177 : sc::result react(const EvTcpClose &event) {
334 177 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
335 177 : if (event.session != state_machine->session()) {
336 0 : return discard_event();
337 : }
338 177 : state_machine->CancelOpenTimer();
339 177 : state_machine->ResetSession();
340 177 : state_machine->SendConnectionInfo(event.Name(), "Idle");
341 177 : return transit<Idle>();
342 : }
343 :
344 : //event on server only
345 1 : sc::result react(const EvOpenTimerExpired &event) {
346 1 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
347 1 : if (state_machine->OpenTimerCancelled()) {
348 0 : SM_LOG(state_machine,
349 : "Discard EvOpenTimerExpired in (Active) State");
350 0 : return discard_event();
351 : }
352 :
353 : // At this point session on connection is not set, hence SendClose
354 : // using session on the state_machine.
355 1 : XmppSession *session = state_machine->session();
356 1 : XmppConnection *connection = state_machine->connection();
357 1 : connection->SendClose(session);
358 1 : state_machine->ResetSession();
359 1 : state_machine->SendConnectionInfo(event.Name(), "Idle");
360 1 : return transit<Idle>();
361 : }
362 :
363 : //event on server only
364 6360 : sc::result react(const EvXmppOpen &event) {
365 6360 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
366 6360 : if (event.session != state_machine->session()) {
367 0 : return discard_event();
368 : }
369 6360 : state_machine->AssignSession();
370 :
371 6360 : XmppConnection *connection = state_machine->connection();
372 6359 : if (connection->IsDeleted()) {
373 0 : state_machine->ResetSession();
374 0 : return discard_event();
375 : }
376 :
377 6359 : XmppSession *session = state_machine->session();
378 6359 : if (!state_machine->IsAuthEnabled())
379 4847 : state_machine->ResurrectOldConnection(connection, session);
380 :
381 : // Get the possibly updated XmppConnection information.
382 6360 : connection = state_machine->connection();
383 6360 : state_machine->CancelOpenTimer();
384 6360 : if (!connection->SendOpenConfirm(session)) {
385 0 : connection->SendClose(session);
386 0 : state_machine->ResetSession();
387 0 : state_machine->SendConnectionInfo("Send Open Confirm Failed",
388 : "Idle");
389 0 : return transit<Idle>();
390 : } else {
391 6360 : XmppConnectionInfo info;
392 6360 : info.set_identifier(event.msg->from);
393 6360 : if (state_machine->IsAuthEnabled()) {
394 1512 : state_machine->SendConnectionInfo(&info, event.Name(),
395 : "Open Confirm");
396 1512 : return transit<OpenConfirm>();
397 : } else {
398 4848 : connection->StartKeepAliveTimer();
399 4848 : state_machine->SendConnectionInfo(&info, event.Name(),
400 : "Established");
401 4848 : return transit<XmppStreamEstablished>();
402 : }
403 6360 : }
404 : }
405 :
406 66 : sc::result react(const EvStop &event) {
407 66 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
408 66 : XmppConnection *connection = state_machine->connection();
409 66 : state_machine->CancelOpenTimer();
410 66 : state_machine->CancelConnectTimer();
411 66 : connection->StopKeepAliveTimer();
412 :
413 66 : if (state_machine->IsActiveChannel() ) {
414 1 : state_machine->set_session(NULL);
415 1 : state_machine->SendConnectionInfo(event.Name(), "Active");
416 1 : return transit<Active>();
417 : } else {
418 65 : state_machine->ResetSession();
419 65 : state_machine->SendConnectionInfo(event.Name(), "Idle");
420 65 : return transit<Idle>();
421 : }
422 : }
423 :
424 98 : sc::result react(const EvAdminDown &event) {
425 98 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
426 98 : XmppConnectionInfo info;
427 98 : info.set_close_reason("Administratively down");
428 98 : state_machine->connection()->set_close_reason("Administratively down");
429 98 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
430 196 : return transit<Idle>();
431 98 : }
432 :
433 : };
434 :
435 : //State valid only for client side, connection in Active state
436 : struct Connect : public sc::state<Connect, XmppStateMachine> {
437 : typedef mpl::list<
438 : sc::custom_reaction<EvAdminDown>,
439 : sc::custom_reaction<EvConnectTimerExpired>,
440 : sc::custom_reaction<EvTcpConnected>,
441 : sc::custom_reaction<EvTcpConnectFail>,
442 : sc::custom_reaction<EvTcpClose>,
443 : sc::custom_reaction<EvStop>
444 : > reactions;
445 :
446 : static const int kConnectTimeout = 60; // seconds
447 :
448 7484 : Connect(my_context ctx) : my_base(ctx) {
449 7484 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
450 7484 : StartSession(state_machine);
451 7484 : state_machine->connect_attempts_inc();
452 7484 : state_machine->set_state(CONNECT);
453 7484 : state_machine->StartConnectTimer(state_machine->GetConnectTime());
454 7484 : XmppSession *session = state_machine->session();
455 7484 : if (session != NULL) {
456 7484 : XmppConnectionInfo info;
457 7484 : info.set_local_port(session->local_port());
458 7484 : info.set_remote_port(session->remote_port());
459 7484 : state_machine->SendConnectionInfo(&info, "Connect Event");
460 7484 : }
461 7484 : }
462 14968 : ~Connect() {
463 7484 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
464 7484 : state_machine->CancelConnectTimer();
465 14968 : }
466 :
467 1 : sc::result react(const EvConnectTimerExpired &event) {
468 1 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
469 1 : if (state_machine->ConnectTimerCancelled()) {
470 0 : SM_LOG(state_machine,
471 : "Discard EvConnectTimerExpired in (Connect) State");
472 0 : return discard_event();
473 : }
474 1 : CloseSession(state_machine);
475 1 : XmppConnectionInfo info;
476 1 : info.set_close_reason("Connect timer expired");
477 1 : state_machine->connection()->set_close_reason("Connect timer expired");
478 1 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
479 1 : return transit<Active>();
480 1 : }
481 :
482 6579 : sc::result react(const EvTcpConnected &event) {
483 6579 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
484 6579 : if (event.session != state_machine->session()) {
485 0 : return discard_event();
486 : }
487 6579 : XmppConnection *connection = state_machine->connection();
488 6579 : state_machine->CancelConnectTimer();
489 6579 : XmppSession *session = state_machine->session();
490 6579 : XmppConnectionInfo info;
491 6579 : info.set_local_port(session->local_port());
492 6579 : info.set_remote_port(session->remote_port());
493 6579 : if (connection->SendOpen(session)) {
494 6579 : state_machine->StartHoldTimer();
495 6579 : state_machine->SendConnectionInfo(&info, event.Name(), "OpenSent");
496 6579 : return transit<OpenSent>();
497 : } else {
498 0 : SM_LOG(state_machine, "SendOpen failed in (Connect) State");
499 0 : CloseSession(state_machine);
500 0 : info.set_close_reason("SendOpen failed");
501 0 : state_machine->connection()->set_close_reason("Send Open failed");
502 0 : state_machine->SendConnectionInfo(&info, "Send Open failed",
503 : "Active");
504 0 : return transit<Active>();
505 : }
506 6579 : }
507 :
508 900 : sc::result react(const EvTcpConnectFail &event) {
509 : // delete session; restart connect timer.
510 900 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
511 900 : if (event.session != state_machine->session()) {
512 0 : return discard_event();
513 : }
514 900 : state_machine->set_session(NULL);
515 900 : state_machine->CancelConnectTimer();
516 900 : state_machine->SendConnectionInfo(event.Name(), "Active");
517 900 : return transit<Active>();
518 : }
519 :
520 1 : sc::result react(const EvTcpClose &event) {
521 1 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
522 1 : if (event.session != state_machine->session()) {
523 0 : return discard_event();
524 : }
525 : // close the tcp sessions.
526 1 : CloseSession(state_machine);
527 1 : state_machine->CancelConnectTimer();
528 1 : state_machine->SendConnectionInfo(event.Name(), "Active");
529 1 : return transit<Active>();
530 : }
531 :
532 1 : sc::result react(const EvStop &event) {
533 1 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
534 1 : CloseSession(state_machine);
535 1 : state_machine->CancelConnectTimer();
536 1 : XmppConnectionInfo info;
537 1 : info.set_close_reason("EvStop received");
538 1 : state_machine->connection()->set_close_reason("EvStop received");
539 1 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
540 2 : return transit<Active>();
541 1 : }
542 :
543 0 : sc::result react(const EvAdminDown &event) {
544 0 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
545 0 : CloseSession(state_machine);
546 0 : XmppConnectionInfo info;
547 0 : info.set_close_reason("Administratively down");
548 0 : state_machine->connection()->set_close_reason("Administratively down");
549 0 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
550 0 : return transit<Idle>();
551 0 : }
552 :
553 : // Create an active connection request.
554 7484 : void StartSession(XmppStateMachine *state_machine) {
555 7484 : XmppConnection *connection = state_machine->connection();
556 7484 : XmppSession *session = connection->CreateSession();
557 7484 : state_machine->set_session(session);
558 7484 : session->set_observer(boost::bind(&XmppStateMachine::OnSessionEvent,
559 : state_machine, _1, _2));
560 7484 : boost::system::error_code err;
561 7484 : session->socket()->bind(connection->local_endpoint(), err);
562 7484 : if (err) {
563 0 : LOG(WARN, "Bind failure for local address " <<
564 : connection->local_endpoint() << " : " << err.message());
565 0 : assert(false);
566 : }
567 7484 : connection->server()->Connect(session, connection->endpoint());
568 7484 : }
569 :
570 3 : void CloseSession(XmppStateMachine *state_machine) {
571 3 : state_machine->set_session(NULL);
572 3 : }
573 : };
574 :
575 : // The client reaches OpenSent after sending an immediate OPEN on a active
576 : // connection. Server should not come in this state.
577 : struct OpenSent : public sc::state<OpenSent, XmppStateMachine> {
578 : typedef mpl::list<
579 : sc::custom_reaction<EvAdminDown>,
580 : sc::custom_reaction<EvTcpClose>,
581 : sc::custom_reaction<EvXmppOpen>,
582 : sc::custom_reaction<EvHoldTimerExpired>,
583 : sc::custom_reaction<EvStop>
584 : > reactions;
585 :
586 6579 : OpenSent(my_context ctx) : my_base(ctx) {
587 6579 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
588 6579 : state_machine->set_state(OPENSENT);
589 6579 : }
590 13158 : ~OpenSent() {
591 6579 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
592 6579 : state_machine->CancelHoldTimer();
593 13158 : }
594 :
595 174 : sc::result react(const EvTcpClose &event) {
596 174 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
597 174 : if (event.session != state_machine->session()) {
598 0 : return discard_event();
599 : }
600 174 : state_machine->CancelHoldTimer();
601 174 : if (event.session) {
602 174 : state_machine->set_session(NULL);
603 174 : state_machine->SendConnectionInfo(event.Name(), "Active");
604 174 : return transit<Active>();
605 : }
606 0 : return discard_event();
607 : }
608 :
609 6310 : sc::result react(const EvXmppOpen &event) {
610 6310 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
611 6310 : XmppConnection *connection = state_machine->connection();
612 6310 : if (event.session == state_machine->session()) {
613 6310 : state_machine->AssignSession();
614 6310 : XmppConnectionInfo info;
615 6310 : info.set_identifier(event.msg->from);
616 6310 : if (state_machine->IsAuthEnabled()) {
617 1472 : state_machine->SendConnectionInfo(&info, event.Name(),
618 : "Open Confirm");
619 1472 : return transit<OpenConfirm>();
620 : } else {
621 4838 : connection->SendKeepAlive();
622 4838 : connection->StartKeepAliveTimer();
623 4838 : state_machine->StartHoldTimer();
624 4838 : state_machine->SendConnectionInfo(&info, event.Name(),
625 : "Established");
626 4838 : return transit<XmppStreamEstablished>();
627 : }
628 6310 : }
629 0 : return discard_event();
630 : }
631 :
632 2 : sc::result react(const EvHoldTimerExpired &event) {
633 2 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
634 2 : if (state_machine->HoldTimerCancelled()) {
635 0 : SM_LOG(state_machine,
636 : "Discard EvHoldTimerExpired in (OpenSent) State");
637 0 : return discard_event();
638 : }
639 2 : CloseSession(state_machine);
640 2 : XmppConnectionInfo info;
641 2 : info.set_close_reason("Hold timer expired");
642 2 : state_machine->connection()->set_close_reason("Hold timer expired");
643 2 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
644 2 : return transit<Active>();
645 2 : }
646 :
647 2 : sc::result react(const EvStop &event) {
648 2 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
649 2 : state_machine->CancelHoldTimer();
650 2 : CloseSession(state_machine);
651 2 : XmppConnectionInfo info;
652 2 : info.set_close_reason("EvStop received");
653 2 : state_machine->connection()->set_close_reason("EvStop received");
654 2 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
655 4 : return transit<Active>();
656 2 : }
657 :
658 72 : sc::result react(const EvAdminDown &event) {
659 72 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
660 72 : CloseSession(state_machine);
661 72 : XmppConnectionInfo info;
662 72 : info.set_close_reason("Administratively down");
663 72 : state_machine->connection()->set_close_reason("Administratively down");
664 72 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
665 144 : return transit<Idle>();
666 72 : }
667 :
668 76 : void CloseSession(XmppStateMachine *state_machine) {
669 76 : state_machine->set_session(NULL);
670 76 : }
671 : };
672 :
673 : struct OpenConfirm : public sc::state<OpenConfirm, XmppStateMachine> {
674 : typedef mpl::list<
675 : sc::custom_reaction<EvAdminDown>,
676 : sc::custom_reaction<EvTcpClose>,
677 : sc::custom_reaction<EvHoldTimerExpired>,
678 : sc::custom_reaction<EvStreamFeatureRequest>, //received by client
679 : sc::custom_reaction<EvStartTls>, //received by server
680 : sc::custom_reaction<EvTlsProceed>, //received by client
681 : sc::custom_reaction<EvTlsHandShakeSuccess>,
682 : sc::custom_reaction<EvTlsHandShakeFailure>,
683 : sc::custom_reaction<EvXmppOpen>, //received by server
684 : sc::custom_reaction<EvStop>
685 : > reactions;
686 :
687 2984 : OpenConfirm(my_context ctx) : my_base(ctx) {
688 2984 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
689 2984 : state_machine->StartHoldTimer();
690 2984 : XmppConnectionInfo info;
691 2984 : if (!state_machine->IsActiveChannel()) { //server
692 1512 : if (state_machine->IsAuthEnabled()) {
693 1512 : XmppConnection *connection = state_machine->connection();
694 1512 : XmppSession *session = state_machine->session();
695 1512 : if (!connection->SendStreamFeatureRequest(session)) {
696 0 : connection->SendClose(session);
697 0 : SM_LOG(state_machine,
698 : "Xmpp Send Stream Feature Request Failed, IDLE");
699 0 : state_machine->ResetSession();
700 0 : info.set_close_reason("Send Stream Feature Request Failed");
701 0 : state_machine->connection()->set_close_reason(
702 : "Send Stream Feature Request Failed");
703 0 : state_machine->SendConnectionInfo(&info,
704 : "Send Stream Feature Request failed", "Idle");
705 : // cannot transition state as this is the constructor
706 : // of new state
707 : }
708 : }
709 : }
710 2984 : state_machine->set_state(OPENCONFIRM);
711 2984 : state_machine->set_openconfirm_state(OPENCONFIRM_INIT);
712 2984 : }
713 :
714 46 : sc::result react(const EvTcpClose &event) {
715 46 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
716 46 : state_machine->CancelHoldTimer();
717 46 : if (state_machine->IsActiveChannel() ) {
718 1 : CloseSession(state_machine);
719 1 : state_machine->SendConnectionInfo(event.Name(), "Active");
720 1 : return transit<Active>();
721 : } else {
722 45 : state_machine->ResetSession();
723 45 : state_machine->SendConnectionInfo(event.Name(), "Idle");
724 45 : return transit<Idle>();
725 : }
726 : }
727 :
728 6 : sc::result react(const EvHoldTimerExpired &event) {
729 6 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
730 6 : if (state_machine->HoldTimerCancelled()) {
731 0 : SM_LOG(state_machine,
732 : "Discard EvHoldTimerExpired in (OpenConfirm) State");
733 0 : return discard_event();
734 : }
735 6 : XmppConnectionInfo info;
736 6 : info.set_close_reason("Hold timer expired");
737 6 : state_machine->connection()->set_close_reason("Hold timer expired");
738 6 : if (state_machine->IsActiveChannel() ) {
739 4 : CloseSession(state_machine);
740 4 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
741 4 : return transit<Active>();
742 : } else {
743 2 : state_machine->ResetSession();
744 2 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
745 2 : return transit<Idle>();
746 : }
747 6 : }
748 :
749 : // received by the client
750 1469 : sc::result react(const EvStreamFeatureRequest &event) {
751 1469 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
752 1469 : XmppConnection *connection = state_machine->connection();
753 1469 : XmppSession *session = state_machine->session();
754 : // TODO, we need to have a supported stream feature list
755 : // and compare against the requested stream feature list
756 : // which will enable us to send start of various features
757 1469 : if (!connection->SendStartTls(session)) {
758 0 : connection->SendClose(session);
759 0 : state_machine->ResetSession();
760 0 : XmppConnectionInfo info;
761 0 : info.set_close_reason("Send Start Tls Failed");
762 0 : state_machine->SendConnectionInfo(&info, event.Name(), "Active");
763 0 : return transit<Active>();
764 0 : } else {
765 1469 : state_machine->StartHoldTimer();
766 1469 : state_machine->SendConnectionInfo(event.Name(),
767 : "Sent Start Tls, OpenConfirm Feature Negotiation");
768 1469 : state_machine->set_openconfirm_state(
769 : OPENCONFIRM_FEATURE_NEGOTIATION);
770 1469 : return discard_event();
771 : }
772 : }
773 :
774 : // received by client
775 1467 : sc::result react(const EvTlsProceed &event) {
776 1467 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
777 1467 : XmppConnection *connection = state_machine->connection();
778 1467 : XmppSession *session = state_machine->session();
779 1467 : state_machine->StartHoldTimer();
780 1467 : XmppConnectionInfo info;
781 1467 : info.set_identifier(connection->GetTo());
782 : // Trigger Ssl Handshake on client side
783 1467 : session->TriggerSslHandShake(state_machine->HandShakeCbHandler());
784 1467 : state_machine->SendConnectionInfo( &info, event.Name(),
785 : "Trigger Client Ssl Handshake");
786 2934 : return discard_event();
787 1467 : }
788 :
789 : //received by server
790 1464 : sc::result react(const EvStartTls &event) {
791 1464 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
792 1464 : if (event.session != state_machine->session()) {
793 0 : return discard_event();
794 : }
795 1464 : XmppConnection *connection = state_machine->connection();
796 1464 : XmppSession *session = state_machine->session();
797 1464 : XmppConnectionInfo info;
798 1464 : info.set_identifier(connection->GetTo());
799 1464 : if (!connection->SendProceedTls(session)) {
800 0 : connection->SendClose(session);
801 0 : state_machine->ResetSession();
802 0 : info.set_close_reason("Send Proceed Tls Failed");
803 0 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
804 0 : return transit<Idle>();
805 : } else {
806 1464 : state_machine->StartHoldTimer();
807 : // Trigger Ssl Handshake on server side
808 1464 : session->TriggerSslHandShake(state_machine->HandShakeCbHandler());
809 1464 : state_machine->SendConnectionInfo(&info, event.Name(),
810 : "Trigger Server Ssl Handshake");
811 1464 : return discard_event();
812 : }
813 1464 : }
814 :
815 1 : sc::result react(const EvStop &event) {
816 1 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
817 1 : state_machine->CancelHoldTimer();
818 1 : if (state_machine->IsActiveChannel() ) {
819 1 : CloseSession(state_machine);
820 1 : state_machine->SendConnectionInfo(event.Name(), "Active");
821 1 : return transit<Active>();
822 : } else {
823 0 : state_machine->ResetSession();
824 0 : state_machine->SendConnectionInfo(event.Name(), "Idle");
825 0 : return transit<Idle>();
826 : }
827 : }
828 :
829 2923 : sc::result react(const EvTlsHandShakeSuccess &event) {
830 2923 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
831 2923 : if (event.session != state_machine->session()) {
832 0 : return discard_event();
833 : }
834 2923 : XmppConnection *connection = state_machine->connection();
835 2923 : XmppSession *session = state_machine->session();
836 2923 : session->AsyncReadStart();
837 2924 : state_machine->set_openconfirm_state(OPENCONFIRM_FEATURE_SUCCESS);
838 2924 : if (state_machine->IsActiveChannel()) { //client
839 1462 : if (!connection->SendOpen(session)) {
840 0 : connection->SendClose(session);
841 0 : state_machine->ResetSession();
842 0 : XmppConnectionInfo info;
843 0 : info.set_close_reason("Open send failed in OpenConfirm State");
844 0 : state_machine->SendConnectionInfo(event.Name(), "Active");
845 0 : return transit<Active>();
846 0 : }
847 : }
848 : // both server and client
849 2924 : state_machine->StartHoldTimer();
850 2924 : state_machine->SendConnectionInfo(event.Name(),
851 : "OpenConfirm Feature Negotiation Success");
852 2924 : return discard_event();
853 : }
854 :
855 2 : sc::result react(const EvTlsHandShakeFailure &event) {
856 2 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
857 2 : if (event.session != state_machine->session()) {
858 0 : return discard_event();
859 : }
860 : // Do not send stream close as error occured at TLS layer
861 2 : state_machine->ResetSession();
862 2 : if (state_machine->IsActiveChannel()) { // client
863 1 : state_machine->SendConnectionInfo(event.Name(), "Active");
864 1 : return transit<Active>();
865 : } else {
866 1 : state_machine->SendConnectionInfo(event.Name(), "Idle");
867 1 : return transit<Idle>();
868 : }
869 : }
870 :
871 : //event on server and client
872 2919 : sc::result react(const EvXmppOpen &event) {
873 2919 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
874 2919 : if (event.session != state_machine->session()) {
875 0 : return discard_event();
876 : }
877 2919 : XmppConnectionInfo info;
878 2919 : info.set_identifier(event.msg->from);
879 2919 : XmppConnection *connection = state_machine->connection();
880 2919 : XmppSession *session = state_machine->session();
881 2919 : if (connection->IsActiveChannel()) { //client
882 1460 : connection->SendKeepAlive();
883 1460 : connection->StartKeepAliveTimer();
884 1460 : state_machine->StartHoldTimer();
885 1460 : state_machine->SendConnectionInfo(&info, event.Name(),
886 : "Established");
887 1460 : return transit<XmppStreamEstablished>();
888 : } else { //server
889 1459 : if (!connection->SendOpenConfirm(session)) {
890 0 : connection->SendClose(session);
891 0 : SM_LOG(state_machine,
892 : "Xmpp Send Open Confirm Failed, IDLE");
893 0 : state_machine->ResetSession();
894 0 : info.set_close_reason("Send Open Confirm Failed");
895 0 : state_machine->connection()->set_close_reason(
896 : "Send Open Confirm Failed");
897 0 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
898 0 : return transit<Idle>();
899 : } else {
900 1459 : if (state_machine->IsAuthEnabled())
901 1459 : state_machine->ResurrectOldConnection(connection, session);
902 1459 : connection = state_machine->connection();
903 1459 : connection->StartKeepAliveTimer();
904 1459 : state_machine->StartHoldTimer();
905 1459 : state_machine->SendConnectionInfo(&info, event.Name(),
906 : "Established");
907 1459 : return transit<XmppStreamEstablished>();
908 : }
909 : }
910 2919 : }
911 :
912 0 : sc::result react(const EvAdminDown &event) {
913 0 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
914 0 : CloseSession(state_machine);
915 0 : XmppConnectionInfo info;
916 0 : info.set_close_reason("Administratively down");
917 0 : state_machine->connection()->set_close_reason("Administratively down");
918 0 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
919 0 : return transit<Idle>();
920 0 : }
921 :
922 6 : void CloseSession(XmppStateMachine *state_machine) {
923 6 : XmppConnection *connection = state_machine->connection();
924 6 : if (connection != NULL) connection->StopKeepAliveTimer();
925 6 : state_machine->set_session(NULL);
926 6 : }
927 : };
928 :
929 : struct XmppStreamEstablished :
930 : public sc::state<XmppStreamEstablished, XmppStateMachine> {
931 : typedef mpl::list<
932 : sc::custom_reaction<EvAdminDown>,
933 : sc::custom_reaction<EvTcpClose>,
934 : sc::custom_reaction<EvXmppKeepalive>,
935 : sc::custom_reaction<EvXmppMessageStanza>,
936 : sc::custom_reaction<EvXmppIqStanza>,
937 : sc::custom_reaction<EvHoldTimerExpired>,
938 : sc::custom_reaction<EvStop>
939 : > reactions;
940 :
941 12605 : XmppStreamEstablished(my_context ctx) : my_base(ctx) {
942 12604 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
943 12603 : XmppConnection *connection = state_machine->connection();
944 12603 : state_machine->StartHoldTimer();
945 12605 : state_machine->set_state(ESTABLISHED);
946 12605 : connection->ChannelMux()->HandleStateEvent(xmsm::ESTABLISHED);
947 12604 : }
948 25206 : ~XmppStreamEstablished() {
949 12603 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
950 12604 : state_machine->CancelHoldTimer();
951 25207 : }
952 :
953 6008 : sc::result react(const EvTcpClose &event) {
954 6008 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
955 6008 : if (event.session != state_machine->session()) {
956 29 : return discard_event();
957 : }
958 5979 : state_machine->ResetSession();
959 5979 : if (state_machine->IsActiveChannel()) {
960 2573 : state_machine->SendConnectionInfo(event.Name(), "Active");
961 2573 : return transit<Active>();
962 : } else {
963 3406 : state_machine->SendConnectionInfo(event.Name(), "Idle");
964 3406 : return transit<Idle>();
965 : }
966 : }
967 :
968 2481526 : sc::result react(const EvXmppKeepalive &event) {
969 2481526 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
970 2481526 : if (event.session != state_machine->session()) {
971 0 : return discard_event();
972 : }
973 2481526 : state_machine->keepalive_count_inc();
974 2481526 : if (state_machine->get_keepalive_count() == 3) {
975 3023 : state_machine->connect_attempts_clear();
976 : }
977 2481526 : state_machine->StartHoldTimer();
978 2481526 : return discard_event();
979 : }
980 :
981 1560058 : sc::result react(const EvXmppMessageStanza &event) {
982 1560058 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
983 1560058 : if (event.session != state_machine->session()) {
984 0 : return discard_event();
985 : }
986 1560058 : state_machine->StartHoldTimer();
987 3120116 : state_machine->connection()->ProcessXmppChatMessage(
988 1560058 : static_cast<const XmppStanza::XmppChatMessage *>(event.msg.get()));
989 1560058 : return discard_event();
990 : }
991 :
992 56889 : sc::result react(const EvXmppIqStanza &event) {
993 56889 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
994 56891 : if (event.session != state_machine->session()) {
995 0 : return discard_event();
996 : }
997 56891 : state_machine->StartHoldTimer();
998 113782 : state_machine->connection()->ProcessXmppIqMessage(
999 56891 : static_cast<const XmppStanza::XmppMessage *>(event.msg.get()));
1000 56891 : return discard_event();
1001 : }
1002 :
1003 5 : sc::result react(const EvHoldTimerExpired &event) {
1004 5 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
1005 5 : if (state_machine->HoldTimerCancelled()) {
1006 0 : SM_LOG(state_machine,
1007 : "Discard EvHoldTimerExpired in (Established) State");
1008 0 : return discard_event();
1009 : }
1010 5 : XMPP_NOTICE(XmppStateMachineNotice,
1011 : state_machine->connection()->ToUVEKey(),
1012 : XMPP_PEER_DIR_NA,
1013 : state_machine->ChannelType(),
1014 : "EvHoldTimerExpired in (Established) State. Transit to IDLE");
1015 5 : state_machine->SendConnectionInfo(event.Name());
1016 5 : state_machine->AssertOnHoldTimeout();
1017 5 : state_machine->ResetSession();
1018 5 : if (state_machine->IsActiveChannel()) {
1019 4 : return transit<Active>();
1020 : } else {
1021 1 : return transit<Idle>();
1022 : }
1023 : }
1024 :
1025 2894 : sc::result react(const EvStop &event) {
1026 2894 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
1027 2897 : state_machine->SendConnectionInfo(event.Name());
1028 2895 : state_machine->ResetSession();
1029 2897 : if (state_machine->IsActiveChannel()) {
1030 0 : return transit<Active>();
1031 : } else {
1032 2897 : return transit<Idle>();
1033 : }
1034 : }
1035 :
1036 3185 : sc::result react(const EvAdminDown &event) {
1037 3185 : XmppStateMachine *state_machine = &context<XmppStateMachine>();
1038 3185 : state_machine->ResetSession();
1039 3185 : XmppConnectionInfo info;
1040 3185 : info.set_close_reason("Administratively down");
1041 3185 : state_machine->connection()->set_close_reason("Administratively down");
1042 3185 : state_machine->SendConnectionInfo(&info, event.Name(), "Idle");
1043 6370 : return transit<Idle>();
1044 3185 : }
1045 : };
1046 :
1047 : } // namespace xmsm
1048 :
1049 5 : void XmppStateMachine::AssertOnHoldTimeout() {
1050 : static bool init_ = false;
1051 : static bool assert_ = false;
1052 :
1053 5 : if (!init_) {
1054 3 : char *str = getenv("XMPP_ASSERT_ON_HOLD_TIMEOUT");
1055 3 : if (str && strtoul(str, NULL, 0) != 0) assert_ = true;
1056 3 : init_ = true;
1057 : }
1058 :
1059 5 : if (!assert_) return;
1060 :
1061 0 : if (connection()) {
1062 0 : connection()->LogMsg("HOLD TIMER EXPIRED: ");
1063 : } else {
1064 0 : LOG4CPLUS_DEBUG(log4cplus::Logger::getRoot(), "HOLD TIMER EXPIRED: ");
1065 : }
1066 :
1067 0 : assert(!assert_);
1068 : }
1069 :
1070 12356 : void XmppStateMachine::ResetSession() {
1071 12356 : XmppConnection *connection = this->connection();
1072 12355 : set_session(NULL);
1073 12356 : CancelHoldTimer();
1074 :
1075 12358 : if (!connection)
1076 0 : return;
1077 :
1078 : // Stop keepalives, transition to IDLE and notify registered entities.
1079 12358 : connection->StopKeepAliveTimer();
1080 12356 : connection->ChannelMux()->HandleStateEvent(xmsm::IDLE);
1081 12358 : if (IsActiveChannel())
1082 5763 : return;
1083 :
1084 : // Retain the connection if graceful restart is supported.
1085 6595 : XmppServer *server = dynamic_cast<XmppServer *>(connection->server());
1086 6595 : if (!server->IsPeerCloseGraceful())
1087 6389 : connection->ManagedDelete();
1088 : }
1089 :
1090 11599 : XmppStateMachine::XmppStateMachine(XmppConnection *connection, bool active,
1091 11599 : bool auth_enabled, int config_hold_time)
1092 11599 : : work_queue_(TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
1093 : connection->GetTaskInstance(),
1094 : boost::bind(&XmppStateMachine::DequeueEvent, this, _1)),
1095 11599 : connection_(connection),
1096 11599 : session_(NULL),
1097 11599 : server_(connection->server()),
1098 11599 : connect_timer_(
1099 11599 : TimerManager::CreateTimer(*server_->event_manager()->io_service(),
1100 : "Connect timer",
1101 : TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
1102 : connection->GetTaskInstance())),
1103 11599 : open_timer_(
1104 11599 : TimerManager::CreateTimer(*server_->event_manager()->io_service(),
1105 : "Open timer",
1106 : TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
1107 : connection->GetTaskInstance())),
1108 11599 : hold_timer_(
1109 11599 : TimerManager::CreateTimer(*server_->event_manager()->io_service(),
1110 : "Hold timer",
1111 : TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
1112 : connection->GetTaskInstance())),
1113 11599 : config_hold_time_(config_hold_time),
1114 11599 : hold_time_(GetConfiguredHoldTime()),
1115 11599 : attempts_(0),
1116 11599 : keepalive_count_(0),
1117 11599 : deleted_(false),
1118 11599 : in_dequeue_(false),
1119 11599 : is_active_(active),
1120 11599 : auth_enabled_(auth_enabled),
1121 11599 : state_(xmsm::IDLE),
1122 11599 : last_state_(xmsm::IDLE),
1123 23198 : openconfirm_state_(xmsm::OPENCONFIRM_INIT) {
1124 : handshake_cb_ = boost::bind(
1125 11599 : &XmppConnection::ProcessSslHandShakeResponse, connection, _1, _2);
1126 11599 : }
1127 :
1128 11690 : XmppStateMachine::~XmppStateMachine() {
1129 11599 : assert(!deleted_);
1130 11599 : deleted_ = true;
1131 11599 : work_queue_.Shutdown();
1132 11599 : set_session(NULL);
1133 :
1134 : // Explicitly call the state destructor before the state machine itself.
1135 : // This is needed because some of the destructors access the state machine
1136 : // context.
1137 11599 : terminate();
1138 :
1139 : // Delete timer after state machine is terminated so that there is no
1140 : // possible reference to the timers being deleted any more
1141 11599 : TimerManager::DeleteTimer(connect_timer_);
1142 11599 : TimerManager::DeleteTimer(open_timer_);
1143 11599 : TimerManager::DeleteTimer(hold_timer_);
1144 11690 : }
1145 :
1146 11829 : void XmppStateMachine::Initialize() {
1147 11829 : initiate();
1148 11829 : Enqueue(xmsm::EvStart());
1149 11829 : }
1150 :
1151 : // Note this api does not enqueue the deletion of TCP session
1152 11599 : void XmppStateMachine::clear_session() {
1153 11599 : if (session_ != NULL) {
1154 2038 : session_->set_observer(NULL);
1155 2038 : session_->ClearConnection();
1156 2038 : session_->Close();
1157 2038 : connection_->clear_session();
1158 2038 : session_ = NULL;
1159 : }
1160 11599 : }
1161 :
1162 12008 : void XmppStateMachine::DeleteSession(XmppSession *session) {
1163 : // If there is a session assigned to this state machine, reset the observer
1164 : // so that tcp does not have a reference to 'this' which is going away
1165 12008 : if (session != NULL) {
1166 12008 : session->set_observer(NULL);
1167 12007 : session->ClearConnection();
1168 12007 : session->Close();
1169 12008 : Enqueue(xmsm::EvTcpDeleteSession(session));
1170 : }
1171 12008 : }
1172 :
1173 39160 : void XmppStateMachine::set_session(TcpSession *session) {
1174 39160 : if (session_ != NULL) {
1175 12007 : connection_->clear_session();
1176 12008 : DeleteSession(session_);
1177 : }
1178 39161 : session_ = static_cast<XmppSession *>(session);
1179 39161 : }
1180 :
1181 0 : void XmppStateMachine::TimerErrorHandler(std::string name, std::string error) {
1182 0 : }
1183 :
1184 145 : void XmppStateMachine::StartConnectTimer(int seconds) {
1185 145 : CancelConnectTimer();
1186 :
1187 : // Add up to +/- kJitter percentage to reduce connection collisions.
1188 145 : int ms = ((seconds)? seconds * 1000 : 50);
1189 145 : ms = (ms * (100 - kJitter)) / 100;
1190 145 : ms += (ms * (rand() % (kJitter * 2))) / 100;
1191 145 : connect_timer_->Start(ms,
1192 : boost::bind(&XmppStateMachine::ConnectTimerExpired, this),
1193 : boost::bind(&XmppStateMachine::TimerErrorHandler, this, _1, _2));
1194 145 : }
1195 :
1196 30887 : void XmppStateMachine::CancelConnectTimer() {
1197 30887 : connect_timer_->Cancel();
1198 30888 : }
1199 :
1200 14 : void XmppStateMachine::StartOpenTimer(int seconds) {
1201 14 : CancelOpenTimer();
1202 14 : open_timer_->Start(seconds * 1000,
1203 : boost::bind(&XmppStateMachine::OpenTimerExpired, this),
1204 : boost::bind(&XmppStateMachine::TimerErrorHandler, this, _1, _2));
1205 14 : }
1206 :
1207 22329 : void XmppStateMachine::CancelOpenTimer() {
1208 22329 : open_timer_->Cancel();
1209 22329 : }
1210 :
1211 12086 : int XmppStateMachine::GetConfiguredHoldTime() const {
1212 : static std::atomic<bool> env_checked = std::atomic<bool>();
1213 : static std::atomic<int> env_hold_time = std::atomic<int>();
1214 :
1215 : // For testing only - configure through environment variable.
1216 12086 : if (!env_checked) {
1217 63 : char *keepalive_time_str = getenv("XMPP_KEEPALIVE_SECONDS");
1218 63 : if (keepalive_time_str) {
1219 0 : env_hold_time = strtoul(keepalive_time_str, NULL, 0) * 3;
1220 0 : env_checked = true;
1221 0 : return env_hold_time;
1222 : } else {
1223 63 : env_checked = true;
1224 : }
1225 12023 : } else if (env_hold_time) {
1226 0 : return env_hold_time;
1227 12023 : } else if (config_hold_time_) {
1228 12023 : return config_hold_time_;
1229 : }
1230 :
1231 : // Use hard coded default.
1232 63 : return kHoldTime;
1233 : }
1234 :
1235 4135712 : void XmppStateMachine::StartHoldTimer() {
1236 4135712 : CancelHoldTimer();
1237 :
1238 4135716 : if (hold_time_ <= 0)
1239 0 : return;
1240 :
1241 4135716 : hold_timer_->Start(hold_time_ * 1000,
1242 : boost::bind(&XmppStateMachine::HoldTimerExpired, this),
1243 : boost::bind(&XmppStateMachine::TimerErrorHandler, this, _1, _2));
1244 : }
1245 :
1246 4167474 : void XmppStateMachine::CancelHoldTimer() {
1247 4167474 : hold_timer_->Cancel();
1248 4167480 : }
1249 :
1250 12046 : void XmppStateMachine::OnStart(const xmsm::EvStart &event) {
1251 12046 : }
1252 :
1253 7485 : bool XmppStateMachine::ConnectTimerExpired() {
1254 7485 : XMPP_UTDEBUG(XmppStateMachineTimerExpire,
1255 : connection() ? connection()->ToUVEKey() : "",
1256 : XMPP_PEER_DIR_NA, this->ChannelType(), "Connect", StateName());
1257 7485 : Enqueue(xmsm::EvConnectTimerExpired());
1258 7485 : return false;
1259 : }
1260 :
1261 1 : bool XmppStateMachine::OpenTimerExpired() {
1262 1 : XMPP_NOTICE(XmppEventLog, connection()->ToUVEKey(), XMPP_PEER_DIR_NA,
1263 : this->ChannelType(),
1264 : "Event: OpenTimer Expired ",
1265 : connection()->endpoint().address().to_string(),
1266 : connection()->GetTo());
1267 1 : Enqueue(xmsm::EvOpenTimerExpired());
1268 1 : return false;
1269 : }
1270 :
1271 :
1272 13 : bool XmppStateMachine::HoldTimerExpired() {
1273 13 : boost::system::error_code error;
1274 :
1275 : // Reset hold timer if there is data already present in the socket.
1276 26 : if (session() && session()->socket() &&
1277 13 : session()->socket()->available(error) > 0) {
1278 0 : return true;
1279 : }
1280 13 : XMPP_NOTICE(XmppEventLog, connection()->ToUVEKey(), XMPP_PEER_DIR_NA,
1281 : this->ChannelType(), "Event: HoldTimer Expired ",
1282 : connection()->endpoint().address().to_string(),
1283 : connection()->GetTo());
1284 13 : Enqueue(xmsm::EvHoldTimerExpired());
1285 13 : return false;
1286 : }
1287 :
1288 20451 : void XmppStateMachine::OnSessionEvent(
1289 : TcpSession *session, TcpSession::Event event) {
1290 20451 : switch (event) {
1291 6541 : case TcpSession::ACCEPT:
1292 6541 : break;
1293 6580 : case TcpSession::CONNECT_COMPLETE:
1294 6580 : XMPP_NOTICE(XmppEventLog, session->ToUVEKey(), XMPP_PEER_DIR_IN,
1295 : this->ChannelType(), "Event: Tcp Connected ",
1296 : connection()->endpoint().address().to_string(),
1297 : connection()->GetTo());
1298 6580 : Enqueue(xmsm::EvTcpConnected(static_cast<XmppSession *>(session)));
1299 6580 : break;
1300 900 : case TcpSession::CONNECT_FAILED:
1301 900 : XMPP_NOTICE(XmppEventLog, session->ToUVEKey(), XMPP_PEER_DIR_IN,
1302 : this->ChannelType(), "Event: Tcp Connect Fail ",
1303 : connection()->endpoint().address().to_string(),
1304 : connection()->GetTo());
1305 900 : Enqueue(xmsm::EvTcpConnectFail(static_cast<XmppSession *>(session)));
1306 900 : connection_->inc_connect_error();
1307 900 : break;
1308 6430 : case TcpSession::CLOSE:
1309 6430 : XMPP_NOTICE(XmppEventLog, session->ToUVEKey(), XMPP_PEER_DIR_IN,
1310 : this->ChannelType(), "Event: Tcp Connection Closed ",
1311 : connection()->endpoint().address().to_string(),
1312 : connection()->GetTo());
1313 6430 : Enqueue(xmsm::EvTcpClose(static_cast<XmppSession *>(session)));
1314 6430 : connection_->inc_session_close();
1315 6430 : break;
1316 0 : default:
1317 0 : XMPP_WARNING(XmppUnknownEvent, session->ToUVEKey(), XMPP_PEER_DIR_IN,
1318 : this->ChannelType(), event);
1319 0 : break;
1320 : }
1321 20451 : }
1322 :
1323 6518 : bool XmppStateMachine::PassiveOpen(XmppSession *session) {
1324 6518 : string state = "PassiveOpen in state: " + StateName();
1325 6518 : XMPP_NOTICE(XmppEventLog, session->ToUVEKey(), XMPP_PEER_DIR_IN,
1326 : this->ChannelType(), state,
1327 : session->Connection()->endpoint().address().to_string(), "");
1328 13036 : return Enqueue(xmsm::EvTcpPassiveOpen(session));
1329 6518 : }
1330 :
1331 : // Process XmppStream header message received over a session. Close the stream
1332 : // if an old session is still present and undergoing graceful restart.
1333 : //
1334 : // Return true if msg is enqueued for further processing, false otherwise.
1335 15765 : void XmppStateMachine::ProcessStreamHeaderMessage(XmppSession *session,
1336 : const XmppStanza::XmppMessage *msg) {
1337 : XmppConnectionManager *connection_manager =
1338 15765 : dynamic_cast<XmppConnectionManager *>(connection_->server());
1339 15765 : std::scoped_lock lock(connection_manager->mutex());
1340 :
1341 : // Update "To" information which can be used to map an older session
1342 15765 : session->Connection()->SetTo(msg->from);
1343 :
1344 15764 : XmppServer *xmpp_server = dynamic_cast<XmppServer *>(server_);
1345 15764 : XmppConnectionEndpoint *endpoint = NULL;
1346 :
1347 : // Look for an endpoint which may already exist
1348 15764 : if (xmpp_server) {
1349 : endpoint =
1350 7994 : xmpp_server->FindConnectionEndpoint(connection_->ToString());
1351 :
1352 15998 : if (!xmpp_server->subcluster_name().empty() &&
1353 8003 : xmpp_server->subcluster_name() != msg->xmlns) {
1354 2 : string reason = "Subcluster mismatch: Agent subcluster " +
1355 4 : msg->xmlns + ", Control subcluster " +
1356 4 : xmpp_server->subcluster_name();
1357 2 : XMPP_WARNING(XmppDeleteConnection, session->ToUVEKey(),
1358 : XMPP_PEER_DIR_IN,
1359 : "Drop new xmpp connection " + session->ToString() +
1360 : " as " + reason);
1361 2 : session->Connection()->set_close_reason(reason);
1362 2 : ProcessEvent(xmsm::EvTcpClose(session));
1363 2 : delete msg;
1364 2 : return;
1365 2 : }
1366 : }
1367 :
1368 : // If there is no connection already associated with the end-point,
1369 : // process the incoming open message and move forward with the session
1370 : // establishment.
1371 16092 : if (!endpoint || !endpoint->connection() ||
1372 330 : connection_ == endpoint->connection()) {
1373 15434 : ProcessEvent(xmsm::EvXmppOpen(session, msg));
1374 15434 : return;
1375 : }
1376 :
1377 : // Check if the IP addresses match.
1378 : boost::asio::ip::address addr =
1379 328 : endpoint->connection()->endpoint().address();
1380 329 : if (connection_->endpoint().address() != addr) {
1381 36 : XMPP_WARNING(XmppDeleteConnection, session->ToUVEKey(),
1382 : XMPP_PEER_DIR_IN,
1383 : "Drop new xmpp connection " + session->ToString() +
1384 : " as another connection with same name " + msg->from +
1385 : " but with different IP address " + addr.to_string() +
1386 : " already exists");
1387 36 : ProcessEvent(xmsm::EvTcpClose(session));
1388 36 : delete msg;
1389 36 : return;
1390 : }
1391 :
1392 293 : XmppChannelMux *channel = endpoint->connection()->ChannelMux();
1393 :
1394 : // If GR Helper mode is not enabled, ignore the new connection request.
1395 : // Existing connection, if down will get cleaned up eventually. If it is
1396 : // up, it shall remain intact.
1397 293 : if (!xmpp_server->IsPeerCloseGraceful()) {
1398 126 : XMPP_NOTICE(XmppDeleteConnection, session->ToUVEKey(),
1399 : XMPP_PEER_DIR_IN,
1400 : "Drop new xmpp connection " + session->ToString() +
1401 : " as another connection is alreready present");
1402 126 : ProcessEvent(xmsm::EvTcpClose(session));
1403 126 : delete msg;
1404 126 : return;
1405 : }
1406 :
1407 167 : XmppStateMachine *old_sm = endpoint->connection()->state_machine();
1408 :
1409 : // Bring down old session if connection is already up and ready. This is
1410 : // the scenario in which old session's TCP did not learn the session down
1411 : // event, possibly due to compute cold reboot. In that case, trigger
1412 : // closure (and possibly GR) process for the old session.
1413 167 : if (channel->GetPeerState() == xmps::READY) {
1414 0 : XMPP_NOTICE(XmppDeleteConnection, old_sm->session()->ToUVEKey(),
1415 : XMPP_PEER_DIR_IN, "Delete old xmpp connection " +
1416 : old_sm->session()->ToString() +
1417 : " as a new connection as been initiated (GR Helper is active)");
1418 0 : old_sm->Enqueue(xmsm::EvTcpClose(old_sm->session()));
1419 :
1420 : // Drop the new session until old one is deleted or marked stale.
1421 0 : ProcessEvent(xmsm::EvTcpClose(session));
1422 0 : delete msg;
1423 0 : return;
1424 : }
1425 :
1426 : // If previous closure is still in progress, drop the new connection.
1427 167 : if (channel->ReceiverCount()) {
1428 11 : XMPP_NOTICE(XmppDeleteConnection, session->ToUVEKey(),
1429 : XMPP_PEER_DIR_IN,
1430 : "Drop new xmpp connection " + session->ToString() +
1431 : " as current connection is still under deletion");
1432 11 : ProcessEvent(xmsm::EvTcpClose(session));
1433 11 : delete msg;
1434 11 : return;
1435 : }
1436 :
1437 : // Now we reach for the classic GR case, in which existing session stale
1438 : // process is complete, (peer is in GR/LLGR timer wait state).
1439 : // In this case, we should process the open message, and later switch to
1440 : // the new connection and state machine, retaining the old XmppPeer,
1441 : // BgpXmppChannel, routes, etc.
1442 156 : ProcessEvent(xmsm::EvXmppOpen(session, msg));
1443 15765 : }
1444 :
1445 4123313 : void XmppStateMachine::OnMessage(XmppSession *session,
1446 : const XmppStanza::XmppMessage *msg) {
1447 4123313 : if (!Enqueue(xmsm::EvXmppMessage(session, msg)))
1448 0 : delete msg;
1449 4123312 : }
1450 :
1451 4123311 : void XmppStateMachine::ProcessMessage(XmppSession *session,
1452 : const XmppStanza::XmppMessage *msg) {
1453 : // Bail if session is already reset and disassociated from the connection.
1454 4123311 : if (!session->Connection()) {
1455 303 : delete msg;
1456 309 : return;
1457 : }
1458 :
1459 4123008 : const XmppStanza::XmppStreamMessage *stream_msg =
1460 : static_cast<const XmppStanza::XmppStreamMessage *>(msg);
1461 :
1462 4123008 : switch (msg->type) {
1463 20165 : case XmppStanza::STREAM_HEADER:
1464 20165 : if (stream_msg->strmtype ==
1465 : XmppStanza::XmppStreamMessage::FEATURE_TLS) {
1466 :
1467 4400 : switch (stream_msg->strmtlstype) {
1468 1469 : case (XmppStanza::XmppStreamMessage::TLS_FEATURE_REQUEST):
1469 1469 : ProcessEvent(xmsm::EvStreamFeatureRequest(session,
1470 : msg));
1471 1469 : break;
1472 1464 : case (XmppStanza::XmppStreamMessage::TLS_START):
1473 1464 : ProcessEvent(xmsm::EvStartTls(session, msg));
1474 1464 : break;
1475 1467 : case (XmppStanza::XmppStreamMessage::TLS_PROCEED):
1476 1467 : ProcessEvent(xmsm::EvTlsProceed(session, msg));
1477 1467 : break;
1478 0 : default:
1479 0 : break;
1480 : }
1481 :
1482 15765 : } else if (stream_msg->strmtype ==
1483 0 : XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER ||
1484 0 : stream_msg->strmtype ==
1485 : XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER_RESP)
1486 15765 : ProcessStreamHeaderMessage(session, msg);
1487 20164 : break;
1488 2485894 : case XmppStanza::WHITESPACE_MESSAGE_STANZA:
1489 2485894 : ProcessEvent(xmsm::EvXmppKeepalive(session, msg));
1490 2485894 : break;
1491 56891 : case XmppStanza::IQ_STANZA:
1492 56891 : ProcessEvent(xmsm::EvXmppIqStanza(session, msg));
1493 56891 : break;
1494 1560058 : case XmppStanza::MESSAGE_STANZA:
1495 1560058 : ProcessEvent(xmsm::EvXmppMessageStanza(session, msg));
1496 1560058 : break;
1497 0 : default:
1498 0 : if (!msg->IsValidType(msg->type)) {
1499 0 : XMPP_NOTICE(XmppStateMachineUnsupportedMessage,
1500 : XMPP_PEER_DIR_IN, session->ToUVEKey(),
1501 : ChannelType(), (int)msg->type);
1502 : }
1503 0 : delete msg;
1504 0 : break;
1505 : }
1506 :
1507 : }
1508 :
1509 9199 : void XmppStateMachine::Clear() {
1510 9199 : Enqueue(xmsm::EvStop());
1511 9199 : }
1512 :
1513 3768 : void XmppStateMachine::SetAdminState(bool down) {
1514 3768 : assert(IsActiveChannel());
1515 3768 : if (down) {
1516 3508 : Enqueue(xmsm::EvAdminDown());
1517 : } else {
1518 260 : Enqueue(xmsm::EvStart());
1519 : }
1520 3768 : }
1521 :
1522 2926 : void XmppStateMachine::OnEvent(SslSession *session,
1523 : xmsm::SslHandShakeResponse resp) {
1524 2926 : XmppSession *sess = static_cast<XmppSession *>(session);
1525 2926 : switch(resp) {
1526 2 : case xmsm::EvTLSHANDSHAKE_FAILURE:
1527 2 : Enqueue(xmsm::EvTlsHandShakeFailure(sess));
1528 2 : break;
1529 2924 : case xmsm::EvTLSHANDSHAKE_SUCCESS:
1530 2924 : Enqueue(xmsm::EvTlsHandShakeSuccess(sess));
1531 2924 : break;
1532 0 : default:
1533 0 : break;
1534 : }
1535 2926 : }
1536 :
1537 67138 : void XmppStateMachine::set_state(xmsm::XmState state) {
1538 67138 : last_state_ = state_;
1539 67138 : state_ = state;
1540 67138 : state_since_ = UTCTimestampUsec();
1541 :
1542 67142 : if (!logUVE()) return;
1543 :
1544 0 : XmppPeerInfoData peer_info;
1545 0 : peer_info.set_name(connection()->ToUVEKey());
1546 0 : PeerStateInfo state_info;
1547 0 : state_info.set_state(StateName());
1548 0 : state_info.set_last_state(LastStateName());
1549 0 : state_info.set_last_state_at(state_since_);
1550 0 : peer_info.set_state_info(state_info);
1551 0 : assert(!peer_info.get_name().empty());
1552 0 : XMPPPeerInfo::Send(peer_info);
1553 0 : }
1554 :
1555 :
1556 7377 : void XmppStateMachine::set_openconfirm_state(xmsm::XmOpenConfirmState state) {
1557 7377 : openconfirm_state_ = state;
1558 7377 : }
1559 :
1560 : static const char *state_names[] = {
1561 : "Idle",
1562 : "Active",
1563 : "Connect",
1564 : "OpenSent",
1565 : "OpenConfirm",
1566 : "Established" };
1567 :
1568 4086440 : string XmppStateMachine::StateName() const {
1569 4086440 : return state_names[state_];
1570 : }
1571 :
1572 559 : string XmppStateMachine::LastStateName() const {
1573 559 : return state_names[last_state_];
1574 : }
1575 :
1576 559 : string XmppStateMachine::LastStateChangeAt() const {
1577 1118 : return integerToString(UTCUsecToPTime(state_since_));
1578 : }
1579 :
1580 7900041 : xmsm::XmState XmppStateMachine::StateType() const {
1581 7900041 : return state_;
1582 : }
1583 :
1584 4365640 : xmsm::XmOpenConfirmState XmppStateMachine::OpenConfirmStateType() const {
1585 4365640 : return openconfirm_state_;
1586 : }
1587 :
1588 12670 : void XmppStateMachine::AssignSession() {
1589 12670 : connection_->set_session(static_cast<XmppSession *>(session_));
1590 12670 : }
1591 :
1592 : const int XmppStateMachine::kConnectInterval;
1593 :
1594 16401 : int XmppStateMachine::GetConnectTime() const {
1595 16401 : int backoff = attempts_ > 6 ? 6 : attempts_;
1596 16401 : return std::min(backoff ? 1 << (backoff - 1) : 0, kConnectInterval);
1597 : }
1598 :
1599 4082430 : bool XmppStateMachine::IsActiveChannel() {
1600 4082430 : return is_active_;
1601 : }
1602 :
1603 4245364 : bool XmppStateMachine::logUVE() {
1604 4245364 : return connection()->logUVE();
1605 : }
1606 :
1607 4009886 : const char *XmppStateMachine::ChannelType() {
1608 4009886 : return (IsActiveChannel() ? " Mode Client: " : " Mode Server: " );
1609 : }
1610 :
1611 4190545 : bool XmppStateMachine::DequeueEvent(
1612 : boost::intrusive_ptr<const sc::event_base> event) {
1613 : // Process message event and enqueue additional events as necessary.
1614 : const xmsm::EvXmppMessage *ev_xmpp_message =
1615 4190545 : dynamic_cast<const xmsm::EvXmppMessage *>(event.get());
1616 4190542 : if (ev_xmpp_message) {
1617 4123312 : ProcessMessage(ev_xmpp_message->session, ev_xmpp_message->msg);
1618 : } else {
1619 67230 : ProcessEvent(*event);
1620 67229 : event.reset();
1621 : }
1622 4190549 : return true;
1623 : }
1624 :
1625 4190202 : void XmppStateMachine::ProcessEvent(const sc::event_base &event) {
1626 4190202 : const xmsm::EvTcpDeleteSession *deferred_delete =
1627 4190202 : dynamic_cast<const xmsm::EvTcpDeleteSession *>(&event);
1628 4190202 : XMPP_UTDEBUG(XmppStateMachineDequeueEvent,
1629 : connection() ? connection()->ToUVEKey() : "",
1630 : XMPP_PEER_DIR_IN, ChannelType(), TYPE_NAME(event), StateName(),
1631 : connection() ?
1632 : connection()->endpoint().address().to_string() : "",
1633 : connection() ? connection()->GetTo() : "");
1634 4190241 : if (deferred_delete) {
1635 12009 : TcpSession *session = deferred_delete->session;
1636 12009 : session->server()->DeleteSession(session);
1637 12009 : return;
1638 : }
1639 4178232 : if (deleted_) {
1640 0 : return;
1641 : }
1642 :
1643 4178232 : update_last_event(TYPE_NAME(event));
1644 4178222 : in_dequeue_ = true;
1645 4178222 : process_event(event);
1646 4178214 : in_dequeue_ = false;
1647 : }
1648 :
1649 10569 : void XmppStateMachine::unconsumed_event(const sc::event_base &event) {
1650 10569 : XMPP_INFO(XmppUnconsumedEvent, connection() ? connection()->ToUVEKey() : "",
1651 : XMPP_PEER_DIR_IN, ChannelType(), TYPE_NAME(event), StateName());
1652 10570 : }
1653 :
1654 4178223 : void XmppStateMachine::update_last_event(const std::string &event) {
1655 4178223 : set_last_event(event);
1656 :
1657 4178228 : if (!logUVE()) return;
1658 :
1659 : // Skip iq and message events.
1660 0 : if (event == "xmsm::EvXmppIqStanza" ||
1661 0 : event == "xmsm::EvXmppMessageStanza") {
1662 0 : return;
1663 : }
1664 :
1665 : // Skip keepalive events after we've reached established state.
1666 0 : if (state_ == xmsm::ESTABLISHED && event == "xmsm::EvXmppKeepalive") {
1667 0 : return;
1668 : }
1669 :
1670 0 : XmppPeerInfoData peer_info;
1671 0 : peer_info.set_name(connection()->ToUVEKey());
1672 0 : PeerEventInfo event_info;
1673 0 : event_info.set_last_event(last_event_);
1674 0 : event_info.set_last_event_at(last_event_at_);
1675 0 : peer_info.set_event_info(event_info);
1676 :
1677 0 : assert(!peer_info.get_name().empty());
1678 0 : XMPPPeerInfo::Send(peer_info);
1679 0 : }
1680 :
1681 : //
1682 : // Enqueue an event to xmpp state machine.
1683 : // Return false if the event is not enqueued.
1684 : //
1685 4190957 : bool XmppStateMachine::Enqueue(const sc::event_base &event) {
1686 4190957 : if (!deleted_) {
1687 4190957 : work_queue_.Enqueue(event.intrusive_from_this());
1688 4190974 : return true;
1689 : }
1690 :
1691 0 : return false;
1692 : }
1693 :
1694 : // Object trace routines.
1695 44118 : void XmppStateMachine::SendConnectionInfo(const string &event,
1696 : const string &nextstate) {
1697 44118 : XmppConnectionInfo info;
1698 44112 : SendConnectionInfo(&info, event, nextstate);
1699 88234 : return;
1700 44117 : }
1701 :
1702 86577 : void XmppStateMachine::SendConnectionInfo(XmppConnectionInfo *info,
1703 : const string &event, const string &nextstate) {
1704 :
1705 86577 : info->set_ip_address(this->connection()->endpoint().address().to_string());
1706 86591 : info->set_state(StateName());
1707 86583 : info->set_event(event);
1708 86582 : if (!nextstate.empty()) {
1709 69463 : info->set_next_state(nextstate);
1710 : }
1711 86581 : XMPP_CONNECTION_LOG_MSG(*info);
1712 86582 : return;
1713 : }
1714 :
1715 : // Resurrect an old xmpp connection if present (when under GracefulRestart)
1716 : //
1717 : // During Graceful Restart (or otherwise), new connections are rejected in
1718 : // ProcessStreamHeaderMessage() itself until old one's cleanup process is
1719 : // complete and the system is ready to start a new session.
1720 : //
1721 : // Hence in here, when called upon receipt of OpenMessage, we can try to reuse
1722 : // old XmppConnection if present and there by complete any pending GR process
1723 : //
1724 : // We do so by reusing XmppConnection, XmppChannel, etc. from the old connection
1725 : // and only use the XmppSession and XmppStateMachine from the new session
1726 : //
1727 : // New connection is instead associated with the old state machine and session,
1728 : // and their deletion is triggered
1729 6306 : void XmppStateMachine::ResurrectOldConnection(XmppConnection *new_connection,
1730 : XmppSession *new_session) {
1731 :
1732 : // Look for an endpoint (which is a persistent data structure) across
1733 : // xmpp session flips
1734 : bool created;
1735 : XmppConnectionEndpoint *connection_endpoint =
1736 : static_cast<XmppServer *>(
1737 6306 : new_connection->server())->LocateConnectionEndpoint(
1738 : static_cast<XmppServerConnection *>(new_connection), created);
1739 :
1740 : // If this is a new endpoint, then there is no older connection to manage.
1741 6307 : if (created)
1742 6203 : return;
1743 :
1744 : // GR Helper must be enabled when we are trying to resurrect connection.
1745 104 : XmppServer *server = dynamic_cast<XmppServer *>(new_connection->server());
1746 104 : assert(server->IsPeerCloseGraceful());
1747 :
1748 104 : XMPP_NOTICE(XmppCreateConnection, new_session->ToUVEKey(),
1749 : XMPP_PEER_DIR_IN,
1750 : "Resurrect xmpp connection " + new_session->ToString());
1751 :
1752 : // Retrieve old XmppConnection and XmppStateMachine (to reuse)
1753 104 : XmppConnection *old_xmpp_connection = connection_endpoint->connection();
1754 104 : assert(old_xmpp_connection);
1755 :
1756 104 : XmppStateMachine *old_state_machine = old_xmpp_connection->state_machine();
1757 104 : assert(old_state_machine);
1758 :
1759 : // Swap Old and New connections and state machines linkages
1760 104 : new_connection->SwapXmppStateMachine(old_xmpp_connection);
1761 104 : this->SwapXmppConnection(old_state_machine);
1762 :
1763 : // Update XmppConnection in the old session.
1764 104 : XmppSession *old_xmpp_session = old_state_machine->session();
1765 104 : if (old_xmpp_session)
1766 0 : old_xmpp_session->SetConnection(new_connection);
1767 104 : new_session->SetConnection(old_xmpp_connection);
1768 :
1769 : // Set new session with the old connection as it would be the current active
1770 : // connection from now on.
1771 104 : old_xmpp_connection->set_session(new_session);
1772 :
1773 : // Trigger deletion of the new connection which now is associated wth the
1774 : // the old_state_machine
1775 104 : new_connection->Shutdown();
1776 : }
1777 :
1778 104 : void XmppStateMachine::SwapXmppConnection(XmppStateMachine *other) {
1779 104 : swap(connection_, other->connection_);
1780 104 : connection_->SwapContents(other->connection_);
1781 104 : }
|