Line data Source code
1 : /* 2 : * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved. 3 : */ 4 : 5 : #include <cmn/agent_cmn.h> 6 : #include <cmn/event_notifier.h> 7 : 8 : //Event registeration handle 9 0 : EventNotifyHandle::EventNotifyHandle(KeyPtr key, Callback cb) : 10 0 : key_(key), cb_(cb) { 11 0 : } 12 : 13 0 : EventNotifyHandle::~EventNotifyHandle() { 14 0 : } 15 : 16 : //Event notify manager 17 2 : EventNotifier::EventNotifier(Agent *agent) : map_(), 18 2 : work_queue_(agent->task_scheduler()->GetTaskId(kEventNotifierTask), 0, 19 : boost::bind(&EventNotifier::Process, this, _1)), 20 2 : agent_(agent) { 21 2 : work_queue_.set_name("Event Notify Manager"); 22 2 : } 23 : 24 4 : EventNotifier::~EventNotifier() { 25 2 : work_queue_.Shutdown(); 26 2 : assert(map_.empty() == true); 27 4 : } 28 : 29 0 : bool EventNotifier::Enqueue(WorkQueueMessage::Ptr data) { 30 0 : work_queue_.Enqueue(data); 31 0 : return true; 32 : } 33 : 34 0 : bool EventNotifier::Process(WorkQueueMessage::Ptr data) { 35 0 : if (data->type_ == WorkQueueMessage::PUBLISHER) { 36 0 : NotifyInternal(data->handle_ptr_->key()); 37 0 : } else if (data->type_ == WorkQueueMessage::REGISTER_SUBSCRIBER) { 38 0 : RegisterSubscriberInternal(data->handle_ptr_); 39 0 : } else if (data->type_ == WorkQueueMessage::DEREGISTER_SUBSCRIBER) { 40 0 : DeRegisterSubscriberInternal(data->handle_ptr_); 41 : } 42 0 : return true; 43 : } 44 : 45 0 : void EventNotifier::Notify(EventNotifyKey *key) { 46 0 : EventNotifyHandle::Ptr handle(new EventNotifyHandle(KeyPtr(key), Callback())); 47 : WorkQueueMessage::Ptr data(new WorkQueueMessage(WorkQueueMessage::PUBLISHER, 48 0 : handle)); 49 0 : work_queue_.Enqueue(data); 50 0 : } 51 : 52 0 : void EventNotifier::NotifyInternal(KeyPtr key) { 53 0 : EventNotifier::NotifyMapIter map_it = map_.find(key); 54 0 : if (map_it == map_.end()) { 55 : //No subscribers 56 0 : return; 57 : } 58 0 : EventNotifier::SubscribersList list = map_it->second; 59 0 : for (EventNotifier::SubscribersListIter it = list.begin(); 60 0 : it != list.end(); it++) { 61 0 : (*it)->Notify(); 62 : } 63 0 : } 64 : 65 : EventNotifyHandle::Ptr 66 0 : EventNotifier::RegisterSubscriber(EventNotifyKey *key, 67 : Callback cb) { 68 0 : EventNotifyHandle::Ptr handle(new EventNotifyHandle(KeyPtr(key), cb)); 69 : WorkQueueMessage::Ptr data(new WorkQueueMessage 70 0 : (WorkQueueMessage::REGISTER_SUBSCRIBER, handle)); 71 0 : work_queue_.Enqueue(data); 72 0 : return handle; 73 0 : } 74 : 75 : void 76 0 : EventNotifier::RegisterSubscriberInternal(EventNotifyHandle::Ptr handle) { 77 : //It is not ensured that same subscriber is not registered multiple times. 78 0 : map_[handle->key()].push_back(handle); 79 0 : } 80 : 81 0 : void EventNotifier::DeregisterSubscriber(EventNotifyHandle::Ptr handle) { 82 : WorkQueueMessage::Ptr data(new WorkQueueMessage 83 0 : (WorkQueueMessage::DEREGISTER_SUBSCRIBER, handle)); 84 0 : work_queue_.Enqueue(data); 85 0 : } 86 : 87 : void 88 0 : EventNotifier::DeRegisterSubscriberInternal(EventNotifyHandle::Ptr handle) { 89 0 : if (handle == NULL) 90 0 : return; 91 : 92 0 : KeyPtr key_ptr = static_cast<KeyPtr>(handle->key()); 93 0 : EventNotifier::NotifyMapIter map_it = map_.find(key_ptr); 94 0 : if (map_it == map_.end()) { 95 0 : return; 96 : } 97 : 98 : EventNotifier::SubscribersListIter it = 99 0 : std::find(map_it->second.begin(), map_it->second.end(), handle); 100 0 : if (it == map_it->second.end()) { 101 0 : return; 102 : } 103 0 : map_it->second.erase(it); 104 : 105 0 : if (map_it->second.empty()) { 106 0 : map_.erase(map_it); 107 : } 108 : 109 0 : return; 110 0 : } 111 : 112 0 : EventNotifier::WorkQueueMessage::WorkQueueMessage(Type type, 113 0 : EventNotifyHandle::Ptr ptr) : 114 0 : type_(type), handle_ptr_(ptr) { 115 0 : } 116 : 117 : //EventNotifyKey 118 0 : EventNotifyKey::EventNotifyKey(Type type) : type_(type) { 119 0 : ref_count_ = 0; 120 0 : } 121 : 122 0 : EventNotifyKey::~EventNotifyKey() { 123 0 : } 124 : 125 0 : bool EventNotifyKey::IsLess(const EventNotifyKey &rhs) const { 126 0 : if (type() != rhs.type()) { 127 0 : return (type() < rhs.type()); 128 : } 129 0 : return false; 130 : }