DuckDBのC++コードを読んだメモ1 - リソース管理周り
Ubuntu (WSL2) + VSCodeで、DuckDB (v1.1.3)のコードを動かしながら読むメモ。主にリソース管理周りに絞っている(全体像が掴みたい場合、こちらの記事がよさげ)。
ソースビルド
デバッグしたいので、デバッグビルドに。gdbも無ければ入れておく。
sudo apt-get update && sudo apt-get install -y git g++ cmake ninja-build libssl-dev gdb
git clone https://github.com/duckdb/duckdb
GEN=ninja make debug
ビルドが成功すると、/build/debug以下にいろいろできる。
./build/debug/test/unittest
で、全部のテストが走る(遅い)。
./build/debug/test/unittest test/sql/projection/test_simple_projection.test
のように、対象のテストを指定して、走ればOK。
VSCode側の準備
C++ Extensionを入れた後、launch.jsonに以下を記述。nameとargsは走らせたいテストに応じて適当に。
{
"version": "0.2.0",
"configurations": [
{
"name": "(gdb) test_simple_projection",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/build/debug/test/unittest",
"args": ["test/sql/projection/test_simple_projection.test"],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"MIMode": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
},
{
"description": "Set Disassembly Flavor to Intel",
"text": "-gdb-set disassembly-flavor intel",
"ignoreFailures": true
}
]
}
]
}
デバッグ
試しにsrc/main/database.cppのDatabaseInstance::Initializeあたりにブレークポイントを貼ってみる。
この状態でStart Debuggingを実行し、無事ブレークポイントで止まればOK。

主要なデータ構造
duckdb_openまたはduckdb_open_ext (duckdb-c.cpp)からDuckDBインスタンスが生成され、各種初期化処理が実行される。
DuckDBオブジェクトはDatabaseInstanceオブジェクトを持っており、このDatabaseInstanceオブジェクトが、バッファマネージャやタスクスケジューラ、オブジェクトキャッシュ、ファイルシステムなど、主要なオブジェクトを持っている。
duckdb/main/database.hpp:
class DatabaseInstance : public enable_shared_from_this<DatabaseInstance> {
friend class DuckDB;
// (略)
private:
shared_ptr<BufferManager> buffer_manager;
unique_ptr<DatabaseManager> db_manager;
unique_ptr<TaskScheduler> scheduler;
unique_ptr<ObjectCache> object_cache;
unique_ptr<ConnectionManager> connection_manager;
unordered_map<string, ExtensionInfo> loaded_extensions_info;
ValidChecker db_validity;
unique_ptr<DatabaseFileSystem> db_file_system;
shared_ptr<DatabaseCacheEntry> db_cache_entry;
duckdb_ext_api_v1 (*create_api_v1)();
};
バッファマネージャとバッファプール
BufferPool(duckdb/storage/buffer/buffer_pool.hpp)が、DuckDBが内部で確保するメモリを管理するメモリプール。サイズはDBConfigで指定されるが、特に指定されなければシステムで使えるメモリの80%が最大サイズとなる。
void DBConfig::SetDefaultMaxMemory() {
auto memory = GetSystemAvailableMemory(*file_system);
if (memory == DBConfigOptions().maximum_memory) {
// If GetSystemAvailableMemory returned the default, use it as is
options.maximum_memory = memory;
} else {
// Otherwise, use 80% of the available memory
options.maximum_memory = memory * 8 / 10;
}
}
外部からmemory_limitで指定できるのは、多分このバッファプールのサイズ。
DuckDB内部でメモリを確保する際には、バッファプールを直接使うのではなく、バッファマネージャを経由する。バッファマネージャはBufferManagerを継承したStandardBufferManagerが担う。
バッファマネージャの初期化箇所:
void DatabaseInstance::Initialize(const char *database_path, DBConfig *user_config) {
DBConfig default_config;
DBConfig *config_ptr = &default_config;
if (user_config) {
config_ptr = user_config;
}
Configure(*config_ptr, database_path);
create_api_v1 = CreateAPIv1Wrapper;
db_file_system = make_uniq<DatabaseFileSystem>(*this);
db_manager = make_uniq<DatabaseManager>(*this);
if (config.buffer_manager) {
buffer_manager = config.buffer_manager;
} else {
buffer_manager = make_uniq<StandardBufferManager>(*this, config.options.temporary_directory);
}
バッファマネージャはconfigで渡されていればそれを共有し、渡されていなければ新しくインスタンスを生成する。
"システムで使えるメモリ"とは
バッファプールのデフォルト値は、システムで使えるメモリの80%。この”システムで使えるメモリ”とは何か?
Linuxの場合
- Slurmを使っている場合、Slurmのメモリ設定に応じた設定値を返す。
- Slurmはスパコンなどで使われる高性能なジョブスケジューラ。
- cgroupの設定がある場合、その設定値を返す。(v2 -> v1の順で見る)
- 1,2どちらでも無ければ、物理メモリのページ数×ページサイズ(
sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE))を使う。参考:man sysconf(3)- ただし、z/OSの場合(
ifdef __MVS__)、別の定義を使う。
- ただし、z/OSの場合(
Windowsの場合
-
GetPhysicallyInstalledSystemMemoryAPIの戻り値を使う。利用できない場合、GlobalMemoryStatusEx()から取得した情報を使う。
いずれにしても、この”システムで使えるメモリ”は、メモリの空き状況は加味せず、設定された制限値、またはシステムの物理メモリ全体を指す。
メモリ割り当ての流れ
- バッファマネージャは、
StandardBufferManager::EvictBlocksOrThrow経由でバッファプールからメモリの確保を試みる - バッファプールでは、要求されたメモリが確保できない場合、
EvictionQueueに登録されているブロックを順次走査し、解放(unload)して回る- この解放処理は単なる削除のこともあれば、disk spillのこともある
- どちらが適用されるかは、メモリブロックごとのハンドラである
BlockHandleのdestroy_buffer_upon属性によって決まる - 必要分が確保された時点で解放は中断し、確保されたメモリを返す
-
EvictionQueueを全部走査しても要求されたメモリを確保できなかった場合は、OutOfMemoryExceptionを送出する - つまり、足りなくなるまではメモリに置けるものは全部置き、足りなくなったらEvictionQueueから先入れ先出しで解放するLazyな戦略
template <typename... ARGS>
TempBufferPoolReservation StandardBufferManager::EvictBlocksOrThrow(MemoryTag tag, idx_t memory_delta,
unique_ptr<FileBuffer> *buffer, ARGS... args) {
auto r = buffer_pool.EvictBlocks(tag, memory_delta, buffer_pool.maximum_memory, buffer);
if (!r.success) {
string extra_text = StringUtil::Format(" (%s/%s used)", StringUtil::BytesToHumanReadableString(GetUsedMemory()),
StringUtil::BytesToHumanReadableString(GetMaxMemory()));
extra_text += InMemoryWarning();
throw OutOfMemoryException(args..., extra_text);
}
return std::move(r.reservation);
}
BufferPool::EvictionResult BufferPool::EvictBlocksInternal(EvictionQueue &queue, MemoryTag tag, idx_t extra_memory,
idx_t memory_limit, unique_ptr<FileBuffer> *buffer) {
TempBufferPoolReservation r(tag, *this, extra_memory);
bool found = false;
if (memory_usage.GetUsedMemory(MemoryUsageCaches::NO_FLUSH) <= memory_limit) {
if (Allocator::SupportsFlush() && extra_memory > allocator_bulk_deallocation_flush_threshold) {
Allocator::FlushAll();
}
return {true, std::move(r)};
}
queue.IterateUnloadableBlocks([&](BufferEvictionNode &, const shared_ptr<BlockHandle> &handle, BlockLock &lock) {
// hooray, we can unload the block
if (buffer && handle->GetBuffer(lock)->AllocSize() == extra_memory) {
// we can re-use the memory directly
*buffer = handle->UnloadAndTakeBlock(lock);
found = true;
return false;
}
// release the memory and mark the block as unloaded
handle->Unload(lock);
if (memory_usage.GetUsedMemory(MemoryUsageCaches::NO_FLUSH) <= memory_limit) {
found = true;
return false;
}
// Continue iteration
return true;
});
if (!found) {
r.Resize(0);
} else if (Allocator::SupportsFlush() && extra_memory > allocator_bulk_deallocation_flush_threshold) {
Allocator::FlushAll();
}
return {found, std::move(r)};
}
※Eviction … リソース不足時に既存のリソースを解放することで必要量を確保しようとする動きのことをこう呼ぶことがある
EvictionQueue
このEvictionQueueには、以下のような解放しても問題ないメモリブロックがAddToEvictionQueueメソッド経由で明示的に登録されている。
- 参照カウント(
BlockHandle::readers)が0になったブロック- このカウントは
Pin()で増え,Unpin()で減る
- このカウントは
-
ConvertToPersistentを明示的に呼ばれたブロック-
Flush系の名前のメソッドが内部で呼んでいることが多い
-
EvictionQueueは1本ではなく、キュー自体が8本に分かれているが、以下の2つのPRが経緯。
EvictionQueueをFileBufferType(3種類)ごとに分ける:
FileBufferType::MANAGED_BUFFERのキューをさらに6つに分ける:
後者のPRによると、hash aggやjoinで大データをパーティショニングして処理する際に、よりディスクI/Oを減らせるらしい。(ちゃんと理解できていない)
TemporaryMemoryManager
複数スレッドでクエリを並列実行する際、各スレッドが同時に大きなバッファを取ろうとするとOOMになってしまう。
そこで、スレッド間で大規模バッファの調整を行うのがTemporaryMemoryManager。
- 各スレッドは、自分がこれから使うメモリ量を予約しに行く(
SetRemainingSize) - TemporaryMemoryManagerは、その予約量に応じて、現在残っているメモリの最大2/3までを、予約を宣言したスレッドに割り当てる
というシンプルな処理が行われている(多分…)。
各スレッド側は、もし要求した予約量が確保できないことが分かった場合、より省メモリなアルゴリズムに切り替えることでOOMやdisk spillを回避することができる。
以下のPRが詳しいが、このTemporaryMemoryManagerのおかげで、例えば複数Joinが発生するクエリで、各Joinがそれぞれ巨大なハッシュテーブルを必要とするようなケースでも、より省メモリで動作するようになる。
disk spill
EvitionQueueからアンロードされたブロックは、disk spillが必要な場合はStandardBufferManager::WriteTemporaryBufferを呼び出して一時ファイルに内容を書き出す。
unique_ptr<FileBuffer> BlockHandle::UnloadAndTakeBlock(BlockLock &lock) {
VerifyMutex(lock);
if (state == BlockState::BLOCK_UNLOADED) {
// already unloaded: nothing to do
return nullptr;
}
D_ASSERT(!unswizzled);
D_ASSERT(CanUnload());
if (block_id >= MAXIMUM_BLOCK && MustWriteToTemporaryFile()) {
// temporary block that cannot be destroyed upon evict/unpin: write to temporary file
block_manager.buffer_manager.WriteTemporaryBuffer(tag, block_id, *buffer);
}
memory_charge.Resize(0);
state = BlockState::BLOCK_UNLOADED;
return std::move(buffer);
}
書き出す際、TemporaryFileManagerが一時領域のサイズ上限(max_swap_size. 設定名はmax_temp_directory_size)を超過していないかチェックする。※デフォルトは一時領域のサイズは無制限
void TemporaryFileManager::IncreaseSizeOnDisk(idx_t bytes) {
auto current_size_on_disk = GetTotalUsedSpaceInBytes();
if (current_size_on_disk + bytes > max_swap_space) {
auto used = StringUtil::BytesToHumanReadableString(current_size_on_disk);
auto max = StringUtil::BytesToHumanReadableString(max_swap_space);
auto data_size = StringUtil::BytesToHumanReadableString(bytes);
throw OutOfMemoryException(R"(failed to offload data block of size %s (%s/%s used).
This limit was set by the 'max_temp_directory_size' setting.
By default, this setting utilizes the available disk space on the drive where the 'temp_directory' is located.
You can adjust this setting, by using (for example) PRAGMA max_temp_directory_size='10GiB')",
data_size, used, max);
}
size_on_disk += bytes;
}