/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>

#include <asm/local.h>

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
	trace_seq_puts(s, "\tarray       :   32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding     : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\tdata max type_len  == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on.  A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return  event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 *   or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}
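/*
 * Quick reference (illustrative sketch only) for the length encoding that
 * rb_event_data_length() above and rb_event_data() below rely on:
 *
 *	type_len == 1..28 : payload is type_len * RB_ALIGNMENT bytes
 *			    (4..112) and starts at event->array[0]
 *	type_len == 0     : payload length is held in event->array[0] and
 *			    the payload itself starts at event->array[1]
 *	type_len == 29..31: padding, time extend and time stamp events
 */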

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
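/*
 * Usage sketch (illustrative, not taken from this file): a consumer that
 * obtained an event from the read side, e.g. via ring_buffer_consume()
 * declared in <linux/ring_buffer.h>, would typically pair the two
 * accessors above:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 *	if (event) {
 *		void *payload = ring_buffer_event_data(event);
 *		unsigned len  = ring_buffer_event_length(event);
 *		... process len bytes at payload ...
 *	}
 */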

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)
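/*
 * Illustrative sketch of the split described above, as it would apply to a
 * raw value read from a buffer_page's write field (rb_page_write() further
 * below extracts the index part the same way):
 *
 *	unsigned long raw    = local_read(&bpage->write);
 *	unsigned long index  = raw & RB_WRITE_MASK;	(write position)
 *	unsigned long nested = raw >> 20;		(updater count)
 */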

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}
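/*
 * Rough arithmetic (sketch): with a nanosecond clock 2^27 ns is about
 * 134 ms, so consecutive events further apart than that cannot encode the
 * delta directly and need a time extend event instead.
 */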

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for which event context the event is in.
 *  NMI     = 0
 *  IRQ     = 1
 *  SOFTIRQ = 2
 *  NORMAL  = 3
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	unsigned long			read;
	unsigned long			read_bytes;
	u64				write_stamp;
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct ring_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
{
	struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer);
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = false;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}


	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

			if (!pagebusy)
				break;
		}

		schedule();
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}
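/*
 * Usage sketch (illustrative): a blocking reader could wait until cpu 0
 * has a full page of data before reading it out:
 *
 *	ret = ring_buffer_wait(buffer, 0, true);
 *	if (ret == -EINTR)
 *		return ret;	(interrupted by a signal)
 */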

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns POLLIN | POLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS)
		work = &buffer->irq_work;
	else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	poll_wait(filp, &work->waiters, poll_table);
	work->waiters_pending = true;
	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return POLLIN | POLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 *  What the above shows is that the reader just swapped out
 *  the reader page with a page in the buffer, but before it
 *  could make the new header point back to the new page added
 *  it was preempted by a writer. The writer moved forward onto
 *  the new page added by the reader and is about to move forward
 *  again.
 *
 *  You can see, it is legitimate for the previous pointer of
 *  the head (or any page) not to point back to itself. But only
 *  temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}
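/*
 * Illustrative sketch of the pointer tagging described above.  "prev_page"
 * is just a stand-in name for the page in front of the head page; the flag
 * sits in the low bits of its ->next pointer:
 *
 *	struct list_head *next = prev_page->list.next;
 *	unsigned long	  flag = (unsigned long)next & RB_FLAG_MASK;
 *	struct list_head *real = rb_list_head(next);	(flag stripped)
 */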

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	/* Reset the head page if it exists */
	if (cpu_buffer->head_page)
		rb_set_head_page(cpu_buffer);

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
{
	struct buffer_page *bpage, *tmp;
	long i;

	for (i = 0; i < nr_pages; i++) {
		struct page *page;
		/*
		 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
		 * gracefully without invoking oom-killer and the system is not
		 * destabilized.
		 */
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    GFP_KERNEL | __GFP_RETRY_MAYFAIL,
				    cpu_to_node(cpu));
		if (!bpage)
			goto free_pages;

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu),
					GFP_KERNEL | __GFP_RETRY_MAYFAIL, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);
	}

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);
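/*
 * Usage sketch (illustrative): callers normally go through the
 * ring_buffer_alloc() wrapper in <linux/ring_buffer.h>, which supplies the
 * lock class key that __ring_buffer_alloc() takes:
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(4 * PAGE_SIZE, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 */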

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/*
	 * change read pointer to make sure any read iterators reset
	 * themselves
	 */
	cpu_buffer->read = 0;

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);