-
Notifications
You must be signed in to change notification settings - Fork 1k
feat(tiering): Faster small bins serialization #3340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
#include "server/snapshot.h" | ||
|
||
#include <absl/container/btree_set.h> | ||
#include <absl/functional/bind_front.h> | ||
#include <absl/strings/match.h> | ||
#include <absl/strings/str_cat.h> | ||
|
@@ -37,8 +38,80 @@ size_t SliceSnapshot::DbRecord::size() const { | |
return HeapSize(value); | ||
} | ||
|
||
struct SliceSnapshot::TieredSerializer { | ||
struct ValueBase { | ||
DbIndex dbid; | ||
CompactObj key; | ||
time_t expire; | ||
}; | ||
|
||
using PendingValue = std::pair<ValueBase, util::fb2::Future<PrimeValue>>; | ||
using DelayedValue = std::pair<ValueBase, PrimeValue /* stores segment and encoding flags */>; | ||
|
||
static const int kMaxPageAccum = 100000; | ||
|
||
void Save(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, time_t expire_time) { | ||
ValueBase base{db_indx, PrimeKey(pk.ToString()), expire_time}; | ||
|
||
// Delay serialization of small values to possibly find more for the same page, | ||
// reducing the total number of issued io requests. | ||
if (auto [offset, size] = pv.GetExternalSlice(); size < TieredStorage::kMinOccupancySize) { | ||
PrimeValue pv_copy; | ||
pv_copy.ImportExternal(pv); | ||
size_t page = offset / tiering::kPageSize; | ||
auto& entries = delayed_[page]; | ||
{ | ||
delayed_sizes_.erase({entries.size(), page}); | ||
entries.emplace_back(std::move(base), std::move(pv_copy)); | ||
delayed_sizes_.insert({entries.size(), page}); | ||
} | ||
} else { | ||
pending_.push_back(Read(std::move(base), pv)); | ||
} | ||
} | ||
|
||
void FlushDelayed(bool force) { | ||
// Flush pages with most records accumulated first, or all, if forced. | ||
// It's enough just to issue reads, because they are collapsed by the tiered storage internally | ||
while ((force && !delayed_.empty()) || delayed_.size() > kMaxPageAccum) { | ||
Comment on lines
+73
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to choose some kind of limit here... by the number of pages? by the total sum of enties? 🤷🏻♂️ We have to take into account that once we flush all of those, we can get a memory spike |
||
DCHECK(!delayed_sizes_.empty()); | ||
auto [size, page] = delayed_sizes_.extract(delayed_sizes_.begin()).value(); | ||
auto entries = delayed_.extract(page); | ||
for (auto& [base, value] : entries.mapped()) { | ||
DCHECK(value.IsExternal()); | ||
pending_.push_back(Read(std::move(base), value)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what guarantess that by the time you flush delayed, their segment are still correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inside tiered storage, I don't delete small bin pages while we're serializing |
||
} | ||
} | ||
} | ||
|
||
void Serialize(RdbSerializer* serializer) { | ||
for (auto& [base, value] : pending_) | ||
serializer->SaveEntry(base.key, value.Get(), base.expire, base.dbid); | ||
pending_.clear(); | ||
} | ||
|
||
private: | ||
static PendingValue Read(ValueBase value, const PrimeValue& pv) { | ||
auto* ts = EngineShard::tlocal()->tiered_storage(); | ||
util::fb2::Future<PrimeValue> future; // store PrimeValue directly to avoid further copies | ||
ts->Read(value.dbid, value.key.ToString(), pv, | ||
[future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); }); | ||
return PendingValue{std::move(value), std::move(future)}; | ||
} | ||
|
||
std::vector<PendingValue> pending_; | ||
|
||
// Small values with delayed serialization | ||
absl::flat_hash_map<size_t /* page */, std::vector<DelayedValue>> delayed_; | ||
// Largest entries in delayed map | ||
absl::btree_set<std::pair<size_t /* size */, size_t /* page */>, std::greater<>> delayed_sizes_; | ||
}; | ||
|
||
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode) | ||
: db_slice_(slice), dest_(dest), compression_mode_(compression_mode) { | ||
: db_slice_(slice), | ||
dest_(dest), | ||
tiered_serializer_(new TieredSerializer{}), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you want to initialize it unconditionally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤷🏻♂️ I don't think it's expensive |
||
compression_mode_(compression_mode) { | ||
db_array_ = slice->databases(); | ||
tl_slice_snapshots.insert(this); | ||
} | ||
|
@@ -308,12 +381,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr | |
} | ||
|
||
if (pv.IsExternal()) { | ||
// We can't block, so we just schedule a tiered read and append it to the delayed entries | ||
util::fb2::Future<PrimeValue> future; | ||
EngineShard::tlocal()->tiered_storage()->Read( | ||
db_indx, pk.ToString(), pv, | ||
[future](const std::string& v) mutable { future.Resolve(PrimeValue(v)); }); | ||
delayed_entries_.push_back({db_indx, PrimeKey(pk.ToString()), std::move(future), expire_time}); | ||
tiered_serializer_->Save(db_indx, pk, pv, expire_time); | ||
++type_freq_map_[RDB_TYPE_STRING]; | ||
} else { | ||
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx); | ||
|
@@ -335,29 +403,19 @@ size_t SliceSnapshot::Serialize() { | |
DbRecord db_rec{.id = id, .value = std::move(sfile.val)}; | ||
|
||
dest_->Push(std::move(db_rec)); | ||
if (serialized != 0) { | ||
VLOG(2) << "Pushed with Serialize() " << serialized << " bytes"; | ||
} | ||
|
||
VLOG_IF(2, serialized != 0) << "Pushed with Serialize() " << serialized << " bytes"; | ||
return serialized; | ||
} | ||
|
||
bool SliceSnapshot::PushSerializedToChannel(bool force) { | ||
tiered_serializer_->FlushDelayed(force); | ||
tiered_serializer_->Serialize(serializer_.get()); | ||
|
||
if (!force && serializer_->SerializedLen() < 4096) | ||
return false; | ||
|
||
// Flush any of the leftovers to avoid interleavings | ||
const auto serialized = Serialize(); | ||
|
||
// Bucket serialization might have accumulated some delayed values. | ||
// Because we can finally block in this function, we'll await and serialize them | ||
while (!delayed_entries_.empty()) { | ||
auto& entry = delayed_entries_.back(); | ||
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid); | ||
delayed_entries_.pop_back(); | ||
} | ||
|
||
const auto total_serialized = Serialize() + serialized; | ||
return total_serialized > 0; | ||
return Serialize() > 0; | ||
} | ||
|
||
void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { | ||
|
@@ -377,6 +435,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) | |
stats_.side_saved += SerializeBucket(db_index, it); | ||
}); | ||
} | ||
|
||
// Flush tiered delayed entries to avoid reordering with journal | ||
tiered_serializer_->FlushDelayed(true); | ||
} | ||
Comment on lines
437
to
441
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally we pass the value with ChangeReq& and find out whether it was tiered, only if it's tiered and part of a relevant page, we should flush it |
||
|
||
// For any key any journal entry must arrive at the replica strictly after its first original rdb | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,7 +158,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { | |
return db_slice_.memory_budget() - memory_margin_ - value_len > 0; | ||
} | ||
|
||
void ClearDelayed() { | ||
for (auto segment : delayed_deletes_) | ||
OpManager::DeleteOffloaded(segment); | ||
delayed_deletes_.clear(); | ||
} | ||
|
||
int64_t memory_margin_ = 0; | ||
std::vector<tiering::DiskSegment> delayed_deletes_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: better naming |
||
|
||
struct { | ||
size_t total_stashes = 0, total_cancels = 0, total_fetches = 0; | ||
|
@@ -228,6 +235,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) { | |
|
||
auto bin = ts_->bins_->Delete(segment); | ||
if (bin.empty) { | ||
if (SliceSnapshot::IsSnaphotInProgress()) { | ||
delayed_deletes_.push_back(segment); | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
|
@@ -422,6 +434,8 @@ void TieredStorage::RunOffloading(DbIndex dbid) { | |
if (SliceSnapshot::IsSnaphotInProgress()) | ||
return; | ||
|
||
op_manager_->ClearDelayed(); | ||
|
||
// Don't run offloading if there's only very little space left | ||
auto disk_stats = op_manager_->GetStats().disk_stats; | ||
if (disk_stats.allocated_bytes + kMaxIterations / 2 * tiering::kPageSize > | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: pending_reads_