Snowflakeに中森明菜データレイク(通称NADL)を構築しました

実際に中森明菜を聴きながら執筆しました
2023.02.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大阪オフィスの玉井です。

題名の通り、極めて先進的な(?)データレイクの構築に成功したので、当記事にて詳細をお伝え致します。

概要説明

NADL(Nakamori Akina Data Lake)とは?

日本が誇る歌姫である中森明菜氏の(ほぼ)全ての楽曲に関するデータを保存したデータレイクです。

SnowflakeはDWHのイメージが強いですが、データレイクを構築・運用する機能を備えています。今回、Snowflakeのデータレイク周りの機能を活用して、NADLを構築しました。

構築作業の流れ

  1. SpotifyのAPIから中森明菜の楽曲データを取得する
  2. 取得した楽曲データをGoogle Cloud Storage(GCS)にアップロードする
  3. 上記のGCS(のバケット)をSnowflakeの外部ステージとして連携する
  4. Snowflakeからデータにアクセスしていつでも中森明菜の楽曲を分析できる状態にする

構築作業を実施した環境など

  • Snowflake 7.3.2
  • Hex

私はHex上でスクリプトを実行しましたが、Pythonが動く環境であれば、必ずHexである必要はありません。

作業経緯

Spotify APIから楽曲データを取得〜GCSにアップロード

当方、Python素人につき、コードの品質は甘めで見てください。

準備など

まず、必要なライブラリ等を準備します。

!pip install spotipy --upgrade
import json
import spotipy
import os
import pprint
from spotipy.oauth2 import SpotifyClientCredentials
from google.cloud import storage
from google.cloud.storage import Bucket

SpotifyのAPIを叩くための情報を準備します。client_idclient_secretはsecret値としてHex上にセットしています。

client_credentials_manager = spotipy.oauth2.SpotifyClientCredentials(client_id, client_secret)
spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager, requests_timeout=10, retries=10)
#中森明菜のSpotify ID
uri = 'spotify:artist:7140bcJ0ZySe314nUfOo1J'

GCSへデータをアップロードするための準備をします。sa_jsonはGCSへアクセスするためのサービスアカウントの鍵ファイルです(secret値としてHex上にセットしています)。バケット作成周りの処理はすごい適当&冗長なので半目で見てください。

bucket_name = 'hex-demo-tamai2'
service_account_info = json.loads(sa_json)
client2 = storage.Client.from_service_account_info(service_account_info)

if Bucket(client2, bucket_name).exists():
    bucket = storage.Bucket(client2)
    bucket.name = bucket_name
else:
    bucket = storage.Bucket(client2)
    bucket.name = bucket_name
    client2.create_bucket(bucket)

楽曲データのアップロード

ここからは実際にデータを取得してアップロードしていきます。

この処理は、まず中森明菜の楽曲を全て取得する必要がありますが、Spotify APIを使ってミュージシャンの全曲を取得する方法については、ググれば死ぬほど出てきます。今回は下記をめちゃくちゃ参考にしました。

まずは中森明菜のアルバム情報を全て取得します(上記の記事ほとんどそのまま)。

#ここはSpotipy公式のサンプルコードと同じ
results = spotify.artist_albums(uri, album_type='album')
albums = results['items']
while results['next']:
    results = spotify.next(results)
    albums.extend(results['items'])

#アルバム名とURIを別々のリストに保存する
artist_album_names = []
artist_album_uris = []
for i in range(len(albums)):
    artist_album_names.append(albums[i]['name'])
    artist_album_uris.append(albums[i]['uri'])

アルバムのリストを取得できたので、このリストを使って「アルバムの曲リストを取得」→「1曲づつ楽曲データ取得〜アップロード」をアルバム毎に繰り返していきます。

def album_songs(uri):
    album = uri 
    spotify_albums[album] = {}
    #空のリストのキーと値を作成する。
    spotify_albums[album]['album'] = [] 
    spotify_albums[album]['track_number'] = []
    spotify_albums[album]['id'] = []
    spotify_albums[album]['name'] = []
    spotify_albums[album]['uri'] = []
    #アルバムの曲データを取得する
    tracks = spotify.album_tracks(album) 
    for n in range(len(tracks['items'])):
        # 楽曲データ取得
        audio_analysis = spotify.audio_analysis(tracks['items'][n]['id'])
        # 楽曲データ出力
        path = tracks['items'][n]['id']+ ".json"
        with open(path, 'w') as audio_file:
            json.dump(audio_analysis, audio_file)
        # GCSへアップロード
        source_file_name = tracks['items'][n]['id'] + ".json"
        destination_blob_name = artist_album_names[album_count] + "/" + str(tracks['items'][n]['track_number']) + "_" + repr(tracks['items'][n]['name'])[1:-1] + ".json"
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(source_file_name)
        # ファイル削除
        os.remove(source_file_name)


spotify_albums = {}
album_count = 0
for i in artist_album_uris:
    album_songs(i)
    album_count+=1 #すべてのトラックが追加されると、アルバムカウントを更新する

「一旦jsonファイルとして出力」→「そのファイルをGCSにアップロード」→「アップロード済のファイルを削除」という流れを繰り返しています。正直、ここも処理としてかなりヘボいと思いますので、半目で見てください(もっとカッコいいやり方があるはず)。

これらの処理が完了すると、GCSが下記のようになります。ゲラッゲラッゲラッゲラッバーニンハァァァァァァァァッッッッッ(超絶ビブラート)。

GCSをSnowflakeの外部ステージとして設定する

実質、この時点で「(Google Cloudに)NADLを構築した」と言えるっちゃ言えますが、それだとアレなので、このバケットをSnowflakeの外部ステージにします。

Snowflake側の作業

Snowflake側で外部ステージを作成します。

create stage gcs_na_stage
  url = 'gcs://hex-demo-tamai2'
  storage_integration = nadl
  file_format = json_file_format

作成後は、外部ステージの詳細を確認することができますが、ここでは下記画像の赤枠の情報をメモっておきます。

Google Cloud側の作業

次に、Snowflakeがバケットにアクセスするためのロールを作成します。

IAMの管理画面にいき…

これらの権限をもつロールを作成します。

そして、GCSに先ほどのロールを割り当てます。

確認

Snowflakeの外部ステージの設定が完了すれば、Snowflakeからバケットの情報を照会することができます。

SnowflakeからGCSの楽曲データに直接クエリする(外部テーブル)

外部ステージとして接続すれば、(権限によりますが)バケット内のデータを自由にロードできるため、テーブル作成→ステージのファイルからロードとするだけで、バケット内のデータをSQLで分析することができます。

しかし、今回はSnowflakeの機能「外部テーブル」を使い、データロードなしで、バケット内のデータをSQLでアクセスしたいと思います(GCS内のファイルに対して直接SQLを実行する)。

今回は比較的新しいアルバムである「FIXER」(ジャケ写がちょっと怖い)の楽曲に対してSQLを実行してみます。

というわけで、外部テーブルを作成します。

create external table ext_na_table
  (
    file_name varchar as(metadata$filename)
  ) 
  with location = @gcs_na_stage/FIXER/
  auto_refresh = false 
  file_format = (type = json)

上記により、FIXERに収録されている楽曲データに対してクエリできるようになりました。例えば、紅白歌合戦で披露されたことでも有名な「Rojo -Tierra-」を、曲のセクション別に分析したい場合は、下記のクエリを実行することで可能となります。

select
  value:start::float as "start"
  ,value:duration::float as "duration"
  ,value:confidence::float as "confidence"
  ,value:loudness::float as "loudness"
  ,value:tempo::float as "tempo"
  ,value:tempo_confidence::float as "tempo_confidence"
  ,value:key::integer::float as "key"
  ,value:key_confidence::float as "key_confidence"
  ,value:mode::integer as "mode"
  ,value:mode_confidence::float as "mode_confidence"
  ,value:time_signature::integer as "time_signature"
  ,value:time_signature_confidence::float as "time_signature_confidence"
from
  (
    select
      et.value as raw_status
    from
      ext_na_table et
    where
      FILE_NAME = 'FIXER/2_Rojo -Tierra-.json'
  )
  ,lateral flatten(
    input = > raw_status :sections
  )

データをテーブルにロードすることなく、行列として構造化された形でデータを照会することができました。ちなみにRojo -Tierra-の作曲は浅倉大介氏です。

上記のようにSQLを工夫することで、Snowflake上で中森明菜の(ほぼ)全て楽曲の分析が可能となりました。

課題など

NADLですが、当然完璧ではなく、まだまだ改善点等があります。

「1/2の神話」問題

中森明菜の有名な曲に「1/2の神話」がありますが、まさかこの曲名のスラッシュに悩まされる日が来るとは思っていませんでした。

御存知の通り、GCSは、/でフォルダ分けみたいにファイルを整理することができます(実際にフォルダがあるわけではない)。

NADLでは、曲名をファイル名にしているのですが、1/2/のせいで、GCS上はファイルが下記のようになってしまいます。

今回のスクリプトではrepr関数を使って、ファイル名をraw文字列として扱ってみたりしたのですが、効果はありませんでした。たぶん普通にやり方があると思うのですが、私のpython力では無理でした。いい加減にしてええええェェェェェェェェェ(超絶ビブラート)。

結局Spotify問題

今回のそもそものデータ元はSpotifyです。なので、当然ながらSpotifyに無い楽曲は分析できません。

今回のNADLも、私の大好きなアルバム「DIVA」は未収録となります。なぜならSpotifyで配信されてないからです。昔の「如何にもな中森明菜」とは一味違う、EDMっぽい要素も入った名曲揃いのアルバムなので、非常に残念であります。冗談じゃないイイィィィィィィィィッッッ(凄絶ビブラート)。

おわりに

中森明菜は最高のアーティストだということがわかりましたね。

Snowflakeの外部ステージ&外部テーブルで、データレイク運用が捗ると思うので、ぜひご検討ください。