Fluentdのログを消失させたくない話

fluentdのリトライ処理に失敗した場合に、ログを消失させたくない+処理に失敗したログを復旧させたいという話です。

fluentdのBufferedOutput系のpluginは、retry_limitに記載されているリトライ回数を消費してしまった場合、"throwing away old logs."のwarnメッセージと共に、ログが消失します。
# buffer_type fileの場合は不明

それを防ぐため、secondaryというディレクティブが存在します。
secondaryは、matchディレクティブのretry_limitに記載されているリトライ回数を超過してしまった場合に実行されます。
そのため、処理に失敗した場合のバックアップ用途に使用できます。

例えば、fluent-plugin-xxxを使用してxxx上にデータを書き込む処理中に、xxxへの接続がダウンしている場合、接続がダウンしている原因やretry_limitの設定によっては、リトライ回数を超過してしまい、ログが消失してしまう可能性もあると思います。

その場合、secondaryでxxxに書き込む予定のデータをfileに書き出す設定をしていれば、書き込みに失敗したchunkの内容がファイルに書き込まれるため、ログの消失を防ぐことができます。

例:

<match out.xxx>
type xxx
retry_limit 9
~

<secondary>
type filie
path /var/log/td-agent/error.log
</secondary>
</match>

この場合、retry_limitのリトライ回数を超えた場合、
/var/log/td-agent/error.log._0.log
というファイルが出力されます。

このファイルは、リトライ回数を超過する度に連番が振られて出力されるので、以降に失敗した場合は、
/var/log/td-agent/error.log._1.log
/var/log/td-agent/error.log._2.log
と出力されます。

このファイルの内容を復元させたい場合、out.xxxに対してファイルの内容を出力すれば復元させることができます。
今回は、in_forwardとfluent-logger-rubyを使って、復元処理を実装してみました。

例えば、上記の設定で出力された"/var/log/td-agent/error.log._0.log"ファイルを復元する場合、設定ファイルにin_forwardの設定を追記して、バックアップファイルのデータをアンパックして送信するスクリプトを実行すれば、エラーで失敗した状態と同じデータが<martch out.xxx>のディレクティブに送信されます。

設定ファイル:

<source>
type forward
port 24224
</source>
スクリプト
require 'fluent-logger'
require "msgpack"

include Fluent::Logger

host = "localhost"
port = 24224
log_file_path = "/var/log/td-agent/error.log._0.log"

logger = FluentLogger.new(nil, :host => host, :port => port)

begin
MessagePack::Unpacker.new(open(log_file_path)).each do |tag, time, record|
puts "tag:#{tag}"
puts "time:#{time}"
puts "record:#{record}"
logger.post_with_time(tag, record, time)
end
rescue EOFError
puts "end of file."
end

これで、処理に失敗したログを復元させることができました。

ただし、secondaryで出力されるファイルはchunkの内容を出力するため、BufferedOutputのformatメソッドで、入力された値と異なる形式にフォーマットしている場合、この方法での復元はできないかもしれません。