Commit f8eeded5 authored by Max Kellermann's avatar Max Kellermann

input/async: pass EventLoop& to constructor

parent c3fa7e13
...@@ -22,20 +22,20 @@ ...@@ -22,20 +22,20 @@
#include "Domain.hxx" #include "Domain.hxx"
#include "tag/Tag.hxx" #include "tag/Tag.hxx"
#include "thread/Cond.hxx" #include "thread/Cond.hxx"
#include "IOThread.hxx" #include "event/Loop.hxx"
#include <stdexcept> #include <stdexcept>
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
AsyncInputStream::AsyncInputStream(const char *_url, AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
Mutex &_mutex, Cond &_cond, Mutex &_mutex, Cond &_cond,
size_t _buffer_size, size_t _buffer_size,
size_t _resume_at) size_t _resume_at)
:InputStream(_url, _mutex, _cond), :InputStream(_url, _mutex, _cond),
deferred_resume(io_thread_get(), BIND_THIS_METHOD(DeferredResume)), deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
deferred_seek(io_thread_get(), BIND_THIS_METHOD(DeferredSeek)), deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
allocation(_buffer_size), allocation(_buffer_size),
buffer((uint8_t *)allocation.get(), _buffer_size), buffer((uint8_t *)allocation.get(), _buffer_size),
resume_at(_resume_at), resume_at(_resume_at),
...@@ -61,7 +61,7 @@ AsyncInputStream::SetTag(Tag *_tag) ...@@ -61,7 +61,7 @@ AsyncInputStream::SetTag(Tag *_tag)
void void
AsyncInputStream::Pause() AsyncInputStream::Pause()
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
paused = true; paused = true;
} }
...@@ -69,7 +69,7 @@ AsyncInputStream::Pause() ...@@ -69,7 +69,7 @@ AsyncInputStream::Pause()
inline void inline void
AsyncInputStream::Resume() AsyncInputStream::Resume()
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
if (paused) { if (paused) {
paused = false; paused = false;
...@@ -143,7 +143,7 @@ AsyncInputStream::Seek(offset_type new_offset) ...@@ -143,7 +143,7 @@ AsyncInputStream::Seek(offset_type new_offset)
void void
AsyncInputStream::SeekDone() AsyncInputStream::SeekDone()
{ {
assert(io_thread_inside()); assert(GetEventLoop().IsInside());
assert(IsSeekPending()); assert(IsSeekPending());
/* we may have reached end-of-file previously, and the /* we may have reached end-of-file previously, and the
...@@ -174,7 +174,7 @@ AsyncInputStream::IsAvailable() ...@@ -174,7 +174,7 @@ AsyncInputStream::IsAvailable()
size_t size_t
AsyncInputStream::Read(void *ptr, size_t read_size) AsyncInputStream::Read(void *ptr, size_t read_size)
{ {
assert(!io_thread_inside()); assert(!GetEventLoop().IsInside());
/* wait for data */ /* wait for data */
CircularBuffer<uint8_t>::Range r; CircularBuffer<uint8_t>::Range r;
......
...@@ -73,7 +73,7 @@ public: ...@@ -73,7 +73,7 @@ public:
* @param _buffer a buffer allocated with HugeAllocate(); the * @param _buffer a buffer allocated with HugeAllocate(); the
* destructor will free it using HugeFree() * destructor will free it using HugeFree()
*/ */
AsyncInputStream(const char *_url, AsyncInputStream(EventLoop &event_loop, const char *_url,
Mutex &_mutex, Cond &_cond, Mutex &_mutex, Cond &_cond,
size_t _buffer_size, size_t _buffer_size,
size_t _resume_at); size_t _resume_at);
......
...@@ -76,7 +76,7 @@ public: ...@@ -76,7 +76,7 @@ public:
const char *_uri, Mutex &_mutex, Cond &_cond, const char *_uri, Mutex &_mutex, Cond &_cond,
const char *_device, const char *_device,
snd_pcm_t *_handle, int _frame_size) snd_pcm_t *_handle, int _frame_size)
:AsyncInputStream(_uri, _mutex, _cond, :AsyncInputStream(loop, _uri, _mutex, _cond,
ALSA_MAX_BUFFERED, ALSA_RESUME_AT), ALSA_MAX_BUFFERED, ALSA_RESUME_AT),
MultiSocketMonitor(loop), MultiSocketMonitor(loop),
DeferredMonitor(loop), DeferredMonitor(loop),
......
...@@ -74,8 +74,9 @@ struct CurlInputStream final : public AsyncInputStream, CurlResponseHandler { ...@@ -74,8 +74,9 @@ struct CurlInputStream final : public AsyncInputStream, CurlResponseHandler {
/** parser for icy-metadata */ /** parser for icy-metadata */
IcyInputStream *icy; IcyInputStream *icy;
CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond) CurlInputStream(EventLoop &event_loop, const char *_url,
:AsyncInputStream(_url, _mutex, _cond, Mutex &_mutex, Cond &_cond)
:AsyncInputStream(event_loop, _url, _mutex, _cond,
CURL_MAX_BUFFERED, CURL_MAX_BUFFERED,
CURL_RESUME_AT), CURL_RESUME_AT),
icy(new IcyInputStream(this)) { icy(new IcyInputStream(this)) {
...@@ -420,7 +421,8 @@ CurlInputStream::DoSeek(offset_type new_offset) ...@@ -420,7 +421,8 @@ CurlInputStream::DoSeek(offset_type new_offset)
inline InputStream * inline InputStream *
CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond) CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond)
{ {
CurlInputStream *c = new CurlInputStream(url, mutex, cond); CurlInputStream *c = new CurlInputStream(curl_global->GetEventLoop(),
url, mutex, cond);
try { try {
BlockingCall(c->GetEventLoop(), [c](){ BlockingCall(c->GetEventLoop(), [c](){
......
...@@ -48,7 +48,7 @@ class NfsInputStream final : public AsyncInputStream, NfsFileReader { ...@@ -48,7 +48,7 @@ class NfsInputStream final : public AsyncInputStream, NfsFileReader {
public: public:
NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond) NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond)
:AsyncInputStream(_uri, _mutex, _cond, :AsyncInputStream(io_thread_get(), _uri, _mutex, _cond,
NFS_MAX_BUFFERED, NFS_MAX_BUFFERED,
NFS_RESUME_AT), NFS_RESUME_AT),
NfsFileReader(io_thread_get()), NfsFileReader(io_thread_get()),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment