これは Snowflake Advent Calendar 2021 の14日目の記事です。
現在のチームでは Snowflake を使っており、Snowflake 上で AWS の RDS 上で動いている MySQL を参照して集計をしたくなるケースがありました。そこで、DMS(Database Migration Service)という AWS のサービスを使って、MySQL のデータを継続的にレプリケートした実例をご紹介します。
また、この記事は社内イベントで登壇した際の資料をもとに作成しています。
DMS とは
概要
DMS(Database Migration Service)とは、AWS のデータのマイグレーションサービスで、DB → DB や DB → S3 など、複数かつ異なるリソース間でデータをマイグレーションすることが可能です。
公式サイトより
移行元のデータストアを「ソース」と呼び、移行対象のデータストアを「ターゲット」と呼びます。また、それぞれにアクセスするためのエンドポイントを「ソースエンドポイント」「ターゲットエンドポイント」と呼びます。
DMS では専用のインスタンスファミリーが用意されており、それを自前で構築してそのインスタンスの上でタスクを実行するという仕組みです。
加えて、マイグレーションの方法としては、「既存データの移行のみ」「既存データの移行と継続的なレプリケート」「継続的なレプリケートのみ」の3種類を選択することができます。
要は、DB のある瞬間のデータをダンプするだけなのか、ダンプして変更されたデータも継続的に取り込みたいのか、変更だけを継続的に取り込みたいのかという選択肢がある、ということです。
S3 への抽出のログフォーマット
S3 へデータを抽出する際のフォーマットは csv と parquet の2種類です。
以下のような MySQL 上でのテーブルとテストデータを用いてログフォーマットの説明をします。このテーブルのデータで「既存データの移行と継続的なレプリケート」を例にとってデータのフォーマットを見てみましょう。
create table user ( id varchar(255), name varchar(255) ); -- 初期データ insert into user (id, name) VALUES (1, 'dragontaro'), (2, 'hoge'); -- ===== DMS タスクを作成 ===== -- レプリケートデータ insert into user (id, name) values (4, 'fuga'); update user set name = 'fugafuga' where id = 4;
id: 1, 2 のレコードは初期データとして、DMS のタスク実行時に一気に抽出されます。これが「既存データの移行」です。そして、 DMS のレプリケートタスクを起動後に id: 4 のデータの insert と update を実行したとします。
csv の初回ロード
ヘッダーなしの csv です。id, name とテーブルのカラム順にコンマ区切り(変更可)で抽出されます。
以降は、この初回ロード時に抽出されるデータのことを「初回スナップショット」と呼ぶことにします。
1,dragontaro 2,hoge
csv のレプリケート時
同じくヘッダーなしの csv で、あるオペレーションが実行された直後のレコードが抽出されます。ただし、最初にどんなオペレーションだったのかが記載されていることが先ほどとは異なります。
以降は、このレプリケート時のオペレーションが実行された直後のデータのことを、単に「レプリケートのスナップショット」と呼ぶことにします。
I: Insert, U: Update, D: Delete
を示しています。
I,4,fuga U,4,fugafuga
parquet の初回ロード
id, name とテーブルのデータが、parquet なのでもちろんカラムつきで抽出されます。json にパースして表示すると以下のようになります。
[ { "id": "1", "name": "dragontaro" }, { "id": "2", "name": "hoge" } ]
parquet のレプリケート
オペレーションが Op というカラムで保存されていること以外はこれまでと変わりはありません。
[ { "Op": "I", "id": "4", "name": "fuga" }, { "Op": "U", "id": "4", "name": "fugafuga" } ]
MySQL と Snowflake を同期するアーキテクチャ
概要
ソースとして Amazon RDS を、ターゲットとして S3 を選択しました。また、Snowflake への継続的な取り込みは SnowPipe を利用しました。その後、Snowflake 内で継続的に取り込んでいるレプリケートのスナップショットから View で元のテーブルを構築するようになっています。
より詳細には、初回ロード時のスナップショットのみ初回ロードの専用のクエリを叩き、それ以降は初回ロードのファイル以外のファイルが Snwoflake に取り込まれるようなアーキテクチャです。
ソースエンドポイントの構築
ソースエンドポイントとして Amazon RDS への接続を設定します。 接続のための認証情報を Secret Manager 経由で渡すことと、抽出対象のテーブルを選択することが、やるべきことです。
抽出時にデータの変形などができるようですが、今回は必要ないので特に何もしていません。
ターゲットエンドポイントの構築
ターゲットエンドポイントとして S3 への接続を設定します。バケットの準備、DMS インスタンスからの S3 への書き込み権限の付与、ファイルフォーマットの指定が必要です。ファイルフォーマットとしては csv を選択しました。(後述しますが、 csv を選択したのは愚かでした。。)
その他、書き込みの際に date でディレクトリを分けるかどうかなどを設定できます。
タスクの構築
DMS 専用のインスタンスを構築して、その上で上記のソースエンドポイントとターゲットエンドポイントを指定してタスクを実行します。タスクの実行頻度はデフォルトで5分で、今回は1分としました。
もちろんですが、DMS は RDS にアクセスできるネットワークを選択すること(VPC や Security Group の設定など)が必須です。
Snowflake にスナップショットを取り込む
スナップショットの保管テーブル
Snowflake 上にスナップショットを取り込むためにまずはテーブルに作成します。
のちに Snowflake で View を構築する際にオペレーションの種類とクエリの順序が大事になってくるので、オペレーションの種類とスナップショットの作成日時、スナップショットファイルの行数も保管するようにしました。同じ id のレコードの場合は、より新しい日付のスナップショットが現在の状態で、日付が同じ場合は行数がより大きい方のレコードを最新と判断できます。
クエリは以下です。
create table raw_user ( id number, name varchar(255), _type varchar(255), -- オペレーションの種類 _loaded_at timestamp_ntz, -- スナップショットの作成日時 _row_num number -- スナップショットのファイルの中での行数 );
初回スナップショット
初回のスナップショットは、オペレーションの種類が入っていないため、type: L という文字列をハードコーディングで入れて他との整合性を合わています。(null でもいいですが、気持ち悪いのでこうしています。)
また、_loaded_at は適当に現在時刻より前の時刻にしています。稼働させてから途中で初回スナップショットをもう一度取り込む想定はしていないので問題ありません。
_row_num は metadata$file_row_number
からファイル内の行数を取得しています。
copy into raw_user from ( select $1 as id, $2 as name, 'L' as _type, sysdate() - interval '1 day' as _loaded_at, metadata$file_row_number as _row_num from @xxx_stage/user/LOADXXXXX.csv );
レプリケートのスナップショット
続いてレプリケートのスナップショットの取り込みを行う SnowPipe を構築します。_type は csv の先頭に入っていること、スナップショットの作成時刻はファイル名の日付をパースしていることに留意してください。
また、pattern 句を使って初回スナップショットのファイルを除外しています。
create pipe raw_user_pipe as copy into raw_user from ( select $2 as id, $3 as name, $1 as _type, to_timestamp_ntz( substring(metadata$filename, 28, 18), 'YYYYMMDD-HH24MISSFF' ) as _loaded_at, metadata$file_row_number as _row_num from @xxx_stage/user/ ) pattern = '^(?!.*LOAD.*\.csv).*$';
View で元のテーブルを再現する
先ほど取り入れたスナップショットのデータから元の MySQL のテーブルを再現していきます。上述の通り、同じ id のレコードに関しては、スナップショットの作成日時が最新のものを優先し、その日時が同じ場合は行番号が大きいものを優先する必要があります。そして最後に削除済みのものを排除しています。
これをウインドウ関数を用いると以下のように書けます。
create view mysql_user as with latest_record as ( select * from raw_user -- 最新のファイルかつ最新のレコードを現在のスナップショットとみなす qualify row_number() over ( partition by ID order by _LOADED_AT desc, _ROW_NUM desc ) = 1 ) select * from latest_record where _type != 'D';
DMS の課題点と改善ポイント
csv
csv はカラムの順序に依存するため変更に弱すぎました。継続的なレプリケートを考える際には parquet を選択することが無難に思えます。
MySQL ではカラムの追加の際に「どのカラムの後に目的のカラムを追加するのか」を選ぶことができます。一方で、Snowflake で csv をパースする際に「 n 番目のカラムは XXX だ」と決めでやっているため、カラムの順序が変更されると破綻してしまいます。
これを回避するためには単に parquet を使えば大丈夫です。
停止・再開の挙動
DMS タスクを停止する際にはチェックポイントという概念があり、次にどこから binlog を参照するのかを一時的に管理しておくことができるようです。しかし、トランザクション中に止めると壊れるといった記事等もあるため、あまり信用しない方が良いと思っています。
幸い View で構築しているため、スナップショットテーブルをもう一つ作成して、View を作り替えるだけで特に壊れずに移行できるので、今後はこちらの手段を取ろうと思っています。(精神衛生上もこちらの方がいいのでw)
エラーの検知
今のところ監視を導入せずに運用しているため、監視が喫緊の課題となっています。冗長化構成にしてあることと、最悪壊れても事業にあまり影響が出ない部分でしか使っていないのとで油断していますが、最低限壊れたことを検知できるようにはしておこうと思っています。
まとめ
DMS を使って MySQL のデータを Snowflake に同期する方法を紹介してきました。まだまだ課題はありますが、参考にしていただけたら幸いです。
また、勝手にレプリケートしてくれる Aurora を始め RDS というサービスの恩恵を改めて感じることができました。