Rust製のWebアプリケーションのトランザクション管理方法について

業務でRustでWebアプリケーションを書いています。 同然データベーストランザクションを扱っているのですが、トランザクションの管理方法が煩雑で不具合になったりしていました。

これについて解決方法を考えてみたのでブログ記事として残してみます。

前提

使用しているクレートは以下のものになります。

Webアプリの構成について

アーキテクチャは所謂クリーンアーキテクチャを採用しています。 上の階層から順に列挙すると、

  • handler (HTTPクリエスト、レスポンスと言った部分を管理する層)
  • usecase (業務ロジックを管理する層)
  • repository (データベースアクセスや外部APIとの通信を管理する層)

のようになっています。

1つのリクエストの処理に沿って見てみると、

  1. handlerがHTTPリクエストを受け取る
  2. usecaseが業務ロジックに基づき処理を行う
  3. usecaseからrepositoryの実装を呼び出してDBアクセスなどを行う
  4. repositoryの処理の結果をusecaseへ返す
  5. usecaseが業務ロジックに基づき処理を行う
  6. handlerがHTTPレスポンスを返す

と言った流れです。

これまでのトランザクション管理方法

クリーンアーキテクチャを採用したWebアプリケーションでDBコネクションを管理しようと思うと、 repositoryにコネクションを持つのが一般的ではないかなと思います。 以下のような形です。

use sea_orm::DatabaseConnection;

struct DatabaseRepositoryImpl {
  db_conneciton: DatabaseConnection,
}

ただし、コネクションをrepositoryに持たせるとrepositoryの複数の処理に跨がるようなトランザクションを開始出来ないという問題があります。

例えばrepositoryが以下のように実装されているとします。

impl DatabaseRespository for DatabaseRepositoryImpl {

  pub async fn create_user(&self, user: User) -> anyhow::Result<()> {
    let tx = self.db_connection.begin();
    // ユーザ作成処理 (省略)
   tx.commit().await
  }

  pub async fn create_blog(&self) -> anyhow::Result<()> {
    let tx = self.db_connection.begin();
    // ブログ作成処理 (省略)
   tx.commit().await
  }
}

これをusecase層で以下のように使うことを考えた時に、create_usercreate_blogはそれぞれ独立したトランザクション毎の処理となります。

pub fn create_user_and_blog(&self, user: User) -> anyhow::Result<()> {
  self.database_repository.create_user(user).await?;
  self.database_repository.create_blog().await
}

なので場合によっては、userは無事作成されたものの、blogは作成できなかったという状況に陥ります。

これを回避するために、DBコネクションはactix-webのデータとして持つ方法を採用しています。 リクエストの度にhandlerの引数としてDBコネクションを含むデータを受け取り、それをusecaseに渡す。 usecase内でトランザクションの開始をして、処理の最後でコミットあるいはロールバックをするという方式です。

ソースコードを見ていきます。

まず、Webアプリの起動時にDBコネクションを確立します。それをactix-webのapp_data()に渡していました。

use sea_orm::DatabaseConnection;

#[derive(Clone)]
pub struct AppState {
    db_conn: DatabaseConnection,
}

impl AppState {
    pub fn new(db_conn: DatabaseConnection) -> Self {
        Self {
            db_conn,
        }
    }

    pub fn db_connection(&self) -> DatabaseConnection {
        self.db_conn.clone()
    }
}


async fn main() -> std::io::Result<()> {
let db_connection = database::establish_database_connection(config.db_dsn())
        .await
        .unwrap();

    let app_state = AppState::new(db_connection);

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(app_state.clone()))
            .service(
                web::scope("/protect")
                    .service(post_account)
            )
            .service(web::scope("/public"))
    })
    .bind(("0.0.0.0", 9090))?
    .run()
    .await
}

async fn post_account(ctx: web::Data<AppState>, body: web::Json<PostAccountRequest>) -> AppResult {
    let account =  self.get_usecase()?
        .create_user_and_blog(ctx.into_inner(), body.into_inner().into())
        .await?;
    Ok(HttpResponse::Ok().finish())
}

App::new()のタイミングで.app_data()にDBコネクションを含む構造体AppStateを渡しています。

その後リクエストを受け取った時に、post_accountの引数の1つとしてAppStateを受け取り、usecase層のメソッドに渡しています。

ではusecase層はこれをどのように使うのかというと、以下のようになります。

pub fn create_user_and_blog(&self, ctx: AppState, user: User) -> anyhow::Result<()> {
  let tx = ctx.db_connection().begin().await?;
  self.database_repository.create_user(&tx, user).await?;
  self.database_repository.create_blog(&tx).await?;
  tx.commit().await
}

冒頭でトランザクションを開始し、トランザクションをrepository層の各メソッドに渡しています。 repository層では受け取ったトランザクションを用いて処理を行い、全ての処理が完了するとusecase層の最後にコミットされるという流れです。

このようにすることにより、respoitoryの各処理が同一のトランザクション内で実行されるため、 userが作成されたけど、blogは作成されなかったという状況を防ぐことができます。

問題点

この方法の問題点として、usecase層の処理の最後でコミットすることを忘れるという問題がありました。

最後のコミットを忘れるとどうなるのかと言うと、変数txdropされたタイミングでトランザクションロールバックし、 変更が全てなかったことになります。

当然テストを実施すればバグとして問題には気付けるのですが、逆に言えばテストをするまで気付かないことも多いです。

解決方法

usecase内の処理の成否によって暗黙にコミットあるいはロールバックできれば、この問題は解決しそうです。

そこでトランザクションを管理するための構造体を用意してみました。

use anyhow::Context as _;
use sea_orm::{
    ConnectOptions, Database, DatabaseConnection, DatabaseTransaction, TransactionTrait,
};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
pub struct TransactionManager {
    database_connection: DatabaseConnection,
}

impl TransactionManager {
    pub fn new(connection: DatabaseConnection) -> Self {
        Self {
            database_connection: connection,
        }
    }
}

impl TransactionManager {
    pub async fn run_in_transaction<T, F>(&self, f: F) -> anyhow::Result<T>
    where
        T: Send,
        F: FnOnce(
                CloneableDatabaseTransaction,
            ) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send>>
            + Send,
    {
        let tx = self.database_connection.begin().await?;
        let cloneable_tx = CloneableDatabaseTransaction::new(tx);
        let tx_clone = cloneable_tx.clone();

        let result = f(cloneable_tx).await;

        match result {
            Ok(_) => tx_clone.commit().await?,
            Err(_) => tx_clone.rollback().await?,
        }

        result
    }
}

フィールドとしてDBコネクションを持ち、また特定の処理をトランザクション内で処理するためのメソッドrun_in_transactionを持ちます。

run_in_transactionの内部ではまずトランザクションを開始し、そのトランザクションをクローンします。 クローンは後にコミットあるいはロールバックを実施するために必須です。

クローンされたトランザクションを引数で受け取ったクロージャに渡し、クロージャ内では受け取ったトランザクションを使ってDBアクセスを行います。 クロージャの処理が完了し、結果が戻ってくると、その結果を元にコミットあるいはロールバックを行います。

これを行うためにsea_rom::DatabaseTransactionのラッパーとしてCloneableDatabaseTransactionを用意しています。

実装は以下のようになっています。

#[derive(Clone)]
pub struct CloneableDatabaseTransaction(Arc<DatabaseTransaction>);

impl CloneableDatabaseTransaction {
    pub fn new(tx: DatabaseTransaction) -> Self {
        Self(Arc::new(tx))
    }

    pub async fn commit(self) -> anyhow::Result<()> {
        let tx = Arc::try_unwrap(self.0).map_err(|_| anyhow::anyhow!("Cannot unwrap Arc"))?;
        tx.commit().await.map_err(Into::into)
    }

    pub async fn rollback(self) -> anyhow::Result<()> {
        let tx = Arc::try_unwrap(self.0).map_err(|_| anyhow::anyhow!("Cannot unwrap Arc"))?;
        tx.rollback().await.map_err(Into::into)
    }

    pub fn inner(&self) -> &DatabaseTransaction {
        &self.0
    }
}

sea_orm::DatabaseBaseTransactionArcで囲んでいます。これはトランザクションがスレッド安全であることを求められるためです。 またトランザクションがクローン可能であることを求められるため、Cloneを実装したラッパーを用意しました。

これらの実装を用いるとusecase内で明示的にコミット・ロールバックすることを回避できます。

まず、AppStateTransactionManagerを持つことができるように修正します。

use crate::database::TransactionManager;

#[derive(Clone)]
pub struct AppState {
    transaction_manager: TransactionManager,
}

impl AppState {
    pub fn new(transaction_manager: TransactionManager) -> Self {
        Self {
            transaction_manager,
        }
    }

    pub fn tx_manager(&self) -> TransactionManager {
        self.transaction_manager.clone()
    }
}

この時点でサーバの初期化処理で型エラーが発生するので、そちらも適宜修正します。

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let db_connection = database::establish_database_connection(config.db_dsn())
        .await
        .map_err(|_| std::io::ErrorKind::Other)?;

    let tx_manager = database::TransactionManager::new(db_connection);
    let app_state = AppState::new(tx_manager);

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(app_state.clone()))
            .service(
                web::scope("/protect")
                    .service(post_account)
            )
            .service(web::scope("/public"))
    })
    .bind(("0.0.0.0", 9090))?
    .run()
    .await
}

するとusecase層では処理を以下の様に書くことができます。

pub fn create_user_and_blog(&self, ctx: AppState, user: User) -> anyhow::Result<()> {
  // repositoryはCopyトレイトを実装している必要がある
  // あるいはCloneトレイトを実装して、let repository = self.database_repository.clone()
  let repository = self.database_repository;

  ctx.transaction_manager()
    .run_in_transaction(|tx| {
       Box::pin(async move {
            database_repository.create_user(&tx, user).await?;
            database_repository.create_blog(&tx).await?;
            Ok(())
          })
        })
    }).await
}

処理をrun_in_transactionに与えたクロージャ内で行うことで、単一のトランザクションでrepository層の処理が実行されます。 またクロージャ内の処理の成否によって暗黙にトランザクションがコミット・ロールバックされるようになりました。

これによりusecase層の処理の最後にコミット処理を記述する必要がなくなり、 うっかりバグが発生してしまうことがなくなりました。

async moveによりrepositoryの所有権がクロージャ内へ移ってしまうため、予めコピー(またはクローン)する必要があるところもポイントです。

まとめ

Rustで作ったWebアプリでトランザクションの管理を行う方法を変更しました。

変更前と変更後のどちらが良いのかは人によるのかなと思います。

run_in_transactionの使用時にクロージャBox::pin(async move {})で囲む必要があり、これを忘れそう(面倒くさい)という人もいるかと思います。

ですが、Box::pin(async move {})で囲うのを忘れた場合はコンパイルエラーとなります。 一方で元の方法でtx.commit()を忘れた場合はコンパイルエラーにはなりません(なので不具合に繋がる)。

個人的にはプログラマのミスをコンパイルが発見してくれる変更後の方法の方が、よりRustらしい書き方だと思います。