syslog started
This commit is contained in:
@@ -1,35 +1,28 @@
|
|||||||
package archiver
|
package archiver
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "time"
|
"log"
|
||||||
//import "os"
|
"ma/mqtt"
|
||||||
//import "fmt"
|
"time"
|
||||||
//import "net/url"
|
)
|
||||||
import "ma/mqtt"
|
|
||||||
//import "ma/config"
|
|
||||||
//import "ma/counter"
|
|
||||||
import "ma/database"
|
|
||||||
|
|
||||||
var dbh *database.DatabaseHandle
|
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Time time.Time
|
||||||
|
Topic string
|
||||||
|
Payload string
|
||||||
|
}
|
||||||
|
|
||||||
func InitArchiver() {
|
func InitArchiver() {
|
||||||
log.Printf("Archiver initializing")
|
log.Printf("Archiver initializing")
|
||||||
dbh = database.NewDatabaseHandle()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func InputArchiver() {
|
func InputArchiver() {
|
||||||
for {
|
for mqttMessage := range mqtt.InputChannel {
|
||||||
select {
|
message := Message{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
|
||||||
case mqttMessage := <- mqtt.InputChannel:
|
|
||||||
message := database.Message { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
|
|
||||||
handleMessage(message)
|
handleMessage(message)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleMessage(message database.Message) {
|
func handleMessage(message Message) {
|
||||||
// log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
|
log.Printf("Archiving Timestamp: %s, Topic: %s, Payload: %s", message.Time, message.Topic, message.Payload)
|
||||||
dbh.StoreMessage(&message)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
// import "gorm.io/gorm"
|
|
||||||
|
|
||||||
|
|
||||||
type Message struct {
|
|
||||||
Time time.Time `gorm:"not null;primary_key"`
|
|
||||||
Topic string `gorm:"not null"`
|
|
||||||
Payload string `gorm:"not null"`
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
//"time"
|
|
||||||
//"fmt"
|
|
||||||
"ma/counter"
|
|
||||||
"gorm.io/driver/postgres"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DatabaseHandle struct {
|
|
||||||
initialized bool
|
|
||||||
dbh *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDatabaseHandle() *DatabaseHandle {
|
|
||||||
var db DatabaseHandle
|
|
||||||
// inject the whole database configuration via the well-known PG* env variables
|
|
||||||
conn, err := gorm.Open(postgres.Open(""))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Unable to open database connection: %s", err)
|
|
||||||
db.initialized = false
|
|
||||||
} else {
|
|
||||||
db.dbh = conn
|
|
||||||
db.initialized = true
|
|
||||||
//log.Println("Database connection opened")
|
|
||||||
}
|
|
||||||
return &db
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *DatabaseHandle) StoreMessage(message *Message) {
|
|
||||||
if ! self.initialized {
|
|
||||||
log.Printf("Database connection not initialized, can not store, message %s lost", message)
|
|
||||||
counter.F("Stored")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
result := self.dbh.Create(message)
|
|
||||||
if result.Error != nil {
|
|
||||||
log.Printf("Unable to insert, message %s lost, error: %s", message, result.Error)
|
|
||||||
counter.F("Stored")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//log.Println("Successfully stored message")
|
|
||||||
counter.S("Stored")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -1,22 +0,0 @@
|
|||||||
package database
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
//"time"
|
|
||||||
"gorm.io/driver/postgres"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Migrate() {
|
|
||||||
dsn := ""
|
|
||||||
db, err := gorm.Open(postgres.Open(dsn))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Unable to open database connection: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
db.AutoMigrate(&Message{})
|
|
||||||
log.Println("Message created")
|
|
||||||
|
|
||||||
log.Println("Remember to call create_hypertable on message, sowhat I can't do that for you.")
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
create extension if not exists timescaledb;
|
|
||||||
|
|
||||||
|
|
||||||
select create_hypertable('message', 'time');
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -5,20 +5,10 @@ go 1.21.3
|
|||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0
|
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
gorm.io/driver/postgres v1.5.10
|
|
||||||
gorm.io/gorm v1.25.12
|
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/gorilla/websocket v1.5.3 // indirect
|
github.com/gorilla/websocket v1.5.3 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5 // indirect
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
|
||||||
golang.org/x/crypto v0.25.0 // indirect
|
|
||||||
golang.org/x/net v0.27.0 // indirect
|
golang.org/x/net v0.27.0 // indirect
|
||||||
golang.org/x/sync v0.7.0 // indirect
|
golang.org/x/sync v0.7.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,44 +1,10 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
|
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
|
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
|
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
|
|
||||||
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
|
||||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
|
||||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
|
||||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
|
||||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
|
||||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
|
||||||
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
|
|
||||||
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
|
|
||||||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
||||||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
||||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
||||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
|
||||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
gorm.io/driver/postgres v1.5.10 h1:7Lggqempgy496c0WfHXsYWxk3Th+ZcW66/21QhVFdeE=
|
|
||||||
gorm.io/driver/postgres v1.5.10/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
|
|
||||||
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
|
|
||||||
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
|
|
||||||
|
|||||||
Reference in New Issue
Block a user