Experiment with value batching

This commit is contained in:
Jan Zípek 2022-09-04 09:22:38 +02:00
parent 79c7bf3851
commit 3f163309ad
Signed by: kamen
GPG Key ID: A17882625B33AC31
1 changed files with 59 additions and 19 deletions

View File

@ -14,6 +14,13 @@ type sensorValue struct {
Value float64 `json:"value"` Value float64 `json:"value"`
} }
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func (s *SensorValuesService) Push(sensorId int64, value float64) (int64, error) { func (s *SensorValuesService) Push(sensorId int64, value float64) (int64, error) {
res, err := s.ctx.DB.Exec("INSERT INTO sensor_values (timestamp, sensor_id, value) VALUES (?, ?, ?)", time.Now().Unix(), sensorId, value) res, err := s.ctx.DB.Exec("INSERT INTO sensor_values (timestamp, sensor_id, value) VALUES (?, ?, ?)", time.Now().Unix(), sensorId, value)
@ -25,34 +32,33 @@ func (s *SensorValuesService) Push(sensorId int64, value float64) (int64, error)
} }
func (s *SensorValuesService) GetList(sensorId int64, from int64, to int64) ([]sensorValue, error) { func (s *SensorValuesService) GetList(sensorId int64, from int64, to int64) ([]sensorValue, error) {
var value float64 intervalSize := to - from
var timestamp int64 batchSize := intervalSize
if intervalSize > 100*24*60 {
batchSize = 24 * 60
}
values := make([]sensorValue, 0) values := make([]sensorValue, 0)
current := from
rows, err := s.ctx.DB.Query("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? AND timestamp > ? AND timestamp < ? ORDER BY timestamp ASC", sensorId, from, to) for current < to {
nextValue := min(to, current+batchSize)
if err != nil { batchValues, err := s.getValueItems(sensorId, current, nextValue)
if err == sql.ErrNoRows {
return values, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&timestamp, &value)
if err != nil { if err != nil {
return nil, err return nil, err
} }
values = append(values, sensorValue{Timestamp: timestamp, Value: value}) sum := 0.0
for _, item := range batchValues {
sum += item.Value
} }
err = rows.Err() values = append(values, sensorValue{Timestamp: int64((nextValue + current) / 2.0), Value: sum / float64(len(batchValues))})
if err != nil { current += batchSize
return nil, err
} }
return values, nil return values, nil
@ -70,3 +76,37 @@ func (s *SensorValuesService) GetLatest(sensorId int64, to int64) (*sensorValue,
return &value, nil return &value, nil
} }
func (s *SensorValuesService) getValueItems(sensorId int64, from int64, to int64) ([]sensorValue, error) {
values := make([]sensorValue, 0)
rows, err := s.ctx.DB.Query("SELECT timestamp, value FROM sensor_values WHERE sensor_id = ? AND timestamp > ? AND timestamp < ? ORDER BY timestamp ASC", sensorId, from, to)
if err != nil {
if err == sql.ErrNoRows {
return values, nil
}
return nil, err
}
defer rows.Close()
for rows.Next() {
item := sensorValue{}
err := rows.Scan(&item.Timestamp, &item.Value)
if err != nil {
return nil, err
}
values = append(values, item)
}
err = rows.Err()
if err != nil {
return nil, err
}
return values, nil
}