🌊

Prefectをminikube(kubernetes)にインストールする

2024/05/12に公開

要約

  • minikubeを使えば手のひらPrefectを構築できます。
  • PrefectのWork-Poolにk8sを使えます。

インストールする環境

  • CPU: 3コア
  • メモリ: 2GB
  • OS: Debian系Linux Kernel 6.6.15 amd64
  • ディスク: 15GBほど使います。minikube,Prefect周りのDockerイメージが主です。

構成概略

  • ingressを用意しているのがポイント
    • minikube tunnelを使う為、kindがLoadBalancerのingressを使ってPrefect環境はAs-Isを使いたかった
    • 開発者はk8sのPrefectAPIに認証もTLSも無しでダイレクトアクセスできる
      • セキュリティが気になるなら、さらに上位のingressを設置しよう!
    • minikubeじゃなく本番でオンプレk8sを使う場合はNodePortでk8s外にLBを用意することをおすすめします。

環境構築

minikubeのインストールとkubernetesクラスタの作成、helmのインストール

minikube公式サイトの手順に従う

# minikubeのダウンロード
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube_latest_amd64.deb

# minikubeのインストール
sudo dpkg -i minikube_latest_amd64.deb

# minikubeの実行
minikube start
# rootで実行する場合は --force オプションをつけます

# ingressをホスト側に露出させるためのプラグインのインストール
minikube addons enable ingress

# kubectlを取得
sudo apt install kubectl
# バージョンの問題がある場合は alias kubectl="minikube kubectl --"

# kubectlの設定を確認(kubectlで使われる認証情報)
cat $HOME/.kube/config

# Helmのインストール
sudo apt install helm

Prefectをkubernetesにインストール

# prefect リポジトリの追加
helm repo add prefect https://prefecthq.github.io/prefect-helm
helm repo update

# prefect用のnamespaceの作成
kubectl create ns prefect

## prefect-serverのダウンロードと実行
# ダウンロード
helm pull prefect/prefect-server --untar
# 実行
helm install prefect-server ./prefect-server \
--namespace prefect \
--set server.image.prefectTag=2.19.5-python3.11

# prefect-workerのダウンロードと実行
# ダウンロード
helm pull prefect/prefect-worker --untar
# 実行
helm install prefect-worker ./prefect-worker \
--namespace prefect \
--set worker.apiConfig=server \
--set worker.config.workPool=minikube \
--set worker.serverApiConfig.apiUrl=http://prefect-server:4200/api \
--set worker.serverApiConfig.uiUrl=http://prefect-server:4200/ \
--set worker.image.prefectTag=2.19.5-python3.11-kubernetes

# サービスの稼働を確認
kubectl get svc -n prefect

Nginx Ingress-Controllerをkubernetesにインストール (minikube addonを使用する事で、不要になりました!)

minikube addonを使用する事で、不要になりました!
# nginxリポジトリの追加
helm repo add nginx-stable https://helm.nginx.com/stable
helm repo update

## ingress-nginxのダウンロードと実行
helm pull nginx-stable/nginx-ingress --untar

# 実行
helm install nginx-ingress ./nginx-ingress \
--namespace prefect
# ingressclassはnamespace関係ないですが。

# ingressclassの稼働を確認
kubectl get ingressclass

Nginx Ingressをデプロイ

# minikubeのクラスタIPのドメイン
DOMAIN_NAME=prefect.local

# ingress-nginx.yamlの作成
kubectl create ingress ingress-nginx --class=nginx \
--namespace prefect \
--rule="${DOMAIN_NAME}/*=prefect-server:4200" \
--dry-run=client -o yaml > ingress-nginx.yaml

# ingress-nginx.yamlのデプロイ
kubectl apply -f ingress-nginx.yaml

# 待ち受けホスト名がprefect.localとなっていることを確認
kubectl get ingress -n prefect

minikubeのクラスタIPにingressを公開

通常のkubernetesであれば、ここまで実施すればドメイン名を叩けばHTTPアクセス可能になるはずです。認証なしのガバガバ状態です。

しかし、minikubeで構築されたk8sは独自のIP名前空間に存在するので、通常は外部にサービスを公開できません。手動で公開する必要があります。

# 作成するingressへのトンネルの作成
minikube tunnel
DOMAIN_NAME=prefect.local
# prefect.localを/etc/hostsに登録する
sudo echo "`minikube ip` ${DOMAIN_NAME}" >> /etc/hosts

# ingressのADDRESSの箇所にminikube ipが入っている事を確認する
kubectl get ingress -n prefect
# もしingressの待受けIPがminikube ipではない場合はminikube addons enable ingressとminikube tunnelをもう一度実行してください。

ブラウザでPrefectのWebUIを開く

http://prefect.local/ を開く

dashboard.png

何もFlowを登録していないので空っぽです。

アプリケーションからPrefectのAPIを叩く

ホスト側からAPIを叩いてみます。

Prefectクライアントのインストール

sudo apt install python3-venv
python3 -m venv venv
. ./venv/bin/activate
pip install prefect==2.19.5

Prefectサーバへの接続

DOMAIN_NAME=prefect.local
prefect config set PREFECT_API_URL="http://${DOMAIN_NAME}/api"

Prefectフローを作成

Prefectフローを作成するフォルダを作成

mkdir -p prefect/flow
cd prefect/flow

サンプルPrefectフローを作成し、実行

sample.py
import httpx   # an HTTP client library and dependency of Prefect
from prefect import flow, task


@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """Get info about a repo - will retry twice after failing"""
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    repo_info = api_response.json()
    return repo_info


@task
def get_contributors(repo_info: dict):
    """Get contributors for a repo"""
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    contributors = response.json()
    return contributors


@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    """
    Given a GitHub repository, logs the number of stargazers
    and contributors for that repo.
    """
    repo_info = get_repo_info(repo_owner, repo_name)
    print(f"Stars 🌠 : {repo_info['stargazers_count']}")

    contributors = get_contributors(repo_info)
    print(f"Number of contributors 👷: {len(contributors)}")


if __name__ == "__main__":
    repo_info()
python3 sample.py

WebUI側で実行した形跡が表示されている事を確認

http://prefect.local/ のFlowsを確認

result_local.png

PrefectフローをWorkerPool指定でデプロイメント

登録済みのWorkPoolを確認

prefect work-pool ls

minikubeという名前のWorkPoolがtype:kubernetesで用意されている事を確認

minikube上のkubernetesで先ほどのフローを実行する

deploy.py
from sample import repo_info #モジュールとしてflowをimport
from prefect.deployments import DeploymentImage

repo_info.deploy(
    name="minikube_remote_deploy_with_Docker",
    work_pool_name="minikube",
    image=DeploymentImage(
        name="repo_info_image",
        tag="latest",
#        dockerfile="Dockerfile"
    ),
    push=False,
    job_variables={"namespace":"prefect"}, 
)
eval $(minikube docker-env)
python deploy.py 

minikube上のkubernetesで先ほどのフローをDockerfile指定で実行する

Dockerfile
FROM prefecthq/prefect:2.19.5-python3.11
COPY requirements.txt .
RUN python -m pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
COPY . /opt/prefect/
WORKDIR /opt/prefect/
  • requirements.txtを作成
requirements.txt
scapy
  • deploy.pyを編集
deploy.py
from sample import repo_info #モジュールとしてflowをimport
from prefect.deployments import DeploymentImage

repo_info.deploy(
    name="minikube_remote_deploy_with_Docker",
    work_pool_name="minikube",
    image=DeploymentImage(
        name="repo_info_image",
        tag="latest",
        dockerfile="Dockerfile"
    ),
    push=False,
    job_variables={"namespace":"prefect"}, 
)
  • sample.pyを編集
sample.py
import scapy #特に使わないけどimport
import httpx   # an HTTP client library and dependency of Prefect
from prefect import flow, task


@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
    """Get info about a repo - will retry twice after failing"""
    url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
    api_response = httpx.get(url)
    api_response.raise_for_status()
    repo_info = api_response.json()
    return repo_info


@task
def get_contributors(repo_info: dict):
    """Get contributors for a repo"""
    contributors_url = repo_info["contributors_url"]
    response = httpx.get(contributors_url)
    response.raise_for_status()
    contributors = response.json()
    return contributors


@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
    """
    Given a GitHub repository, logs the number of stargazers
    and contributors for that repo.
    """
    print("Scapy Version:",scapy.VERSION) #意味もなくprint
    repo_info = get_repo_info(repo_owner, repo_name)
    print(f"Stars 🌠 : {repo_info['stargazers_count']}")

    contributors = get_contributors(repo_info)
    print(f"Number of contributors 👷: {len(contributors)}")

if __name__ == "__main__":
    repo_info()
  • デプロイ
# pip installの分、時間がかかります。
python deploy.py
  • フォルダ構成
.
├── Dockerfile
├── deploy.py
├── requirements.txt
└── sample.py

ここまでのまとめ

  • minikubeのKubernetesを使ってPrefectを手元の端末で実行できた
  • minikubeのKubernetesを使ってPrefectをKubernetes上で実行できた
  • minikubeのKubernetesを使ってPrefectをKubrenetes上でDockerfileを指定しコンテナ指定で実行できた

ここまでの課題

  • deployのたびに、何も変えてなくても、minikubeのDockerRegistoryに<none>っていうゴミイメージが大量生産される(多分オプションで回避できるが、CI/CDでdocker system pruneをするとよい)

備考: 作った環境のクリーンアップ手順

困ったら吹き飛ばせるのがminikubeの良いところ。

helm uninstall prefect-server -n prefect
helm uninstall prefect-worker -n prefect
helm repo remove prefect
helm repo update
rm -rf ./prefect-server
rm -rf ./prefect-worker
rm ingress-nginx.yaml
rm -rf ./prefect
kubectl delete ns prefect
minikube delete
deactivate
rm -rf ./venv
rm -rf ./.kube
rm -rf ./.minikube
rm -rf ./.prefect
# apt remove python3-venv kubectl minikube helm
# 別のシェルで
docker system prune

おまけ

prefect-workerがk8sに許可されているPolicy

意外と最小限だったり。

kubectl describe role prefect-worker -n prefect
Name:         prefect-worker
Labels:       app.kubernetes.io/component=worker
              app.kubernetes.io/instance=prefect-worker
              app.kubernetes.io/managed-by=Helm
              app.kubernetes.io/name=prefect-worker
              app.kubernetes.io/version=2.19.5
              helm.sh/chart=prefect-worker-2024.6.13185354
              prefect-version=2.19.5
Annotations:  meta.helm.sh/release-name: prefect-worker
              meta.helm.sh/release-namespace: prefect
PolicyRule:
  Resources    Non-Resource URLs  Resource Names  Verbs
  ---------    -----------------  --------------  -----
  jobs.batch   []                 []              [get list watch create update patch delete]
  events       []                 []              [get watch list]
  pods/log     []                 []              [get watch list]
  pods/status  []                 []              [get watch list]
  pods         []                 []              [get watch list]

prefect-workerがk8sにbatchをdeployする時に使っているyaml

https://prefecthq.github.io/prefect-kubernetes/worker/ に記載の通り。

---
apiVersion: batch/v1
kind: Job
metadata:
labels: "{{ labels }}"
namespace: "{{ namespace }}"
generateName: "{{ name }}-"
spec:
ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
template:
    spec:
    parallelism: 1
    completions: 1
    restartPolicy: Never
    serviceAccountName: "{{ service_account_name }}"
    containers:
    - name: prefect-job
        env: "{{ env }}"
        image: "{{ image }}"
        imagePullPolicy: "{{ image_pull_policy }}"
        args: "{{ command }}"

Discussion