Commit 2a5dcba5 authored by Eric Wong's avatar Eric Wong

http: initial rewrite using ringbuffer + pthreads

This institutes the usage of a separate thread to buffer HTTP input. It is basically practice code for using the ringbuffer code which I plan on reusing for the OutputBuffer as well as further input buffering for disk (networked filesystems over WAN, laptops on battery, etc). Each readFromInputStream() call on an HTTP stream can take several seconds to complete, short reads are avoided. A single-threaded solution for systems supporting large enough SO_RCVBUF values should also be possible and will likely be done in the future; but this lock-free(except when full/empty) ringbuffer is cool :) git-svn-id: https://svn.musicpd.org/mpd/trunk@7393 09075e82-0dd4-0310-85a5-a0d7c8717e4f
parent c71cfbac
...@@ -23,187 +23,150 @@ ...@@ -23,187 +23,150 @@
#include "log.h" #include "log.h"
#include "conf.h" #include "conf.h"
#include "os_compat.h" #include "os_compat.h"
#include "ringbuf.h"
#define HTTP_CONN_STATE_CLOSED 0
#define HTTP_CONN_STATE_INIT 1 enum conn_state { /* only written by io thread, read by both */
#define HTTP_CONN_STATE_HELLO 2 CONN_STATE_NEW, /* just (re)initialized */
#define HTTP_CONN_STATE_OPEN 3 CONN_STATE_REDIRECT, /* redirect */
#define HTTP_CONN_STATE_REOPEN 4 CONN_STATE_CONNECTED, /* connected to the socket */
CONN_STATE_REQUESTED, /* sent HTTP request */
CONN_STATE_RESP_HEAD, /* reading HTTP response header */
CONN_STATE_PREBUFFER, /* prebuffering data stream */
CONN_STATE_BUFFER, /* buffering data stream */
CONN_STATE_BUFFER_FULL, /* reading actual data stream */
CONN_STATE_CLOSED /* it's over, time to die */
};
/* used by all HTTP header matching */
#define match(s) !strncasecmp(cur, s, (offset = sizeof(s) - 1))
#define assert_state(st) assert(data->state == st)
#define assert_state2(s1,s2) assert((data->state == s1) || (data->state == s2))
enum conn_action { /* only written by control thread, read by both */
CONN_ACTION_NONE,
CONN_ACTION_CLOSE,
CONN_ACTION_DOSEEK
};
#define HTTP_BUFFER_SIZE_DEFAULT 131072 #define HTTP_BUFFER_SIZE_DEFAULT 131072
#define HTTP_PREBUFFER_SIZE_DEFAULT (HTTP_BUFFER_SIZE_DEFAULT >> 2) #define HTTP_PREBUFFER_SIZE_DEFAULT (HTTP_BUFFER_SIZE_DEFAULT >> 2)
#define HTTP_REDIRECT_MAX 10 #define HTTP_REDIRECT_MAX 10
#define HTTP_MAX_TRIES 100 static char *proxy_host;
static char *proxy_port;
static char *proxyHost; static char *proxy_user;
static char *proxyPort; static char *proxy_password;
static char *proxyUser; static size_t buffer_size = HTTP_BUFFER_SIZE_DEFAULT;
static char *proxyPassword; static size_t prebuffer_size = HTTP_PREBUFFER_SIZE_DEFAULT;
static size_t bufferSize = HTTP_BUFFER_SIZE_DEFAULT;
static size_t prebufferSize = HTTP_PREBUFFER_SIZE_DEFAULT;
typedef struct _InputStreemHTTPData {
char *host;
char *path;
char *port;
int sock;
int connState;
char *buffer;
size_t buflen;
int timesRedirected;
size_t icyMetaint;
int prebuffer;
size_t icyOffset;
char *proxyAuth;
char *httpAuth;
/* Number of times mpd tried to get data */
int tries;
} InputStreamHTTPData;
void inputStream_initHttp(void)
{
ConfigParam *param = getConfigParam(CONF_HTTP_PROXY_HOST);
char *test;
if (param) {
proxyHost = param->value;
param = getConfigParam(CONF_HTTP_PROXY_PORT);
if (!param) {
FATAL("%s specified but not %s\n", CONF_HTTP_PROXY_HOST,
CONF_HTTP_PROXY_PORT);
}
proxyPort = param->value;
param = getConfigParam(CONF_HTTP_PROXY_USER);
if (param) {
proxyUser = param->value;
param = getConfigParam(CONF_HTTP_PROXY_PASSWORD);
if (!param) { struct http_data {
FATAL("%s specified but not %s\n", int fd;
CONF_HTTP_PROXY_USER, enum conn_state state;
CONF_HTTP_PROXY_PASSWORD);
}
proxyPassword = param->value;
} else {
param = getConfigParam(CONF_HTTP_PROXY_PASSWORD);
if (param) {
FATAL("%s specified but not %s\n",
CONF_HTTP_PROXY_PASSWORD, CONF_HTTP_PROXY_USER);
}
}
} else if ((param = getConfigParam(CONF_HTTP_PROXY_PORT))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_PORT, CONF_HTTP_PROXY_HOST, param->line);
} else if ((param = getConfigParam(CONF_HTTP_PROXY_USER))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_USER, CONF_HTTP_PROXY_HOST, param->line);
} else if ((param = getConfigParam(CONF_HTTP_PROXY_PASSWORD))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_PASSWORD, CONF_HTTP_PROXY_HOST,
param->line);
}
param = getConfigParam(CONF_HTTP_BUFFER_SIZE); /* { we may have a non-multithreaded HTTP discipline in the future */
enum conn_action action;
int pipe_fds[2];
if (param) { pthread_t io_thread;
long tmp = strtol(param->value, &test, 10); struct ringbuf *rb;
if (*test != '\0' || tmp <= 0) {
FATAL("\"%s\" specified for %s at line %i is not a "
"positive integer\n",
param->value, CONF_HTTP_BUFFER_SIZE, param->line);
}
bufferSize = tmp * 1024; /* TODO: fix Notify so it doesn't use ugly "pending" flag */
} pthread_mutex_t full_lock;
pthread_cond_t full_cond;
pthread_mutex_t empty_lock;
pthread_cond_t empty_cond;
param = getConfigParam(CONF_HTTP_PREBUFFER_SIZE); pthread_mutex_t action_lock;
pthread_cond_t action_cond;
/* } */
if (param) { int nr_redirect;
long tmp = strtol(param->value, &test, 10); size_t icy_metaint;
if (*test != '\0' || tmp <= 0) { size_t icy_offset;
FATAL("\"%s\" specified for %s at line %i is not a " char *host;
"positive integer\n", char *path;
param->value, CONF_HTTP_PREBUFFER_SIZE, char *port;
param->line); char *proxy_auth;
} char *http_auth;
};
prebufferSize = tmp * 1024; static int awaken_buffer_task(struct http_data *data);
}
if (prebufferSize > bufferSize) static void init_http_data(struct http_data *data)
prebufferSize = bufferSize; {
assert(bufferSize > 0 && "http bufferSize too small"); data->fd = -1;
assert(prebufferSize > 0 && "http prebufferSize too small"); data->action = CONN_ACTION_NONE;
data->state = CONN_STATE_NEW;
init_async_pipe(data->pipe_fds);
data->proxy_auth = proxy_host ?
proxy_auth_string(proxy_user, proxy_password) :
NULL;
data->http_auth = NULL;
data->host = NULL;
data->path = NULL;
data->port = NULL;
data->nr_redirect = 0;
data->icy_metaint = 0;
data->icy_offset = 0;
data->rb = ringbuf_create(buffer_size);
pthread_cond_init(&data->action_cond, NULL);
pthread_mutex_init(&data->action_lock, NULL);
pthread_cond_init(&data->full_cond, NULL);
pthread_mutex_init(&data->full_lock, NULL);
pthread_cond_init(&data->empty_cond, NULL);
pthread_mutex_init(&data->empty_lock, NULL);
} }
static InputStreamHTTPData *newInputStreamHTTPData(void) static struct http_data *new_http_data(void)
{ {
InputStreamHTTPData *ret = xmalloc(sizeof(InputStreamHTTPData)); struct http_data *ret = xmalloc(sizeof(struct http_data));
init_http_data(ret);
if (proxyHost) {
ret->proxyAuth = proxyAuthString(proxyUser, proxyPassword);
} else
ret->proxyAuth = NULL;
ret->httpAuth = NULL;
ret->host = NULL;
ret->path = NULL;
ret->port = NULL;
ret->connState = HTTP_CONN_STATE_CLOSED;
ret->timesRedirected = 0;
ret->icyMetaint = 0;
ret->prebuffer = 0;
ret->icyOffset = 0;
ret->buffer = xmalloc(bufferSize);
ret->tries = 0;
return ret; return ret;
} }
static void freeInputStreamHTTPData(InputStreamHTTPData * data) static void free_http_data(struct http_data * data)
{ {
if (data->host) if (data->host) free(data->host);
free(data->host); if (data->path) free(data->path);
if (data->path) if (data->port) free(data->port);
free(data->path); if (data->proxy_auth) free(data->proxy_auth);
if (data->port) if (data->http_auth) free(data->http_auth);
free(data->port);
if (data->proxyAuth) xpthread_cond_destroy(&data->action_cond);
free(data->proxyAuth); xpthread_mutex_destroy(&data->action_lock);
if (data->httpAuth) xpthread_cond_destroy(&data->full_cond);
free(data->httpAuth); xpthread_mutex_destroy(&data->full_lock);
xpthread_cond_destroy(&data->empty_cond);
free(data->buffer); xpthread_mutex_destroy(&data->empty_lock);
xclose(data->pipe_fds[0]);
xclose(data->pipe_fds[1]);
ringbuf_free(data->rb);
free(data); free(data);
} }
static int parseUrl(InputStreamHTTPData * data, char *url) static int parse_url(struct http_data * data, char *url)
{ {
char *temp;
char *colon; char *colon;
char *slash; char *slash;
char *at; char *at;
int len; int len;
char *cur = url;
size_t offset;
if (strncmp("http://", url, strlen("http://")) != 0) if (!match("http://"))
return -1; return -1;
temp = url + strlen("http://"); cur = url + offset;
colon = strchr(cur, ':');
colon = strchr(temp, ':'); at = strchr(cur, '@');
at = strchr(temp, '@');
if (data->httpAuth) { if (data->http_auth) {
free(data->httpAuth); free(data->http_auth);
data->httpAuth = NULL; data->http_auth = NULL;
} }
if (at) { if (at) {
...@@ -211,42 +174,42 @@ static int parseUrl(InputStreamHTTPData * data, char *url) ...@@ -211,42 +174,42 @@ static int parseUrl(InputStreamHTTPData * data, char *url)
char *passwd; char *passwd;
if (colon && colon < at) { if (colon && colon < at) {
user = xmalloc(colon - temp + 1); user = xmalloc(colon - cur + 1);
memcpy(user, temp, colon - temp); memcpy(user, cur, colon - cur);
user[colon - temp] = '\0'; user[colon - cur] = '\0';
passwd = xmalloc(at - colon); passwd = xmalloc(at - colon);
memcpy(passwd, colon + 1, at - colon - 1); memcpy(passwd, colon + 1, at - colon - 1);
passwd[at - colon - 1] = '\0'; passwd[at - colon - 1] = '\0';
} else { } else {
user = xmalloc(at - temp + 1); user = xmalloc(at - cur + 1);
memcpy(user, temp, at - temp); memcpy(user, cur, at - cur);
user[at - temp] = '\0'; user[at - cur] = '\0';
passwd = xstrdup(""); passwd = xstrdup("");
} }
data->httpAuth = httpAuthString(user, passwd); data->http_auth = http_auth_string(user, passwd);
free(user); free(user);
free(passwd); free(passwd);
temp = at + 1; cur = at + 1;
colon = strchr(temp, ':'); colon = strchr(cur, ':');
} }
slash = strchr(temp, '/'); slash = strchr(cur, '/');
if (slash && colon && slash <= colon) if (slash && colon && slash <= colon)
return -1; return -1;
/* fetch the host portion */ /* fetch the host portion */
if (colon) if (colon)
len = colon - temp + 1; len = colon - cur + 1;
else if (slash) else if (slash)
len = slash - temp + 1; len = slash - cur + 1;
else else
len = strlen(temp) + 1; len = strlen(cur) + 1;
if (len <= 1) if (len <= 1)
return -1; return -1;
...@@ -254,7 +217,7 @@ static int parseUrl(InputStreamHTTPData * data, char *url) ...@@ -254,7 +217,7 @@ static int parseUrl(InputStreamHTTPData * data, char *url)
if (data->host) if (data->host)
free(data->host); free(data->host);
data->host = xmalloc(len); data->host = xmalloc(len);
memcpy(data->host, temp, len - 1); memcpy(data->host, cur, len - 1);
data->host[len - 1] = '\0'; data->host[len - 1] = '\0';
if (data->port) if (data->port)
free(data->port); free(data->port);
...@@ -274,14 +237,84 @@ static int parseUrl(InputStreamHTTPData * data, char *url) ...@@ -274,14 +237,84 @@ static int parseUrl(InputStreamHTTPData * data, char *url)
if (data->path) if (data->path)
free(data->path); free(data->path);
/* fetch the path */ /* fetch the path */
if (proxyHost) data->path = proxy_host ? xstrdup(url) : xstrdup(slash ? slash : "/");
data->path = xstrdup(url);
else
data->path = xstrdup(slash ? slash : "/");
return 0; return 0;
} }
static struct timespec * ts_timeout(struct timespec *ts, const long sec)
{
struct timeval tv;
gettimeofday(&tv, NULL);
ts->tv_sec = tv.tv_sec + sec;
ts->tv_nsec = tv.tv_usec * 1000;
return ts;
}
/* triggers an action and waits for completion */
static int trigger_action(struct http_data *data,
enum conn_action action,
int nonblocking)
{
int ret = -1;
assert(!pthread_equal(data->io_thread, pthread_self()));
pthread_mutex_lock(&data->action_lock);
if (data->action != CONN_ACTION_NONE)
goto out;
data->action = action;
if (awaken_buffer_task(data)) {
/* DEBUG("wokeup from cond_wait to trigger action\n"); */
} else if (xwrite(data->pipe_fds[1], "", 1) != 1) {
ERROR(__FILE__ ": pipe full, couldn't trigger action\n");
data->action = CONN_ACTION_NONE;
goto out;
}
if (nonblocking) {
struct timespec ts;
pthread_cond_timedwait(&data->action_cond,
&data->action_lock,
ts_timeout(&ts, 1));
} else {
pthread_cond_wait(&data->action_cond, &data->action_lock);
}
ret = 0;
out:
pthread_mutex_unlock(&data->action_lock);
return ret;
}
static int take_action(struct http_data *data)
{
assert(pthread_equal(data->io_thread, pthread_self()));
pthread_mutex_lock(&data->action_lock);
switch (data->action) {
case CONN_ACTION_NONE:
pthread_mutex_unlock(&data->action_lock);
return 0;
case CONN_ACTION_DOSEEK:
data->state = CONN_STATE_NEW;
break;
case CONN_ACTION_CLOSE:
data->state = CONN_STATE_CLOSED;
}
xclose(data->fd);
data->fd = -1;
data->action = CONN_ACTION_NONE;
pthread_cond_signal(&data->action_cond);
pthread_mutex_unlock(&data->action_lock);
return 1;
}
static int err_close(struct http_data *data)
{
assert(pthread_equal(data->io_thread, pthread_self()));
xclose(data->fd);
data->state = CONN_STATE_CLOSED;
return -1;
}
/* returns -1 on error, 0 on success (and sets dest) */ /* returns -1 on error, 0 on success (and sets dest) */
static int my_getaddrinfo(struct addrinfo **dest, static int my_getaddrinfo(struct addrinfo **dest,
const char *host, const char *port) const char *host, const char *port)
...@@ -325,69 +358,191 @@ static int my_connect_addrs(struct addrinfo *ans) ...@@ -325,69 +358,191 @@ static int my_connect_addrs(struct addrinfo *ans)
if (connect(fd, ap->ai_addr, ap->ai_addrlen) >= 0 if (connect(fd, ap->ai_addr, ap->ai_addrlen) >= 0
|| errno == EINPROGRESS) || errno == EINPROGRESS)
return fd; /* success */ return fd; /* success */
DEBUG(__FILE__ ": unable to connect: %s\n", strerror(errno)); DEBUG(__FILE__ ": unable to connect: %s\n", strerror(errno));
xclose(fd); /* failed, get the next one */ xclose(fd); /* failed, get the next one */
} }
return -1; return -1;
} }
static int initHTTPConnection(InputStream * inStream) static int init_connection(struct http_data *data)
{ {
struct addrinfo *ans = NULL; struct addrinfo *ans = NULL;
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data;
if ((proxyHost ? my_getaddrinfo(&ans, proxyHost, proxyPort) : assert(pthread_equal(data->io_thread, pthread_self()));
assert_state2(CONN_STATE_NEW, CONN_STATE_REDIRECT);
if ((proxy_host ? my_getaddrinfo(&ans, proxy_host, proxy_port) :
my_getaddrinfo(&ans, data->host, data->port)) < 0) my_getaddrinfo(&ans, data->host, data->port)) < 0)
return -1; return -1;
data->sock = my_connect_addrs(ans); assert(data->fd < 0);
data->fd = my_connect_addrs(ans);
freeaddrinfo(ans); freeaddrinfo(ans);
if (data->sock < 0) if (data->fd < 0)
return -1; /* failed */ return -1; /* failed */
data->connState = HTTP_CONN_STATE_INIT; data->state = CONN_STATE_CONNECTED;
data->buflen = 0;
return 0; return 0;
} }
static int finishHTTPInit(InputStream * inStream) #define my_nfds(d) ((d->fd > d->pipe_fds[0] ? d->fd : d->pipe_fds[0]) + 1)
static int pipe_notified(struct http_data * data, fd_set *rfds)
{
char buf;
int fd = data->pipe_fds[0];
assert(pthread_equal(data->io_thread, pthread_self()));
return FD_ISSET(fd, rfds) && (xread(fd, &buf, 1) == 1);
}
enum await_result {
AWAIT_READY,
AWAIT_ACTION_PENDING,
AWAIT_ERROR
};
static enum await_result socket_error_or_ready(int fd)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data;
struct timeval tv;
fd_set writeSet;
fd_set errorSet;
int error;
socklen_t error_len = sizeof(int);
int ret; int ret;
size_t length; int error = 0;
ssize_t nbytes; socklen_t error_len = sizeof(int);
char request[2048];
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &error_len);
return (ret < 0 || error) ? AWAIT_ERROR : AWAIT_READY;
}
tv.tv_sec = 0; static enum await_result await_sendable(struct http_data *data)
tv.tv_usec = 0; {
fd_set rfds, wfds;
assert(pthread_equal(data->io_thread, pthread_self()));
assert_state(CONN_STATE_CONNECTED);
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_SET(data->pipe_fds[0], &rfds);
FD_SET(data->fd, &wfds);
if (select(my_nfds(data), &rfds, &wfds, NULL, NULL) <= 0)
return AWAIT_ERROR;
if (pipe_notified(data, &rfds)) return AWAIT_ACTION_PENDING;
return socket_error_or_ready(data->fd);
}
static enum await_result await_recvable(struct http_data *data)
{
fd_set rfds;
assert(pthread_equal(data->io_thread, pthread_self()));
FD_ZERO(&rfds);
FD_SET(data->pipe_fds[0], &rfds);
FD_SET(data->fd, &rfds);
if (select(my_nfds(data), &rfds, NULL, NULL, NULL) <= 0)
return AWAIT_ERROR;
if (pipe_notified(data, &rfds)) return AWAIT_ACTION_PENDING;
return socket_error_or_ready(data->fd);
}
static void await_buffer_space(struct http_data *data)
{
assert(pthread_equal(data->io_thread, pthread_self()));
assert_state(CONN_STATE_BUFFER_FULL);
pthread_cond_wait(&data->full_cond, &data->full_lock);
if (ringbuf_write_space(data->rb) > 0)
data->state = CONN_STATE_BUFFER;
/* else spurious wakeup or action triggered ... */
}
static void feed_starved(struct http_data *data)
{
assert(pthread_equal(data->io_thread, pthread_self()));
if (!pthread_mutex_trylock(&data->empty_lock)) {
pthread_cond_signal(&data->empty_cond);
pthread_mutex_unlock(&data->empty_lock);
}
}
static int starved_wait(struct http_data *data, const long sec)
{
struct timespec ts;
FD_ZERO(&writeSet); assert(!pthread_equal(data->io_thread, pthread_self()));
FD_ZERO(&errorSet); return pthread_cond_timedwait(&data->empty_cond,
FD_SET(data->sock, &writeSet); &data->empty_lock,
FD_SET(data->sock, &errorSet); ts_timeout(&ts, sec));
}
ret = select(data->sock + 1, NULL, &writeSet, &errorSet, &tv); static int awaken_buffer_task(struct http_data *data)
{
assert(!pthread_equal(data->io_thread, pthread_self()));
if (!pthread_mutex_trylock(&data->full_lock)) {
pthread_cond_signal(&data->full_cond);
pthread_mutex_unlock(&data->full_lock);
return 1;
}
return 0;
}
if (ret == 0 || (ret < 0 && errno == EINTR)) static ssize_t buffer_data(InputStream *is)
{
struct iovec vec[2];
ssize_t r;
struct http_data *data = (struct http_data *)is->data;
assert(pthread_equal(data->io_thread, pthread_self()));
assert_state2(CONN_STATE_BUFFER, CONN_STATE_PREBUFFER);
if (!ringbuf_get_write_vector(data->rb, vec)) {
data->state = CONN_STATE_BUFFER_FULL;
return 0; return 0;
}
r = readv(data->fd, vec, vec[1].iov_len ? 2 : 1);
if (r > 0) {
size_t buflen;
if (ret < 0) { ringbuf_write_advance(data->rb, r);
DEBUG(__FILE__ ": problem select'ing: %s\n", strerror(errno)); buflen = ringbuf_read_space(data->rb);
goto close_err; if (buflen == 0 || buflen < data->icy_metaint)
data->state = CONN_STATE_PREBUFFER;
else if (buflen >= prebuffer_size)
data->state = CONN_STATE_BUFFER;
if (data->state == CONN_STATE_BUFFER)
feed_starved(data);
return r;
} else if (r < 0) {
if (errno == EAGAIN || errno == EINTR)
return 0;
is->error = errno;
} }
err_close(data);
return r;
}
getsockopt(data->sock, SOL_SOCKET, SO_ERROR, &error, &error_len); /*
if (error) * This requires the socket to be writable beforehand (determined via
goto close_err; * select(2)). This does NOT retry or continue if we can't write the
* HTTP header in one shot. One reason for this is laziness, I don't
* want to have to store the header when recalling this function, but
* the other reason is practical, too: if we can't send a small HTTP
* request without blocking, the connection is pathetic anyways and we
* should just stop
*
* Returns -1 on error, 0 on success
*/
static int send_request(InputStream * is)
{
struct http_data *data = (struct http_data *) is->data;
int length;
ssize_t nbytes;
char request[2048]; /* todo(?): write item-at-a-time and cork */
/* deal with ICY metadata later, for now its fucking up stuff! */ assert(pthread_equal(data->io_thread, pthread_self()));
length = (size_t)snprintf(request, sizeof(request), assert_state(CONN_STATE_CONNECTED);
length = snprintf(request, sizeof(request),
"GET %s HTTP/1.1\r\n" "GET %s HTTP/1.1\r\n"
"Host: %s\r\n" "Host: %s\r\n"
"Connection: close\r\n" "Connection: close\r\n"
...@@ -398,447 +553,529 @@ static int finishHTTPInit(InputStream * inStream) ...@@ -398,447 +553,529 @@ static int finishHTTPInit(InputStream * inStream)
"\r\n", "\r\n",
data->path, data->path,
data->host, data->host,
inStream->offset, is->offset,
data->proxyAuth ? data->proxyAuth : data->proxy_auth ? data->proxy_auth :
(data->httpAuth ? data->httpAuth : "")); (data->http_auth ? data->http_auth : ""));
if (length < 0 || length >= (int)sizeof(request))
if (length >= sizeof(request)) return err_close(data);
goto close_err; nbytes = write(data->fd, request, (size_t)length);
nbytes = write(data->sock, request, length); if (nbytes < 0 || nbytes != (ssize_t)length)
if (nbytes < 0 || (size_t)nbytes != length) return err_close(data);
goto close_err; data->state = CONN_STATE_REQUESTED;
data->connState = HTTP_CONN_STATE_HELLO;
return 0; return 0;
close_err:
close(data->sock);
data->connState = HTTP_CONN_STATE_CLOSED;
return -1;
} }
static int getHTTPHello(InputStream * inStream) /* handles parsing of the first line of the HTTP response */
static int parse_response_code(InputStream * is, const char *response)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data; size_t offset;
fd_set readSet; const char *cur = response;
struct timeval tv;
int ret; is->seekable = 0;
char *needle; if (match("HTTP/1.0 ")) {
char *cur = data->buffer; return atoi(cur + offset);
int rc; } else if (match("HTTP/1.1 ")) {
long readed; is->seekable = 1;
return atoi(cur + offset);
FD_ZERO(&readSet); } else if (match("ICY 200 OK")) {
FD_SET(data->sock, &readSet); return 200;
} else if (match("ICY 400 Server Full")) {
return 400;
} else if (match("ICY 404"))
return 404;
return 0;
}
tv.tv_sec = 0; static int leading_space(int c)
tv.tv_usec = 0; {
return (c == ' ' || c == '\t');
}
ret = select(data->sock + 1, &readSet, NULL, NULL, &tv); static int parse_header_dup(char **dst, char *cur)
{
char *eol;
size_t len;
if (ret == 0 || (ret < 0 && errno == EINTR)) if (!(eol = strstr(cur, "\r\n")))
return -1;
*eol = '\0';
while (leading_space(*cur))
cur++;
len = strlen(cur) + 1;
*dst = xrealloc(*dst, len);
memcpy(*dst, cur, len);
*eol = '\r';
return 0; return 0;
}
if (ret < 0) { static int parse_redirect(InputStream * is, char *response, const char *needle)
data->connState = HTTP_CONN_STATE_CLOSED; {
close(data->sock); char *url = NULL;
data->buflen = 0; char *cur = strstr(response, "\r\n");
return -1; size_t offset;
} struct http_data *data = (struct http_data *) is->data;
int ret;
if (data->buflen >= bufferSize - 1) { while (cur && cur != needle) {
data->connState = HTTP_CONN_STATE_CLOSED; assert(cur < needle);
close(data->sock); if (match("\r\nLocation:"))
goto found;
cur = strstr(cur + 2, "\r\n");
}
return -1; return -1;
found:
if (parse_header_dup(&url, cur + offset) < 0)
return -1;
ret = parse_url(data, url);
free(url);
if (!ret && data->nr_redirect < HTTP_REDIRECT_MAX) {
data->nr_redirect++;
xclose(data->fd);
data->fd = -1;
data->state = CONN_STATE_REDIRECT;
return 0; /* success */
} }
return -1;
}
readed = recv(data->sock, data->buffer + data->buflen, static int parse_headers(InputStream * is, char *response, const char *needle)
bufferSize - 1 - data->buflen, 0); {
struct http_data *data = (struct http_data *) is->data;
char *cur = strstr(response, "\r\n");
size_t offset;
long tmp;
data->icy_metaint = 0;
data->icy_offset = 0;
if (is->mime) {
free(is->mime);
is->mime = NULL;
}
if (is->metaName) {
free(is->metaName);
is->metaName = NULL;
}
is->size = 0;
if (readed < 0 && (errno == EAGAIN || errno == EINTR)) while (cur && cur != needle) {
assert(cur < needle);
if (match("\r\nContent-Length:")) {
if ((tmp = atol(cur + offset)) >= 0)
is->size = tmp;
} else if (match("\r\nicy-metaint:")) {
if ((tmp = atol(cur + offset)) >= 0)
data->icy_metaint = tmp;
} else if (match("\r\nicy-name:") ||
match("\r\nice-name:") ||
match("\r\nx-audiocast-name:")) {
if (parse_header_dup(&is->metaName, cur + offset) < 0)
return -1;
DEBUG(__FILE__": metaName: %s\n", is->metaName);
} else if (match("\r\nContent-Type:")) {
if (parse_header_dup(&is->mime, cur + offset) < 0)
return -1;
}
cur = strstr(cur + 2, "\r\n");
}
return 0; return 0;
}
if (readed <= 0) { /* Returns -1 on error, 0 on success */
data->connState = HTTP_CONN_STATE_CLOSED; static int recv_response(InputStream * is)
close(data->sock); {
data->buflen = 0; struct http_data *data = (struct http_data *) is->data;
char *needle;
char response[2048];
const size_t response_max = sizeof(response) - 1;
ssize_t r;
ssize_t peeked;
assert(pthread_equal(data->io_thread, pthread_self()));
assert_state2(CONN_STATE_RESP_HEAD, CONN_STATE_REQUESTED);
do {
r = recv(data->fd, response, response_max, MSG_PEEK);
} while (r < 0 && errno == EINTR);
if (r <= 0)
return err_close(data); /* EOF */
response[r] = '\0';
if (!(needle = strstr(response, "\r\n\r\n"))) {
if ((size_t)r == response_max)
return err_close(data);
/* response too small, try again */
data->state = CONN_STATE_RESP_HEAD;
return -1; return -1;
} }
data->buffer[data->buflen + readed] = '\0'; switch (parse_response_code(is, response)) {
data->buflen += readed; case 200: /* OK */
case 206: /* Partial Content */
break;
case 301: /* Moved Permanently */
case 302: /* Moved Temporarily */
if (parse_redirect(is, response, needle) == 0)
return 0; /* success, reconnect */
default:
return err_close(data);
}
needle = strstr(data->buffer, "\r\n\r\n"); parse_headers(is, response, needle);
if (is->size <= 0)
is->seekable = 0;
is->seekable = 0;
needle += sizeof("\r\n\r\n") - 1;
peeked = needle - response;
assert(peeked <= r);
do {
r = recv(data->fd, response, peeked, 0);
} while (r < 0 && errno == EINTR);
assert(r == peeked && "r != peeked");
ringbuf_writer_reset(data->rb);
data->state = CONN_STATE_PREBUFFER;
if (!needle)
return 0; return 0;
}
if (0 == strncmp(cur, "HTTP/1.0 ", 9)) { static void * http_io_task(void *arg)
inStream->seekable = 0; {
rc = atoi(cur + 9); InputStream *is = (InputStream *) arg;
} else if (0 == strncmp(cur, "HTTP/1.1 ", 9)) { struct http_data *data = (struct http_data *) is->data;
inStream->seekable = 1;
rc = atoi(cur + 9); pthread_mutex_lock(&data->full_lock);
} else if (0 == strncmp(cur, "ICY 200 OK", 10)) { while (1) {
inStream->seekable = 0; take_action(data);
rc = 200; switch (data->state) {
} else if (0 == strncmp(cur, "ICY 400 Server Full", 19)) case CONN_STATE_NEW:
rc = 400; case CONN_STATE_REDIRECT:
else if (0 == strncmp(cur, "ICY 404", 7)) init_connection(data);
rc = 404;
else {
close(data->sock);
data->connState = HTTP_CONN_STATE_CLOSED;
return -1;
}
switch (rc) {
case 200:
case 206:
break; break;
case 301: case CONN_STATE_CONNECTED:
case 302: switch (await_sendable(data)) {
cur = strstr(cur, "Location: "); case AWAIT_READY: send_request(is); break;
if (cur) { case AWAIT_ACTION_PENDING: break;
char *url; case AWAIT_ERROR: goto err;
int curlen = 0;
cur += strlen("Location: ");
while (*(cur + curlen) != '\0'
&& *(cur + curlen) != '\r') {
curlen++;
}
url = xmalloc(curlen + 1);
memcpy(url, cur, curlen);
url[curlen] = '\0';
ret = parseUrl(data, url);
free(url);
if (ret == 0 && data->timesRedirected <
HTTP_REDIRECT_MAX) {
data->timesRedirected++;
close(data->sock);
data->connState = HTTP_CONN_STATE_REOPEN;
data->buflen = 0;
return 0;
} }
break;
case CONN_STATE_REQUESTED:
case CONN_STATE_RESP_HEAD:
switch (await_recvable(data)) {
case AWAIT_READY: recv_response(is); break;
case AWAIT_ACTION_PENDING: break;
case AWAIT_ERROR: goto err;
} }
case 400:
case 401:
case 403:
case 404:
default:
close(data->sock);
data->connState = HTTP_CONN_STATE_CLOSED;
data->buflen = 0;
return -1;
}
cur = strstr(data->buffer, "\r\n");
while (cur && cur != needle) {
if (0 == strncasecmp(cur, "\r\nContent-Length: ", 18)) {
if (!inStream->size)
inStream->size = atol(cur + 18);
} else if (0 == strncasecmp(cur, "\r\nicy-metaint:", 14)) {
data->icyMetaint = strtoul(cur + 14, NULL, 0);
} else if (0 == strncasecmp(cur, "\r\nicy-name:", 11) ||
0 == strncasecmp(cur, "\r\nice-name:", 11)) {
int incr = 11;
char *temp = strstr(cur + incr, "\r\n");
if (!temp)
break; break;
*temp = '\0'; case CONN_STATE_PREBUFFER:
if (inStream->metaName) case CONN_STATE_BUFFER:
free(inStream->metaName); switch (await_recvable(data)) {
while (*(incr + cur) == ' ') case AWAIT_READY: buffer_data(is); break;
incr++; case AWAIT_ACTION_PENDING: break;
inStream->metaName = xstrdup(cur + incr); case AWAIT_ERROR: goto err;
*temp = '\r'; }
DEBUG("inputStream_http: metaName: %s\n",
inStream->metaName);
} else if (0 == strncasecmp(cur, "\r\nx-audiocast-name:", 19)) {
int incr = 19;
char *temp = strstr(cur + incr, "\r\n");
if (!temp)
break; break;
*temp = '\0'; case CONN_STATE_BUFFER_FULL:
if (inStream->metaName) await_buffer_space(data);
free(inStream->metaName);
while (*(incr + cur) == ' ')
incr++;
inStream->metaName = xstrdup(cur + incr);
*temp = '\r';
DEBUG("inputStream_http: metaName: %s\n",
inStream->metaName);
} else if (0 == strncasecmp(cur, "\r\nContent-Type:", 15)) {
int incr = 15;
char *temp = strstr(cur + incr, "\r\n");
if (!temp)
break; break;
*temp = '\0'; case CONN_STATE_CLOSED: goto closed;
if (inStream->mime)
free(inStream->mime);
while (*(incr + cur) == ' ')
incr++;
inStream->mime = xstrdup(cur + incr);
*temp = '\r';
} }
cur = strstr(cur + 2, "\r\n");
} }
err:
err_close(data);
closed:
assert_state(CONN_STATE_CLOSED);
pthread_mutex_unlock(&data->full_lock);
return NULL;
}
if (inStream->size <= 0) int inputStream_httpBuffer(InputStream *is)
inStream->seekable = 0; {
needle += 4; /* 4 == strlen("\r\n\r\n") */
data->buflen -= (needle - data->buffer);
memmove(data->buffer, needle, data->buflen);
data->connState = HTTP_CONN_STATE_OPEN;
data->prebuffer = 1;
return 0; return 0;
} }
int inputStream_httpOpen(InputStream * inStream, char *url) int inputStream_httpOpen(InputStream * is, char *url)
{ {
InputStreamHTTPData *data = newInputStreamHTTPData(); struct http_data *data = new_http_data();
pthread_attr_t attr;
inStream->data = data; is->seekable = 0;
if (parseUrl(data, url) < 0) { is->data = data;
freeInputStreamHTTPData(data); if (parse_url(data, url) < 0) {
free_http_data(data);
return -1; return -1;
} }
if (initHTTPConnection(inStream) < 0) { is->seekFunc = inputStream_httpSeek;
freeInputStreamHTTPData(data); is->closeFunc = inputStream_httpClose;
return -1; is->readFunc = inputStream_httpRead;
} is->atEOFFunc = inputStream_httpAtEOF;
is->bufferFunc = inputStream_httpBuffer;
inStream->seekFunc = inputStream_httpSeek; pthread_attr_init(&attr);
inStream->closeFunc = inputStream_httpClose; if (pthread_create(&data->io_thread, &attr, http_io_task, is))
inStream->readFunc = inputStream_httpRead; FATAL("failed to spawn http_io_task: %s", strerror(errno));
inStream->atEOFFunc = inputStream_httpAtEOF;
inStream->bufferFunc = inputStream_httpBuffer;
pthread_mutex_lock(&data->empty_lock);
return 0; return 0;
} }
int inputStream_httpSeek(InputStream * inStream, long offset, int whence) int inputStream_httpSeek(InputStream * is, long offset, int whence)
{ {
InputStreamHTTPData *data; struct http_data *data = (struct http_data *)is->data;
if (!inStream->seekable) long old_offset = is->offset;
long diff;
if (!is->seekable) {
is->error = ESPIPE;
return -1; return -1;
}
assert(is->size > 0);
switch (whence) { switch (whence) {
case SEEK_SET: case SEEK_SET:
inStream->offset = offset; is->offset = offset;
break; break;
case SEEK_CUR: case SEEK_CUR:
inStream->offset += offset; is->offset += offset;
break; break;
case SEEK_END: case SEEK_END:
inStream->offset = inStream->size + offset; is->offset = is->size + offset;
break; break;
default: default:
is->error = EINVAL;
return -1; return -1;
} }
data = (InputStreamHTTPData *)inStream->data; diff = is->offset - old_offset;
close(data->sock); if (diff > 0) { /* seek forward if we've already buffered it */
data->connState = HTTP_CONN_STATE_REOPEN; long avail = (long)ringbuf_read_space(data->rb);
data->buflen = 0; if (avail >= diff) {
ringbuf_read_advance(data->rb, diff);
inputStream_httpBuffer(inStream); return 0;
}
}
trigger_action(data, CONN_ACTION_DOSEEK, 0);
return 0; return 0;
} }
static void parseIcyMetadata(InputStream * inStream, char *metadata, int size) static void parse_icy_metadata(InputStream * is, char *metadata, size_t size)
{ {
char *r = NULL; char *r = NULL;
char *s; char *cur;
char *temp = xmalloc(size + 1); size_t offset;
memcpy(temp, metadata, size);
temp[size] = '\0'; assert(size);
s = strtok_r(temp, ";", &r); metadata[size] = '\0';
while (s) { cur = strtok_r(metadata, ";", &r);
if (0 == strncmp(s, "StreamTitle=", 12)) { while (cur) {
int cur = 12; if (match("StreamTitle=")) {
if (inStream->metaTitle) if (is->metaTitle)
free(inStream->metaTitle); free(is->metaTitle);
if (*(s + cur) == '\'') if (cur[offset] == '\'')
cur++; offset++;
if (s[strlen(s) - 1] == '\'') { if (r[-2] == '\'')
s[strlen(s) - 1] = '\0'; r[-2] = '\0';
} is->metaTitle = xstrdup(cur + offset);
inStream->metaTitle = xstrdup(s + cur); DEBUG(__FILE__ ": metaTitle: %s\n", is->metaTitle);
DEBUG("inputStream_http: metaTitle: %s\n", return;
inStream->metaTitle);
} }
s = strtok_r(NULL, ";", &r); cur = strtok_r(NULL, ";", &r);
} }
free(temp);
} }
size_t inputStream_httpRead(InputStream * inStream, void *ptr, size_t size, static size_t read_with_metadata(InputStream *is, void *ptr, ssize_t len)
size_t nmemb)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data; struct http_data *data = (struct http_data *) is->data;
size_t tosend = 0; size_t readed = 0;
size_t inlen = size * nmemb; size_t r;
size_t maxToSend = data->buflen; size_t to_read;
assert(data->icy_metaint > 0);
inputStream_httpBuffer(inStream);
while (len > 0) {
switch (data->connState) { if (ringbuf_read_space(data->rb) < data->icy_metaint)
case HTTP_CONN_STATE_OPEN:
if (data->prebuffer || data->buflen < data->icyMetaint)
return 0;
break;
case HTTP_CONN_STATE_CLOSED:
if (data->buflen)
break; break;
default: if (data->icy_offset >= data->icy_metaint) {
return 0; unsigned char metabuf[(UCHAR_MAX << 4) + 1];
} size_t metalen;
r = ringbuf_read(data->rb, metabuf, 1);
if (data->icyMetaint > 0) { assert(r == 1 && "failed to read");
if (data->icyOffset >= data->icyMetaint) { awaken_buffer_task(data);
size_t metalen = *(data->buffer); metalen = *(metabuf);
/* maybe we're in some strange universe where a byte
* can hold more than 255 ... */
assert(metalen <= 255 && "metalen greater than 255");
metalen <<= 4; metalen <<= 4;
if (metalen + 1 > data->buflen) { if (metalen) {
/* damn that's some fucking big metadata! */ r = ringbuf_read(data->rb, metabuf, metalen);
if (bufferSize < metalen + 1) { assert(r == metalen && "short metadata read");
data->connState = parse_icy_metadata(is, (char*)metabuf, metalen);
HTTP_CONN_STATE_CLOSED;
close(data->sock);
data->buflen = 0;
}
return 0;
}
if (metalen > 0) {
parseIcyMetadata(inStream, data->buffer + 1,
metalen);
} }
data->buflen -= metalen + 1; data->icy_offset = 0;
memmove(data->buffer, data->buffer + metalen + 1,
data->buflen);
data->icyOffset = 0;
} }
assert(data->icyOffset <= data->icyMetaint && to_read = len;
"icyOffset bigger than icyMetaint!"); if (to_read > (data->icy_metaint - data->icy_offset))
maxToSend = data->icyMetaint - data->icyOffset; to_read = data->icy_metaint - data->icy_offset;
maxToSend = maxToSend > data->buflen ? data->buflen : maxToSend; if (!(r = ringbuf_read(data->rb, ptr, to_read)))
} break;
awaken_buffer_task(data);
if (data->buflen > 0) { len -= r;
tosend = inlen > maxToSend ? maxToSend : inlen; ptr += r;
tosend = (tosend / size) * size; readed += r;
data->icy_offset += r;
memcpy(ptr, data->buffer, tosend);
data->buflen -= tosend;
data->icyOffset += tosend;
memmove(data->buffer, data->buffer + tosend, data->buflen);
inStream->offset += tosend;
} }
return readed;
return tosend / size;
} }
int inputStream_httpClose(InputStream * inStream) size_t inputStream_httpRead(InputStream * is, void *ptr, size_t size,
size_t nmemb)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data; struct http_data *data = (struct http_data *) is->data;
size_t len = size * nmemb;
size_t r;
void *ptr0 = ptr;
long tries = len / 128; /* try harder for bigger reads */
retry:
switch (data->state) {
case CONN_STATE_NEW:
case CONN_STATE_REDIRECT:
case CONN_STATE_CONNECTED:
case CONN_STATE_REQUESTED:
case CONN_STATE_RESP_HEAD:
case CONN_STATE_PREBUFFER:
if ((starved_wait(data, 1) == 0) || (tries-- > 0))
goto retry; /* success */
return 0;
case CONN_STATE_BUFFER:
case CONN_STATE_BUFFER_FULL:
break;
case CONN_STATE_CLOSED:
if (!ringbuf_read_space(data->rb))
return 0;
}
switch (data->connState) { while (1) {
case HTTP_CONN_STATE_CLOSED: if (data->icy_metaint > 0)
r = read_with_metadata(is, ptr, len);
else /* easy, no metadata to worry about */
r = ringbuf_read(data->rb, ptr, len);
assert(r <= len);
if (r) {
awaken_buffer_task(data);
is->offset += r;
ptr += r;
len -= r;
}
if (!len || (--tries < 0) ||
(data->state == CONN_STATE_CLOSED &&
!ringbuf_read_space(data->rb)))
break; break;
default: starved_wait(data, 1);
close(data->sock);
} }
return (ptr - ptr0) / size;
}
freeInputStreamHTTPData(data); int inputStream_httpClose(InputStream * is)
{
struct http_data *data = (struct http_data *) is->data;
/*
* The cancellation routines in pthreads suck (and
* are probably unportable) and using signal handlers
* between threads is _definitely_ unportable.
*/
while (data->state != CONN_STATE_CLOSED)
trigger_action(data, CONN_ACTION_CLOSE, 1);
pthread_join(data->io_thread, NULL);
pthread_mutex_unlock(&data->empty_lock);
free_http_data(data);
return 0; return 0;
} }
int inputStream_httpAtEOF(InputStream * inStream) int inputStream_httpAtEOF(InputStream * is)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data; struct http_data *data = (struct http_data *) is->data;
switch (data->connState) { if (data->state == CONN_STATE_CLOSED && !ringbuf_read_space(data->rb))
case HTTP_CONN_STATE_CLOSED:
if (data->buflen == 0)
return 1; return 1;
default:
return 0; return 0;
}
} }
int inputStream_httpBuffer(InputStream * inStream) void inputStream_initHttp(void)
{ {
InputStreamHTTPData *data = (InputStreamHTTPData *) inStream->data; ConfigParam *param = getConfigParam(CONF_HTTP_PROXY_HOST);
ssize_t readed = 0; char *test;
if (data->connState == HTTP_CONN_STATE_REOPEN) { if (param) {
if (initHTTPConnection(inStream) < 0) proxy_host = param->value;
return -1;
param = getConfigParam(CONF_HTTP_PROXY_PORT);
if (!param) {
FATAL("%s specified but not %s\n", CONF_HTTP_PROXY_HOST,
CONF_HTTP_PROXY_PORT);
} }
proxy_port = param->value;
if (data->connState == HTTP_CONN_STATE_INIT) { param = getConfigParam(CONF_HTTP_PROXY_USER);
if (finishHTTPInit(inStream) < 0)
return -1; if (param) {
proxy_user = param->value;
param = getConfigParam(CONF_HTTP_PROXY_PASSWORD);
if (!param) {
FATAL("%s specified but not %s\n",
CONF_HTTP_PROXY_USER,
CONF_HTTP_PROXY_PASSWORD);
} }
if (data->connState == HTTP_CONN_STATE_HELLO) { proxy_password = param->value;
if (getHTTPHello(inStream) < 0) } else {
return -1; param = getConfigParam(CONF_HTTP_PROXY_PASSWORD);
if (param) {
FATAL("%s specified but not %s\n",
CONF_HTTP_PROXY_PASSWORD, CONF_HTTP_PROXY_USER);
}
}
} else if ((param = getConfigParam(CONF_HTTP_PROXY_PORT))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_PORT, CONF_HTTP_PROXY_HOST, param->line);
} else if ((param = getConfigParam(CONF_HTTP_PROXY_USER))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_USER, CONF_HTTP_PROXY_HOST, param->line);
} else if ((param = getConfigParam(CONF_HTTP_PROXY_PASSWORD))) {
FATAL("%s specified but not %s, line %i\n",
CONF_HTTP_PROXY_PASSWORD, CONF_HTTP_PROXY_HOST,
param->line);
} }
switch (data->connState) { param = getConfigParam(CONF_HTTP_BUFFER_SIZE);
case HTTP_CONN_STATE_OPEN:
case HTTP_CONN_STATE_CLOSED: if (param) {
break; long tmp = strtol(param->value, &test, 10);
default: if (*test != '\0' || tmp <= 0) {
return -1; FATAL("\"%s\" specified for %s at line %i is not a "
"positive integer\n",
param->value, CONF_HTTP_BUFFER_SIZE, param->line);
} }
if (data->buflen == 0 || data->buflen < data->icyMetaint) { buffer_size = tmp * 1024;
data->prebuffer = 1; }
} else if (data->buflen > prebufferSize) if (buffer_size < 4096)
data->prebuffer = 0; FATAL(CONF_HTTP_BUFFER_SIZE" must be >= 4KB\n");
if (data->connState == HTTP_CONN_STATE_OPEN && param = getConfigParam(CONF_HTTP_PREBUFFER_SIZE);
data->buflen < bufferSize - 1) {
readed = read(data->sock, data->buffer + data->buflen,
bufferSize - 1 - data->buflen);
/*
* If the connection is currently unavailable, or
* interrupted (EINTR)
* Don't give an error, so it's retried later.
* Max times in a row to retry this is HTTP_MAX_TRIES
*/
if (readed < 0 &&
(errno == EAGAIN || errno == EINTR) &&
data->tries < HTTP_MAX_TRIES) {
data->tries++;
DEBUG(__FILE__": Resource unavailable, trying %i "
"times again\n", HTTP_MAX_TRIES - data->tries);
readed = 0;
} else if (readed <= 0) {
while (close(data->sock) && errno == EINTR);
data->connState = HTTP_CONN_STATE_CLOSED;
readed = 0;
} else /* readed > 0, reset */
data->tries = 0;
data->buflen += readed; if (param) {
long tmp = strtol(param->value, &test, 10);
if (*test != '\0' || tmp <= 0) {
FATAL("\"%s\" specified for %s at line %i is not a "
"positive integer\n",
param->value, CONF_HTTP_PREBUFFER_SIZE,
param->line);
} }
if (data->buflen > prebufferSize) prebuffer_size = tmp * 1024;
data->prebuffer = 0; }
return (readed ? 1 : 0); if (prebuffer_size > buffer_size)
prebuffer_size = buffer_size;
assert(buffer_size > 0 && "http buffer_size too small");
assert(prebuffer_size > 0 && "http prebuffer_size too small");
} }
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#define BASE64_LENGTH(len) (4 * (((len) + 2) / 3)) #define BASE64_LENGTH(len) (4 * (((len) + 2) / 3))
static char *base64Dup(char *s) static char *base64dup(char *s)
{ {
int i; int i;
int len = strlen(s); int len = strlen(s);
...@@ -64,7 +64,7 @@ static char *base64Dup(char *s) ...@@ -64,7 +64,7 @@ static char *base64Dup(char *s)
return ret; return ret;
} }
static char *authString(const char *header, static char *auth_string(const char *header,
const char *user, const char *password) const char *user, const char *password)
{ {
char *ret = NULL; char *ret = NULL;
...@@ -80,7 +80,7 @@ static char *authString(const char *header, ...@@ -80,7 +80,7 @@ static char *authString(const char *header,
strcpy(temp, user); strcpy(temp, user);
strcat(temp, ":"); strcat(temp, ":");
strcat(temp, password); strcat(temp, password);
temp64 = base64Dup(temp); temp64 = base64dup(temp);
free(temp); free(temp);
ret = xmalloc(strlen(temp64) + strlen(header) + 3); ret = xmalloc(strlen(temp64) + strlen(header) + 3);
...@@ -95,7 +95,7 @@ static char *authString(const char *header, ...@@ -95,7 +95,7 @@ static char *authString(const char *header,
#define PROXY_AUTH_HEADER "Proxy-Authorization: Basic " #define PROXY_AUTH_HEADER "Proxy-Authorization: Basic "
#define HTTP_AUTH_HEADER "Authorization: Basic " #define HTTP_AUTH_HEADER "Authorization: Basic "
#define proxyAuthString(x, y) authString(PROXY_AUTH_HEADER, x, y) #define proxy_auth_string(x, y) auth_string(PROXY_AUTH_HEADER, x, y)
#define httpAuthString(x, y) authString(HTTP_AUTH_HEADER, x, y) #define http_auth_string(x, y) auth_string(HTTP_AUTH_HEADER, x, y)
#endif /* INPUT_STREAM_HTTP_AUTH_H */ #endif /* INPUT_STREAM_HTTP_AUTH_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment