忘年会駆動 で キュー のお話しをしてきました

忘年会駆動 2012 - connpass で キューのお話しをしてきました。
Java で(Xtend)実装したのと、データ保存場所を SQL Server の テーブル にした場合と2パターンの紹介です。
スライドはこちら


Java(Xtend) のコードはこんな感じ

package queue.java

import java.util.ArrayList
import java.util.List
import queue.TransactionQueue

class TransactionItem<E> {
  @Property Long tranId
  @Property E value

  new(E value) {
    this.value = value
  }
}

class HogeQueue<E> implements TransactionQueue<E> {
  val _queue = new ArrayList<TransactionItem<E>>()

  override commit() {
    _queue.removeAll(currentTransactionTargets.toList as List)
  }

  override dequeueWithTransaction() {
    val item = _queue.findFirst([tranId == null])
    if (item == null) { return null }
    val localTranId = currentTranId
    item.tranId = localTranId
    item.value
  }

  override enqueue(E value) {
    _queue.add(new TransactionItem(value))
  }

  override rollback() {
    currentTransactionTargets.forEach([tranId = null])
  }

  def private currentTranId() {
    Thread::currentThread.id as Long
  }
  def private currentTransactionTargets() {
    val localTranId = currentTranId
    _queue.filter([tranId == localTranId])
  }

  override close() throws Exception {
    rollback()
  }
}

こっちは別に大したこと無くて、本題は SQL Server を使った方です。
このブログでも何回か紹介している OUTPUT 句 と READPAST テーブルヒントを使うことで、キューを実現しています。

キューの動きを実現するクエリ

delete from [Table] with (READPAST)
output deleted.[Value]
where [Id] = ( 
  select top (1) [Id] 
  from [Table] (READPAST) 
  order by [Id] 
)
  • OUTPUT で DELETE した値を呼び出し元に返す。
  • READPAST で他のトランザクションが更新している行は読み飛ばして削除するデータを検索する。

OUTPUT 句 (Transact-SQL)
テーブル ヒント (Transact-SQL)


こういうキューの使いどころとしては、キューのデータを処理する側が複数個にスケールする場合とかかな?*1

疑似コード

public void Hoge() {
  using (var scope = new TransactionScope())
  using (var conn = new SqlConnection("〜")) 
  using (var cmd = new SqlCommand(@"キューを実現するクエリ", conn){
    conn.Open();
    var value = cmd.ExecuteScalar();
    // 何かの処理...
    scope.Complete();
  }
}

処理が正常に完了した場合にだけキューからデータを削除したい。何らかの都合で正常に終了しなかった場合はキューのデータは元に戻したい。
でも、処理中のデータは他のスレッド等からだと見えなくなって欲しいという要求を先ほどのクエリだけで実現できます。

*1:クラウドでインスタンス増やすっていう規模ではなくて、マルチスレッドで処理するみたいな場合でも