
Microsoft Fabric and Azure Architecture ② [Notebook Edition]

Published on 2024/02/10

What we'll do

①. Insert data (product review data) into SQL DB
②. Ingest the data with Data Factory
③. Preprocess the data using the built-in AI

Verification steps

  1. Click "New notebook"
  2. Confirm that the notebook has opened
  3. Run the following code to check the contents of the table
df = spark.sql("SELECT * FROM sqldbtolakehouse.Product_review LIMIT 1000")
display(df)
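
Besides eyeballing the rows, it can help to confirm the column names up front, since the sentiment analysis in the next step assumes a string column named comment. A minimal optional check:

# Optional: inspect the table schema to confirm the column names
# (the sentiment analysis step below assumes a string column named 'comment').
df.printSchema()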


4. Run the following code to perform sentiment analysis on the comment column (a note on the response shape follows the sample output below)

https://zenn.dev/headwaters/articles/ac43d6306c7c6b

# Get workload endpoints and access token

from synapse.ml.mlflow import get_mlflow_env_config
import json
import requests
from pprint import pprint
import uuid

mlflow_env_configs = get_mlflow_env_config()
pandas_df = df.toPandas()
access_token = mlflow_env_configs.driver_aad_token
prebuilt_AI_base_host = mlflow_env_configs.workload_endpoint + "cognitive/textanalytics/"
print("Workload endpoint for AI service: \n" + prebuilt_AI_base_host)

service_url = prebuilt_AI_base_host + "language/:analyze-text?api-version=2022-05-01"

# Make a RESTful request to the AI service

post_headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer {}".format(access_token)
}

# Helper for debugging: pretty-print a service response
def printresponse(response):
    print(f"HTTP {response.status_code}")
    if response.status_code == 200:
        try:
            result = response.json()
            print(json.dumps(result, indent=2, ensure_ascii=False))
        except ValueError:
            print(f"parse error: {response.content}")
    else:
        print(response.headers)
        print(f"error message: {response.content}")

# Take the text from the 'comment' column and send it to the sentiment analysis API
for idx, row in pandas_df.iterrows():

    post_body = {
        "kind": "SentimentAnalysis",
        "parameters": {
            "modelVersion": "latest",
            "opinionMining": "True"
        },
        "analysisInput":{
            "documents":[
                {
                    "id": str(idx),
                    "language":"ja",
                    "text": row['comment']
                }
            ]
        }
    } 

    post_headers["x-ms-workload-resource-moniker"] = str(uuid.uuid1())
    response = requests.post(service_url, json=post_body, headers=post_headers)
    response_json = response.json()

    # Get 'confidenceScores' from the response
    confidence_scores = response_json.get('results', {}).get('documents', [{}])[0].get('confidenceScores', {})

    # Print the confidence scores for this comment
    print(confidence_scores)
  5. The following is output:
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
{'positive': 0.98, 'neutral': 0.02, 'negative': 0.0}
{'positive': 0.0, 'neutral': 0.0, 'negative': 1.0}
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
{'positive': 0.01, 'neutral': 0.0, 'negative': 0.99}
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
{'positive': 0.0, 'neutral': 0.0, 'negative': 1.0}
{'positive': 0.96, 'neutral': 0.04, 'negative': 0.0}
{'positive': 0.0, 'neutral': 0.0, 'negative': 1.0}
{'positive': 0.91, 'neutral': 0.02, 'negative': 0.07}
{'positive': 0.0, 'neutral': 0.0, 'negative': 0.99}
{'positive': 0.11, 'neutral': 0.0, 'negative': 0.89}
{'positive': 0.05, 'neutral': 0.83, 'negative': 0.12}
{'positive': 0.0, 'neutral': 0.0, 'negative': 1.0}
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
{'positive': 0.1, 'neutral': 0.51, 'negative': 0.4}
{'positive': 0.98, 'neutral': 0.02, 'negative': 0.0}
{'positive': 0.0, 'neutral': 0.0, 'negative': 1.0}
{'positive': 1.0, 'neutral': 0.0, 'negative': 0.0}
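
The loop in step 4 prints only the confidenceScores object. Per the analyze-text response schema, each document entry also carries a document-level sentiment label (positive / neutral / negative / mixed), so the overall verdict can be read from the same response_json. A minimal sketch, reusing the variables from step 4:

# Minimal sketch: read the overall sentiment label alongside the confidence
# scores from the last response_json produced by the loop in step 4.
document = response_json.get('results', {}).get('documents', [{}])[0]
print(document.get('sentiment'), document.get('confidenceScores', {}))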
  6. Run the following code to add new columns (positive, neutral, negative) and create a table (a defensive variant of the UDF is sketched after this code block)
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Define the UDF
def analyze_sentiment(comment):
    post_body = {
        "kind": "SentimentAnalysis",
        "parameters": {
            "modelVersion": "latest",
            "opinionMining": "True"
        },
        "analysisInput":{
            "documents":[
                {
                    "id":"1",
                    "language":"ja",
                    "text": comment
                }
            ]
        }
    } 

    post_headers["x-ms-workload-resource-moniker"] = str(uuid.uuid1())
    response = requests.post(service_url, json=post_body, headers=post_headers)
    # Get 'confidenceScores' from the response
    response_json = response.json()
    confidence_scores = response_json.get('results', {}).get('documents', [{}])[0].get('confidenceScores', {})
    return (confidence_scores.get('positive'), confidence_scores.get('neutral'), confidence_scores.get('negative'))

# Define the schema for the return value
schema = StructType([
    StructField("positive", DoubleType(), True),
    StructField("neutral", DoubleType(), True),
    StructField("negative", DoubleType(), True)
])

# Register the UDF
sentiment_udf = udf(analyze_sentiment, schema)

# Add new columns
df = df.withColumn("sentiment", sentiment_udf(df['comment']))

# Split the struct column into separate columns
df = df.select("*", "sentiment.*").drop("sentiment")
df.write.saveAsTable("sqldbtolakehouse.Product_review_with_sentiment")
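
One thing to be aware of with this approach: the UDF issues one HTTP request per row and never checks the HTTP status, so throttled or failed calls end up as nulls (or as an exception if the response body is not JSON). The sketch below is an illustrative defensive variant (analyze_sentiment_safe is not part of the original article) that skips empty comments and handles non-200 responses explicitly; it can be registered with the same schema in place of analyze_sentiment.

# Illustrative defensive variant (not part of the original steps): skip empty
# comments and return (None, None, None) on non-200 responses instead of
# failing inside the UDF.
def analyze_sentiment_safe(comment):
    if not comment:
        return (None, None, None)
    post_body = {
        "kind": "SentimentAnalysis",
        "parameters": {"modelVersion": "latest", "opinionMining": "True"},
        "analysisInput": {
            "documents": [{"id": "1", "language": "ja", "text": comment}]
        }
    }
    post_headers["x-ms-workload-resource-moniker"] = str(uuid.uuid1())
    response = requests.post(service_url, json=post_body, headers=post_headers)
    if response.status_code != 200:
        return (None, None, None)
    scores = response.json().get('results', {}).get('documents', [{}])[0].get('confidenceScores', {})
    return (scores.get('positive'), scores.get('neutral'), scores.get('negative'))

# Register it exactly like the original UDF:
# sentiment_udf = udf(analyze_sentiment_safe, schema)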
  7. Run the following code to confirm that the table has been created
df = spark.sql("SELECT * FROM sqldbtolakehouse.product_review_with_sentiment LIMIT 1000")
display(df)
