# 服务注册

服务进程是在注册中心注册自己的元数据信息,通常包括主机和端口号,有时还有身份验证信息,协议,版本号,以及运行环境的信息

本系统中包含了以下内容:

  1. 创建 Web 服务
  2. 创建注册服务
  3. 注册 Web 服务
  4. 取消注册 Web 服务

# 创建日志服务

在正式实现服务注册的功能之前,先实现日志服务,在项目文件夹下创建一个 log 文件夹,存放定义日志逻辑的代码

# 编写日志服务逻辑

日志服务是一个 WEB 服务,功能是接收 web 请求,将 POST 请求的内容写入到 log,注意这里对标准的 log 包起了别名 stdlog,因为后续要自定义一个 Logger 对象 log:

// log/server.go
package log
 
import (
	"io/ioutil"
	stdlog "log"
	"net/http"
	"os"
)
 
var log *stdlog.Logger
 
type fileLog string
 
func (fl fileLog) Write(data []byte) (int, error) {
	f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	return f.Write(data)
}
 
func Run(destination string) {
	log = stdlog.New(fileLog(destination), "go", stdlog.LstdFlags)
}
 
func RegisterHandlers() {
	http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			msg, err := ioutil.ReadAll(r.Body)  // 读取 Body 数据
			if err != nil || len(msg) == 0 {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			write(string(msg))
		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
	})
}
 
func write(message string) {
	log.Printf("%v\n", message)
}

这段代码的作用是将日志写入文件系统,先为 filelog 实现 io.Writer 接口,定义 Write 方法

在这个方法中,首先调用了 OpenFile 方法,传入一个文件路径并返回一个 file 对象,指定了权限和模式,并随后判断是否产生错误,最后通过 io.Writer 接口写入文件:

func (fl fileLog) Write(data []byte) (int, error) {
	f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)  // 打开文件
	if err != nil {
		return 0, err
	}
	defer f.Close()
	return f.Write(data)
}

接着定义 Run 函数,作用是将 log 指向某个文件路径,使用 log.New 来创建一个 Logger 对象,要传入的参数:写入的位置 (实现 io.Writer 接口)、日志前缀和日志内容的 flag (包含了日期和时间)

func Run(destination string) {
	log = stdlog.New(fileLog(destination), "go", stdlog.LstdFlags)
}

然后注册一个 handler:

func RegisterHandlers() {
	http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:  // POST 请求
			msg, err := ioutil.ReadAll(r.Body)
			if err != nil || len(msg) == 0 {
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			write(string(msg))  // 写入数据
		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
	})
}

使用了一个 http.HandleFunc 来处理 HTTP 请求,使用 switch-case 结构判断请求方式来分支处理请求,如果是 POST 请求则调用 ioutil.ReadAll 读取 Body 数据,调用 write 函数 (之后实现) 写入文件,如果读取失败或者数据为空,则返回一个 BadRequest (400) 响应。如果接收到的请求不是 POST 请求,则返回一个 MethodNotAllowed 响应 (405)

如下是 write 函数:

func write(message string) {
	log.Printf("%v\n", message)
}

调用 log.Printf 就可以,这里的 log 是自定义的 Logger 对象 (已经在 Run 函数中创建),路径在 New 方法中指定了,这时就会把日志信息写入所指向的文件中

# 运行日志服务

上述编写的 server.go 程序作为日志系统的后端,下面编写一段代码,将服务集中化管理,能够集中启动这些服务

创建一个 service 目录,并在其中编写 service.go 文件:

package service
 
import (
	"context"
	"fmt"
	"log"
	"net/http"
)
 
func Start(ctx context.Context, serviceName, host, port string,
	registerHandlerFunc func()) (context.Context, error) {
	registerHandlerFunc()  // 注册请求处理函数
	ctx = startService(ctx, serviceName, host, port)  // 启动服务
	return ctx, nil
}
 
func startService(ctx context.Context, serviceName, host, port string) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	var srv http.Server
	srv.Addr = ":" + port
 
	go func() {
		log.Println(srv.ListenAndServe())  // 监听 HTTP 请求 调用 ServeHTTP 方法
		cancel()
	}()
 
	go func() {
		fmt.Printf("%v started. Press any key to stop. \n", serviceName)
		var s string
		fmt.Scanln(&s)
		srv.Shutdown(ctx)
		cancel()
	}()
 
	return ctx
}

这段代码定义了一个 Start 函数,接收 context 接口,服务名称,地址,端口号,作用是启动指定的服务,先调用了 registerHandleFunc 注册请求处理函数 (服务程序要有一个处理函数,来处理到来的 HTTP 请求),随后调用了 startService 函数,结束时返回 context 和 nil

在 startService 函数中调用了 WithCancel,返回一个 context 子节点和一个取消函数,用于触发取消信号,之后调一个 goroutine 启动 HTTP 服务器监听在指定的端口,处理到来的 HTTP 请求,当用户输入按下任意建就会触发 Shutdown 方法关闭服务器,并且调用取消函数撤销掉 context:

func startService(ctx context.Context, serviceName, host, port string) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	var srv http.Server
	srv.Addr = ":" + port
 
	go func() {
		log.Println(srv.ListenAndServe())
		cancel()
	}()
 
	go func() {
		fmt.Printf("%v started. Press any key to stop. \n", serviceName)
		var s string
		fmt.Scanln(&s)
		srv.Shutdown(ctx)  // 关闭服务器
		cancel()           // 取消 context
	}()
 
	return ctx
}

创建一个 cmd 文件夹,编写 main.go 作为整个程序的入口:

package main
 
import (
	"context"
	"distributed/log"
	"distributed/service"
	"fmt"
	stdlog "log"
)
 
func main() {
	log.Run("./distributed.log")        // 指定日志文件路径
	host, port := "localhost", "4000"   // 指定地址和端口号
	ctx, err := service.Start(context.Background(),
		"Log Service",
		host, port,
		log.RegisterHandlers,
	)
 
	if err != nil {
		stdlog.Fatalln(err)
	}
	<-ctx.Done()
 
	fmt.Println("Shutting down log service.")
}

这段代码作为入口,启动这个服务

当取消函数 (cancel) 被调用后,ctx.Done 就不会被阻塞,往下执行完整个程序

# 测试日志服务

使用 Postman 发出 POST 请求用于测试,如下所示:

发送请求后,查看日志文件:

$ cat distributed.log 
go 2022/08/22 19:34:08 just for test

日志被记录了下来

# 服务注册逻辑

下面才正式实现注册中心服务注册的功能,应当实现服务注册的接口,这样客户端能够通过这个接口

创建 registry 文件夹,编写如下的 registration.go 文件:

package registry
 
type Registration struct {
	ServiceName ServiceName
	ServiceURL  string
}
 
type ServiceName string
 
const (
	LogService = ServiceName("LogService")
)

如上定义的 Registration 结构体代表被注册的服务,将目前存在服务定义为常量,目前只有一个日志服务 LogService

在这个文件夹下再编写一个 server.go,包含服务注册的主要逻辑:

package registry
 
import (
	"encoding/json"
	"log"
	"net/http"
	"sync"
)
 
const ServerPort = ":3000"  // 端口
const ServicesURL = "http://localhost" + ServerPort + "/services"  // 查询服务的 URL
 
type registry struct {
	registrations []Registration  // 切片
	mutex         *sync.Mutex     // 互斥锁
}
 
func (r *registry) add(reg Registration) error {
	r.mutex.Lock()
	r.registrations = append(r.registrations, reg)
	r.mutex.Unlock()
	return nil
}
 
var reg = registry{
	registrations: make([]Registration, 0),
	mutex:         new(sync.Mutex),
}
 
type Service struct{}
 
func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Println("Request received")
	switch r.Method {
	case http.MethodPost:
		dec := json.NewDecoder(r.Body)
		var r Registration
		err := dec.Decode(&r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		log.Printf("Adding service: %v with URL: %s\n", r.ServiceName, r.ServiceURL)
		err = reg.add(r)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
}

定义了一个结构体 registry,包含了两个成员:一个切片,可以看成是一系列服务的集合,和一个互斥锁,用于并发控制,之后将其创建

随后定义的 add 函数作用是注册,在上锁的情况下向集合添加元素,接着定义 ServeHTTP 函数,只接收 POST 请求,解析 JSON 数据,将其中服务名称通过 add 函数添加到上述创建的 Registration 结构中的集合中,这就意味着成功注册了一个服务

# 独立运行服务注册

将上述编写的服务注册的程序独立运行起来,将启动日志服务的 main.go 单独丢进一个 cmd 下的 logservice 文件夹,在 cmd 下创建一个 registryservice 文件夹,编写对应的 main.go 文件:

package main
 
import (
	"context"
	"distributed/registry"
	"fmt"
	"log"
	"net/http"
)
 
func main() {
	http.Handle("/services", &registry.Service{})
 
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
 
	var srv http.Server
	srv.Addr = registry.ServerPort
 
	go func() {
		log.Println(srv.ListenAndServe())
		var s string
		fmt.Scanln(&s)
		srv.Shutdown(ctx)
		cancel()
	}()
 
	<-ctx.Done()
	fmt.Println("Shutting down registry service")
}

这段代码的作用就是将服务注册程序给运行起来,registry.Service 已经实现了 ServeHTTP 接口方法,因此可以直接将这个结构体变量传给 http.Handle 函数

这段代码与上述 service.go 有点相似,在用户输入任意字符后中止掉程序

# 注册一个服务

封装一个函数向服务端发送 POST 请求来注册一个服务,在 registry 下编写一个 client.go 文件:

package registry
 
import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)
 
func RegisterService(r Registration) error {
	buf := new(bytes.Buffer)
	enc := json.NewEncoder(buf)
	err := enc.Encode(r)
	if err != nil {
		return err
	}
	res, err := http.Post(ServicesURL, "application/json", buf)
	if err != nil {
		return err
	}
 
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to register service. Registry service "+
			"responded with code %v", res.StatusCode)
	}
	return nil
}

上述代码中定义了一个 RegisterService 函数,向 URL 发送 POST 请求来注册服务,请求中包括了服务名称和服务的 URL

接下来要修改之前的代码,因为发送的数据是 Registration 结构体类型的数据,而上述编写的服务注册的服务端须要更改参数类型,修改 service/service.go 文件,这里在 Start 函数中加上 Registration 结构体类型的参数,serviceName 这个参数可以删掉,并且启动这个服务时应该进行注册,所以在该函数中加上 RegisterService 函数:

func Start(ctx context.Context, host, port string, reg registry.Registration,
	registerHandlerFunc func()) (context.Context, error) {
	registerHandlerFunc()
	ctx = startService(ctx, reg.ServiceName, host, port)
	err := registry.RegisterService(reg)  // 注册服务
	if err != nil {
		return ctx, err
	}
	return ctx, nil
}

同时一并修改 startService 函数的参数:

func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	var srv http.Server
	srv.Addr = ":" + port
 
	go func() {
		log.Println(srv.ListenAndServe())
		cancel()
	}()
 
	go func() {
		fmt.Printf("%v started. Press any key to stop. \n", serviceName)
		var s string
		fmt.Scanln(&s)
		srv.Shutdown(ctx)
		cancel()
	}()
 
	return ctx
}

最后要修改 logservice 中的 main.go,因为函数已经被修改:

....
serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
 
r := registry.Registration{
    ServiceName: "Log Service",
    ServiceURL:  serviceAddress,
}
ctx, err := service.Start(context.Background(),
    "Log Service",
    host, port, r, log.RegisterHandlers,
)
....

接下来运行这个服务,先启动 registryservice/main.go,启动服务注册的程序,在运行 logservice/main.go,这时会先发送一条 POST 请求到服务注册的程序,这时服务注册程序收到这条请求,其中包含了服务名称和 URL,服务注册程序将其添加到集合,视为注册了这个服务:

$ go run .
2022/08/23 13:40:24 Request received
2022/08/23 13:40:24 Adding service: Log Service with URL: http://localhost:4000

这时服务注册程序输出已经添加了日志服务

# 取消注册服务

有增就有减,对应地,有服务注册的功能就应该有取消注册的功能

在 registry/server.go 中添加 remove 函数,与 add 函数作用相反,作用是从集合中去除掉指定的 url 所在的 registration:

func (r *registry) remove(url string) error {
	for i := range reg.registrations {
		if reg.registrations[i].ServiceURL == url {
			r.mutex.Lock()
			reg.registrations = append(reg.registrations[:i], r.registrations[i+1:]...)
			r.mutex.Unlock()
			return nil
		}
	}
	return fmt.Errorf("service at URL %s not found", url)
}

使用 for-range 遍历 reg 中的切片,当遇到指定的 URL 时则将其去除

服务注册中,POST 请求用于注册服务,那么将通过 DELETE 请求来取消服务,所以在 ServeHTTP 函数中添加一个针对 DELETE 请求的分支:

func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Println("Request received")
	switch r.Method {
	...
	case http.MethodDelete:
		payload, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Println(err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		url := string(payload)
		log.Printf("Removing servcice at URL: %s", url)
		err = reg.remove(url)
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
}

在这个服务端接收到 DELETE 请求后,取出其中 body 的 URL,再调用 remove 函数进行删除

在 registry/client.go 中添加客户端用来取消服务的函数:

func ShutdownService(url string) error {
	req, err := http.NewRequest(http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte(url)))
	if err != nil {
		return err
	}
	req.Header.Add("Content-Type", "text/plain")
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to deregister service. Registry "+
			"service responded with code %v", res.StatusCode)
	}
	return nil
}

使用 http.NewRequest 来发送 DELETE 请求

在 StartService 函数中,当服务停止时,调用该 shutdown 函数向服务端发送 DELETE 请求取消掉这个服务

func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context {
	...
	go func() {
		log.Println(srv.ListenAndServe())
		err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
		if err != nil {
			log.Println(err)
		}
		cancel()
	}()
	...
}

测试:

先启动服务注册程序,然后再启动日志服务,再按任意键关闭

日志服务:

$ go run .
Log Service started. Press any key to stop. 
 
2022/08/23 16:45:44 http: Server closed
Shutting down log service.

服务注册程序:

$ go run .
2022/08/23 16:45:42 Request received
2022/08/23 16:45:42 Adding service: Log Service with URL: http://localhost:4000
2022/08/23 16:45:44 Request received
2022/08/23 16:45:44 Removing servcice at URL: http://localhost:4000
2022/08/23 16:45:44 Request received
2022/08/23 16:45:44 Removing servcice at URL: http://localhost:4000

# 服务发现

在此之前,先实现一个业务服务,功能是学生成绩管理,用户可以查询和增加学生的成绩信息

# 创建业务服务

创建一个 grades 文件夹,编写相关代码,如下是 grades.go 文件,包含了成绩信息管理的主要逻辑:

package grades
 
import (
	"fmt"
	"sync"
)
 
type Student struct {
	ID        int
	FirstName string
	LastName  string
	Grades    []Grade
}
 
func (s Student) Average() float32 {
	var result float32
	for _, grade := range s.Grades {
		result += grade.Score
	}
	return result / float32(len(s.Grades))
}
 
type Students []Student
 
var (
	students      Students
	studentsMutex sync.Mutex
)
 
func (ss Students) GetByID(id int) (*Student, error) {
	for i := range ss {
		if ss[i].ID == id {
			return &ss[i], nil
		}
	}
	return nil, fmt.Errorf("student with ID %d not found", id)
}
 
type GradeType string
 
const (
	GradeQuiz = GradeType("Quiz")
	GradeTest = GradeType("Test")
	GradeExam = GradeType("Exam")
)
 
type Grade struct {
	Title string
	Type  GradeType
	Score float32
}

定义了 students 结构体,表示了学生信息:

type Student struct {
	ID        int     // 学生 ID
	FirstName string  // 名
	LastName  string  // 姓
	Grades    []Grade // 成绩
}

定义了 grade 结构体:

type Grade struct {
	Title string       // 名称
	Type  GradeType    // 类别
	Score float32      // 得分
}

GetByID 用于根据 ID 来查询学生信息,Average 用于求平均成绩

创建 server.go 文件,作为该服务的后端,之后会被 services 中的 Start 函数调用:

package grades
 
import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
)
 
func RegisterHandlers() {
	handler := new(studentsHandler)
	http.Handle("/students", handler)
	http.Handle("/students/", handler)
}
 
type studentsHandler struct{}
 
func (sh studentsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	pathSegments := strings.Split(r.URL.Path, "/")
	switch len(pathSegments) {
	case 2:
		sh.getAll(w, r)
	case 3:
		id, err := strconv.Atoi(pathSegments[2])
		if err != nil {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		sh.getOne(w, r, id)
	case 4:
		id, err := strconv.Atoi(pathSegments[2])
		if err != nil {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		sh.addGrade(w, r, id)
	default:
		w.WriteHeader(http.StatusNotFound)
	}
}
 
func (sh studentsHandler) getAll(w http.ResponseWriter, r *http.Request) {
	studentsMutex.Lock()
	defer studentsMutex.Unlock()
 
	data, err := sh.toJSON(students)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		log.Println(err)
		return
	}
	w.Header().Add("Content-Type", "application/json")
	w.Write(data)  // 写回数据
}
 
func (sh studentsHandler) toJSON(obj interface{}) ([]byte, error) {
	var b bytes.Buffer
	enc := json.NewEncoder(&b)
	err := enc.Encode(obj)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize students: %q", err)
	}
	return b.Bytes(), nil
}
 
func (sh studentsHandler) getOne(w http.ResponseWriter, r *http.Request, id int) {
	studentsMutex.Lock()
	defer studentsMutex.Unlock()
 
	student, err := students.GetByID(id)
	if err != nil {
		w.WriteHeader(http.StatusNotFound)
		log.Println(err)
		return
	}
 
	data, err := sh.toJSON(student)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		log.Printf("Failed to serialize student: %q\n", err)
		return
	}
	w.Header().Add("Content-Type", "application/json")
	w.Write(data)
}
 
func (sh studentsHandler) addGrade(w http.ResponseWriter, r *http.Request, id int) {
	studentsMutex.Lock()
	defer studentsMutex.Unlock()
 
	student, err := students.GetByID(id)
	if err != nil {
		w.WriteHeader(http.StatusNotFound)
		log.Println(err)
		return
	}
	var g Grade
	dec := json.NewDecoder(r.Body)
	err = dec.Decode(&g)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		log.Println(err)
		return
	}
	student.Grades = append(student.Grades, g)  // 添加数据
	w.WriteHeader(http.StatusCreated)
	data, err := sh.toJSON(g)
	if err != nil {
		log.Println(err)
		return
	}
	w.Header().Add("Content-Type", "application/json")
	w.Write(data)
}

使用 RegisterHandlers 注册 http handler,路径指向 URL 下的 students,会作为 Start 函数的参数

作为一个服务,同样要为结构体定义一个接口方法 ServeHTTP,该方法要应对 URL 的几种情况:

  1. /students 获得所有学生的成绩
  2. /students/{id} 获得指定 ID 的学生的信息
  3. /students/{id}/grades 获得增加学生的成绩

使用 string.Split 将 URL 路径切分为多段,使用 switch-case 结构根据分段的个数进行分支处理,段数为 2、3 和 4 分别对应了上述 3 种情况,分别调用 getAll、getOne 和 addGrade 函数,实现细节不再赘述

这里还要定义一个将结构体序列化为 JSON 数据的函数,接收一个空接口:

func (sh studentsHandler) toJSON(obj interface{}) ([]byte, error) {
	var b bytes.Buffer
	enc := json.NewEncoder(&b)
	err := enc.Encode(obj)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize students: %q", err)
	}
	return b.Bytes(), nil
}

编写一个 mockdata.go 文件,制造一些数据:

package grades
 
func init() {
	students = []Student{
		{
			ID:        1,
			FirstName: "Nick",
			LastName:  "Carter",
			Grades: []Grade{
				{
					"Quiz 1",
					GradeQuiz,
					85,
				},
				{
					"Final Exam",
					GradeExam,
					94,
				},
				{
					"Quiz 2",
					GradeQuiz,
					97,
				},
			},
		},
		{
			ID:        2,
			FirstName: "Jack",
			LastName:  "Bright",
			Grades: []Grade{
				{
					"Final Exam",
					GradeExam,
					100,
				},
				{
					"Quiz 2",
					GradeQuiz,
					80,
				},
				{
					"Test 1",
					GradeTest,
					99,
				},
			},
		},
	}
}

这样一个业务服务就完成了,添加到 registry/registrantion.go 中:

const (
	LogService     = ServiceName("LogService")
	GradingService = ServiceName("GradingService")  // 业务服务
)

和日志服务相似,创建 cmd/gradingservice/main.go 文件,编写启动代码:

package main
 
import (
	"context"
	"distributed/log"
	"distributed/registry"
	"distributed/service"
	"fmt"
	stdlog "log"
)
 
func main() {
	log.Run("./distributed.log")
	host, port := "localhost", "6000"
	serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
 
	r := registry.Registration{
		ServiceName: registry.GradingService,
		ServiceURL:  serviceAddress,
	}
	ctx, err := service.Start(context.Background(),
		host, port, r, log.RegisterHandlers,
	)
 
	if err != nil {
		stdlog.Fatalln(err)
	}
	<-ctx.Done()
 
	fmt.Println("Shutting down grading service.")
}

服务注册到这里就基本实现了,如下是项目结构:

.
├── cmd
│   ├── gradingservice
│   │   └── main.go
│   ├── logservice
│   │   └── main.go
│   └── registryservice
│       └── main.go
├── log
│   └── server.go
├── registry
│   ├── client.go
│   ├── registration.go
│   └── server.go
└── service
    └── service.go

# 实现服务发现

完成了上述的业务服务 gradingservice 后,要使其能够请求 logservice

接下来编辑 registry/registration.go 文件,扩展 Registration 结构体:

type Registration struct {
	ServiceName      ServiceName
	ServiceURL       string
	RequiredServices []ServiceName
	ServiceUpdateURL string
}

RequiredServices 表示了该服务所依赖的服务名称,ServiceUpdateURL 用于动态接收更新,比如注册中心就通过这个 URL 告诉这个服务,这里有一个 logservice

创建两个结构体:

type patchEntry struct {
	Name ServiceName
	URL  string
}

该结构体表示单条更新条目

type patch struct {
	Added   []patchEntry
	Removed []patchEntry
}

该结构体记录增加和减少的条目

编写 registry/server.go,扩展注册中心的后端,先扩展 add 函数,在添加服务时就添加为其依赖服务:

func (r *registry) add(reg Registration) error {
	r.mutex.Lock()
	r.registrations = append(r.registrations, reg)
	r.mutex.Unlock()
	err := r.sendRequiredServices(reg)
	if err != nil {
		return err
	}
	return nil
}

调用了一个 sendRequiredServices 函数,功能是添加依赖,发送一个请求,将所要依赖的服务给请求过来,方法实现如下:

func (r *registry) sendRequiredServices(reg Registration) error {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
 
	var p patch
	for _, serviceReg := range r.registrations {
		for _, reqService := range reg.RequiredServices {
			if serviceReg.ServiceName == reqService {
				p.Added = append(p.Added, patchEntry{
					Name: serviceReg.ServiceName,
					URL:  serviceReg.ServiceURL,
				})
			}
		}
	}
	err := r.sendPatch(p, reg.ServiceUpdateURL)
	return err
}

上述函数中,循环遍历已经注册的服务,如果找到所依赖的服务,则添加到切片里,稍后调用 sendPatch 发送出去,实现如下:

func (r *registry) sendPatch(p patch, url string) error {
	d, err := json.Marshal(p)
	if err != nil {
		return err
	}
	_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
	return err
}

该函数先将 patch 结构序列化为一个 JSON 数据,然后放在 POST 请求中发送

每个客户端的服务都有所依赖的服务,要向注册中心请求这些服务,得存储这些请求的服务,比如 gradingservice 就要依赖 logservice

定义一个结构 providers:

type providers struct {
	services map[ServiceName][]string
	mutex    *sync.RWMutex
}

其中储存了服务的提供者,定义了一个 map 结构,和一个读写锁

初始化这个结构体变量:

var prov = providers{
	services: make(map[ServiceName][]string),
	mutex:    new(sync.RWMutex),
}

定义对于的更新方法,用于更新这个结构中的数据:

func (p *providers) Update(pat patch) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
 
	for _, patchEntry := range pat.Added {
		if _, ok := p.services[patchEntry.Name]; !ok {
			p.services[patchEntry.Name] = make([]string, 0)
		}
		p.services[patchEntry.Name] = append(p.services[patchEntry.Name], patchEntry.URL)
	}
	for _, patchEntry := range pat.Removed {
		if providerURLs, ok := p.services[patchEntry.Name]; ok {
			for i := range providerURLs {
				if providerURLs[i] == patchEntry.URL {
					p.services[patchEntry.Name] = append(providerURLs[:i], providerURLs[i+1:]...)
				}
			}
		}
	}
}

先遍历 Added,也就是 patch 要增加的服务,如果 added 里的服务在 providers 中还不存在,则创建这个 service,然后将这个服务和对应 URL 写入 map,同理,接着遍历 Removed,已有的服务就将其删除

接着定义了一个 get 函数,通过服务名称找到对应的 URL:

func (p *providers) get(name ServiceName) (string, error) {
	providers, ok := p.services[name]
	if !ok {
		return "", fmt.Errorf("no providers avaliable for service %v", name)
	}
	idx := int(rand.Float32() * float32(len(providers)))
	return providers[idx], nil
}
 
func GetProvider(name ServiceName) (string, error) {
	return prov.get(name)
}

目前 providers 只有 logservice

接着再将 ServiceUpdateURL 绑定到一个 handler 上,从而能够对其进行处理:

func RegisterService(r Registration) error {
	serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
	if err != nil {
		return err
	}
	http.Handle(serviceUpdateURL.Path, &serviceUpdateHandler{})
	....
}

定义接口方法:

type serviceUpdateHandler struct{}
 
func (suh serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	dec := json.NewDecoder(r.Body)
	var p patch
	err := dec.Decode(&p)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusBadGateway)
	}
	prov.Update(p)
}

ServeHTTP 中接收 POST 请求,将 Body 到数据反序列化后传入 Update 方法再调用

因为经过了一些扩展,所以要修改 Registration,增加所要依赖的服务和 URL 字段

type Registration struct {
	ServiceName      ServiceName
	ServiceURL       string
	RequiredServices []ServiceName
	ServiceUpdateURL string
}

下面优化一下 logservice,目前日志服务只有一个服务端,要让客户端能够方便地使用提供的 logservice 服务,在 log 下再编写一个 client.go 文件:

package log
 
import (
	"bytes"
	"distributed/registry"
	"fmt"
	stdlog "log"
	"net/http"
)
 
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
	stdlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
	stdlog.SetFlags(0)  // 不设置 flag
	stdlog.SetOutput(&clientLogger{serviceURL})
}
 
type clientLogger struct {
	url string
}
 
func (cl clientLogger) Write(data []byte) (n int, err error) {
	b := bytes.NewBuffer([]byte(data))
	res, err := http.Post(cl.url+"/log", "text/plain", b)
	if err != nil {
		return 0, err
	}
	if res.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("failed to send log message. ")
	}
	return len(data), nil
}

SetClientLogger 负责设置客户端日志打印的相关的属性,因为要为 clientLogger 实现 io.Writer 接口,所以下面为其实现一个 Write 方法

扩展 gradingservice/main.go 中结构体的定义:

r := registry.Registration{
    ServiceName:      registry.GradingService,
    ServiceURL:       serviceAddress,
    RequiredServices: []registry.ServiceName{registry.LogService},
    ServiceUpdateURL: serviceAddress + "/services",
}

logservice 同时接着还要获取 Provider,即获取提供日志的服务的名称,这便是服务发现

if logProvider, err := registry.GetProvider(registry.LogService); err == nil {
    fmt.Printf("Logging service found at: %s\n", logProvider)
    log.SetClientLogger(logProvider, r.ServiceName)
}

运行测试:

先运行注册中心

$ go run .
2022/08/24 13:03:05 Request received
2022/08/24 13:03:05 Adding service: LogService with URL: http://localhost:4000
2022/08/24 13:03:21 Request received
2022/08/24 13:03:21 Adding service: GradingService with URL: http://localhost:6000

运行日志服务

$ go run .
LogService started. Press any key to stop.

运行 gradding 服务

$ go run .
GradingService started. Press any key to stop. 
Logging service found at: http://localhost:4000

# 服务更新

对于一个服务,当所依赖的服务发生变化时,应当发出通知,例如 logservice,当这个服务停止时,应该发出一条通知来告知 gradingservice,启动时也应当发出一条通知,在上面的实现中,gradingservice 只能在启动时发现 logservice,并且 logservice 要先于 gradingservice 启动

在服务启动时就应该进行通知,所以在 add 函数中添加一个通知函数:

func (r *registry) add(reg Registration) error {
	....
	err := r.sendRequiredServices(reg)
	r.notify(patch{
		Added: []patchEntry{
			{
				Name: reg.ServiceName,
				URL:  reg.ServiceURL,
			},
		},
	})
	return err
}

通知函数的实现:

func (r *registry) notify(fullPatch patch) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
    
	for _, reg := range r.registrations {
		go func(reg Registration) {
			for _, reqService := range reg.RequiredServices {
				p := patch{
					Added:   []patchEntry{},
					Removed: []patchEntry{},
				}
				sendUpdate := false
				for _, added := range fullPatch.Added {
					if added.Name == reqService {
						p.Added = append(p.Added, added)
						sendUpdate = true
					}
				}
				for _, removed := range fullPatch.Removed {
					if removed.Name == reqService {
						p.Removed = append(p.Removed, removed)
						sendUpdate = true
					}
				}
				if sendUpdate {
					err := r.sendPatch(p, reg.ServiceUpdateURL)  // 发送 patch
					if err != nil {
						log.Println(err)
						return
					}
				}
			}
		}(reg)
	}
}

这个 notify 函数接收一个 patch 变量作为参数,其中包含的 added 和 removed,对应增加和移除的服务,函数体中要做的事就是先遍历所有的服务,然后遍历其所有的依赖服务,根据传入的 fullPatch 和 added 和 removed,如果依赖服务是要增加或移除的,则将 sendUpdate 标志设置为 true,产生 patch 并发送,进行更新,这些流程使用 goroutine 来并发地进行

在 registry/client.go 的 ServeHTTP 函数中增加一句提示信息,表示接收到了更新:

func (suh serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	....
	fmt.Printf("Updated received %v\n", p)
	prov.Update(p)
}

对 remove 函数同样要进行扩展:

func (r *registry) remove(url string) error {
	for i := range reg.registrations {
		if reg.registrations[i].ServiceURL == url {
			r.notify(patch{
				Removed: []patchEntry{
					{
						Name: r.registrations[i].ServiceName,
						URL:  r.registrations[i].ServiceURL,
					},
				},
			})
	...
	return fmt.Errorf("service at URL %s not found", url)
}

运行测试:先运行注册中心,然后运行 logservice,然后运行 gradingservice,之后关闭 logservice 再启动

注册中心:

$ go run .
2022/08/24 18:03:45 Request received
2022/08/24 18:03:45 Adding service: LogService with URL: http://localhost:4000
2022/08/24 18:03:50 Request received
2022/08/24 18:03:50 Adding service: GradingService with URL: http://localhost:6000
2022/08/24 18:03:53 Request received
2022/08/24 18:03:53 Removing servcice at URL: http://localhost:4000
2022/08/24 18:03:53 Request received
2022/08/24 18:03:53 Removing servcice at URL: http://localhost:4000
2022/08/24 18:04:12 Request received
2022/08/24 18:04:12 Adding service: LogService with URL: http://localhost:4000

日志服务:

$ go run .
LogService started. Press any key to stop. 
Updated received {[] []}
 
2022/08/24 18:03:53 http: Server closed
Shutting down log service.
$ go run .
LogService started. Press any key to stop. 
Updated received {[] []}

grading 服务:

$ go run .
GradingService started. Press any key to stop. 
Updated received {[{LogService http://localhost:4000}] []}
Logging service found at: http://localhost:4000
Updated received {[] [{LogService http://localhost:4000}]}
Updated received {[{LogService http://localhost:4000}] []}

# 服务状态监控

检查所有服务的健康状况,方法是发送心跳请求,得知服务是否能正常响应

在 Registration 中加一条 HeartbeatURL:

type Registration struct {
	ServiceName      ServiceName
	ServiceURL       string
	RequiredServices []ServiceName
	ServiceUpdateURL string
	HeartbeatURL     string
}

增加心跳检查函数:

func (r *registry) heartbeat(freq time.Duration) {
	for {
		var wg sync.WaitGroup
		for _, reg := range r.registrations {
			wg.Add(1)
			go func(reg Registration) {
				defer wg.Done()
				success := true
				for attemps := 0; attemps < 3; attemps++ {
					res, err := http.Get(reg.HeartbeatURL)
					if err != nil {
						log.Println(err)
					} else if res.StatusCode == http.StatusOK {
						log.Printf("Heartbeat check passed for %v", reg.ServiceName)
						if !success {
							r.add(reg)
						}
						break
					}
					log.Printf("Heartbeat check failed for %v", reg.ServiceName)
					if success {
						success = false
						r.remove(reg.ServiceURL)
					}
				}
			}(reg)
			wg.Wait()
			time.Sleep(freq)
		}
	}
}

上述代码中用到了 waitgroup 和 goroutine,对每个服务的 HeartbeatURL 间隔 freq 的时间发起 get 请求,连续发送三次,如果有一次失败 (状态码不等于 200),那么 success 为 false,那么会调用 remove 来移除服务,如果 success 为 false 又得到了正常的相应,那么又会调用 add 将服务加回来

上述这段代码将会在如下函数中调用,将其定义在 registry/server.go 中,每隔 3 秒检查一次服务健康状况:

var once sync.Once
 
func SetupRegistryService() {
	once.Do(func() {
		go reg.heartbeat(3 * time.Second)
	})
}

与此同时,所有的客户端都应该知道如何相应这种心跳请求:

func RegisterService(r Registration) error {
	heartbeatURL, err := url.Parse(r.HeartbeatURL)
	if err != nil {
		return err
	}
	http.HandleFunc(heartbeatURL.Path, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	....
}

在服务注册时,就在心跳 URL 注册一个 handler,处理心跳请求,最后为 logservice 和 gradingservice 都加上心跳 URL:

r := registry.Registration{
    ServiceName:      registry.LogService,
    ServiceURL:       serviceAddress,
    RequiredServices: make([]registry.ServiceName, 0),
    ServiceUpdateURL: serviceAddress + "/services",
    HeartbeatURL:     serviceAddress + "/heartbeat",
}

测试

启动注册中心和两个服务,中途关掉 logservice

注册中心的输出:

2022/08/24 19:12:24 Adding service: LogService with URL: http://localhost:4000
2022/08/24 19:12:24 Heartbeat check passed for LogService
2022/08/24 19:12:26 Request received
2022/08/24 19:12:26 Adding service: GradingService with URL: http://localhost:6000
2022/08/24 19:12:27 Heartbeat check passed for LogService
2022/08/24 19:12:30 Heartbeat check passed for GradingService
2022/08/24 19:12:33 Heartbeat check passed for LogService
2022/08/24 19:12:36 Heartbeat check passed for GradingService
2022/08/24 19:12:36 Request received
2022/08/24 19:12:36 Removing servcice at URL: http://localhost:4000
2022/08/24 19:12:36 Request received
2022/08/24 19:12:36 Removing servcice at URL: http://localhost:4000
2022/08/24 19:12:39 Heartbeat check passed for GradingService
2022/08/24 19:12:42 Heartbeat check passed for GradingService
2022/08/24 19:12:45 Heartbeat check passed for GradingService