Open14

0からのKubernetes勉強めも

Yuji MiyashitaYuji Miyashita

概要

  • Kubernetesを略すと k8s
  • ひたすら公式ドキュメントの設定値を読んで、ひたすらyamlに設定を書いていく開発
  • k8sのリソースは、以下の3パターンで作成していく
    • 標準リソース (DeploymentとかServiceとか) でyaml定義
    • カスタムリソース定義 (SparkApplicationとかCronWorkflowとか) を使ってyaml定義
    • helm経由でパッケージをインストールし、 values.yaml を使って値だけ上書き
  • k8sクラスター内が一目でわかる Lens は入れておいた方が良い
  • こっち からログインすること
  • ローカルのMacでk8sを試すには minikube を使う
    • 他にも種類はあるが、プラグインの充実さからminikubeがやりやすい
  • minikubeコマンドとkubectlの違いは以下の通り
    • kubectl: k8sクラスターとのやりとり
    • minikube: ローカル環境で単一ノードのKubernetesクラスタを起動

その他

  • ローカルで複数のリソースを組み合わせての実行はマシン強化が必須、minikubeが耐えられない
Hidden comment
Yuji MiyashitaYuji Miyashita

標準リソース

Pod

  • ボリュームを指定、共有できる
  • 各Podごとに、ユニークなIPアドレスが割り当てられている
  • Pod内のすべてのコンテナは、IPアドレスとネットワークポートを含むネットワーク名前空間を共有する
  • 同一Pod内のコンテナは、localhostを使用して他のコンテナと通信できる
  • 異なるPod上で実行中のコンテナ間でやり取りをしたい場合は、IPネットワークを使用して通信できる

Service

  • svc と略されることがある
  • 通常、PodはKubernetesクラスター内部のIPアドレスからのみアクセス可
  • 何もしないとPodに対して、外部からアクセスできない
  • 外部からアクセスできるようにするために、 Service としてPodを公開する必要がある
  • ロードバランサーをサポートするクラウドプロバイダーでは、Serviceにアクセスするための外部IPアドレスが提供される
  • minikube では、LoadBalancerタイプはminikube serviceコマンドを使用した接続可能なServiceを作成
  • 開発してて、localhost:xxxxでアクセスしたいものには必ずServiceが存在する
    • minio
    • kafka
    • argo cd
  • 複数ポートを公開することもできる
  • Type
    • ClusterIP
      • クラスター内部のIPでServiceを公開
    • NodePort
      • 各NodeのIPにて、静的なポート上でServiceを公開
      • 転送先であるClusterIP Serviceが自動生成される
      • [NodeIP]:[NodePort] にてアクセス可能
    • LoadBalancer
      • ロードバランサを使用して、Serviceを公開
      • 転送先であるNodePort ServiceとClusterIP Serviceが自動生成される
  • minikube tunnelは、LoadBalancerのtypeをNodePortに調整してくれるようなもの

Deployment

  • ReplicaSet (rs) のあるべき姿を記述
    • 何個立ち上げるか
    • どのイメージを使用するか
    • rsにどんな名前をつけるか
  • ReplicaSetを単独で作成するケースは稀、Deployment経由でReplicaSetを定義
  • それぞれに定義するもの
    • metadata: リソースに関するメタデータ
    • spec: リソースのあるべき姿
    • template: Pod個々のあるべき姿
  • Deploymentの全てのロールアウト履歴は、いつでもロールバックできるようにデフォルトでシステムに保持されている
Yuji MiyashitaYuji Miyashita

マニフェストの見方

apiVersion: apps/v1 # `kubectl api-resources | development` のバージョンを指定
kind: Deployment # 標準リソース名(Service, Deploymentなど) or カスタムリソース名(SparkApplication, kafkaなど)
metadata:
  name: nginx-deployment # という名前のDeploymentが作成、ReplicaSetは `nginx-deployment-xxx`
spec: # 作成されるReplicaSetのあるべき姿を定義
  replicas: 3 # つのReplicaSetを作成
  selector:
    matchLabels: # Deploymentが管理するPodのラベルを定義
      app: nginx
  template: # ReplicaSet個々のあるべき姿
    metadata:
      labels:
        app: nginx
    spec: # -- ここからPodTemplate --
      containers:
      - name: nginx
        image: nginx:1.14.2 # Podで使用するDockerイメージ
        ports:
        - containerPort: 80
---
# https://kubernetes.io/ja/docs/concepts/services-networking/service/
apiVersion: v1 # `kubectl api-resources | service` のバージョンを指定
kind: Service
metadata:
  name: nginx-service
  labels:
    app: nginx
spec:
  type: NodePort
  ports:
  - port: 8080 # Serviceのポート
    targetPort: 80 # コンテナのポート
    nodePort: 30080 # ワーカーノードのポート
    protocol: TCP
  selector:
    app: nginx

※以下を参考
https://beyondjapan.com/blog/2022/09/start-kubernetes/

Yuji MiyashitaYuji Miyashita

helm

  • k8sにおけるyumやbrewのような感じ
  • yaml形式でパッケージ化 (= Helmチャート )
  • k8sにおけるパッケージとは、Deployment, Service, Secret, ConfigMapの集合体
  • Helmリポジトリに上げて他に開発者でも使えるようにしたり、誰かが作った人のHelm Chartを使ったりできる
  • 大量のk8sリソースを一括でかつ、整合性をもって削除することは、手動でのkubectlコマンドでは困難、Helmのようなパッケージマネージャがあって初めて実現できる
  • helmの構成は以下の通り
    • templates: 変数が含まれたマニフェスト、liquidみたいな変数が含まれている
    • values.yml: マニフェストで必要な変数定義。開発者が上書きできる
  • {{ .Values.xxx }} になっているところが対象
Yuji MiyashitaYuji Miyashita

チュートリアルその1 Nginxのデフォルトページを表示

minikube start
# nginx.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: nginx-service
  labels:
    app: nginx
spec:
  type: NodePort
  ports:
  - port: 8080
    targetPort: 80
    nodePort: 30080
    protocol: TCP
  selector:
    app: nginx
kubectl apply -f ./nginx.yaml
kubectl get pod

NAME                                READY   STATUS    RESTARTS   AGE
nginx-deployment-86dcfdf4c6-7cvlf   1/1     Running   0          13m
nginx-deployment-86dcfdf4c6-jfs4v   1/1     Running   0          13m
nginx-deployment-86dcfdf4c6-wjtmw   1/1     Running   0          13m
kubectl get service

NAME            TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
kubernetes      ClusterIP   10.96.0.1       <none>        443/TCP          56m
nginx-service   NodePort    10.96.112.174   <none>        8080:30080/TCP   13m
kubectl get deployment

NAME               READY   UP-TO-DATE   AVAILABLE   AGE
nginx-deployment   3/3     3            3           13m
minikube service nginx-service

|-----------|---------------|-------------|---------------------------|
| NAMESPACE |     NAME      | TARGET PORT |            URL            |
|-----------|---------------|-------------|---------------------------|
| default   | nginx-service |        8080 | http://192.168.49.2:30080 |
|-----------|---------------|-------------|---------------------------|
🏃  nginx-service サービス用のトンネルを起動しています。
|-----------|---------------|-------------|------------------------|
| NAMESPACE |     NAME      | TARGET PORT |          URL           |
|-----------|---------------|-------------|------------------------|
| default   | nginx-service |             | http://127.0.0.1:61211 |
|-----------|---------------|-------------|------------------------|
🎉  デフォルトブラウザーで default/nginx-service サービスを開いています...
❗  Docker ドライバーを darwin 上で使用しているため、実行するにはターミナルを開く必要があります。
kubectl delete -f ./nginx.yaml

※以下を参考
https://beyondjapan.com/blog/2022/09/start-kubernetes/

Yuji MiyashitaYuji Miyashita

チュートリアルその2 Redisを使用したPHPのゲストブックアプリケーションのデプロイ

minikube start
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-master-deployment.yaml
kubectl logs -f $(kubectl get pod --no-headers -o custom-columns=":metadata.name" | grep redis-master)
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-master-service.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-slave-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-slave-service.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/frontend-deployment.yaml
kubectl set image deployment/frontend php-redis=gcr.io/google-samples/gb-frontend@sha256:cbc8ef4b0a2d0b95965e0e7dc8938c270ea98e34ec9d60ea64b2d5f2df2dfbbf
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/frontend-service.yaml
minikube service frontend

以下参考

https://kubernetes.io/ja/docs/tutorials/stateless-application/guestbook/

Yuji MiyashitaYuji Miyashita

チュートリアルその3

mkdir wordpress && cd ./wordpress/
curl -LO https://k8s.io/examples/application/wordpress/mysql-deployment.yaml
curl -LO https://k8s.io/examples/application/wordpress/wordpress-deployment.yaml

kustomization.yaml

secretGenerator:
- name: mysql-pass
  literals:
  - password=YOUR_PASSWORD
resources:
  - mysql-deployment.yaml
  - wordpress-deployment.yaml
kubectl apply -k ./
kubectl get secrets

NAME                    TYPE     DATA   AGE
mysql-pass-5m26tmdb5k   Opaque   1      61s
kubectl get pvc

NAME             STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
mysql-pv-claim   Bound    pvc-24d41e3b-231c-41a2-987a-56989a0be4f7   20Gi       RWO            standard       89s
wp-pv-claim      Bound    pvc-f69c680d-ec6a-4a83-bcf5-18c0c0ebdb74   20Gi       RWO            standard       89s

※以下を参考
https://kubernetes.io/ja/docs/tutorials/stateful-application/mysql-wordpress-persistent-volume/

Yuji MiyashitaYuji Miyashita

チュートリアルその4 StatefulSetを使用したCassandraのデプロイ

minikube start --memory 5120 --cpus=4
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/cassandra/cassandra-service.yaml
kubectl get svc cassandra

NAME        TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)    AGE
cassandra   ClusterIP   None         <none>        9042/TCP   85s
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/cassandra/cassandra-statefulset.yaml

※以下を参考
https://kubernetes.io/ja/docs/tutorials/stateful-application/cassandra/
https://stackoverflow.com/questions/45270070/kubernetes-minikube-pod-oomkilled-with-apparently-plenty-of-memory-left-in-nod

Yuji MiyashitaYuji Miyashita

Sparkお試し

docker pull apache/spark
minikube start
minikube image load apache/spark
minikube image ls
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
  name: pyspark-pi-scheduled
  namespace: spark-operator
spec:
  schedule: "@every 1m"
  concurrencyPolicy: Allow
  successfulRunHistoryLimit: 1
  failedRunHistoryLimit: 3
  template:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "apache/spark"
    imagePullPolicy: Never
    mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
    sparkVersion: "3.5.0"
    restartPolicy:
      type: Never
    driver:
      cores: 1
      coreLimit: "1200m"
      memory: "512m"
      labels:
        version: 3.5.0
      serviceAccount: my-release-spark
    executor:
      cores: 1
      instances: 2
      memory: "512m"
      labels:
        version: 3.5.0
      serviceAccount: my-release-spark
cat <<EOF | kubectl apply -f -
[上記yamlをコピペ]
EOF
cat <<EOF | kubectl delete -f -
[上記yamlをコピペ]
EOF
helm uninstall my-release -n spark-operator

概要

  • 公式で管理されているSparkのイメージはないので、自分たちでイメージを管理
  • これ を元にイメージを作成する
  • EKSで動かそうとすると、 aws-java-sdkhadoop-aws が必要になったりする
  • 順序
    • spark-submit
    • Schedulerにて、Sparkを実行するNodeを決定
    • Spark Driver(Executorの管理を行うPod) がデプロイ
    • SparkExecutorのPodがデプロイ
  • 計算が終わるたびにPodが削除されるため、従量課金には優しい
  • 計算結果は、Spark ExecuterのPodのログに記載
  • これ もよい
Yuji MiyashitaYuji Miyashita

Spark - minioお試し

https://blog.min.io/spark-minio-kubernetes/

minikube start
brew services start minio
brew install minio/stable/mc
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.1 \
--create-namespace

ここ からサンプルデータをダウンロード

mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin
mc mb minio/openlake
mc mb minio/openlake/spark
mc mb minio/openlake/spark/sample-data
mc cp ~/Downloads/2018_Yellow_Taxi_Trip_Data.csv minio/openlake/spark/sample-data/taxi-data.csv
kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=[CHANGEME] \
--from-literal=AWS_SECRET_ACCESS_KEY=[CHANGEME] \
--from-literal=ENDPOINT=http://host.minikube.internal:9000 \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator
import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinioSparkJob")
spark = SparkSession.builder.getOrCreate()

def load_config(spark_context: SparkContext):
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT"))
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
  spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")

load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
  StructField('VendorID', LongType(), True),
  StructField('tpep_pickup_datetime', StringType(), True),
  StructField('tpep_dropoff_datetime', StringType(), True),
  StructField('passenger_count', DoubleType(), True),
  StructField('trip_distance', DoubleType(), True),
  StructField('RatecodeID', DoubleType(), True),
  StructField('store_and_fwd_flag', StringType(), True),
  StructField('PULocationID', LongType(), True),
  StructField('DOLocationID', LongType(), True),
  StructField('payment_type', LongType(), True),
  StructField('fare_amount', DoubleType(), True),
  StructField('extra', DoubleType(), True),
  StructField('mta_tax', DoubleType(), True),
  StructField('tip_amount', DoubleType(), True),
  StructField('tolls_amount', DoubleType(), True),
  StructField('improvement_surcharge', DoubleType(), True),
  StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()

# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(os.getenv("OUTPUT_PATH", "s3a://openlake/spark/sample-data/output"))

logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
FROM openlake/spark-py:3.3.1
USER root
WORKDIR /app
RUN pip3 install pyspark==3.3.1
COPY src/*.py .
docker build -t sparkjob-demo-local:latest .
minikube image load sparkjob-demo-local
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-minio
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "sparkjob-demo-local"
  imagePullPolicy: Never
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.3.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.1
    serviceAccount: my-release-spark
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: [CHANGEME]
      - name: AWS_SECRET_ACCESS_KEY
        value: [CHANGEME]
  executor:
    cores: 1
    instances: 3
    memory: "1024m"
    labels:
      version: 3.3.1
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/output/taxi-data-output"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT

---
apiVersion: v1
kind: Service
metadata:
  namespace: spark-operator
  name: spark-ui-service
spec:
  type: NodePort
  ports:
    - protocol: TCP
      port: 4040
      targetPort: 4040
  selector:
    app: spark-ui-service
cat <<EOF | kubectl apply -f -
[上記コピペ]
EOF
kubectl logs -f spark-minio-driver -n spark-operator
kubectl exec -it spark-minio-driver -n spark-operator -- /bin/bash
minikube service spark-ui-service -n spark-minio-ui-svc
Yuji MiyashitaYuji Miyashita

お試しの順番について

  • TrinoやSuperSetのように、ファイルを参照しに行くようなミドルウェアの繋ぎこみは後回し
  • とにかく以下を最優先でお試しすること
    • k8s上でSparkを動かせるように
    • k8s上でSparkを動かし、icebarg形式のファイルをストレージ(minio)に書き出せるように
Yuji MiyashitaYuji Miyashita

pyspark×icebergお試し

export AWS_ACCESS_KEY_ID=[CHANGEME]
export AWS_SECRET_ACCESS_KEY=[CHANGEME]
export JDBC_USERNAME=[CHANGEME]
export JDBC_PASSWORD=[CHANGEME]

python
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, concat, lit, when, min, max, col, dense_rank, asc, desc, date_format, collect_list, concat_ws, row_number
from pyspark.sql.window import Window
import json
import os
import webbrowser
import requests
spark = SparkSession.builder \
    .appName("Jupyter") \
    .config(f"spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.3") \
    .config(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog") \
    .config("spark.sql.catalog.spark_catalog.uri", "jdbc:mysql://localhost/spark_test") \
    .config("spark.sql.catalog.spark_catalog.jdbc.verifyServerCertificate", "true" ) \
    .config("spark.sql.catalog.spark_catalog.jdbc.useSSL", "false" ) \
    .config("spark.sql.catalog.spark_catalog.jdbc.user", os.getenv('JDBC_USERNAME')) \
    .config("spark.sql.catalog.spark_catalog.jdbc.password", os.getenv('JDBC_USERNAME')) \
    .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3://iceberg/") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true") \
    .config("spark.hadoop.parquet.enable.summary-metadata", "false") \
    .config("spark.sql.parquet.mergeSchema", "false") \
    .config("spark.sql.parquet.filterPushdown", "true") \
    .config("spark.sql.hive.metastorePartitionPruning", "true") \
    .getOrCreate()
spark.read.csv("localhost:9000")