Skip to content

Commit e6269b1

Browse files
authored
Merge pull request #13 from OpenRiak/nhse-o32-upstream.d31
Nhse o32 upstream.d31
2 parents ae82397 + f0d423e commit e6269b1

File tree

5 files changed

+216
-43
lines changed

5 files changed

+216
-43
lines changed

include/leveled.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
-define(MAX_SSTSLOTS, 256).
3636
-define(MAX_MERGEBELOW, 24).
3737
-define(LOADING_PAUSE, 1000).
38-
-define(LOADING_BATCH, 1000).
38+
-define(LOADING_BATCH, 200).
3939
-define(CACHE_SIZE_JITTER, 25).
4040
-define(JOURNAL_SIZE_JITTER, 20).
4141
-define(LONG_RUNNING, 1000000).

src/leveled_cdb.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,8 @@ delete_pending({call, From}, cdb_close, State) ->
756756
State#state.filename,
757757
State#state.waste_path),
758758
{stop_and_reply, normal, [{reply, From, ok}]};
759+
delete_pending({call, From}, Event, State) ->
760+
handle_sync_event(Event, From, State);
759761
delete_pending(cast, delete_confirmed, State=#state{delete_point=ManSQN}) ->
760762
leveled_log:log(cdb04, [State#state.filename, ManSQN]),
761763
close_pendingdelete(State#state.handle,

src/leveled_inker.erl

Lines changed: 65 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -558,10 +558,13 @@ handle_call({fold,
558558
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
559559
Folder =
560560
fun() ->
561-
fold_from_sequence(StartSQN,
562-
{FilterFun, InitAccFun, FoldFun},
563-
Acc,
564-
Manifest)
561+
fold_from_sequence(
562+
StartSQN,
563+
State#state.journal_sqn,
564+
{FilterFun, InitAccFun, FoldFun},
565+
Acc,
566+
Manifest
567+
)
565568
end,
566569
case By of
567570
as_ink ->
@@ -1211,8 +1214,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
12111214

12121215

12131216

1214-
-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
1215-
-> any().
1217+
-spec fold_from_sequence(
1218+
non_neg_integer(),
1219+
pos_integer(),
1220+
{fun(), fun(), fun()},
1221+
any(),
1222+
list()) -> any().
12161223
%% @doc
12171224
%%
12181225
%% Scan from the starting sequence number to the end of the Journal. Apply
@@ -1226,62 +1233,79 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
12261233
%% over in batches using foldfile_between_sequence/7. The batch is a range of
12271234
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
12281235
%% files
1229-
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
1236+
fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
12301237
Acc;
1231-
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
1238+
fold_from_sequence(MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
12321239
when LowSQN >= MinSQN ->
1233-
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
1234-
MinSQN + ?LOADING_BATCH,
1235-
FoldFuns,
1236-
Acc,
1237-
Pid,
1238-
undefined,
1239-
FN),
1240-
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
1241-
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
1240+
{NextMinSQN, Acc0} =
1241+
foldfile_between_sequence(
1242+
MinSQN,
1243+
MinSQN + ?LOADING_BATCH,
1244+
JournalSQN,
1245+
FoldFuns,
1246+
Acc,
1247+
Pid,
1248+
undefined,
1249+
FN
1250+
),
1251+
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
1252+
fold_from_sequence(
1253+
MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
12421254
% If this file has a LowSQN less than the minimum, we can skip it if the
12431255
% next file also has a LowSQN below the minimum
12441256
{NextMinSQN, Acc0} =
12451257
case Rest of
12461258
[] ->
1247-
foldfile_between_sequence(MinSQN,
1248-
MinSQN + ?LOADING_BATCH,
1249-
FoldFuns,
1250-
Acc,
1251-
Pid,
1252-
undefined,
1253-
FN);
1259+
foldfile_between_sequence(
1260+
MinSQN,
1261+
MinSQN + ?LOADING_BATCH,
1262+
JournalSQN,
1263+
FoldFuns,
1264+
Acc,
1265+
Pid,
1266+
undefined,
1267+
FN
1268+
);
12541269
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
1255-
foldfile_between_sequence(MinSQN,
1256-
MinSQN + ?LOADING_BATCH,
1257-
FoldFuns,
1258-
Acc,
1259-
Pid,
1260-
undefined,
1261-
FN);
1270+
foldfile_between_sequence(
1271+
MinSQN,
1272+
MinSQN + ?LOADING_BATCH,
1273+
JournalSQN,
1274+
FoldFuns,
1275+
Acc,
1276+
Pid,
1277+
undefined,
1278+
FN
1279+
);
12621280
_ ->
12631281
{MinSQN, Acc}
12641282
end,
1265-
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
1283+
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).
12661284

1267-
foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
1268-
Acc, CDBpid, StartPos, FN) ->
1285+
foldfile_between_sequence(
1286+
MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
12691287
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
12701288
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
12711289

12721290
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
12731291
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
12741292
{AccMinSQN, FoldFun(BatchAcc, Acc)};
1293+
{_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
1294+
when AccMinSQN >= JournalSQN ->
1295+
{AccMinSQN, FoldFun(BatchAcc, Acc)};
12751296
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
12761297
UpdAcc = FoldFun(BatchAcc, Acc),
12771298
NextSQN = MaxSQN + 1,
1278-
foldfile_between_sequence(NextSQN,
1279-
NextSQN + ?LOADING_BATCH,
1280-
FoldFuns,
1281-
UpdAcc,
1282-
CDBpid,
1283-
LastPosition,
1284-
FN)
1299+
foldfile_between_sequence(
1300+
NextSQN,
1301+
NextSQN + ?LOADING_BATCH,
1302+
JournalSQN,
1303+
FoldFuns,
1304+
UpdAcc,
1305+
CDBpid,
1306+
LastPosition,
1307+
FN
1308+
)
12851309
end.
12861310

12871311

src/leveled_pclerk.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ do_merge(
389389

390390
add_entry(empty, FileName, _TS1, Additions) ->
391391
leveled_log:log(pc013, [FileName]),
392-
{[], [], Additions};
392+
{Additions, [], []};
393393
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
394394
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
395395
Entry =

test/end_to_end/riak_SUITE.erl

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
fetchclocks_modifiedbetween/1,
1212
crossbucket_aae/1,
1313
handoff/1,
14+
handoff_close/1,
15+
handoff_withcompaction/1,
1416
dollar_bucket_index/1,
1517
dollar_key_index/1,
1618
bigobject_memorycheck/1,
@@ -23,6 +25,8 @@ all() -> [
2325
fetchclocks_modifiedbetween,
2426
crossbucket_aae,
2527
handoff,
28+
handoff_close,
29+
handoff_withcompaction,
2630
dollar_bucket_index,
2731
dollar_key_index,
2832
bigobject_memorycheck,
@@ -1633,6 +1637,149 @@ dollar_key_index(_Config) ->
16331637
ok = leveled_bookie:book_close(Bookie1),
16341638
testutil:reset_filestructure().
16351639

1640+
handoff_close(_Config) ->
1641+
RootPath = testutil:reset_filestructure(),
1642+
KeyCount = 500000,
1643+
Bucket = {<<"BType">>, <<"BName">>},
1644+
StartOpts1 =
1645+
[
1646+
{root_path, RootPath},
1647+
{max_journalobjectcount, KeyCount + 1},
1648+
{max_pencillercachesize, 12000},
1649+
{sync_strategy, testutil:sync_strategy()}
1650+
],
1651+
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
1652+
ObjList1 =
1653+
testutil:generate_objects(
1654+
KeyCount div 10,
1655+
{fixed_binary, 1}, [],
1656+
leveled_rand:rand_bytes(512),
1657+
fun() -> [] end,
1658+
Bucket
1659+
),
1660+
ObjList2 =
1661+
testutil:generate_objects(
1662+
KeyCount - (KeyCount div 10),
1663+
{fixed_binary, KeyCount div 10 + 1}, [],
1664+
leveled_rand:rand_bytes(512),
1665+
fun() -> [] end,
1666+
Bucket
1667+
),
1668+
testutil:riakload(Bookie1, ObjList1),
1669+
FoldObjectsFun =
1670+
fun(_, _, _, Acc) ->
1671+
[os:timestamp()|Acc]
1672+
end,
1673+
{async, Runner} =
1674+
leveled_bookie:book_objectfold(
1675+
Bookie1,
1676+
?RIAK_TAG,
1677+
{FoldObjectsFun, []},
1678+
true,
1679+
sqn_order
1680+
),
1681+
testutil:riakload(Bookie1, ObjList2),
1682+
TSList = Runner(),
1683+
QueryCompletionTime = os:timestamp(),
1684+
LastTS = hd(TSList),
1685+
io:format(
1686+
"Found ~w objects with Last TS ~w completion time ~w~n",
1687+
[length(TSList), LastTS, QueryCompletionTime]
1688+
),
1689+
true = KeyCount div 10 == length(TSList),
1690+
TimeSinceLastObjectTouchedMS =
1691+
timer:now_diff(QueryCompletionTime, LastTS) div 1000,
1692+
true = TimeSinceLastObjectTouchedMS < 1000,
1693+
leveled_bookie:book_destroy(Bookie1),
1694+
testutil:reset_filestructure().
1695+
1696+
1697+
handoff_withcompaction(_Config) ->
1698+
RootPath = testutil:reset_filestructure(),
1699+
KeyCount = 100000,
1700+
Bucket = {<<"BType">>, <<"BName">>},
1701+
StartOpts1 =
1702+
[
1703+
{root_path, RootPath},
1704+
{max_journalobjectcount, KeyCount div 4},
1705+
{max_pencillercachesize, 12000},
1706+
{sync_strategy, testutil:sync_strategy()},
1707+
{max_run_length, 4}
1708+
],
1709+
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
1710+
ObjList1 =
1711+
testutil:generate_objects(
1712+
KeyCount div 4,
1713+
{fixed_binary, 1}, [],
1714+
crypto:strong_rand_bytes(512),
1715+
fun() -> [] end,
1716+
Bucket
1717+
),
1718+
testutil:riakload(Bookie1, ObjList1),
1719+
ObjList2 =
1720+
testutil:generate_objects(
1721+
KeyCount div 4,
1722+
{fixed_binary, (KeyCount div 4) + 1}, [],
1723+
crypto:strong_rand_bytes(512),
1724+
fun() -> [] end,
1725+
Bucket
1726+
),
1727+
testutil:riakload(Bookie1, ObjList2),
1728+
ObjList3 =
1729+
testutil:generate_objects(
1730+
KeyCount div 4,
1731+
{fixed_binary, (KeyCount div 4) * 2 + 1}, [],
1732+
crypto:strong_rand_bytes(512),
1733+
fun() -> [] end,
1734+
Bucket
1735+
),
1736+
testutil:riakload(Bookie1, ObjList3),
1737+
ObjList4 =
1738+
testutil:generate_objects(
1739+
KeyCount div 4,
1740+
{fixed_binary, (KeyCount div 4) * 3 + 1}, [],
1741+
crypto:strong_rand_bytes(512),
1742+
fun() -> [] end,
1743+
Bucket
1744+
),
1745+
testutil:riakload(Bookie1, ObjList4),
1746+
% Now update some objects to prompt compaction
1747+
testutil:update_some_objects(Bookie1, ObjList1, KeyCount div 8),
1748+
testutil:update_some_objects(Bookie1, ObjList2, KeyCount div 8),
1749+
testutil:update_some_objects(Bookie1, ObjList3, KeyCount div 8),
1750+
testutil:update_some_objects(Bookie1, ObjList4, KeyCount div 8),
1751+
1752+
% Setup a handoff-style fold to snapshot journal
1753+
FoldObjectsFun =
1754+
fun(_, K, _, Acc) ->
1755+
[K|Acc]
1756+
end,
1757+
{async, Runner} =
1758+
leveled_bookie:book_objectfold(
1759+
Bookie1,
1760+
?RIAK_TAG,
1761+
{FoldObjectsFun, []},
1762+
true,
1763+
sqn_order
1764+
),
1765+
1766+
% Now compact the journal, twice to be sure
1767+
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
1768+
testutil:wait_for_compaction(Bookie1),
1769+
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
1770+
testutil:wait_for_compaction(Bookie1),
1771+
1772+
% Run the fold - some cdb files should now be delete_pending
1773+
{TC0, Results} = timer:tc(Runner),
1774+
io:format(
1775+
"Found ~w objects in ~w ms~n",
1776+
[length(Results), TC0 div 1000]
1777+
),
1778+
true = KeyCount == length(Results),
1779+
leveled_bookie:book_destroy(Bookie1),
1780+
testutil:reset_filestructure().
1781+
1782+
16361783
%% @doc test that the riak specific $bucket indexes can be iterated
16371784
%% using leveled's existing folders
16381785
dollar_bucket_index(_Config) ->

0 commit comments

Comments
 (0)