Open3

Python Tips

DaiDai

配列から2要素ずつ取得してネストした配列を作る

def __nest_points(points: list) -> list:
    """Group a flat coordinate list into ``[x, y]`` pairs.

    Args:
        points: Flat sequence ``[x1, y1, x2, y2, ...]``. A falsy value
            (``None`` or an empty sequence) yields ``[]``.

    Returns:
        ``[[x1, y1], [x2, y2], ...]``. If the input has an odd number of
        elements, the trailing unpaired element is dropped because ``zip``
        stops as soon as one side is exhausted.
    """
    if not points:
        return []
    # Handing the *same* iterator to zip twice makes it consume two
    # consecutive items for every pair it produces.
    it = iter(points)
    return [[x, y] for x, y in zip(it, it)]
DaiDai

SQLAlchemy(MySQL)

Relationについて

https://poyo.hatenablog.jp/entry/2017/01/08/212227

scoped_sessionとは

https://mee.hatenablog.com/entry/2018/09/04/235509

Relationのあるテーブルごと更新

task.pyとcontent.pyがあるとする。

task.py
from datetime import datetime

from sqlalchemy import Column, String, Enum, DateTime, Boolean
from sqlalchemy.orm import relationship

from app.db import Base
from app.services.utils import string_util
from app.types.const import TaskStatus


class Task(Base):
    """ORM model mapped to the ``tasks`` table.

    A task owns zero or more ``Content`` rows, reachable through
    ``contents``.
    """

    __tablename__ = "tasks"

    # String primary key; filled by string_util.str_uuid when not supplied
    # (presumably a UUID string generator -- helper is not shown here).
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    name = Column(String) # Intended unique key. NOTE(review): unique=True is not declared here -- confirm the constraint exists in the actual schema.
    status = Column(Enum(TaskStatus), default=TaskStatus.registered)
    is_priority = Column(Boolean, default=False)
    # The callable datetime.now is passed uncalled, so it is used as a
    # default generator rather than a fixed timestamp.
    created_at = Column(DateTime, default=datetime.now)
    # Same default on insert; onupdate supplies a fresh value on UPDATE.
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Paired with Content.task via back_populates.
    contents = relationship("Content", back_populates="task")
content.py
from datetime import datetime

from sqlalchemy import ForeignKey, Column, Integer, String, Enum, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.mysql import DOUBLE

from app.db import Base
from app.services.utils import string_util
from app.types.const import StorageType


class Content(Base):
    """ORM model mapped to the ``contents`` table.

    Each row references its owning task through ``task_id`` /
    ``task``.
    """

    __tablename__ = "contents"

    # String primary key; filled by string_util.str_uuid when not supplied
    # (presumably a UUID string generator -- helper is not shown here).
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    # Foreign key to tasks.id.
    task_id = Column(String, ForeignKey("tasks.id"))
    storage_type = Column(Enum(StorageType), default=StorageType.local)
    name = Column(String)
    # NOTE(review): unit is not stated here -- presumably bytes; confirm.
    size = Column(Integer, default=0)
    # The callable datetime.now is passed uncalled, so it is used as a
    # default generator rather than a fixed timestamp.
    created_at = Column(DateTime, default=datetime.now)
    # Same default on insert; onupdate supplies a fresh value on UPDATE.
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Paired with Task.contents via back_populates.
    task = relationship("Task", back_populates="contents")

下記の main.py のように、Content を Task#contents に持たせた状態で Task を追加すると、同時に作られる Content の外部キー Content#task_id は、新しく生成される Task#id に自動で紐づく。

main.py
# Build the child row first; task_id is intentionally not set by hand.
content = Content(
    storage_type=StorageType.zip,
    name=filename,
    size=filesize,
)
# Only the Task is added to the session. Because the Content is attached via
# Task.contents, its task_id is linked to the newly generated Task.id.
# No commit/flush is issued in this fragment.
self._session.add(Task(
    name=filename,
    status=status,
    is_priority=is_priority,
    contents=[content])
)

Bulk Insert Ignore(既にあるレコードは無視する)

main.py
# prefix_with("IGNORE") places IGNORE right after INSERT ("INSERT IGNORE ..."),
# so records that already exist are skipped.
# `insert` is not imported in this fragment; the Bulk Upsert snippet in these
# notes imports it from sqlalchemy.dialects.mysql.
insert_ignore_stmt = insert(Task).prefix_with("IGNORE")
# One dict per row; columns not listed here (id, created_at, ...) are left to
# the column defaults declared on Task.
self._session.execute(insert_ignore_stmt, [{
    "name": filename1,
    "status": status1,
    "is_priority": is_priority1
},{
    "name": filename2,
    "status": status2,
    "is_priority": is_priority2
}])

Bulk Upsert

main.py
from sqlalchemy.dialects.mysql import insert

# Multi-row INSERT: all rows are embedded in a single statement via values().
insert_stmt = insert(Task).values([{
    "name": filename1,
    "status": status1,
    "is_priority": is_priority1
},{
    "name": filename2,
    "status": status2,
    "is_priority": is_priority2
}])
# name is the unique key, so when it collides with an existing row the given
# status and is_priority are written onto that row instead.
# insert_stmt.inserted refers to the values of the row that was being inserted.
# NOTE(review): updated_at is not listed below; ON DUPLICATE KEY UPDATE may not
# apply the column's onupdate default automatically -- confirm.
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    status=insert_stmt.inserted.status,
    is_priority=insert_stmt.inserted.is_priority
)
self._session.execute(on_duplicate_key_stmt)

Upsert(既存のカラムの値に加算する)

main.py
# Single usage row to insert, or to merge into an existing row.
# NOTE(review): the unique key that triggers the update is not shown here --
# presumably (workspace_id, year, month); confirm against the model.
workspace_usage_value = {
    "workspace_id": workspace_id,
    "year": year,
    "month": month,
    "uploaded_content_size": content_size_chunk,
    "uploaded_content_count": len(content_values),
}
insert_stmt = insert(WorkspaceUsage).values(
    workspace_usage_value)
# On a duplicate key, add the incoming values to the stored column values
# (existing column + inserted value) instead of overwriting them.
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    uploaded_content_size=WorkspaceUsage.uploaded_content_size +
    insert_stmt.inserted.uploaded_content_size,
    uploaded_content_count=WorkspaceUsage.uploaded_content_count +
    insert_stmt.inserted.uploaded_content_count
)
self._session.execute(on_duplicate_key_stmt)
self._session.commit()

created_at と updated_at

下記のように定義する。

task.py
from datetime import datetime

from sqlalchemy import Column, String, Enum, DateTime, Boolean
from sqlalchemy.orm import relationship

from app.db import Base
from app.services.utils import string_util
from app.types.const import TaskStatus


class Task(Base):
    """ORM model mapped to the ``tasks`` table.

    Shown here as the example of how ``created_at`` / ``updated_at``
    timestamp columns are declared.
    """

    __tablename__ = "tasks"

    # String primary key; filled by string_util.str_uuid when not supplied
    # (presumably a UUID string generator -- helper is not shown here).
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    name = Column(String) # Intended unique key. NOTE(review): unique=True is not declared here -- confirm the constraint exists in the actual schema.
    status = Column(Enum(TaskStatus), default=TaskStatus.registered)
    is_priority = Column(Boolean, default=False)
    # created_at: datetime.now is passed as a callable (not called here), so
    # it acts as a default generator rather than a fixed timestamp.
    created_at = Column(DateTime, default=datetime.now)
    # updated_at: same default on insert, and onupdate supplies a fresh value
    # on UPDATE.
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Paired with Content.task via back_populates.
    contents = relationship("Content", back_populates="task")

SQL出力

print.py
from sqlalchemy.dialects import mysql

# Print the SQL of `query` compiled for the MySQL dialect.
# literal_binds asks the compiler to render bound parameter values inline
# instead of placeholders -- for debugging output, not for execution.
print(query.statement.compile(
    dialect=mysql.dialect(),
    compile_kwargs={"literal_binds": True}))