/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef MPD_ASYNC_INPUT_STREAM_HXX
#define MPD_ASYNC_INPUT_STREAM_HXX

#include "InputStream.hxx"
#include "event/DeferredCall.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"

#include <exception>

/**
 * Helper class for moving asynchronous (non-blocking) InputStream
 * implementations to the I/O thread.  Data is being read into a ring
 * buffer, and that buffer is then consumed by another thread using
 * the regular #InputStream API.
 */
36
class AsyncInputStream : public InputStream {
37 38 39 40
	enum class SeekState : uint8_t {
		NONE, SCHEDULED, PENDING
	};

41 42 43
	DeferredCall deferred_resume;
	DeferredCall deferred_seek;

44 45
	HugeAllocation allocation;

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
	CircularBuffer<uint8_t> buffer;
	const size_t resume_at;

	bool open;

	/**
	 * Is the connection currently paused?  That happens when the
	 * buffer was getting too large.  It will be unpaused when the
	 * buffer is below the threshold again.
	 */
	bool paused;

	SeekState seek_state;

	/**
	 * The #Tag object ready to be requested via
	 * InputStream::ReadTag().
	 */
	Tag *tag;

	offset_type seek_offset;

protected:
69 70
	std::exception_ptr postponed_exception;

71
public:
72 73 74 75
	/**
	 * @param _buffer a buffer allocated with HugeAllocate(); the
	 * destructor will free it using HugeFree()
	 */
76 77
	AsyncInputStream(const char *_url,
			 Mutex &_mutex, Cond &_cond,
78
			 size_t _buffer_size,
79 80 81 82 83
			 size_t _resume_at);

	virtual ~AsyncInputStream();

	/* virtual methods from InputStream */
84
	void Check() final;
85
	bool IsEOF() noexcept final;
86
	void Seek(offset_type new_offset) final;
87
	Tag *ReadTag() final;
88
	bool IsAvailable() noexcept final;
89
	size_t Read(void *ptr, size_t read_size) final;
90 91

protected:
92 93 94
	/**
	 * Pass an tag from the I/O thread to the client thread.
	 */
95
	void SetTag(Tag *_tag) noexcept;
96

97
	void ClearTag() noexcept {
98 99 100
		SetTag(nullptr);
	}

101
	void Pause() noexcept;
102

103
	bool IsPaused() const noexcept {
104 105 106
		return paused;
	}

107 108 109 110 111
	/**
	 * Declare that the underlying stream was closed.  We will
	 * continue feeding Read() calls from the buffer until it runs
	 * empty.
	 */
112
	void SetClosed() noexcept {
113 114 115
		open = false;
	}

116
	bool IsBufferEmpty() const noexcept {
117 118 119
		return buffer.IsEmpty();
	}

120
	bool IsBufferFull() const noexcept {
121 122 123
		return buffer.IsFull();
	}

124 125 126
	/**
	 * Determine how many bytes can be added to the buffer.
	 */
127
	gcc_pure
128
	size_t GetBufferSpace() const noexcept {
129 130 131
		return buffer.GetSpace();
	}

132
	CircularBuffer<uint8_t>::Range PrepareWriteBuffer() noexcept {
133 134 135
		return buffer.Write();
	}

136
	void CommitWriteBuffer(size_t nbytes) noexcept;
137

138 139 140 141
	/**
	 * Append data to the buffer.  The size must fit into the
	 * buffer; see GetBufferSpace().
	 */
142
	void AppendToBuffer(const void *data, size_t append_size) noexcept;
143

144 145 146 147
	/**
	 * Implement code here that will resume the stream after it
	 * has been paused due to full input buffer.
	 */
148 149 150 151 152 153 154 155 156
	virtual void DoResume() = 0;

	/**
	 * The actual Seek() implementation.  This virtual method will
	 * be called from within the I/O thread.  When the operation
	 * is finished, call SeekDone() to notify the caller.
	 */
	virtual void DoSeek(offset_type new_offset) = 0;

157
	bool IsSeekPending() const noexcept {
158 159 160
		return seek_state == SeekState::PENDING;
	}

161 162 163 164
	/**
	 * Call this after seeking has finished.  It will notify the
	 * client thread.
	 */
165
	void SeekDone() noexcept;
166 167 168 169

private:
	void Resume();

170
	/* for DeferredCall */
171 172
	void DeferredResume() noexcept;
	void DeferredSeek() noexcept;
173 174 175
};

#endif