Line data Source code
1 : /*
2 : * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3 : */
4 :
5 : #ifndef _GNU_SOURCE
6 : #define _GNU_SOURCE
7 : #endif
8 : #include <sched.h>
9 :
10 : #include <unistd.h>
11 : #include <stdlib.h>
12 : #include <sys/eventfd.h>
13 :
14 : #include <algorithm>
15 : #include <vector>
16 : #include <set>
17 : #include <boost/algorithm/string/case_conv.hpp>
18 : #include <boost/thread.hpp>
19 :
20 : #include <tbb/concurrent_queue.h>
21 :
22 : #include "ksync_object.h"
23 : #include "ksync_sock.h"
24 :
25 : static bool ksync_tx_queue_task_done_ = false;
26 :
27 : // Set CPU affinity for KSync Tx Thread based on cpu_pin_policy.
28 : // By default CPU affinity is not set. cpu_pin_policy can change it,
29 : // "last" : Last CPU-ID
30 : // "<num>" : Specifies CPU-ID to pin
31 0 : static void set_thread_affinity(std::string cpu_pin_policy) {
32 0 : int num_cores = boost::thread::hardware_concurrency();
33 0 : if (!num_cores) {
34 0 : LOG(ERROR, "Failure in checking number of available threads");
35 0 : num_cores = 1;
36 : }
37 0 : char *p = NULL;
38 0 : int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
39 0 : if (*p || cpu_pin_policy.empty()) {
40 : // cpu_pin_policy is non-integer
41 : // Assume pinning disabled by default
42 0 : cpu_id = -1;
43 : // If policy is "last", pick last CPU-ID
44 0 : boost::algorithm::to_lower(cpu_pin_policy);
45 0 : if (cpu_pin_policy == "last") {
46 0 : cpu_id = num_cores - 1;
47 : }
48 : } else {
49 : // cpu_pin_policy is integer
50 : // Disable pinning if configured value out of range
51 0 : if (cpu_id >= num_cores)
52 0 : cpu_id = -1;
53 : }
54 :
55 0 : if (cpu_id >= 0) {
56 : cpu_set_t cpuset;
57 0 : CPU_ZERO(&cpuset);
58 0 : CPU_SET(cpu_id, &cpuset);
59 0 : LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
60 : << ">. KsyncTxQueue pinned to CPU " << cpu_id);
61 0 : sched_setaffinity(0, sizeof(cpuset), &cpuset);
62 : } else {
63 0 : LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
64 : << ">. KsyncTxQueuen not pinned to CPU");
65 : }
66 0 : }
67 :
68 : class KSyncTxQueueTask : public Task {
69 : public:
70 0 : KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue) :
71 0 : Task(scheduler->GetTaskId("Ksync::KSyncTxQueue"), 0), queue_(queue) {
72 0 : }
73 0 : ~KSyncTxQueueTask() {
74 0 : ksync_tx_queue_task_done_ = true;
75 0 : }
76 :
77 0 : bool Run() {
78 0 : queue_->Run();
79 0 : return true;
80 : }
81 0 : std::string Description() const { return "KSyncTxQueue"; }
82 :
83 : private:
84 : KSyncTxQueue *queue_;
85 : };
86 :
87 0 : KSyncTxQueue::KSyncTxQueue(KSyncSock *sock) :
88 0 : work_queue_(NULL),
89 0 : event_fd_(-1),
90 0 : cpu_pin_policy_(),
91 0 : sock_(sock),
92 0 : enqueues_(0),
93 0 : dequeues_(0),
94 0 : write_events_(0),
95 0 : read_events_(0),
96 0 : busy_time_(0),
97 0 : measure_busy_time_(false) {
98 0 : queue_len_ = 0;
99 0 : shutdown_ = false;
100 0 : ClearStats();
101 0 : }
102 :
103 0 : KSyncTxQueue::~KSyncTxQueue() {
104 0 : }
105 :
106 0 : void KSyncTxQueue::Init(bool use_work_queue,
107 : const std::string &cpu_pin_policy) {
108 0 : cpu_pin_policy_ = cpu_pin_policy;
109 0 : TaskScheduler *scheduler = TaskScheduler::GetInstance();
110 0 : if (use_work_queue) {
111 0 : assert(work_queue_ == NULL);
112 0 : work_queue_ = new WorkQueue<IoContext *>
113 0 : (scheduler->GetTaskId("Ksync::AsyncSend"), 0,
114 0 : boost::bind(&KSyncSock::SendAsyncImpl, sock_, _1));
115 0 : work_queue_->SetExitCallback
116 0 : (boost::bind(&KSyncSock::OnEmptyQueue, sock_, _1));
117 0 : return;
118 : }
119 0 : assert((event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
120 :
121 0 : KSyncTxQueueTask *task = new KSyncTxQueueTask(scheduler, this);
122 0 : scheduler->Enqueue(task);
123 : }
124 :
125 0 : void KSyncTxQueue::Shutdown() {
126 0 : shutdown_ = true;
127 0 : if (work_queue_) {
128 0 : assert(work_queue_->Length() == 0);
129 0 : work_queue_->Shutdown();
130 0 : delete work_queue_;
131 0 : work_queue_ = NULL;
132 0 : return;
133 : }
134 :
135 0 : uint64_t u = 1;
136 0 : assert(write(event_fd_, &u, sizeof(u)) == sizeof(u));
137 0 : while (queue_len_ != 0) {
138 0 : usleep(1);
139 : }
140 :
141 0 : while(ksync_tx_queue_task_done_ != true) {
142 0 : usleep(1);
143 : }
144 0 : close(event_fd_);
145 : }
146 :
147 0 : bool KSyncTxQueue::EnqueueInternal(IoContext *io_context) {
148 0 : if (work_queue_) {
149 0 : work_queue_->Enqueue(io_context);
150 0 : return true;
151 : }
152 0 : queue_.push(io_context);
153 0 : enqueues_++;
154 0 : size_t ncount = queue_len_.fetch_add(1) + 1;
155 0 : if (ncount > max_queue_len_)
156 0 : max_queue_len_ = ncount;
157 0 : if (ncount == 1) {
158 0 : uint64_t u = 1;
159 0 : int res = 0;
160 0 : while ((res = write(event_fd_, &u, sizeof(u))) < (int)sizeof(u)) {
161 0 : int ec = errno;
162 0 : if (ec != EINTR && ec != EIO) {
163 0 : LOG(ERROR, "KsyncTxQueue write failure : " << ec << " : "
164 : << strerror(ec));
165 0 : assert(0);
166 : }
167 : }
168 :
169 0 : write_events_++;
170 : }
171 0 : return true;
172 : }
173 :
174 0 : bool KSyncTxQueue::Run() {
175 0 : set_thread_affinity(cpu_pin_policy_);
176 : while (1) {
177 : while (1) {
178 0 : uint64_t u = 0;
179 0 : ssize_t num = read(event_fd_, &u, sizeof(u));
180 0 : if (num >= (int)sizeof(u)) {
181 0 : break;
182 : }
183 0 : if (errno != EINTR && errno != EIO) {
184 0 : LOG(ERROR, "KsyncTxQueue read failure : " << errno << " : "
185 : << strerror(errno));
186 0 : assert(0);
187 : }
188 0 : }
189 0 : read_events_++;
190 :
191 0 : uint64_t t1 = 0;
192 0 : if (measure_busy_time_)
193 0 : t1 = ClockMonotonicUsec();
194 0 : IoContext *io_context = NULL;
195 0 : while (queue_.try_pop(io_context)) {
196 0 : dequeues_++;
197 0 : queue_len_ -= 1;
198 0 : sock_->SendAsyncImpl(io_context);
199 : }
200 0 : sock_->OnEmptyQueue(false);
201 0 : if (shutdown_) {
202 0 : break;
203 : }
204 :
205 0 : if (t1)
206 0 : busy_time_ += (ClockMonotonicUsec() - t1);
207 0 : }
208 0 : return true;
209 : }
|