n8nでRSSを取得する
n8nとは
(エイトエヌ)は、ノーコード/ローコードで使えるワークフロー自動化ツールです。Zapier や Make (旧Integromat) に似たツールですが、自己ホスティングが可能で拡張性が高いのが特徴です。
n8n
はnode
でセルフホスティングできますし、カスタムノード(独自の処理)もTypescript
で開発しnpm
で公開するなども可能です。
この記事について
この記事では、n8n
でRSSを購読し、処理済み記事を除外して最新記事のみを取得するフローを構築します。記事の一意なリンクを永続的に記録し、すでに処理済みのリンクは除外。記録は6ヶ月経過で自動削除され、$getWorkflowStaticData()
を利用することで外部サービスなしに実現します。Webhook対応や多RSS対応も紹介しています。
対象者
- なんであれ、
n8n
を利用できる状態である人 - Typescriptをある程度理解できる事
-
n8n
でフローを作成するヒントが欲しい人
さて...
n8n
でRSSとそれに含まれる記事のリストを取得することは簡単です
なぜならRSS Read
というノードがすでに用意されているからです
しかし、これをそのままフローに組み込むには難があります。
処理済みの記事を無視したい
つまり既知の古い記事を省き、最新の記事だけ取得したい
これはRSSを処理する際にはたいてい必要な要件でしょう。
なぜなら、最新記事の通知を作成するとしても、通知済みの記事を何度も通知したくないですし、記事をAIで処理したりする場合にはコストに影響するでしょう。
「最新」を検知する方法
方針は以下の2パターンではないかと思います
- 前回の取得時刻より新しい公開日時 (pubDate) を持つ記事を抽出する方法。
- 前回の取得時刻より新しい公開日時 (pubDate) を持つ記事を抽出する方法。
- 既知ID(またはURL)との照合
- 記事の一意なID・URL・タイトルなどを保存しておき、それに含まれていないものだけを抽出する方法。
どちらも使える方針です
以下に長所と短所を考察します
前回の取得時刻より新しい公開日時 (pubDate) を持つ記事を抽出する方法。
直感的でシンプルです。
- pubDateが不正確、不掲載、使いまわし、であるRSSには利用できない
既知ID(またはURL)との照合
pubDateに依存せず、精度が高い
- 既知のIDを記録することになるが、肥大化する
- guidやlinkが不掲載、毎回変動、というRSSには利用できない
タイムベースにするか、IDベースにするか、また対象となるRSSの品質によって選択する必要がある
n8nでの値の永続的保持
どちらにせよ、取得時刻
か既知ID
を永続的に記憶しなければなりません
方針はn8n
の内部か外部かの2パターンではないかと思います
-
n8n
の内部-
$getWorkflowStaticData()
を使う -
n8n単体
で完結する
-
-
n8n
の外部- DBaaSなどを利用する
- Amazon系、Google系、Microsoft系、MongoDB、Supabase、etc...ノードが用意されているものもある
- 他のシステムからの参照もできる
- n8n以外のコスト(利用料や管理、メンテなど)が発生する
- DBaaSなどを利用する
この記事の基本方針
記事のID(guid
or link
)にて、既知の記事リストを記憶し、最新の記事のみをフィルタする。
既知の記事リストの記憶には$getWorkflowStaticData()
を使う
想定している後工程が記事のスクレイピングなので
link
を採用するのは妥当であるし、よってLink
の行儀がわるいRSSは対象から外すだろう。利用サービスを増やすメリットが無くn8n
単体で利用できるほうが見通しが良い
pubDate
を採用した事例
今回は採用しないが
$getWorkflowStaticData()
について学ぶ
ひとまず原本を読み解きましょう
- 実験的機能である
- テストでは機能しない
- グローバルとノードの2種類がある
- JSON的構造を読み、書き、削除できる
実装してみて厄介なのが「テストでは機能しない」である。
ニュアンスとしては、テストもーどでは「テスト実行の範囲では記憶するが、実行が完了すると忘れる」
$getWorkflowStaticData()
の基本的な構文
これらをCode
ノードで利用する
// ワークフローの永続的データを取得する
const workflowStaticData = $getWorkflowStaticData('node')
// hogeの値を取得する。初期はundefinedであろう
const hoge = workflowStaticData.hoge
// hogeの値を記録する。
workflowStaticData.hoge = 'hoga'
// hogeの値を削除する
delete workflowStaticData.hoge;
global
かnode
かは、値を他のフローで利用するかどうかで判断する。
-
global
にするなら、他のフローから意図せぬ上書きをしてしまわないよう命名を考慮したい。 -
node
にしたなら、利用したノードで閉じるのでシンプルであるが、読み、書き、削除を単一ノードで実現する必要がある。
実装
フロー
まず、RSSのURLを一つ渡すとRSSを読み込み、item
(記事)ごとにループするフローを作ってみます
Code (URL)
とりあえず、RSSのURLをフローに流すようにします
これはYahoo!ニュースの科学カテゴリのRSSです
return {
url: "https://news.yahoo.co.jp/rss/topics/science.xml"
}
RSS Read
直前のCode (URL)
で設定したurl
を利用するようにします
Code (Static Data)
本題の、一つの記事についてそれが既知であるかを判定する部分です
return $input.all().map((a) => {
const nodeStaticData = $getWorkflowStaticData('node')
let items = nodeStaticData.items
if (!items) {
items = []
}
let hit = true
if (!items.some((item) => {
return item === a.json.link
})) {
items.push(a.json.link)
nodeStaticData.items = items
hit = false
}
return {
json: {
hit,
item: a.json,
},
}
})
-
node
をスコープにしてStatic Dataを取得します -
items
を取得します- これは記事の
link
の配列であるとします
- これは記事の
-
items
がundefinedである可能性があるが配列としてあつかうので、undefined
の場合は[]
をセットします -
items
の中に、判定対象のlink
と合致しない場合-
items
にlink
を追加し - Static Dataに保存します
- さらに、既存かどうかを
hit
にセット
-
- 最終的に、
hit
を追加して記事を返します
Filter
hit
がfalse
であるもののみをFilterします
テスト
テストは厄介です。
なぜなら$getWorkflowStaticData()
は「テストでは機能しない」からです。
テストを何回行っても、全ての記事がhit === false
となります。
一旦、それを前提に「本番の運用、つまり定期実行やWebhookにすれば動くのかも」という状態になったならば、次に進みます。
Webhookで利用できるようにする
Webhookを配置する
今回は「Respond to Webhook」(Webhookをコールした結果のレスポンスについてのノード)も併用するので、「Respond」は「Using Respond to Webhook
node」を選択します。
Respond to Webhook
Webhookの「Respond」に「Using Respond to Webhook
node」を選択したので、レスポンスを返す場所(多くはフローの最後でしょう)に「Respond to Webhook」を配置します。
結果のすべてを返す「All Incoming Items」を選択します
Code (Query)
Webhookで対象のRSSを指定できるようにします。
以下はurl
というクエリパラメタを利用する場合です
最初に作ったCode (Query)
ノードの形式のアウトプットを踏襲します
const url = $input.first().json.query.url
return {
url
}
テスト
Webhookのテストの手順は
- Webhookノードの「Webhook URLs」で
Test URL
を取得する - フローで「Execute workflow」をクリックし、Webhookを待機状態にする
- ブラウザの別ウィンドウでWebhookをコールする
- 例えば以下のようなフォーマットになるだろう
{Your Test URLs}?url=https://news.yahoo.co.jp/rss/media/kbs/all.xml
-
n8n
のウィンドウでフローが起動している- 個々のノードの振る舞いを確認できます
- 呼び出した側では「Respond to Webhook」の結果が表示される
アクティブにする
問題が無ければアクティブ(本番運用)にします
「Webhook」ノードの「Wrbhook URLs」の「Production URL」が本番URLです
このモードで$getWorkflowStaticData()
による永続的データ保存つまり「最新の記事しか表示しない」が機能するようになり、新しい記事が公開されない限り、結果は空になります
更なる実用化
「とあるRSSの最新記事を取得する」というニーズも充分ありますが、「RSSのリストを定期的に取得し、最新の記事を取得する」というニーズもあるでしょう。
例えば、チェックしたいサイトのRSSリストを登録して、新しい記事をニュース的に通知するなどです
ここまでで、単一のRSSの最新記事を得るフローが完成していますから、それをサブフローとしてループでコールすれば実現できます。
つまり、フローを繰り返しコールする親フローを作成します
Execute Workflow
↓ Execute Workflow
ノードで、先ほどまでに作ったFlowを指定しています
lists
lists にはチェックしたいURLをリストしています
return [
{
json: {
url: 'https://www.autoby.jp/_tags/CT125/rss20.xml'
}
},
{
json: {
url: 'https://news.yahoo.co.jp/rss/topics/entertainment.xml'
}
},
{
json: {
url: "https://news.yahoo.co.jp/rss/media/kbs/all.xml"
}
}
];
そして
これらのフローにどのようなノードを追加し、何を実現するか
- link をスクレイピングして要約、なにがしらのサービスに投稿する
- AIに読み上げさせて、Podcastとして配信する
- etc
課題
$getWorkflowStaticData()
の注意点でもありますが、ひたすら既知IDを追加し続けるので、このままでは「経年でデータが膨大になる可能性がある」
追加した日次も記憶データに追加し、6ヶ月以前の記録は削除する、などの処理を加えるべきである。
return $input.all().map((a) => {
const nodeStaticData = $getWorkflowStaticData("node")
let items = nodeStaticData.items
if (!items) {
items = []
}
console.log(9, items.length)
let hit = true
if (!items.some((item) => {
return item.link === a.json.link
})) {
const now = Date.now()
// 記録時のタイムスタンプも記憶
items.push({
link: a.json.link,
timestamp: now
});
// nodeStaticData.items から6ヶ月以上前の記録を削除
items = items.filter((entry) => {
return now - entry.timestamp < 1000 * 60 * 60 * 24 * 180
})
nodeStaticData.items = items
hit = false
}
return {
json: {
hit,
item: a.json,
},
}
})
Discussion