Sinks

Fire-and-forget event output targets — stdout, file, S3, and custom.

Sinks are secondary output destinations for audit events. Unlike the primary store.Store, sinks are fire-and-forget: errors are logged but never cause Record() to fail. They run after the event has been successfully persisted to the store.

Sink interface

type Sink interface {
    Name() string
    Write(ctx context.Context, events []*audit.Event) error
    Flush(ctx context.Context) error
    Close() error
}

Sinks receive batches of events. Write delivers the batch, Flush drains any internal buffer, and Close releases resources.

Built-in sinks

Stdout

Writes JSON-encoded events to os.Stdout. Useful for development and log aggregation pipelines.

import "github.com/xraph/chronicle/sink"

s := sink.NewStdout()

File

Writes JSON-encoded events to a file, one event per line.

s, err := sink.NewFile("/var/log/chronicle/audit.jsonl")

S3

Uploads batches of events to an S3-compatible object store.

s := sink.NewS3(s3Client, "my-audit-bucket", "prefix/")

Multi

Fans out to multiple sinks. If one sink returns an error, the others continue.

s := sink.NewMulti(stdoutSink, fileSink, s3Sink)

Custom sink

Implement the Sink interface to write to any destination. This example publishes events to Kafka via the confluent-kafka-go client:

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"

    "github.com/xraph/chronicle/audit"
)

type KafkaSink struct {
    producer *kafka.Producer
    topic    string
}

func (k *KafkaSink) Name() string { return "kafka" }

func (k *KafkaSink) Write(ctx context.Context, events []*audit.Event) error {
    for _, ev := range events {
        b, err := json.Marshal(ev)
        if err != nil {
            return err
        }
        if err := k.producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &k.topic, Partition: kafka.PartitionAny},
            Value:          b,
        }, nil); err != nil {
            return err
        }
    }
    return nil
}

func (k *KafkaSink) Flush(ctx context.Context) error {
    // Flush returns the number of events still queued after the timeout.
    if remaining := k.producer.Flush(5000); remaining > 0 {
        return fmt.Errorf("kafka: %d events still queued after flush", remaining)
    }
    return nil
}

func (k *KafkaSink) Close() error {
    k.producer.Close()
    return nil
}

Registering sinks via the plugin system

Use the SinkProvider plugin interface to attach sinks to Chronicle:

type MySinkProvider struct{}

func (p *MySinkProvider) Name() string { return "my-sink-provider" }
func (p *MySinkProvider) Sink() sink.Sink { return sink.NewS3(client, "bucket", "prefix/") }

registry := plugin.NewRegistry(logger)
registry.Register(&MySinkProvider{})

Chronicle discovers the SinkProvider interface at registration time and adds the sink to its fan-out.

Sinks in retention

The retention system also uses a sink.Sink as its archive sink. Events matching a policy with Archive = true are written to the archive sink before being purged from the primary store. Configure it with extension.WithArchiveSink(s).
