/*
 * Copyright (C) 2003-2015 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_

#include "thread/Mutex.hxx"
#include "thread/Cond.hxx"

#include <assert.h>
#include <pthread.h>

#include <string>
#include <queue>
#include <utility>

// Logging stubs: both macros expand to nothing, so any LOGINFO(...) /
// LOGERR(...) statement in this header compiles to an empty statement.
#define LOGINFO(X)
#define LOGERR(X)

/**
 * A WorkQueue manages the synchronisation around a queue of work items,
 * where a number of client threads queue tasks and a number of worker
 * threads take and execute them. The goal is to introduce some level
 * of parallelism between the successive steps of a previously single
 * threaded pipeline. For example data extraction / data preparation / index
 * update, but this could have other uses.
 *
 * There is no individual task status return. In case of fatal error,
 * the client or worker sets an end condition on the queue. A second
 * queue could conceivably be used for returning individual task
 * status.
 */
template <class T>
class WorkQueue {
	// Configuration
51
	const std::string name;
52 53 54

	// Status
	// Worker threads having called exit
55 56
	unsigned n_workers_exited;
	bool ok;
57

58 59
	unsigned n_threads;
	pthread_t *threads;
60 61

	// Synchronization
62 63 64 65
	std::queue<T> queue;
	Cond client_cond;
	Cond worker_cond;
	Mutex mutex;
66 67 68

public:
	/** Create a WorkQueue
Max Kellermann's avatar
Max Kellermann committed
69
	 * @param _name for message printing
70
	 */
71 72
	WorkQueue(const char *_name)
		:name(_name),
73
		 n_workers_exited(0),
74
		 ok(false),
75
		 n_threads(0), threads(nullptr)
76 77 78 79 80 81 82 83 84 85
	{
	}

	~WorkQueue() {
		setTerminateAndWait();
	}

	/** Start the worker threads.
	 *
	 * @param nworkers number of threads copies to start.
Max Kellermann's avatar
Max Kellermann committed
86
	 * @param workproc thread function. It should loop
87 88 89 90
	 *      taking (QueueWorker::take()) and executing tasks.
	 * @param arg initial parameter to thread function.
	 * @return true if ok.
	 */
91
	bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
92
	{
93
		const ScopeLock protect(mutex);
94

95
		assert(nworkers > 0);
96
		assert(!ok);
97 98 99
		assert(n_threads == 0);
		assert(threads == nullptr);

100 101
		n_threads = nworkers;
		threads = new pthread_t[n_threads];
102

103
		for  (unsigned i = 0; i < nworkers; i++) {
104
			int err;
105
			if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
106
				LOGERR(("WorkQueue:%s: pthread_create failed, err %d\n",
107
					name.c_str(), err));
108 109 110
				return false;
			}
		}
111 112

		ok = true;
113 114 115 116 117 118 119
		return true;
	}

	/** Add item to work queue, called from client.
	 *
	 * Sleeps if there are already too many.
	 */
120 121
	template<typename U>
	bool put(U &&u)
122
	{
123
		const ScopeLock protect(mutex);
124

125
		queue.emplace(std::forward<U>(u));
126 127 128

		// Just wake one worker, there is only one new task.
		worker_cond.signal();
129 130 131 132 133 134 135 136 137

		return true;
	}


	/** Tell the workers to exit, and wait for them.
	 */
	void setTerminateAndWait()
	{
138
		const ScopeLock protect(mutex);
139 140

		// Wait for all worker threads to have called workerExit()
141
		ok = false;
142
		while (n_workers_exited < n_threads) {
143 144
			worker_cond.broadcast();
			client_cond.wait(mutex);
145 146 147 148
		}

		// Perform the thread joins and compute overall status
		// Workers return (void*)1 if ok
149
		for (unsigned i = 0; i < n_threads; ++i) {
150
			void *status;
151
			pthread_join(threads[i], &status);
152 153
		}

154 155 156 157
		delete[] threads;
		threads = nullptr;
		n_threads = 0;

158
		// Reset to start state.
159
		n_workers_exited = 0;
160 161 162 163 164 165 166
	}

	/** Take task from queue. Called from worker.
	 *
	 * Sleeps if there are not enough. Signal if we go to sleep on empty
	 * queue: client may be waiting for our going idle.
	 */
167
	bool take(T &tp)
168
	{
169
		const ScopeLock protect(mutex);
170

171
		if (!ok)
172 173
			return false;

174
		while (queue.empty()) {
175
			worker_cond.wait(mutex);
176
			if (!ok)
177 178 179
				return false;
		}

180
		tp = std::move(queue.front());
181
		queue.pop();
182 183 184 185 186 187 188
		return true;
	}

	/** Advertise exit and abort queue. Called from worker
	 *
	 * This would happen after an unrecoverable error, or when
	 * the queue is terminated by the client. Workers never exit normally,
189
	 * except when the queue is shut down (at which point ok is set to
190 191 192 193 194
	 * false by the shutdown code anyway). The thread must return/exit
	 * immediately after calling this.
	 */
	void workerExit()
	{
195
		const ScopeLock protect(mutex);
196

197 198 199
		n_workers_exited++;
		ok = false;
		client_cond.broadcast();
200 201 202 203
	}
};

#endif /* _WORKQUEUE_H_INCLUDED_ */