はじめに
多くのブログ記事やチュートリアルは、機械学習や深層学習モデルのトレーニングに焦点を当てています。しかし、これらのモデルをWebサイトや他のシステムから簡単に利用できるように作成し、デプロイする方法については、あまり多くの情報がありません。
この記事では、これらのモデルのWeb APIを作成するためのアプローチと、AWSへのデプロイメントについて説明します。
この記事ではTensorFlow 2に焦点を当てていますが、いくつかの変更を加えることで、コードはPyTorch やLightGBMなどの他のフレームワークにも対応できます。
自己紹介
私はアダムと申します。PROTO SolutionのAIテクノロジー推進室で機械学習/深層学習のシステムやプロトタイプを開発しております。
主な仕事は、TensorFlowを使った深層学習モデルの開発と、それらを使ってAmazon Web Services (AWS)にデプロイできるAPIやバッチアプリケーションの開発です。
アプローチ
機械学習モデルをデプロイするためにいくつかAWSサービスが使えます。例えば、
これらのサービスはそれぞれ料金、複雑さ、機能が異なります。
この記事では、AWS LambdaとAWS Elastic Beanstalkへのデプロイに焦点を当てます。理由は、この二つのサービスが一番使いやすいですし、私のプロトタイプやアプリケーションはこれらでデプロイすることが多いです。
今後のブログでは、他のサービスについても記事を書いていきたいと思います。
API
API(Application Programming Interface)とは、ユーザーや他のシステムが、アプリケーションの内部を知らなくても、そのアプリケーションと使用できる方法を示すものです。
Web APIは、HTTPやその他のプロトコルを使用して、アプリケーションの機能をインターネット上で提供し利用することを可能にします。
この記事では、FastAPI フレームワークを使用してWeb APIを作成し、DockerおよびAWSサービスを利用してデプロイします。
ImageNetというデータセットに学習されたTensorFlowのMobileNetV2 モデルを例として使用します。
FastAPI
FastAPI は、PythonでWeb APIを構築するためのフレームワークです。
Flaskとその他のフレームワークも非常に人気がありますが、FastAPIはこれらのフレームワークに比べて、いくつかの利点と特徴があると思います。例えば、
- ・モダンで非常に高速処理
- ・コードの重複を最小限にしていますので、APIのソースコードが短い
- ・Pydanticによるデータのシリアル化と検証が組み込まれています。
- ・自動的にOpen APIドキュメント(仕様書)生成
Docker
Dockerは、アプリケーションをデプロイするための決定的な方法となっています。
Dockerで、アプリケーションと、オペレーティングシステムを含むすべての依存関係を、Dockerイメージと呼ばれるものにパッケージ化できます。
これらのイメージは、Dockerエンジンをサポートするさまざまなプラットフォームで実行できます。
Dockerエンジンで実行されるイメージは、コンテナと言います。
Dockerには、コンテナとイメージを管理するためのさまざまなコマンドがありますが、この記事では、buildとrunコマンドのみを使用します。
チュートリアル
このセクションでは、APIの各部分を紹介し、説明します。
依存関係のインストール
最初のステップは、Pythonの分離されたコピーである新しい仮想環境を作成することです。
この記事では、Condaを使って新しいPython 3.7の仮想環境を作成し、それに切り替えます。
conda create -n ApiDeployment python=3.7
conda activate ApiDeployment
次に、requirements.txtに記載されている依存関係を仮想環境にインストールします。
pip install -r requirements.txt
必要な依存関係のパッケージを少し見てみましょう。
Pillow==8.2.0
tensorflow==2.4.2
numpy==1.19.5
fastapi==0.65.2
pydantic==1.8.2
aiohttp==3.7.3
uvloop==0.14.0
uvicorn[standard]==0.14.0
gunicorn==20.1.0
aiofiles==0.7.0
mangum==0.12.2
パッケージのインポートとロギング
まずは始めに、APIが使用するライブラリのインポートです。
これらのライブラリの使い方については、コードを読み進めながら説明します。
import argparse
import base64
import io
import os
import logging
import sys
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input, decode_predictions
from urllib.parse import urlparse
from aiohttp.client import ClientSession
from asyncio import wait_for, gather, Semaphore
from typing import Optional, List
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, validator
import numpy as np
from PIL import Image
from mangum import Mangum
APIは環境変数を使って設定されますが、これらはコマンドラインで簡単にDockerに渡すことができます。
THREAD_COUNT = int(os.environ.get('THREAD_COUNT', 5))
"""The number of threads used to download and process image content."""
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 4))
"""The number of images to process in a batch."""
TIMEOUT = int(os.environ.get('TIMEOUT', 30))
"""The timeout to use when downloading files."""
また、メッセージをログに書き込むためのロギングオブジェクトも作ります。
logger = logging.getLogger(__name__)
データのモデル
次に、Pydantic というパッケージを使って、APIへのインターフェースを定義し、データ構造を作成します。
データ構造を作成するための詳しいチュートリアルはこちらをご覧ください。
class HealthCheck(BaseModel):
"""
Represents an image to be predicted.
"""
message: Optional[str] = 'OK'
class ImageInput(BaseModel):
"""
Represents an image to be predicted.
"""
url: Optional[str] = None
data: Optional[str] = None
class ImageOutput(BaseModel):
"""
Represents the result of a prediction
"""
score: Optional[float] = 0.0
category: Optional[str] = None
name: Optional[str] = None
@validator('score')
def result_check(cls, v):
return round(v, 4)
class PredictRequest(BaseModel):
"""
Represents a request to process
"""
images: List[ImageInput] = []
class PredictResponse(BaseModel):
"""
Represents a request to process
"""
images: List[ImageOutput] = []
PredictRequestオブジェクトは、APIに渡されるデータ、つまり処理したい画像を表しています。
PredictRequestオブジェクトは、予測結果がAPIの呼び出し元にどのように返されるかを定義します。
これらの構造を定義することで、画像のURLやBase64エンコードされた画像データを含むJSONリクエストを自動的にデシリアライズし、妥当性を検査することができます。
サンプルなリクエスト
{
"images": [
{
"url": "https://localhost/test.jpg"
}
]
}
また、各画像のImageNetカテゴリー予測値、信頼度スコアを含むJSONレスポンスをシリアライズすることもできます。例えば、
サンプルなレスポンス
{
"images": [
{
"score": 0.508,
"category": "n03770679",
"name": "minivan"
}
]
}
FastAPI アプリケーション
次のステップでは、FastAPIのアプリケーションのオブジェクトを作ります。これにより、次のステップでいくつかのアノテーションを使用できるようになります。
app = FastAPI()
例外処理
以下の例外ハンドラは、処理中にエラーが発生した場合に、呼び出し元に返されるエラーメッセージを生成するために使用されます。
class ImageNotDownloadedException(Exception):
pass
@app.exception_handler(Exception)
async def unknown_exception_handler(request: Request, exc: Exception):
"""
Catch-all for all other errors.
"""
return JSONResponse(status_code=500, content={'message': 'Internal error.'})
@app.exception_handler(ImageNotDownloadedException)
async def client_exception_handler(request: Request, exc: ImageNotDownloadedException):
"""
Called when the image could not be downloaded.
"""
return JSONResponse(status_code=400, content={'message': 'One or more images could not be downloaded.'})
Image Classifier
このクラスには,学習されたMobileNetV2モデルが利用されています。
predict
という関数を呼び出すことで、画像のリストに対するImageNetカテゴリを予測することができます。
画像は前処理され、モデルの入力に合わせてリサイズされます(224ピクセル×224ピクセル)。
class ImageClassifier:
"""
Classifies images according to ImageNet categories.
"""
def __init__(self):
"""
Prepares the model used by the application for use.
"""
self.model = MobileNetV2()
_, height, width, channels = self.model.input_shape
self.input_width = width
self.input_height = height
self.input_channels = channels
def _prepare_images(self, images):
"""
Prepares the images for prediction.
:param images: The list of images to prepare for prediction in Pillow Image format.
:return: A list of processed images.
"""
batch = np.zeros((len(images), self.input_height, self.input_width, self.input_channels), dtype=np.float32)
for i, image in enumerate(images):
x = image.resize((self.input_width, self.input_height), Image.BILINEAR)
batch[i, :] = np.array(x, dtype=np.float32)
batch = preprocess_input(batch)
return batch
def predict(self, images, batch_size):
"""
Predicts the category of each image.
:param images: A list of images to classify.
:param batch_size: The number of images to process at once.
:return: A list containing the predicted category and confidence score for each image.
"""
batch = self._prepare_images(images)
scores = self.model.predict(batch, batch_size)
results = decode_predictions(scores, top=1)
return results
ロギングの設定
次に、アプリケーションのログを設定するための関数を作成します。
すべてのログのメッセージは stdout に出力されます。これにより、AWS LambdaやElastic Beanstalkは簡単にメッセージを記録できるようになります。
def configure_logging(logging_level=logging.INFO):
"""
Configures logging for the application.
"""
root = logging.getLogger()
root.handlers.clear()
stream_handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
root.setLevel(logging_level)
root.addHandler(stream_handler)
モデルのロード
このステップでは,アプリケーションの起動時に、ImageClassifierオブジェクトを生成してロードします。
また、前のセクションで定義した関数を使って、アプリケーションのログを設定します。
@app.on_event('startup')
def load_model():
"""
Loads the model prior to the first request.
"""
configure_logging()
logger.info('Loading model...')
app.state.model = ImageClassifier()
ここでは、app
の state
に ImageClassifier
オブジェクトを格納しますが,代わりにグローバル変数としてモデルを格納することも可能です。
画像処理
このセクションでは、URLから画像をダウンロードしたり、リクエストメッセージに格納されている Base64 画像データをデコードしたりするためのいくつかの関数を定義します。
ダウンロードするためにaiohttp
というパッケージが使われています。画像のデータはPillow
の形式に変換しています。
def get_url_scheme(url, default_scheme='unknown'):
"""
Returns the scheme of the specified URL or 'unknown' if it could not be determined.
"""
result = urlparse(url, scheme=default_scheme)
return result.scheme
async def retrieve_content(entry, sess, sem):
"""
Retrieves the image content for the specified entry.
"""
raw_data = None
if entry.data is not None:
raw_data = base64.b64decode(entry.data)
elif entry.url is not None:
source_uri = entry.url
scheme = get_url_scheme(source_uri)
if scheme in ('http', 'https'):
raw_data = await download(source_uri, sess, sem)
else:
raise ValueError('Invalid scheme: %s' % scheme)
if raw_data is not None:
image = Image.open(io.BytesIO(raw_data))
image = image.convert('RGB')
return image
return None
async def retrieve_images(entries):
"""
Retrieves the images for processing.
:param entries: The entries to process.
:return: The retrieved data.
"""
tasks = list()
sem = Semaphore(THREAD_COUNT)
async with ClientSession() as sess:
for entry in entries:
tasks.append(
wait_for(
retrieve_content(entry, sess, sem),
timeout=TIMEOUT,
)
)
return await gather(*tasks)
async def download(url, sess, sem):
"""
Downloads an image from the specified URL.
:param url: The URL to download the image from.
:param sess: The session to use to retrieve the data.
:param sem: Used to limit concurrency.
:return: The file's data.
"""
async with sem, sess.get(url) as res:
logger.info('Downloading %s' % url)
content = await res.read()
logger.info('Finished downloading %s' % url)
if res.status != 200:
raise ImageNotDownloadedException('Could not download image.')
return content
画像の予測
最後の2つのメソッドは,上記のコードをすべて結びつけるものです。
predict_images
は,Pillow
形式の画像のリストに対してImageNet
のカテゴリを予測し、その結果をImageOutput
オブジェクトのリストとして返します。
process
関数は、URLのリストから画像をダウンロードして処理し、レスポンスを返します。
app.post
のアノテーションは、HTTP POST
による関数を呼び出すことでき、関数のリクエストとレスポンスの構造がそれぞれPredictRequest
とPredictResponse
によって定義されることを示しています。
def predict_images(images):
"""
Predicts the image's category and transforms the results into the output format.
:param images: The Pillow Images to predict.
:return: The prediction results.
"""
response = list()
results = app.state.model.predict(images, BATCH_SIZE)
for top_n in results:
category, name, score = top_n[0]
response.append(ImageOutput(category=category, name=name, score=score))
return response
@app.post('/v1/predict', response_model=PredictResponse)
async def process(req: PredictRequest):
"""
Predicts the category of the images contained in the request.
:param req: The request object containing the image data to predict.
:return: The prediction results.
"""
logger.info('Processing request.')
logger.debug(req.json())
logger.info('Downloading images.')
images = await retrieve_images(req.images)
logger.info('Performing prediction.')
results = predict_images(images)
logger.info('Transaction complete.')
return PredictResponse(images=results)
健全性のチェック
また、ロードバランサーがアプリケーションの健全性をチェックするためにHTTP GET
による呼び出すことができる関数も提供しています。
@app.get('/health')
def test():
"""
Can be called by load balancers as a health check.
"""
return HealthCheck()
Handler (Lambdaに必須)
Lambda
でFastAPI
を使用するためには、FastAPI
のapp
オブジェクトをMangumでラップする必要があります。
handler = Mangum(app)
注:アプリケーションをローカルまたはAWS Elastic Beanstalkにデプロイする場合、このハンドラは使用されません。
APIサーバーの実行 (ローカル環境)
最後に、コマンドラインからAPIを実行できるように、組み込みのuvicorn
サーバーを追加します。
if __name__ == '__main__':
import uvicorn
parser = argparse.ArgumentParser(description='Runs the API locally.')
parser.add_argument('--port',
help='The port to listen for requests on.',
type=int,
default=8080)
args = parser.parse_args()
configure_logging()
uvicorn.run(app, host='0.0.0.0', port=args.port)
以下のコマンドを実行することで、APIをローカルに実行することができます。
python main.py
Lambdaのデプロイメント
LambdaへのAPIのデプロイはいくつかのメリットがあります。
- サーバーやインフラの管理が不要です。
- 一時的な大量のトラフィックを対応するために、APIインスタンスの数を容易に拡張できます。
しかし、いくつかデメリットと制限もあり、特に以下の点が挙げられます。
- しばらくLambdaが呼ばれないときに、コンテナが起動時間が発生し、リクエスト処理の遅延が発生します。
- パフォーマンスを向上させるために調整できるのは、メモリ(および比例するCPU量)のみです。
- API GatewayからLambdaを呼び出す場合、タイムアウトは29秒に制限されます。
- Dockerイメージの容量は10GBに制限されています。
- 使用料金は、リクエスト数と処理時間の両方に基づいて設定されるため、長時間の処理を大量に実行する場合には割高になる可能性があります。
- ローカルストレージを必要とするアプリケーションには適していません。Lambdaは
/tmp
ディレクトリに512MBの追加ストレージしか提供しません。
Dockerイメージのビルド
AWSでは、Lambdaがサポートするプログラミング言語のベースイメージが多数用意されています。
この記事では、Python 3.7のベースイメージを使用します。
注:ハンドラの名前は、ソースコードで定義されているMangumハンドラの名前と一致していることが必要です
FROM public.ecr.aws/lambda/python:3.7
# Copy function code
COPY main.py ${LAMBDA_TASK_ROOT}/app.py
# Install the function's dependencies using file requirements.txt
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]
次に、docker build
コマンドを使ってLambdaイメージをビルドし、タグ付けします。
docker build -t imagenet-lambda -f Dockerfile.lambda .
コマンドが完了すると、以下のような出力が表示されます:
Step 5/5 : CMD [ "app.handler" ]
---> Running in 2257d01b05b0
Removing intermediate container 2257d01b05b0
---> 14755a1c4440
Successfully built 14755a1c4440
Successfully tagged imagenet-lambda:latest
イメージをLambdaで使用する前に、AWS Elastic Container Registry (ECR)にアップロードする必要もあります。
まず、AWS ECRコンソールに移動し、新しいリポジトリを作成します。
次に、先ほど作成したプライベートECRリポジトリにログインし、<Region Name>
と<Account ID>
のプレースホルダーをECRリポジトリのリージョンとAWSアカウントのアカウント番号に書き換え、以下のコマンドを実行します。
aws ecr get-login-password --region <Region Name> | docker login --username AWS --password-stdin <Account ID>.dkr.ecr.<Region Name>.amazonaws.com
注:上記のコマンドを実行するには、awscli
パッケージがインストールされている必要があります。インストール方法はこちらをご覧ください。
最後に、イメージにタグを付けて、ECRにプッシュ(アップロード)します。
docker tag imagenet-lambda:latest <AccountID>.dkr.ecr.<Region>.amazonaws.com/<RepositoryName>:latest
docker push <AccountID>.dkr.ecr.<Region>.amazonaws.com/<RepositoryName>:latest
CloudFormation
Dockerイメージの作成が完了したら、APIをデプロイするLambda関数を作成します。このLambda関数を簡単に作成するために、CloudFormationテンプレートを用意しました。
テンプレートを実行するには、以下の手順を行います。
- 1. CloudFormationのコンソールを開きます 。
- 2. 利用したいリージョンを選択します。
- 3. Create Stack → With new resources (standard) 「スタックの作成 (新しいリソース)」を選択します。
- 4. テンプレートファイルのアップロードを選択します。
- 5. deploy/lambda.yamlのパスを入力します。(Next)「次へ」をクリックします。
- 6. テンプレートに必要なパラメータを入力します。
Stack Name
. 今から作成するスタックの名前です。例:「ImageNetTest」。ImageUri
。<Account ID>.dkr.ecr.<Region>.amazonaws.com/<Repository Name>:latest
というフォーマットで、前のステップでアップロードしたDockerイメージのURI。
- 7. (Next) 「次へ」をクリックします。
- 8. 画面下部のチェックボックスをすべてチェックします。
- 9. (Create Stack)「スタック作成」をクリックします。
これでAPIがデプロイされ、すべてのAWSリソースが準備完了したら、API GatewayのURLを呼び出すことで、APIのLambdaを起動することができます。
Elastic Beanstalkのデプロイメント
APIは、Elastic Beanstalk上にもデプロイすることができ、Elastic Beanstalkの使用には以下いくつかメリットがあります。
- ・より柔軟な設定が可能です。例えば、GPUインスタンスを含む様々なインスタンスタイプを選択できます。
- ・しばらくAPIを呼び出さない場合、コンテナの起動時間は発生しません。
- ・使用料金は、リクエストごとではなく、インスタンスが稼働している時間に応じて決まります。APIによってはこの方が安いかもしれません。
しかし、Elastic Beanstalkにはいくつかのデメリットもあります。
- ・Lambdaに比べてインスタンスの起動とスケーリングが遅くなります。
- ・インスタンスがアイドル状態でも料金が発生します。
- ・Lambdaより設定に調整できる項目が増えますが、使い方も複雑になります。
Docker イメージのビルド
Lambdaと同様に、Dockerイメージを作成するためにDockerfile
を作りますが、ファイルの内容は少し違います。
Mangum
のハンドラを呼び出すのではなく、コンテナにあるuvicornとgunicornサーバー上にAPIをデプロイします。
イメージの作成を楽にするために、uvicorn-gunicorn-docker
templateを再利用します。
Elastic Beanstalk向けたDockerイメージを構築するために、以下のコマンドを実行します。
docker build -t imagenet-eb -f Dockerfile .
ビルドが完了すると、以下のような出力が表示されます。
Removing intermediate container 91a51bb166d4
---> 0bb1a7031a5e
Successfully built 0bb1a7031a5e
Successfully tagged imagenet-eb:latest
イメージをElastic Beanstalkで使用する前に、AWS Elastic Container Registry (ECR)にアップロードする必要もあります。
まず、AWS ECRコンソールに移動し、新しいリポジトリを作成します。
次に、先ほど作成したプライベートECRリポジトリにログインし、<Region Name>
と<Account ID>
のプレースホルダーをECRリポジトリのリージョンとAWSアカウントのアカウント番号に書き換え、以下のコマンドを実行します。
docker tag imagenet-eb:latest <Account ID>.dkr.ecr.<Region>.amazonaws.com/<Repository Name>:latest
docker push <Account ID>.dkr.ecr.<Region>.amazonaws.com/<Repository Name>:latest
CloudFormation
Dockerイメージの作成が完了したら、APIをデプロイするElastic Beanstalkの環境を作成します。この環境を簡単に作成するために、CloudFormationテンプレートを用意しました。
テンプレートを実行するには、以下の手順を行います。
- 1. CloudFormationのコンソールを開きます 。
- 2. 利用したいリージョンを選択します。
- 3. Create Stack With new resources (standard) 「スタックの作成 (新しいリソース)」を選択します。
- 4. テンプレートファイルのアップロードを選択します。
- 5. deploy/elastic-beanstalk.yamlのパスを入力します。(Next)「次へ」をクリックします。
- 6. テンプレートに必要なパラメータを入力します。
Stack Name
. 今から作成するスタックの名前です。例:「ImageNetTest」。InstanceType
。使用するインスタンスの種類です。
- 7. (Next) 「次へ」をクリックします。
- 8. 画面下部のチェックボックスをすべてチェックします。
- 9. (Create Stack)「スタック作成」をクリックします。
APIのデプロイメント
CloudFormationでElastic Beanstalkの環境の作成が完了したら、そこにAPIをデプロイします。
Elastic Beanstalkには、アプリケーションをデプロイするための2つの方法が用意されています。
- 1. Zipファイルをアップロードします。APIのファイルを含むZipファイルをアップロードします。デプロイに必要なDockerrun.aws.jsonというファイルにホストとDocker間のポートマッピングを定義している必要がありますが、DockerイメージのURIを指定する必要はありません。zipファイル内にDockerfileが存在する場合、APIコンテナはECRにアップロードする必要がなく、Elastic Beanstalkのインスタンスにビルドされます。
- 2. Dockerrun.aws.jsonのみをアップロードします。このファイルには、ホストとDocker間のDockerイメージのパスとポートのマッピングも記載されていますが、上と異なり、DockerイメージのURIを指定する必要があります。
今回は、アプローチ2番で進めます。
以下のようにDockerrun.aws.jsonという新しいファイルを作成します。
{
"AWSEBDockerrunVersion": "1",
"Image": {
"Name": "<Account ID>.dkr.ecr.<Region>.amazonaws.com/<Repository Name>:latest"
},
"Ports": [
{
"ContainerPort": "80"
}
]
}
Image Nameの部分を、先にアップロードしたイメージのURIに書き換えます。
Elastic Beanstalkのコンソールに移動し、(Upload and deploy) 「アップロードとデプロイ」ボタンを使ってDockerrun.aws.jsonのファイルをアップロードします。
しばらく待つとデプロイは完了します。
最後に
この記事では、FastAPIによるTensorFlowモデルを活用するAPIを作成するアプローチを説明しました。
また、同じコードを使ってAPIを2つの異なるAWSサービスにデプロイしました。
機械学習のデプロイにどのAWSサービスを使用するかを選択するのは時に難しいことですが、まずAWS Lambdaを試してみることをお勧めします。他のサービスに比べて、Lambdaは使いやすくてスケーラビリティが高いです。
より高速な推論のためにGPUを必要とする場合や、その他の要件でAWS Lambdaが適さない場合は、代わりにAWS Elastic Beanstalkをお勧めします。
ソースファイル
ソースファイルを GitHub に公開しました。
・main.py – APIのソースコード
・Dockerfile – Elastic Beanstalkのデプロイに向けたDockerfile
・Dockerfile.lambda – Lambdaのデプロイに向けたDockerfile
・Dockerrun.aws.json – Elastic Beanstalkのデプロイに向けたコンテナ設定ファイル
・requirements.txt – APIの依存関係が書かれているファイル
・deploy/elastic-beanstalk.yaml – Elastic Beanstalkのデプロイに向けたCloudFormationのテンプレート
・deploy/lambda.yaml – Lambdaのデプロイに向けたCloudFormationのテンプレート
最後までお読みいただき、ありがとうございました。