Single connection to external services
This commit is contained in:
@@ -3,6 +3,7 @@ package app
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
redis "github.com/go-redis/redis/v8"
|
redis "github.com/go-redis/redis/v8"
|
||||||
@@ -41,70 +42,84 @@ func (pm *PluginManager) GetDatabase() *pgxpool.Pool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func CachePlugin(cnf *Config, w WorkerInterface) Plugin {
|
func CachePlugin(cnf *Config, w WorkerInterface) Plugin {
|
||||||
|
var redisConn atomic.Value
|
||||||
|
|
||||||
connectFn := func(w WorkerInterface) *redis.Client {
|
connectFn := func(w WorkerInterface) *redis.Client {
|
||||||
log.Println("establishing api-cache connection...")
|
log.Println("establishing api-cache connection...")
|
||||||
|
|
||||||
return redis.NewClient(&redis.Options{
|
conn := redis.NewClient(&redis.Options{
|
||||||
Addr: cnf.CacheAddr,
|
Addr: cnf.CacheAddr,
|
||||||
Username: cnf.CacheUsername,
|
Username: cnf.CacheUsername,
|
||||||
Password: cnf.CachePassword,
|
Password: cnf.CachePassword,
|
||||||
DB: 0, // TODO
|
DB: 0, // TODO
|
||||||
DialTimeout: 100 * time.Millisecond, // TODO
|
DialTimeout: 100 * time.Millisecond, // TODO
|
||||||
})
|
})
|
||||||
|
redisConn.Store(conn)
|
||||||
|
|
||||||
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connectFn(w)
|
||||||
|
|
||||||
// checking if the connection is still alive and try to reconnect when it is not
|
// checking if the connection is still alive and try to reconnect when it is not
|
||||||
go func(conn *redis.Client) {
|
go func() {
|
||||||
tick := time.NewTicker(5 * time.Second) // is 5 seconds is not too much?
|
tick := time.NewTicker(5 * time.Second) // is 5 seconds is not too much?
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
|
|
||||||
for range tick.C {
|
for range tick.C {
|
||||||
|
conn := redisConn.Load().(*redis.Client)
|
||||||
if err := conn.Ping(context.Background()).Err(); err != nil {
|
if err := conn.Ping(context.Background()).Err(); err != nil {
|
||||||
log.Println("lost connection with api-cache. Reconnecting...")
|
log.Println("lost connection with api-cache. Reconnecting...")
|
||||||
conn = connectFn(w)
|
connectFn(w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(connectFn(w))
|
}()
|
||||||
|
|
||||||
return Plugin{
|
return Plugin{
|
||||||
name: "cache",
|
name: "cache",
|
||||||
connect: func(w *WorkerInterface) any {
|
connect: func(w *WorkerInterface) any {
|
||||||
return connectFn(*w)
|
return redisConn.Load().(*redis.Client)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DatabasePlugin(cnf *Config, w WorkerInterface) Plugin {
|
func DatabasePlugin(cnf *Config, w WorkerInterface) Plugin {
|
||||||
|
var dbConn atomic.Value
|
||||||
|
|
||||||
connectFn := func(w WorkerInterface) *pgxpool.Pool {
|
connectFn := func(w WorkerInterface) *pgxpool.Pool {
|
||||||
log.Println("establishing db-postgres connection...")
|
log.Println("establishing db-postgres connection...")
|
||||||
|
|
||||||
conn, err := pgxpool.New(context.Background(), cnf.DbURL)
|
conn, err := pgxpool.New(context.Background(), cnf.DbURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("failed to connect to the database: %s. Err: %s\n", cnf.DbURL, err.Error())
|
log.Printf("failed to connect to the database: %s. Err: %s\n", cnf.DbURL, err.Error())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
// os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
dbConn.Store(conn)
|
||||||
|
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connectFn(w)
|
||||||
|
|
||||||
// checking if the connection is still alive and try to reconnect when it is not
|
// checking if the connection is still alive and try to reconnect when it is not
|
||||||
go func(conn *pgxpool.Pool) {
|
go func() {
|
||||||
tick := time.NewTicker(5 * time.Second) // is 5 seconds is not too much?
|
tick := time.NewTicker(5 * time.Second) // is 5 seconds is not too much?
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
|
|
||||||
for range tick.C {
|
for range tick.C {
|
||||||
|
conn := dbConn.Load().(*pgxpool.Pool)
|
||||||
if err := conn.Ping(context.Background()); err != nil {
|
if err := conn.Ping(context.Background()); err != nil {
|
||||||
log.Println("lost connection with db-postgres. Reconnecting...")
|
log.Println("lost connection with db-postgres. Reconnecting...")
|
||||||
conn = connectFn(w)
|
connectFn(w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(connectFn(w))
|
}()
|
||||||
|
|
||||||
return Plugin{
|
return Plugin{
|
||||||
name: "database",
|
name: "database",
|
||||||
connect: func(w *WorkerInterface) any {
|
connect: func(w *WorkerInterface) any {
|
||||||
return connectFn(*w)
|
return dbConn.Load().(*pgxpool.Pool)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user