🔗

dltの仕組み

に公開

はじめに

何か月か前に、dlt入門 と題して、dlt のチュートリアルをなぞるような記事を書きました。
dlt が、ELT の E と L をやってくれるのは、わかったのですが、あまりにも少ない行数でコピーできてしまったので「で、どうなってるの?」を調べてみようという記事です。

やってみる

復習

まずは、前回の復習から。チュートリアルのとおりにやると、CLI が init でモリモリ書いてくれたコードもあるので、バッサリ削除してみます。

REST API

以下のとおり、30行ちょいです。

rest-api/rest_api_pipeline.py
import dlt
from dlt.sources.rest_api import rest_api_source


pokemon_source = rest_api_source(
    {
        "client": {
            "base_url": "https://pokeapi.co/api/v2/",
        },
        "resource_defaults": {
            "endpoint": {
                "params": {
                    "limit": 1000,
                },
            },
        },
        "resources": [
            "pokemon",
            "berry",
            "location",
        ],
    }
)

pipeline = dlt.pipeline(
    pipeline_name="rest_api_pokemon",
    destination="duckdb",
    dataset_name="rest_api_data",
)

load_info = pipeline.run(pokemon_source)
print(load_info)

SQL DB

こっちは、10行ちょいです。

sql-database/sql_database_pipeline.py
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database().with_resources("family", "genome")

pipeline = dlt.pipeline(
    pipeline_name="sql_to_duckdb_pipeline",
    destination="duckdb",
    dataset_name="sql_to_duckdb_pipeline_data",
)

load_info = pipeline.run(source)
print(load_info)

あと、DB への接続情報が必要です。

sql-database/.dlt/secret.toml
[sources.sql_database.credentials]
drivername = "mysql+pymysql" # database+dialect
database = "Rfam"
password = ""
username = "rfamro"
host = "mysql-rfam-public.ebi.ac.uk"
port = 4497

File System

これも 10行ちょい。

file-system/filesystem_pipeline.py
import dlt
from dlt.sources.filesystem import filesystem, read_csv


files = filesystem(bucket_url="gs://filesystem-tutorial", file_glob="encounters*.csv")
reader = (files | read_csv()).with_name("encounters")

pipeline = dlt.pipeline(
    pipeline_name="hospital_data_pipeline",
    destination="duckdb",
    dataset_name="hospital_data",
)

info = pipeline.run(reader)
print(info)

設定情報は、以下の2つ

file-system/.dlt/config.toml
[sources.readers.filesystem]
bucket_url="gs://filesystem-tutorial"
file-system/.dlt/secret.toml
[sources.readers.filesystem.credentials]
client_email = "public-access@dlthub-sandbox.iam.gserviceaccount.com"
project_id = "dlthub-sandbox"
private_key = "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDGWsVHJRjliojx\nTo+j1qu+x8PzC5ZHZrMx6e8OD6tO8uxMyl65ByW/4FZkVXkS4SF/UYPigGN+rel4\nFmySTbP9orva4t3Pk1B9YSvQMB7V5IktmTIW9Wmdmn5Al8Owb1RehgIidm1EX/Z9\nLr09oLpO6+jUu9RIP2Lf2mVQ6tvkgl7UOdpdGACSNGzRiZgVZDOaDIgH0Tl4UWmK\n6iPxhwZy9YC2B1beLB/NU+F6DUykrEpBzCFQTqFoTUcuDAEvuvpU9JrU2iBMiOGw\nuP3TYSiudhBjmauEUWaMiqWAgFeX5ft1vc7/QWLdI//SAjaiTAu6pTer29Q0b6/5\niGh0jRXpAgMBAAECggEAL8G9C9MXunRvYkH6/YR7F1T7jbH1fb1xWYwsXWNSaJC+\nagKzabMZ2KfHxSJ7IxuHOCNFMKyex+pRcvNbMqJ4upGKzzmeFBMw5u8VYGulkPQU\nPyFKWRK/Wg3PZffkSr+TPargKrH+vt6n9x3gvEzNbqEIDugmRTrVsHXhvOi/BrYc\nWhppHSVQidWZi5KVwDEPJjDQiHEcYI/vfIy1WhZ8VuPAaE5nMZ1m7gTdeaWWKIAj\n/p2ZkLgRdCY8vNkfaNDAxDbvH+CMuTtOw55GydzsYYiofANS6xZ8CedGkYaGi82f\nqGdLghX61Sg3UAb5SI36T/9XbuCpTf3B/SMV20ew8QKBgQDm2yUxL71UqI/xK9LS\nHWnqfHpKmHZ+U9yLvp3v79tM8XueSRKBTJJ4H+UvVQrXlypT7cUEE+sGpTrCcDGL\nm8irtdUmMvdi7AnRBgmWdYKig/kgajLOUrjXqFt/BcFgqMyTfzqPt3xdp6F3rSEK\nHE6PQ8I3pJ0BJOSJRa6Iw2VH1QKBgQDb9WbVFjYwTIKJOV4J2plTK581H8PI9FSt\nUASXcoMTixybegk8beGdwfm2TkyF/UMzCvHfuaUhf+S0GS5Zk31Wkmh1YbmFU4Q9\nm9K/3eoaqF7CohpigB0wJw4HfqNh6Qt+nICOMCv++gw7+/UwfV72dCqr0lpzfX5F\nAsez8igTxQKBgDsq/axOnQr+rO3WGpGJwmS8BKfrzarxGXyjnV0qr51X4yQdfGWx\nV3T8T8RC2qWI8+tQ7IbwB/PLE3VURg6PHe6MixXgSDGNZ7KwBnMOqS23/3kEXwMs\nhn2Xg+PZeMeqW8yN9ldxYqmqViMTN32c5bGoXzXdtfPeHcjlGCerVOEFAoGADVPi\nRjkRUX3hTvVF6Gzxa2OyQuLI1y1O0C2QCakrngyI0Dblxl6WFBwDyHMYGepNnxMj\nsr2p7sy0C+GWuGDCcHNwluQz/Ish8SW28F8+5xyamUp/NMa0fg1vwS6AMdeQFbzf\n4T2z/MAj66KJqcV+8on5Z+3YAzVwaDgR56pdmU0CgYBo2KWcNWAhZ1Qa6sNrITLV\nGlxg6tWP3OredZrmKb1kj5Tk0V+EwVN+HnKzMalv6yyyK7SWq1Z6rvCye37vy27q\nD7xfuz0c0H+48uWJpdLcsxpTioopsRPayiVDKlHSe/Qa+MEjAG3ded5TJiC+5iSw\nxWJ51y0wpme0LWgzzoLbRw==\n-----END PRIVATE KEY-----\n"

要するに、pipeline.run() に dlt 用語で言うところの Source を渡しておしまい。

まぁ、黒魔術的な何か(笑)である pipeline.run() の中の構造とそれぞれの箇所で何をやっているか見ていきます。

どうなってるの?

上記の REST API を実行すると、ソースコードの最終行 print(load_info) によって、以下のような出力が得られます。

$ uv run rest_api_pipeline.py 
2025-07-08 02:20:03,926|[WARNING]|13043|139762141091648|dlt|client.py|detect_paginator:312|Fallback paginator used: SinglePagePaginator at 7f1cbfca83b0. Please provide right paginator manually.
Pipeline rest_api_pokemon load step completed in 0.19 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////home/ec2-user/work/mds/dlt-getting-started/rest-api/rest_api_pokemon.duckdb location to store data
Load package 1751941203.521931 is LOADED and contains no failed jobs

このパイプラインの Destination である rest_api_pokemon.duckdb を調べてみます。

$ ~/.duckdb/cli/latest/duckdb rest_api_pokemon.duckdb
v1.2.1 8e52ec4395
Enter ".help" for usage hints.
D .tables
_dlt_loads           _dlt_version         location           
_dlt_pipeline_state  berry                pokemon            
D .schema pokemon
CREATE TABLE rest_api_data.pokemon("name" VARCHAR, url VARCHAR, _dlt_load_id VARCHAR NOT NULL, _dlt_id VARCHAR NOT NULL);
D .maxrows 10
D select * from rest_api_data.pokemon;
┌──────────────────────┬──────────────────────────────────────────┬───────────────────┬────────────────┐
│         name         │                   url                    │   _dlt_load_id    │    _dlt_id     │
│       varchar        │                 varchar                  │      varchar      │    varchar     │
├──────────────────────┼──────────────────────────────────────────┼───────────────────┼────────────────┤
│ bulbasaur            │ https://pokeapi.co/api/v2/pokemon/1/     │ 1751941203.521931 │ rRaJzqYDYOQKzw │
│ ivysaur              │ https://pokeapi.co/api/v2/pokemon/2/     │ 1751941203.521931 │ f+4TykRTqpvGow │
│ venusaur             │ https://pokeapi.co/api/v2/pokemon/3/     │ 1751941203.521931 │ jLaaVsbYDZ2RKg │
│ charmander           │ https://pokeapi.co/api/v2/pokemon/4/     │ 1751941203.521931 │ l63T6AFiyzV1Mw │
│ charmeleon           │ https://pokeapi.co/api/v2/pokemon/5/     │ 1751941203.521931 │ H6cElA2854FO/A │
│     ·                │                  ·                       │         ·         │       ·        │
│     ·                │                  ·                       │         ·         │       ·        │
│     ·                │                  ·                       │         ·         │       ·        │
│ ogerpon-wellspring…  │ https://pokeapi.co/api/v2/pokemon/10273/ │ 1751941203.521931 │ 3ngMzqxa7i1/vA │
│ ogerpon-hearthflam…  │ https://pokeapi.co/api/v2/pokemon/10274/ │ 1751941203.521931 │ wEsxg2ZxkTOgHg │
│ ogerpon-cornerston…  │ https://pokeapi.co/api/v2/pokemon/10275/ │ 1751941203.521931 │ GAB25+W8YF47cg │
│ terapagos-terastal   │ https://pokeapi.co/api/v2/pokemon/10276/ │ 1751941203.521931 │ 9Eu4tjzcesxQCQ │
│ terapagos-stellar    │ https://pokeapi.co/api/v2/pokemon/10277/ │ 1751941203.521931 │ v/g6/OOE1W5xLg │
├──────────────────────┴──────────────────────────────────────────┴───────────────────┴────────────────┤
│ 1302 rows (10 shown)                                                                       4 columns │
└──────────────────────────────────────────────────────────────────────────────────────────────────────┘
D .quit

Pipeline 作業ディレクトリ

やっぱり、よくわかりません。別に隠しているわけではないのでしょうが、Pipeline working directory というのがあって、これとセットで考えないと、スキーマの推定だったり、進化だったりができちゃうのは、意味がわからないです。

ということで、見てみます。

$ tree -a ~/.dlt
/home/ec2-user/.dlt
├── .anonymous_id
└── pipelines
    └── rest_api_pokemon
        ├── load
        │   ├── .version
        │   ├── loaded
        │   │   └── 1751941203.521931
        │   │       ├── applied_schema_updates.json
        │   │       ├── completed_jobs
        │   │       │   ├── _dlt_pipeline_state.01049b38d0.0.insert_values
        │   │       │   ├── berry.fcc100835e.0.insert_values
        │   │       │   ├── location.c617abb58e.0.insert_values
        │   │       │   └── pokemon.4c44034d94.0.insert_values
        │   │       ├── failed_jobs
        │   │       ├── load_package_state.json
        │   │       ├── new_jobs
        │   │       ├── package_completed.json
        │   │       ├── schema.json
        │   │       └── started_jobs
        │   ├── new
        │   └── normalized
        ├── normalize
        │   ├── .version
        │   └── extracted
        ├── schemas
        │   └── rest_api.schema.json
        ├── state.json
        └── trace.pickle

14 directories, 14 files

ま、直感的に normalize/extracted -> load/normalized -> load/loaded なんでしょうね。

読めそうなヤツから、見ていきましょう。以下、~/.dlt/pipeline/rest_api_pokemon/ 配下です。あと、人間用にフォーマットしてあります。

state.json
{
    "_state_version": 1,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "initial_cwd": "/home/ec2-user/work/mds/dlt-getting-started/rest-api",
        "_last_extracted_at": "2025-07-08T02:20:05.457912+00:00",
        "_last_extracted_hash": "+wss72MdiSWsH7g47UzNMXD/ImQwLpGyJGjr/Honm0k="
    },
    "default_schema_name": "rest_api",
    "pipeline_name": "rest_api_pokemon",
    "dataset_name": "rest_api_data",
    "schema_names": [
        "rest_api"
    ],
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": null,
    "_version_hash": "+wss72MdiSWsH7g47UzNMXD/ImQwLpGyJGjr/Honm0k="
}

前回実行時のバージョン、名前、パス、時刻などがありますので、今回、環境が変わったりしても判定できそうです。

schemas/rest_api.schema.json
{
  "version": 2,
  "version_hash": "bjSDsQVwCCxpVbi/s/7nk6E1Ezg9g65MV87tUMMvA8k=",
  "engine_version": 11,
  "name": "rest_api",
  "tables": {
    "_dlt_version": {
      "name": "_dlt_version",
      "columns": {
        "version": {
          "name": "version",
          "data_type": "bigint",
          "nullable": false
        },
        "engine_version": {
          "name": "engine_version",
          "data_type": "bigint",
          "nullable": false
        },
        "inserted_at": {
          "name": "inserted_at",
          "data_type": "timestamp",
          "nullable": false
        },
        "schema_name": {
          "name": "schema_name",
          "data_type": "text",
          "nullable": false
        },
        "version_hash": {
          "name": "version_hash",
          "data_type": "text",
          "nullable": false
        },
        "schema": {
          "name": "schema",
          "data_type": "text",
          "nullable": false
        }
      },
      "write_disposition": "skip",
      "resource": "_dlt_version",
      "description": "Created by DLT. Tracks schema updates"
    },
    "_dlt_loads": {
      "name": "_dlt_loads",
      "columns": {
        "load_id": {
          "name": "load_id",
          "data_type": "text",
          "nullable": false
        },
        "schema_name": {
          "name": "schema_name",
          "data_type": "text",
          "nullable": true
        },
        "status": {
          "name": "status",
          "data_type": "bigint",
          "nullable": false
        },
        "inserted_at": {
          "name": "inserted_at",
          "data_type": "timestamp",
          "nullable": false
        },
        "schema_version_hash": {
          "name": "schema_version_hash",
          "data_type": "text",
          "nullable": true
        }
      },
      "write_disposition": "skip",
      "resource": "_dlt_loads",
      "description": "Created by DLT. Tracks completed loads"
    },
    "pokemon": {
      "columns": {
        "name": {
          "name": "name",
          "data_type": "text",
          "nullable": true
        },
        "url": {
          "name": "url",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        }
      },
      "write_disposition": "append",
      "name": "pokemon",
      "resource": "pokemon",
      "x-normalizer": {
        "seen-data": true
      }
    },
    "berry": {
      "columns": {
        "name": {
          "name": "name",
          "data_type": "text",
          "nullable": true
        },
        "url": {
          "name": "url",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        }
      },
      "write_disposition": "append",
      "name": "berry",
      "resource": "berry",
      "x-normalizer": {
        "seen-data": true
      }
    },
    "location": {
      "columns": {
        "name": {
          "name": "name",
          "data_type": "text",
          "nullable": true
        },
        "url": {
          "name": "url",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        }
      },
      "write_disposition": "append",
      "name": "location",
      "resource": "location",
      "x-normalizer": {
        "seen-data": true
      }
    },
    "_dlt_pipeline_state": {
      "columns": {
        "version": {
          "name": "version",
          "data_type": "bigint",
          "nullable": false
        },
        "engine_version": {
          "name": "engine_version",
          "data_type": "bigint",
          "nullable": false
        },
        "pipeline_name": {
          "name": "pipeline_name",
          "data_type": "text",
          "nullable": false
        },
        "state": {
          "name": "state",
          "data_type": "text",
          "nullable": false
        },
        "created_at": {
          "name": "created_at",
          "data_type": "timestamp",
          "nullable": false
        },
        "version_hash": {
          "name": "version_hash",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        }
      },
      "write_disposition": "append",
      "file_format": "preferred",
      "name": "_dlt_pipeline_state",
      "resource": "_dlt_pipeline_state",
      "x-normalizer": {
        "seen-data": true
      }
    }
  },
  "settings": {
    "detections": [
      "iso_timestamp"
    ],
    "default_hints": {
      "not_null": [
        "_dlt_id",
        "_dlt_root_id",
        "_dlt_parent_id",
        "_dlt_list_idx",
        "_dlt_load_id"
      ],
      "parent_key": [
        "_dlt_parent_id"
      ],
      "root_key": [
        "_dlt_root_id"
      ],
      "unique": [
        "_dlt_id"
      ],
      "row_key": [
        "_dlt_id"
      ]
    }
  },
  "normalizers": {
    "names": "snake_case",
    "json": {
      "module": "dlt.common.normalizers.json.relational"
    }
  },
  "previous_hashes": [
    "Gdc4n1qRXwTiCnCfqycCWAxxyGDgn7jbj2mCE7FJefs=",
    "um3SJOIZvj59vzg3myb1oaPOCmc8F4VMRHh2hVBN694="
  ]
}

これらは、送信先の DuckDB のスキーマに対応しているようです。(このファイルは、人間用なのか、上記のとおりフォーマットされています)

pokemonberrylocation の3つが Resource 指定したエンドポイントで、_dlt_version_dlt_loads_dlt_pipeline_state の3つが dlt の制御用です。

制御用のテーブルの中身も覗いてみます。hash 値で、その回の状態を保持しているようです。

D select * from rest_api_data._dlt_version;
┌─────────┬────────────────┬──────────────────────┬─────────────┬──────────────────────┬──────────────────────────────────────────────┐
│ version │ engine_version │     inserted_at      │ schema_name │     version_hash     │                    schema                    │
│  int64  │     int64      │ timestamp with tim…  │   varchar   │       varchar        │                   varchar                    │
├─────────┼────────────────┼──────────────────────┼─────────────┼──────────────────────┼──────────────────────────────────────────────┤
│    2    │       11       │ 2025-07-08 02:20:0…  │ rest_api    │ bjSDsQVwCCxpVbi/s/…  │ {"version":2,"version_hash":"bjSDsQVwCCxpV…  │
└─────────┴────────────────┴──────────────────────┴─────────────┴──────────────────────┴──────────────────────────────────────────────┘
D select * from rest_api_data._dlt_loads;
┌───────────────────┬─────────────┬────────┬───────────────────────────────┬──────────────────────────────────────────────┐
│      load_id      │ schema_name │ status │          inserted_at          │             schema_version_hash              │
│      varchar      │   varchar   │ int64  │   timestamp with time zone    │                   varchar                    │
├───────────────────┼─────────────┼────────┼───────────────────────────────┼──────────────────────────────────────────────┤
│ 1751941203.521931 │ rest_api    │   0    │ 2025-07-08 02:20:05.711779+00 │ bjSDsQVwCCxpVbi/s/7nk6E1Ezg9g65MV87tUMMvA8k= │
└───────────────────┴─────────────┴────────┴───────────────────────────────┴──────────────────────────────────────────────┘
D select * from rest_api_data._dlt_version;
┌─────────┬────────────────┬──────────────────────┬─────────────┬──────────────────────┬──────────────────────────────────────────────┐
│ version │ engine_version │     inserted_at      │ schema_name │     version_hash     │                    schema                    │
│  int64  │     int64      │ timestamp with tim…  │   varchar   │       varchar        │                   varchar                    │
├─────────┼────────────────┼──────────────────────┼─────────────┼──────────────────────┼──────────────────────────────────────────────┤
│    2    │       11       │ 2025-07-08 02:20:0…  │ rest_api    │ bjSDsQVwCCxpVbi/s/…  │ {"version":2,"version_hash":"bjSDsQVwCCxpV…  │
└─────────┴────────────────┴──────────────────────┴─────────────┴──────────────────────┴──────────────────────────────────────────────┘

pipeline.run() の内部

これも How dlt works に書いてありますが、pipeline.run は、3つのフェーズで構成されていて、それぞれ以下のとおりです。

  • Extract - ソースからハード ドライブにデータを完全に抽出します。
  • Normalize - データを検査して正規化し、宛先と互換性のあるスキーマを計算します。
  • Load - 必要に応じて宛先でスキーマ移行を実行し、データを宛先にロードします。

また、この3つのフェーズは、前提後続の関係にある(前のフェーズが終わらないと次のフェーズは実行されない)ため、インタプリタで段階的に実行してみます。

$ uv run python
Python 3.12.9 (main, Mar 17 2025, 21:01:58) [Clang 20.1.0 ] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import dlt
>>> from dlt.sources.rest_api import rest_api_source
>>> 
>>> pokemon_source = rest_api_source(
...     {
...         "client": {
...             "base_url": "https://pokeapi.co/api/v2/",
...         },
...         "resource_defaults": {
...             "endpoint": {
...                 "params": {
...                     "limit": 1000,
...                 },
...             },
...         },
...         "resources": [
...             "pokemon",
...             "berry",
...             "location",
...         ],
...     }
... )

>>> 
>>> pipeline = dlt.pipeline(
...     pipeline_name="rest_api_pokemon",
...     destination="duckdb",
...     dataset_name="rest_api_data",
... )
>>> 

この時点で、作業フォルダが作られます。

$ tree -a ~/.dlt
/home/ec2-user/.dlt
└── pipelines
    └── rest_api_pokemon
        ├── schemas
        └── state.json

3 directories, 1 file

Extract

抽出フェーズでは、dlt はソースからハードドライブの新しい ロードパッケージにデータを完全に抽出します。このパッケージには一意の ID が割り当てられ、ソースから受信した生データが含まれます。さらに、スキーマヒントを提供して、一部の列のデータ型を定義したり、主キーや一意のインデックスを追加したりすることもできます。このフェーズは、インクリメンタルカーソルフィールドを使用して 1 回の実行で抽出される項目の数を制限して、並列化でパフォーマンスを調整することでも制御できます。また、フィルターとマップを適用して個人データを難読化または削除したり、トランスフォーマーを使用して派生データを作成したりすることもできます。

Python インタプリタで続けます。

>>> pipeline.extract(pokemon_source)
2025-07-08 05:48:33,220|[WARNING]|26071|139882239305536|dlt|client.py|detect_paginator:312|Fallback paginator used: SinglePagePaginator at 7f38b5b07bc0. Please provide right paginator manually.
ExtractInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x7f38b67073b0>, metrics={'1751953713.128814': [{'started_at': DateTime(2025, 7, 8, 5, 48, 33, 130317, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 7, 8, 5, 48, 33, 336674, tzinfo=Timezone('UTC')), 'schema_name': 'rest_api', 'job_metrics': {'pokemon.7bc3cd353d.typed-jsonl': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/65328621ae5f8924/1751953713.128814/new_jobs/pokemon.7bc3cd353d.0.typed-jsonl', items_count=1302, file_size=89576, created=1751953713.1911597, last_modified=1751953713.2932928), 'berry.b25d2f5b9b.typed-jsonl': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/65328621ae5f8924/1751953713.128814/new_jobs/berry.b25d2f5b9b.0.typed-jsonl', items_count=64, file_size=3917, created=1751953713.2216043, last_modified=1751953713.2216108), 'location.0aaf26aeae.typed-jsonl': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/65328621ae5f8924/1751953713.128814/new_jobs/location.0aaf26aeae.0.typed-jsonl', items_count=1070, file_size=79277, created=1751953713.2685933, last_modified=1751953713.3165731)}, 'table_metrics': {'pokemon': DataWriterMetrics(file_path='', items_count=1302, file_size=89576, created=1751953713.1911597, last_modified=1751953713.2932928), 'berry': DataWriterMetrics(file_path='', items_count=64, file_size=3917, created=1751953713.2216043, last_modified=1751953713.2216108), 'location': DataWriterMetrics(file_path='', items_count=1070, file_size=79277, created=1751953713.2685933, last_modified=1751953713.3165731)}, 'resource_metrics': {'pokemon': DataWriterMetrics(file_path='', items_count=1302, file_size=89576, created=1751953713.1911597, last_modified=1751953713.2932928), 'berry': DataWriterMetrics(file_path='', items_count=64, file_size=3917, created=1751953713.2216043, last_modified=1751953713.2216108), 'location': DataWriterMetrics(file_path='', items_count=1070, file_size=79277, created=1751953713.2685933, last_modified=1751953713.3165731)}, 'dag': [('pokemon', 'pokemon'), ('berry', 'berry'), ('location', 'location')], 'hints': {'pokemon': {'write_disposition': 'append'}, 'berry': {'write_disposition': 'append'}, 'location': {'write_disposition': 'append'}}}, {'started_at': DateTime(2025, 7, 8, 5, 48, 33, 340304, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 7, 8, 5, 48, 33, 353538, tzinfo=Timezone('UTC')), 'schema_name': 'rest_api', 'job_metrics': {'_dlt_pipeline_state.fa339c289f.typed-jsonl': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/65328621ae5f8924/1751953713.128814/new_jobs/_dlt_pipeline_state.fa339c289f.0.typed-jsonl', items_count=1, file_size=493, created=1751953713.3420017, last_modified=1751953713.342006)}, 'table_metrics': {'_dlt_pipeline_state': DataWriterMetrics(file_path='', items_count=1, file_size=493, created=1751953713.3420017, last_modified=1751953713.342006)}, 'resource_metrics': {'_dlt_pipeline_state': DataWriterMetrics(file_path='', items_count=1, file_size=493, created=1751953713.3420017, last_modified=1751953713.342006)}, 'dag': [('_dlt_pipeline_state', '_dlt_pipeline_state')], 'hints': {'_dlt_pipeline_state': {'columns': 'version:\n  name: version\n  data_type: bigint\n  nullable: false\nengine_version:\n  name: engine_version\n  data_type: bigint\n  nullable: false\npipeline_name:\n  name: pipeline_name\n  data_type: text\n  nullable: false\nstate:\n  name: state\n  data_type: text\n  nullable: false\ncreated_at:\n  name: created_at\n  data_type: timestamp\n  nullable: false\nversion_hash:\n  name: version_hash\n  data_type: text\n  nullable: true\n_dlt_load_id:\n  name: _dlt_load_id\n  data_type: text\n  nullable: false\n', 'write_disposition': 'append', 'file_format': 'preferred', 'original_columns': 'dict'}}}]}, extract_data_info=[{'name': 'rest_api', 'data_type': 'source'}], loads_ids=['1751953713.128814'], load_packages=[LoadPackageInfo(load_id='1751953713.128814', package_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/extracted/1751953713.128814', state='extracted', schema=Schema rest_api at 139881552664320, schema_update={}, completed_at=None, jobs={'started_jobs': [], 'failed_jobs': [], 'new_jobs': [LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/extracted/1751953713.128814/new_jobs/pokemon.7bc3cd353d.0.typed-jsonl', file_size=11308, created_at=DateTime(2025, 7, 8, 5, 48, 33, 323021, tzinfo=Timezone('UTC')), elapsed=0.03204035758972168, job_file_info=ParsedLoadJobFileName(table_name='pokemon', file_id='7bc3cd353d', retry_count=0, file_format='typed-jsonl'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/extracted/1751953713.128814/new_jobs/berry.b25d2f5b9b.0.typed-jsonl', file_size=591, created_at=DateTime(2025, 7, 8, 5, 48, 33, 323021, tzinfo=Timezone('UTC')), elapsed=0.032064199447631836, job_file_info=ParsedLoadJobFileName(table_name='berry', file_id='b25d2f5b9b', retry_count=0, file_format='typed-jsonl'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/extracted/1751953713.128814/new_jobs/location.0aaf26aeae.0.typed-jsonl', file_size=9972, created_at=DateTime(2025, 7, 8, 5, 48, 33, 323021, tzinfo=Timezone('UTC')), elapsed=0.03207993507385254, job_file_info=ParsedLoadJobFileName(table_name='location', file_id='0aaf26aeae', retry_count=0, file_format='typed-jsonl'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/normalize/extracted/1751953713.128814/new_jobs/_dlt_pipeline_state.fa339c289f.0.typed-jsonl', file_size=484, created_at=DateTime(2025, 7, 8, 5, 48, 33, 343021, tzinfo=Timezone('UTC')), elapsed=0.01209259033203125, job_file_info=ParsedLoadJobFileName(table_name='_dlt_pipeline_state', file_id='fa339c289f', retry_count=0, file_format='typed-jsonl'), failed_message=None)], 'completed_jobs': []})], first_run=True)

この時点の作業フォルダは、以下のようになります。最初に見た run() の実行後では空になっていた normalize フォルダにデータが作成され、schemas/rest_api.schema.json も作成されています。

$ tree -a ~/.dlt
/home/ec2-user/.dlt
├── .anonymous_id
└── pipelines
    └── rest_api_pokemon
        ├── normalize
        │   ├── .version
        │   └── extracted
        │       └── 1751953713.128814
        │           ├── completed_jobs
        │           ├── failed_jobs
        │           ├── load_package_state.json
        │           ├── new_jobs
        │           │   ├── _dlt_pipeline_state.fa339c289f.0.typed-jsonl
        │           │   ├── berry.b25d2f5b9b.0.typed-jsonl
        │           │   ├── location.0aaf26aeae.0.typed-jsonl
        │           │   └── pokemon.7bc3cd353d.0.typed-jsonl
        │           ├── schema.json
        │           └── started_jobs
        ├── schemas
        │   └── rest_api.schema.json
        ├── state.json
        └── trace.pickle

10 directories, 11 files
.json ファイル

extract() 時点で生成されていた .json ファイルは、以下の4つ

  • state.json
  • schemas/rest_api.schema.json
  • normalize/extracted/1751953713.128814/load_package_state.json
  • normalize/extracted/1751953713.128814/schema.json

ただし、rest_api.schema.json と schema.json は、フォーマット以外は同じ

.typed-jsonl ファイル

gzip 圧縮された JSON ファイルなので、gunzip するとエディタで見ることができます。
値は、Resource からの戻りのままのようで、dlt の制御用のカラムなどはありません。

normalize/extracted/1751953713.128814/new_jobs/berry.b25d2f5b9b.0.typed-jsonl
[
    {
        "name": "cheri",
        "url": "https://pokeapi.co/api/v2/berry/1/"
    },
    {
        "name": "chesto",
        "url": "https://pokeapi.co/api/v2/berry/2/"
    },
    {
        "name": "pecha",
        "url": "https://pokeapi.co/api/v2/berry/3/"
    },

上記は、ファイルの先頭部分を gunzip して、フォーマット処理しています。

Normalize

続いて、normalize です。

正規化フェーズでは、dlt はデータを検査して正規化し、入力データに対応するスキーマを計算します。スキーマは、新しい列やテーブルなどの将来のソースデータの変更に対応するために自動的に進化します。また、dlt は、検出された値が前回の実行中に計算されたスキーマと一致しない場合は、ネストされたデータ構造を子テーブルにネスト解除し、バリアント列を作成します。正規化フェーズの結果は、宛先が理解できる形式で正規化されたデータを保持する更新されたロードパッケージと、データを宛先に移行するために使用できる完全なスキーマです。正規化フェーズは、たとえば、入力データの 許可されるネストレベルを定義 したり、スキーマがどのように進化するか、および適合しない行がどのように処理されるかを制御する スキーマコントラクトを適用したりすることで制御できます。パフォーマンス設定も 使用可能です。

>>> pipeline.normalize()
NormalizeInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x7f38b67073b0>, metrics={'1751953713.128814': [{'started_at': DateTime(2025, 7, 8, 7, 40, 29, 401318, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 7, 8, 7, 40, 29, 455414, tzinfo=Timezone('UTC')), 'job_metrics': {'pokemon.8b737a2346.insert_values': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/new/1751953713.128814/new_jobs/pokemon.8b737a2346.0.insert_values', items_count=1302, file_size=127392, created=1751960429.4061627, last_modified=1751960429.416432), 'berry.3d831d8f67.insert_values': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/new/1751953713.128814/new_jobs/berry.3d831d8f67.0.insert_values', items_count=64, file_size=5831, created=1751960429.4174974, last_modified=1751960429.4179692), 'location.5aaa1586bf.insert_values': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/new/1751953713.128814/new_jobs/location.5aaa1586bf.0.insert_values', items_count=1070, file_size=110365, created=1751960429.4197912, last_modified=1751960429.4280107), '_dlt_pipeline_state.897a9fd413.insert_values': DataWriterMetrics(file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/new/1751953713.128814/new_jobs/_dlt_pipeline_state.897a9fd413.0.insert_values', items_count=1, file_size=543, created=1751960429.4289558, last_modified=1751960429.4289594)}, 'table_metrics': {'pokemon': DataWriterMetrics(file_path='', items_count=1302, file_size=127392, created=1751960429.4061627, last_modified=1751960429.416432), 'berry': DataWriterMetrics(file_path='', items_count=64, file_size=5831, created=1751960429.4174974, last_modified=1751960429.4179692), 'location': DataWriterMetrics(file_path='', items_count=1070, file_size=110365, created=1751960429.4197912, last_modified=1751960429.4280107), '_dlt_pipeline_state': DataWriterMetrics(file_path='', items_count=1, file_size=543, created=1751960429.4289558, last_modified=1751960429.4289594)}}]}, loads_ids=['1751953713.128814'], load_packages=[LoadPackageInfo(load_id='1751953713.128814', package_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/normalized/1751953713.128814', state='normalized', schema=Schema rest_api at 139881553269472, schema_update={}, completed_at=None, jobs={'started_jobs': [], 'failed_jobs': [], 'new_jobs': [LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/normalized/1751953713.128814/new_jobs/pokemon.8b737a2346.0.insert_values', file_size=28504, created_at=DateTime(2025, 7, 8, 7, 40, 29, 433150, tzinfo=Timezone('UTC')), elapsed=0.02295827865600586, job_file_info=ParsedLoadJobFileName(table_name='pokemon', file_id='8b737a2346', retry_count=0, file_format='insert_values'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/normalized/1751953713.128814/new_jobs/berry.3d831d8f67.0.insert_values', file_size=1485, created_at=DateTime(2025, 7, 8, 7, 40, 29, 433150, tzinfo=Timezone('UTC')), elapsed=0.022977113723754883, job_file_info=ParsedLoadJobFileName(table_name='berry', file_id='3d831d8f67', retry_count=0, file_format='insert_values'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/normalized/1751953713.128814/new_jobs/location.5aaa1586bf.0.insert_values', file_size=23973, created_at=DateTime(2025, 7, 8, 7, 40, 29, 443150, tzinfo=Timezone('UTC')), elapsed=0.012989997863769531, job_file_info=ParsedLoadJobFileName(table_name='location', file_id='5aaa1586bf', retry_count=0, file_format='insert_values'), failed_message=None), LoadJobInfo(state='new_jobs', file_path='/home/ec2-user/.dlt/pipelines/rest_api_pokemon/load/normalized/1751953713.128814/new_jobs/_dlt_pipeline_state.897a9fd413.0.insert_values', file_size=515, created_at=DateTime(2025, 7, 8, 7, 40, 29, 443150, tzinfo=Timezone('UTC')), elapsed=0.013002395629882812, job_file_info=ParsedLoadJobFileName(table_name='_dlt_pipeline_state', file_id='897a9fd413', retry_count=0, file_format='insert_values'), failed_message=None)], 'completed_jobs': []})], first_run=True)

そして、作業フォルダーは、以下のように、予想通り normalize/extracted から load/normalized へシフトしました。

$ tree -a ~/.dlt
/home/ec2-user/.dlt
├── .anonymous_id
└── pipelines
    └── rest_api_pokemon
        ├── load
        │   ├── .version
        │   ├── loaded
        │   ├── new
        │   └── normalized
        │       └── 1751953713.128814
        │           ├── completed_jobs
        │           ├── failed_jobs
        │           ├── load_package_state.json
        │           ├── new_jobs
        │           │   ├── _dlt_pipeline_state.897a9fd413.0.insert_values
        │           │   ├── berry.3d831d8f67.0.insert_values
        │           │   ├── location.5aaa1586bf.0.insert_values
        │           │   └── pokemon.8b737a2346.0.insert_values
        │           ├── schema.json
        │           ├── schema_updates.json
        │           └── started_jobs
        ├── normalize
        │   ├── .version
        │   └── extracted
        ├── schemas
        │   └── rest_api.schema.json
        ├── state.json
        └── trace.pickle

14 directories, 13 files
state.json

extract() 時点と同じです。

schemas/rest_api.schema.json と schema.json

extract() 時点では、Schema 推定がされていませんでしたので、以下のように情報が追加されています。

rest_api.schema.json
--- ext/rest_api.schema.json    2025-07-08 08:43:26.903138339 +0000
+++ nor/rest_api.schema.json    2025-07-08 08:43:39.153138116 +0000
@@ -1,6 +1,6 @@
 {
-  "version": 1,
-  "version_hash": "Gdc4n1qRXwTiCnCfqycCWAxxyGDgn7jbj2mCE7FJefs=",
+  "version": 2,
+  "version_hash": "bjSDsQVwCCxpVbi/s/7nk6E1Ezg9g65MV87tUMMvA8k=",
   "engine_version": 11,
   "name": "rest_api",
   "tables": {
@@ -76,22 +76,100 @@
       "description": "Created by DLT. Tracks completed loads"
     },
     "pokemon": {
-      "columns": {},
+      "columns": {
+        "name": {
+          "name": "name",
+          "data_type": "text",
+          "nullable": true
+        },
+        "url": {
+          "name": "url",
+          "data_type": "text",
+          "nullable": true
+        },
+        "_dlt_load_id": {
+          "name": "_dlt_load_id",
+          "data_type": "text",
+          "nullable": false
+        },
+        "_dlt_id": {
+          "name": "_dlt_id",
+          "data_type": "text",
+          "nullable": false,
+          "unique": true,
+          "row_key": true
+        }
+      },
       "write_disposition": "append",
       "name": "pokemon",
-      "resource": "pokemon"
+      "resource": "pokemon",
+      "x-normalizer": {
+        "seen-data": true
+      }
     },
     "berry": {
-      "columns": {},
+      "columns": {
+        "name": {
+          "name": "name",
+          "data_type": "text",
+          "nullable": true
+        },
+        "url": {
+          "name": "url",
+          "data_type": "text",
+          "nullable": true
+        },
+        "_dlt_load_id": {
+          "name": "_dlt_load_id",
+          "data_type": "text",
+          "nullable": false
+        },
+        "_dlt_id": {
+          "name": "_dlt_id",
+          "data_type": "text",
+          "nullable": false,
+          "unique": true,
+          "row_key": true
+        }
+      },
       "write_disposition": "append",
       "name": "berry",
-      "resource": "berry"
+      "resource": "berry",
+      "x-normalizer": {
+        "seen-data": true
+      }
     },
     "location": {
-      "columns": {},
+      "columns": {
+        "name": {
+          "name": "name",
+          "data_type": "text",
+          "nullable": true
+        },
+        "url": {
+          "name": "url",
+          "data_type": "text",
+          "nullable": true
+        },
+        "_dlt_load_id": {
+          "name": "_dlt_load_id",
+          "data_type": "text",
+          "nullable": false
+        },
+        "_dlt_id": {
+          "name": "_dlt_id",
+          "data_type": "text",
+          "nullable": false,
+          "unique": true,
+          "row_key": true
+        }
+      },
       "write_disposition": "append",
       "name": "location",
-      "resource": "location"
+      "resource": "location",
+      "x-normalizer": {
+        "seen-data": true
+      }
     },
     "_dlt_pipeline_state": {
       "columns": {
@@ -129,12 +207,22 @@
           "name": "_dlt_load_id",
           "data_type": "text",
           "nullable": false
+        },
+        "_dlt_id": {
+          "name": "_dlt_id",
+          "data_type": "text",
+          "nullable": false,
+          "unique": true,
+          "row_key": true
         }
       },
       "write_disposition": "append",
       "file_format": "preferred",
       "name": "_dlt_pipeline_state",
-      "resource": "_dlt_pipeline_state"
+      "resource": "_dlt_pipeline_state",
+      "x-normalizer": {
+        "seen-data": true
+      }
     }
   },
   "settings": {
@@ -170,6 +258,7 @@
     }
   },
   "previous_hashes": [
+    "Gdc4n1qRXwTiCnCfqycCWAxxyGDgn7jbj2mCE7FJefs=",
     "um3SJOIZvj59vzg3myb1oaPOCmc8F4VMRHh2hVBN694="
   ]
 }
load/normalized/1751953713.128814/load_package_state.json

extract() 時点と同じです。

load/normalized/1751953713.128814/schema_updates.json

normalize() で追加されています。schema.json の更新部分に該当します。

load/normalized/1751953713.128814/schema_updates.json
{
    "pokemon": {
        "columns": {
            "name": {
                "name": "name",
                "data_type": "text",
                "nullable": true
            },
            "url": {
                "name": "url",
                "data_type": "text",
                "nullable": true
            },
            "_dlt_load_id": {
                "name": "_dlt_load_id",
                "data_type": "text",
                "nullable": false
            },
            "_dlt_id": {
                "name": "_dlt_id",
                "data_type": "text",
                "nullable": false,
                "unique": true,
                "row_key": true
            }
        },
        "write_disposition": "append",
        "name": "pokemon",
        "resource": "pokemon"
    },
    "berry": {
        "columns": {
            "name": {
                "name": "name",
                "data_type": "text",
                "nullable": true
            },
            "url": {
                "name": "url",
                "data_type": "text",
                "nullable": true
            },
            "_dlt_load_id": {
                "name": "_dlt_load_id",
                "data_type": "text",
                "nullable": false
            },
            "_dlt_id": {
                "name": "_dlt_id",
                "data_type": "text",
                "nullable": false,
                "unique": true,
                "row_key": true
            }
        },
        "write_disposition": "append",
        "name": "berry",
        "resource": "berry"
    },
    "location": {
        "columns": {
            "name": {
                "name": "name",
                "data_type": "text",
                "nullable": true
            },
            "url": {
                "name": "url",
                "data_type": "text",
                "nullable": true
            },
            "_dlt_load_id": {
                "name": "_dlt_load_id",
                "data_type": "text",
                "nullable": false
            },
            "_dlt_id": {
                "name": "_dlt_id",
                "data_type": "text",
                "nullable": false,
                "unique": true,
                "row_key": true
            }
        },
        "write_disposition": "append",
        "name": "location",
        "resource": "location"
    },
    "_dlt_pipeline_state": {
        "columns": {
            "_dlt_id": {
                "name": "_dlt_id",
                "data_type": "text",
                "nullable": false,
                "unique": true,
                "row_key": true
            }
        },
        "write_disposition": "append",
        "file_format": "preferred",
        "name": "_dlt_pipeline_state",
        "resource": "_dlt_pipeline_state"
    }
}
.insert_values ファイル

このファイルも gzip 圧縮されたテキストファイルです。.typed-jsonl ファイルとは違って、INSERT 文なのと dlt の制御用項目が追加されています。

load/normalized/1751953713.128814/new_jobs/berry.3d831d8f67.0.insert_values
INSERT INTO {}("name","url","_dlt_load_id","_dlt_id")
VALUES
(E'cheri',E'https://pokeapi.co/api/v2/berry/1/',E'1751953713.128814',E'rD+cQgbhNw+00Q'),
(E'chesto',E'https://pokeapi.co/api/v2/berry/2/',E'1751953713.128814',E'w1ozBUchmgOyUA'),
(E'pecha',E'https://pokeapi.co/api/v2/berry/3/',E'1751953713.128814',E'Wt2UDpLn8i1Wqw'),

上記は、ファイルの先頭部分を gunzip しています。

Load

最後は load です。

ロードフェーズでは、dlt は最初に必要に応じて宛先でスキーマ移行を実行し、次にデータを宛先に読み込みます。dlt は、大規模な読み込みを並列化できるように、ロードジョブと呼ばれる小さなチャンクでデータを読み込みます。宛先への接続が失敗した場合は、パイプラインを再実行しても安全であり、dlt は現在の読み込みパッケージからすべてのロードジョブの読み込みを続行します。dlt は、内部 dlt スキーマ、すべてのロードパッケージに関する情報、およびインクリメンタルに使用されるいくつかの状態情報などを格納する特別なテーブルも作成します。これらの情報は、インクリメンタルに前回の実行から別のマシンへの増分状態を復元できるようにします。ロードフェーズを制御する方法には、異なる write_dispositions を使用して宛先のデータを置き換えたり、単に追加したり、テーブルごとに構成できる特定のマージキーでマージしたりする方法があります。一部の宛先では、バケットプロバイダー上のリモートステージングデータセットを使用できます。また、dlt は deltables や iceberg などの最新のオープンテーブル形式もサポートしており、リバース ETL も可能です。

>>> load_info = pipeline.load()
>>> print(load_info)
Pipeline rest_api_pokemon load step completed in 0.18 seconds
1 load package(s) were loaded to destination duckdb and into dataset rest_api_data
The duckdb destination used duckdb:////home/ec2-user/work/mds/dlt-getting-started/rest-api/rest_api_pokemon.duckdb location to store data
Load package 1751953713.128814 is LOADED and contains no failed jobs

作業フォルダーは、当然、run のときと同じような状態になります。
もちろん、DuckDB にロードもできています。

$ tree -a ~/.dlt
/home/ec2-user/.dlt
├── .anonymous_id
└── pipelines
    └── rest_api_pokemon
        ├── load
        │   ├── .version
        │   ├── loaded
        │   │   └── 1751953713.128814
        │   │       ├── applied_schema_updates.json
        │   │       ├── completed_jobs
        │   │       │   ├── _dlt_pipeline_state.897a9fd413.0.insert_values
        │   │       │   ├── berry.3d831d8f67.0.insert_values
        │   │       │   ├── location.5aaa1586bf.0.insert_values
        │   │       │   └── pokemon.8b737a2346.0.insert_values
        │   │       ├── failed_jobs
        │   │       ├── load_package_state.json
        │   │       ├── new_jobs
        │   │       ├── package_completed.json
        │   │       ├── schema.json
        │   │       └── started_jobs
        │   ├── new
        │   └── normalized
        ├── normalize
        │   ├── .version
        │   └── extracted
        ├── schemas
        │   └── rest_api.schema.json
        ├── state.json
        └── trace.pickle

14 directories, 14 files
state.json

extract()normalize() 時点からの更新され、 _local.first_run の値が true から false に変わりましたので、この load() 以降は、前回データがある想定で動きます。

schemas/rest_api.schema.json と schema.json

normalize() で確定した schema と同じです。

load/loaded/1751953713.128814/applied_schema_updates.json

名前的にも normalize() で作成された schema_updates.json に対応していそうですが、以下のように、dlt の制御用のテーブルのスキーマが追加されています。

applied_schema_updates.json
--- nor/schema_updates.json     2025-07-08 08:25:07.673154450 +0000
+++ loa/applied_schema_updates.json     2025-07-08 08:25:41.403152288 +0000
@@ -26,7 +26,10 @@
         },
         "write_disposition": "append",
         "name": "pokemon",
-        "resource": "pokemon"
+        "resource": "pokemon",
+        "x-normalizer": {
+            "seen-data": true
+        }
     },
     "berry": {
         "columns": {
@@ -55,7 +58,48 @@
         },
         "write_disposition": "append",
         "name": "berry",
-        "resource": "berry"
+        "resource": "berry",
+        "x-normalizer": {
+            "seen-data": true
+        }
+    },
+    "_dlt_version": {
+        "name": "_dlt_version",
+        "columns": {
+            "version": {
+                "name": "version",
+                "data_type": "bigint",
+                "nullable": false
+            },
+            "engine_version": {
+                "name": "engine_version",
+                "data_type": "bigint",
+                "nullable": false
+            },
+            "inserted_at": {
+                "name": "inserted_at",
+                "data_type": "timestamp",
+                "nullable": false
+            },
+            "schema_name": {
+                "name": "schema_name",
+                "data_type": "text",
+                "nullable": false
+            },
+            "version_hash": {
+                "name": "version_hash",
+                "data_type": "text",
+                "nullable": false
+            },
+            "schema": {
+                "name": "schema",
+                "data_type": "text",
+                "nullable": false
+            }
+        },
+        "write_disposition": "skip",
+        "resource": "_dlt_version",
+        "description": "Created by DLT. Tracks schema updates"
     },
     "location": {
         "columns": {
@@ -84,10 +128,48 @@
         },
         "write_disposition": "append",
         "name": "location",
-        "resource": "location"
+        "resource": "location",
+        "x-normalizer": {
+            "seen-data": true
+        }
     },
     "_dlt_pipeline_state": {
         "columns": {
+            "version": {
+                "name": "version",
+                "data_type": "bigint",
+                "nullable": false
+            },
+            "engine_version": {
+                "name": "engine_version",
+                "data_type": "bigint",
+                "nullable": false
+            },
+            "pipeline_name": {
+                "name": "pipeline_name",
+                "data_type": "text",
+                "nullable": false
+            },
+            "state": {
+                "name": "state",
+                "data_type": "text",
+                "nullable": false
+            },
+            "created_at": {
+                "name": "created_at",
+                "data_type": "timestamp",
+                "nullable": false
+            },
+            "version_hash": {
+                "name": "version_hash",
+                "data_type": "text",
+                "nullable": true
+            },
+            "_dlt_load_id": {
+                "name": "_dlt_load_id",
+                "data_type": "text",
+                "nullable": false
+            },
             "_dlt_id": {
                 "name": "_dlt_id",
                 "data_type": "text",
@@ -99,6 +181,42 @@
         "write_disposition": "append",
         "file_format": "preferred",
         "name": "_dlt_pipeline_state",
-        "resource": "_dlt_pipeline_state"
+        "resource": "_dlt_pipeline_state",
+        "x-normalizer": {
+            "seen-data": true
+        }
+    },
+    "_dlt_loads": {
+        "name": "_dlt_loads",
+        "columns": {
+            "load_id": {
+                "name": "load_id",
+                "data_type": "text",
+                "nullable": false
+            },
+            "schema_name": {
+                "name": "schema_name",
+                "data_type": "text",
+                "nullable": true
+            },
+            "status": {
+                "name": "status",
+                "data_type": "bigint",
+                "nullable": false
+            },
+            "inserted_at": {
+                "name": "inserted_at",
+                "data_type": "timestamp",
+                "nullable": false
+            },
+            "schema_version_hash": {
+                "name": "schema_version_hash",
+                "data_type": "text",
+                "nullable": true
+            }
+        },
+        "write_disposition": "skip",
+        "resource": "_dlt_loads",
+        "description": "Created by DLT. Tracks completed loads"
     }
 }
load/loaded/1751953713.128814/load_package_state.json

extract()normalize() 時点と同じです。

.insert_values ファイル

normalize() で生成されたものと同じです。

おわりに

まだ、スッキリしない部分があるかもしれませんが、dlt は、Python のライブラリでしかないので、実行時に、デバッガーで内部まで追跡することも可能です。前回の記事のようにセットアップして、今回の記事で取り上げたローカルマシンの作業フォルダーも見ながら、デバッガーで追跡するとだんだんわかった気分になれます。

GitHubで編集を提案
株式会社ROBONの技術ブログ

Discussion