🌊
Prefectをminikube(kubernetes)にインストールする
要約
- minikubeを使えば手のひらPrefectを構築できます。
- PrefectのWork-Poolにk8sを使えます。
インストールする環境
- CPU: 3コア
- メモリ: 2GB
- OS: Debian系Linux Kernel 6.6.15 amd64
- ディスク: 15GBほど使います。minikube,Prefect周りのDockerイメージが主です。
構成概略
- ingressを用意しているのがポイント
- minikube tunnelを使う為、kindがLoadBalancerのingressを使ってPrefect環境はAs-Isを使いたかった
- 開発者はk8sのPrefectAPIに認証もTLSも無しでダイレクトアクセスできる
- セキュリティが気になるなら、さらに上位のingressを設置しよう!
- minikubeじゃなく本番でオンプレk8sを使う場合はNodePortでk8s外にLBを用意することをおすすめします。
環境構築
minikubeのインストールとkubernetesクラスタの作成、helmのインストール
minikube公式サイトの手順に従う
# minikubeのダウンロード
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube_latest_amd64.deb
# minikubeのインストール
sudo dpkg -i minikube_latest_amd64.deb
# minikubeの実行
minikube start
# rootで実行する場合は --force オプションをつけます
# ingressをホスト側に露出させるためのプラグインのインストール
minikube addons enable ingress
# kubectlを取得
sudo apt install kubectl
# バージョンの問題がある場合は alias kubectl="minikube kubectl --"
# kubectlの設定を確認(kubectlで使われる認証情報)
cat $HOME/.kube/config
# Helmのインストール
sudo apt install helm
Prefectをkubernetesにインストール
- 参考URL
# prefect リポジトリの追加
helm repo add prefect https://prefecthq.github.io/prefect-helm
helm repo update
# prefect用のnamespaceの作成
kubectl create ns prefect
## prefect-serverのダウンロードと実行
# ダウンロード
helm pull prefect/prefect-server --untar
# 実行
helm install prefect-server ./prefect-server \
--namespace prefect \
--set server.image.prefectTag=2.19.5-python3.11
# prefect-workerのダウンロードと実行
# ダウンロード
helm pull prefect/prefect-worker --untar
# 実行
helm install prefect-worker ./prefect-worker \
--namespace prefect \
--set worker.apiConfig=server \
--set worker.config.workPool=minikube \
--set worker.serverApiConfig.apiUrl=http://prefect-server:4200/api \
--set worker.serverApiConfig.uiUrl=http://prefect-server:4200/ \
--set worker.image.prefectTag=2.19.5-python3.11-kubernetes
# サービスの稼働を確認
kubectl get svc -n prefect
Nginx Ingress-Controllerをkubernetesにインストール (minikube addonを使用する事で、不要になりました!)
minikube addonを使用する事で、不要になりました!
- 参考URL: https://helm.nginx.com/
# nginxリポジトリの追加
helm repo add nginx-stable https://helm.nginx.com/stable
helm repo update
## ingress-nginxのダウンロードと実行
helm pull nginx-stable/nginx-ingress --untar
# 実行
helm install nginx-ingress ./nginx-ingress \
--namespace prefect
# ingressclassはnamespace関係ないですが。
# ingressclassの稼働を確認
kubectl get ingressclass
Nginx Ingressをデプロイ
# minikubeのクラスタIPのドメイン
DOMAIN_NAME=prefect.local
# ingress-nginx.yamlの作成
kubectl create ingress ingress-nginx --class=nginx \
--namespace prefect \
--rule="${DOMAIN_NAME}/*=prefect-server:4200" \
--dry-run=client -o yaml > ingress-nginx.yaml
# ingress-nginx.yamlのデプロイ
kubectl apply -f ingress-nginx.yaml
# 待ち受けホスト名がprefect.localとなっていることを確認
kubectl get ingress -n prefect
minikubeのクラスタIPにingressを公開
通常のkubernetesであれば、ここまで実施すればドメイン名を叩けばHTTPアクセス可能になるはずです。認証なしのガバガバ状態です。
しかし、minikubeで構築されたk8sは独自のIP名前空間に存在するので、通常は外部にサービスを公開できません。手動で公開する必要があります。
# 作成するingressへのトンネルの作成
minikube tunnel
DOMAIN_NAME=prefect.local
# prefect.localを/etc/hostsに登録する
sudo echo "`minikube ip` ${DOMAIN_NAME}" >> /etc/hosts
# ingressのADDRESSの箇所にminikube ipが入っている事を確認する
kubectl get ingress -n prefect
# もしingressの待受けIPがminikube ipではない場合はminikube addons enable ingressとminikube tunnelをもう一度実行してください。
ブラウザでPrefectのWebUIを開く
何もFlowを登録していないので空っぽです。
アプリケーションからPrefectのAPIを叩く
ホスト側からAPIを叩いてみます。
Prefectクライアントのインストール
sudo apt install python3-venv
python3 -m venv venv
. ./venv/bin/activate
pip install prefect==2.19.5
Prefectサーバへの接続
DOMAIN_NAME=prefect.local
prefect config set PREFECT_API_URL="http://${DOMAIN_NAME}/api"
Prefectフローを作成
Prefectフローを作成するフォルダを作成
mkdir -p prefect/flow
cd prefect/flow
サンプルPrefectフローを作成し、実行
sample.py
import httpx # an HTTP client library and dependency of Prefect
from prefect import flow, task
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
"""Get info about a repo - will retry twice after failing"""
url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
api_response = httpx.get(url)
api_response.raise_for_status()
repo_info = api_response.json()
return repo_info
@task
def get_contributors(repo_info: dict):
"""Get contributors for a repo"""
contributors_url = repo_info["contributors_url"]
response = httpx.get(contributors_url)
response.raise_for_status()
contributors = response.json()
return contributors
@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
"""
Given a GitHub repository, logs the number of stargazers
and contributors for that repo.
"""
repo_info = get_repo_info(repo_owner, repo_name)
print(f"Stars 🌠 : {repo_info['stargazers_count']}")
contributors = get_contributors(repo_info)
print(f"Number of contributors 👷: {len(contributors)}")
if __name__ == "__main__":
repo_info()
python3 sample.py
WebUI側で実行した形跡が表示されている事を確認
http://prefect.local/ のFlowsを確認
PrefectフローをWorkerPool指定でデプロイメント
登録済みのWorkPoolを確認
prefect work-pool ls
minikubeという名前のWorkPoolがtype:kubernetesで用意されている事を確認
minikube上のkubernetesで先ほどのフローを実行する
deploy.py
from sample import repo_info #モジュールとしてflowをimport
from prefect.deployments import DeploymentImage
repo_info.deploy(
name="minikube_remote_deploy_with_Docker",
work_pool_name="minikube",
image=DeploymentImage(
name="repo_info_image",
tag="latest",
# dockerfile="Dockerfile"
),
push=False,
job_variables={"namespace":"prefect"},
)
eval $(minikube docker-env)
python deploy.py
minikube上のkubernetesで先ほどのフローをDockerfile指定で実行する
-
参考URL
-
Dockerfileを作成
Dockerfile
FROM prefecthq/prefect:2.19.5-python3.11
COPY requirements.txt .
RUN python -m pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
COPY . /opt/prefect/
WORKDIR /opt/prefect/
- requirements.txtを作成
requirements.txt
scapy
- deploy.pyを編集
deploy.py
from sample import repo_info #モジュールとしてflowをimport
from prefect.deployments import DeploymentImage
repo_info.deploy(
name="minikube_remote_deploy_with_Docker",
work_pool_name="minikube",
image=DeploymentImage(
name="repo_info_image",
tag="latest",
dockerfile="Dockerfile"
),
push=False,
job_variables={"namespace":"prefect"},
)
- sample.pyを編集
sample.py
import scapy #特に使わないけどimport
import httpx # an HTTP client library and dependency of Prefect
from prefect import flow, task
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
"""Get info about a repo - will retry twice after failing"""
url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
api_response = httpx.get(url)
api_response.raise_for_status()
repo_info = api_response.json()
return repo_info
@task
def get_contributors(repo_info: dict):
"""Get contributors for a repo"""
contributors_url = repo_info["contributors_url"]
response = httpx.get(contributors_url)
response.raise_for_status()
contributors = response.json()
return contributors
@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
"""
Given a GitHub repository, logs the number of stargazers
and contributors for that repo.
"""
print("Scapy Version:",scapy.VERSION) #意味もなくprint
repo_info = get_repo_info(repo_owner, repo_name)
print(f"Stars 🌠 : {repo_info['stargazers_count']}")
contributors = get_contributors(repo_info)
print(f"Number of contributors 👷: {len(contributors)}")
if __name__ == "__main__":
repo_info()
- デプロイ
# pip installの分、時間がかかります。
python deploy.py
- フォルダ構成
.
├── Dockerfile
├── deploy.py
├── requirements.txt
└── sample.py
ここまでのまとめ
- minikubeのKubernetesを使ってPrefectを手元の端末で実行できた
- minikubeのKubernetesを使ってPrefectをKubernetes上で実行できた
- minikubeのKubernetesを使ってPrefectをKubrenetes上でDockerfileを指定しコンテナ指定で実行できた
ここまでの課題
- deployのたびに、何も変えてなくても、minikubeのDockerRegistoryに<none>っていうゴミイメージが大量生産される(多分オプションで回避できるが、CI/CDで
docker system prune
をするとよい)
備考: 作った環境のクリーンアップ手順
困ったら吹き飛ばせるのがminikubeの良いところ。
helm uninstall prefect-server -n prefect
helm uninstall prefect-worker -n prefect
helm repo remove prefect
helm repo update
rm -rf ./prefect-server
rm -rf ./prefect-worker
rm ingress-nginx.yaml
rm -rf ./prefect
kubectl delete ns prefect
minikube delete
deactivate
rm -rf ./venv
rm -rf ./.kube
rm -rf ./.minikube
rm -rf ./.prefect
# apt remove python3-venv kubectl minikube helm
# 別のシェルで
docker system prune
おまけ
prefect-workerがk8sに許可されているPolicy
意外と最小限だったり。
kubectl describe role prefect-worker -n prefect
Name: prefect-worker
Labels: app.kubernetes.io/component=worker
app.kubernetes.io/instance=prefect-worker
app.kubernetes.io/managed-by=Helm
app.kubernetes.io/name=prefect-worker
app.kubernetes.io/version=2.19.5
helm.sh/chart=prefect-worker-2024.6.13185354
prefect-version=2.19.5
Annotations: meta.helm.sh/release-name: prefect-worker
meta.helm.sh/release-namespace: prefect
PolicyRule:
Resources Non-Resource URLs Resource Names Verbs
--------- ----------------- -------------- -----
jobs.batch [] [] [get list watch create update patch delete]
events [] [] [get watch list]
pods/log [] [] [get watch list]
pods/status [] [] [get watch list]
pods [] [] [get watch list]
prefect-workerがk8sにbatchをdeployする時に使っているyaml
https://prefecthq.github.io/prefect-kubernetes/worker/ に記載の通り。
---
apiVersion: batch/v1
kind: Job
metadata:
labels: "{{ labels }}"
namespace: "{{ namespace }}"
generateName: "{{ name }}-"
spec:
ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
template:
spec:
parallelism: 1
completions: 1
restartPolicy: Never
serviceAccountName: "{{ service_account_name }}"
containers:
- name: prefect-job
env: "{{ env }}"
image: "{{ image }}"
imagePullPolicy: "{{ image_pull_policy }}"
args: "{{ command }}"
Discussion