-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathInplaceMerge.h
557 lines (505 loc) · 25 KB
/
InplaceMerge.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
#pragma once
/*************************************************************************
* File: InplaceMerge.hh
* Author: Keith Schwarz (htiek@cs.stanford.edu)
*
* An implementation of the fast inplace merge algorithm described by Huang
* and Langston in "Practical In-Place Merging," Communications of the ACM,
* Volume 31, Issue 3, March 1988, pages 348-352.
*
* The algorithm works by splitting the input into sqrt(n) blocks of size
* sqrt(n), choosing one block to use as a buffer, and then merging the
* other blocks together with the buffer as scratch space. The main complexity
* of the algorithm comes from the case where the input is not a perfect
* square in length. When this happens, the input needs to be heavily
* preprocessed into a sequence of blocks that are of suitable size.
*
* A summary of the algorithm is as follows. First, we compute the size of
* each block to be s = ceil(sqrt(n)). Next, we locate the largest s elements
* of the input, which are some combination of the last elements of the two
* sequences to be merged. The paper refers to the group of largest elements
* in the first sequence as A and the group in the second sequence as B.
* Next, a group of elements C is chosen which directly precedes A and has
* size equal to B, and a group of elements D is chosen which directly precedes
* B and has size K - |B| mod s, where K is the length of either list. The
* reason for this choice is that the second list, with B and D removed, has
* a perfect multiple of s elements. A group F is then chosen at the front
* of the first list with size K mod s, again to make the first list have
* a size multiple of s. The input now looks like this:
*
* F s s s ... s s C A s s s ... s D B
*
* Here, each s refers to an arbitrary block of size s. We ned swap regions
* C and B, which have the same size. This puts B and A next to each other,
* and since they were chosen to be the largest elements of the entire
* sequence, we will treat them together as a buffer block, referred to by
* Z. This is seen here:
*
* F s s s ... s s Z s s s ... s D B
*
* Everything here except for F, D, and B are in their proper place. Without
* any optimizations, we could now correctly sort all of these elements
* in-place in O(n) as follows. First we apply the subroutine described
* below to sort all of the s blocks together, giving us the following
* sequence:
*
* F [all s blocks in sorted order] Z D B
*
* Since |F| < s and |Z| = s, we could then merge F and the sorted sequence
* using Z as a buffer in O(n) time, then repeat this process to merge
* C and B into the sequence as well. However, the paper details several
* optimizations which yield an O(n) algorithm with much smaller runtime
* as follows. Recall that the elements are ordered as follows:
*
* F s s s ... s s Z s s s ... s D B
*
* We begin by merging together the elements of D and B together using Z as
* a temporary buffer. This produces a new group E, as seen here:
*
* F s s s ... s s Z s s s ... s E
*
* At this point we slightly diverge from the original text. Let the first
* block of the second range be G, and then swap that block with the first
* actual block of the first list (H):
*
* F G s s ... s s Z H s ... s E
*
* Notice that F and G have the smallest elements from the range, so if we
* merge the two of them together, the first |F| elements of the result are
* the smallest elements of the entire range. We merge them together
* using the buffer as scratch space, yielding two new groups F' and G',
* as seen here:
*
* F' G' s s ... s s Z H s ... s E
*
* Finally, exchange G' and H to get
*
* F' H s s ... s s Z G' s ... s E
*
* Now, the only parts of this setup that aren't perfect multiples of s are
* F', which we don't need to touch (it's already in the right place), and
* E. Once we've run the subroutine described below, we can merge E into the
* sequence quite easily; we'll see how later.
*
* The core of the algorithm is a block merging step in which all of the
* size-s blocks are merged into sorted order, using a special buffer block
* as scratch space. This buffer must consist of the largest elements of the
* sequence for reasons that will become clearer later on. Thus the input to
* this subroutine looks like this:
*
* s s s ... s Z s ... s s
*
* Where Z is the buffer block. Note that because the input was initially two
* sorted sequences, each of these blocks is sorted in ascending order.
*
* The first step of this algorithm is to swap the buffer to the front of the
* sequence in O(s) time, as seen here:
*
* Z s s s ... s s
*
* Next, we sort the remaining blocks in ascending order by their last element.
* To do so, we use naive selection sort. Selection sorting O(s) blocks will
* make O(s^2) = O(n) comparisons, and will perform a total of O(s) block
* swaps, each of which takes O(s) time, for a total of O(s^2) = O(n) swap
* time.
*
* Finally, we begin the merge. Starting at the leftmost block, we scan for
* the largest increasing sequence we can find by picking a string of blocks
* where each block's last element is less than the next block's first element.
* This gives us an O(s) way of locating increasing sequences, since each
* block is internally sorted. Once we have located this sequence, we merge
* it in with the next block, using the buffer as temporary space. We stop
* the merge as soon as the last element of the first sequence is moved into
* the correct place. Since the blocks are stored in sorted ascending order,
* the merge will terminate with the buffer elements moved up past the first
* sequence and before some remaining portion of the second sequence. This
* process terminates when the buffer comes directly before a nondecreasing
* sequence. When this happens, we use the rotate algorithm to push the
* buffer past the last elements. This gives us a sequence of sorted blocks
* followed by an unsorted block containing the largest elements of the
* sequence, which are all in the correct region of the input. We can then
* sort the buffer to finish the merge.
*
* Once we've finished this step, we have everything in place except for the
* elements in E. We can fix this easily as follows. First, swap |E| of the
* smallest elements from the sorted sequence and E. We now have this setup:
*
* E {...sorted sequence...} {|E| of the smallest elements}
*
* Now, do a backwards merge of E and the sorted sequence in a fashion that
* moves the last |E| elements to the front of the array, then sort the first
* E elements in-place. Voila! Everything is sorted!
*/
#ifndef InplaceMerge_Included
#define InplaceMerge_Included
#include <algorithm> // For rotate, swap_ranges, make_heap, sort_heap, iter_swap
#include <iterator> // For iterator_traits, distance
#include <stdexcept> // For invalid_argument
#include <cmath> // For ceil, sqrt
#include <utility> // For pair
#include <functional> // For less
/**
* Function: InplaceMerge(RandomIterator start, RandomIterator end);
* -------------------------------------------------------------------
* Given the sequence [start, end) such that the first half of the
* elements form an ascending sequence and the second half of the
* elements is also an ascending sequences, sorts the entire range
* in ascending order in O(n) time and O(1) auxiliary storage space.
*/
template <typename RandomIterator>
void InplaceMerge(RandomIterator start, RandomIterator end);
/**
* Function: InplaceMerge(RandomIterator start, RandomIterator end,
* Comparator comp);
* -------------------------------------------------------------------
* Given the sequence [start, end) such that the first half of the
* elements form an ascending sequence and the second half of the
* elements is also an ascending sequences, sorts the entire range
* in ascending order in O(n) time and O(1) auxiliary storage space.
* The ranges are assumed to be sorted in ascending order according
* to comp.
*/
template <typename RandomIterator, typename Comparator>
void InplaceMerge(RandomIterator start, RandomIterator end,
Comparator comp);
/* * * * * Implementation Below This Point * * * * */
namespace inplacemerge_detail {
/**
* Function: pair<RandomIterator, RandomIterator>
* LocateMaxElems(RandomIterator begin, RandomIterator mid,
* RandomIterator end, size_t s,
* Comparator comp);
* ---------------------------------------------------------------------
* Given two sorted ranges [begin, mid) and [mid, end) which are sorted
* according to comp and a parameter s, returns iterators (A, B) such
* that [A, mid) and [B, end) are the s largest elements of the two
* sorted ranges.
*/
template <typename RandomIterator, typename Comparator>
std::pair<RandomIterator, RandomIterator>
LocateMaxElems(RandomIterator begin, RandomIterator mid,
RandomIterator end, size_t s, Comparator comp) {
/* This operation is essentially a merge step in which we don't move
* anything around. We keep two pointers, one to the end of each range,
* then continuously move them inward by skipping past the larger of
* the two at each step. Once we've visited s elements, we're done.
*/
std::pair<RandomIterator, RandomIterator> result(mid, end);
/* Locate the largest s elements one at a time. */
for (size_t i = 0; i < s; ++i) {
/* The elements we're interested in comparing are one step before
* the current iterator positions. This is because iterators define
* half-open ranges.
*/
if (comp(*(result.first - 1), *(result.second - 1))) // Second > First
--result.second;
else // First >= Second
--result.first;
}
return result;
}
/**
* Function: BufferedInplaceMerge(RandomIterator begin, RandomIterator mid,
* RandomIterator end, RandomIterator bStart,
* RandomIterator bEnd, Comparator comp);
* ----------------------------------------------------------------------
* Utility function which, given adjacent ranges [begin, mid) and [mid, end)
* to be merged, does a standard merge using the buffer [bStart, bEnd) for
* auxiliary storage space. The input ranges are assumed to be sorted
* using comp and will ultimately be sorted by comp. It is assumed that
* the range [begin, mid) fits in the buffer, though [mid, end) does not
* need to.
*/
template <typename RandomIterator, typename Comparator>
void BufferedInplaceMerge(RandomIterator begin, RandomIterator mid,
RandomIterator end, RandomIterator bStart,
RandomIterator bEnd, Comparator comp) {
/* The algorithm works as follows. We begin by swapping the first range
* into the buffer. Next, we maintain two pointers to the start points
* of the ranges, and use a standard merge technique to select the
* smaller of the two, which is then swapped into the proper position.
* If the second range is exhausted, what's left of the first range
* is swapped into place. If the first range is exhausted, then we do
* not need to take any action because the second range is already
* at the end of the destination.
*/
/* Move the first elements into the buffer. */
std::swap_ranges(begin, mid, bStart);
/* Maintain pointers to the first elements of both ranges and the next
* write position.
*/
RandomIterator one = bStart, two = mid, nextWrite = begin;
/* Cache the end point of the first range. */
const RandomIterator oneEnd = bStart + (mid - begin);
/* While neither range has been exhausted, do the standard merge step. */
while (one != oneEnd && two != end) {
/* Determine which element comes next. */
if (comp(*one, *two)) { // First element is smaller
std::iter_swap(one, nextWrite);
++one;
}
else { // Second element is smaller
std::iter_swap(two, nextWrite);
++two;
}
/* Bump the next write position. */
++nextWrite;
}
/* If we ran out of elements from the first range, we're done. */
if (one == oneEnd) return;
/* Otherwise, we need to exchange the rest of the first range with
* the accumulated buffer elements at the end of the merge range.
*/
std::swap_ranges(one, oneEnd, nextWrite);
}
/**
* Function: SortBlocks(RandomIterator begin, RandomIterator end,
* size_t blockSize, Comparator comp);
* ----------------------------------------------------------------------
* Given a range of blocks [begin, end) of size blockSize, sorts them
* in ascending order by comp. This step uses naive selection sort to
* ensure that the sorting takes time O(n).
*/
template <typename RandomIterator, typename Comparator>
void SortBlocks(RandomIterator begin, RandomIterator end,
size_t blockSize, Comparator comp) {
for (RandomIterator itr = begin; itr != end; itr += blockSize) {
/* Scan over all the blocks from here forward, finding the smallest. */
RandomIterator minBlock = itr;
for (RandomIterator curr = itr + blockSize; curr != end; curr += blockSize) {
/* If this is outright smaller, pick it. */
if (comp(*(curr + blockSize - 1), *(minBlock + blockSize - 1)))
minBlock = curr;
/* Otherwise if it's equal but has a smaller first element, pick it. */
else if (!comp(*(minBlock + blockSize - 1), *(curr + blockSize - 1)) &&
comp(*curr, *minBlock))
minBlock = curr;
}
/* If the element is in the right place, do nothing. Otherwise, swap
* the current block and the best block.
*/
if (minBlock != itr)
std::swap_ranges(itr, itr + blockSize, minBlock);
}
}
/**
* Function: FindEndOfRange(RandomIterator begin, RandomIterator end,
* size_t blockSize, Comparator comp);
* ---------------------------------------------------------------------
* Given a range of blocks [begin, end) of size blockSize, returns an
* iterator to the start of the first block whose head is smaller than
* its predecessor's tail. If no such block exists, end is returned.
*/
template <typename RandomIterator, typename Comparator>
RandomIterator FindEndOfRange(RandomIterator begin, RandomIterator end,
size_t blockSize, Comparator comp) {
/* Scan over the blocks after the first, checking whether the previous
* block end is greater than the current block start.
*/
for (RandomIterator itr = begin + blockSize; itr != end; itr += blockSize)
if (comp(*itr, *(itr - 1)))
return itr;
/* All sorted; hand back end as a sentinel. */
return end;
}
/**
* Function: BufferMovingMerge(RandomIterator begin, RandomIterator mid,
* RandomIterator end, RandomIterator buffer,
* Comparator comp);
* ----------------------------------------------------------------------
* Given a range of elements [buffer, begin), [begin, mid), [mid, end),
* merges the elements into a sorted range such that the resulting
* sequence looks like
*
* [-merged elements-] [buffer] [unconsumed second sequence]
*
* This function then returns an iterator to the new start position of
* the buffer.
*/
template <typename RandomIterator, typename Comparator>
RandomIterator BufferMovingMerge(RandomIterator begin, RandomIterator mid,
RandomIterator end, RandomIterator buffer,
Comparator comp) {
/* This function works by doing a standard merge operation to consume
* [begin, mid). We maintain iterators to the start of the buffer,
* the start of the first sequence, and the start of the second sequence.
* We then continously swap the smaller of the two sequence elements
* with the next buffer element and bump the pointers forward.
*/
RandomIterator one = begin, two = mid, output = buffer;
/* Keep writing until we exhaust the first sequence. We are guaranteed
* that we won't exhaust the second because its last element is bigger
* than the last element of the first sequence.
*/
while (one != mid) {
/* Move the smaller element forward. */
if (!comp(*two, *one)) { // First element is <=
std::iter_swap(one, output);
++one;
}
else { // Second element is smaller
std::iter_swap(two, output);
++two;
}
/* Advance the output pointer to the next spot. */
++output;
}
/* Return the start of the new buffer. */
return output;
}
/**
* Function: MergeBlocks(RandomIterator begin, RandomIterator end,
* size_t blockSize, Comparator comp)
* ----------------------------------------------------------------------
* Given a range of blocks of size blockSize that evenly partition
* [begin, end) into a sequence of blocks, does a merge step to convert the
* range into a sorted range. It is assumed that the elements
* [begin, begin + blockSize) are buffer elements, and these elements will
* end up occupying the last block of the sequence.
*/
template <typename RandomIterator, typename Comparator>
void MergeBlocks(RandomIterator begin, RandomIterator end,
size_t blockSize, Comparator comp) {
/* Begin by sorting all of the blocks in ascending order by their last
* elements.
*/
SortBlocks(begin + blockSize, end, blockSize, comp);
/* Now, we need to start scanning over the blocks, looking for increasing
* sequences that can be merged. We'll do this by maintaining a pointer
* to the start of the first block to merge, where in that block to
* pick up from, and where the buffer is.
*/
RandomIterator buffer = begin, currBlock = begin + blockSize,
currPos = begin + blockSize;
while (true) {
/* Find the first block past the end of the current sequence. */
RandomIterator rangeEnd = FindEndOfRange(currBlock, end,
blockSize,
comp);
/* If we didn't find anything, we're done. */
if (rangeEnd == end) break;
/* Otherwise, do a buffer-moving merge of the range up through the
* start of the last block with the last block, and remember where
* the buffer ended up getting repositioned.
*/
buffer = BufferMovingMerge(currPos, rangeEnd, rangeEnd + blockSize,
buffer, comp);
/* Update the current block to be the end of the range since we
* didn't finish moving everything out of it.
*/
currBlock = rangeEnd;
/* Update the write position to be one block past the buffer start. */
currPos = buffer + blockSize;
}
/* When we get here, we know that the range [currPos, end) is sorted.
* To finish up this step, we sort the buffer using an in-place sort
* (here, heapsort), then rotate it through the rest of the range to
* finish up the sequence.
*/
std::make_heap(buffer, buffer + blockSize, comp);
std::sort_heap(buffer, buffer + blockSize, comp);
std::rotate(buffer, buffer + blockSize, end);
}
}
/* Actual implementation of InplaceMerge */
template <typename RandomIterator, typename Comparator>
void InplaceMerge(RandomIterator begin, RandomIterator end,
Comparator comp) {
/* Grant access to the utility functions we've written. */
using namespace inplacemerge_detail;
/* Confirm that the input has an even number of elements. */
if (std::distance(begin, end) % 2 != 0)
throw std::invalid_argument("Range to merge must have an even number of elements.");
/* In order to guarantee that the input range has sufficiently many elements
* that the named blocks exist, there should be at least 50 elements in
* the range. Otherwise, each half is not guaranteed to have at least
* three blocks. If there are fewer than 50 elements, we'll just use an
* in-place heap sort to do the work. This doesn't affect the overall
* asymptotic runtime of the algorithm, since big-O only considers behavior
* in the limit.
*/
if (std::distance(begin, end) < 50) {
std::make_heap(begin, end, comp);
std::sort_heap(begin, end, comp);
return;
}
/* Compute s, the block size. The casts are necessary to resolve which
* overload to use.
*/
const size_t s = (size_t)std::ceil(std::sqrt(double(end - begin)));
/* Cache the number of elements in each subrange. */
const size_t listSize = (end - begin) / 2;
/* Compute the midpoint, which is where the second list starts and
* the first list stops.
*/
RandomIterator mid = begin + listSize;
/* Get back iterators to the start of blocks A and B. */
std::pair<RandomIterator, RandomIterator> maxElems =
LocateMaxElems(begin, mid, end, s, comp);
/* Compute the groups C and D. Group C is formed from the group of elements
* of size |B| that directly precede A.
*/
RandomIterator cStart = maxElems.first - std::distance(maxElems.second, end);
/* Group D is formed by taking the K - |B| % s elements that precede B. */
RandomIterator dStart = maxElems.second - ((listSize - std::distance(maxElems.second, end)) % s);
/* Exchange C and B. This makes the range [cStart, mid) the buffer. */
std::swap_ranges(cStart, maxElems.first, maxElems.second);
/* Now, we need to merge D and B together using the buffer as the temporary
* storage space. We don't want to move the buffer when doing this, so we'll
* do an in-place merge using the buffer as a temporary.
*/
BufferedInplaceMerge(dStart, maxElems.second, end, // D B
cStart, mid, // Buffer location
comp);
/* Next, we need to fix up the leftover elements from the front of the first
* list. The first list is as it originally was, except that the last block
* has been swapped with the buffer. This means that the number of elements
* that are beyond what's necessary is K mod s. Let's see what this is.
*/
const size_t firstSlack = listSize % s;
/* There are two cases to consider. First, if s == 0, then there is no
* leftover slack and we can just run the main algorithm. Otherwise,
* we need to do a merge of these elements with the smallest elements from
* the second list.
*/
if (firstSlack != 0) {
/* Swap the first block from the second list and the second block from
* the first list. This puts the smallest elements from the two lists
* adjacent to one another.
*/
std::swap_ranges(begin + firstSlack, begin + firstSlack + s, mid);
/* Merge the slack space and this block using the buffer for scratch. */
BufferedInplaceMerge(begin, begin + firstSlack, begin + firstSlack + s,
cStart, mid, comp);
/* Move the blocks back. */
std::swap_ranges(begin + firstSlack, begin + firstSlack + s, mid);
}
/* Okay! At this point we're ready to begin the main algorithm. We do
* this by swapping the buffer and the first actual block.
*/
std::swap_ranges(begin + firstSlack, begin + firstSlack + s, cStart);
/* Run the block-merging step to eat everything from the first block up to
* the start of E.
*/
MergeBlocks(begin + firstSlack, dStart, s, comp);
/* At this point, all that's left to do is merge E into this resulting
* sequence. We do this by swapping E and the smallest elements of the
* range, doing an in-place merge, then rotating the buffer back and
* sorting it.
*/
std::swap_ranges(dStart, end, begin);
BufferedInplaceMerge(begin, begin + distance(dStart, end), dStart,
dStart, end, comp);
/* Sort the buffer using heapsort. */
std::make_heap(dStart, end, comp);
std::sort_heap(dStart, end, comp);
/* Rotate the buffer to the front. */
std::rotate(begin, dStart, end);
}
/* Non-comparator version calls comparator version. */
template <typename RandomIterator>
void InplaceMerge(RandomIterator begin, RandomIterator end) {
InplaceMerge(begin, end,
std::less<typename std::iterator_traits<RandomIterator>::value_type>());
}
#endif