Commit 718b1b71 authored by Martin Wilck's avatar Martin Wilck Committed by Alexandre Julliard

Add server side support for async IO on files.

Use pread/pwrite to read/write from the offset specified in the overlapped structure.
parent f1a0de99
...@@ -55,6 +55,9 @@ DEFAULT_DEBUG_CHANNEL(file); ...@@ -55,6 +55,9 @@ DEFAULT_DEBUG_CHANNEL(file);
/* Size of per-process table of DOS handles */ /* Size of per-process table of DOS handles */
#define DOS_TABLE_SIZE 256 #define DOS_TABLE_SIZE 256
/* Macro to derive file offset from OVERLAPPED struct */
#define OVERLAPPED_OFFSET(overlapped) ((off_t) (overlapped)->Offset + ((off_t) (overlapped)->OffsetHigh << 32))
static HANDLE dos_handles[DOS_TABLE_SIZE]; static HANDLE dos_handles[DOS_TABLE_SIZE];
...@@ -1304,12 +1307,16 @@ static void FILE_AsyncReadService(async_private *ovp) ...@@ -1304,12 +1307,16 @@ static void FILE_AsyncReadService(async_private *ovp)
{ {
LPOVERLAPPED lpOverlapped = ovp->lpOverlapped; LPOVERLAPPED lpOverlapped = ovp->lpOverlapped;
int result, r; int result, r;
int already = lpOverlapped->InternalHigh;
TRACE("%p %p\n", lpOverlapped, ovp->buffer ); TRACE("%p %p\n", lpOverlapped, ovp->buffer );
/* check to see if the data is ready (non-blocking) */ /* check to see if the data is ready (non-blocking) */
result = read(ovp->fd, &ovp->buffer[lpOverlapped->InternalHigh],
ovp->count - lpOverlapped->InternalHigh); result = pread (ovp->fd, &ovp->buffer[already], ovp->count - already,
OVERLAPPED_OFFSET (lpOverlapped) + already);
if ((result < 0) && (errno == ESPIPE))
result = read (ovp->fd, &ovp->buffer[already], ovp->count - already);
if ( (result<0) && ((errno == EAGAIN) || (errno == EINTR))) if ( (result<0) && ((errno == EAGAIN) || (errno == EINTR)))
{ {
...@@ -1465,7 +1472,9 @@ BOOL WINAPI ReadFile( HANDLE hFile, LPVOID buffer, DWORD bytesToRead, ...@@ -1465,7 +1472,9 @@ BOOL WINAPI ReadFile( HANDLE hFile, LPVOID buffer, DWORD bytesToRead,
} }
/* see if we can read some data already (this shouldn't block) */ /* see if we can read some data already (this shouldn't block) */
result = read( unix_handle, buffer, bytesToRead ); result = pread( unix_handle, buffer, bytesToRead, OVERLAPPED_OFFSET(overlapped) );
if ((result < 0) && (errno == ESPIPE))
result = read( unix_handle, buffer, bytesToRead );
close(unix_handle); close(unix_handle);
if(result<0) if(result<0)
...@@ -1543,12 +1552,16 @@ static void FILE_AsyncWriteService(struct async_private *ovp) ...@@ -1543,12 +1552,16 @@ static void FILE_AsyncWriteService(struct async_private *ovp)
{ {
LPOVERLAPPED lpOverlapped = ovp->lpOverlapped; LPOVERLAPPED lpOverlapped = ovp->lpOverlapped;
int result, r; int result, r;
int already = lpOverlapped->InternalHigh;
TRACE("(%p %p)\n",lpOverlapped,ovp->buffer); TRACE("(%p %p)\n",lpOverlapped,ovp->buffer);
/* write some data (non-blocking) */ /* write some data (non-blocking) */
result = write(ovp->fd, &ovp->buffer[lpOverlapped->InternalHigh],
ovp->count-lpOverlapped->InternalHigh); result = pwrite(ovp->fd, &ovp->buffer[already], ovp->count - already,
OVERLAPPED_OFFSET (lpOverlapped) + already);
if ((result < 0) && (errno == ESPIPE))
result = write(ovp->fd, &ovp->buffer[already], ovp->count - already);
if ( (result<0) && ((errno == EAGAIN) || (errno == EINTR))) if ( (result<0) && ((errno == EAGAIN) || (errno == EINTR)))
{ {
...@@ -1678,7 +1691,11 @@ BOOL WINAPI WriteFile( HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite, ...@@ -1678,7 +1691,11 @@ BOOL WINAPI WriteFile( HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite,
} }
/* see if we can write some data already (this shouldn't block) */ /* see if we can write some data already (this shouldn't block) */
result = write( unix_handle, buffer, bytesToWrite );
result = pwrite( unix_handle, buffer, bytesToWrite, OVERLAPPED_OFFSET (overlapped) );
if ((result < 0) && (errno == ESPIPE))
result = write( unix_handle, buffer, bytesToWrite );
close(unix_handle); close(unix_handle);
if(result<0) if(result<0)
...@@ -1692,8 +1709,8 @@ BOOL WINAPI WriteFile( HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite, ...@@ -1692,8 +1709,8 @@ BOOL WINAPI WriteFile( HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite,
else else
result = 0; result = 0;
} }
/* if we read enough to keep the app happy, then return now */ /* if we wrote enough to keep the app happy, then return now */
if(result>=bytesToWrite) if(result>=bytesToWrite)
{ {
*bytesWritten = result; *bytesWritten = result;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "handle.h" #include "handle.h"
#include "thread.h" #include "thread.h"
#include "request.h" #include "request.h"
#include "async.h"
struct file struct file
{ {
...@@ -39,6 +40,8 @@ struct file ...@@ -39,6 +40,8 @@ struct file
unsigned int flags; /* flags (FILE_FLAG_*) */ unsigned int flags; /* flags (FILE_FLAG_*) */
unsigned int sharing; /* file sharing mode */ unsigned int sharing; /* file sharing mode */
int drive_type; /* type of drive the file is on */ int drive_type; /* type of drive the file is on */
struct async_queue read_q;
struct async_queue write_q;
}; };
#define NAME_HASH_SIZE 37 #define NAME_HASH_SIZE 37
...@@ -47,10 +50,12 @@ static struct file *file_hash[NAME_HASH_SIZE]; ...@@ -47,10 +50,12 @@ static struct file *file_hash[NAME_HASH_SIZE];
static void file_dump( struct object *obj, int verbose ); static void file_dump( struct object *obj, int verbose );
static int file_get_poll_events( struct object *obj ); static int file_get_poll_events( struct object *obj );
static void file_poll_event( struct object *obj, int event );
static int file_get_fd( struct object *obj ); static int file_get_fd( struct object *obj );
static int file_flush( struct object *obj ); static int file_flush( struct object *obj );
static int file_get_info( struct object *obj, struct get_file_info_reply *reply ); static int file_get_info( struct object *obj, struct get_file_info_reply *reply );
static void file_destroy( struct object *obj ); static void file_destroy( struct object *obj );
static struct async_queue * file_queue_async(struct object *obj, struct async* async, int type, int count);
static const struct object_ops file_ops = static const struct object_ops file_ops =
{ {
...@@ -61,11 +66,11 @@ static const struct object_ops file_ops = ...@@ -61,11 +66,11 @@ static const struct object_ops file_ops =
default_poll_signaled, /* signaled */ default_poll_signaled, /* signaled */
no_satisfied, /* satisfied */ no_satisfied, /* satisfied */
file_get_poll_events, /* get_poll_events */ file_get_poll_events, /* get_poll_events */
default_poll_event, /* poll_event */ file_poll_event, /* poll_event */
file_get_fd, /* get_fd */ file_get_fd, /* get_fd */
file_flush, /* flush */ file_flush, /* flush */
file_get_info, /* get_file_info */ file_get_info, /* get_file_info */
NULL, /* queue_async */ file_queue_async, /* queue_async */
file_destroy /* destroy */ file_destroy /* destroy */
}; };
...@@ -116,6 +121,11 @@ static struct file *create_file_for_fd( int fd, unsigned int access, unsigned in ...@@ -116,6 +121,11 @@ static struct file *create_file_for_fd( int fd, unsigned int access, unsigned in
file->flags = attrs; file->flags = attrs;
file->sharing = sharing; file->sharing = sharing;
file->drive_type = drive_type; file->drive_type = drive_type;
if (file->flags & FILE_FLAG_OVERLAPPED)
{
init_async_queue (&file->read_q);
init_async_queue (&file->write_q);
}
} }
return file; return file;
} }
...@@ -253,6 +263,27 @@ static int file_get_poll_events( struct object *obj ) ...@@ -253,6 +263,27 @@ static int file_get_poll_events( struct object *obj )
return events; return events;
} }
static void file_poll_event( struct object *obj, int event )
{
struct file *file = (struct file *)obj;
assert( obj->ops == &file_ops );
if ( file->flags & FILE_FLAG_OVERLAPPED )
{
if( IS_READY(file->read_q) && (POLLIN & event) )
{
async_notify(file->read_q.head, STATUS_ALERTED);
return;
}
if( IS_READY(file->write_q) && (POLLOUT & event) )
{
async_notify(file->write_q.head, STATUS_ALERTED);
return;
}
}
default_poll_event( obj, event );
}
static int file_get_fd( struct object *obj ) static int file_get_fd( struct object *obj )
{ {
struct file *file = (struct file *)obj; struct file *file = (struct file *)obj;
...@@ -308,9 +339,44 @@ static int file_get_info( struct object *obj, struct get_file_info_reply *reply ...@@ -308,9 +339,44 @@ static int file_get_info( struct object *obj, struct get_file_info_reply *reply
reply->index_low = st.st_ino; reply->index_low = st.st_ino;
reply->serial = 0; /* FIXME */ reply->serial = 0; /* FIXME */
} }
if (file->flags & FILE_FLAG_OVERLAPPED) return FD_TYPE_OVERLAPPED;
return FD_TYPE_DEFAULT; return FD_TYPE_DEFAULT;
} }
static struct async_queue *file_queue_async(struct object *obj, struct async *async, int type, int count)
{
struct file *file = (struct file *)obj;
struct async_queue *q;
assert( obj->ops == &file_ops );
if ( !(file->flags & FILE_FLAG_OVERLAPPED) )
{
set_error ( STATUS_INVALID_HANDLE );
return NULL;
}
switch(type)
{
case ASYNC_TYPE_READ:
q = &file->read_q;
break;
case ASYNC_TYPE_WRITE:
q = &file->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
return NULL;
}
if(async && !async->q)
async_insert(q, async);
return q;
}
static void file_destroy( struct object *obj ) static void file_destroy( struct object *obj )
{ {
struct file *file = (struct file *)obj; struct file *file = (struct file *)obj;
...@@ -326,6 +392,11 @@ static void file_destroy( struct object *obj ) ...@@ -326,6 +392,11 @@ static void file_destroy( struct object *obj )
if (file->flags & FILE_FLAG_DELETE_ON_CLOSE) unlink( file->name ); if (file->flags & FILE_FLAG_DELETE_ON_CLOSE) unlink( file->name );
free( file->name ); free( file->name );
} }
if (file->flags & FILE_FLAG_OVERLAPPED)
{
destroy_async_queue (&file->read_q);
destroy_async_queue (&file->write_q);
}
} }
/* set the last error depending on errno */ /* set the last error depending on errno */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment