Open14
0からのKubernetes勉強めも
概要
- Kubernetesを略すと
k8s
- ひたすら公式ドキュメントの設定値を読んで、ひたすらyamlに設定を書いていく開発
- k8sのリソースは、以下の3パターンで作成していく
- 標準リソース (DeploymentとかServiceとか) でyaml定義
- カスタムリソース定義 (SparkApplicationとかCronWorkflowとか) を使ってyaml定義
- helm経由でパッケージをインストールし、
values.yaml
を使って値だけ上書き
- k8sクラスター内が一目でわかる Lens は入れておいた方が良い
- こっち からログインすること
- ローカルのMacでk8sを試すには minikube を使う
- 他にも種類はあるが、プラグインの充実さからminikubeがやりやすい
- minikubeコマンドとkubectlの違いは以下の通り
- kubectl: k8sクラスターとのやりとり
- minikube: ローカル環境で単一ノードのKubernetesクラスタを起動
その他
- ローカルで複数のリソースを組み合わせての実行はマシン強化が必須、minikubeが耐えられない
標準リソース
Pod
- ボリュームを指定、共有できる
- 各Podごとに、ユニークなIPアドレスが割り当てられている
- Pod内のすべてのコンテナは、IPアドレスとネットワークポートを含むネットワーク名前空間を共有する
- 同一Pod内のコンテナは、localhostを使用して他のコンテナと通信できる
- 異なるPod上で実行中のコンテナ間でやり取りをしたい場合は、IPネットワークを使用して通信できる
Service
-
svc
と略されることがある - 通常、PodはKubernetesクラスター内部のIPアドレスからのみアクセス可
- 何もしないとPodに対して、外部からアクセスできない
- 外部からアクセスできるようにするために、 Service としてPodを公開する必要がある
- ロードバランサーをサポートするクラウドプロバイダーでは、Serviceにアクセスするための外部IPアドレスが提供される
- minikube では、
LoadBalancer
タイプはminikube service
コマンドを使用した接続可能なServiceを作成 - 開発してて、localhost:xxxxでアクセスしたいものには必ずServiceが存在する
- minio
- kafka
- argo cd
- 複数ポートを公開することもできる
- Type
- ClusterIP
- クラスター内部のIPでServiceを公開
- NodePort
- 各NodeのIPにて、静的なポート上でServiceを公開
- 転送先であるClusterIP Serviceが自動生成される
-
[NodeIP]:[NodePort]
にてアクセス可能
- LoadBalancer
- ロードバランサを使用して、Serviceを公開
- 転送先であるNodePort ServiceとClusterIP Serviceが自動生成される
- ClusterIP
- minikube tunnelは、LoadBalancerのtypeをNodePortに調整してくれるようなもの
Deployment
- ReplicaSet (rs) のあるべき姿を記述
- 何個立ち上げるか
- どのイメージを使用するか
- rsにどんな名前をつけるか
- ReplicaSetを単独で作成するケースは稀、Deployment経由でReplicaSetを定義
- それぞれに定義するもの
- metadata: リソースに関するメタデータ
- spec: リソースのあるべき姿
- template: Pod個々のあるべき姿
- Deploymentの全てのロールアウト履歴は、いつでもロールバックできるようにデフォルトでシステムに保持されている
マニフェストの見方
apiVersion: apps/v1 # `kubectl api-resources | development` のバージョンを指定
kind: Deployment # 標準リソース名(Service, Deploymentなど) or カスタムリソース名(SparkApplication, kafkaなど)
metadata:
name: nginx-deployment # という名前のDeploymentが作成、ReplicaSetは `nginx-deployment-xxx`
spec: # 作成されるReplicaSetのあるべき姿を定義
replicas: 3 # つのReplicaSetを作成
selector:
matchLabels: # Deploymentが管理するPodのラベルを定義
app: nginx
template: # ReplicaSet個々のあるべき姿
metadata:
labels:
app: nginx
spec: # -- ここからPodTemplate --
containers:
- name: nginx
image: nginx:1.14.2 # Podで使用するDockerイメージ
ports:
- containerPort: 80
---
# https://kubernetes.io/ja/docs/concepts/services-networking/service/
apiVersion: v1 # `kubectl api-resources | service` のバージョンを指定
kind: Service
metadata:
name: nginx-service
labels:
app: nginx
spec:
type: NodePort
ports:
- port: 8080 # Serviceのポート
targetPort: 80 # コンテナのポート
nodePort: 30080 # ワーカーノードのポート
protocol: TCP
selector:
app: nginx
※以下を参考
helm
- k8sにおけるyumやbrewのような感じ
- yaml形式でパッケージ化 (=
Helmチャート
) - k8sにおけるパッケージとは、Deployment, Service, Secret, ConfigMapの集合体
- Helmリポジトリに上げて他に開発者でも使えるようにしたり、誰かが作った人のHelm Chartを使ったりできる
- 大量のk8sリソースを一括でかつ、整合性をもって削除することは、手動でのkubectlコマンドでは困難、Helmのようなパッケージマネージャがあって初めて実現できる
- helmの構成は以下の通り
- templates: 変数が含まれたマニフェスト、liquidみたいな変数が含まれている
- values.yml: マニフェストで必要な変数定義。開発者が上書きできる
-
{{ .Values.xxx }}
になっているところが対象
チュートリアルその1 Nginxのデフォルトページを表示
minikube start
# nginx.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
name: nginx-service
labels:
app: nginx
spec:
type: NodePort
ports:
- port: 8080
targetPort: 80
nodePort: 30080
protocol: TCP
selector:
app: nginx
kubectl apply -f ./nginx.yaml
kubectl get pod
NAME READY STATUS RESTARTS AGE
nginx-deployment-86dcfdf4c6-7cvlf 1/1 Running 0 13m
nginx-deployment-86dcfdf4c6-jfs4v 1/1 Running 0 13m
nginx-deployment-86dcfdf4c6-wjtmw 1/1 Running 0 13m
kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 56m
nginx-service NodePort 10.96.112.174 <none> 8080:30080/TCP 13m
kubectl get deployment
NAME READY UP-TO-DATE AVAILABLE AGE
nginx-deployment 3/3 3 3 13m
minikube service nginx-service
|-----------|---------------|-------------|---------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|---------------|-------------|---------------------------|
| default | nginx-service | 8080 | http://192.168.49.2:30080 |
|-----------|---------------|-------------|---------------------------|
🏃 nginx-service サービス用のトンネルを起動しています。
|-----------|---------------|-------------|------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|---------------|-------------|------------------------|
| default | nginx-service | | http://127.0.0.1:61211 |
|-----------|---------------|-------------|------------------------|
🎉 デフォルトブラウザーで default/nginx-service サービスを開いています...
❗ Docker ドライバーを darwin 上で使用しているため、実行するにはターミナルを開く必要があります。
kubectl delete -f ./nginx.yaml
※以下を参考
チュートリアルその2 Redisを使用したPHPのゲストブックアプリケーションのデプロイ
minikube start
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-master-deployment.yaml
kubectl logs -f $(kubectl get pod --no-headers -o custom-columns=":metadata.name" | grep redis-master)
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-master-service.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-slave-deployment.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/redis-slave-service.yaml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/frontend-deployment.yaml
kubectl set image deployment/frontend php-redis=gcr.io/google-samples/gb-frontend@sha256:cbc8ef4b0a2d0b95965e0e7dc8938c270ea98e34ec9d60ea64b2d5f2df2dfbbf
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/guestbook/frontend-service.yaml
minikube service frontend
以下参考
チュートリアルその3
mkdir wordpress && cd ./wordpress/
curl -LO https://k8s.io/examples/application/wordpress/mysql-deployment.yaml
curl -LO https://k8s.io/examples/application/wordpress/wordpress-deployment.yaml
kustomization.yaml
secretGenerator:
- name: mysql-pass
literals:
- password=YOUR_PASSWORD
resources:
- mysql-deployment.yaml
- wordpress-deployment.yaml
kubectl apply -k ./
kubectl get secrets
NAME TYPE DATA AGE
mysql-pass-5m26tmdb5k Opaque 1 61s
kubectl get pvc
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
mysql-pv-claim Bound pvc-24d41e3b-231c-41a2-987a-56989a0be4f7 20Gi RWO standard 89s
wp-pv-claim Bound pvc-f69c680d-ec6a-4a83-bcf5-18c0c0ebdb74 20Gi RWO standard 89s
※以下を参考
チュートリアルその4 StatefulSetを使用したCassandraのデプロイ
minikube start --memory 5120 --cpus=4
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/cassandra/cassandra-service.yaml
kubectl get svc cassandra
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
cassandra ClusterIP None <none> 9042/TCP 85s
kubectl apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/cassandra/cassandra-statefulset.yaml
※以下を参考
Sparkお試し
docker pull apache/spark
minikube start
minikube image load apache/spark
minikube image ls
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
name: pyspark-pi-scheduled
namespace: spark-operator
spec:
schedule: "@every 1m"
concurrencyPolicy: Allow
successfulRunHistoryLimit: 1
failedRunHistoryLimit: 3
template:
type: Python
pythonVersion: "3"
mode: cluster
image: "apache/spark"
imagePullPolicy: Never
mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
sparkVersion: "3.5.0"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.5.0
serviceAccount: my-release-spark
executor:
cores: 1
instances: 2
memory: "512m"
labels:
version: 3.5.0
serviceAccount: my-release-spark
cat <<EOF | kubectl apply -f -
[上記yamlをコピペ]
EOF
cat <<EOF | kubectl delete -f -
[上記yamlをコピペ]
EOF
helm uninstall my-release -n spark-operator
概要
Spark - minioお試し
minikube start
brew services start minio
brew install minio/stable/mc
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.1 \
--create-namespace
ここ からサンプルデータをダウンロード
mc alias set minio http://127.0.0.1:9000 minioadmin minioadmin
mc mb minio/openlake
mc mb minio/openlake/spark
mc mb minio/openlake/spark/sample-data
mc cp ~/Downloads/2018_Yellow_Taxi_Trip_Data.csv minio/openlake/spark/sample-data/taxi-data.csv
kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=[CHANGEME] \
--from-literal=AWS_SECRET_ACCESS_KEY=[CHANGEME] \
--from-literal=ENDPOINT=http://host.minikube.internal:9000 \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator
import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinioSparkJob")
spark = SparkSession.builder.getOrCreate()
def load_config(spark_context: SparkContext):
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
load_config(spark.sparkContext)
# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(os.getenv("OUTPUT_PATH", "s3a://openlake/spark/sample-data/output"))
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
FROM openlake/spark-py:3.3.1
USER root
WORKDIR /app
RUN pip3 install pyspark==3.3.1
COPY src/*.py .
docker build -t sparkjob-demo-local:latest .
minikube image load sparkjob-demo-local
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-minio
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "sparkjob-demo-local"
imagePullPolicy: Never
mainApplicationFile: local:///app/main.py
sparkVersion: "3.3.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
memory: "1024m"
labels:
version: 3.3.1
serviceAccount: my-release-spark
env:
- name: AWS_REGION
value: us-east-1
- name: AWS_ACCESS_KEY_ID
value: [CHANGEME]
- name: AWS_SECRET_ACCESS_KEY
value: [CHANGEME]
executor:
cores: 1
instances: 3
memory: "1024m"
labels:
version: 3.3.1
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/taxi-data.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/output/taxi-data-output"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT
---
apiVersion: v1
kind: Service
metadata:
namespace: spark-operator
name: spark-ui-service
spec:
type: NodePort
ports:
- protocol: TCP
port: 4040
targetPort: 4040
selector:
app: spark-ui-service
cat <<EOF | kubectl apply -f -
[上記コピペ]
EOF
kubectl logs -f spark-minio-driver -n spark-operator
kubectl exec -it spark-minio-driver -n spark-operator -- /bin/bash
minikube service spark-ui-service -n spark-minio-ui-svc
お試しの順番について
- TrinoやSuperSetのように、ファイルを参照しに行くようなミドルウェアの繋ぎこみは後回し
- とにかく以下を最優先でお試しすること
- k8s上でSparkを動かせるように
- k8s上でSparkを動かし、icebarg形式のファイルをストレージ(minio)に書き出せるように
pyspark×Iceberg
カタログの選択肢
- Hadoop
- JDBC
- Hive Metastore
pyspark×icebergお試し
export AWS_ACCESS_KEY_ID=[CHANGEME]
export AWS_SECRET_ACCESS_KEY=[CHANGEME]
export JDBC_USERNAME=[CHANGEME]
export JDBC_PASSWORD=[CHANGEME]
python
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, concat, lit, when, min, max, col, dense_rank, asc, desc, date_format, collect_list, concat_ws, row_number
from pyspark.sql.window import Window
import json
import os
import webbrowser
import requests
spark = SparkSession.builder \
.appName("Jupyter") \
.config(f"spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.3") \
.config(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog") \
.config("spark.sql.catalog.spark_catalog.uri", "jdbc:mysql://localhost/spark_test") \
.config("spark.sql.catalog.spark_catalog.jdbc.verifyServerCertificate", "true" ) \
.config("spark.sql.catalog.spark_catalog.jdbc.useSSL", "false" ) \
.config("spark.sql.catalog.spark_catalog.jdbc.user", os.getenv('JDBC_USERNAME')) \
.config("spark.sql.catalog.spark_catalog.jdbc.password", os.getenv('JDBC_USERNAME')) \
.config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.catalog.spark_catalog.warehouse", "s3://iceberg/") \
.config("spark.sql.catalogImplementation", "in-memory") \
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true") \
.config("spark.hadoop.parquet.enable.summary-metadata", "false") \
.config("spark.sql.parquet.mergeSchema", "false") \
.config("spark.sql.parquet.filterPushdown", "true") \
.config("spark.sql.hive.metastorePartitionPruning", "true") \
.getOrCreate()
spark.read.csv("localhost:9000")