Open3
Python Tips
配列から2要素ずつ取得してネストした配列を作る
def __nest_points(points: list) -> list:
    """Group a flat coordinate list into [x, y] pairs.

    Example: [x1, y1, x2, y2] -> [[x1, y1], [x2, y2]]

    Args:
        points: Flat list of alternating x and y values. An empty list
            or None is accepted.

    Returns:
        A list of two-element lists. Empty or None input yields an empty
        list. If ``points`` has an odd number of items, the trailing
        unpaired value is dropped because ``zip`` stops as soon as one
        side is exhausted.
    """
    if not points:
        return []
    # Handing the same iterator to zip twice makes it consume two
    # consecutive items per step: (p[0], p[1]), (p[2], p[3]), ...
    it = iter(points)
    return [[x, y] for x, y in zip(it, it)]
SQLAlchemy(MySQL)
Relationについて
scoped_sessionとは
Relationのあるテーブルごと更新
task.pyとcontent.pyがあるとする。
task.py
from datetime import datetime
from sqlalchemy import Column, String, Enum, DateTime, Boolean
from sqlalchemy.orm import relationship
from app.db import Base
from app.services.utils import string_util
from app.types.const import TaskStatus
class Task(Base):
    """ORM model mapped to the ``tasks`` table."""

    __tablename__ = "tasks"

    # Primary key; string_util.str_uuid is called to produce the default.
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    # Unique key according to the notes. NOTE(review): no unique constraint
    # is declared on this column -- presumably it lives in the DB schema;
    # confirm.
    name = Column(String)
    status = Column(Enum(TaskStatus), default=TaskStatus.registered)
    is_priority = Column(Boolean, default=False)

    # created_at only has an insert-time default; updated_at additionally
    # calls datetime.now again on UPDATE via onupdate.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
    )

    # Paired with the "task" attribute on Content via back_populates.
    contents = relationship("Content", back_populates="task")
content.py
from datetime import datetime
from sqlalchemy import ForeignKey, Column, Integer, String, Enum, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.mysql import DOUBLE
from app.db import Base
from app.services.utils import string_util
from app.types.const import StorageType
class Content(Base):
    """ORM model mapped to the ``contents`` table."""

    __tablename__ = "contents"

    # Primary key; string_util.str_uuid is called to produce the default.
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    # Foreign key pointing at tasks.id.
    task_id = Column(String, ForeignKey("tasks.id"))
    storage_type = Column(Enum(StorageType), default=StorageType.local)
    name = Column(String)
    size = Column(Integer, default=0)

    # created_at only has an insert-time default; updated_at additionally
    # calls datetime.now again on UPDATE via onupdate.
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
    )

    # Paired with the "contents" attribute on Task via back_populates.
    task = relationship("Task", back_populates="contents")
下記のように Task の contents に Content を渡して登録(session.add)すると、同時に作成される Content の外部キー task_id には、新しく生成される Task の id が自動的に設定される。
main.py
# Build the child row first; task_id is deliberately not set here.
content = Content(
    storage_type=StorageType.zip,
    name=filename,
    size=filesize,
)
# The Content is attached through the Task's "contents" relationship, so
# adding the Task alone is enough to register both objects.
task = Task(
    name=filename,
    status=status,
    is_priority=is_priority,
    contents=[content],
)
self._session.add(task)
Bulk Insert Ignore(既にあるレコードは無視する)
main.py
# prefix_with("IGNORE") places the IGNORE keyword after INSERT, so rows
# that already exist are skipped.
insert_ignore_stmt = insert(Task).prefix_with("IGNORE")
# One dict per row; keys are Task column names.
task_rows = [
    {
        "name": filename1,
        "status": status1,
        "is_priority": is_priority1,
    },
    {
        "name": filename2,
        "status": status2,
        "is_priority": is_priority2,
    },
]
self._session.execute(insert_ignore_stmt, task_rows)
Bulk Upsert
main.py
from sqlalchemy.dialects.mysql import insert

# One dict per row; keys are Task column names.
upsert_rows = [
    {
        "name": filename1,
        "status": status1,
        "is_priority": is_priority1,
    },
    {
        "name": filename2,
        "status": status2,
        "is_priority": is_priority2,
    },
]
insert_stmt = insert(Task).values(upsert_rows)
# name is the unique key, so when it collides with an existing row that
# row's status and is_priority are updated to the values given here.
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    status=insert_stmt.inserted.status,
    is_priority=insert_stmt.inserted.is_priority,
)
self._session.execute(on_duplicate_key_stmt)
Upsert(既存のカラムの値に加算する)
main.py
# Values for the row to insert; on a key collision the two uploaded_*
# counters are added to the stored values instead of replacing them.
workspace_usage_value = {
    "workspace_id": workspace_id,
    "year": year,
    "month": month,
    "uploaded_content_size": content_size_chunk,
    "uploaded_content_count": len(content_values),
}
insert_stmt = insert(WorkspaceUsage).values(workspace_usage_value)
# "inserted" refers to the values this INSERT attempted to write.
incoming = insert_stmt.inserted
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    uploaded_content_size=(
        WorkspaceUsage.uploaded_content_size + incoming.uploaded_content_size
    ),
    uploaded_content_count=(
        WorkspaceUsage.uploaded_content_count + incoming.uploaded_content_count
    ),
)
self._session.execute(on_duplicate_key_stmt)
self._session.commit()
created_at と updated_at
下記のように定義する。
task.py
from datetime import datetime
from sqlalchemy import Column, String, Enum, DateTime, Boolean
from sqlalchemy.orm import relationship
from app.db import Base
from app.services.utils import string_util
from app.types.const import TaskStatus
class Task(Base):
    """ORM model mapped to the ``tasks`` table."""

    __tablename__ = "tasks"

    # Primary key; string_util.str_uuid is called to produce the default.
    id = Column(String, primary_key=True, default=string_util.str_uuid)
    # Unique key according to the notes. NOTE(review): no unique constraint
    # is declared on this column -- presumably it lives in the DB schema;
    # confirm.
    name = Column(String)
    status = Column(Enum(TaskStatus), default=TaskStatus.registered)
    is_priority = Column(Boolean, default=False)

    # created_at: datetime.now is called only when the row is inserted.
    created_at = Column(DateTime, default=datetime.now)
    # updated_at: same insert default, and datetime.now is called again on
    # every UPDATE because of onupdate.
    updated_at = Column(
        DateTime,
        default=datetime.now,
        onupdate=datetime.now,
    )

    # Paired with the "task" attribute on Content via back_populates.
    contents = relationship("Content", back_populates="task")
SQL出力
print.py
from sqlalchemy.dialects import mysql

# Compile the query's statement for the MySQL dialect; literal_binds asks
# the compiler to render parameter values inline rather than as
# placeholders.
compiled_sql = query.statement.compile(
    dialect=mysql.dialect(),
    compile_kwargs={"literal_binds": True},
)
print(compiled_sql)
メモリ
【Python】メモリ・CPUの使用率を取得する(psutil)
main.py
import psutil

# Take one snapshot so all three figures come from the same reading.
mem = psutil.virtual_memory()
# Print total, used and available in that order, each divided by 1000
# twice (bytes -> MB, assuming psutil reports bytes as documented).
for amount in (mem.total, mem.used, mem.available):
    print(amount / 1000 / 1000)