RubyのCassandraクライアントでバッチ処理。

大量データの読み込み, 書き込みのバッチ処理を効率よく行うため、Cassandraクライアントにはバッチ処理用の仕組みが用意されています。

環境


cassandra (0.12.1)

読み込み


大量データを処理するバッチ処理では、Cassandra内のデータを意識せずに読み込むと
メモリ上に全て展開されてしまって、メモリが足りなくなることがあります。

例えば1,000,000件のデータがあった場合、それを一度に読み込むと1,000,000件のデータがメモリに溜まってしまうので、メモリを圧迫してしまいます。

解決方法の一つとして、1,000,000件全てをメモリに格納するのではなく、分割取得して処理する方法があります。

Cassandraクライアントでは、get_rangeメソッドが分割取得に対応しています。
下記のトリガーで分割取得を行います。
1. メソッドにブロックが渡された場合
2. options[:key_count]が指定された場合(取得するロウキーの総数)
3. options[:batch_size]が指定された場合(1度に取得する件数)

分割取得の単位件数は、options[:batch_size]に指定された件数分取得します。
デフォルトは100件です。

使い方:

client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')

# ブロック内の処理がbatch_sizeごとに呼び出される
client.get_range('ExampleCF', :batch_size => 1000) do |row|
# 処理
end

書き込み


CassandraのThriftClientは、1処理を行う毎にCassandraと通信を行います。
大量データの場合、データ件数分通信を行うため、ネットワークIOがボトルネックになる可能性があります。

解決方法としては、アプリケーション側で複数命令をキャッシュして、一度に発行する方法があります。

Cassandraクライアントでは、batchメソッドが複数命令のキャッシュに対応しています。

使い方:

client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')
client.batch do
(1..count).each do |i|
client.insert(COLUMN_FAMILY, SimpleUUID::UUID.new.to_s, data)
end
end

ただし、batchメソッドには1つ問題があります。
batchメソッドでは、ブロック内で発行された命令をキャッシュして、
ブロック抜けた後にキャッシュ内にある命令の処理を行うのですが、
件数が膨大である場合、キャッシュデータでメモリを圧迫する上、
通信データが肥大してCassandra側でタイムアウトする場合があります。

解決方法としては、キャッシュデータ量を制限して、指定データに達した場合に都度送信する方法があります。

この方法はCassandraクライアントに組み込まれていないため、拡張します。

キャッシュデータがbatch_insertの第4引数であるbatch_sizeに指定した件数に達した場合、キャッシュデータをflushします。
注意点は、ループ処理が抜けた後に必ずflushする必要があることです。
flushしない場合、多くの場合キャッシュデータが完全に送信されません。

・メリット
通信回数が減るため、ネットワークIOが遅い環境で効果が出やすい。
メモリを圧迫しない。

拡張内容:

require 'rubygems'
require 'cassandra'
require 'benchmark'

class Cassandra
def batch_insert(column_family, key, hash, batch_size, options = {})
@batch ||= []

insert(column_family, key, hash, options)

if batch_size == @batch.size
flush(options)
end
end

def flush(options = {})
return if @batch.nil? || @batch.size == 0

compacted_map,seen_clevels = compact_mutations!
clevel = if options[:consistency] != nil
options[:consistency]
elsif seen_clevels.length > 1
raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
else
seen_clevels.first
end
_mutate(compacted_map,clevel)
ensure
@batch = nil
end
end

使い方:

client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')
(1..count).each do |i|
client.batch_insert(COLUMN_FAMILY, SimpleUUID::UUID.new.to_s, data, 100)
end
client.flush

書き込みのベンチマーク

通常のinsert,Cassandra標準のbatch処理を使ったinsert,拡張したbatch_insertでベンチマークをとりました。

しかし、接続先が127.0.0.1であるため通信速度が速く、思ったより違いがでませんでした。。。
分散環境では違いが色濃くでると思うのですが。

ベンチマークのソースコード

・insert
 1,000件:0.486773
 5,000件:2.492276
10,000件:4.824156
50,000件:25.074361

・batch
 1,000件:0.186431
 5,000件:1.102207
10,000件:1.937598
50,000件:14.075151

・batch_insert
 1,000件:0.177827
 5,000件:0.851033
10,000件:1.687284
50,000件:12.067339