Loop.cxx 5.43 KB
Newer Older
1
/*
 * Copyright 2003-2018 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "Loop.hxx"
21 22
#include "SocketMonitor.hxx"
#include "IdleMonitor.hxx"
23
#include "DeferEvent.hxx"
24
#include "util/ScopeExit.hxx"
25

26
/**
 * Construct the loop and register its wake-up descriptor with the
 * loop's own SocketMonitor base.
 *
 * @param _thread the thread which will run this loop, or a "null"
 * ThreadId if that is not yet known
 */
EventLoop::EventLoop(ThreadId _thread)
	:SocketMonitor(*this),
	 /* An instance hosted by an EventThread is constructed
	    without a known ThreadId and does not count as alive
	    until that thread has started.  The main EventLoop
	    instance is assumed to be alive right away, since only
	    EventThread ever calls SetAlive(). */
	 alive(!_thread.IsNull()),
	 quit(false),
	 thread(_thread)
{
	/* monitor the wake-up file descriptor through our own
	   SocketMonitor base */
	SocketMonitor::Open(SocketDescriptor(wake_fd.Get()));
}

40
/**
 * Destroy the loop.  All idle monitors and timers must have been
 * removed by now; this is only verified in debug builds.
 */
EventLoop::~EventLoop() noexcept
{
	/* nothing may still be registered with this loop */
	assert(idle.empty());
	assert(timers.empty());
}

void
47
EventLoop::Break() noexcept
48
{
49
	if (quit.exchange(true))
50 51
		return;

52
	wake_fd.Write();
53 54
}

55
bool
56
EventLoop::Abandon(int _fd, SocketMonitor &m)  noexcept
57
{
58
	assert(IsInside());
59

60 61
	poll_result.Clear(&m);
	return poll_group.Abandon(_fd);
62
}
63

64
bool
65
EventLoop::RemoveFD(int _fd, SocketMonitor &m) noexcept
66
{
67
	assert(IsInside());
68

69 70
	poll_result.Clear(&m);
	return poll_group.Remove(_fd);
71 72 73
}

void
74
EventLoop::AddIdle(IdleMonitor &i) noexcept
75
{
76
	assert(IsInside());
77

78
	idle.push_back(i);
79
	again = true;
80 81 82
}

void
83
EventLoop::RemoveIdle(IdleMonitor &i) noexcept
84
{
85
	assert(IsInside());
86

87
	idle.erase(idle.iterator_to(i));
88 89 90
}

void
91
EventLoop::AddTimer(TimerEvent &t, std::chrono::steady_clock::duration d) noexcept
92
{
93
	assert(IsInside());
94

95
	t.due = now + d;
96
	timers.insert(t);
97
	again = true;
98 99 100
}

void
101
EventLoop::CancelTimer(TimerEvent &t) noexcept
102
{
103
	assert(IsInside());
104

105
	timers.erase(timers.iterator_to(t));
106 107
}

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
inline std::chrono::steady_clock::duration
EventLoop::HandleTimers() noexcept
{
	std::chrono::steady_clock::duration timeout;

	while (!quit) {
		auto i = timers.begin();
		if (i == timers.end())
			break;

		TimerEvent &t = *i;
		timeout = t.due - now;
		if (timeout > timeout.zero())
			return timeout;

		timers.erase(i);

		t.Run();
	}

	return std::chrono::steady_clock::duration(-1);
}

131 132 133 134 135 136 137 138 139 140 141 142 143
/**
 * Convert the given timeout specification to a milliseconds integer,
 * to be used by functions like poll() and epoll_wait().  Any negative
 * value (= never times out) is translated to the magic value -1.
 *
 * Non-negative values are rounded up to the next whole millisecond.
 * Truncating instead would turn a positive sub-millisecond timeout
 * into 0, which makes the caller poll without blocking over and over
 * again until the timer is finally due.
 *
 * @param timeout the timeout; negative means "no timeout"
 * @return the timeout in milliseconds (rounded up), or -1
 */
static constexpr int
ExportTimeoutMS(std::chrono::steady_clock::duration timeout)
{
	using std::chrono::duration_cast;
	using std::chrono::milliseconds;

	return timeout < timeout.zero()
		? -1
		: (int(duration_cast<milliseconds>(timeout).count())
		   /* duration_cast truncates; add one if a
		      fraction of a millisecond was cut off */
		   + (duration_cast<milliseconds>(timeout) < timeout ? 1 : 0));
}

144
void
145
EventLoop::Run() noexcept
146
{
147 148
	if (thread.IsNull())
		thread = ThreadId::GetCurrent();
149

150
	assert(IsInside());
151
	assert(!quit);
152
	assert(alive);
153
	assert(busy);
154

155
	SocketMonitor::Schedule(SocketMonitor::READ);
156 157 158
	AtScopeExit(this) {
		SocketMonitor::Cancel();
	};
159

160
	do {
161
		now = std::chrono::steady_clock::now();
162
		again = false;
163 164 165

		/* invoke timers */

166 167 168
		const auto timeout = HandleTimers();
		if (quit)
			break;
169 170 171 172

		/* invoke idle */

		while (!idle.empty()) {
173
			IdleMonitor &m = idle.front();
174 175 176 177 178 179 180
			idle.pop_front();
			m.Run();

			if (quit)
				return;
		}

181
		/* try to handle DeferEvents without WakeFD
182
		   overhead */
183 184 185 186 187 188 189 190 191 192 193
		{
			const std::lock_guard<Mutex> lock(mutex);
			HandleDeferred();
			busy = false;

			if (again)
				/* re-evaluate timers because one of
				   the IdleMonitors may have added a
				   new timeout */
				continue;
		}
194 195 196

		/* wait for new event */

197
		poll_group.ReadEvents(poll_result, ExportTimeoutMS(timeout));
198

199
		now = std::chrono::steady_clock::now();
200

201 202 203 204
		{
			const std::lock_guard<Mutex> lock(mutex);
			busy = true;
		}
205

206
		/* invoke sockets */
207
		for (size_t i = 0; i < poll_result.GetSize(); ++i) {
208 209
			auto events = poll_result.GetEvents(i);
			if (events != 0) {
210 211
				if (quit)
					break;
212 213 214

				auto m = (SocketMonitor *)poll_result.GetObject(i);
				m->Dispatch(events);
215 216 217
			}
		}

218
		poll_result.Reset();
219

220
	} while (!quit);
221

222
#ifndef NDEBUG
223
	assert(busy);
224
	assert(thread.IsInside());
225
#endif
226 227
}

228
void
229
EventLoop::AddDeferred(DeferEvent &d) noexcept
230
{
231 232 233 234
	bool must_wake;

	{
		const std::lock_guard<Mutex> lock(mutex);
235
		if (d.IsPending())
236
			return;
237

238
		/* we don't need to wake up the EventLoop if another
239
		   DeferEvent has already done it */
240
		must_wake = !busy && deferred.empty();
241

242
		deferred.push_back(d);
243 244
		again = true;
	}
245

246 247
	if (must_wake)
		wake_fd.Write();
248 249 250
}

void
251
EventLoop::RemoveDeferred(DeferEvent &d) noexcept
252
{
253
	const std::lock_guard<Mutex> protect(mutex);
254

255
	if (d.IsPending())
256
		deferred.erase(deferred.iterator_to(d));
257 258
}

259
void
260
EventLoop::HandleDeferred() noexcept
261
{
262
	while (!deferred.empty() && !quit) {
263
		auto &m = deferred.front();
264
		assert(m.IsPending());
265 266 267

		deferred.pop_front();

268
		const ScopeUnlock unlock(mutex);
269 270
		m.RunDeferred();
	}
271
}
272

273
bool
274
EventLoop::OnSocketReady(gcc_unused unsigned flags) noexcept
275
{
276 277
	assert(IsInside());

278 279
	wake_fd.Read();

280
	const std::lock_guard<Mutex> lock(mutex);
281
	HandleDeferred();
282 283 284

	return true;
}