Commit 18c4ef09 authored by Max Kellermann's avatar Max Kellermann

input/thread: use class HugeArray instead of the low-level function HugeAllocate()

parent bc93c7a1
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include "config.h" #include "config.h"
#include "ThreadInputStream.hxx" #include "ThreadInputStream.hxx"
#include "thread/Name.hxx" #include "thread/Name.hxx"
#include "util/CircularBuffer.hxx"
#include "util/HugeAllocator.hxx"
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
...@@ -33,8 +31,10 @@ ThreadInputStream::ThreadInputStream(const char *_plugin, ...@@ -33,8 +31,10 @@ ThreadInputStream::ThreadInputStream(const char *_plugin,
:InputStream(_uri, _mutex, _cond), :InputStream(_uri, _mutex, _cond),
plugin(_plugin), plugin(_plugin),
thread(BIND_THIS_METHOD(ThreadFunc)), thread(BIND_THIS_METHOD(ThreadFunc)),
buffer_size(_buffer_size) allocation(_buffer_size),
buffer(&allocation.front(), allocation.size())
{ {
allocation.ForkCow(false);
} }
ThreadInputStream::~ThreadInputStream() ThreadInputStream::~ThreadInputStream()
...@@ -49,25 +49,12 @@ ThreadInputStream::~ThreadInputStream() ...@@ -49,25 +49,12 @@ ThreadInputStream::~ThreadInputStream()
thread.Join(); thread.Join();
if (buffer != nullptr) { buffer.Clear();
buffer->Clear();
HugeFree(buffer->Write().data, buffer_size);
delete buffer;
}
} }
void void
ThreadInputStream::Start() ThreadInputStream::Start()
{ {
assert(buffer == nullptr);
auto allocation = HugeAllocate(buffer_size);
assert(allocation != nullptr);
HugeForkCow(allocation.data, allocation.size, false);
buffer = new CircularBuffer<uint8_t>((uint8_t *)allocation.data,
allocation.size);
thread.Start(); thread.Start();
} }
...@@ -92,7 +79,7 @@ ThreadInputStream::ThreadFunc() ...@@ -92,7 +79,7 @@ ThreadInputStream::ThreadFunc()
while (!close) { while (!close) {
assert(!postponed_exception); assert(!postponed_exception);
auto w = buffer->Write(); auto w = buffer.Write();
if (w.IsEmpty()) { if (w.IsEmpty()) {
wake_cond.wait(mutex); wake_cond.wait(mutex);
} else { } else {
...@@ -114,7 +101,7 @@ ThreadInputStream::ThreadFunc() ...@@ -114,7 +101,7 @@ ThreadInputStream::ThreadFunc()
break; break;
} }
buffer->Append(nbytes); buffer.Append(nbytes);
} }
} }
...@@ -135,7 +122,7 @@ ThreadInputStream::IsAvailable() noexcept ...@@ -135,7 +122,7 @@ ThreadInputStream::IsAvailable() noexcept
{ {
assert(!thread.IsInside()); assert(!thread.IsInside());
return !buffer->IsEmpty() || eof || postponed_exception; return !buffer.IsEmpty() || eof || postponed_exception;
} }
inline size_t inline size_t
...@@ -147,11 +134,11 @@ ThreadInputStream::Read(void *ptr, size_t read_size) ...@@ -147,11 +134,11 @@ ThreadInputStream::Read(void *ptr, size_t read_size)
if (postponed_exception) if (postponed_exception)
std::rethrow_exception(postponed_exception); std::rethrow_exception(postponed_exception);
auto r = buffer->Read(); auto r = buffer.Read();
if (!r.IsEmpty()) { if (!r.IsEmpty()) {
size_t nbytes = std::min(read_size, r.size); size_t nbytes = std::min(read_size, r.size);
memcpy(ptr, r.data, nbytes); memcpy(ptr, r.data, nbytes);
buffer->Consume(nbytes); buffer.Consume(nbytes);
wake_cond.broadcast(); wake_cond.broadcast();
offset += nbytes; offset += nbytes;
return nbytes; return nbytes;
......
...@@ -24,13 +24,13 @@ ...@@ -24,13 +24,13 @@
#include "InputStream.hxx" #include "InputStream.hxx"
#include "thread/Thread.hxx" #include "thread/Thread.hxx"
#include "thread/Cond.hxx" #include "thread/Cond.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"
#include <exception> #include <exception>
#include <stdint.h> #include <stdint.h>
template<typename T> class CircularBuffer;
/** /**
* Helper class for moving InputStream implementations with blocking * Helper class for moving InputStream implementations with blocking
* backend library implementation to a dedicated thread. Data is * backend library implementation to a dedicated thread. Data is
...@@ -54,8 +54,9 @@ class ThreadInputStream : public InputStream { ...@@ -54,8 +54,9 @@ class ThreadInputStream : public InputStream {
std::exception_ptr postponed_exception; std::exception_ptr postponed_exception;
const size_t buffer_size; HugeArray<uint8_t> allocation;
CircularBuffer<uint8_t> *buffer = nullptr;
CircularBuffer<uint8_t> buffer;
/** /**
* Shall the stream be closed? * Shall the stream be closed?
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment