Commit 733b4385 authored by Nikolay Sivov's avatar Nikolay Sivov Committed by Alexandre Julliard

ntdll/threadpool: Add support for callback priority.

parent b2585d98
...@@ -517,12 +517,21 @@ static void test_tp_simple(void) ...@@ -517,12 +517,21 @@ static void test_tp_simple(void)
memset(&environment3, 0, sizeof(environment3)); memset(&environment3, 0, sizeof(environment3));
environment3.Version = 3; environment3.Version = 3;
environment3.Pool = pool; environment3.Pool = pool;
environment3.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
environment3.Size = sizeof(environment3); environment3.Size = sizeof(environment3);
for (i = 0; i < 3; ++i)
{
environment3.CallbackPriority = TP_CALLBACK_PRIORITY_HIGH + i;
status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3);
ok(!status, "TpSimpleTryPost failed with status %x\n", status);
result = WaitForSingleObject(semaphore, 1000);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
}
environment3.CallbackPriority = 10;
status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3); status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3);
ok(!status, "TpSimpleTryPost failed with status %x\n", status); ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista does not support priorities */,
result = WaitForSingleObject(semaphore, 1000); "TpSimpleTryPost failed with status %x\n", status);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test with invalid version number */ /* test with invalid version number */
memset(&environment, 0, sizeof(environment)); memset(&environment, 0, sizeof(environment));
......
...@@ -123,8 +123,8 @@ struct threadpool ...@@ -123,8 +123,8 @@ struct threadpool
LONG objcount; LONG objcount;
BOOL shutdown; BOOL shutdown;
CRITICAL_SECTION cs; CRITICAL_SECTION cs;
/* pool of work items, locked via .cs */ /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
struct list pool; struct list pools[3];
RTL_CONDITION_VARIABLE update_event; RTL_CONDITION_VARIABLE update_event;
/* information about worker threads, locked via .cs */ /* information about worker threads, locked via .cs */
int max_workers; int max_workers;
...@@ -155,6 +155,7 @@ struct threadpool_object ...@@ -155,6 +155,7 @@ struct threadpool_object
PTP_SIMPLE_CALLBACK finalization_callback; PTP_SIMPLE_CALLBACK finalization_callback;
BOOL may_run_long; BOOL may_run_long;
HMODULE race_dll; HMODULE race_dll;
TP_CALLBACK_PRIORITY priority;
/* information about the group, locked via .group->cs */ /* information about the group, locked via .group->cs */
struct list group_entry; struct list group_entry;
BOOL is_group_member; BOOL is_group_member;
...@@ -1648,6 +1649,7 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait ) ...@@ -1648,6 +1649,7 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait )
static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
{ {
struct threadpool *pool; struct threadpool *pool;
unsigned int i;
pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) ); pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
if (!pool) if (!pool)
...@@ -1660,7 +1662,8 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) ...@@ -1660,7 +1662,8 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
RtlInitializeCriticalSection( &pool->cs ); RtlInitializeCriticalSection( &pool->cs );
pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs"); pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
list_init( &pool->pool ); for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
list_init( &pool->pools[i] );
RtlInitializeConditionVariable( &pool->update_event ); RtlInitializeConditionVariable( &pool->update_event );
pool->max_workers = 500; pool->max_workers = 500;
...@@ -1696,6 +1699,8 @@ static void tp_threadpool_shutdown( struct threadpool *pool ) ...@@ -1696,6 +1699,8 @@ static void tp_threadpool_shutdown( struct threadpool *pool )
*/ */
static BOOL tp_threadpool_release( struct threadpool *pool ) static BOOL tp_threadpool_release( struct threadpool *pool )
{ {
unsigned int i;
if (interlocked_dec( &pool->refcount )) if (interlocked_dec( &pool->refcount ))
return FALSE; return FALSE;
...@@ -1703,7 +1708,8 @@ static BOOL tp_threadpool_release( struct threadpool *pool ) ...@@ -1703,7 +1708,8 @@ static BOOL tp_threadpool_release( struct threadpool *pool )
assert( pool->shutdown ); assert( pool->shutdown );
assert( !pool->objcount ); assert( !pool->objcount );
assert( list_empty( &pool->pool ) ); for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
assert( list_empty( &pool->pools[i] ) );
pool->cs.DebugInfo->Spare[0] = 0; pool->cs.DebugInfo->Spare[0] = 0;
RtlDeleteCriticalSection( &pool->cs ); RtlDeleteCriticalSection( &pool->cs );
...@@ -1725,7 +1731,25 @@ static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON ...@@ -1725,7 +1731,25 @@ static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON
NTSTATUS status = STATUS_SUCCESS; NTSTATUS status = STATUS_SUCCESS;
if (environment) if (environment)
{
/* Validate environment parameters. */
if (environment->Version == 3)
{
TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
switch (environment3->CallbackPriority)
{
case TP_CALLBACK_PRIORITY_HIGH:
case TP_CALLBACK_PRIORITY_NORMAL:
case TP_CALLBACK_PRIORITY_LOW:
break;
default:
return STATUS_INVALID_PARAMETER;
}
}
pool = (struct threadpool *)environment->Pool; pool = (struct threadpool *)environment->Pool;
}
if (!pool) if (!pool)
{ {
...@@ -1860,6 +1884,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa ...@@ -1860,6 +1884,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->finalization_callback = NULL; object->finalization_callback = NULL;
object->may_run_long = 0; object->may_run_long = 0;
object->race_dll = NULL; object->race_dll = NULL;
object->priority = TP_CALLBACK_PRIORITY_NORMAL;
memset( &object->group_entry, 0, sizeof(object->group_entry) ); memset( &object->group_entry, 0, sizeof(object->group_entry) );
object->is_group_member = FALSE; object->is_group_member = FALSE;
...@@ -1881,6 +1906,13 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa ...@@ -1881,6 +1906,13 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
object->finalization_callback = environment->FinalizationCallback; object->finalization_callback = environment->FinalizationCallback;
object->may_run_long = environment->u.s.LongFunction != 0; object->may_run_long = environment->u.s.LongFunction != 0;
object->race_dll = environment->RaceDll; object->race_dll = environment->RaceDll;
if (environment->Version == 3)
{
TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
object->priority = environment_v3->CallbackPriority;
assert( object->priority < ARRAY_SIZE(pool->pools) );
}
if (environment->ActivationContext) if (environment->ActivationContext)
FIXME( "activation context not supported yet\n" ); FIXME( "activation context not supported yet\n" );
...@@ -1916,6 +1948,11 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa ...@@ -1916,6 +1948,11 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
tp_object_release( object ); tp_object_release( object );
} }
static void tp_object_prio_queue( struct threadpool_object *object )
{
list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
}
/*********************************************************************** /***********************************************************************
* tp_object_submit (internal) * tp_object_submit (internal)
* *
...@@ -1940,7 +1977,7 @@ static void tp_object_submit( struct threadpool_object *object, BOOL signaled ) ...@@ -1940,7 +1977,7 @@ static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
/* Queue work item and increment refcount. */ /* Queue work item and increment refcount. */
interlocked_inc( &object->refcount ); interlocked_inc( &object->refcount );
if (!object->num_pending_callbacks++) if (!object->num_pending_callbacks++)
list_add_tail( &pool->pool, &object->pool_entry ); tp_object_prio_queue( object );
/* Count how often the object was signaled. */ /* Count how often the object was signaled. */
if (object->type == TP_OBJECT_TYPE_WAIT && signaled) if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
...@@ -2061,6 +2098,20 @@ static BOOL tp_object_release( struct threadpool_object *object ) ...@@ -2061,6 +2098,20 @@ static BOOL tp_object_release( struct threadpool_object *object )
return TRUE; return TRUE;
} }
static struct list *threadpool_get_next_item( const struct threadpool *pool )
{
struct list *ptr;
unsigned int i;
for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
{
if ((ptr = list_head( &pool->pools[i] )))
break;
}
return ptr;
}
/*********************************************************************** /***********************************************************************
* threadpool_worker_proc (internal) * threadpool_worker_proc (internal)
*/ */
...@@ -2080,7 +2131,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) ...@@ -2080,7 +2131,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
pool->num_busy_workers--; pool->num_busy_workers--;
for (;;) for (;;)
{ {
while ((ptr = list_head( &pool->pool ))) while ((ptr = threadpool_get_next_item( pool )))
{ {
struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry ); struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
assert( object->num_pending_callbacks > 0 ); assert( object->num_pending_callbacks > 0 );
...@@ -2089,7 +2140,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) ...@@ -2089,7 +2140,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
* the end of the pool list. Otherwise remove it from the pool. */ * the end of the pool list. Otherwise remove it from the pool. */
list_remove( &object->pool_entry ); list_remove( &object->pool_entry );
if (--object->num_pending_callbacks) if (--object->num_pending_callbacks)
list_add_tail( &pool->pool, &object->pool_entry ); tp_object_prio_queue( object );
/* For wait objects check if they were signaled or have timed out. */ /* For wait objects check if they were signaled or have timed out. */
if (object->type == TP_OBJECT_TYPE_WAIT) if (object->type == TP_OBJECT_TYPE_WAIT)
...@@ -2230,7 +2281,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) ...@@ -2230,7 +2281,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
* can be terminated. */ * can be terminated. */
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT && if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
!list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) || !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
(!pool->min_workers && !pool->objcount))) (!pool->min_workers && !pool->objcount)))
{ {
break; break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment