❄️

SnowflakeとAWS API Gatewayを統合してデータ参照用のREST APIを作成する

2022/03/26に公開

はじめに

SnowflakeをDWHとして使い始めると、それをさまざまな形で活用したくなります。BIツールなどで参照する方法もありますが、最近だとWebアプリやネイティブアプリからREST APIを通してアクセスしてデータを参照したくなることも多いでしょう。

SnowflakeはクラウドDWHとして優れている一方、アプリケーションのインテグレーションとしては、AWS、特にLambdaやAPI Gatewayを使ってサーバレスで実装したい、というケースが多いのではないかと思います。

また、「いろいろなクエリをクイックにREST APIとして実装してアプリに提供したい」という場合、「如何にライトに実装できるか、サクサクとAPIを量産できるか」という要素も欠かせません。

ということで、本エントリでは「極力シンプルで量産しやすい形で、Snowflakeへの参照系クエリをAWSのAPI Gatewayと統合してREST APIとして提供する方法」を試してみます。

イメージとしては、「SELECTクエリの結果をJSONで返すREST APIを簡単に作る方法を見つけたい」といった感じで、具体的には、以下のようにAPI GatewayからLambda関数を呼び出して、Snowflakeのクエリ結果をJSONで表示できることがゴールイメージになります。

なお、本エントリでは、企業内の業務で利用されるようなREST APIを想定しており、コンシューマサービスなどで使われる高トラフィックなREST APIなどは想定していません。

Snowflakeにクエリを発行するLambda Functionsを作成する

まず、Snowflakeにクエリを発行して結果を返却するLambda Functionsを作成します。
今回は、以下の条件を満たすLambda Functionsを作成するものとします。

  • API GatewayのLambda統合に適合したパラメータ/レスポンス処理する
  • SnowflakeのクエリにAPI Gatewayから渡されたパラメータをbindできる
  • SnowflakeのPythonコネクタや共通処理をLayerに分離する
  • SnowflakeのクレデンシャルはSystems Manager(SSM)のパラメータストアに保存する

Lambda関数全体

今回作成したLambda関数とSnowflake共通モジュール(Layer用)は以下の通りです。

以下、個々の実装について説明をしていきます。

API GatewayのLambda統合に適したパラメータ処理をする

API GatewayのLambda統合を利用すると、クエリパラメータとパスパラメータが event のメンバーとして渡されます。

「クエリパラメータ」はGETパラメータとして渡される ?foo=bar&foo2=buz のようなパラメータで、「パスパラメータ」はAPIのURLに埋め込まれる/customers/17 のようなパラメータです。

以下のメソッドは event を受け取って、クエリパラメータとパスパラメータを単一の dict型にまとめて返却します。

def get_api_params(event):
    if event is None:
        return {}
    params = {}
    if event.get('queryStringParameters'):
        params.update(event.get('queryStringParameters'))
    if event.get('pathParameters'):
        params.update(event.get('pathParameters'))
    return params

Lambda関数のハンドラから呼び出して、パラメータを dict として扱えるように変換します。

def lambda_handler(event=None, context=None):
    # Get API parameters from query string params and path params
    params = get_api_params(event)

API GatewayのLambda統合に適したレスポンス処理をする

Lambda統合を使うLambda関数は所定のフォーマットでレスポンスを返さなければなりません。

まず、Snowflakeのクエリの実行結果を、カラム名をキーとして持つdict型の配列として返却されるように変換します。

def query(ctx, sql):
    cs = ctx.cursor()

    rs = []
    try:
        cs.execute(sql)
        cols = []
        for c in cs.description:
            cols.append(c[0].lower())
        for r in cs.fetchall():
            d = {}
            for a in zip(cols, r):
                d[a[0]] = a[1]
            rs.append(d)
    except snowflake.connector.errors.ProgrammingError as e:
        raise Exception('{0} ({1})'.format(e, sql))
    finally:
        cs.close()

    return rs

以下のようにクエリを実行すると、結果 rs は「1レコードをひとつのdictとして、複数レコードを持つdictの配列」として返却されます。

        rs = sflib.query(ctx, q)

具体的には、以下のようなデータ構造となります。

[
  {'col1': 'val1', 'col2': 'val2', 'col3': 'val3'},
  {'col1': 'val11', 'col2': 'val12', 'col3': 'val13'}
]

そして、そのdictの配列をJSONに変換し、ステータスコードとともに返却します。

    return {
        'statusCode': 200,
        'headers': {
        },
        'body': json.dumps(rs, cls=DecimalEncoder),
        'isBase64Encoded': False
    }

なお、Snowflakeの浮動小数点の値はPythonではDecimal型となりそのままではJSONに変換できませんので、適切なエンコーダーを定義して変換します。

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return str(o)
        return super(DecimalEncoder, self).default(o)

Snowflakeのクエリに任意のパラメータをbindする

外部からパスパラメータやクエリパラメータとして値を受け取ることができるようになったので、この値をSnowflakeのクエリに設定して期待する結果を取得する必要があります。そのためには、クエリの条件の一部を受け取ったパラメータで置換できるようにする必要があります。

具体的には、例えばSnowflakeのクエリに :foo: という文字列があった場合に、パラメータ(Pythonのdict)から foo というキーを探し、該当する値で :foo: という文字列を置換します。

今回は、Snowflakeのサンプルデータベースにある SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER というテーブルから、指定した C_CUSTKEY に該当するレコードを取得するクエリを実行します。

その際、 :custkey: というパラメータを埋め込めるようにクエリを作成して bind_parms() というメソッド(後述)でパラメータのバインドを行うことで、任意のパラメータを渡して結果を取得することができるようになります。

    # Snowflake query with bind parameters.
    query = '''SELECT
      C_CUSTKEY
      ,C_NAME
      ,C_ADDRESS
      ,C_NATIONKEY
      ,C_PHONE
      ,C_ACCTBAL
      ,C_MKTSEGMENT
      ,C_COMMENT
    FROM
      SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE
      C_CUSTKEY = :custkey:
'''

    # Bind API params to the Snowflake query
    q = sflib.bind_params(query, params)

bind_params() メソッドは、「バインド変数を含むクエリ文字列」と「パラメータのdict」を受け取り、「パラメータをバインド(置換)したクエリ文字列」を返します。

def bind_params(query, params):
    if params is None:
        return query
    
    for p in params:
        if isinstance(params[p], int) or isinstance(params[p], float):
            v = "{0}".format(params[p])
        else:
            v = "'{0}'".format(params[p].replace("'", "").replace(';', ''))
        query = query.replace(':{0}:'.format(p), v)
    return query

Snowflakeの共通処理をLambda Layersとして分離する

基本的には、SnowflakeのPythonコネクタなども含めて、ひとつのLambda関数のパッケージに入れて動作させることは可能なのですが、複数のAPIのために複数のLambda関数を作成し始めると、サイズの大きいZipファイルを毎回作成することになって若干不便ですので、複数のLambda関数から共通して使いたいものをLayersとして分離しようと思います。

以下のように python ディレクトリを作って、そこにSnowflakeのPythonコネクタとともにSnowflake用の共通モジュールを配置して、Layer用のZipファイルを作成します。

mkdir -p ./python
pip install --target ./python snowflake-connector-python

mkdir -p ./python/sflib
cp sflib.py ./python/sflib

zip -r lambda_layer.zip python

Lambda関数からレイヤーに含まれるモジュールを利用する場合には、以下のようにimportします。

from sflib import sflib

Lambdaにおけるレイヤーの詳細については、以下の記事を参考にしてください。

SnowflakeのクレデンシャルをSystems Manager(SSM)のパラメータストアから取得する

SnowflakeのクレデンシャルをSystems Manager(SSM)のパラメータストアに保存し、Lambda関数ではそこから取り出して使うようにします。

以下は、boto3を使って指定したキーに設定された値をパラメータストアから取得するメソッドです。

ssm = None

def get_ssm_parameter(name):
    global ssm
    if ssm is None:
        ssm = boto3.client('ssm')
    return ssm.get_parameter(Name=name,
                             WithDecryption=True)['Parameter']['Value']

今回は snowflake_* という4つのクレデンシャルをパラメータストアに保存していますので、それを取得してSnowflakeへの接続を行います。

    u = get_ssm_parameter('snowflake_username')
    p = get_ssm_parameter('snowflake_password')
    a = get_ssm_parameter('snowflake_account')
    d = get_ssm_parameter('snowflake_database')

    ctx = sflib.connect(u, p, a, d)

Lambda関数とLayerをデプロイする

Lambda関数のデプロイ

まず、Lambda関数のデプロイ用のZipファイルを作成します。

zip -r lambda_function.zip lambda_function.py

Zipファイルができたら、初回は create-function でデプロイします。

aws lambda create-function --function-name hello \
    --role arn:aws:iam::xxxxxxxxxxxx:role/xxxxxxxxxxxxxxr \
    --runtime python3.8 --handler lambda_function.lambda_handler \
    --zip-file fileb://lambda_function.zip

2回目以降の更新の場合は update-function-code でデプロイします。

aws lambda update-function-code --function-name hello \
    --zip-file fileb://lambda_function.zip

Layerのデプロイ

上記で作成したLayerのZipファイルは、以下のようにpublishします。

aws lambda publish-layer-version --layer-name layer_sflib \
    --zip-file fileb://lambda_layer.zip \
    --compatible-runtimes python3.8

Layerの関連付け

最後に、Lambda関数とLayerを関連付けして、Lambda関数からLayerを使えるように設定します。

Lambda関数にLayerを関連付けるには update-function-configuration を実行しますが、その際にLayerのバージョンARNが必要になりますので、 list-layer-versions でLayerのバージョン情報を取得しておき、そちらを指定するようにします。

$ aws lambda list-layer-versions --layer-name layer_sflib | grep LayerVersionArn | sort -r | head -1
            "LayerVersionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:layer:layer_sflib:9",
$ aws lambda update-function-configuration --function-name hello \
    --layers arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:layer:layer_sflib:9 \
    --timeout 30

Lambda関数をAPI Gatewayと統合する

Lambda統合を使ってAPI GatewayとLambda関数を結合する

API Gatewayの設定の詳細は割愛しますが、API GatewayのLambda統合を使って、Lambda関数とAPIを結合します。

今回は custkey というパスパラメータを持つ /customers/{custkey} というリソースを定義し、そのGETメソッドにLambda関数を結合しました。

そして、テストを実行する際に {custkey}16 と入力すると、Snowflakeで

    SELECT
      C_CUSTKEY
      ,C_NAME
      ,C_ADDRESS
      ,C_NATIONKEY
      ,C_PHONE
      ,C_ACCTBAL
      ,C_MKTSEGMENT
      ,C_COMMENT
    FROM
      SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE
      C_CUSTKEY = 16

というクエリが実行され、その結果がJSON形式で返却されます。

[
  {
    "c_custkey": 16,
    "c_name": "Customer#000000016",
    "c_address": "cYiaeMLZSMAOQ2 d0W,",
    "c_nationkey": 10,
    "c_phone": "20-781-609-3107",
    "c_acctbal": "4681.03",
    "c_mktsegment": "FURNITURE",
    "c_comment": "kly silent courts. thinly regular theodolites sleep fluffily after "
  }
]

まとめ

以上、簡単ではありましたが、Snowflakeへの参照クエリの結果をREST APIとして提供する方法を検討してきました。

実際に利用するにはもう少し検討する項目が増えるかと思いますが(SQLインジェクション対策やキャッシングなど)、わりと簡単にSnowflakeをREST API化することができるのではないかと思います。

もう少し検討を重ねて、Snowflakeをより上手く活用していければと思っています。

では。

Discussion