🔖

Pythonを使ってRDFデータをDydraに登録する

2024/07/26に公開

概要

Pythonを使ってRDFデータをDydraに登録するライブラリを作成しました。

https://github.com/nakamura196/dydra-py

中途半端な実装が含まれますが、お役に立つ場面があれば幸いです。

工夫

インポートは以下で行なっています。

https://github.com/nakamura196/dydra-py/blob/main/dydra_py/api.py#L55

以下のように、SPARQLのINSERT DATA オペレーションを使用しています。

    def import_by_file(self, file_path, format, graph_uri=None, verbose=False):
        """
        Imports RDF data from a file into the Dydra store.

        Args:
            file_path (str): The path to the RDF file to import.
            format (str): The format of the RDF file (e.g., 'xml', 'nt').
            graph_uri (str, optional): URI of the graph where data will be inserted. Defaults to None.
        """

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/sparql-update"
        }

        files = self._chunk_rdf_file(file_path, format=format)

        print("Number of chunks: ", len(files))

        for file in tqdm(files):

            # RDFファイルの読み込み
            graph = rdflib.Graph()
            graph.parse(file, format=format)  # フォーマットはファイルに応じて変更

            nt_data = graph.serialize(format='nt')

            if graph_uri is None:
                query = f"""
                INSERT DATA {{
                {nt_data}
                }}
                """

            else:
                query = f"""
                INSERT DATA {{
                GRAPH <{graph_uri}> {{
                    {nt_data}
                }}
                }}
                """

            if verbose:
                print(query)

            response = requests.post(self.endpoint, data=query, headers=headers)
            if response.status_code == 200:
                print("Data successfully inserted.")
            else:
                print(f"Error: {response.status_code} {response.text}")

工夫

工夫した点として、サイズが大きいRDFファイルを一度にアップロードした際、プロセスが途中で止まってしまうケースがありました。

そこで、以下のようにRDFファイルを分割する処理を加えて、複数回アップロードするようにしています。

    def _chunk_rdf_file(self, input_file, output_dir=None, chunk_size=500*1024, format='turtle'):
        if output_dir is None:
            output_dir = tempfile.mkdtemp()

        self.output_dir = output_dir

        self._clear_output_dir(output_dir)

        """RDF ファイルを指定されたサイズのチャンクに分割する"""
        g = Graph()
        g.parse(input_file, format=format)

        current_chunk = Graph()
        current_size = 0
        chunk_index = 1

        count = 0

        files = []

        print("Triple count: ", len(g))

        for s, p, o in tqdm(g):
            count += 1

            # 一時的なグラフにトリプルを追加
            current_chunk.add((s, p, o))

            if count % 1000 == 0:

                # シリアライズしてサイズを確認
                temp_serialized = current_chunk.serialize(format=format)
                temp_size = len(temp_serialized)

                # 現在のチャンクサイズが設定値を超えたらシリアライズしてファイルに保存
                if temp_size >= chunk_size:
                    path = self._serialize_chunk(current_chunk, chunk_index, format=format)
                    files.append(path)
                    current_chunk = Graph()  # 新しいグラフを開始
                    chunk_index += 1

        # 最後のチャンクを保存
        if len(current_chunk) > 0:
            path = self._serialize_chunk(current_chunk, chunk_index, format=format)
            files.append(path)

        return files

まとめ

RDFやDydraの利用にあたり、参考になりましたら幸いです。

Discussion