Migrate to K8S stack and fixes before big refactoring

This commit is contained in:
PB
2025-10-13 19:32:31 +02:00
parent ac19e766cf
commit 7f951cd42d
31 changed files with 163 additions and 470 deletions

View File

@@ -9,17 +9,14 @@ import (
"github.com/go-pg/migrations/v8"
"github.com/go-pg/pg/v10"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.ego.freeddns.org/egommerce/go-api-pkg/fluentd"
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
cnf "git.pbiernat.io/egommerce/identity-service/internal/server"
baseCnf "git.ego.freeddns.org/egommerce/go-api-pkg/config"
cnf "git.ego.freeddns.org/egommerce/identity-service/internal/server"
)
const (
defAppName = "identity-service-migrations"
defMigrationsTableName = "identity.migrations"
defLoggerAddr = "api-logger:24224"
// defKVNmspc = "dev.egommerce/service/identity-migration"
)
const usageText = `This program runs command on the db. Supported commands are:
@@ -43,7 +40,7 @@ func main() {
flag.Parse()
if baseCnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
log.Panicln(baseCnf.ErrLoadingEnvs)
}
c := cnf.NewConfig("migrator")

View File

@@ -5,18 +5,18 @@ import (
"log"
"os"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
cnf "git.ego.freeddns.org/egommerce/go-api-pkg/config"
"git.pbiernat.io/egommerce/identity-service/internal/app"
"git.pbiernat.io/egommerce/identity-service/internal/server"
"git.ego.freeddns.org/egommerce/identity-service/internal/app"
"git.ego.freeddns.org/egommerce/identity-service/internal/server"
)
func main() {
if cnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
log.Panicln(cnf.ErrLoadingEnvs)
}
c := server.NewConfig("identity")
c := server.NewConfig("identity-svc")
cArr := c.GetArray()
doer := server.New(c)
@@ -24,8 +24,6 @@ func main() {
a.RegisterPlugin(app.LoggerPlugin(cArr))
a.RegisterPlugin(app.CachePlugin(cArr))
a.RegisterPlugin(app.DatabasePlugin(cArr))
// a.RegisterPlugin(app.EventbusPlugin(cArr))
// a.RegisterPlugin(app.RegistryPlugin(cArr))
while := make(chan struct{})
err := a.Start(while)

View File

@@ -1,10 +1,10 @@
module git.pbiernat.io/egommerce/identity-service
module git.ego.freeddns.org/egommerce/identity-service
go 1.18
require (
git.pbiernat.io/egommerce/api-entities v0.2.3
git.pbiernat.io/egommerce/go-api-pkg v0.3.24
git.ego.freeddns.org/egommerce/api-entities v0.3.0
git.ego.freeddns.org/egommerce/go-api-pkg v0.4.6
github.com/go-pg/migrations/v8 v8.1.0
github.com/go-pg/pg/v10 v10.11.1
github.com/go-redis/redis/v8 v8.11.5

View File

@@ -5,10 +5,10 @@ cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdi
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y=
cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU=
git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas=
git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0=
git.pbiernat.io/egommerce/go-api-pkg v0.3.24 h1:if6xsFOStckWdGm7kcgKWOhOGOe6iIBOOr+3DjgX5tM=
git.pbiernat.io/egommerce/go-api-pkg v0.3.24/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980=
git.ego.freeddns.org/egommerce/api-entities v0.3.0 h1:IhJNOfze8/D8Hgy8Mr9hoFEwrg45xeFSnVRUnUrC5xc=
git.ego.freeddns.org/egommerce/api-entities v0.3.0/go.mod h1:IqynARw+06GOm4eZGZuepmbi7bUxWBnOB4jd5cI7jf8=
git.ego.freeddns.org/egommerce/go-api-pkg v0.4.6 h1:1iZW+vkbv7fQusv/pMjtIM1QvJ+QQr3nyvuuajgHc80=
git.ego.freeddns.org/egommerce/go-api-pkg v0.4.6/go.mod h1:5Ft8LCd0UXp5hHpvXRBCv9mCGikogFhL7LP2qit12JM=
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest/autorest v0.11.28 h1:ndAExarwr5Y+GaHE6VCaY1kyS/HwwGGyuimVhWsHOEM=

View File

@@ -1,6 +1,7 @@
package app
import (
"fmt"
"log"
"os"
"os/signal"
@@ -36,14 +37,15 @@ func (a *App) Start(while chan struct{}) error {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
fmt.Println("Received signal:", sigint)
a.Shutdown()
close(while)
}()
run := a.createRunFile("./app.run") // FIXME path...
defer a.removeRunFile(run)
runFile := a.createRunFile("./app.run") // FIXME path...
defer a.removeRunFile(runFile)
err := a.doer.Start()
if err != nil {

View File

@@ -3,14 +3,11 @@ package app
import (
"log"
"os"
"strconv"
"time"
"git.ego.freeddns.org/egommerce/go-api-pkg/fluentd"
db "git.ego.freeddns.org/egommerce/identity-service/pkg/database"
redis "github.com/go-redis/redis/v8"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
db "git.pbiernat.io/egommerce/identity-service/pkg/database"
)
type (
@@ -22,13 +19,16 @@ type (
)
func CachePlugin(cArr map[string]string) Plugin {
// fmt.Println(cArr["cacheAddr"], cArr["cacheUsername"], cArr["cachePassword"])
return Plugin{
name: "cache",
fn: func() any {
fn: func() any { // FIXME: return type
return redis.NewClient(&redis.Options{
Addr: cArr["cacheAddr"],
Password: cArr["cachePassword"],
DB: 0,
Addr: cArr["cacheAddr"],
Username: cArr["cacheUsername"],
Password: cArr["cachePassword"],
DB: 0,
DialTimeout: 100 * time.Millisecond,
})
},
}
@@ -37,12 +37,12 @@ func CachePlugin(cArr map[string]string) Plugin {
func DatabasePlugin(cArr map[string]string) Plugin {
return Plugin{
name: "database",
fn: func() any {
dbConn, err := db.Connect(cArr["dbURL"])
if err != nil {
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
os.Exit(1) // TODO: retry in background...
}
fn: func() any { // FIXME: return type
dbConn, _ := db.Connect(cArr["dbURL"])
// if err != nil {
// log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
// os.Exit(1)
// }
return dbConn
},
@@ -52,7 +52,7 @@ func DatabasePlugin(cArr map[string]string) Plugin {
// func EventbusPlugin(cArr map[string]string) Plugin {
// return Plugin{
// name: "eventbus",
// fn: func() any {
// fn: func() any { // FIXME: return type
// conn, err := amqp.Dial(cArr["eventBusURL"])
// if err != nil {
// log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
@@ -73,66 +73,20 @@ func DatabasePlugin(cArr map[string]string) Plugin {
func LoggerPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "logger",
fn: func() any {
fn: func() any { // FIXME: return type
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
if err != nil {
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
os.Exit(1) // TODO: retry in background...
os.Exit(1)
}
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
if err != nil {
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
os.Exit(1) // TODO: retry in background...
os.Exit(1)
}
return logger
},
}
}
func RegistryPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "registry",
fn: func() any {
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
// log.Printf("Consul retrieved port: %v", port)
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
os.Exit(1) // TODO: retry in background...
}
err = registry.Register()
if err != nil {
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
os.Exit(1) // TODO: retry in background...
}
registry.RegisterHealthChecks()
// a.registerKVUpdater() // FIXME run as goroutine
return registry
// svc, _ := registry.Connect()
// tlsCnf := svc.ServerTLSConfig()
// s.Base.App.Server().TLSConfig = tlsCnf
// fmt.Println("Podmiana configa TLS")
// defer svc.Close()
// go func() { // Consul KV updater
// ticker := time.NewTicker(time.Second * 15)
// for range ticker.C {
// fetchKVConfig(s) // FIXME: duplicated in worker
// }
// }()
// go func() { // Server metadata cache updater
// ticker := time.NewTicker(time.Second * 5)
// for range ticker.C {
// s.cacheMetadata()
// }
// }()
},
}
}

View File

@@ -6,13 +6,14 @@ import (
"os"
"time"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
cnf "git.ego.freeddns.org/egommerce/go-api-pkg/config"
)
const (
defName = "identity-svc"
defDomain = "identity-svc"
defCacheAddr = "egommerce.local:6379"
defCacheAddr = "api-cache:6379"
defCacheUsername = "default"
defCachePassword = "12345678"
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
@@ -21,18 +22,16 @@ const (
defNetAddr = ":443"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defPathPrefix = "/identity"
defRegistryAddr = "api-registry:8501"
defEbEventsExchange = "api-events"
defEbEventsQueue = "identity-svc"
)
type Config struct {
ID string
Name string
Domain string
NetAddr string
RegistryDomainOverIP string
PathPrefix string
ID string
Name string
Domain string
NetAddr string
PathPrefix string
IdleTimeout time.Duration // miliseconds
ReadTimeout time.Duration // miliseconds
@@ -41,15 +40,12 @@ type Config struct {
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CacheUsername string `json:"cache_username"`
CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
KVNamespace string
RegistryAddr string
// Fields with JSON mappings are available through Consul KV storage
}
func NewConfig(name string) *Config {
@@ -59,17 +55,14 @@ func NewConfig(name string) *Config {
c.Name = name
c.Domain = cnf.GetEnv("APP_DOMAIN", defDomain)
c.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
c.RegistryDomainOverIP = cnf.GetEnv("REGISTRY_USE_DOMAIN_OVER_IP", "false")
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
c.CacheAddr = cnf.GetEnv("API_CACHE_ADDR", defCacheAddr)
c.CacheUsername = cnf.GetEnv("API_CACHE_USERNAME", defCacheUsername)
c.CachePassword = cnf.GetEnv("API_CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("API_DATABASE_URL", defDbURL)
c.EventBusExchange = defEbEventsExchange
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
c.EventBusURL = cnf.GetEnv("API_EVENTBUS_URL", defEventBusURL)
c.LoggerAddr = cnf.GetEnv("API_LOGGER_ADDR", defLoggerAddr)
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
return c
}
@@ -96,16 +89,14 @@ func (c *Config) GetArray() map[string]string { // FIXME fix types etc
arr["domain"] = c.Domain
arr["ip"] = c.GetIP()
arr["netAddr"] = c.NetAddr
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
arr["pathPrefix"] = c.PathPrefix
arr["cacheAddr"] = c.CacheAddr
arr["cacheUsername"] = c.CacheUsername
arr["cachePassword"] = c.CachePassword
arr["dbURL"] = c.DbURL
arr["eventBusExchange"] = c.EventBusExchange
arr["eventBusURL"] = c.EventBusURL
arr["kvNamespace"] = c.KVNamespace
arr["loggerAddr"] = c.LoggerAddr
arr["registryAddr"] = c.RegistryAddr
return arr
}

View File

@@ -1,9 +0,0 @@
package server
import (
"github.com/gofiber/fiber/v2"
)
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.Config)
}

View File

@@ -1,6 +1,9 @@
package server
import (
"context"
"net/http"
"github.com/gofiber/fiber/v2"
)
@@ -9,6 +12,25 @@ type HealthResponse struct {
}
func (s *Server) HealthHandler(c *fiber.Ctx) error {
// Only 404 indicate service as non-healthy
err := s.GetDatabase().Ping(context.Background())
if err != nil {
// fmt.Println("db unavailable", err)
return c.SendStatus(http.StatusNotFound)
}
err = s.GetLogger().Ping()
if err != nil {
// fmt.Println("logger unavailable", err)
return c.SendStatus(http.StatusNotFound)
}
err = s.GetCache().Ping(context.Background()).Err()
if err != nil {
// fmt.Println("cache unavailable", err)
return c.SendStatus(http.StatusNotFound)
}
return c.JSON(&HealthResponse{
Status: "OK",
})

View File

@@ -1,7 +1,7 @@
package server
import (
"git.pbiernat.io/egommerce/identity-service/internal/service"
"git.ego.freeddns.org/egommerce/identity-service/internal/service"
"github.com/gofiber/fiber/v2"
)

View File

@@ -3,7 +3,7 @@ package server
import (
"github.com/gofiber/fiber/v2"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.ego.freeddns.org/egommerce/go-api-pkg/fluentd"
)
// "github.com/gofiber/fiber/v2"

View File

@@ -18,10 +18,8 @@ func SetupRouter(s *Server) {
s.Use(defaultCORS)
s.Get("/health", s.HealthHandler)
s.Get("/config", s.ConfigHandler)
api := s.Group("/api")
v1 := api.Group("/v1")
v1.Post("/login", s.LoginHandler)
v1.All("/traefik", s.TraefikHandler)
}

View File

@@ -10,9 +10,8 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5/pgxpool"
"git.pbiernat.io/egommerce/api-entities/http"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.ego.freeddns.org/egommerce/api-entities/common/dto"
"git.ego.freeddns.org/egommerce/go-api-pkg/fluentd"
)
type (
@@ -48,11 +47,11 @@ func (s *Server) Start() error {
SetupRouter(s)
// fmt.Printf("Starting server at: %s...\n", s.addr)
cer, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
crt, err := tls.LoadX509KeyPair("certs/identity-svc.crt", "certs/identity-svc.key")
if err != nil {
log.Fatal(err)
}
tlsCnf := &tls.Config{Certificates: []tls.Certificate{cer}}
tlsCnf := &tls.Config{Certificates: []tls.Certificate{crt}}
ln, _ := net.Listen("tcp", s.addr)
ln = tls.NewListener(ln, tlsCnf)
@@ -61,15 +60,13 @@ func (s *Server) Start() error {
}
func (s *Server) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
// fmt.Printf("Registering plugin(with handler): %s... OK\n", name)
s.handlers[name] = fn()
}
func (s *Server) OnShutdown() {
// s.GetLogger().Log("Server %s is going down...", s.ID)
s.GetLogger().Log("Server %s is going down...", s.ID)
// s.GetRegistry().Unregister()
// a.clearMetadataCache()
// s.GetEventBus().Close()
s.GetDatabase().Close()
s.GetLogger().Log("Gone.")
@@ -88,7 +85,7 @@ func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
}
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
return c.Status(code).JSON(dto.ErrorResponseDTO{Error: msg})
}
// Plugin helper funcitons
@@ -107,37 +104,3 @@ func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
func (s *Server) GetLogger() *fluentd.Logger {
return (s.handlers["logger"]).(*fluentd.Logger)
}
func (s *Server) GetRegistry() *consul.Service {
return (s.handlers["registry"]).(*consul.Service)
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
// go func() {
// ticker := time.NewTicker(time.Second * 10)
// for range ticker.C {
// config, _, err := s.Registry.KV().Get(s.cnf.KVNamespace, nil)
// if err != nil || config == nil {
// return
// }
// kvCnf := bytes.NewBuffer(config.Value)
// decoder := json.NewDecoder(kvCnf)
// if err := decoder.Decode(&s.cnf); err != nil {
// return
// }
// }
// }()
// }
// // func (s *Server) clearMetadataCache() {
// // ctx := context.Background()
// // key, address := s.getMetadataIPsKey(), s.cnf.Base.AppID
// // s.Cache.LRem(ctx, key, 0, address)
// // }
// // func (s *Server) getMetadataIPsKey() string {
// // return "internal__" + s.cnf.Name + "__ips"
// // }

View File

@@ -1,31 +0,0 @@
package server
import (
"net/http"
"git.pbiernat.io/egommerce/identity-service/internal/service"
"github.com/gofiber/fiber/v2"
)
type TraefikAuthResponse struct {
Status string `json:"status,omitempty"`
Message string `json:"msg,omitempty"`
}
func (s *Server) TraefikHandler(c *fiber.Ctx) error {
cookie := service.AuthService.Cookie("traefik", "dummy-traefik-token")
c.Cookie(cookie)
s.GetLogger().Log("Traefik action set cookie. done.")
c.Response().Header.Add("Server", "identity-service/traefik")
reqCookie := c.Request().Header.Cookie("basket_id")
s.GetLogger().Log("Request cookie: %s", reqCookie)
return c.
Status(http.StatusOK).
JSON(&TraefikAuthResponse{Status: "OK"})
// return c.
// Status(http.StatusUnauthorized).
// JSON(&TraefikAuthResponse{Message: "Access denied mf..."})
}

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strconv"
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
baseCnf "git.ego.freeddns.org/egommerce/go-api-pkg/config"
"github.com/gofiber/fiber/v2"
)

View File

@@ -5,7 +5,7 @@ import (
"strconv"
"time"
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
baseCnf "git.ego.freeddns.org/egommerce/go-api-pkg/config"
"github.com/golang-jwt/jwt"
)