Commit 555a4d73 authored by Max Kellermann's avatar Max Kellermann

input/buffering: pass offset to Read() and eliminate Seek()

Another step towards supporting multiple readers.
parent 813567bf
...@@ -46,11 +46,10 @@ BufferedInputStream::Check() ...@@ -46,11 +46,10 @@ BufferedInputStream::Check()
} }
void void
BufferedInputStream::Seek(std::unique_lock<Mutex> &lock, BufferedInputStream::Seek(std::unique_lock<Mutex> &,
offset_type new_offset) offset_type new_offset)
{ {
BufferingInputStream::Seek(lock, new_offset); offset = new_offset;
InputStream::offset = new_offset;
} }
bool bool
...@@ -62,14 +61,14 @@ BufferedInputStream::IsEOF() noexcept ...@@ -62,14 +61,14 @@ BufferedInputStream::IsEOF() noexcept
bool bool
BufferedInputStream::IsAvailable() noexcept BufferedInputStream::IsAvailable() noexcept
{ {
return BufferingInputStream::IsAvailable(); return BufferingInputStream::IsAvailable(offset);
} }
size_t size_t
BufferedInputStream::Read(std::unique_lock<Mutex> &lock, BufferedInputStream::Read(std::unique_lock<Mutex> &lock,
void *ptr, size_t s) void *ptr, size_t s)
{ {
size_t nbytes = BufferingInputStream::Read(lock, ptr, s); size_t nbytes = BufferingInputStream::Read(lock, offset, ptr, s);
InputStream::offset += nbytes; InputStream::offset += nbytes;
return nbytes; return nbytes;
} }
...@@ -55,42 +55,25 @@ BufferingInputStream::Check() ...@@ -55,42 +55,25 @@ BufferingInputStream::Check()
input->Check(); input->Check();
} }
void bool
BufferingInputStream::Seek(std::unique_lock<Mutex> &lock, size_t new_offset) BufferingInputStream::IsAvailable(size_t offset) noexcept
{ {
if (new_offset >= size()) { if (offset >= size())
offset = new_offset; return true;
return;
}
auto r = buffer.Read(new_offset);
if (r.HasData()) {
/* nice, we already have some data at the desired
offset and this method call is a no-op */
offset = new_offset;
return;
}
seek_offset = new_offset;
seek = true;
wake_cond.notify_one();
client_cond.wait(lock, [this]{ return !seek; }); if (buffer.Read(offset).HasData())
return true;
if (seek_error) /* if no data is available now, make sure it will be soon */
std::rethrow_exception(std::exchange(seek_error, {})); if (want_offset == INVALID_OFFSET)
want_offset = offset;
offset = new_offset; return false;
}
bool
BufferingInputStream::IsAvailable() noexcept
{
return offset == size() || buffer.Read(offset).HasData();
} }
size_t size_t
BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s) BufferingInputStream::Read(std::unique_lock<Mutex> &lock, size_t offset,
void *ptr, size_t s)
{ {
if (offset >= size()) if (offset >= size())
return 0; return 0;
...@@ -101,13 +84,15 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s) ...@@ -101,13 +84,15 @@ BufferingInputStream::Read(std::unique_lock<Mutex> &lock, void *ptr, size_t s)
/* yay, we have some data */ /* yay, we have some data */
size_t nbytes = std::min(s, r.defined_buffer.size); size_t nbytes = std::min(s, r.defined_buffer.size);
memcpy(ptr, r.defined_buffer.data, nbytes); memcpy(ptr, r.defined_buffer.data, nbytes);
offset += nbytes;
return nbytes; return nbytes;
} }
if (error) if (error)
std::rethrow_exception(error); std::rethrow_exception(error);
if (want_offset == INVALID_OFFSET)
want_offset = offset;
client_cond.wait(lock); client_cond.wait(lock);
} }
} }
...@@ -132,27 +117,13 @@ inline void ...@@ -132,27 +117,13 @@ inline void
BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock) BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock)
{ {
while (!stop) { while (!stop) {
if (seek) { if (want_offset != INVALID_OFFSET) {
try { assert(want_offset < size());
input->Seek(lock, seek_offset);
} catch (...) {
seek_error = std::current_exception();
}
seek = false; const size_t seek_offset = want_offset;
client_cond.notify_all(); want_offset = INVALID_OFFSET;
} else if (offset != input->GetOffset() && !IsAvailable()) { if (!buffer.Read(seek_offset).HasData())
/* a past Seek() call was a no-op because data input->Seek(lock, seek_offset);
was already available at that position, but
now we've reached a new position where
there is no more data in the buffer, and
our input is reading somewhere else (maybe
stuck at the end of the file); to find a
way out, we now seek our input to our
reading position to be able to fill our
buffer */
input->Seek(lock, offset);
} else if (input->IsEOF()) { } else if (input->IsEOF()) {
/* our input has reached its end: prepare /* our input has reached its end: prepare
reading the first remaining hole */ reading the first remaining hole */
...@@ -170,27 +141,13 @@ BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock) ...@@ -170,27 +141,13 @@ BufferingInputStream::RunThreadLocked(std::unique_lock<Mutex> &lock)
auto w = buffer.Write(read_offset); auto w = buffer.Write(read_offset);
if (w.empty()) { if (w.empty()) {
if (IsAvailable()) { size_t new_offset = FindFirstHole();
/* we still have enough data if (new_offset == INVALID_OFFSET)
for the next Read() - seek /* the file has been read
to the first hole */ completely */
break;
size_t new_offset = FindFirstHole();
if (new_offset == INVALID_OFFSET) input->Seek(lock, new_offset);
/* the file has been
read completely */
break;
input->Seek(lock, new_offset);
} else {
/* we need more data at our
current position, because
the next Read() will stall
- seek our input to our
offset to prepare filling
the buffer from there */
input->Seek(lock, offset);
}
continue; continue;
} }
......
...@@ -55,11 +55,9 @@ class BufferingInputStream : InputStreamHandler { ...@@ -55,11 +55,9 @@ class BufferingInputStream : InputStreamHandler {
SparseBuffer<uint8_t> buffer; SparseBuffer<uint8_t> buffer;
bool stop = false, seek = false; bool stop = false;
size_t offset = 0; size_t want_offset = INVALID_OFFSET;
size_t seek_offset;
std::exception_ptr error, seek_error; std::exception_ptr error, seek_error;
...@@ -78,9 +76,9 @@ public: ...@@ -78,9 +76,9 @@ public:
} }
void Check(); void Check();
void Seek(std::unique_lock<Mutex> &lock, size_t new_offset); bool IsAvailable(size_t offset) noexcept;
bool IsAvailable() noexcept; size_t Read(std::unique_lock<Mutex> &lock, size_t offset,
size_t Read(std::unique_lock<Mutex> &lock, void *ptr, size_t size); void *ptr, size_t size);
protected: protected:
virtual void OnBufferAvailable() noexcept {} virtual void OnBufferAvailable() noexcept {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment