SQLAlchemy は ETL を魔法のように簡単にします
データ サイエンスのワークフローの重要な側面の 1 つは、上流で使用できる形式で生データを取得、クリーニング、保存することです。このプロセスは一般に「抽出-変換-ロード」、または短縮して ETL と呼ばれます。
効率的で堅牢かつ信頼性の高い ETL プロセス、つまり「データ パイプライン」を設計することが重要です。 」 非効率的なパイプラインでは、データの操作が遅くなり非生産的になります。堅牢でないパイプラインは簡単に破損し、隙間が生じます。
さらに悪いことに、信頼性の低いデータ パイプラインは、被害が発生するまで明らかにならない可能性のある偽のデータでデータベースを静かに汚染します。
ETL 開発は非常に重要ですが、時間がかかり、面倒なプロセスになる場合があります。幸いなことに、作業をはるかに容易にするオープンソース ソリューションがあります。
SQLAlchemy とは何ですか?
そのようなソリューションの 1 つは、SQLAlchemy と呼ばれる Python モジュールです。これにより、データ エンジニアや開発者はスキーマの定義、クエリの作成、SQL データベースの操作を完全に Python を通じて行うことができます。
SQLAlchemy のオブジェクト リレーショナル マッパー (ORM) と式言語の機能は、Python のクラスと構造をデータ テーブルと式に関連付けることができるため、SQL のさまざまな実装間で明らかな特異性の一部を解決します。
ここでは、SQLAlchemy のいくつかのハイライトを見て、SQLAlchemy で何ができるのか、そしてどのように ETL 開発プロセスをよりスムーズにすることができるのかを確認します。
セットアップ中
pip パッケージ インストーラーを使用して SQLAlchemy をインストールできます。
$ sudo pip install sqlalchemy
SQL 自体に関しては、MySQL、Postgres、Oracle、Microsoft SQL Server など、さまざまなバージョンが利用可能です。この記事では SQLite を使用します。
SQLite は、SQL のオープンソース実装であり、通常、Linux および Mac OS X にプリインストールされています。Windows でも使用できます。システムにまだインストールされていない場合は、次の手順に従って起動して実行できます。
新しいディレクトリで、ターミナルを使用して新しいデータベースを作成します。
$ mkdir sqlalchemy-demo && cd sqlalchemy-demo
$ touch demo.db
スキーマの定義
データベース スキーマは、テーブル、列、フィールド、およびそれらの間の関係の観点からデータベース システムの構造を定義します。スキーマは、生の SQL で定義することも、SQLAlchemy の ORM 機能を使用して定義することもできます。
以下は、架空のブログ プラットフォーム用に 2 つのテーブルのスキーマを定義する方法を示す例です。 1 つはユーザーのテーブル、もう 1 つはアップロードされた投稿のテーブルです。
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *
engine = create_engine('sqlite:///demo.db')
Base = declarative_base()
class Users(Base):
__tablename__ = "users"
UserId = Column(Integer, primary_key=True)
Title = Column(String)
FirstName = Column(String)
LastName = Column(String)
Email = Column(String)
Username = Column(String)
DOB = Column(DateTime)
class Uploads(Base):
__tablename__ = "uploads"
UploadId = Column(Integer, primary_key=True)
UserId = Column(Integer)
Title = Column(String)
Body = Column(String)
Timestamp = Column(DateTime)
Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)
まず、SQLAlchemy から必要なものをすべてインポートします。次に、create_engine(connection_string)
を使用してデータベースに接続します。正確な接続文字列は、使用している SQL のバージョンによって異なります。この例では、前に作成した SQLite データベースへの相対パスを使用します。
次に、テーブル クラスの定義を開始します。この例の最初のものは Users
です。このテーブルの各列は、SQLAlchemy の Column(type)
を使用してクラス変数として定義されます。type
はデータ型 (Integer
など) String
、DateTime
など)。 primary_key=True
を使用して、主キーとして使用される列を指定します。
ここで定義されている次のテーブルは Uploads
です。これはほぼ同じ考え方です。各列は前と同じように定義されます。
最後の 2 行で実際にテーブルを作成します。 checkfirst=True
パラメータにより、新しいテーブルがデータベースに現在存在しない場合にのみ作成されるようになります。
抽出する
スキーマが定義されたら、次のタスクはソースから生データを抽出することです。正確な詳細は、生データの提供方法に応じて、ケースバイケースで大きく異なる可能性があります。アプリが社内 API やサードパーティ API を呼び出す場合や、CSV ファイルに記録されたデータを読み取る必要がある場合があります。
以下の例では、2 つの API を使用して、上記の架空のブログ プラットフォームのデータをシミュレートします。 Users
テーブルには、randomuser.me でランダムに生成されたプロファイルが入力され、Uploads
テーブルには、JSONPlaceholder から提供された lorem ipsum からインスピレーションを得たデータが含まれます。
以下に示すように、Python の Requests
モジュールを使用してこれらの API を呼び出すことができます。
import requests
url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()
データは現在、JSON 形式の 2 つのオブジェクト (users_json
と uploads_json
) に保持されています。次のステップでは、このデータを変換して、前に定義したテーブルにロードします。
変身
データをデータベースにロードする前に、データが正しい形式であることを確認することが重要です。上記のコードで作成された JSON オブジェクトはネストされており、定義されたテーブルに必要なデータよりも多くのデータが含まれています。
重要な中間ステップは、データを現在のネストされた JSON 形式から、エラーなく安全にデータベースに書き込むことができるフラット形式に変換することです。
この記事で実行されている例では、データは比較的単純なので、あまり変換する必要はありません。以下のコードは、最後のステップで使用される 2 つのリスト users
と uploads
を作成します。
from datetime import datetime, timedelta
from random import randint
users, uploads = [], []
for i, result in enumerate(users_json['results']):
row = {}
row['UserId'] = i
row['Title'] = result['name']['title']
row['FirstName'] = result['name']['first']
row['LastName'] = result['name']['last']
row['Email'] = result['email']
row['Username'] = result['login']['username']
dob = datetime.strptime(result['dob'],'%Y-%m-%d %H:%M:%S')
row['DOB'] = dob.date()
users.append(row)
for result in uploads_json:
row = {}
row['UploadId'] = result['id']
row['UserId'] = result['userId']
row['Title'] = result['title']
row['Body'] = result['body']
delta = timedelta(seconds=randint(1,86400))
row['Timestamp'] = datetime.now() - delta
uploads.append(row)
ここでの主な手順は、前に作成した JSON オブジェクトを反復処理することです。結果ごとに、スキーマ内の関連テーブルに定義された各列に対応するキーを持つ新しい Python 辞書オブジェクトを作成します。これにより、データがネストされなくなり、テーブルに必要なデータのみが保持されます。
もう 1 つのステップは、Python の datetime
モジュールを使用して日付を操作し、データベースに書き込むことができる DateTime
型のオブジェクトに変換することです。この例では、Python の DateTime モジュールの timedelta()
メソッドを使用して、ランダムな DateTime
オブジェクトを生成します。
作成された各辞書はリストに追加され、パイプラインの最終ステップで使用されます。
負荷
最後に、 データはデータベースにロードできる形式になります。 SQLAlchemy では、セッション API を通じてこのステップを簡単に実行できます。
セッション API は、データベースからロードした、またはデータベースに関連付けられた Python オブジェクトの仲介者、つまり「ホールディング ゾーン」のように機能します。これらのオブジェクトは、データベースにコミットされる前にセッション内で操作できます。
以下のコードは、新しいセッション オブジェクトを作成し、それに行を追加して、それらをマージしてデータベースにコミットします。
Session = sessionmaker(bind=engine)
session = Session()
for user in users:
row = Users(**user)
session.add(row)
for upload in uploads:
row = Uploads(**upload)
session.add(row)
session.commit()
sessionmaker
ファクトリは、新しく構成された Session
クラスを生成するために使用されます。 Session
は、2 行目で session
としてインスタンス化される日常的な Python クラスです。
次に、前に作成した users
リストと uploads
リストを反復する 2 つのループです。これらのリストの要素は、キーが前に定義した Users
クラスと Uploads
クラスで指定された列に対応する辞書オブジェクトです。
各オブジェクトは、関連するクラスの新しいインスタンスをインスタンス化するために使用されます (Python の便利な some_function(**some_dict)
トリックを使用します)。このオブジェクトは、session.add()
を使用して現在のセッションに追加されます。
最後に、追加する行がセッションに含まれている場合、session.commit()
を使用してトランザクションをデータベースにコミットします。
集約中
SQLAlchemy のもう 1 つの優れた機能は、式言語システムを使用してバックエンドに依存しない SQL クエリを作成および実行できることです。
バックエンドに依存しないクエリを作成する利点は何ですか?まず、将来の移行プロジェクトが非常に簡単になります。 SQL の異なるバージョンには多少互換性のない構文がありますが、SQLAlchemy の式言語はそれらの間の共通語として機能します。
また、Python 的な方法でシームレスにデータベースにクエリを実行したり操作したりできることは、自分が最もよく知っている言語で完全に作業したいと考えている開発者にとって大きな利点です。ただし、事前に作成されたクエリを使用する方が簡単な場合には、SQLAlchemy を使用してプレーン SQL で作業することもできます。
ここでは、架空のブログ プラットフォームの例を拡張して、これがどのように機能するかを説明します。基本的なユーザー テーブルとアップロード テーブルを作成してデータを入力したら、次のステップとして集計テーブルを作成します。たとえば、各ユーザーが投稿した記事の数と、最後にアクティブだった時間を表示します。 。
まず、集約テーブルのクラスを定義します。
class UploadCounts(Base):
__tablename__ = "upload_counts"
UserId = Column(Integer, primary_key=True)
LastActive = Column(DateTime)
PostCount = Column(Integer)
UploadCounts.__table__.create(bind=engine, checkfirst=True)
このテーブルには 3 つの列があります。 UserId
ごとに、最後にアクティブだったときのタイムスタンプと、アップロードした投稿の数が保存されます。
単純な SQL では、次のようなクエリを使用してこのテーブルにデータが設定されます。
INSERT INTO upload_counts
SELECT
UserId,
MAX(Timestamp) AS LastActive,
COUNT(UploadId) AS PostCount
FROM
uploads
GROUP BY 1;
SQLAlchemy では、これは次のように記述されます。
connection = engine.connect()
query = select([Uploads.UserId,
func.max(Uploads.Timestamp).label('LastActive'),
func.count(Uploads.UploadId).label('PostCount')]).\
group_by('UserId')
results = connection.execute(query)
for result in results:
row = UploadCounts(**result)
session.add(row)
session.commit()
最初の行では、engine
オブジェクトの connect()
メソッドを使用して Connection
オブジェクトを作成します。次に、select()
関数を使用してクエリを定義します。
このクエリは、上記の単純な SQL バージョンと同じです。 uploads
テーブルから UserId
列を選択します。また、func.max()
を Timestamp
列に適用して、最新のタイムスタンプを識別します。これには、label()
メソッドを使用して LastActive
というラベルが付けられます。
同様に、クエリは func.count()
を適用して、Title
列に表示されるレコードの数をカウントします。これには PostCount
というラベルが付けられます。
最後に、クエリは group_by()
を使用して結果を UserId
ごとにグループ化します。
クエリの結果を使用するには、for ループで connection.execute(query)
によって返された行オブジェクトを反復処理します。各行は、UploadCounts
テーブル クラスのインスタンスをインスタンス化するために使用されます。前と同様に、各行が session
オブジェクトに追加され、最後にセッションがデータベースにコミットされます。
チェックアウト
このスクリプトを実行したら、前に作成した demo.db
データベースにデータが正しく書き込まれたことを確認できます。
Python を終了した後、SQLite でデータベースを開きます。
$ sqlite3 demo.db
これで、次のクエリを実行できるようになります。
SELECT * FROM users;
SELECT * FROM uploads;
SELECT * FROM upload_counts;
そして、各テーブルの内容がコンソールに出力されます。 Python スクリプトを定期的に実行するようにスケジュールすると、データベースを確実に最新の状態に保つことができます。
これらのテーブルを使用して、さらに分析するためのクエリを作成したり、視覚化を目的としたダッシュボードを構築したりできるようになりました。
さらに読む
ここまで読んだ方は、SQLAlchemy を使用して Python での ETL 開発をより簡単にする方法について、1 つまたは 2 つ学んだことと思います。
単一の記事で SQLAlchemy のすべての機能を完全に評価することは不可能です。ただし、このプロジェクトの主な利点の 1 つは、ドキュメントの深さと詳細です。ここで詳しく知ることができます。
それ以外の場合は、すぐに開始したい場合は、このチートシートを確認してください。
この記事の完全なコードは、この gist にあります。
読んでくれてありがとう!ご質問やご意見がございましたら、以下にご返信ください。