🐨

DataActivatorを使って投稿した記事がZennのトレンドに上がったらTeamsに通知させる

2024/04/29に公開

やりたいこと

社内でZennの運営をしています。
メンバーが投稿した記事がトレンドに上がったらTeamsに通知が飛ぶようにしたいと思います。

Zennのトレンド一覧を取得

非公式ですが、TechとIdeaそれぞれのトレンド記事を取得できるAPIがありました。

https://zenn-api.vercel.app/

こちらのAPI(非公式)の上から20個のデータもトレンドと一緒なのでした。
ただ毎回上から20個がトレンドという確証がないので、今回は前者のAPIを使用します。

https://zenn.dev/api/articles

Data Activatorとは

Microsoft Fabric内にある機能で、設定したデータのパターン・条件が検出された時に自動でアクションを実行します。
検出した後はメール・Teamsにメッセージを飛ばしたり、Power Automateのフローを実行できたりします。

https://learn.microsoft.com/ja-jp/fabric/data-activator/data-activator-introduction

現状はデータの監視先として設定できるのはBIレポートのビジュアルのみです。
まだパブリックプレビュー版なので使える機能は限られてますが、今後の機能追加に期待大です。

実装

前提

こちらで既にFabricの環境を作っているので、これをベースに進めていきます。

https://zenn.dev/headwaters/articles/c69811c3ed54c0


1. Notebook上でデータを取得

  1. 既存CSVファイルを読み込み
  2. APIを叩いて、既存データ内に同じslugがあったらlast_trend_in_datetimeを更新する
  3. 新規のデータと既存データを組み合わせてCSVに出力
  4. CSVファイルからTableを作成

実行は3時間ごとにスケジューリングします。

import requests
import pandas as pd
import datetime

exist_trend_post_all = pd.read_csv("/lakehouse/default/" + "Files/trend_post_all.csv")
df_trend_post_all = pd.DataFrame()

categories = [
    {"name": "tech", "url": "https://zenn-api.vercel.app/api/trendTech"},
    {"name": "idea", "url": "https://zenn-api.vercel.app/api/trendIdea"},
]

for i in categories:
    dt_now = datetime.datetime.now()
    response_post = requests.get(i["url"])
    articles = response_post.json()

    df_post = pd.DataFrame(articles)
    for d in articles:
        current_slug_row = exist_trend_post_all[exist_trend_post_all['slug'].isin([d["slug"]])]
        if len(current_slug_row) != 0:
            df_post = df_post[df_post['slug'] != d["slug"]]
            exist_trend_post_all.loc[exist_trend_post_all['slug'] == d["slug"],'last_trend_in_datetime'] = dt_now
            

    df_post["user_id"] = df_post["user"].apply(lambda x: x["id"])
    df_post["trend_in_datetime"] = dt_now
    df_post = df_post[["id", "title", "slug", "emoji", "commentsCount", "likedCount", "bodyLettersCount", "publishedAt", "bodyUpdatedAt", "trend_in_datetime", "user_id", "articleType"]]
    df_post = df_post.rename(columns = {'commentsCount':'comments_count','likedCount': 'liked_count','bodyLettersCount':'body_letters_count','publishedAt':'published_at', 'bodyUpdatedAt':'body_updated_at', 'articleType':'article_type'})
    df_trend_post_all = pd.concat([exist_trend_post_all, df_post], ignore_index=True, axis=0)
    exist_trend_post_all = df_trend_post_all

df_trend_post_all.to_csv("/lakehouse/default/Files/trend_post_all.csv", index=False)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DateType

table_name = "trend_post_all"
file_name = f"Files/{table_name}.csv"

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("slug", StringType(), True),
    StructField("emoji", StringType(), True),
    StructField("comments_count", IntegerType(), True),
    StructField("liked_count", IntegerType(), True),
    StructField("body_letters_count", IntegerType(), True),
    StructField("published_at", DateType(), True),
    StructField("body_updated_at", DateType(), True),
    StructField("user_id", IntegerType(), True),
])

df = spark.read.format("csv").option("header","true").schema(schema).load(file_name)
df.write.mode("overwrite").format("delta").save("Tables/"+table_name)


2. セマンティックモデルの設定

レイクハウス内の「SQL分析エンドポイント」を開く。


「報告」タブ内の「既定のセマンティックモデルの管理」を選択


先ほど作成したTableのチェックをつけて、セマンティックモデルに追加


「既定のセマンティックモデルオブジェクト」に移動して、リレーションを設定。
「dataflow_zenn_all_user」と「post_user_all」の二つのテーブルは前の記事で作成しています。


3. BIレポートを作成

ホームタブに移動して、「新しいレポート」を選択


カードビジュアルに、trend_post_allテーブル内のid列の「一意なカウント」を選択


社内のメンバーの記事のみにしたいので、post_user_allテーブル内のid列をビジュアルフィルターに割り当てる。


最後に今月の記事の数だけにしたいので、trend_post_allテーブル内のtrend_in_datetime列をビジュアルフィルターに割り当てる。
条件は「相対日付」で「現在」の「月」に設定


3. アラートの設定

ビジュアルの「...」をクリックして、「アラートの設定」を選択


条件はあとで変更できるので適当に設定します。
DataActivatorは既に作ってるので、作成済みのものを設定。
「アラートの作成」を選択


DataActivatorに移動して、トリガーの検出条件を変更します。
条件を「increases by」にして、「Value」を「1」、「From last measurement」にして「Each time」に設定します。
こうすることで、前回検出時から値が増えたらトリガーを実行されるようになります。

トレンドの情報は3時間に1回取得するので、トリガーの検出も3時間に1回のペースで行われます。

検証

今日の朝、新しく記事がトレンドに上がったようでTeamsに通知が飛んできました。

ヘッドウォータース

Discussion