/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Connection.hxx"
#include "Lease.hxx"
#include "Callback.hxx"
#include "event/Loop.hxx"
#include "net/SocketDescriptor.hxx"
#include "util/RuntimeError.hxx"

extern "C" {
#include <nfsc/libnfs.h>
}

#include <cassert>
#include <chrono>
#include <exception>
#include <stdexcept>
#include <utility>

#include <poll.h> /* for POLLIN, POLLOUT */

/**
 * The delay with which mount_timeout_event is armed when an
 * asynchronous mount is started.
 */
static constexpr std::chrono::steady_clock::duration NFS_MOUNT_TIMEOUT{
	std::chrono::minutes(1)
};
38

39
inline void
40
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
41
					 const char *path)
42
{
43 44
	assert(connection.GetEventLoop().IsInside());

45
	int result = nfs_stat_async(ctx, path, Callback, this);
46 47 48
	if (result < 0)
		throw FormatRuntimeError("nfs_stat_async() failed: %s",
					 nfs_get_error(ctx));
49 50
}

51
inline void
52
NfsConnection::CancellableCallback::OpenDirectory(nfs_context *ctx,
53
						  const char *path)
54
{
55 56
	assert(connection.GetEventLoop().IsInside());

57
	int result = nfs_opendir_async(ctx, path, Callback, this);
58 59 60
	if (result < 0)
		throw FormatRuntimeError("nfs_opendir_async() failed: %s",
					 nfs_get_error(ctx));
61 62
}

63
inline void
64
NfsConnection::CancellableCallback::Open(nfs_context *ctx,
65
					 const char *path, int flags)
66
{
67 68
	assert(connection.GetEventLoop().IsInside());

69 70
	int result = nfs_open_async(ctx, path, flags,
				    Callback, this);
71 72 73
	if (result < 0)
		throw FormatRuntimeError("nfs_open_async() failed: %s",
					 nfs_get_error(ctx));
74 75
}

76
inline void
77
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
78
					 struct nfsfh *fh)
79
{
80 81
	assert(connection.GetEventLoop().IsInside());

82
	int result = nfs_fstat_async(ctx, fh, Callback, this);
83 84 85
	if (result < 0)
		throw FormatRuntimeError("nfs_fstat_async() failed: %s",
					 nfs_get_error(ctx));
86 87
}

88
inline void
89
NfsConnection::CancellableCallback::Read(nfs_context *ctx, struct nfsfh *fh,
90
					 uint64_t offset, size_t size)
91
{
92 93
	assert(connection.GetEventLoop().IsInside());

94
	int result = nfs_pread_async(ctx, fh, offset, size, Callback, this);
95 96 97
	if (result < 0)
		throw FormatRuntimeError("nfs_pread_async() failed: %s",
					 nfs_get_error(ctx));
98 99
}

100
inline void
101
NfsConnection::CancellableCallback::CancelAndScheduleClose(struct nfsfh *fh) noexcept
102
{
103
	assert(connection.GetEventLoop().IsInside());
104 105 106 107 108 109 110 111
	assert(!open);
	assert(close_fh == nullptr);
	assert(fh != nullptr);

	close_fh = fh;
	Cancel();
}

112
inline void
113
NfsConnection::CancellableCallback::PrepareDestroyContext() noexcept
114 115 116 117 118 119 120 121 122
{
	assert(IsCancelled());

	if (close_fh != nullptr) {
		connection.InternalClose(close_fh);
		close_fh = nullptr;
	}
}

123
inline void
124
NfsConnection::CancellableCallback::Callback(int err, void *data) noexcept
125
{
126 127
	assert(connection.GetEventLoop().IsInside());

128
	if (!IsCancelled()) {
129 130
		assert(close_fh == nullptr);

131 132 133 134 135 136 137
		NfsCallback &cb = Get();

		connection.callbacks.Remove(*this);

		if (err >= 0)
			cb.OnNfsCallback((unsigned)err, data);
		else
138
			cb.OnNfsError(std::make_exception_ptr(std::runtime_error((const char *)data)));
139
	} else {
140 141 142 143
		if (open) {
			/* a nfs_open_async() call was cancelled - to
			   avoid a memory leak, close the newly
			   allocated file handle immediately */
144 145
			assert(close_fh == nullptr);

146 147 148 149
			if (err >= 0) {
				struct nfsfh *fh = (struct nfsfh *)data;
				connection.Close(fh);
			}
150 151
		} else if (close_fh != nullptr)
			connection.DeferClose(close_fh);
152

153 154 155 156 157 158 159
		connection.callbacks.Remove(*this);
	}
}

void
NfsConnection::CancellableCallback::Callback(int err,
					     gcc_unused struct nfs_context *nfs,
160 161
					     void *data,
					     void *private_data) noexcept
162 163 164 165 166 167
{
	CancellableCallback &c = *(CancellableCallback *)private_data;
	c.Callback(err, data);
}

static constexpr unsigned
168
libnfs_to_events(int i) noexcept
169 170 171 172 173 174
{
	return ((i & POLLIN) ? SocketMonitor::READ : 0) |
		((i & POLLOUT) ? SocketMonitor::WRITE : 0);
}

static constexpr int
175
events_to_libnfs(unsigned i) noexcept
176 177 178 179 180
{
	return ((i & SocketMonitor::READ) ? POLLIN : 0) |
		((i & SocketMonitor::WRITE) ? POLLOUT : 0);
}

181
NfsConnection::~NfsConnection() noexcept
182
{
183
	assert(GetEventLoop().IsInside());
184 185 186
	assert(new_leases.empty());
	assert(active_leases.empty());
	assert(callbacks.IsEmpty());
187
	assert(deferred_close.empty());
188 189

	if (context != nullptr)
190
		DestroyContext();
191 192 193
}

void
194
NfsConnection::AddLease(NfsLease &lease) noexcept
195
{
196 197 198
	assert(GetEventLoop().IsInside());

	new_leases.push_back(&lease);
199

200
	defer_new_lease.Schedule();
201 202 203
}

void
204
NfsConnection::RemoveLease(NfsLease &lease) noexcept
205
{
206
	assert(GetEventLoop().IsInside());
207 208 209 210 211

	new_leases.remove(&lease);
	active_leases.remove(&lease);
}

212 213
void
NfsConnection::Stat(const char *path, NfsCallback &callback)
214
{
215
	assert(GetEventLoop().IsInside());
216 217 218
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, false);
219 220 221
	try {
		c.Stat(context, path);
	} catch (...) {
222
		callbacks.Remove(c);
223
		throw;
224 225 226 227 228
	}

	ScheduleSocket();
}

229 230
void
NfsConnection::OpenDirectory(const char *path, NfsCallback &callback)
231
{
232
	assert(GetEventLoop().IsInside());
233 234 235
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, true);
236 237 238
	try {
		c.OpenDirectory(context, path);
	} catch (...) {
239
		callbacks.Remove(c);
240
		throw;
241 242 243 244 245 246
	}

	ScheduleSocket();
}

const struct nfsdirent *
247
NfsConnection::ReadDirectory(struct nfsdir *dir) noexcept
248
{
249 250
	assert(GetEventLoop().IsInside());

251 252 253 254
	return nfs_readdir(context, dir);
}

void
255
NfsConnection::CloseDirectory(struct nfsdir *dir) noexcept
256
{
257 258
	assert(GetEventLoop().IsInside());

259 260 261
	return nfs_closedir(context, dir);
}

262 263
void
NfsConnection::Open(const char *path, int flags, NfsCallback &callback)
264
{
265
	assert(GetEventLoop().IsInside());
266 267
	assert(!callbacks.Contains(callback));

268
	auto &c = callbacks.Add(callback, *this, true);
269 270 271
	try {
		c.Open(context, path, flags);
	} catch (...) {
272
		callbacks.Remove(c);
273
		throw;
274 275 276 277 278
	}

	ScheduleSocket();
}

279 280
void
NfsConnection::Stat(struct nfsfh *fh, NfsCallback &callback)
281
{
282
	assert(GetEventLoop().IsInside());
283 284
	assert(!callbacks.Contains(callback));

285
	auto &c = callbacks.Add(callback, *this, false);
286 287 288
	try {
		c.Stat(context, fh);
	} catch (...) {
289
		callbacks.Remove(c);
290
		throw;
291 292 293 294 295
	}

	ScheduleSocket();
}

296
void
297
NfsConnection::Read(struct nfsfh *fh, uint64_t offset, size_t size,
298
		    NfsCallback &callback)
299
{
300
	assert(GetEventLoop().IsInside());
301 302
	assert(!callbacks.Contains(callback));

303
	auto &c = callbacks.Add(callback, *this, false);
304 305 306
	try {
		c.Read(context, fh, offset, size);
	} catch (...) {
307
		callbacks.Remove(c);
308
		throw;
309 310 311 312 313 314
	}

	ScheduleSocket();
}

void
315
NfsConnection::Cancel(NfsCallback &callback) noexcept
316 317 318 319 320
{
	callbacks.Cancel(callback);
}

/**
 * A libnfs completion callback which ignores all of its arguments
 * and does nothing; passed to nfs_close_async() by InternalClose().
 */
static void
DummyCallback(int, struct nfs_context *, void *, void *) noexcept
{
	/* intentionally empty */
}

325
inline void
326
NfsConnection::InternalClose(struct nfsfh *fh) noexcept
327 328 329 330 331 332 333 334
{
	assert(GetEventLoop().IsInside());
	assert(context != nullptr);
	assert(fh != nullptr);

	nfs_close_async(context, fh, DummyCallback, nullptr);
}

335
void
336
NfsConnection::Close(struct nfsfh *fh) noexcept
337
{
338 339
	assert(GetEventLoop().IsInside());

340
	InternalClose(fh);
341 342 343
	ScheduleSocket();
}

344
void
345
NfsConnection::CancelAndClose(struct nfsfh *fh, NfsCallback &callback) noexcept
346 347 348 349 350
{
	CancellableCallback &cancel = callbacks.Get(callback);
	cancel.CancelAndScheduleClose(fh);
}

351
void
352
NfsConnection::DestroyContext() noexcept
353
{
354
	assert(GetEventLoop().IsInside());
355 356
	assert(context != nullptr);

357 358 359 360 361
#ifndef NDEBUG
	assert(!in_destroy);
	in_destroy = true;
#endif

362
	if (!mount_finished) {
363 364
		assert(mount_timeout_event.IsActive());
		mount_timeout_event.Cancel();
365 366
	}

367
	/* cancel pending DeferEvent that was scheduled to notify
368
	   new leases */
369
	defer_new_lease.Cancel();
370

371
	if (SocketMonitor::IsDefined())
372
		SocketMonitor::Steal();
373

374 375 376 377
	callbacks.ForEach([](CancellableCallback &c){
			c.PrepareDestroyContext();
		});

378 379 380 381
	nfs_destroy_context(context);
	context = nullptr;
}

382
inline void
383
NfsConnection::DeferClose(struct nfsfh *fh) noexcept
384
{
385
	assert(GetEventLoop().IsInside());
386 387
	assert(in_event);
	assert(in_service);
388 389
	assert(context != nullptr);
	assert(fh != nullptr);
390 391 392 393

	deferred_close.push_front(fh);
}

394
void
395
NfsConnection::ScheduleSocket() noexcept
396
{
397
	assert(GetEventLoop().IsInside());
398 399
	assert(context != nullptr);

400 401 402 403 404 405 406 407 408 409 410
	const int which_events = nfs_which_events(context);

	if (which_events == POLLOUT && SocketMonitor::IsDefined())
		/* kludge: if libnfs asks only for POLLOUT, it means
		   that it is currently waiting for the connect() to
		   finish - rpc_reconnect_requeue() may have been
		   called from inside nfs_service(); we must now
		   unregister the old socket and register the new one
		   instead */
		SocketMonitor::Steal();

411
	if (!SocketMonitor::IsDefined()) {
412 413
		SocketDescriptor _fd(nfs_get_fd(context));
		if (!_fd.IsDefined())
414 415
			return;

416
		_fd.EnableCloseOnExec();
417
		SocketMonitor::Open(_fd);
418 419
	}

420 421
	SocketMonitor::Schedule(libnfs_to_events(which_events)
				| SocketMonitor::HANGUP);
422 423
}

424
inline int
425
NfsConnection::Service(unsigned flags) noexcept
426
{
427
	assert(GetEventLoop().IsInside());
428
	assert(context != nullptr);
429

430
#ifndef NDEBUG
431 432 433 434 435
	assert(!in_event);
	in_event = true;

	assert(!in_service);
	in_service = true;
436
#endif
437 438 439

	int result = nfs_service(context, events_to_libnfs(flags));

440
#ifndef NDEBUG
441 442 443
	assert(context != nullptr);
	assert(in_service);
	in_service = false;
444
#endif
445

446 447 448 449
	return result;
}

bool
450
NfsConnection::OnSocketReady(unsigned flags) noexcept
451 452 453 454 455 456 457
{
	assert(GetEventLoop().IsInside());
	assert(deferred_close.empty());

	bool closed = false;

	const bool was_mounted = mount_finished;
458
	if (!mount_finished || (flags & SocketMonitor::HANGUP) != 0)
459 460 461
		/* until the mount is finished, the NFS client may use
		   various sockets, therefore we unregister and
		   re-register it each time */
462 463 464 465
		/* also re-register the socket if we got a HANGUP,
		   which is a sure sign that libnfs will close the
		   socket, which can lead to a race condition if
		   epoll_ctl() is called later */
466 467 468 469
		SocketMonitor::Steal();

	const int result = Service(flags);

470
	while (!deferred_close.empty()) {
471
		InternalClose(deferred_close.front());
472 473 474
		deferred_close.pop_front();
	}

475
	if (!was_mounted && mount_finished) {
476
		if (postponed_mount_error) {
477 478 479 480 481 482 483 484
			DestroyContext();
			closed = true;
			BroadcastMountError(std::move(postponed_mount_error));
		} else if (result == 0)
			BroadcastMountSuccess();
	} else if (result < 0) {
		/* the connection has failed */

485 486 487
		auto e = FormatRuntimeError("NFS connection has failed: %s",
					    nfs_get_error(context));
		BroadcastError(std::make_exception_ptr(e));
488

489 490
		DestroyContext();
		closed = true;
491
	} else if (nfs_get_fd(context) < 0) {
492
		/* this happens when rpc_reconnect_requeue() is called
493
		   after the connection broke, but autoreconnect was
494
		   disabled - nfs_service() returns 0 */
495

496 497
		const char *msg = nfs_get_error(context);
		if (msg == nullptr)
498 499
			msg = "<unknown>";
		auto e = FormatRuntimeError("NFS socket disappeared: %s", msg);
500

501
		BroadcastError(std::make_exception_ptr(e));
502

503 504
		DestroyContext();
		closed = true;
505 506
	}

507 508
	assert(context == nullptr || nfs_get_fd(context) >= 0);

509
#ifndef NDEBUG
510 511
	assert(in_event);
	in_event = false;
512
#endif
513 514 515 516 517 518 519 520 521

	if (context != nullptr)
		ScheduleSocket();

	return !closed;
}

inline void
NfsConnection::MountCallback(int status, gcc_unused nfs_context *nfs,
522
			     gcc_unused void *data) noexcept
523
{
524
	assert(GetEventLoop().IsInside());
525 526 527 528
	assert(context == nfs);

	mount_finished = true;

529 530
	assert(mount_timeout_event.IsActive() || in_destroy);
	mount_timeout_event.Cancel();
531

532
	if (status < 0) {
533 534 535
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
		postponed_mount_error = std::make_exception_ptr(e);
536 537 538 539 540 541
		return;
	}
}

void
NfsConnection::MountCallback(int status, nfs_context *nfs, void *data,
542
			     void *private_data) noexcept
543 544 545 546 547 548
{
	NfsConnection *c = (NfsConnection *)private_data;

	c->MountCallback(status, nfs, data);
}

549 550
inline void
NfsConnection::MountInternal()
551
{
552
	assert(GetEventLoop().IsInside());
553
	assert(context == nullptr);
554 555

	context = nfs_init_context();
556 557
	if (context == nullptr)
		throw std::runtime_error("nfs_init_context() failed");
558

559
	postponed_mount_error = std::exception_ptr();
560
	mount_finished = false;
561

562
	mount_timeout_event.Schedule(NFS_MOUNT_TIMEOUT);
563

564
#ifndef NDEBUG
565 566
	in_service = false;
	in_event = false;
567
	in_destroy = false;
568
#endif
569 570 571

	if (nfs_mount_async(context, server.c_str(), export_name.c_str(),
			    MountCallback, this) != 0) {
572 573
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
574 575
		nfs_destroy_context(context);
		context = nullptr;
576
		throw e;
577 578 579 580 581 582
	}

	ScheduleSocket();
}

void
583
NfsConnection::BroadcastMountSuccess() noexcept
584
{
585 586
	assert(GetEventLoop().IsInside());

587 588 589 590 591 592 593 594
	while (!new_leases.empty()) {
		auto i = new_leases.begin();
		active_leases.splice(active_leases.end(), new_leases, i);
		(*i)->OnNfsConnectionReady();
	}
}

void
595
NfsConnection::BroadcastMountError(std::exception_ptr &&e) noexcept
596
{
597 598
	assert(GetEventLoop().IsInside());

599 600 601
	while (!new_leases.empty()) {
		auto l = new_leases.front();
		new_leases.pop_front();
602
		l->OnNfsConnectionFailed(e);
603 604
	}

605
	OnNfsConnectionError(std::move(e));
606 607 608
}

void
609
NfsConnection::BroadcastError(std::exception_ptr &&e) noexcept
610
{
611 612
	assert(GetEventLoop().IsInside());

613 614 615
	while (!active_leases.empty()) {
		auto l = active_leases.front();
		active_leases.pop_front();
616
		l->OnNfsConnectionDisconnected(e);
617 618
	}

619
	BroadcastMountError(std::move(e));
620 621
}

622
void
623
NfsConnection::OnMountTimeout() noexcept
624 625 626 627 628 629 630
{
	assert(GetEventLoop().IsInside());
	assert(!mount_finished);

	mount_finished = true;
	DestroyContext();

631
	BroadcastMountError(std::make_exception_ptr(std::runtime_error("Mount timeout")));
632 633
}

634
void
635
NfsConnection::RunDeferred() noexcept
636
{
637 638
	assert(GetEventLoop().IsInside());

639
	if (context == nullptr) {
640 641 642 643
		try {
			MountInternal();
		} catch (...) {
			BroadcastMountError(std::current_exception());
644 645 646 647
			return;
		}
	}

648
	if (mount_finished)
649 650
		BroadcastMountSuccess();
}