Commit d0a812d2 authored by Max Kellermann's avatar Max Kellermann

input/Soup: move code into the class

parent 3dad2e1c
...@@ -72,6 +72,15 @@ struct SoupInputStream { ...@@ -72,6 +72,15 @@ struct SoupInputStream {
bool completed; bool completed;
GError *postponed_error; GError *postponed_error;
SoupInputStream(const char *uri, Mutex &mutex, Cond &cond);
~SoupInputStream();
bool CopyError(const SoupMessage *msg);
bool WaitData();
size_t Read(void *ptr, size_t size, GError **error_r);
}; };
static inline GQuark static inline GQuark
...@@ -127,31 +136,31 @@ input_soup_finish(void) ...@@ -127,31 +136,31 @@ input_soup_finish(void)
* *
* @return true if there was no error * @return true if there was no error
*/ */
static bool bool
input_soup_copy_error(SoupInputStream *s, const SoupMessage *msg) SoupInputStream::CopyError(const SoupMessage *src)
{ {
if (SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) if (SOUP_STATUS_IS_SUCCESSFUL(src->status_code))
return true; return true;
if (msg->status_code == SOUP_STATUS_CANCELLED) if (src->status_code == SOUP_STATUS_CANCELLED)
/* failure, but don't generate a GError, because this /* failure, but don't generate a GError, because this
status was caused by _close() */ status was caused by _close() */
return false; return false;
if (s->postponed_error != NULL) if (postponed_error != nullptr)
/* there's already a GError, don't overwrite it */ /* there's already a GError, don't overwrite it */
return false; return false;
if (SOUP_STATUS_IS_TRANSPORT_ERROR(msg->status_code)) if (SOUP_STATUS_IS_TRANSPORT_ERROR(src->status_code))
s->postponed_error = postponed_error =
g_error_new(soup_quark(), msg->status_code, g_error_new(soup_quark(), src->status_code,
"HTTP client error: %s", "HTTP client error: %s",
msg->reason_phrase); src->reason_phrase);
else else
s->postponed_error = postponed_error =
g_error_new(soup_quark(), msg->status_code, g_error_new(soup_quark(), src->status_code,
"got HTTP status: %d %s", "got HTTP status: %d %s",
msg->status_code, msg->reason_phrase); src->status_code, src->reason_phrase);
return false; return false;
} }
...@@ -168,7 +177,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, ...@@ -168,7 +177,7 @@ input_soup_session_callback(G_GNUC_UNUSED SoupSession *session,
const ScopeLock protect(*s->base.mutex); const ScopeLock protect(*s->base.mutex);
if (!s->base.ready) if (!s->base.ready)
input_soup_copy_error(s, msg); s->CopyError(msg);
s->base.ready = true; s->base.ready = true;
s->alive = false; s->alive = false;
...@@ -184,7 +193,7 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data) ...@@ -184,7 +193,7 @@ input_soup_got_headers(SoupMessage *msg, gpointer user_data)
s->base.mutex->lock(); s->base.mutex->lock();
if (!input_soup_copy_error(s, msg)) { if (!s->CopyError(msg)) {
s->base.mutex->unlock(); s->base.mutex->unlock();
soup_session_cancel_message(soup_session, msg, soup_session_cancel_message(soup_session, msg,
...@@ -237,22 +246,22 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) ...@@ -237,22 +246,22 @@ input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data)
s->base.mutex->unlock(); s->base.mutex->unlock();
} }
static bool inline bool
input_soup_wait_data(SoupInputStream *s) SoupInputStream::WaitData()
{ {
while (true) { while (true) {
if (s->eof) if (eof)
return true; return true;
if (!s->alive) if (!alive)
return false; return false;
if (!g_queue_is_empty(s->buffers)) if (!g_queue_is_empty(buffers))
return true; return true;
assert(s->current_consumed == 0); assert(current_consumed == 0);
s->base.cond->wait(*s->base.mutex); base.cond->wait(*base.mutex);
} }
} }
...@@ -267,22 +276,16 @@ input_soup_queue(gpointer data) ...@@ -267,22 +276,16 @@ input_soup_queue(gpointer data)
return NULL; return NULL;
} }
static struct input_stream * SoupInputStream::SoupInputStream(const char *uri,
input_soup_open(const char *uri, Mutex &mutex, Cond &cond)
Mutex &mutex, Cond &cond, :buffers(g_queue_new()),
G_GNUC_UNUSED GError **error_r) current_consumed(0), total_buffered(0),
alive(false), pause(false), eof(false), completed(false),
postponed_error(nullptr)
{ {
if (strncmp(uri, "http://", 7) != 0) input_stream_init(&base, &input_plugin_soup, uri,
return NULL;
SoupInputStream *s = g_new(SoupInputStream, 1);
input_stream_init(&s->base, &input_plugin_soup, uri,
mutex, cond); mutex, cond);
s->buffers = g_queue_new();
s->current_consumed = 0;
s->total_buffered = 0;
#if GCC_CHECK_VERSION(4,6) #if GCC_CHECK_VERSION(4,6)
#pragma GCC diagnostic push #pragma GCC diagnostic push
/* the libsoup macro SOUP_METHOD_GET discards the "const" /* the libsoup macro SOUP_METHOD_GET discards the "const"
...@@ -291,32 +294,36 @@ input_soup_open(const char *uri, ...@@ -291,32 +294,36 @@ input_soup_open(const char *uri,
#pragma GCC diagnostic ignored "-Wcast-qual" #pragma GCC diagnostic ignored "-Wcast-qual"
#endif #endif
s->msg = soup_message_new(SOUP_METHOD_GET, uri); msg = soup_message_new(SOUP_METHOD_GET, uri);
#if GCC_CHECK_VERSION(4,6) #if GCC_CHECK_VERSION(4,6)
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
#endif #endif
soup_message_set_flags(s->msg, SOUP_MESSAGE_NO_REDIRECT); soup_message_set_flags(msg, SOUP_MESSAGE_NO_REDIRECT);
soup_message_headers_append(s->msg->request_headers, "User-Agent", soup_message_headers_append(msg->request_headers, "User-Agent",
"Music Player Daemon " VERSION); "Music Player Daemon " VERSION);
g_signal_connect(s->msg, "got-headers", g_signal_connect(msg, "got-headers",
G_CALLBACK(input_soup_got_headers), s); G_CALLBACK(input_soup_got_headers), this);
g_signal_connect(s->msg, "got-chunk", g_signal_connect(msg, "got-chunk",
G_CALLBACK(input_soup_got_chunk), s); G_CALLBACK(input_soup_got_chunk), this);
g_signal_connect(s->msg, "got-body", g_signal_connect(msg, "got-body",
G_CALLBACK(input_soup_got_body), s); G_CALLBACK(input_soup_got_body), this);
s->alive = true;
s->pause = false;
s->eof = false;
s->completed = false;
s->postponed_error = NULL;
io_thread_call(input_soup_queue, s); io_thread_call(input_soup_queue, this);
}
static struct input_stream *
input_soup_open(const char *uri,
Mutex &mutex, Cond &cond,
G_GNUC_UNUSED GError **error_r)
{
if (strncmp(uri, "http://", 7) != 0)
return NULL;
SoupInputStream *s = new SoupInputStream(uri, mutex, cond);
return &s->base; return &s->base;
} }
...@@ -332,38 +339,42 @@ input_soup_cancel(gpointer data) ...@@ -332,38 +339,42 @@ input_soup_cancel(gpointer data)
return NULL; return NULL;
} }
static void SoupInputStream::~SoupInputStream()
input_soup_close(struct input_stream *is)
{ {
SoupInputStream *s = (SoupInputStream *)is; base.mutex->lock();
s->base.mutex->lock(); if (!completed) {
if (!s->completed) {
/* the messages's session callback hasn't been invoked /* the messages's session callback hasn't been invoked
yet; cancel it and wait for completion */ yet; cancel it and wait for completion */
s->base.mutex->unlock(); base.mutex->unlock();
io_thread_call(input_soup_cancel, s); io_thread_call(input_soup_cancel, this);
s->base.mutex->lock(); base.mutex->lock();
while (!s->completed) while (!completed)
s->base.cond->wait(*s->base.mutex); base.cond->wait(*base.mutex);
} }
s->base.mutex->unlock(); base.mutex->unlock();
SoupBuffer *buffer; SoupBuffer *buffer;
while ((buffer = (SoupBuffer *)g_queue_pop_head(s->buffers)) != NULL) while ((buffer = (SoupBuffer *)g_queue_pop_head(buffers)) != NULL)
soup_buffer_free(buffer); soup_buffer_free(buffer);
g_queue_free(s->buffers); g_queue_free(buffers);
if (s->postponed_error != NULL) if (postponed_error != NULL)
g_error_free(s->postponed_error); g_error_free(postponed_error);
input_stream_deinit(&s->base); input_stream_deinit(&base);
g_free(s); }
static void
input_soup_close(struct input_stream *is)
{
SoupInputStream *s = (SoupInputStream *)is;
delete s;
} }
static bool static bool
...@@ -388,18 +399,15 @@ input_soup_available(struct input_stream *is) ...@@ -388,18 +399,15 @@ input_soup_available(struct input_stream *is)
return s->eof || !s->alive || !g_queue_is_empty(s->buffers); return s->eof || !s->alive || !g_queue_is_empty(s->buffers);
} }
static size_t inline size_t
input_soup_read(struct input_stream *is, void *ptr, size_t size, SoupInputStream::Read(void *ptr, size_t size, GError **error_r)
G_GNUC_UNUSED GError **error_r)
{ {
SoupInputStream *s = (SoupInputStream *)is; if (!WaitData()) {
assert(!alive);
if (!input_soup_wait_data(s)) {
assert(!s->alive);
if (s->postponed_error != NULL) { if (postponed_error != nullptr) {
g_propagate_error(error_r, s->postponed_error); g_propagate_error(error_r, postponed_error);
s->postponed_error = NULL; postponed_error = nullptr;
} else } else
g_set_error_literal(error_r, soup_quark(), 0, g_set_error_literal(error_r, soup_quark(), 0,
"HTTP failure"); "HTTP failure");
...@@ -410,19 +418,19 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size, ...@@ -410,19 +418,19 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size,
while (p < p_end) { while (p < p_end) {
SoupBuffer *buffer = (SoupBuffer *) SoupBuffer *buffer = (SoupBuffer *)
g_queue_pop_head(s->buffers); g_queue_pop_head(buffers);
if (buffer == NULL) { if (buffer == NULL) {
assert(s->current_consumed == 0); assert(current_consumed == 0);
break; break;
} }
assert(s->current_consumed < buffer->length); assert(current_consumed < buffer->length);
assert(s->total_buffered >= buffer->length); assert(total_buffered >= buffer->length);
const char *q = buffer->data; const char *q = buffer->data;
q += s->current_consumed; q += current_consumed;
size_t remaining = buffer->length - s->current_consumed; size_t remaining = buffer->length - current_consumed;
size_t nbytes = p_end - p; size_t nbytes = p_end - p;
if (nbytes > remaining) if (nbytes > remaining)
nbytes = remaining; nbytes = remaining;
...@@ -430,31 +438,40 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size, ...@@ -430,31 +438,40 @@ input_soup_read(struct input_stream *is, void *ptr, size_t size,
memcpy(p, q, nbytes); memcpy(p, q, nbytes);
p += nbytes; p += nbytes;
s->current_consumed += remaining; current_consumed += remaining;
if (s->current_consumed >= buffer->length) { if (current_consumed >= buffer->length) {
/* done with this buffer */ /* done with this buffer */
s->total_buffered -= buffer->length; total_buffered -= buffer->length;
soup_buffer_free(buffer); soup_buffer_free(buffer);
s->current_consumed = 0; current_consumed = 0;
} else { } else {
/* partial read */ /* partial read */
assert(p == p_end); assert(p == p_end);
g_queue_push_head(s->buffers, buffer); g_queue_push_head(buffers, buffer);
} }
} }
if (s->pause && s->total_buffered < SOUP_RESUME_AT) { if (pause && total_buffered < SOUP_RESUME_AT) {
s->pause = false; pause = false;
soup_session_unpause_message(soup_session, s->msg); soup_session_unpause_message(soup_session, msg);
} }
size_t nbytes = p - p0; size_t nbytes = p - p0;
s->base.offset += nbytes; base.offset += nbytes;
return nbytes; return nbytes;
} }
static size_t
input_soup_read(struct input_stream *is, void *ptr, size_t size,
GError **error_r)
{
SoupInputStream *s = (SoupInputStream *)is;
return s->Read(ptr, size, error_r);
}
static bool static bool
input_soup_eof(G_GNUC_UNUSED struct input_stream *is) input_soup_eof(G_GNUC_UNUSED struct input_stream *is)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment