/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Loop.hxx"
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
#include "DeferEvent.hxx"
#include "util/ScopeExit.hxx"

#include <limits>

/**
 * Construct the loop: initialise the SocketMonitor base with this
 * loop itself, remember the given thread id, and open the
 * SocketMonitor base on the descriptor obtained from #wake_fd.
 *
 * @param _thread the id stored in #thread; if it is null, Run()
 * replaces it with the id of the thread that calls Run()
 */
EventLoop::EventLoop(ThreadId _thread)
	:SocketMonitor(*this),
	 thread(_thread)
{
	const SocketDescriptor wake_socket(wake_fd.Get());
	SocketMonitor::Open(wake_socket);
}

/**
 * Destroy the loop.  Nothing is unregistered here; the destructor
 * only asserts that no timer and no idle monitor is still linked
 * into this loop.
 */
EventLoop::~EventLoop()
{
	/* both checks are independent and free of side effects */
	assert(timers.empty());
	assert(idle.empty());
}

void
EventLoop::Break()
{
42 43 44
	if (quit)
		return;

45 46
	quit = true;
	wake_fd.Write();
47 48
}

bool
EventLoop::Abandon(int _fd, SocketMonitor &m)
51
{
52
	assert(IsInside());
53

54 55
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
56
}
bool
EventLoop::RemoveFD(int _fd, SocketMonitor &m)
{
61
	assert(IsInside());
62

63 64
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
65 66 67 68 69
}

/**
 * Append an IdleMonitor to the #idle list; Run() invokes and
 * removes the entries of that list once per iteration.  Must be
 * called from inside the loop (asserted via IsInside()).
 *
 * @param i the monitor to append
 */
void
EventLoop::AddIdle(IdleMonitor &i)
{
	assert(IsInside());

	idle.push_back(i);

	/* make Run() start another iteration instead of waiting for
	   events */
	again = true;
}

/**
 * Take an IdleMonitor out of the #idle list.  Must be called from
 * inside the loop (asserted via IsInside()).
 *
 * @param i the monitor to remove; presumably it has to be linked
 * into #idle at this point, since its position is looked up with
 * iterator_to() without any check
 */
void
EventLoop::RemoveIdle(IdleMonitor &i)
{
	assert(IsInside());

	const auto position = idle.iterator_to(i);
	idle.erase(position);
}

void
85
EventLoop::AddTimer(TimerEvent &t, std::chrono::steady_clock::duration d)
86
{
87
	assert(IsInside());
88

89
	t.due = now + d;
90
	timers.insert(t);
91
	again = true;
92 93 94
}

void
95
EventLoop::CancelTimer(TimerEvent &t)
96
{
97
	assert(IsInside());
98

99
	timers.erase(timers.iterator_to(t));
100 101
}

/**
 * Round a non-negative duration up to the next whole millisecond
 * and return the millisecond count.
 */
static constexpr std::chrono::milliseconds::rep
CeilTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	return std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count()
		+ (timeout % std::chrono::milliseconds(1) > timeout.zero() ? 1 : 0);
}

/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 *
 * Non-negative values are rounded up to the next whole millisecond:
 * truncating would turn a sub-millisecond wait into 0, i.e. a
 * non-blocking poll, and the caller would spin until the deadline.
 * Values which do not fit into an int are clamped to the largest
 * int instead of overflowing; the caller then merely wakes up early.
 *
 * @param timeout the time to wait; negative means "forever"
 * @return -1 for a negative timeout, otherwise the number of
 * milliseconds in the range [0, INT_MAX]
 */
static constexpr int
ExportTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	return timeout < timeout.zero()
		? -1
		: CeilTimeoutMS(timeout) > std::numeric_limits<int>::max()
		? std::numeric_limits<int>::max()
		: static_cast<int>(CeilTimeoutMS(timeout));
}

void
EventLoop::Run()
{
118 119
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
120

121
	assert(IsInside());
122
	assert(!quit);
123
	assert(busy);
124

125 126 127
	SocketMonitor::Schedule(SocketMonitor::READ);
	AtScopeExit(this) { SocketMonitor::Cancel(); };

128
	do {
129
		now = std::chrono::steady_clock::now();
130
		again = false;
131 132 133

		/* invoke timers */

134
		std::chrono::steady_clock::duration timeout;
135 136 137
		while (true) {
			auto i = timers.begin();
			if (i == timers.end()) {
138
				timeout = std::chrono::steady_clock::duration(-1);
139 140 141
				break;
			}

142 143
			TimerEvent &t = *i;
			timeout = t.due - now;
144
			if (timeout > timeout.zero())
145 146 147 148
				break;

			timers.erase(i);

149
			t.Run();
150 151 152 153 154 155 156 157

			if (quit)
				return;
		}

		/* invoke idle */

		while (!idle.empty()) {
158
			IdleMonitor &m = idle.front();
159 160 161 162 163 164 165
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

166
		/* try to handle DeferEvents without WakeFD
167
		   overhead */
168 169 170 171 172 173 174 175 176 177 178
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
179 180 181

		/* wait for new event */

182
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
183

184
		now = std::chrono::steady_clock::now();
185

186 187 188 189
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
190

191
		/* invoke sockets */
192 193 194
		for (int i = 0; i < poll_result.GetSize(); ++i) {
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
195 196
				if (quit)
					break;
197 198 199

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
200 201 202
			}
		}

203
		poll_result.Reset();
204

205
	} while (!quit);
206

207
#ifndef NDEBUG
208
	assert(busy);
209
	assert(thread.IsInside());
210
#endif
211 212
}

void
214
EventLoop::AddDeferred(DeferEvent &d) noexcept
215
{
216 217 218 219
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
220
		if (d.IsPending())
221
			return;
222

223
		/* we don't need to wake up the EventLoop if another
224
		   DeferEvent has already done it */
225
		must_wake = !busy && deferred.empty();
226

227
		deferred.push_back(d);
228 229
		again = true;
	}
230

231 232
	if (must_wake)
		wake_fd.Write();
233 234 235
}

void
236
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
237
{
238
	const std::lock_guard<Mutex> protect(mutex);
239

240
	if (d.IsPending())
241
		deferred.erase(deferred.iterator_to(d));
242 243
}

void
EventLoop::HandleDeferred()
246
{
247
	while (!deferred.empty() && !quit) {
248
		auto &m = deferred.front();
249
		assert(m.IsPending());
250 251 252

		deferred.pop_front();

253
		const ScopeUnlock unlock(mutex);
254 255
		m.RunDeferred();
	}
256
}
bool
259
EventLoop::OnSocketReady(gcc_unused unsigned flags) noexcept
260
{
261 262
	assert(IsInside());

263 264
	wake_fd.Read();

265
	const std::lock_guard<Mutex> lock(mutex);
266
	HandleDeferred();
267 268 269

	return true;
}