Refactoring
This commit is contained in:
@@ -6,6 +6,7 @@ type Config struct {
|
||||
AppID string
|
||||
AppName string
|
||||
AppDomain string
|
||||
PathPrefix string
|
||||
NetAddr string
|
||||
Port int
|
||||
RegistryAddr string
|
||||
@@ -13,6 +14,8 @@ type Config struct {
|
||||
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
DbURL string `json:"db_url"`
|
||||
CacheAddr string `json:"cache_addr"`
|
||||
CachePassword string `json:"cache_password"`
|
||||
MongoDbUrl string `json:"mongodb_url"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
|
||||
@@ -5,9 +5,21 @@ import (
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultCORS = cors.New(cors.Config{
|
||||
AllowOrigins: "*",
|
||||
AllowCredentials: true,
|
||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||
})
|
||||
)
|
||||
|
||||
func SetupRoutes(s *Server) {
|
||||
s.App.Options("*", defaultCORS)
|
||||
|
||||
s.App.Get("/health", s.HealthHandler)
|
||||
s.App.Get("/config", s.ConfigHandler)
|
||||
|
||||
@@ -17,6 +29,7 @@ func SetupRoutes(s *Server) {
|
||||
}
|
||||
|
||||
func SetupMiddlewares(s *Server) {
|
||||
s.App.Use(defaultCORS)
|
||||
s.App.Use(LoggingMiddleware(s.log))
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,14 @@ package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
|
||||
@@ -22,6 +22,7 @@ type Server struct {
|
||||
conf *Config
|
||||
log *fluentd.Logger
|
||||
db *pgxpool.Pool
|
||||
cache *redis.Client
|
||||
discovery *discovery.Service
|
||||
name string
|
||||
addr string
|
||||
@@ -32,8 +33,8 @@ type Headers struct {
|
||||
RequestID string `reqHeader:"x-request-id"`
|
||||
}
|
||||
|
||||
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool /*, ebCh *amqp.Channel*/) *Server {
|
||||
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.Port)
|
||||
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client /*, ebCh *amqp.Channel*/) *Server {
|
||||
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port)
|
||||
if err != nil {
|
||||
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
|
||||
}
|
||||
@@ -57,6 +58,7 @@ func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool /*, ebCh *
|
||||
conf,
|
||||
logger,
|
||||
db,
|
||||
cache,
|
||||
/*ebCh,*/
|
||||
consul,
|
||||
conf.AppName,
|
||||
@@ -64,14 +66,22 @@ func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool /*, ebCh *
|
||||
conf.KVNamespace,
|
||||
}
|
||||
|
||||
go func(s *Server) { // Consul KV config updater
|
||||
interval := time.Second * 30
|
||||
go func(s *Server) { // Consul Catalog and KV updater
|
||||
interval := time.Second * 15
|
||||
ticker := time.NewTicker(interval)
|
||||
for range ticker.C {
|
||||
s.updateKVConfig()
|
||||
}
|
||||
}(s)
|
||||
|
||||
go func(s *Server) { // Server metadata cache updater
|
||||
interval := time.Second * 5
|
||||
ticker := time.NewTicker(interval)
|
||||
for range ticker.C {
|
||||
s.cacheMetadata()
|
||||
}
|
||||
}(s)
|
||||
|
||||
SetupMiddlewares(s)
|
||||
SetupRoutes(s)
|
||||
|
||||
@@ -113,31 +123,43 @@ func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
return hdr.RequestID, nil
|
||||
}
|
||||
|
||||
func (s *Server) updateKVConfig() error {
|
||||
func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go
|
||||
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
|
||||
return err
|
||||
if err != nil || config == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if config == nil {
|
||||
return errors.New("empty KV config data")
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(config.Value)
|
||||
decoder := json.NewDecoder(buf)
|
||||
kvCnf := bytes.NewBuffer(config.Value)
|
||||
decoder := json.NewDecoder(kvCnf)
|
||||
if err := decoder.Decode(&s.conf); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) cacheMetadata() {
|
||||
ctx := context.Background()
|
||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
||||
|
||||
pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
||||
if pos >= 0 {
|
||||
s.cache.LRem(ctx, key, 0, address)
|
||||
}
|
||||
|
||||
return nil
|
||||
s.cache.LPush(ctx, key, address).Err()
|
||||
}
|
||||
|
||||
func (s *Server) clearMetadataCache() {
|
||||
ctx := context.Background()
|
||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
||||
|
||||
s.cache.LRem(ctx, key, 0, address)
|
||||
}
|
||||
|
||||
func (s *Server) gracefulShutdown() error {
|
||||
s.log.Log("Server is going down...")
|
||||
s.log.Log("Unregistering service: %s", s.discovery.GetID())
|
||||
s.discovery.Unregister()
|
||||
s.clearMetadataCache()
|
||||
|
||||
// s.ebCh.Close()
|
||||
s.db.Close()
|
||||
|
||||
Reference in New Issue
Block a user