132 lines
2.5 KiB
Go
132 lines
2.5 KiB
Go
package storage
|
|
|
|
import (
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type RequestLog struct {
|
|
Timestamp int64
|
|
TokenName string
|
|
Model string
|
|
Provider string
|
|
ProviderModel string
|
|
InputTokens int
|
|
OutputTokens int
|
|
CostUSD float64
|
|
LatencyMS int64
|
|
Status string // success, error, cached
|
|
ErrorMessage string
|
|
Streaming bool
|
|
Cached bool
|
|
}
|
|
|
|
type AsyncLogger struct {
|
|
db *DB
|
|
ch chan RequestLog
|
|
done chan struct{}
|
|
OnFlush func() // called after successful flush, if set
|
|
}
|
|
|
|
func NewAsyncLogger(db *DB, bufferSize int) *AsyncLogger {
|
|
if bufferSize == 0 {
|
|
bufferSize = 1000
|
|
}
|
|
l := &AsyncLogger{
|
|
db: db,
|
|
ch: make(chan RequestLog, bufferSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
go l.run()
|
|
return l
|
|
}
|
|
|
|
func (l *AsyncLogger) Log(r RequestLog) {
|
|
select {
|
|
case l.ch <- r:
|
|
default:
|
|
log.Println("WARNING: request log buffer full, dropping entry")
|
|
}
|
|
}
|
|
|
|
func (l *AsyncLogger) Close() {
|
|
close(l.ch)
|
|
<-l.done
|
|
}
|
|
|
|
func (l *AsyncLogger) run() {
|
|
defer close(l.done)
|
|
|
|
batch := make([]RequestLog, 0, 100)
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case r, ok := <-l.ch:
|
|
if !ok {
|
|
// Channel closed, flush remaining
|
|
if len(batch) > 0 {
|
|
l.flush(batch)
|
|
}
|
|
return
|
|
}
|
|
batch = append(batch, r)
|
|
if len(batch) >= 100 {
|
|
l.flush(batch)
|
|
batch = batch[:0]
|
|
}
|
|
case <-ticker.C:
|
|
if len(batch) > 0 {
|
|
l.flush(batch)
|
|
batch = batch[:0]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *AsyncLogger) flush(batch []RequestLog) {
|
|
tx, err := l.db.Begin()
|
|
if err != nil {
|
|
log.Printf("ERROR: starting log transaction: %v", err)
|
|
return
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`INSERT INTO request_logs
|
|
(timestamp, token_name, model, provider, provider_model, input_tokens, output_tokens, cost_usd, latency_ms, status, error_message, streaming, cached)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
|
if err != nil {
|
|
log.Printf("ERROR: preparing log statement: %v", err)
|
|
tx.Rollback()
|
|
return
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, r := range batch {
|
|
streaming := 0
|
|
if r.Streaming {
|
|
streaming = 1
|
|
}
|
|
cached := 0
|
|
if r.Cached {
|
|
cached = 1
|
|
}
|
|
_, err := stmt.Exec(
|
|
r.Timestamp, r.TokenName, r.Model, r.Provider, r.ProviderModel,
|
|
r.InputTokens, r.OutputTokens, r.CostUSD, r.LatencyMS,
|
|
r.Status, r.ErrorMessage, streaming, cached,
|
|
)
|
|
if err != nil {
|
|
log.Printf("ERROR: inserting log: %v", err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
log.Printf("ERROR: committing log batch: %v", err)
|
|
return
|
|
}
|
|
|
|
if l.OnFlush != nil {
|
|
l.OnFlush()
|
|
}
|
|
}
|