Huge refactoring, resolved tight coupling
This commit is contained in:
81
src/internal/app/app.go
Normal file
81
src/internal/app/app.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type (
|
||||
Doer interface {
|
||||
Start() error
|
||||
RegisterHandler(string, func() any)
|
||||
OnShutdown()
|
||||
}
|
||||
Application interface {
|
||||
Start(while chan struct{})
|
||||
RegisterPlugin(PluginFn) error
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
App struct {
|
||||
doer Doer
|
||||
}
|
||||
)
|
||||
|
||||
func NewApp(d Doer) *App {
|
||||
return &App{
|
||||
doer: d,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) Start(while chan struct{}) error {
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
a.Shutdown()
|
||||
|
||||
close(while)
|
||||
}()
|
||||
|
||||
run := a.createRunFile("./app.run") // FIXME path...
|
||||
defer a.removeRunFile(run)
|
||||
|
||||
err := a.doer.Start()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start app. Reason: %v\n", err)
|
||||
close(while)
|
||||
}
|
||||
<-while
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *App) RegisterPlugin(p Plugin) error {
|
||||
a.doer.RegisterHandler(p.name, p.fn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) Shutdown() {
|
||||
a.doer.OnShutdown()
|
||||
}
|
||||
|
||||
func (a *App) createRunFile(path string) *os.File {
|
||||
run, err := os.Create(path)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create run file. Reason: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
run.WriteString(strconv.Itoa(os.Getpid()))
|
||||
|
||||
return run
|
||||
}
|
||||
|
||||
func (a *App) removeRunFile(f *os.File) error {
|
||||
return f.Close()
|
||||
}
|
||||
138
src/internal/app/plugins.go
Normal file
138
src/internal/app/plugins.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
redis "github.com/go-redis/redis/v8"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
|
||||
db "git.pbiernat.io/egommerce/identity-service/pkg/database"
|
||||
)
|
||||
|
||||
type (
|
||||
Plugin struct {
|
||||
name string
|
||||
fn PluginFn
|
||||
}
|
||||
PluginFn func() any
|
||||
)
|
||||
|
||||
func CachePlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "cache",
|
||||
fn: func() any {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: cArr["cacheAddr"],
|
||||
Password: cArr["cachePassword"],
|
||||
DB: 0,
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func DatabasePlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "database",
|
||||
fn: func() any {
|
||||
dbConn, err := db.Connect(cArr["dbURL"])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
return dbConn
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// func EventbusPlugin(cArr map[string]string) Plugin {
|
||||
// return Plugin{
|
||||
// name: "eventbus",
|
||||
// fn: func() any {
|
||||
// conn, err := amqp.Dial(cArr["eventBusURL"])
|
||||
// if err != nil {
|
||||
// log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
|
||||
// os.Exit(1) // TODO: retry in background...
|
||||
// }
|
||||
|
||||
// chn, err := conn.Channel()
|
||||
// if err != nil {
|
||||
// log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err)
|
||||
// os.Exit(1) // TODO: retry in background...
|
||||
// }
|
||||
|
||||
// return chn
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
|
||||
func LoggerPlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "logger",
|
||||
fn: func() any {
|
||||
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
return logger
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func RegistryPlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "registry",
|
||||
fn: func() any {
|
||||
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
|
||||
// log.Printf("Consul retrieved port: %v", port)
|
||||
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
err = registry.Register()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
registry.RegisterHealthChecks()
|
||||
// a.registerKVUpdater() // FIXME run as goroutine
|
||||
|
||||
return registry
|
||||
|
||||
// svc, _ := registry.Connect()
|
||||
// tlsCnf := svc.ServerTLSConfig()
|
||||
// s.Base.App.Server().TLSConfig = tlsCnf
|
||||
// fmt.Println("Podmiana configa TLS")
|
||||
// defer svc.Close()
|
||||
|
||||
// go func() { // Consul KV updater
|
||||
// ticker := time.NewTicker(time.Second * 15)
|
||||
// for range ticker.C {
|
||||
// fetchKVConfig(s) // FIXME: duplicated in worker
|
||||
// }
|
||||
// }()
|
||||
|
||||
// go func() { // Server metadata cache updater
|
||||
// ticker := time.NewTicker(time.Second * 5)
|
||||
// for range ticker.C {
|
||||
// s.cacheMetadata()
|
||||
// }
|
||||
// }()
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
cnf "git.pbiernat.dev/egosport/identity-svc/pkg/config"
|
||||
srv "git.pbiernat.dev/egosport/identity-svc/pkg/server"
|
||||
)
|
||||
|
||||
const (
|
||||
defAppName = "api-svc"
|
||||
defAppDomain = "api-svc"
|
||||
defCacheAddr = "cache:6379"
|
||||
defCachePassword = "12345678"
|
||||
defDbURL = "postgres://egosport:12345678@db.egosport:5432/egosport"
|
||||
defKVNmspc = "egosport/api"
|
||||
defLoggerAddr = "logger.egosport:24224"
|
||||
defNetAddr = ":80"
|
||||
defPathPrefix = "/api"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Base *srv.Config
|
||||
|
||||
DbURL string `json:"db_url"`
|
||||
CacheAddr string `json:"cache_addr"`
|
||||
CachePassword string `json:"cache_password"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
KVNamespace string
|
||||
RegistryAddr string
|
||||
|
||||
// Fields with JSON mappings are available through Consul KV storage
|
||||
}
|
||||
|
||||
func NewConfig(name string) *Config {
|
||||
c := new(Config)
|
||||
c.Base = new(srv.Config)
|
||||
|
||||
c.Base.AppID, _ = os.Hostname()
|
||||
c.Base.AppName = name
|
||||
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||
|
||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
// c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
|
||||
return c
|
||||
}
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"log"
|
||||
)
|
||||
|
||||
const AppName = "identity-svc"
|
||||
const AppName = "identity-service"
|
||||
|
||||
func Panic(v ...any) {
|
||||
log.Panicln(AppName+":", v)
|
||||
@@ -1,16 +1,42 @@
|
||||
package server
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
)
|
||||
|
||||
const (
|
||||
defName = "identity-svc"
|
||||
defDomain = "identity-svc"
|
||||
defCacheAddr = "egommerce.local:6379"
|
||||
defCachePassword = "12345678"
|
||||
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||
defKVNmspc = "dev.egommerce/service/identity"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defNetAddr = ":80"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
defPathPrefix = "/identity"
|
||||
defRegistryAddr = "api-registry:8500"
|
||||
defEbEventsExchange = "api-events"
|
||||
defEbEventsQueue = "identity-svc"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
AppID string
|
||||
AppName string
|
||||
AppDomain string
|
||||
PathPrefix string
|
||||
NetAddr string
|
||||
Port int
|
||||
RegistryAddr string
|
||||
KVNamespace string
|
||||
ID string
|
||||
Name string
|
||||
Domain string
|
||||
NetAddr string
|
||||
RegistryDomainOverIP string
|
||||
PathPrefix string
|
||||
|
||||
IdleTimeout time.Duration // miliseconds
|
||||
ReadTimeout time.Duration // miliseconds
|
||||
WriteTimeout time.Duration // miliseconds
|
||||
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
DbURL string `json:"db_url"`
|
||||
@@ -20,12 +46,66 @@ type Config struct {
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
HttpReadTimeout int `json:"http_read_timeout"`
|
||||
HttpWriteTimeout int `json:"http_write_timeout"`
|
||||
HttpIdleTimeout int `json:"http_idle_timeout"`
|
||||
// Fields with json mapping are available trough ConsulKV
|
||||
KVNamespace string
|
||||
RegistryAddr string
|
||||
|
||||
// Fields with JSON mappings are available through Consul KV storage
|
||||
}
|
||||
|
||||
func NewConfig(name string) *Config {
|
||||
c := new(Config)
|
||||
|
||||
c.ID, _ = os.Hostname()
|
||||
c.Name = name
|
||||
c.Domain = cnf.GetEnv("APP_DOMAIN", defDomain)
|
||||
c.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
c.RegistryDomainOverIP = cnf.GetEnv("REGISTRY_USE_DOMAIN_OVER_IP", "false")
|
||||
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||
|
||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.EventBusExchange = defEbEventsExchange
|
||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Config) GetAppFullName() string {
|
||||
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
|
||||
return fmt.Sprintf("%s_%s", c.Name, c.ID)
|
||||
}
|
||||
|
||||
func (c *Config) GetIP() string {
|
||||
host, _ := os.Hostname()
|
||||
ips, _ := net.LookupIP(host)
|
||||
// for _, ip := range ips {
|
||||
// return ip.String()
|
||||
// }
|
||||
|
||||
return ips[0].String()
|
||||
}
|
||||
|
||||
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
|
||||
arr := make(map[string]string)
|
||||
arr["id"] = c.ID
|
||||
arr["name"] = c.Name
|
||||
arr["appFullname"] = c.GetAppFullName()
|
||||
arr["domain"] = c.Domain
|
||||
arr["ip"] = c.GetIP()
|
||||
arr["netAddr"] = c.NetAddr
|
||||
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
|
||||
arr["pathPrefix"] = c.PathPrefix
|
||||
arr["cacheAddr"] = c.CacheAddr
|
||||
arr["cachePassword"] = c.CachePassword
|
||||
arr["dbURL"] = c.DbURL
|
||||
arr["eventBusExchange"] = c.EventBusExchange
|
||||
arr["eventBusURL"] = c.EventBusURL
|
||||
arr["kvNamespace"] = c.KVNamespace
|
||||
arr["loggerAddr"] = c.LoggerAddr
|
||||
arr["registryAddr"] = c.RegistryAddr
|
||||
|
||||
return arr
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"git.pbiernat.dev/egosport/identity-svc/internal/service"
|
||||
"git.pbiernat.io/egommerce/identity-service/internal/service"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
@@ -26,6 +26,7 @@ func (s *Server) LoginHandler(c *fiber.Ctx) error {
|
||||
}
|
||||
|
||||
cookie := service.AuthService.Cookie("auth_token", token)
|
||||
// ^^ FIXME move cookkie creation to separate service
|
||||
c.Cookie(cookie)
|
||||
|
||||
return c.JSON(&AuthLoginResponse{JWTToken: token})
|
||||
|
||||
@@ -5,15 +5,15 @@ import (
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
)
|
||||
|
||||
// "github.com/gofiber/fiber/v2"
|
||||
// "github.com/gofiber/fiber/v2/middleware/cors"
|
||||
|
||||
func SetupMiddleware(s *Server) {
|
||||
s.Base.Use(defaultCORS)
|
||||
s.Base.Use(LoggingMiddleware(s.Logger))
|
||||
s.Use(defaultCORS)
|
||||
s.Use(LoggingMiddleware(s.GetLogger()))
|
||||
}
|
||||
|
||||
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
||||
|
||||
@@ -6,21 +6,21 @@ import (
|
||||
|
||||
var (
|
||||
defaultCORS = cors.New(cors.Config{
|
||||
AllowOrigins: "*",
|
||||
AllowCredentials: true,
|
||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||
AllowOrigins: "*",
|
||||
// AllowCredentials: true,
|
||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||
})
|
||||
)
|
||||
|
||||
func SetupRouter(s *Server) {
|
||||
s.Base.Options("*", defaultCORS)
|
||||
s.Base.Use(defaultCORS)
|
||||
s.Options("*", defaultCORS)
|
||||
s.Use(defaultCORS)
|
||||
|
||||
s.Base.Get("/health", s.HealthHandler)
|
||||
s.Base.Get("/config", s.ConfigHandler)
|
||||
s.Get("/health", s.HealthHandler)
|
||||
s.Get("/config", s.ConfigHandler)
|
||||
|
||||
api := s.Base.Group("/api")
|
||||
api := s.Group("/api")
|
||||
v1 := api.Group("/v1")
|
||||
v1.Post("/login", s.LoginHandler)
|
||||
v1.All("/traefik", s.TraefikHandler)
|
||||
|
||||
@@ -1,181 +1,135 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
|
||||
db "git.pbiernat.dev/egosport/identity-svc/pkg/database"
|
||||
p "git.pbiernat.dev/egosport/identity-svc/pkg/server"
|
||||
|
||||
cnf "git.pbiernat.dev/egosport/identity-svc/internal/config"
|
||||
"git.pbiernat.io/egommerce/api-entities/http"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
)
|
||||
|
||||
type (
|
||||
Server struct {
|
||||
Base *p.Server
|
||||
Config *cnf.Config
|
||||
*fiber.App
|
||||
|
||||
Cache *redis.Client
|
||||
Database *pgxpool.Pool
|
||||
Logger *fluentd.Logger
|
||||
Registry *consul.Service
|
||||
ID string
|
||||
addr string // e.g. "127.0.0.1:80"
|
||||
handlers map[string]any
|
||||
}
|
||||
HeaderRequestID struct {
|
||||
RequestID string `reqHeader:"x-request-id"`
|
||||
}
|
||||
|
||||
OptionFn func(*Server) error // FIXME: similar in worker
|
||||
)
|
||||
|
||||
func New(c *cnf.Config, opts ...OptionFn) *Server {
|
||||
svr := &Server{
|
||||
Base: p.New(c.Base),
|
||||
Config: c,
|
||||
}
|
||||
|
||||
svr.Base.PurgeFn = svr.Shutdown()
|
||||
|
||||
for _, opt := range opts {
|
||||
if err := opt(svr); err != nil {
|
||||
log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
SetupMiddleware(svr)
|
||||
SetupRouter(svr)
|
||||
|
||||
return svr
|
||||
}
|
||||
|
||||
func WithCache(c *cnf.Config) OptionFn {
|
||||
redis := redis.NewClient(&redis.Options{
|
||||
Addr: c.CacheAddr,
|
||||
Password: c.CachePassword,
|
||||
DB: 0,
|
||||
})
|
||||
|
||||
return func(s *Server) error {
|
||||
s.Cache = redis
|
||||
|
||||
return nil
|
||||
func New(c *Config) *Server {
|
||||
return &Server{
|
||||
ID: c.ID,
|
||||
App: fiber.New(fiber.Config{
|
||||
AppName: c.ID,
|
||||
ServerHeader: c.Name + ":" + c.ID,
|
||||
ReadTimeout: c.ReadTimeout * time.Millisecond,
|
||||
WriteTimeout: c.WriteTimeout * time.Millisecond,
|
||||
IdleTimeout: c.IdleTimeout * time.Millisecond,
|
||||
}),
|
||||
addr: c.NetAddr,
|
||||
handlers: make(map[string]any),
|
||||
}
|
||||
}
|
||||
|
||||
func WithDatabase(c *cnf.Config) OptionFn {
|
||||
dbConn, err := db.Connect(c.DbURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
func (s *Server) Start() error {
|
||||
SetupMiddleware(s)
|
||||
SetupRouter(s)
|
||||
|
||||
return func(s *Server) error {
|
||||
s.Database = dbConn
|
||||
// fmt.Printf("Starting server at: %s...\n", s.addr)
|
||||
ln, _ := net.Listen("tcp", s.addr)
|
||||
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
|
||||
|
||||
return nil
|
||||
}
|
||||
return s.Listener(ln)
|
||||
}
|
||||
|
||||
func WithLogger(c *cnf.Config) OptionFn {
|
||||
return func(s *Server) error {
|
||||
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
||||
}
|
||||
|
||||
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
||||
}
|
||||
|
||||
s.Logger = logger
|
||||
|
||||
return nil
|
||||
}
|
||||
func (s *Server) RegisterHandler(name string, fn func() any) {
|
||||
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
|
||||
s.handlers[name] = fn()
|
||||
}
|
||||
|
||||
// func WithRegistry(c *cnf.Config) OptionFn {
|
||||
// return func(s *Server) error {
|
||||
// port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
|
||||
// registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
|
||||
// if err != nil {
|
||||
// log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
|
||||
// }
|
||||
func (s *Server) OnShutdown() {
|
||||
// s.GetLogger().Log("Server %s is going down...", s.ID)
|
||||
|
||||
// err = registry.Register()
|
||||
// if err != nil {
|
||||
// log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
||||
// }
|
||||
s.GetRegistry().Unregister()
|
||||
// a.clearMetadataCache()
|
||||
// s.GetEventBus().Close()
|
||||
s.GetDatabase().Close()
|
||||
s.GetLogger().Log("Gone.")
|
||||
s.GetLogger().Close()
|
||||
|
||||
// s.Registry = registry
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
// go func() { // Consul KV updater
|
||||
// ticker := time.NewTicker(time.Second * 15)
|
||||
// for range ticker.C {
|
||||
// fetchKVConfig(s) // FIXME: duplicated in worker
|
||||
// }
|
||||
// }()
|
||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
var hdr = new(HeaderRequestID)
|
||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// go func() { // Server metadata cache updater
|
||||
// ticker := time.NewTicker(time.Second * 5)
|
||||
// for range ticker.C {
|
||||
// s.cacheMetadata()
|
||||
// }
|
||||
// }()
|
||||
return hdr.RequestID, nil
|
||||
}
|
||||
|
||||
// return nil
|
||||
// }
|
||||
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
|
||||
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
|
||||
}
|
||||
|
||||
// Plugin helper funcitons
|
||||
func (s *Server) GetCache() *redis.Client {
|
||||
return (s.handlers["cache"]).(*redis.Client)
|
||||
}
|
||||
|
||||
func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
|
||||
return (s.handlers["database"]).(*pgxpool.Pool)
|
||||
}
|
||||
|
||||
// func (s *Server) GetEventBus() *amqp.Channel {
|
||||
// return (s.handlers["eventbus"]).(*amqp.Channel)
|
||||
// }
|
||||
|
||||
func (s *Server) Shutdown() p.PurgeFn {
|
||||
return func() error {
|
||||
s.Logger.Log("Server %s is going down...", s.Base.AppID)
|
||||
func (s *Server) GetLogger() *fluentd.Logger {
|
||||
return (s.handlers["logger"]).(*fluentd.Logger)
|
||||
}
|
||||
|
||||
// s.Registry.Unregister()
|
||||
// s.clearMetadataCache()
|
||||
s.Database.Close()
|
||||
s.Logger.Log("Gone.")
|
||||
s.Logger.Close()
|
||||
|
||||
return s.Base.Shutdown()
|
||||
}
|
||||
func (s *Server) GetRegistry() *consul.Service {
|
||||
return (s.handlers["registry"]).(*consul.Service)
|
||||
}
|
||||
|
||||
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||
// func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
||||
// config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
||||
// if err != nil || config == nil {
|
||||
// return
|
||||
// }
|
||||
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
|
||||
// go func() {
|
||||
// ticker := time.NewTicker(time.Second * 10)
|
||||
// for range ticker.C {
|
||||
// config, _, err := s.Registry.KV().Get(s.cnf.KVNamespace, nil)
|
||||
// if err != nil || config == nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
// kvCnf := bytes.NewBuffer(config.Value)
|
||||
// decoder := json.NewDecoder(kvCnf)
|
||||
// if err := decoder.Decode(&s.Config); err != nil {
|
||||
// return
|
||||
// }
|
||||
// kvCnf := bytes.NewBuffer(config.Value)
|
||||
// decoder := json.NewDecoder(kvCnf)
|
||||
// if err := decoder.Decode(&s.cnf); err != nil {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
// }
|
||||
|
||||
// func (s *Server) cacheMetadata() {
|
||||
// ctx := context.Background()
|
||||
// key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
|
||||
// // func (s *Server) clearMetadataCache() {
|
||||
// // ctx := context.Background()
|
||||
// // key, address := s.getMetadataIPsKey(), s.cnf.Base.AppID
|
||||
|
||||
// pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
||||
// if pos >= 0 {
|
||||
// s.Cache.LRem(ctx, key, 0, address)
|
||||
// }
|
||||
// // s.Cache.LRem(ctx, key, 0, address)
|
||||
// // }
|
||||
|
||||
// s.Cache.LPush(ctx, key, address).Err()
|
||||
// }
|
||||
|
||||
// func (s *Server) clearMetadataCache() {
|
||||
// ctx := context.Background()
|
||||
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
||||
|
||||
// s.Cache.LRem(ctx, key, 0, address)
|
||||
// }
|
||||
|
||||
// func (s *Server) getMetadataIPsKey() string {
|
||||
// return "internal__" + s.Base.Config.AppName + "__ips"
|
||||
// }
|
||||
// // func (s *Server) getMetadataIPsKey() string {
|
||||
// // return "internal__" + s.cnf.Name + "__ips"
|
||||
// // }
|
||||
|
||||
@@ -3,7 +3,7 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"git.pbiernat.dev/egosport/identity-svc/internal/service"
|
||||
"git.pbiernat.io/egommerce/identity-service/internal/service"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
@@ -16,11 +16,11 @@ func (s *Server) TraefikHandler(c *fiber.Ctx) error {
|
||||
cookie := service.AuthService.Cookie("traefik", "dummy-traefik-token")
|
||||
|
||||
c.Cookie(cookie)
|
||||
s.Logger.Log("Traefik action set cookie. done.")
|
||||
s.GetLogger().Log("Traefik action set cookie. done.")
|
||||
|
||||
c.Response().Header.Add("Server", "identity-svc/traefik")
|
||||
c.Response().Header.Add("Server", "identity-service/traefik")
|
||||
reqCookie := c.Request().Header.Cookie("basket_id")
|
||||
s.Logger.Log("Request cookie: %s", reqCookie)
|
||||
s.GetLogger().Log("Request cookie: %s", reqCookie)
|
||||
|
||||
return c.
|
||||
Status(http.StatusOK).
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"git.pbiernat.dev/egosport/identity-svc/pkg/config"
|
||||
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
@@ -16,7 +16,7 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
cookieExpireTime, _ := strconv.Atoi(config.GetEnv("AUTH_COOKIE_EXPIRE_TIME", "5"))
|
||||
cookieExpireTime, _ := strconv.Atoi(baseCnf.GetEnv("AUTH_COOKIE_EXPIRE_TIME", "5"))
|
||||
AuthService = &Auth{"jwt_token", "jwt_token_refresh", cookieExpireTime}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.pbiernat.dev/egosport/identity-svc/pkg/config"
|
||||
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
"github.com/golang-jwt/jwt"
|
||||
)
|
||||
|
||||
@@ -15,8 +15,8 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
tokenExpireTime, _ = strconv.Atoi(config.GetEnv("JWT_TOKEN_EXPIRE_TIME", "5"))
|
||||
tokenSecret = []byte(config.GetEnv("JWT_SECRET_KEY", "B413IlIv9nKQfsMCXTE0Cteo4yHgUEfqaLfjg73sNlh")) // FIXME env: JWT_SECRET_KEY !!!
|
||||
tokenExpireTime, _ = strconv.Atoi(baseCnf.GetEnv("JWT_TOKEN_EXPIRE_TIME", "5"))
|
||||
tokenSecret = []byte(baseCnf.GetEnv("JWT_SECRET_KEY", "B413IlIv9nKQfsMCXTE0Cteo4yHgUEfqaLfjg73sNlh")) // FIXME env: JWT_SECRET_KEY !!!
|
||||
|
||||
JWTService = &JWT{tokenExpireTime, tokenSecret}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user