Create scaffolding of ddl-simulator

Implement global server as control plane

Implement device to get their info from global server and connect each
others
This commit is contained in:
Pengzhan Hao
2024-12-17 07:31:18 +00:00
commit d6e129179c
21 changed files with 1653 additions and 0 deletions
+52
View File
@@ -0,0 +1,52 @@
package device
import (
"context"
"fmt"
"time"
pb "git.pengzhan.dev/ddp-simulator/proto"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func (d *Device) RegisterClient() error {
conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", d.ParentIP, d.ParentPort), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("failed to connect to global server: %v", err)
}
defer conn.Close()
client := pb.NewDeviceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
children := make([]string, 0)
for _, child := range d.Children {
children = append(children, child.ID)
}
_, err = client.Register(ctx, &pb.RegisterRequest{
Id: d.ID,
Role: d.Role,
Ip: d.IP,
Port: int32(d.Port),
ChildrenIds: children,
Capacity: d.Capacity,
})
return err
}
func (d *Device) RegisterClientRetry(ctx context.Context, timeout time.Duration) error {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 1 * time.Second
b.MaxElapsedTime = timeout
retryOp := func() error {
return d.RegisterClient()
}
err := backoff.Retry(retryOp, backoff.WithContext(b, ctx))
return err
}
+60
View File
@@ -0,0 +1,60 @@
package device
import (
"context"
"time"
pb "git.pengzhan.dev/ddp-simulator/proto"
log "github.com/sirupsen/logrus"
)
type Device struct {
pb.UnimplementedDeviceServer
ID string
Role string
IP string
Port int
ParentID *string
ParentIP string
ParentPort int
Capacity *float32
Children map[string]*Device
}
func NewDevice(id, role, ip string, port int) *Device {
return &Device{
ID: id,
Role: role,
IP: ip,
Port: port,
Children: make(map[string]*Device),
}
}
func (d *Device) UpdateCapacity(capacity *float32) {
d.Capacity = capacity
}
func (d *Device) UpdateParent(parentID *string, parentIP string, parentPort int) {
d.ParentID = parentID
d.ParentIP = parentIP
d.ParentPort = parentPort
}
func (d *Device) Register(ctx context.Context, in *pb.RegisterRequest) (*pb.RegisterResponse, error) {
child := NewDevice(in.Id, in.Role, in.Ip, int(in.Port))
if in.Capacity != nil {
child.UpdateCapacity(in.Capacity)
}
d.Children[in.Id] = child
log.WithField("device", d.ID).Infof("device %s claims on device %s as its child", child.ID, d.ID)
// Update with parent
if d.ParentID != nil {
if err := d.RegisterClientRetry(ctx, 30*time.Second); err != nil {
log.WithField("device", d.ID).Fatalf("failed to register: %v", err)
}
}
return &pb.RegisterResponse{}, nil
}
+6
View File
@@ -0,0 +1,6 @@
package env
const (
PROTOCOL = "tcp"
SOCKET = "/tmp/ddp-simulator/%s.sock"
)
+92
View File
@@ -0,0 +1,92 @@
package global
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
pb "git.pengzhan.dev/ddp-simulator/proto"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
type GlobalServer struct {
pb.UnimplementedGlobalServiceServer
Configs map[string]config
}
type config struct {
ID string `json:"id"`
Role string `json:"role"`
Capacity float32 `json:"capacity"`
IP string `json:"ip"`
Port int `json:"port"`
ParentID string `json:"parentID"`
ParentIP string `json:"parentIP"`
ParentPort int `json:"parentPort"`
}
func NewGlobalServer(configPath string) *GlobalServer {
configs, err := ReadConfigFromPath(configPath)
if err != nil {
log.Fatalf("could not read config file: %v", err)
}
return &GlobalServer{
Configs: configs,
}
}
func ReadConfigFromPath(configPath string) (map[string]config, error) {
file, err := os.Open(configPath)
if err != nil {
return nil, fmt.Errorf("could not open file: %v", err)
}
defer file.Close()
fileContent, err := io.ReadAll(file)
if err != nil {
return nil, fmt.Errorf("could not read file content: %v", err)
}
var configs []config
err = json.Unmarshal(fileContent, &configs)
if err != nil {
return nil, fmt.Errorf("could not parse JSON content: %v", err)
}
configMap := make(map[string]config)
for _, c := range configs {
configMap[c.ID] = c
}
return configMap, nil
}
func (g *GlobalServer) Init(ctx context.Context, in *pb.InitRequest) (*pb.InitResponse, error) {
config, ok := g.Configs[in.Id]
if !ok {
log.Errorf("device %s not found", in.Id)
return nil, fmt.Errorf("device %s not found", in.Id)
}
log.Infof("device %s init", config.ID)
resp := &pb.InitResponse{
Role: config.Role,
Capacity: proto.Float32(config.Capacity),
Ip: config.IP,
Port: int32(config.Port),
}
if config.ParentID != "" {
resp.ParentId = proto.String(config.ParentID)
resp.Pip = proto.String(config.ParentIP)
resp.Pport = proto.Int32(int32(config.ParentPort))
}
if config.Capacity != 0 {
resp.Capacity = proto.Float32(config.Capacity)
}
return resp, nil
}
+17
View File
@@ -0,0 +1,17 @@
package leader
type TreeNode struct {
Id string `yaml:"id"`
Children []TreeNode `yaml:"children,omitempty"`
}
func NewTreeNode(id string, children []TreeNode) *TreeNode {
return &TreeNode{
Id: id,
Children: children,
}
}
func (t *TreeNode) AddChild(child *TreeNode) {
t.Children = append(t.Children, *child)
}
+24
View File
@@ -0,0 +1,24 @@
package leader_test
import (
"fmt"
"testing"
yaml "gopkg.in/yaml.v3"
"git.pengzhan.dev/ddp-simulator/pkg/leader"
)
func TestTree(t *testing.T) {
l4 := leader.NewTreeNode("4", nil)
l3 := leader.NewTreeNode("3", nil)
l2 := leader.NewTreeNode("2", nil)
l1 := leader.NewTreeNode("1", []leader.TreeNode{*l4, *l3})
l0 := leader.NewTreeNode("0", []leader.TreeNode{*l1, *l2})
yamlData, err := yaml.Marshal(l0)
if err != nil {
t.Fatalf("failed to marshal yaml: %v", err)
}
fmt.Println(string(yamlData))
}