This commit is contained in:
PB
2023-04-16 17:17:05 +02:00
parent cd9bbdfd75
commit 7adf3b9512
24 changed files with 509 additions and 325 deletions

View File

@@ -1,22 +0,0 @@
package config
import (
"os"
"github.com/joho/godotenv"
)
var ErrLoadingEnvs error
func init() {
ErrLoadingEnvs = godotenv.Load()
}
func GetEnv(name string, defVal string) string {
env := os.Getenv(name)
if env == "" {
return defVal
}
return env
}

View File

@@ -1,62 +0,0 @@
package server
import (
"strings"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
)
var (
defaultCORS = cors.New(cors.Config{
AllowOrigins: "*",
AllowCredentials: true,
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
})
)
func SetupMiddlewares(s *Server) {
s.App.Use(defaultCORS)
s.App.Use(LoggingMiddleware(s.log))
}
func SetupRoutes(s *Server) {
s.App.Options("*", defaultCORS)
s.App.Get("/health", s.HealthHandler)
s.App.Get("/config", s.ConfigHandler)
api := s.App.Group("/api")
v1 := api.Group("/v1")
v1.Post("/login", s.LoginHandler)
v1.All("/traefik", s.TraefikHandler)
}
// Middlewares
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {
path := string(c.Request().URI().Path())
if strings.Contains(path, "/health") {
return c.Next()
}
if strings.Contains(path, "/traefik") {
log.Log("Request: %s, Headers: %v, remote: %s, via: %s",
c.Request().URI().String(),
c.GetRespHeaders(),
c.Context().RemoteIP().String(),
string(c.Context().UserAgent()))
return c.Next()
}
log.Log("Request: %s, remote: %s, via: %s",
c.Request().URI().String(),
c.Context().RemoteIP().String(),
string(c.Context().UserAgent()))
return c.Next()
}
}

View File

@@ -1,169 +0,0 @@
package server
import (
"bytes"
"context"
"encoding/json"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-redis/redis/v8"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v4/pgxpool"
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
)
type Server struct {
*fiber.App
conf *Config
log *fluentd.Logger
db *pgxpool.Pool
cache *redis.Client
discovery *discovery.Service
name string
addr string
kvNmspc string
}
type Headers struct {
RequestID string `reqHeader:"x-request-id"`
}
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client /*, ebCh *amqp.Channel*/) *Server {
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port)
if err != nil {
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
}
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
err = consul.Register()
if err != nil {
logger.Log("register error: %v", err)
}
cnf := fiber.Config{
AppName: conf.AppName,
ServerHeader: conf.AppName,
ReadTimeout: time.Millisecond * 50,
WriteTimeout: time.Millisecond * 50,
IdleTimeout: time.Millisecond * 50,
// Prefork: true,
}
s := &Server{
fiber.New(cnf),
conf,
logger,
db,
cache,
/*ebCh,*/
consul,
conf.AppName,
conf.NetAddr,
conf.KVNamespace,
}
go func(s *Server) { // Consul Catalog and KV updater
interval := time.Second * 15
ticker := time.NewTicker(interval)
for range ticker.C {
s.updateKVConfig()
}
}(s)
go func(s *Server) { // Server metadata cache updater
interval := time.Second * 5
ticker := time.NewTicker(interval)
for range ticker.C {
s.cacheMetadata()
}
}(s)
SetupMiddlewares(s)
SetupRoutes(s)
return s
}
func (s *Server) Start() {
err := s.Listen(s.addr)
s.log.Log("Starting error: %v", err)
}
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
if err := s.gracefulShutdown(); err != nil {
s.log.Log("Server is not shutting down! Reason: %v", err)
}
close(forever)
}()
if err := s.Listen(s.addr); err != nil {
s.log.Log("Server is not running! Reason: %v", err)
}
<-forever
}
// GetRequestID Return current requets ID - works only when fiber context are running
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
var hdr = new(Headers)
if err := c.ReqHeaderParser(hdr); err != nil {
return "", err
}
return hdr.RequestID, nil
}
func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.conf); err != nil {
return
}
}
func (s *Server) cacheMetadata() {
ctx := context.Background()
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
if pos >= 0 {
s.cache.LRem(ctx, key, 0, address)
}
s.cache.LPush(ctx, key, address).Err()
}
func (s *Server) clearMetadataCache() {
ctx := context.Background()
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
s.cache.LRem(ctx, key, 0, address)
}
func (s *Server) gracefulShutdown() error {
s.log.Log("Server is going down...")
s.log.Log("Unregistering service: %s", s.discovery.GetID())
s.discovery.Unregister()
s.clearMetadataCache()
// s.ebCh.Close()
s.db.Close()
s.log.Close()
return s.Shutdown()
}

View File

@@ -1,18 +0,0 @@
package server
import (
"git.pbiernat.dev/egommerce/identity-service/internal/app/service"
"github.com/gofiber/fiber/v2"
)
type TraefikAuthResponse struct {
Status string `json:"status"`
}
func (s *Server) TraefikHandler(c *fiber.Ctx) error {
cookie := service.AuthService.Cookie("traefik", "tmp-dummy-traefik-token")
c.Cookie(cookie)
s.log.Log("Traefik action set cookie. done.")
return c.JSON(&TraefikAuthResponse{Status: "OK"})
}

View File

@@ -0,0 +1,62 @@
package common
import (
"os"
cnf "git.pbiernat.dev/egommerce/identity-service/pkg/config"
srv "git.pbiernat.dev/egommerce/identity-service/pkg/server"
)
const (
defAppName = "identity-svc"
defAppDomain = "identity-svc"
defPathPrefix = "/identity"
defNetAddr = ":80"
defLoggerAddr = "api-logger:24224"
defRegistryAddr = "api-registry:8500"
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
defCacheAddr = "api-cache:6379"
defCachePassword = "12345678"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defEventBusURL = "amqp://guest:guest@api-gateway:5672"
defEbEventsExchange = "api-events"
defEbEventsQueue = "identity-svc"
defKVNmspc = "dev.egommerce/service/identity-svc"
)
type Config struct {
Base *srv.Config
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
EventBusURL string `json:"eventbus_url"`
LoggerAddr string `json:"logger_addr"`
KVNamespace string
RegistryAddr string
// Fields with JSON mappings are available through Consul KV storage
}
func NewConfig(name string) *Config {
c := new(Config)
c.Base = new(srv.Config)
c.Base.AppID, _ = os.Hostname()
c.Base.AppName = name
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
c.EventBusExchange = defEbEventsExchange
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
return c
}

View File

@@ -5,5 +5,5 @@ import (
)
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.conf)
return c.JSON(s.Config)
}

View File

@@ -1,7 +1,7 @@
package server
import (
"git.pbiernat.dev/egommerce/identity-service/internal/app/service"
"git.pbiernat.dev/egommerce/identity-service/internal/service"
"github.com/gofiber/fiber/v2"
)

View File

@@ -0,0 +1,33 @@
package server
import (
"strings"
"github.com/gofiber/fiber/v2"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
)
// "github.com/gofiber/fiber/v2"
// "github.com/gofiber/fiber/v2/middleware/cors"
func SetupMiddleware(s *Server) {
s.Base.Use(defaultCORS)
s.Base.Use(LoggingMiddleware(s.Logger))
}
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {
path := string(c.Request().URI().Path())
if strings.Contains(path, "/health") {
return c.Next()
}
log.Log("Request: %s, remote: %s, via: %s",
c.Request().URI().String(),
c.Context().RemoteIP().String(),
string(c.Context().UserAgent()))
return c.Next()
}
}

View File

@@ -0,0 +1,26 @@
package server
import (
"github.com/gofiber/fiber/v2/middleware/cors"
)
var (
defaultCORS = cors.New(cors.Config{
AllowOrigins: "*",
AllowCredentials: true,
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
})
)
func SetupRouter(s *Server) {
s.Base.Options("*", defaultCORS)
s.Base.Get("/health", s.HealthHandler)
s.Base.Get("/config", s.ConfigHandler)
api := s.Base.Group("/api")
v1 := api.Group("/v1")
v1.Post("/login", s.LoginHandler)
v1.All("/traefik", s.TraefikHandler)
}

View File

@@ -0,0 +1,184 @@
package server
import (
"bytes"
"context"
"encoding/json"
"log"
"os"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
db "git.pbiernat.dev/egommerce/identity-service/pkg/database"
srv "git.pbiernat.dev/egommerce/identity-service/pkg/server"
cnf "git.pbiernat.dev/egommerce/identity-service/internal/config"
)
type (
Server struct {
Base *srv.Server
Config *cnf.Config
Cache *redis.Client
Database *pgxpool.Pool
Logger *fluentd.Logger
Registry *consul.Service
}
OptionFn func(*Server) error // FIXME: similar in worker
)
func New(c *cnf.Config, opts ...OptionFn) *Server {
svr := &Server{
Base: srv.New(c.Base),
Config: c,
}
for _, opt := range opts {
if err := opt(svr); err != nil {
log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
}
}
SetupMiddleware(svr)
SetupRouter(svr)
return svr
}
func WithCache(c *cnf.Config) OptionFn {
redis := redis.NewClient(&redis.Options{
Addr: c.CacheAddr,
Password: c.CachePassword,
DB: 0,
})
return func(s *Server) error {
s.Cache = redis
return nil
}
}
func WithDatabase(c *cnf.Config) OptionFn {
dbConn, err := db.Connect(c.DbURL)
if err != nil {
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err)
os.Exit(1)
}
return func(s *Server) error {
s.Database = dbConn
return nil
}
}
func WithLogger(c *cnf.Config) OptionFn {
return func(s *Server) error {
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
if err != nil {
log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
}
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
if err != nil {
log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
}
s.Logger = logger
return nil
}
}
func WithRegistry(c *cnf.Config) OptionFn {
return func(s *Server) error {
port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
}
err = registry.Register()
if err != nil {
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
}
s.Registry = registry
go func() { // Consul KV updater
ticker := time.NewTicker(time.Second * 15)
for range ticker.C {
fetchKVConfig(s) // FIXME: duplicated in worker
}
}()
go func() { // Server metadata cache updater
ticker := time.NewTicker(time.Second * 5)
for range ticker.C {
s.cacheMetadata()
}
}()
return nil
}
}
func (s *Server) Shutdown() srv.PurgeFn {
return func(srv *srv.Server) error {
s.Logger.Log("Server %s is going down...", s.Base.AppID)
s.Registry.Unregister()
s.clearMetadataCache()
s.Database.Close()
s.Logger.Log("Gone.")
s.Logger.Close()
return s.Base.Shutdown()
}
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.Config); err != nil {
return
}
}
func (s *Server) cacheMetadata() {
ctx := context.Background()
key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
if pos >= 0 {
s.Cache.LRem(ctx, key, 0, address)
}
s.Cache.LPush(ctx, key, address).Err()
}
func (s *Server) clearMetadataCache() {
ctx := context.Background()
key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
s.Cache.LRem(ctx, key, 0, address)
}
func (s *Server) getMetadataIPsKey() string {
return "internal__" + s.Base.Config.AppName + "__ips"
}

View File

@@ -0,0 +1,31 @@
package server
import (
"net/http"
"git.pbiernat.dev/egommerce/identity-service/internal/service"
"github.com/gofiber/fiber/v2"
)
type TraefikAuthResponse struct {
Status string `json:"status,omitempty"`
Message string `json:"msg,omitempty"`
}
func (s *Server) TraefikHandler(c *fiber.Ctx) error {
cookie := service.AuthService.Cookie("traefik", "tmp-dummy-traefik-token")
c.Cookie(cookie)
s.Logger.Log("Traefik action set cookie. done.")
c.Response().Header.Add("Server", "identity-svc/traefik")
reqCookie := c.Request().Header.Cookie("basket_id")
s.Logger.Log("Request cookie: %v", reqCookie)
return c.
Status(http.StatusOK).
JSON(&TraefikAuthResponse{Status: "OK"})
// return c.
// Status(http.StatusUnauthorized).
// JSON(&TraefikAuthResponse{Message: "Access denied mf..."})
}

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strconv"
"git.pbiernat.dev/egommerce/identity-service/internal/app/config"
"git.pbiernat.dev/egommerce/identity-service/pkg/config"
"github.com/gofiber/fiber/v2"
)

View File

@@ -5,7 +5,7 @@ import (
"strconv"
"time"
"git.pbiernat.dev/egommerce/identity-service/internal/app/config"
"git.pbiernat.dev/egommerce/identity-service/pkg/config"
"github.com/golang-jwt/jwt"
)