/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef MPD_ASYNC_INPUT_STREAM_HXX
#define MPD_ASYNC_INPUT_STREAM_HXX

#include "InputStream.hxx"
#include "event/DeferEvent.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"

#include <exception>

/**
 * Helper class for moving asynchronous (non-blocking) InputStream
 * implementations to the I/O thread.  Data is being read into a ring
 * buffer, and that buffer is then consumed by another thread using
 * the regular #InputStream API.
 */
36
class AsyncInputStream : public InputStream {
37 38 39 40
	enum class SeekState : uint8_t {
		NONE, SCHEDULED, PENDING
	};

41 42
	DeferEvent deferred_resume;
	DeferEvent deferred_seek;
43

44
	HugeArray<uint8_t> allocation;
45

46 47 48
	CircularBuffer<uint8_t> buffer;
	const size_t resume_at;

49
	bool open = true;
50 51 52 53 54 55

	/**
	 * Is the connection currently paused?  That happens when the
	 * buffer was getting too large.  It will be unpaused when the
	 * buffer is below the threshold again.
	 */
56
	bool paused = false;
57

58
	SeekState seek_state = SeekState::NONE;
59 60 61 62 63

	/**
	 * The #Tag object ready to be requested via
	 * InputStream::ReadTag().
	 */
64
	std::unique_ptr<Tag> tag;
65 66 67 68

	offset_type seek_offset;

protected:
69 70
	std::exception_ptr postponed_exception;

71
public:
72
	AsyncInputStream(EventLoop &event_loop, const char *_url,
73
			 Mutex &_mutex,
74
			 size_t _buffer_size,
75 76 77 78
			 size_t _resume_at);

	virtual ~AsyncInputStream();

79 80 81 82
	EventLoop &GetEventLoop() {
		return deferred_resume.GetEventLoop();
	}

83
	/* virtual methods from InputStream */
84
	void Check() final;
85
	bool IsEOF() noexcept final;
86
	void Seek(offset_type new_offset) final;
87
	std::unique_ptr<Tag> ReadTag() final;
88
	bool IsAvailable() noexcept final;
89
	size_t Read(void *ptr, size_t read_size) final;
90 91

protected:
92 93 94
	/**
	 * Pass an tag from the I/O thread to the client thread.
	 */
95 96
	void SetTag(std::unique_ptr<Tag> _tag) noexcept;
	void ClearTag() noexcept;
97

98
	void Pause() noexcept;
99

100
	bool IsPaused() const noexcept {
101 102 103
		return paused;
	}

104 105 106 107 108
	/**
	 * Declare that the underlying stream was closed.  We will
	 * continue feeding Read() calls from the buffer until it runs
	 * empty.
	 */
109
	void SetClosed() noexcept {
110 111 112
		open = false;
	}

113
	bool IsBufferEmpty() const noexcept {
114
		return buffer.empty();
115 116
	}

117
	bool IsBufferFull() const noexcept {
118 119 120
		return buffer.IsFull();
	}

121 122 123
	/**
	 * Determine how many bytes can be added to the buffer.
	 */
124
	gcc_pure
125
	size_t GetBufferSpace() const noexcept {
126 127 128
		return buffer.GetSpace();
	}

129
	CircularBuffer<uint8_t>::Range PrepareWriteBuffer() noexcept {
130 131 132
		return buffer.Write();
	}

133
	void CommitWriteBuffer(size_t nbytes) noexcept;
134

135 136 137 138
	/**
	 * Append data to the buffer.  The size must fit into the
	 * buffer; see GetBufferSpace().
	 */
139
	void AppendToBuffer(const void *data, size_t append_size) noexcept;
140

141 142 143 144
	/**
	 * Implement code here that will resume the stream after it
	 * has been paused due to full input buffer.
	 */
145 146 147 148 149 150 151 152 153
	virtual void DoResume() = 0;

	/**
	 * The actual Seek() implementation.  This virtual method will
	 * be called from within the I/O thread.  When the operation
	 * is finished, call SeekDone() to notify the caller.
	 */
	virtual void DoSeek(offset_type new_offset) = 0;

154
	bool IsSeekPending() const noexcept {
155 156 157
		return seek_state == SeekState::PENDING;
	}

158 159 160 161
	/**
	 * Call this after seeking has finished.  It will notify the
	 * client thread.
	 */
162
	void SeekDone() noexcept;
163 164 165 166

private:
	void Resume();

167
	/* for DeferEvent */
168 169
	void DeferredResume() noexcept;
	void DeferredSeek() noexcept;
170 171 172
};

#endif