未定の部屋

主にデータサイエンス関係の記事を書きます

Lambdaで形態素解析をする

最近業務でAWSを触ることが多く、その中で形態素解析をする機会があったので、その際の内容を備忘録として残しておきます。

やったこと

  • AWSのサービスを使い、ファイルがS3に置かれたら自動で形態素解析ができるようなシステムを作る
    • サービスとして、今回はLambdaを採用した
  • 形態素解析ツールとしてMeCabを利用する

手順

今回実装した大まかな手順は以下になります

  1. EC2インスタンスを作成し、SSH接続をする

  2. EC2インスタンス上にDockerとAWS CLIをインストールする

  3. EC2インスタンス上でDockerfile、requirements.txt、lambda_function.pyを作成し、Dockerイメージを作成する

  4. 作成したイメージをAmzon ECRへプッシュする

  5. Lambda関数をコンテナイメージから作成する

  6. S3バケットをトリガーとして設定する
    アーキテクチャの全体像は↓のようになります

手順詳細

FROM public.ecr.aws/lambda/python:3.9

COPY requirements.txt  .
RUN  pip3 install -r requirements.txt --target ./

COPY lambda_function.py   ./
CMD ["lambda_function.lambda_handler"]
  • requirements.txt
mecab-python3==1.0.6
unidic-lite==1.0.8
  • lambda_function.py
    • 入力ファイルはcsv形式で、A列にテキストが入っている想定です
import boto3
import csv
import io
import os
import urllib.parse
from datetime import datetime

import MeCab

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # インプット用S3バケットからファイルを取得
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    response = s3.get_object(Bucket=bucket, Key=key)
    input_data = response["Body"].read().decode("utf-8").splitlines()
    csv_reader = csv.reader(input_data)
    headers = next(csv_reader, None) # Skip headers

    # 形態素解析
    mecab = MeCab.Tagger()
    result = []

    for row in csv_reader:
        text = row[0]
        node = mecab.parseToNode(text)
        # parsed_words = []
        # pos_info = []

        while node:
            if node.surface != '':
                result.append(text, node.surface, node.feature])
            node = node.next
        

    # アウトプットファイルを作成
    output_buffer = io.StringIO()
    csv_writer = csv.writer(output_buffer)
    csv_writer.writerow(["元のテキスト", "分解された単語", "情報"])
    csv_writer.writerows(result)

    # アウトプット用S3バケットに保存
    output_bucket = "your-output-bucket"  # アウトプット用バケット名に置き換える
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_key = f"output_{timestamp}.csv" # Set the output file name with a timestamp
    s3.put_object(Bucket=output_bucket, Key=output_key, Body=output_buffer.getvalue())
  • S3バケットをトリガーとして設定する

    • 関数を作成後、関数の概要図からS3の「+トリガーを追加」を選択し、S3を指定
    • S3トリガーとなるバケットを選択する
  • ランタイムやメモリの設定変更

    • 処理に時間を要するものは、Lambda関数の「設定」からメモリとタイムアウトの設定を変更し、「保存」
  • 入力データの格納(=トリガーの発動)

  • 出力バケットの確認

  • 出力用のS3バケットの中身を確認する
    (実行に時間がかかるものもある。15分以内で収まるはず。)

    • S3画面でバケットを選択し、吐き出されたCSVを選択、ダウンロード
  • エラー・バグの対応

    • Lambda関数の「モニタリング」タブから「View CloudWatch Logs」を選択 CloudWatchの該当「ログストリーム」を選択
    • うまくいかないときはエラーが吐かれているはずなので確認してみる

参考