Open6

DuckDBのC++コードを読んだメモ1 - リソース管理周り

nyanpnyanp

Ubuntu (WSL2) + VSCodeで、DuckDB (v1.1.3)のコードを動かしながら読むメモ。主にリソース管理周りに絞っている(全体像が掴みたい場合、こちらの記事がよさげ)。

ソースビルド

https://duckdb.org/docs/dev/building/build_instructions

デバッグしたいので、デバッグビルドに。gdbも無ければ入れておく。

sudo apt-get update && sudo apt-get install -y git g++ cmake ninja-build libssl-dev gdb
git clone https://github.com/duckdb/duckdb
GEN=ninja make debug

ビルドが成功すると、/build/debug以下にいろいろできる。

./build/debug/test/unittest

で、全部のテストが走る(遅い)。

./build/debug/test/unittest test/sql/projection/test_simple_projection.test

のように、対象のテストを指定して、走ればOK。

VSCode側の準備

C++ Extensionを入れた後、launch.jsonに以下を記述。nameとargsは走らせたいテストに応じて適当に。

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "(gdb) test_simple_projection",
            "type": "cppdbg",
            "request": "launch",
            "program": "${workspaceFolder}/build/debug/test/unittest",
            "args": ["test/sql/projection/test_simple_projection.test"],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
            "MIMode": "gdb",
            "setupCommands": [
                {
                    "description": "Enable pretty-printing for gdb",
                    "text": "-enable-pretty-printing",
                    "ignoreFailures": true
                },
                {
                    "description": "Set Disassembly Flavor to Intel",
                    "text": "-gdb-set disassembly-flavor intel",
                    "ignoreFailures": true
                }
            ]
        }
        
    ]
}

デバッグ

試しにsrc/main/database.cppDatabaseInstance::Initializeあたりにブレークポイントを貼ってみる。
この状態でStart Debuggingを実行し、無事ブレークポイントで止まればOK。

nyanpnyanp

主要なデータ構造

duckdb_openまたはduckdb_open_ext (duckdb-c.cpp)からDuckDBインスタンスが生成され、各種初期化処理が実行される。
DuckDBオブジェクトはDatabaseInstanceオブジェクトを持っており、このDatabaseInstanceオブジェクトが、バッファマネージャやタスクスケジューラ、オブジェクトキャッシュ、ファイルシステムなど、主要なオブジェクトを持っている。

duckdb/main/database.hpp:

class DatabaseInstance : public enable_shared_from_this<DatabaseInstance> {
	friend class DuckDB;
    // (略)
private:
	shared_ptr<BufferManager> buffer_manager;
	unique_ptr<DatabaseManager> db_manager;
	unique_ptr<TaskScheduler> scheduler;
	unique_ptr<ObjectCache> object_cache;
	unique_ptr<ConnectionManager> connection_manager;
	unordered_map<string, ExtensionInfo> loaded_extensions_info;
	ValidChecker db_validity;
	unique_ptr<DatabaseFileSystem> db_file_system;
	shared_ptr<DatabaseCacheEntry> db_cache_entry;

	duckdb_ext_api_v1 (*create_api_v1)();
};

バッファマネージャとバッファプール

BufferPool(duckdb/storage/buffer/buffer_pool.hpp)が、DuckDBが内部で確保するメモリを管理するメモリプール。サイズはDBConfigで指定されるが、特に指定されなければシステムで使えるメモリの80%が最大サイズとなる。

void DBConfig::SetDefaultMaxMemory() {
	auto memory = GetSystemAvailableMemory(*file_system);
	if (memory == DBConfigOptions().maximum_memory) {
		// If GetSystemAvailableMemory returned the default, use it as is
		options.maximum_memory = memory;
	} else {
		// Otherwise, use 80% of the available memory
		options.maximum_memory = memory * 8 / 10;
	}
}

外部からmemory_limitで指定できるのは、多分このバッファプールのサイズ。

DuckDB内部でメモリを確保する際には、バッファプールを直接使うのではなく、バッファマネージャを経由する。バッファマネージャはBufferManagerを継承したStandardBufferManagerが担う。

バッファマネージャの初期化箇所:

void DatabaseInstance::Initialize(const char *database_path, DBConfig *user_config) {
	DBConfig default_config;
	DBConfig *config_ptr = &default_config;
	if (user_config) {
		config_ptr = user_config;
	}

	Configure(*config_ptr, database_path);

	create_api_v1 = CreateAPIv1Wrapper;

	db_file_system = make_uniq<DatabaseFileSystem>(*this);
	db_manager = make_uniq<DatabaseManager>(*this);
	if (config.buffer_manager) {
		buffer_manager = config.buffer_manager;
	} else {
		buffer_manager = make_uniq<StandardBufferManager>(*this, config.options.temporary_directory);
	}

バッファマネージャはconfigで渡されていればそれを共有し、渡されていなければ新しくインスタンスを生成する。

nyanpnyanp

"システムで使えるメモリ"とは

バッファプールのデフォルト値は、システムで使えるメモリの80%。この”システムで使えるメモリ”とは何か?

Linuxの場合

  1. Slurmを使っている場合、Slurmのメモリ設定に応じた設定値を返す。
    • Slurmはスパコンなどで使われる高性能なジョブスケジューラ。
  2. cgroupの設定がある場合、その設定値を返す。(v2 -> v1の順で見る)
  3. 1,2どちらでも無ければ、物理メモリのページ数×ページサイズ(sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE))を使う。参考:man sysconf(3)
    • ただし、z/OSの場合(ifdef __MVS__)、別の定義を使う。

Windowsの場合

  • GetPhysicallyInstalledSystemMemory APIの戻り値を使う。利用できない場合、GlobalMemoryStatusEx()から取得した情報を使う。

いずれにしても、この”システムで使えるメモリ”は、メモリの空き状況は加味せず、設定された制限値、またはシステムの物理メモリ全体を指す。

nyanpnyanp

メモリ割り当ての流れ

  • バッファマネージャは、StandardBufferManager::EvictBlocksOrThrow経由でバッファプールからメモリの確保を試みる
  • バッファプールでは、要求されたメモリが確保できない場合、EvictionQueueに登録されているブロックを順次走査し、解放(unload)して回る
    • この解放処理は単なる削除のこともあれば、disk spillのこともある
    • どちらが適用されるかは、メモリブロックごとのハンドラであるBlockHandledestroy_buffer_upon属性によって決まる
    • 必要分が確保された時点で解放は中断し、確保されたメモリを返す
  • EvictionQueueを全部走査しても要求されたメモリを確保できなかった場合は、OutOfMemoryExceptionを送出する
  • つまり、足りなくなるまではメモリに置けるものは全部置き、足りなくなったらEvictionQueueから先入れ先出しで解放するLazyな戦略
src/storage/standard_buffer_manager.cpp
template <typename... ARGS>
TempBufferPoolReservation StandardBufferManager::EvictBlocksOrThrow(MemoryTag tag, idx_t memory_delta,
                                                                    unique_ptr<FileBuffer> *buffer, ARGS... args) {
	auto r = buffer_pool.EvictBlocks(tag, memory_delta, buffer_pool.maximum_memory, buffer);
	if (!r.success) {
		string extra_text = StringUtil::Format(" (%s/%s used)", StringUtil::BytesToHumanReadableString(GetUsedMemory()),
		                                       StringUtil::BytesToHumanReadableString(GetMaxMemory()));
		extra_text += InMemoryWarning();
		throw OutOfMemoryException(args..., extra_text);
	}
	return std::move(r.reservation);
}
src/storage/buffer/buffer_pool.cpp
BufferPool::EvictionResult BufferPool::EvictBlocksInternal(EvictionQueue &queue, MemoryTag tag, idx_t extra_memory,
                                                           idx_t memory_limit, unique_ptr<FileBuffer> *buffer) {
	TempBufferPoolReservation r(tag, *this, extra_memory);
	bool found = false;

	if (memory_usage.GetUsedMemory(MemoryUsageCaches::NO_FLUSH) <= memory_limit) {
		if (Allocator::SupportsFlush() && extra_memory > allocator_bulk_deallocation_flush_threshold) {
			Allocator::FlushAll();
		}
		return {true, std::move(r)};
	}

	queue.IterateUnloadableBlocks([&](BufferEvictionNode &, const shared_ptr<BlockHandle> &handle, BlockLock &lock) {
		// hooray, we can unload the block
		if (buffer && handle->GetBuffer(lock)->AllocSize() == extra_memory) {
			// we can re-use the memory directly
			*buffer = handle->UnloadAndTakeBlock(lock);
			found = true;
			return false;
		}

		// release the memory and mark the block as unloaded
		handle->Unload(lock);

		if (memory_usage.GetUsedMemory(MemoryUsageCaches::NO_FLUSH) <= memory_limit) {
			found = true;
			return false;
		}

		// Continue iteration
		return true;
	});

	if (!found) {
		r.Resize(0);
	} else if (Allocator::SupportsFlush() && extra_memory > allocator_bulk_deallocation_flush_threshold) {
		Allocator::FlushAll();
	}

	return {found, std::move(r)};
}

※Eviction … リソース不足時に既存のリソースを解放することで必要量を確保しようとする動きのことをこう呼ぶことがある

EvictionQueue

このEvictionQueueには、以下のような解放しても問題ないメモリブロックがAddToEvictionQueueメソッド経由で明示的に登録されている。

  • 参照カウント(BlockHandle::readers)が0になったブロック
    • このカウントはPin()で増え, Unpin()で減る
  • ConvertToPersistentを明示的に呼ばれたブロック
    • Flush系の名前のメソッドが内部で呼んでいることが多い

EvictionQueueは1本ではなく、キュー自体が8本に分かれているが、以下の2つのPRが経緯。

EvictionQueueをFileBufferType(3種類)ごとに分ける:
https://github.com/duckdb/duckdb/pull/11417
FileBufferType::MANAGED_BUFFERのキューをさらに6つに分ける:
https://github.com/duckdb/duckdb/pull/14375

後者のPRによると、hash aggやjoinで大データをパーティショニングして処理する際に、よりディスクI/Oを減らせるらしい。(ちゃんと理解できていない)

nyanpnyanp

TemporaryMemoryManager

複数スレッドでクエリを並列実行する際、各スレッドが同時に大きなバッファを取ろうとするとOOMになってしまう。
そこで、スレッド間で大規模バッファの調整を行うのがTemporaryMemoryManager

  • 各スレッドは、自分がこれから使うメモリ量を予約しに行く(SetRemainingSize)
  • TemporaryMemoryManagerは、その予約量に応じて、現在残っているメモリの最大2/3までを、予約を宣言したスレッドに割り当てる

というシンプルな処理が行われている(多分…)。
各スレッド側は、もし要求した予約量が確保できないことが分かった場合、より省メモリなアルゴリズムに切り替えることでOOMやdisk spillを回避することができる。

以下のPRが詳しいが、このTemporaryMemoryManagerのおかげで、例えば複数Joinが発生するクエリで、各Joinがそれぞれ巨大なハッシュテーブルを必要とするようなケースでも、より省メモリで動作するようになる。
https://github.com/duckdb/duckdb/pull/10147

nyanpnyanp

disk spill

EvitionQueueからアンロードされたブロックは、disk spillが必要な場合はStandardBufferManager::WriteTemporaryBufferを呼び出して一時ファイルに内容を書き出す。

src/storage/buffer/block_handle.cpp
unique_ptr<FileBuffer> BlockHandle::UnloadAndTakeBlock(BlockLock &lock) {
	VerifyMutex(lock);

	if (state == BlockState::BLOCK_UNLOADED) {
		// already unloaded: nothing to do
		return nullptr;
	}
	D_ASSERT(!unswizzled);
	D_ASSERT(CanUnload());

	if (block_id >= MAXIMUM_BLOCK && MustWriteToTemporaryFile()) {
		// temporary block that cannot be destroyed upon evict/unpin: write to temporary file
		block_manager.buffer_manager.WriteTemporaryBuffer(tag, block_id, *buffer);
	}
	memory_charge.Resize(0);
	state = BlockState::BLOCK_UNLOADED;
	return std::move(buffer);
}

書き出す際、TemporaryFileManagerが一時領域のサイズ上限(max_swap_size. 設定名はmax_temp_directory_size)を超過していないかチェックする。※デフォルトは一時領域のサイズは無制限

src/storage/temporary_file_manager.cpp
void TemporaryFileManager::IncreaseSizeOnDisk(idx_t bytes) {
	auto current_size_on_disk = GetTotalUsedSpaceInBytes();
	if (current_size_on_disk + bytes > max_swap_space) {
		auto used = StringUtil::BytesToHumanReadableString(current_size_on_disk);
		auto max = StringUtil::BytesToHumanReadableString(max_swap_space);
		auto data_size = StringUtil::BytesToHumanReadableString(bytes);
		throw OutOfMemoryException(R"(failed to offload data block of size %s (%s/%s used).
This limit was set by the 'max_temp_directory_size' setting.
By default, this setting utilizes the available disk space on the drive where the 'temp_directory' is located.
You can adjust this setting, by using (for example) PRAGMA max_temp_directory_size='10GiB')",
		                           data_size, used, max);
	}
	size_on_disk += bytes;
}