/*
 * kernel/workqueue.c - generic async execution with shared worker pool
 *
 * Copyright (C) 2002		Ingo Molnar
 *
 *   Derived from the taskqueue/keventd code by:
 *     David Woodhouse <dwmw2@infradead.org>
 *     Andrew Morton
 *     Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *     Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 *
 * Copyright (C) 2010		SUSE Linux Products GmbH
 * Copyright (C) 2010		Tejun Heo <tj@kernel.org>
 *
 * This is the generic async execution mechanism.  Work items are
 * executed in process context.  The worker pool is shared and
 * automatically managed.  There is one worker pool for each CPU and
 * one extra for works which are better served by workers which are
 * not bound to any specific CPU.
 *
 * Please read Documentation/workqueue.txt for details.
 */

#include <linux/export.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
#include <linux/jhash.h>
#include <linux/hashtable.h>
#include <linux/rculist.h>
#include <linux/nodemask.h>
#include <linux/moduleparam.h>
#include <linux/uaccess.h>

#include "workqueue_internal.h"
#include <mach/exynos-ss.h>

enum {
	/*
	 * worker_pool flags
	 *
	 * A bound pool is either associated or disassociated with its CPU.
	 * While associated (!DISASSOCIATED), all workers are bound to the
	 * CPU and none has %WORKER_UNBOUND set and concurrency management
	 * is in effect.
	 *
	 * While DISASSOCIATED, the cpu may be offline and all workers have
	 * %WORKER_UNBOUND set and concurrency management disabled, and may
	 * be executing on any CPU.  The pool behaves as an unbound one.
	 *
	 * Note that DISASSOCIATED should be flipped only while holding
	 * manager_mutex to avoid changing binding state while
	 * create_worker() is in progress.
	 */
	POOL_MANAGE_WORKERS	= 1 << 0,	/* need to manage workers */
	POOL_DISASSOCIATED	= 1 << 2,	/* cpu can't serve workers */
	POOL_FREEZING		= 1 << 3,	/* freeze in progress */

	/* worker flags */
	WORKER_STARTED		= 1 << 0,	/* started */
	WORKER_DIE		= 1 << 1,	/* die die die */
	WORKER_IDLE		= 1 << 2,	/* is idle */
	WORKER_PREP		= 1 << 3,	/* preparing to run works */
	WORKER_CPU_INTENSIVE	= 1 << 6,	/* cpu intensive */
	WORKER_UNBOUND		= 1 << 7,	/* worker is unbound */
	WORKER_REBOUND		= 1 << 8,	/* worker was rebound */

	WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_CPU_INTENSIVE |
				  WORKER_UNBOUND | WORKER_REBOUND,

	NR_STD_WORKER_POOLS	= 2,		/* # standard pools per cpu */

	UNBOUND_POOL_HASH_ORDER	= 6,		/* hashed by pool->attrs */
	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */

	MAX_IDLE_WORKERS_RATIO	= 4,		/* 1/4 of busy can be idle */
	IDLE_WORKER_TIMEOUT	= 300 * HZ,	/* keep idle ones for 5 mins */

	MAYDAY_INITIAL_TIMEOUT  = HZ / 100 >= 2 ? HZ / 100 : 2,
						/* call for help after 10ms
						   (min two ticks) */
	MAYDAY_INTERVAL		= HZ / 10,	/* and then every 100ms */
	CREATE_COOLDOWN		= HZ,		/* time to breathe after fail */

	/*
	 * Rescue workers are used only on emergencies and shared by
	 * all cpus.  Give -20.
	 */
	RESCUER_NICE_LEVEL	= -20,
	HIGHPRI_NICE_LEVEL	= -20,

	WQ_NAME_LEN		= 24,
};
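
/*
 * For illustration of how the constants above combine (a sketch, not
 * part of the upstream comment): with HZ == 100, HZ / 100 is a single
 * tick, below the two-tick minimum, so MAYDAY_INITIAL_TIMEOUT evaluates
 * to 2; with HZ == 1000 it is 10 ticks (~10ms).  Likewise, a worker
 * that is unbound and still prepping intersects WORKER_NOT_RUNNING and
 * is excluded from concurrency management:
 *
 *	unsigned int flags = WORKER_PREP | WORKER_UNBOUND;
 *
 *	WARN_ON(!(flags & WORKER_NOT_RUNNING));		- never triggers
 */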

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: pool->lock protected.  Access with pool->lock held.
 *
 * X: During normal operation, modification requires pool->lock and should
 *    be done only from local cpu.  Either disabling preemption on local
 *    cpu or grabbing pool->lock is enough for read access.  If
 *    POOL_DISASSOCIATED is set, it's identical to L.
 *
 * MG: pool->manager_mutex and pool->lock protected.  Writes require both
 *     locks.  Reads can happen under either lock.
 *
 * PL: wq_pool_mutex protected.
 *
 * PR: wq_pool_mutex protected for writes.  Sched-RCU protected for reads.
 *
 * WQ: wq->mutex protected.
 *
 * WR: wq->mutex protected for writes.  Sched-RCU protected for reads.
 *
 * MD: wq_mayday_lock protected.
 */
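
/*
 * For illustration, an access to an L-annotated field such as
 * pool->worklist follows the usual pattern:
 *
 *	spin_lock_irq(&pool->lock);
 *	if (!list_empty(&pool->worklist))
 *		work = list_first_entry(&pool->worklist,
 *					struct work_struct, entry);
 *	spin_unlock_irq(&pool->lock);
 *
 * while an I-annotated field such as pool->cpu may be read without any
 * lock once the pool has been initialized.
 */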

/* struct worker is defined in workqueue_internal.h */

struct worker_pool {
	spinlock_t		lock;		/* the pool lock */
	int			cpu;		/* I: the associated cpu */
	int			node;		/* I: the associated node ID */
	int			id;		/* I: pool ID */
	unsigned int		flags;		/* X: flags */

	struct list_head	worklist;	/* L: list of pending works */
	int			nr_workers;	/* L: total number of workers */

	/* nr_idle includes the ones off idle_list for rebinding */
	int			nr_idle;	/* L: currently idle ones */

	struct list_head	idle_list;	/* X: list of idle workers */
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	/* a worker is either on busy_hash or idle_list, or the manager */
	DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
						/* L: hash of busy workers */

	/* see manage_workers() for details on the two manager mutexes */
	struct mutex		manager_arb;	/* manager arbitration */
	struct mutex		manager_mutex;	/* manager exclusion */
	struct idr		worker_idr;	/* MG: worker IDs and iteration */

	struct workqueue_attrs	*attrs;		/* I: worker attributes */
	struct hlist_node	hash_node;	/* PL: unbound_pool_hash node */
	int			refcnt;		/* PL: refcnt for unbound pools */

	/*
	 * The current concurrency level.  As it's likely to be accessed
	 * from other CPUs during try_to_wake_up(), put it in a separate
	 * cacheline.
	 */
	atomic_t		nr_running ____cacheline_aligned_in_smp;

	/*
	 * Destruction of pool is sched-RCU protected to allow dereferences
	 * from get_work_pool().
	 */
	struct rcu_head		rcu;
} ____cacheline_aligned_in_smp;

/*
 * The per-pool workqueue.  While queued, the lower WORK_STRUCT_FLAG_BITS
 * of work_struct->data are used for flags and the remaining high bits
 * point to the pwq; thus, pwqs need to be aligned at two's power of the
 * number of flag bits.
 */
struct pool_workqueue {
	struct worker_pool	*pool;		/* I: the associated pool */
	struct workqueue_struct *wq;		/* I: the owning workqueue */
	int			work_color;	/* L: current color */
	int			flush_color;	/* L: flushing color */
	int			refcnt;		/* L: reference count */
	int			nr_in_flight[WORK_NR_COLORS];
						/* L: nr of in_flight works */
	int			nr_active;	/* L: nr of active works */
	int			max_active;	/* L: max active works */
	struct list_head	delayed_works;	/* L: delayed works */
	struct list_head	pwqs_node;	/* WR: node on wq->pwqs */
	struct list_head	mayday_node;	/* MD: node on wq->maydays */

	/*
	 * Release of unbound pwq is punted to system_wq.  See put_pwq()
	 * and pwq_unbound_release_workfn() for details.  pool_workqueue
	 * itself is also sched-RCU protected so that the first pwq can be
	 * determined without grabbing wq->mutex.
	 */
	struct work_struct	unbound_release_work;
	struct rcu_head		rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);
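
/*
 * The alignment above is what makes the work->data packing possible:
 * every pool_workqueue address is a multiple of 1 << WORK_STRUCT_FLAG_BITS,
 * so its low WORK_STRUCT_FLAG_BITS bits are zero and can carry the
 * WORK_STRUCT_* flag bits instead.  A sketch (see set_work_pwq() and
 * get_work_pwq() below for the authoritative versions):
 *
 *	data = (unsigned long)pwq | WORK_STRUCT_PENDING | WORK_STRUCT_PWQ;
 *	pwq  = (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
 */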

/*
 * Structure used to wait for workqueue flush.
 */
struct wq_flusher {
	struct list_head	list;		/* WQ: list of flushers */
	int			flush_color;	/* WQ: flush color waiting for */
	struct completion	done;		/* flush completion */
};

struct wq_device;

/*
 * The externally visible workqueue.  It relays the issued work items to
 * the appropriate worker_pool through its pool_workqueues.
 */
struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */
	struct list_head	list;		/* PL: list of all workqueues */

	struct mutex		mutex;		/* protects this wq */
	int			work_color;	/* WQ: current work color */
	int			flush_color;	/* WQ: current flush color */
	atomic_t		nr_pwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* WQ: first flusher */
	struct list_head	flusher_queue;	/* WQ: flush waiters */
	struct list_head	flusher_overflow; /* WQ: flush overflow list */

	struct list_head	maydays;	/* MD: pwqs requesting rescue */
	struct worker		*rescuer;	/* I: rescue worker */

	int			nr_drainers;	/* WQ: drain in progress */
	int			saved_max_active; /* WQ: saved pwq max_active */

	struct workqueue_attrs	*unbound_attrs;	/* WQ: only for unbound wqs */
	struct pool_workqueue	*dfl_pwq;	/* WQ: only for unbound wqs */

#ifdef CONFIG_SYSFS
	struct wq_device	*wq_dev;	/* I: for sysfs interface */
#endif
#ifdef CONFIG_LOCKDEP
	struct lockdep_map	lockdep_map;
#endif
	char			name[WQ_NAME_LEN]; /* I: workqueue name */

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
};

static struct kmem_cache *pwq_cache;

static int wq_numa_tbl_len;		/* highest possible NUMA node id + 1 */
static cpumask_var_t *wq_numa_possible_cpumask;
					/* possible CPUs of each node */

static bool wq_disable_numa;
module_param_named(disable_numa, wq_disable_numa, bool, 0444);

static bool wq_numa_enabled;		/* unbound NUMA affinity enabled */

/* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */
static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;

static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
static DEFINE_SPINLOCK(wq_mayday_lock);	/* protects wq->maydays list */

static LIST_HEAD(workqueues);		/* PL: list of all workqueues */
static bool workqueue_freezing;		/* PL: have wqs started freezing? */

/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
				     cpu_worker_pools);

static DEFINE_IDR(worker_pool_idr);	/* PR: idr of all pools */

/* PL: hash of all unbound pools keyed by pool->attrs */
static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);

/* I: attributes used when instantiating standard unbound pools on demand */
static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];

/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];

struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_highpri_wq);
struct workqueue_struct *system_long_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_long_wq);
struct workqueue_struct *system_unbound_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_unbound_wq);
struct workqueue_struct *system_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_freezable_wq);

static int worker_thread(void *__worker);
static void copy_workqueue_attrs(struct workqueue_attrs *to,
				 const struct workqueue_attrs *from);

#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>

#define assert_rcu_or_pool_mutex()					\
	rcu_lockdep_assert(rcu_read_lock_sched_held() ||		\
			   lockdep_is_held(&wq_pool_mutex),		\
			   "sched RCU or wq_pool_mutex should be held")

#define assert_rcu_or_wq_mutex(wq)					\
	rcu_lockdep_assert(rcu_read_lock_sched_held() ||		\
			   lockdep_is_held(&wq->mutex),			\
			   "sched RCU or wq->mutex should be held")

#ifdef CONFIG_LOCKDEP
#define assert_manager_or_pool_lock(pool)				\
	WARN_ONCE(debug_locks &&					\
		  !lockdep_is_held(&(pool)->manager_mutex) &&		\
		  !lockdep_is_held(&(pool)->lock),			\
		  "pool->manager_mutex or ->lock should be held")
#else
#define assert_manager_or_pool_lock(pool)	do { } while (0)
#endif

#define for_each_cpu_worker_pool(pool, cpu)				\
	for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];		\
	     (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
	     (pool)++)

/**
 * for_each_pool - iterate through all worker_pools in the system
 * @pool: iteration cursor
 * @pi: integer used for iteration
 *
 * This must be called either with wq_pool_mutex held or sched RCU read
 * locked.  If the pool needs to be used beyond the locking in effect, the
 * caller is responsible for guaranteeing that the pool stays online.
 *
 * The if/else clause exists only for the lockdep assertion and can be
 * ignored.
 */
#define for_each_pool(pool, pi)						\
	idr_for_each_entry(&worker_pool_idr, pool, pi)			\
		if (({ assert_rcu_or_pool_mutex(); false; })) { }	\
		else

/**
 * for_each_pool_worker - iterate through all workers of a worker_pool
 * @worker: iteration cursor
 * @wi: integer used for iteration
 * @pool: worker_pool to iterate workers of
 *
 * This must be called with either @pool->manager_mutex or ->lock held.
 *
 * The if/else clause exists only for the lockdep assertion and can be
 * ignored.
 */
#define for_each_pool_worker(worker, wi, pool)				\
	idr_for_each_entry(&(pool)->worker_idr, (worker), (wi))		\
		if (({ assert_manager_or_pool_lock((pool)); false; })) { } \
		else

/**
 * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
 * @pwq: iteration cursor
 * @wq: the target workqueue
 *
 * This must be called either with wq->mutex held or sched RCU read locked.
 * If the pwq needs to be used beyond the locking in effect, the caller is
 * responsible for guaranteeing that the pwq stays online.
 *
 * The if/else clause exists only for the lockdep assertion and can be
 * ignored.
 */
#define for_each_pwq(pwq, wq)						\
	list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node)		\
		if (({ assert_rcu_or_wq_mutex(wq); false; })) { }	\
		else

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

static void *work_debug_hint(void *addr)
{
	return ((struct work_struct *) addr)->func;
}

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.debug_hint	= work_debug_hint,
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif

/* allocate ID and assign it to @pool */
static int worker_pool_assign_id(struct worker_pool *pool)
{
	int ret;

	lockdep_assert_held(&wq_pool_mutex);

	ret = idr_alloc(&worker_pool_idr, pool, 0, 0, GFP_KERNEL);
	if (ret >= 0) {
		pool->id = ret;
		return 0;
	}
	return ret;
}

/**
 * unbound_pwq_by_node - return the unbound pool_workqueue for the given node
 * @wq: the target workqueue
 * @node: the node ID
 *
 * This must be called either with pwq_lock held or sched RCU read locked.
 * If the pwq needs to be used beyond the locking in effect, the caller is
 * responsible for guaranteeing that the pwq stays online.
 */
static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
						  int node)
{
	assert_rcu_or_wq_mutex(wq);
	return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
}

static unsigned int work_color_to_flags(int color)
{
	return color << WORK_STRUCT_COLOR_SHIFT;
}

static int get_work_color(struct work_struct *work)
{
	return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
		((1 << WORK_STRUCT_COLOR_BITS) - 1);
}

static int work_next_color(int color)
{
	return (color + 1) % WORK_NR_COLORS;
}
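
/*
 * For illustration: with the usual 4 color bits, WORK_NR_COLORS is 15
 * and color 15 (== WORK_NR_COLORS) is reserved as WORK_NO_COLOR, so
 * work_next_color() cycles 0, 1, ..., 14, 0, ... while get_work_color()
 * simply shifts the color back out of work->data, undoing
 * work_color_to_flags().
 */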

/*
 * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
 * contain the pointer to the queued pwq.  Once execution starts, the flag
 * is cleared and the high bits contain OFFQ flags and pool ID.
 *
 * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
 * and clear_work_data() can be used to set the pwq, pool or clear
 * work->data.  These functions should only be called while the work is
 * owned - ie. while the PENDING bit is set.
 *
 * get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
 * corresponding to a work.  Pool is available once the work has been
 * queued anywhere after initialization until it is sync canceled.  pwq is
 * available only while the work item is queued.
 *
 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
 * canceled.  While being canceled, a work item may have its PENDING set
 * but stay off timer and worklist for arbitrarily long and nobody should
 * try to steal the PENDING bit.
 */
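
/*
 * Concretely, the two encodings described above look roughly like:
 *
 *	queued:		data == pwq pointer | WORK_STRUCT_PWQ | flag bits
 *	off queue:	data == (pool_id << WORK_OFFQ_POOL_SHIFT) | OFFQ bits
 *
 * so get_work_pwq() is only meaningful while WORK_STRUCT_PWQ is set and
 * get_work_pool_id() just shifts the pool ID back down otherwise.
 */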
static inline void set_work_data(struct work_struct *work, unsigned long data,
				 unsigned long flags)
{
	WARN_ON_ONCE(!work_pending(work));
	atomic_long_set(&work->data, data | flags | work_static(work));
}

static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
			 unsigned long extra_flags)
{
	set_work_data(work, (unsigned long)pwq,
		      WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
}

static void set_work_pool_and_keep_pending(struct work_struct *work,
					   int pool_id)
{
	set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
		      WORK_STRUCT_PENDING);
}

static void set_work_pool_and_clear_pending(struct work_struct *work,
					    int pool_id)
{
	/*
	 * The following wmb is paired with the implied mb in
	 * test_and_set_bit(PENDING) and ensures all updates to @work made
	 * here are visible to and precede any updates by the next PENDING
	 * owner.
	 */
	smp_wmb();
	set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
}

static void clear_work_data(struct work_struct *work)
{
	smp_wmb();	/* see set_work_pool_and_clear_pending() */
	set_work_data(work, WORK_STRUCT_NO_POOL, 0);
}

static struct pool_workqueue *get_work_pwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	if (data & WORK_STRUCT_PWQ)
		return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
	else
		return NULL;
}

/**
 * get_work_pool - return the worker_pool a given work was associated with
 * @work: the work item of interest
 *
 * Return the worker_pool @work was last associated with.  %NULL if none.
 *
 * Pools are created and destroyed under wq_pool_mutex, and allows read
 * access under sched-RCU read lock.  As such, this function should be
 * called under wq_pool_mutex or with preemption disabled.
 *
 * All fields of the returned pool are accessible as long as the above
 * mentioned locking is in effect.  If the returned pool needs to be used
 * beyond the critical section, the caller is responsible for ensuring the
 * returned pool is and stays online.
 */
static struct worker_pool *get_work_pool(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);
	int pool_id;

	assert_rcu_or_pool_mutex();

	if (data & WORK_STRUCT_PWQ)
		return ((struct pool_workqueue *)
			(data & WORK_STRUCT_WQ_DATA_MASK))->pool;

	pool_id = data >> WORK_OFFQ_POOL_SHIFT;
	if (pool_id == WORK_OFFQ_POOL_NONE)
		return NULL;

	return idr_find(&worker_pool_idr, pool_id);
}

/**
 * get_work_pool_id - return the worker pool ID a given work is associated with
 * @work: the work item of interest
 *
 * Return the worker_pool ID @work was last associated with.
 * %WORK_OFFQ_POOL_NONE if none.
 */
static int get_work_pool_id(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	if (data & WORK_STRUCT_PWQ)
		return ((struct pool_workqueue *)
			(data & WORK_STRUCT_WQ_DATA_MASK))->pool->id;

	return data >> WORK_OFFQ_POOL_SHIFT;
}

static void mark_work_canceling(struct work_struct *work)
{
	unsigned long pool_id = get_work_pool_id(work);

	pool_id <<= WORK_OFFQ_POOL_SHIFT;
	set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
}

static bool work_is_canceling(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
}

/*
 * Policy functions.  These define the policies on how the global worker
 * pools are managed.  Unless noted otherwise, these functions assume that
 * they're being called with pool->lock held.
 */

static bool __need_more_worker(struct worker_pool *pool)
{
	return !atomic_read(&pool->nr_running);
}

/*
 * Need to wake up a worker?  Called from anything but currently
 * running workers.
 *
 * Note that, because unbound workers never contribute to nr_running, this
 * function will always return %true for unbound pools as long as the
 * worklist isn't empty.
 */
static bool need_more_worker(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) && __need_more_worker(pool);
}

/* Can I start working?  Called from busy but !running workers. */
static bool may_start_working(struct worker_pool *pool)
{
	return pool->nr_idle;
}

/* Do I need to keep working?  Called from currently running workers. */
static bool keep_working(struct worker_pool *pool)
{
	return !list_empty(&pool->worklist) &&
		atomic_read(&pool->nr_running) <= 1;
}

/* Do we need a new worker?  Called from manager. */
static bool need_to_create_worker(struct worker_pool *pool)
{
	return need_more_worker(pool) && !may_start_working(pool);
}

/* Do I need to be the manager? */
static bool need_to_manage_workers(struct worker_pool *pool)
{
	return need_to_create_worker(pool) ||
		(pool->flags & POOL_MANAGE_WORKERS);
}

/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
	bool managing = mutex_is_locked(&pool->manager_arb);
	int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
	int nr_busy = pool->nr_workers - nr_idle;

	/*
	 * nr_idle and idle_list may disagree if idle rebinding is in
	 * progress.  Never return %true if idle_list is empty.
	 */
	if (list_empty(&pool->idle_list))
		return false;

	return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}
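
/*
 * Worked example for the ratio check above: with nr_workers == 12 and
 * nr_idle == 4 (manager included), nr_busy == 8 and
 * (4 - 2) * MAX_IDLE_WORKERS_RATIO == 8 >= 8, so the pool is considered
 * to have too many workers and the idle timer may start retiring them,
 * keeping roughly two idle workers plus one per four busy ones.
 */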

/*
 * Wake up functions.
 */

/* Return the first worker.  Safe with preemption disabled */
static struct worker *first_worker(struct worker_pool *pool)
{
	if (unlikely(list_empty(&pool->idle_list)))
		return NULL;

	return list_first_entry(&pool->idle_list, struct worker, entry);
}

/**
 * wake_up_worker - wake up an idle worker
 * @pool: worker pool to wake worker from
 *
 * Wake up the first idle worker of @pool.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void wake_up_worker(struct worker_pool *pool)
{
	struct worker *worker = first_worker(pool);

	if (likely(worker))
		wake_up_process(worker->task);
}

/**
 * wq_worker_waking_up - a worker is waking up
 * @task: task waking up
 * @cpu: CPU @task is waking up to
 *
 * This function is called during try_to_wake_up() when a worker is
 * being awoken.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 */
void wq_worker_waking_up(struct task_struct *task, int cpu)
{
	struct worker *worker = kthread_data(task);

	if (!(worker->flags & WORKER_NOT_RUNNING)) {
		WARN_ON_ONCE(worker->pool->cpu != cpu);
		atomic_inc(&worker->pool->nr_running);
	}
}

/**
 * wq_worker_sleeping - a worker is going to sleep
 * @task: task going to sleep
 * @cpu: CPU in question, must be the current CPU number
 *
 * This function is called during schedule() when a busy worker is
 * going to sleep.  Worker on the same cpu can be woken up by
 * returning pointer to its task.
 *
 * CONTEXT:
 * spin_lock_irq(rq->lock)
 *
 * RETURNS:
 * Worker task on @cpu to wake up, %NULL if none.
 */
struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
	struct worker *worker = kthread_data(task), *to_wakeup = NULL;
	struct worker_pool *pool;

	/*
	 * Rescuers, which may not have all the fields set up like normal
	 * workers, also reach here, let's not access anything before
	 * checking NOT_RUNNING.
	 */
	if (worker->flags & WORKER_NOT_RUNNING)
		return NULL;

	pool = worker->pool;

	/* this can only happen on the local cpu */
	if (WARN_ON_ONCE(cpu != raw_smp_processor_id()))
		return NULL;

	/*
	 * The counterpart of the following dec_and_test, implied mb,
	 * worklist not empty test sequence is in insert_work().
	 * Please read comment there.
	 *
	 * NOT_RUNNING is clear.  This means that we're bound to and
	 * running on the local cpu w/ rq lock held and preemption
	 * disabled, which in turn means that none else could be
	 * manipulating idle_list, so dereferencing idle_list without pool
	 * lock is safe.
	 */
	if (atomic_dec_and_test(&pool->nr_running) &&
	    !list_empty(&pool->worklist))
		to_wakeup = first_worker(pool);
	return to_wakeup ? to_wakeup->task : NULL;
}

/**
 * worker_set_flags - set worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to set
 * @wakeup: wakeup an idle worker if necessary
 *
 * Set @flags in @worker->flags and adjust nr_running accordingly.  If
 * nr_running becomes zero and @wakeup is %true, an idle worker is
 * woken up.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock)
 */
static inline void worker_set_flags(struct worker *worker, unsigned int flags,
				    bool wakeup)
{
	struct worker_pool *pool = worker->pool;

	WARN_ON_ONCE(worker->task != current);

	/*
	 * If transitioning into NOT_RUNNING, adjust nr_running and
	 * wake up an idle worker as necessary if requested by
	 * @wakeup.
	 */
	if ((flags & WORKER_NOT_RUNNING) &&
	    !(worker->flags & WORKER_NOT_RUNNING)) {
		if (wakeup) {
			if (atomic_dec_and_test(&pool->nr_running) &&
			    !list_empty(&pool->worklist))
				wake_up_worker(pool);
		} else
			atomic_dec(&pool->nr_running);
	}

	worker->flags |= flags;
}

/**
 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
 * @worker: self
 * @flags: flags to clear
 *
 * Clear @flags in @worker->flags and adjust nr_running accordingly.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock)
 */
static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
	struct worker_pool *pool = worker->pool;
	unsigned int oflags = worker->flags;

	WARN_ON_ONCE(worker->task != current);

	worker->flags &= ~flags;

	/*
	 * If transitioning out of NOT_RUNNING, increment nr_running.  Note
	 * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is mask
	 * of multiple flags, not a single flag.
	 */
	if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
		if (!(worker->flags & WORKER_NOT_RUNNING))
			atomic_inc(&pool->nr_running);
}
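
/*
 * Example of the nested NOT_RUNNING case above: a worker carrying both
 * WORKER_PREP and WORKER_UNBOUND is not running.  Clearing only one of
 * the two must not touch nr_running because the other still intersects
 * WORKER_NOT_RUNNING:
 *
 *	worker_clr_flags(worker, WORKER_PREP);		- no increment
 *	worker_clr_flags(worker, WORKER_UNBOUND);	- nr_running++
 */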

/**
 * find_worker_executing_work - find worker which is executing a work
 * @pool: pool of interest
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @pool by searching
 * @pool->busy_hash which is keyed by the address of @work.  For a worker
 * to match, its current execution should match the address of @work and
 * its work function.  This is to avoid unwanted dependency between
 * unrelated work executions through a work item being recycled while still
 * being executed.
 *
 * This is a bit tricky.  A work item may be freed once its execution
 * starts and nothing prevents the freed area from being recycled for
 * another work item.  If the same work item address ends up being reused
 * before the original execution finishes, workqueue will identify the
 * recycled work item as currently executing and make it wait until the
 * current execution finishes, introducing an unwanted dependency.
 *
 * This function checks the work item address and work function to avoid
 * false positives.  Note that this isn't complete as one may construct a
 * work function which can introduce dependency onto itself through a
 * recycled work item.  Well, if somebody wants to shoot oneself in the
 * foot that badly, there's only so much we can do, and if such deadlock
 * actually occurs, it should be easy to locate the culprit work function.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 *
 * RETURNS:
 * Pointer to worker which is executing @work if found, NULL
 * otherwise.
 */
static struct worker *find_worker_executing_work(struct worker_pool *pool,
						 struct work_struct *work)
{
	struct worker *worker;

	hash_for_each_possible(pool->busy_hash, worker, hentry,
			       (unsigned long)work)
		if (worker->current_work == work &&
		    worker->current_func == work->func)
			return worker;

	return NULL;
}

/**
 * move_linked_works - move linked works to a list
 * @work: start of series of works to be scheduled
 * @head: target list to append @work to
 * @nextp: out parameter for nested worklist walking
 *
 * Schedule linked works starting from @work to @head.  Work series to
 * be scheduled starts at @work and includes any consecutive work with
 * WORK_STRUCT_LINKED set in its predecessor.
 *
 * If @nextp is not NULL, it's updated to point to the next work of
 * the last scheduled work.  This allows move_linked_works() to be
 * nested inside outer list_for_each_entry_safe().
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void move_linked_works(struct work_struct *work, struct list_head *head,
			      struct work_struct **nextp)
{
	struct work_struct *n;

	/*
	 * Linked worklist will always end before the end of the list,
	 * use NULL for list head.
	 */
	list_for_each_entry_safe_from(work, n, NULL, entry) {
		list_move_tail(&work->entry, head);
		if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
			break;
	}

	/*
	 * If we're already inside safe list traversal and have moved
	 * multiple works to the scheduled queue, the next position
	 * needs to be updated.
	 */
	if (nextp)
		*nextp = n;
}

/**
 * get_pwq - get an extra reference on the specified pool_workqueue
 * @pwq: pool_workqueue to get
 *
 * Obtain an extra reference on @pwq.  The caller should guarantee that
 * @pwq has positive refcnt and be holding the matching pool->lock.
 */
static void get_pwq(struct pool_workqueue *pwq)
{
	lockdep_assert_held(&pwq->pool->lock);
	WARN_ON_ONCE(pwq->refcnt <= 0);
	pwq->refcnt++;
}

/**
 * put_pwq - put a pool_workqueue reference
 * @pwq: pool_workqueue to put
 *
 * Drop a reference of @pwq.  If its refcnt reaches zero, schedule its
 * destruction.  The caller should be holding the matching pool->lock.
 */
static void put_pwq(struct pool_workqueue *pwq)
{
	lockdep_assert_held(&pwq->pool->lock);
	if (likely(--pwq->refcnt))
		return;
	if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
		return;
	/*
	 * @pwq can't be released under pool->lock, bounce to
	 * pwq_unbound_release_workfn().  This never recurses on the same
	 * pool->lock as this path is taken only for unbound workqueues and
	 * the release work item is scheduled on a per-cpu workqueue.  To
	 * avoid lockdep warning, unbound pool->locks are given lockdep
	 * subclass of 1 in get_unbound_pool().
	 */
	schedule_work(&pwq->unbound_release_work);
}
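
/*
 * Typical usage sketch: references are taken and dropped under the
 * matching pool->lock, and the final put of an unbound pwq is deferred
 * to pwq_unbound_release_workfn() via system_wq as noted above.
 *
 *	spin_lock_irq(&pwq->pool->lock);
 *	get_pwq(pwq);
 *	...
 *	put_pwq(pwq);
 *	spin_unlock_irq(&pwq->pool->lock);
 */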

/**
 * put_pwq_unlocked - put_pwq() with surrounding pool lock/unlock
 * @pwq: pool_workqueue to put (can be %NULL)
 *
 * put_pwq() with locking.  This function also allows %NULL @pwq.
 */
static void put_pwq_unlocked(struct pool_workqueue *pwq)
{
	if (pwq) {
		/*
		 * As both pwqs and pools are sched-RCU protected, the
		 * following lock operations are safe.
		 */
		spin_lock_irq(&pwq->pool->lock);
		put_pwq(pwq);
		spin_unlock_irq(&pwq->pool->lock);
	}
}

static void pwq_activate_delayed_work(struct work_struct *work)
{
	struct pool_workqueue *pwq = get_work_pwq(work);

	trace_workqueue_activate_work(work);
	move_linked_works(work, &pwq->pool->worklist, NULL);
	__clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
	pwq->nr_active++;
}

static void pwq_activate_first_delayed(struct pool_workqueue *pwq)
{
	struct work_struct *work = list_first_entry(&pwq->delayed_works,
						    struct work_struct, entry);

	pwq_activate_delayed_work(work);
}

/**
 * pwq_dec_nr_in_flight - decrement pwq's nr_in_flight
 * @pwq: pwq of interest
 * @color: color of work which left the queue
 *
 * A work either has completed or is removed from pending queue,
 * decrement nr_in_flight of its pwq and handle workqueue flushing.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
{
	/* uncolored work items don't participate in flushing or nr_active */
	if (color == WORK_NO_COLOR)
		goto out_put;

	pwq->nr_in_flight[color]--;

	pwq->nr_active--;
	if (!list_empty(&pwq->delayed_works)) {
		/* one down, submit a delayed one */
		if (pwq->nr_active < pwq->max_active)
			pwq_activate_first_delayed(pwq);
	}

	/* is flush in progress and are we at the flushing tip? */
	if (likely(pwq->flush_color != color))
		goto out_put;

	/* are there still in-flight works? */
	if (pwq->nr_in_flight[color])
		goto out_put;

	/* this pwq is done, clear flush_color */
	pwq->flush_color = -1;

	/*
	 * If this was the last pwq, wake up the first flusher.  It
	 * will handle the rest.
	 */
	if (atomic_dec_and_test(&pwq->wq->nr_pwqs_to_flush))
		complete(&pwq->wq->first_flusher->done);
out_put:
	put_pwq(pwq);
}
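
/*
 * Example of the flush handshake above: if the flusher tagged this pwq
 * with flush_color == 2 and the work item that just finished was the
 * last one with color 2, nr_in_flight[2] reaches zero, flush_color is
 * reset to -1 and, if this was the last pwq still flushing,
 * wq->first_flusher->done is completed.
 */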

/**
 * try_to_grab_pending - steal work item from worklist and disable irq
 * @work: work item to steal
 * @is_dwork: @work is a delayed_work
 * @flags: place to store irq state
 *
 * Try to grab PENDING bit of @work.  This function can handle @work in any
 * stable state - idle, on timer or on worklist.  Return values are
 *
 *  1		if @work was pending and we successfully stole PENDING
 *  0		if @work was idle and we claimed PENDING
 *  -EAGAIN	if PENDING couldn't be grabbed at the moment, safe to busy-retry
 *  -ENOENT	if someone else is canceling @work, this state may persist
 *		for arbitrarily long
 *
 * On >= 0 return, the caller owns @work's PENDING bit.  To avoid getting
 * interrupted while holding PENDING and @work off queue, irq must be
 * disabled on entry.  This, combined with delayed_work->timer being
 * irqsafe, ensures that we return -EAGAIN for finite short period of time.
 *
 * On successful return, >= 0, irq is disabled and the caller is
 * responsible for releasing it using local_irq_restore(*@flags).
 *
 * This function is safe to call from any context including IRQ handler.
 */
static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
			       unsigned long *flags)
{
	struct worker_pool *pool;
	struct pool_workqueue *pwq;

	local_irq_save(*flags);

	/* try to steal the timer if it exists */
	if (is_dwork) {
		struct delayed_work *dwork = to_delayed_work(work);

		/*
		 * dwork->timer is irqsafe.  If del_timer() fails, it's
		 * guaranteed that the timer is not queued anywhere and not
		 * running on the local CPU.
		 */
		if (likely(del_timer(&dwork->timer)))
			return 1;
	}

	/* try to claim PENDING the normal way */
	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */
	pool = get_work_pool(work);
	if (!pool)
		goto fail;

	spin_lock(&pool->lock);
	/*
	 * work->data is guaranteed to point to pwq only while the work
	 * item is queued on pwq->wq, and both updating work->data to point
	 * to pwq on queueing and to pool on dequeueing are done under
	 * pwq->pool->lock.  This in turn guarantees that, if work->data
	 * points to pwq which is associated with a locked pool, the work
	 * item is currently queued on that pool.
	 */
	pwq = get_work_pwq(work);
	if (pwq && pwq->pool == pool) {
		debug_work_deactivate(work);

		/*
		 * A delayed work item cannot be grabbed directly because
		 * it might have linked NO_COLOR work items which, if left
		 * on the delayed_list, will confuse pwq->nr_active
		 * management later on and cause stall.  Make sure the work
		 * item is activated before grabbing.
		 */
		if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
			pwq_activate_delayed_work(work);

		list_del_init(&work->entry);
		pwq_dec_nr_in_flight(get_work_pwq(work), get_work_color(work));

		/* work->data points to pwq iff queued, point to pool */
		set_work_pool_and_keep_pending(work, pool->id);

		spin_unlock(&pool->lock);
		return 1;
	}
	spin_unlock(&pool->lock);
fail:
	local_irq_restore(*flags);
	if (work_is_canceling(work))
		return -ENOENT;
	cpu_relax();
	return -EAGAIN;
}
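
/*
 * A minimal caller sketch: the cancel/mod paths later in this file
 * retry -EAGAIN in a tight loop (irqs are already restored on that
 * return) and keep irqs disabled on success until they are done with
 * the work item:
 *
 *	unsigned long flags;
 *	int ret;
 *
 *	do {
 *		ret = try_to_grab_pending(work, is_dwork, &flags);
 *	} while (unlikely(ret == -EAGAIN));
 *
 *	if (ret >= 0) {
 *		...	- own PENDING, requeue or cancel as needed
 *		local_irq_restore(flags);
 *	}
 */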

/**
 * insert_work - insert a work into a pool
 * @pwq: pwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work which belongs to @pwq after @head.  @extra_flags is or'd to
 * work_struct flags.
 *
 * CONTEXT: