PythonからPostgreSQLにCSVインポートするときハマった話
背景
筆者はいままで参照権限しか与えられずDDL知識は半端な状態。
晴れてWRITE権限を貰えたが、ETLツールは使用できない環境。
postgres.exeもインストール禁止の環境。
それでも自動化しなければチームの業務も滞る。
その時のトライアンドエラーを記録する。
DBクライアントツールはDBeaverを使用。
先に結論
close()も大事だけど、commit()も大事
「PostgreSQL Python インサートできない」
この検索ワードで辿り着いた方は、先にcommit()されているかを確認していただきたい。
権限の確認もする
copy_fromでのインポートはスーパーユーザー以外の権限ではできない。
今回はスーパーユーザーではない場合の方法。
DBeaverからのインサート(手動)
まずは確認用に、ツールからのインサート方法を記載する。
自動化コードが見たい方は飛ばして。
比較的シェアの高いDBクライアントツールで、
テーブル作成、カラム追加などがノーコードででき、テーブル検索できるため
使いやすいが、癖のある部分もあり好き嫌いは分かれそう。
そんなビーバー🐭からの手動インサート方法は以下の通り。
データベース(D)>Tasks>CreateNewTask...
ポップアップが現れるため任意の名前を付け、「Data import」を選択する。
次へ進み、投入先のテーブルを指定する。
次へ進み、投入するデータ形式を選択する。(今回はCSV)
次へ進み、CSVファイルを選択する。
file(s)とあるが、複数ファイルの選択ができない様子。
まだ調べが足りないかもしれない。
次へ進み、マッピングを確認し問題なければ続行、完了まで待つ。
同じフォルダ内の同じ形式の名前のファイルが更新されたら差分更新してくれたらいいが、
まだ勉強不足でできるのかどうか、わからない。
Pythonからインサート
接続確認
PostgreSQLをPythonから操作する場合は「psycopg2」を使用する。
pip install psycopg2
インストール完了したら、以下の情報を先に入れておく。
import psycopg2
from psycopg2 import Error
conn = psycopg2.connect('postgresql://{user}:{password}@{host}:{port}/{dbname}'.format(
user="str", #ユーザ
password="str", #パスワード
host="str", #ホスト名
port="5432", #ポート
dbname="str") #データベース名
)
さらに、接続できるか確認する。
cursor = conn.cursor()
# 任意のSQL
cursor.execute("SELECT * FROM table;")
result = cursor.fetchone()
print('成功!', result[0])
cursor.close()
conn.close()
成功したら次へ。
エラーが出たらCloseするようにする
暫くデバッグするので、エラーが起きてもCloseできるようにする。
ここまで、すべてを合わせると以下のようになる。
import psycopg2
from psycopg2 import Error
conn = psycopg2.connect('postgresql://{user}:{password}@{host}:{port}/{dbname}'.format(
user="str", #ユーザ
password="str", #パスワード
host="str", #ホスト名
port="5432", #ポート
dbname="str") #データベース名
)
try:
cursor = conn.cursor()
# 任意のSQL
cursor.execute("SELECT * FROM table;")
result = cursor.fetchone()
print('成功!', result[0])
except(Exception, Error) as error:
print("エラー発生!:",error)
finally:
cursor.close()
conn.close()
簡単なテストテーブルで試す
まずは簡単なテーブルを用意して、INSERTされるか確認する。
「testschema」スキーマに「newtable」というテーブル、違うデータ型の列を3つ持たせた。
datetimeは日付時間型、column1は文字列型、column2は整数型。
次に先ほどのコードの「except(Exception, Error) as error:」の上に、以下を記載する。
cursor.executemany(
'''INSERT INTO testschema.newtable (datetime, column1, column2)
VALUES ('2023-03-02 09:30:05', 'a', 1)''')
conn.commit()
print('成功!')
実行する。
データ型の問題
これで問題なくインサートが成功したら、CSVを1行ずつ取り込むようにする。
今回は以下のようなものを用意した。
Excelから
同じデータをサクラエディタから
設定の甘いPythonから吐き出されたCSVを想定しているため、
敢えて3列目は文字列に設定、Index列を持たせ、文字コードはUTF-8に設定した。
「filepath」の変数を用意し、使用するCSVのPathを入れる。
filepath = "C:\\Users\\デスクトップ\\test_csv.csv"
CSVを開き、1行ずつ読み込む。Index列は読み込まないためpopする。
先ほどのコードの「print('成功!')」の下に以下を入れるが、
このまま実行すると、エラーが出るのだ。
import csv
open_csv = open(filepath, encoding='utf-8')
doc = [row for row in csv.reader(open_csv)]
for row in doc:
row.pop(0)
cursor.executemany(
'''INSERT INTO testschema.newtable (datetime, column1, column2)
VALUES (%s, %s, %s)''', row
)
conn.commit()
print('成功!')
2023-03-01 09:30:05
エラー発生!: invalid input syntax for type integer: "1.0"
LINE 2: VALUES ('2023-03-03 09:30:05', 'a', '1.0')
^
それはそうで、int/floatの列に「1.0」という文字列が入るからである。
初めは「%s」を「%d」にするのか?と悩んだが、「'」もあり変換できない。
そのため以下のようにした。
open_csv = open(filepath, encoding='utf-8')
doc = [row for row in csv.reader(open_csv)]
for row in doc:
row.pop(0)
cursor.execute(
'''INSERT INTO testschema.newtable (datetime, column1, column2)
VALUES (\''''+row[0]+'''\', \''''+row[1]+'''\', '''+str(float(row[2]))+''')'''
)
conn.commit()
print('成功!')
文字列の列には「'」、数値の列はstr変換するが「'」はないようにする。
実行すると、CSVの内容が格納されていることが確認できた。
欠損値の対応
文字列をfloatに変換はできたが、欠損していた場合はできないのでifで変換した。
r2 = ['null' if pd.isnull(row[2]) or row[2] is None else '\''+str(row[2])+'\'']
すべてを合わせたもの
抽出確認、インサート確認はコメントアウトしている。
import psycopg2
from psycopg2 import Error
import csv
conn = psycopg2.connect('postgresql://{user}:{password}@{host}:{port}/{dbname}'.format(
user="str", #ユーザ
password="str", #パスワード
host="str", #ホスト名
port="5432", #ポート
dbname="str") #データベース名
)
filepath = "C:\\Users\\デスクトップ\\test_csv.csv"
try:
cursor = conn.cursor()
## データ抽出用
# cursor.execute("SELECT * FROM tmp_schema.newtable;")
# result = cursor.fetchone()
# print(result[0])
## インサート確認用
# cursor.execute(
# '''INSERT INTO testschema.newtable (datetime, column1, column2)
# VALUES ('2023-03-03 09:30:05', 'a', 1)''')
# conn.commit()
# print('成功!')
# CSV file Open
open_csv = open(filepath, encoding='utf-8')
read_csv = csv.reader(open_csv)
nextrow = next(read_csv) # ヘッダーは飛ばす
doc = [row for row in csv.reader(open_csv)]
# insert
for row in doc:
row.pop(0) # Index列を飛ばす
r2 = ['null' if pd.isnull(row[2]) or row[2] is None else str(row[2])]
r0 = ['null' if pd.isnull(row[0]) or row[0] is None else '\''+str(row[0])+'\'']
cursor.execute(
'''INSERT INTO tmp_schema.newtable (datetime, column1, column2)
VALUES ('''+r0[0]+''', \''''+row[1]+'''\', '''+r2[0]+''')'''
)
# commit and notification
conn.commit()
print('成功!')
except(Exception, Error) as error:
print("エラー発生!:",error)
finally:
cursor.close()
conn.close()
結果(感想)
そもそも、CSVのデータ型をどうにかすればいい!という気がする😅
50列とかある場合はどうするんだか。
executemanyも使用していたが今回はやめておいた。
業務で使わない範囲の知識は本を読むだけではなかなか身につかない。
CLOSE?COMMIT?バルクインサート?ストアドプロシージャ?
名前は知ってるのに、がつがつ書けない、動かない…
寂しくもなり、まだまだ勉強しようと思った😢
Discussion