|
|
|
|
@@ -1,13 +1,8 @@
|
|
|
|
|
package server
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"log"
|
|
|
|
|
"os"
|
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
|
@@ -15,15 +10,15 @@ import (
|
|
|
|
|
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
|
|
|
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
|
|
|
|
|
|
|
|
db "git.pbiernat.dev/egommerce/identity-service/pkg/database"
|
|
|
|
|
srv "git.pbiernat.dev/egommerce/identity-service/pkg/server"
|
|
|
|
|
db "git.pbiernat.dev/egosport/identity-svc/pkg/database"
|
|
|
|
|
p "git.pbiernat.dev/egosport/identity-svc/pkg/server"
|
|
|
|
|
|
|
|
|
|
cnf "git.pbiernat.dev/egommerce/identity-service/internal/config"
|
|
|
|
|
cnf "git.pbiernat.dev/egosport/identity-svc/internal/config"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type (
|
|
|
|
|
Server struct {
|
|
|
|
|
Base *srv.Server
|
|
|
|
|
Base *p.Server
|
|
|
|
|
Config *cnf.Config
|
|
|
|
|
|
|
|
|
|
Cache *redis.Client
|
|
|
|
|
@@ -37,10 +32,12 @@ type (
|
|
|
|
|
|
|
|
|
|
func New(c *cnf.Config, opts ...OptionFn) *Server {
|
|
|
|
|
svr := &Server{
|
|
|
|
|
Base: srv.New(c.Base),
|
|
|
|
|
Base: p.New(c.Base),
|
|
|
|
|
Config: c,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
svr.Base.PurgeFn = svr.Shutdown()
|
|
|
|
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
|
if err := opt(svr); err != nil {
|
|
|
|
|
log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
|
|
|
|
|
@@ -99,45 +96,45 @@ func WithLogger(c *cnf.Config) OptionFn {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func WithRegistry(c *cnf.Config) OptionFn {
|
|
|
|
|
return func(s *Server) error {
|
|
|
|
|
port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
|
|
|
|
|
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
|
|
|
|
|
}
|
|
|
|
|
// func WithRegistry(c *cnf.Config) OptionFn {
|
|
|
|
|
// return func(s *Server) error {
|
|
|
|
|
// port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
|
|
|
|
|
// registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
err = registry.Register()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
|
|
|
|
}
|
|
|
|
|
// err = registry.Register()
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
s.Registry = registry
|
|
|
|
|
// s.Registry = registry
|
|
|
|
|
|
|
|
|
|
go func() { // Consul KV updater
|
|
|
|
|
ticker := time.NewTicker(time.Second * 15)
|
|
|
|
|
for range ticker.C {
|
|
|
|
|
fetchKVConfig(s) // FIXME: duplicated in worker
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
// go func() { // Consul KV updater
|
|
|
|
|
// ticker := time.NewTicker(time.Second * 15)
|
|
|
|
|
// for range ticker.C {
|
|
|
|
|
// fetchKVConfig(s) // FIXME: duplicated in worker
|
|
|
|
|
// }
|
|
|
|
|
// }()
|
|
|
|
|
|
|
|
|
|
go func() { // Server metadata cache updater
|
|
|
|
|
ticker := time.NewTicker(time.Second * 5)
|
|
|
|
|
for range ticker.C {
|
|
|
|
|
s.cacheMetadata()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
// go func() { // Server metadata cache updater
|
|
|
|
|
// ticker := time.NewTicker(time.Second * 5)
|
|
|
|
|
// for range ticker.C {
|
|
|
|
|
// s.cacheMetadata()
|
|
|
|
|
// }
|
|
|
|
|
// }()
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (s *Server) Shutdown() srv.PurgeFn {
|
|
|
|
|
return func(srv *srv.Server) error {
|
|
|
|
|
func (s *Server) Shutdown() p.PurgeFn {
|
|
|
|
|
return func() error {
|
|
|
|
|
s.Logger.Log("Server %s is going down...", s.Base.AppID)
|
|
|
|
|
|
|
|
|
|
s.Registry.Unregister()
|
|
|
|
|
s.clearMetadataCache()
|
|
|
|
|
// s.Registry.Unregister()
|
|
|
|
|
// s.clearMetadataCache()
|
|
|
|
|
s.Database.Close()
|
|
|
|
|
s.Logger.Log("Gone.")
|
|
|
|
|
s.Logger.Close()
|
|
|
|
|
@@ -147,38 +144,38 @@ func (s *Server) Shutdown() srv.PurgeFn {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
|
|
|
|
func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
|
|
|
|
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
|
|
|
|
if err != nil || config == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
|
|
|
|
// config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
|
|
|
|
// if err != nil || config == nil {
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
kvCnf := bytes.NewBuffer(config.Value)
|
|
|
|
|
decoder := json.NewDecoder(kvCnf)
|
|
|
|
|
if err := decoder.Decode(&s.Config); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// kvCnf := bytes.NewBuffer(config.Value)
|
|
|
|
|
// decoder := json.NewDecoder(kvCnf)
|
|
|
|
|
// if err := decoder.Decode(&s.Config); err != nil {
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (s *Server) cacheMetadata() {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
|
|
|
|
|
// func (s *Server) cacheMetadata() {
|
|
|
|
|
// ctx := context.Background()
|
|
|
|
|
// key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
|
|
|
|
|
|
|
|
|
|
pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
|
|
|
|
if pos >= 0 {
|
|
|
|
|
s.Cache.LRem(ctx, key, 0, address)
|
|
|
|
|
}
|
|
|
|
|
// pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
|
|
|
|
// if pos >= 0 {
|
|
|
|
|
// s.Cache.LRem(ctx, key, 0, address)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
s.Cache.LPush(ctx, key, address).Err()
|
|
|
|
|
}
|
|
|
|
|
// s.Cache.LPush(ctx, key, address).Err()
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (s *Server) clearMetadataCache() {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
|
|
|
|
// func (s *Server) clearMetadataCache() {
|
|
|
|
|
// ctx := context.Background()
|
|
|
|
|
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
|
|
|
|
|
|
|
|
|
s.Cache.LRem(ctx, key, 0, address)
|
|
|
|
|
}
|
|
|
|
|
// s.Cache.LRem(ctx, key, 0, address)
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (s *Server) getMetadataIPsKey() string {
|
|
|
|
|
return "internal__" + s.Base.Config.AppName + "__ips"
|
|
|
|
|
}
|
|
|
|
|
// func (s *Server) getMetadataIPsKey() string {
|
|
|
|
|
// return "internal__" + s.Base.Config.AppName + "__ips"
|
|
|
|
|
// }
|
|
|
|
|
|