# 服务注册
服务进程是在注册中心注册自己的元数据信息,通常包括主机和端口号,有时还有身份验证信息,协议,版本号,以及运行环境的信息
本系统中包含了以下内容:
- 创建 Web 服务
- 创建注册服务
- 注册 Web 服务
- 取消注册 Web 服务
# 创建日志服务
在正式实现服务注册的功能之前,先实现日志服务,在项目文件夹下创建一个 log 文件夹,存放定义日志逻辑的代码
# 编写日志服务逻辑
日志服务是一个 WEB 服务,功能是接收 web 请求,将 POST 请求的内容写入到 log,注意这里对标准的 log 包起了别名 stdlog,因为后续要自定义一个 Logger 对象 log:
// log/server.go | |
package log | |
import ( | |
"io/ioutil" | |
stdlog "log" | |
"net/http" | |
"os" | |
) | |
var log *stdlog.Logger | |
type fileLog string | |
func (fl fileLog) Write(data []byte) (int, error) { | |
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) | |
if err != nil { | |
return 0, err | |
} | |
defer f.Close() | |
return f.Write(data) | |
} | |
func Run(destination string) { | |
log = stdlog.New(fileLog(destination), "go", stdlog.LstdFlags) | |
} | |
func RegisterHandlers() { | |
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) { | |
switch r.Method { | |
case http.MethodPost: | |
msg, err := ioutil.ReadAll(r.Body) // 读取 Body 数据 | |
if err != nil || len(msg) == 0 { | |
w.WriteHeader(http.StatusBadRequest) | |
return | |
} | |
write(string(msg)) | |
default: | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
}) | |
} | |
func write(message string) { | |
log.Printf("%v\n", message) | |
} |
这段代码的作用是将日志写入文件系统,先为 filelog 实现 io.Writer 接口,定义 Write 方法
在这个方法中,首先调用了 OpenFile 方法,传入一个文件路径并返回一个 file 对象,指定了权限和模式,并随后判断是否产生错误,最后通过 io.Writer 接口写入文件:
func (fl fileLog) Write(data []byte) (int, error) { | |
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) // 打开文件 | |
if err != nil { | |
return 0, err | |
} | |
defer f.Close() | |
return f.Write(data) | |
} |
接着定义 Run 函数,作用是将 log 指向某个文件路径,使用 log.New 来创建一个 Logger 对象,要传入的参数:写入的位置 (实现 io.Writer 接口)、日志前缀和日志内容的 flag (包含了日期和时间)
func Run(destination string) { | |
log = stdlog.New(fileLog(destination), "go", stdlog.LstdFlags) | |
} |
然后注册一个 handler:
func RegisterHandlers() { | |
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) { | |
switch r.Method { | |
case http.MethodPost: // POST 请求 | |
msg, err := ioutil.ReadAll(r.Body) | |
if err != nil || len(msg) == 0 { | |
w.WriteHeader(http.StatusBadRequest) | |
return | |
} | |
write(string(msg)) // 写入数据 | |
default: | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
}) | |
} |
使用了一个 http.HandleFunc 来处理 HTTP 请求,使用 switch-case 结构判断请求方式来分支处理请求,如果是 POST 请求则调用 ioutil.ReadAll 读取 Body 数据,调用 write 函数 (之后实现) 写入文件,如果读取失败或者数据为空,则返回一个 BadRequest (400) 响应。如果接收到的请求不是 POST 请求,则返回一个 MethodNotAllowed 响应 (405)
如下是 write 函数:
func write(message string) { | |
log.Printf("%v\n", message) | |
} |
调用 log.Printf 就可以,这里的 log 是自定义的 Logger 对象 (已经在 Run 函数中创建),路径在 New 方法中指定了,这时就会把日志信息写入所指向的文件中
# 运行日志服务
上述编写的 server.go 程序作为日志系统的后端,下面编写一段代码,将服务集中化管理,能够集中启动这些服务
创建一个 service 目录,并在其中编写 service.go 文件:
package service | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"net/http" | |
) | |
func Start(ctx context.Context, serviceName, host, port string, | |
registerHandlerFunc func()) (context.Context, error) { | |
registerHandlerFunc() // 注册请求处理函数 | |
ctx = startService(ctx, serviceName, host, port) // 启动服务 | |
return ctx, nil | |
} | |
func startService(ctx context.Context, serviceName, host, port string) context.Context { | |
ctx, cancel := context.WithCancel(ctx) | |
var srv http.Server | |
srv.Addr = ":" + port | |
go func() { | |
log.Println(srv.ListenAndServe()) // 监听 HTTP 请求 调用 ServeHTTP 方法 | |
cancel() | |
}() | |
go func() { | |
fmt.Printf("%v started. Press any key to stop. \n", serviceName) | |
var s string | |
fmt.Scanln(&s) | |
srv.Shutdown(ctx) | |
cancel() | |
}() | |
return ctx | |
} |
这段代码定义了一个 Start 函数,接收 context 接口,服务名称,地址,端口号,作用是启动指定的服务,先调用了 registerHandleFunc 注册请求处理函数 (服务程序要有一个处理函数,来处理到来的 HTTP 请求),随后调用了 startService 函数,结束时返回 context 和 nil
在 startService 函数中调用了 WithCancel,返回一个 context 子节点和一个取消函数,用于触发取消信号,之后调一个 goroutine 启动 HTTP 服务器监听在指定的端口,处理到来的 HTTP 请求,当用户输入按下任意建就会触发 Shutdown 方法关闭服务器,并且调用取消函数撤销掉 context:
func startService(ctx context.Context, serviceName, host, port string) context.Context { | |
ctx, cancel := context.WithCancel(ctx) | |
var srv http.Server | |
srv.Addr = ":" + port | |
go func() { | |
log.Println(srv.ListenAndServe()) | |
cancel() | |
}() | |
go func() { | |
fmt.Printf("%v started. Press any key to stop. \n", serviceName) | |
var s string | |
fmt.Scanln(&s) | |
srv.Shutdown(ctx) // 关闭服务器 | |
cancel() // 取消 context | |
}() | |
return ctx | |
} |
创建一个 cmd 文件夹,编写 main.go 作为整个程序的入口:
package main | |
import ( | |
"context" | |
"distributed/log" | |
"distributed/service" | |
"fmt" | |
stdlog "log" | |
) | |
func main() { | |
log.Run("./distributed.log") // 指定日志文件路径 | |
host, port := "localhost", "4000" // 指定地址和端口号 | |
ctx, err := service.Start(context.Background(), | |
"Log Service", | |
host, port, | |
log.RegisterHandlers, | |
) | |
if err != nil { | |
stdlog.Fatalln(err) | |
} | |
<-ctx.Done() | |
fmt.Println("Shutting down log service.") | |
} |
这段代码作为入口,启动这个服务
当取消函数 (cancel) 被调用后,ctx.Done 就不会被阻塞,往下执行完整个程序
# 测试日志服务
使用 Postman 发出 POST 请求用于测试,如下所示:
发送请求后,查看日志文件:
$ cat distributed.log | |
go 2022/08/22 19:34:08 just for test |
日志被记录了下来
# 服务注册逻辑
下面才正式实现注册中心服务注册的功能,应当实现服务注册的接口,这样客户端能够通过这个接口
创建 registry 文件夹,编写如下的 registration.go 文件:
package registry | |
type Registration struct { | |
ServiceName ServiceName | |
ServiceURL string | |
} | |
type ServiceName string | |
const ( | |
LogService = ServiceName("LogService") | |
) |
如上定义的 Registration 结构体代表被注册的服务,将目前存在服务定义为常量,目前只有一个日志服务 LogService
在这个文件夹下再编写一个 server.go,包含服务注册的主要逻辑:
package registry | |
import ( | |
"encoding/json" | |
"log" | |
"net/http" | |
"sync" | |
) | |
const ServerPort = ":3000" // 端口 | |
const ServicesURL = "http://localhost" + ServerPort + "/services" // 查询服务的 URL | |
type registry struct { | |
registrations []Registration // 切片 | |
mutex *sync.Mutex // 互斥锁 | |
} | |
func (r *registry) add(reg Registration) error { | |
r.mutex.Lock() | |
r.registrations = append(r.registrations, reg) | |
r.mutex.Unlock() | |
return nil | |
} | |
var reg = registry{ | |
registrations: make([]Registration, 0), | |
mutex: new(sync.Mutex), | |
} | |
type Service struct{} | |
func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
log.Println("Request received") | |
switch r.Method { | |
case http.MethodPost: | |
dec := json.NewDecoder(r.Body) | |
var r Registration | |
err := dec.Decode(&r) | |
if err != nil { | |
log.Println(err) | |
w.WriteHeader(http.StatusBadRequest) | |
return | |
} | |
log.Printf("Adding service: %v with URL: %s\n", r.ServiceName, r.ServiceURL) | |
err = reg.add(r) | |
if err != nil { | |
log.Println(err) | |
w.WriteHeader(http.StatusBadRequest) | |
return | |
} | |
default: | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
} |
定义了一个结构体 registry,包含了两个成员:一个切片,可以看成是一系列服务的集合,和一个互斥锁,用于并发控制,之后将其创建
随后定义的 add 函数作用是注册,在上锁的情况下向集合添加元素,接着定义 ServeHTTP 函数,只接收 POST 请求,解析 JSON 数据,将其中服务名称通过 add 函数添加到上述创建的 Registration 结构中的集合中,这就意味着成功注册了一个服务
# 独立运行服务注册
将上述编写的服务注册的程序独立运行起来,将启动日志服务的 main.go 单独丢进一个 cmd 下的 logservice 文件夹,在 cmd 下创建一个 registryservice 文件夹,编写对应的 main.go 文件:
package main | |
import ( | |
"context" | |
"distributed/registry" | |
"fmt" | |
"log" | |
"net/http" | |
) | |
func main() { | |
http.Handle("/services", ®istry.Service{}) | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
var srv http.Server | |
srv.Addr = registry.ServerPort | |
go func() { | |
log.Println(srv.ListenAndServe()) | |
var s string | |
fmt.Scanln(&s) | |
srv.Shutdown(ctx) | |
cancel() | |
}() | |
<-ctx.Done() | |
fmt.Println("Shutting down registry service") | |
} |
这段代码的作用就是将服务注册程序给运行起来,registry.Service 已经实现了 ServeHTTP 接口方法,因此可以直接将这个结构体变量传给 http.Handle 函数
这段代码与上述 service.go 有点相似,在用户输入任意字符后中止掉程序
# 注册一个服务
封装一个函数向服务端发送 POST 请求来注册一个服务,在 registry 下编写一个 client.go 文件:
package registry | |
import ( | |
"bytes" | |
"encoding/json" | |
"fmt" | |
"net/http" | |
) | |
func RegisterService(r Registration) error { | |
buf := new(bytes.Buffer) | |
enc := json.NewEncoder(buf) | |
err := enc.Encode(r) | |
if err != nil { | |
return err | |
} | |
res, err := http.Post(ServicesURL, "application/json", buf) | |
if err != nil { | |
return err | |
} | |
if res.StatusCode != http.StatusOK { | |
return fmt.Errorf("failed to register service. Registry service "+ | |
"responded with code %v", res.StatusCode) | |
} | |
return nil | |
} |
上述代码中定义了一个 RegisterService 函数,向 URL 发送 POST 请求来注册服务,请求中包括了服务名称和服务的 URL
接下来要修改之前的代码,因为发送的数据是 Registration 结构体类型的数据,而上述编写的服务注册的服务端须要更改参数类型,修改 service/service.go 文件,这里在 Start 函数中加上 Registration 结构体类型的参数,serviceName 这个参数可以删掉,并且启动这个服务时应该进行注册,所以在该函数中加上 RegisterService 函数:
func Start(ctx context.Context, host, port string, reg registry.Registration, | |
registerHandlerFunc func()) (context.Context, error) { | |
registerHandlerFunc() | |
ctx = startService(ctx, reg.ServiceName, host, port) | |
err := registry.RegisterService(reg) // 注册服务 | |
if err != nil { | |
return ctx, err | |
} | |
return ctx, nil | |
} |
同时一并修改 startService 函数的参数:
func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context { | |
ctx, cancel := context.WithCancel(ctx) | |
var srv http.Server | |
srv.Addr = ":" + port | |
go func() { | |
log.Println(srv.ListenAndServe()) | |
cancel() | |
}() | |
go func() { | |
fmt.Printf("%v started. Press any key to stop. \n", serviceName) | |
var s string | |
fmt.Scanln(&s) | |
srv.Shutdown(ctx) | |
cancel() | |
}() | |
return ctx | |
} |
最后要修改 logservice 中的 main.go,因为函数已经被修改:
.... | |
serviceAddress := fmt.Sprintf("http://%s:%s", host, port) | |
r := registry.Registration{ | |
ServiceName: "Log Service", | |
ServiceURL: serviceAddress, | |
} | |
ctx, err := service.Start(context.Background(), | |
"Log Service", | |
host, port, r, log.RegisterHandlers, | |
) | |
.... |
接下来运行这个服务,先启动 registryservice/main.go,启动服务注册的程序,在运行 logservice/main.go,这时会先发送一条 POST 请求到服务注册的程序,这时服务注册程序收到这条请求,其中包含了服务名称和 URL,服务注册程序将其添加到集合,视为注册了这个服务:
$ go run . | |
2022/08/23 13:40:24 Request received | |
2022/08/23 13:40:24 Adding service: Log Service with URL: http://localhost:4000 |
这时服务注册程序输出已经添加了日志服务
# 取消注册服务
有增就有减,对应地,有服务注册的功能就应该有取消注册的功能
在 registry/server.go 中添加 remove 函数,与 add 函数作用相反,作用是从集合中去除掉指定的 url 所在的 registration:
func (r *registry) remove(url string) error { | |
for i := range reg.registrations { | |
if reg.registrations[i].ServiceURL == url { | |
r.mutex.Lock() | |
reg.registrations = append(reg.registrations[:i], r.registrations[i+1:]...) | |
r.mutex.Unlock() | |
return nil | |
} | |
} | |
return fmt.Errorf("service at URL %s not found", url) | |
} |
使用 for-range 遍历 reg 中的切片,当遇到指定的 URL 时则将其去除
服务注册中,POST 请求用于注册服务,那么将通过 DELETE 请求来取消服务,所以在 ServeHTTP 函数中添加一个针对 DELETE 请求的分支:
func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
log.Println("Request received") | |
switch r.Method { | |
... | |
case http.MethodDelete: | |
payload, err := ioutil.ReadAll(r.Body) | |
if err != nil { | |
log.Println(err) | |
w.WriteHeader(http.StatusInternalServerError) | |
return | |
} | |
url := string(payload) | |
log.Printf("Removing servcice at URL: %s", url) | |
err = reg.remove(url) | |
default: | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
} |
在这个服务端接收到 DELETE 请求后,取出其中 body 的 URL,再调用 remove 函数进行删除
在 registry/client.go 中添加客户端用来取消服务的函数:
func ShutdownService(url string) error { | |
req, err := http.NewRequest(http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte(url))) | |
if err != nil { | |
return err | |
} | |
req.Header.Add("Content-Type", "text/plain") | |
res, err := http.DefaultClient.Do(req) | |
if err != nil { | |
return err | |
} | |
if res.StatusCode != http.StatusOK { | |
return fmt.Errorf("failed to deregister service. Registry "+ | |
"service responded with code %v", res.StatusCode) | |
} | |
return nil | |
} |
使用 http.NewRequest 来发送 DELETE 请求
在 StartService 函数中,当服务停止时,调用该 shutdown 函数向服务端发送 DELETE 请求取消掉这个服务
func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context { | |
... | |
go func() { | |
log.Println(srv.ListenAndServe()) | |
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) | |
if err != nil { | |
log.Println(err) | |
} | |
cancel() | |
}() | |
... | |
} |
测试:
先启动服务注册程序,然后再启动日志服务,再按任意键关闭
日志服务:
$ go run . | |
Log Service started. Press any key to stop. | |
2022/08/23 16:45:44 http: Server closed | |
Shutting down log service. |
服务注册程序:
$ go run . | |
2022/08/23 16:45:42 Request received | |
2022/08/23 16:45:42 Adding service: Log Service with URL: http://localhost:4000 | |
2022/08/23 16:45:44 Request received | |
2022/08/23 16:45:44 Removing servcice at URL: http://localhost:4000 | |
2022/08/23 16:45:44 Request received | |
2022/08/23 16:45:44 Removing servcice at URL: http://localhost:4000 |
# 服务发现
在此之前,先实现一个业务服务,功能是学生成绩管理,用户可以查询和增加学生的成绩信息
# 创建业务服务
创建一个 grades 文件夹,编写相关代码,如下是 grades.go 文件,包含了成绩信息管理的主要逻辑:
package grades | |
import ( | |
"fmt" | |
"sync" | |
) | |
type Student struct { | |
ID int | |
FirstName string | |
LastName string | |
Grades []Grade | |
} | |
func (s Student) Average() float32 { | |
var result float32 | |
for _, grade := range s.Grades { | |
result += grade.Score | |
} | |
return result / float32(len(s.Grades)) | |
} | |
type Students []Student | |
var ( | |
students Students | |
studentsMutex sync.Mutex | |
) | |
func (ss Students) GetByID(id int) (*Student, error) { | |
for i := range ss { | |
if ss[i].ID == id { | |
return &ss[i], nil | |
} | |
} | |
return nil, fmt.Errorf("student with ID %d not found", id) | |
} | |
type GradeType string | |
const ( | |
GradeQuiz = GradeType("Quiz") | |
GradeTest = GradeType("Test") | |
GradeExam = GradeType("Exam") | |
) | |
type Grade struct { | |
Title string | |
Type GradeType | |
Score float32 | |
} |
定义了 students 结构体,表示了学生信息:
type Student struct { | |
ID int // 学生 ID | |
FirstName string // 名 | |
LastName string // 姓 | |
Grades []Grade // 成绩 | |
} |
定义了 grade 结构体:
type Grade struct { | |
Title string // 名称 | |
Type GradeType // 类别 | |
Score float32 // 得分 | |
} |
GetByID 用于根据 ID 来查询学生信息,Average 用于求平均成绩
创建 server.go 文件,作为该服务的后端,之后会被 services 中的 Start 函数调用:
package grades | |
import ( | |
"bytes" | |
"encoding/json" | |
"fmt" | |
"log" | |
"net/http" | |
"strconv" | |
"strings" | |
) | |
func RegisterHandlers() { | |
handler := new(studentsHandler) | |
http.Handle("/students", handler) | |
http.Handle("/students/", handler) | |
} | |
type studentsHandler struct{} | |
func (sh studentsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
pathSegments := strings.Split(r.URL.Path, "/") | |
switch len(pathSegments) { | |
case 2: | |
sh.getAll(w, r) | |
case 3: | |
id, err := strconv.Atoi(pathSegments[2]) | |
if err != nil { | |
w.WriteHeader(http.StatusNotFound) | |
return | |
} | |
sh.getOne(w, r, id) | |
case 4: | |
id, err := strconv.Atoi(pathSegments[2]) | |
if err != nil { | |
w.WriteHeader(http.StatusNotFound) | |
return | |
} | |
sh.addGrade(w, r, id) | |
default: | |
w.WriteHeader(http.StatusNotFound) | |
} | |
} | |
func (sh studentsHandler) getAll(w http.ResponseWriter, r *http.Request) { | |
studentsMutex.Lock() | |
defer studentsMutex.Unlock() | |
data, err := sh.toJSON(students) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
log.Println(err) | |
return | |
} | |
w.Header().Add("Content-Type", "application/json") | |
w.Write(data) // 写回数据 | |
} | |
func (sh studentsHandler) toJSON(obj interface{}) ([]byte, error) { | |
var b bytes.Buffer | |
enc := json.NewEncoder(&b) | |
err := enc.Encode(obj) | |
if err != nil { | |
return nil, fmt.Errorf("failed to serialize students: %q", err) | |
} | |
return b.Bytes(), nil | |
} | |
func (sh studentsHandler) getOne(w http.ResponseWriter, r *http.Request, id int) { | |
studentsMutex.Lock() | |
defer studentsMutex.Unlock() | |
student, err := students.GetByID(id) | |
if err != nil { | |
w.WriteHeader(http.StatusNotFound) | |
log.Println(err) | |
return | |
} | |
data, err := sh.toJSON(student) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
log.Printf("Failed to serialize student: %q\n", err) | |
return | |
} | |
w.Header().Add("Content-Type", "application/json") | |
w.Write(data) | |
} | |
func (sh studentsHandler) addGrade(w http.ResponseWriter, r *http.Request, id int) { | |
studentsMutex.Lock() | |
defer studentsMutex.Unlock() | |
student, err := students.GetByID(id) | |
if err != nil { | |
w.WriteHeader(http.StatusNotFound) | |
log.Println(err) | |
return | |
} | |
var g Grade | |
dec := json.NewDecoder(r.Body) | |
err = dec.Decode(&g) | |
if err != nil { | |
w.WriteHeader(http.StatusBadRequest) | |
log.Println(err) | |
return | |
} | |
student.Grades = append(student.Grades, g) // 添加数据 | |
w.WriteHeader(http.StatusCreated) | |
data, err := sh.toJSON(g) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
w.Header().Add("Content-Type", "application/json") | |
w.Write(data) | |
} |
使用 RegisterHandlers 注册 http handler,路径指向 URL 下的 students,会作为 Start 函数的参数
作为一个服务,同样要为结构体定义一个接口方法 ServeHTTP,该方法要应对 URL 的几种情况:
/students
获得所有学生的成绩/students/{id}
获得指定 ID 的学生的信息/students/{id}/grades
获得增加学生的成绩
使用 string.Split 将 URL 路径切分为多段,使用 switch-case 结构根据分段的个数进行分支处理,段数为 2、3 和 4 分别对应了上述 3 种情况,分别调用 getAll、getOne 和 addGrade 函数,实现细节不再赘述
这里还要定义一个将结构体序列化为 JSON 数据的函数,接收一个空接口:
func (sh studentsHandler) toJSON(obj interface{}) ([]byte, error) { | |
var b bytes.Buffer | |
enc := json.NewEncoder(&b) | |
err := enc.Encode(obj) | |
if err != nil { | |
return nil, fmt.Errorf("failed to serialize students: %q", err) | |
} | |
return b.Bytes(), nil | |
} |
编写一个 mockdata.go 文件,制造一些数据:
package grades | |
func init() { | |
students = []Student{ | |
{ | |
ID: 1, | |
FirstName: "Nick", | |
LastName: "Carter", | |
Grades: []Grade{ | |
{ | |
"Quiz 1", | |
GradeQuiz, | |
85, | |
}, | |
{ | |
"Final Exam", | |
GradeExam, | |
94, | |
}, | |
{ | |
"Quiz 2", | |
GradeQuiz, | |
97, | |
}, | |
}, | |
}, | |
{ | |
ID: 2, | |
FirstName: "Jack", | |
LastName: "Bright", | |
Grades: []Grade{ | |
{ | |
"Final Exam", | |
GradeExam, | |
100, | |
}, | |
{ | |
"Quiz 2", | |
GradeQuiz, | |
80, | |
}, | |
{ | |
"Test 1", | |
GradeTest, | |
99, | |
}, | |
}, | |
}, | |
} | |
} |
这样一个业务服务就完成了,添加到 registry/registrantion.go 中:
const ( | |
LogService = ServiceName("LogService") | |
GradingService = ServiceName("GradingService") // 业务服务 | |
) |
和日志服务相似,创建 cmd/gradingservice/main.go 文件,编写启动代码:
package main | |
import ( | |
"context" | |
"distributed/log" | |
"distributed/registry" | |
"distributed/service" | |
"fmt" | |
stdlog "log" | |
) | |
func main() { | |
log.Run("./distributed.log") | |
host, port := "localhost", "6000" | |
serviceAddress := fmt.Sprintf("http://%s:%s", host, port) | |
r := registry.Registration{ | |
ServiceName: registry.GradingService, | |
ServiceURL: serviceAddress, | |
} | |
ctx, err := service.Start(context.Background(), | |
host, port, r, log.RegisterHandlers, | |
) | |
if err != nil { | |
stdlog.Fatalln(err) | |
} | |
<-ctx.Done() | |
fmt.Println("Shutting down grading service.") | |
} |
服务注册到这里就基本实现了,如下是项目结构:
. | |
├── cmd | |
│ ├── gradingservice | |
│ │ └── main.go | |
│ ├── logservice | |
│ │ └── main.go | |
│ └── registryservice | |
│ └── main.go | |
├── log | |
│ └── server.go | |
├── registry | |
│ ├── client.go | |
│ ├── registration.go | |
│ └── server.go | |
└── service | |
└── service.go |
# 实现服务发现
完成了上述的业务服务 gradingservice 后,要使其能够请求 logservice
接下来编辑 registry/registration.go 文件,扩展 Registration 结构体:
type Registration struct { | |
ServiceName ServiceName | |
ServiceURL string | |
RequiredServices []ServiceName | |
ServiceUpdateURL string | |
} |
RequiredServices 表示了该服务所依赖的服务名称,ServiceUpdateURL 用于动态接收更新,比如注册中心就通过这个 URL 告诉这个服务,这里有一个 logservice
创建两个结构体:
type patchEntry struct { | |
Name ServiceName | |
URL string | |
} |
该结构体表示单条更新条目
type patch struct { | |
Added []patchEntry | |
Removed []patchEntry | |
} |
该结构体记录增加和减少的条目
编写 registry/server.go,扩展注册中心的后端,先扩展 add 函数,在添加服务时就添加为其依赖服务:
func (r *registry) add(reg Registration) error { | |
r.mutex.Lock() | |
r.registrations = append(r.registrations, reg) | |
r.mutex.Unlock() | |
err := r.sendRequiredServices(reg) | |
if err != nil { | |
return err | |
} | |
return nil | |
} |
调用了一个 sendRequiredServices 函数,功能是添加依赖,发送一个请求,将所要依赖的服务给请求过来,方法实现如下:
func (r *registry) sendRequiredServices(reg Registration) error { | |
r.mutex.RLock() | |
defer r.mutex.RUnlock() | |
var p patch | |
for _, serviceReg := range r.registrations { | |
for _, reqService := range reg.RequiredServices { | |
if serviceReg.ServiceName == reqService { | |
p.Added = append(p.Added, patchEntry{ | |
Name: serviceReg.ServiceName, | |
URL: serviceReg.ServiceURL, | |
}) | |
} | |
} | |
} | |
err := r.sendPatch(p, reg.ServiceUpdateURL) | |
return err | |
} |
上述函数中,循环遍历已经注册的服务,如果找到所依赖的服务,则添加到切片里,稍后调用 sendPatch 发送出去,实现如下:
func (r *registry) sendPatch(p patch, url string) error { | |
d, err := json.Marshal(p) | |
if err != nil { | |
return err | |
} | |
_, err = http.Post(url, "application/json", bytes.NewBuffer(d)) | |
return err | |
} |
该函数先将 patch 结构序列化为一个 JSON 数据,然后放在 POST 请求中发送
每个客户端的服务都有所依赖的服务,要向注册中心请求这些服务,得存储这些请求的服务,比如 gradingservice 就要依赖 logservice
定义一个结构 providers:
type providers struct { | |
services map[ServiceName][]string | |
mutex *sync.RWMutex | |
} |
其中储存了服务的提供者,定义了一个 map 结构,和一个读写锁
初始化这个结构体变量:
var prov = providers{ | |
services: make(map[ServiceName][]string), | |
mutex: new(sync.RWMutex), | |
} |
定义对于的更新方法,用于更新这个结构中的数据:
func (p *providers) Update(pat patch) { | |
p.mutex.Lock() | |
defer p.mutex.Unlock() | |
for _, patchEntry := range pat.Added { | |
if _, ok := p.services[patchEntry.Name]; !ok { | |
p.services[patchEntry.Name] = make([]string, 0) | |
} | |
p.services[patchEntry.Name] = append(p.services[patchEntry.Name], patchEntry.URL) | |
} | |
for _, patchEntry := range pat.Removed { | |
if providerURLs, ok := p.services[patchEntry.Name]; ok { | |
for i := range providerURLs { | |
if providerURLs[i] == patchEntry.URL { | |
p.services[patchEntry.Name] = append(providerURLs[:i], providerURLs[i+1:]...) | |
} | |
} | |
} | |
} | |
} |
先遍历 Added,也就是 patch 要增加的服务,如果 added 里的服务在 providers 中还不存在,则创建这个 service,然后将这个服务和对应 URL 写入 map,同理,接着遍历 Removed,已有的服务就将其删除
接着定义了一个 get 函数,通过服务名称找到对应的 URL:
func (p *providers) get(name ServiceName) (string, error) { | |
providers, ok := p.services[name] | |
if !ok { | |
return "", fmt.Errorf("no providers avaliable for service %v", name) | |
} | |
idx := int(rand.Float32() * float32(len(providers))) | |
return providers[idx], nil | |
} | |
func GetProvider(name ServiceName) (string, error) { | |
return prov.get(name) | |
} |
目前 providers 只有 logservice
接着再将 ServiceUpdateURL 绑定到一个 handler 上,从而能够对其进行处理:
func RegisterService(r Registration) error { | |
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) | |
if err != nil { | |
return err | |
} | |
http.Handle(serviceUpdateURL.Path, &serviceUpdateHandler{}) | |
.... | |
} |
定义接口方法:
type serviceUpdateHandler struct{} | |
func (suh serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodPost { | |
w.WriteHeader(http.StatusMethodNotAllowed) | |
return | |
} | |
dec := json.NewDecoder(r.Body) | |
var p patch | |
err := dec.Decode(&p) | |
if err != nil { | |
log.Println(err) | |
w.WriteHeader(http.StatusBadGateway) | |
} | |
prov.Update(p) | |
} |
ServeHTTP 中接收 POST 请求,将 Body 到数据反序列化后传入 Update 方法再调用
因为经过了一些扩展,所以要修改 Registration,增加所要依赖的服务和 URL 字段
type Registration struct { | |
ServiceName ServiceName | |
ServiceURL string | |
RequiredServices []ServiceName | |
ServiceUpdateURL string | |
} |
下面优化一下 logservice,目前日志服务只有一个服务端,要让客户端能够方便地使用提供的 logservice 服务,在 log 下再编写一个 client.go 文件:
package log | |
import ( | |
"bytes" | |
"distributed/registry" | |
"fmt" | |
stdlog "log" | |
"net/http" | |
) | |
func SetClientLogger(serviceURL string, clientService registry.ServiceName) { | |
stdlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService)) | |
stdlog.SetFlags(0) // 不设置 flag | |
stdlog.SetOutput(&clientLogger{serviceURL}) | |
} | |
type clientLogger struct { | |
url string | |
} | |
func (cl clientLogger) Write(data []byte) (n int, err error) { | |
b := bytes.NewBuffer([]byte(data)) | |
res, err := http.Post(cl.url+"/log", "text/plain", b) | |
if err != nil { | |
return 0, err | |
} | |
if res.StatusCode != http.StatusOK { | |
return 0, fmt.Errorf("failed to send log message. ") | |
} | |
return len(data), nil | |
} |
SetClientLogger 负责设置客户端日志打印的相关的属性,因为要为 clientLogger 实现 io.Writer 接口,所以下面为其实现一个 Write 方法
扩展 gradingservice/main.go 中结构体的定义:
r := registry.Registration{ | |
ServiceName: registry.GradingService, | |
ServiceURL: serviceAddress, | |
RequiredServices: []registry.ServiceName{registry.LogService}, | |
ServiceUpdateURL: serviceAddress + "/services", | |
} |
logservice 同时接着还要获取 Provider,即获取提供日志的服务的名称,这便是服务发现
if logProvider, err := registry.GetProvider(registry.LogService); err == nil { | |
fmt.Printf("Logging service found at: %s\n", logProvider) | |
log.SetClientLogger(logProvider, r.ServiceName) | |
} |
运行测试:
先运行注册中心
$ go run . | |
2022/08/24 13:03:05 Request received | |
2022/08/24 13:03:05 Adding service: LogService with URL: http://localhost:4000 | |
2022/08/24 13:03:21 Request received | |
2022/08/24 13:03:21 Adding service: GradingService with URL: http://localhost:6000 |
运行日志服务
$ go run . | |
LogService started. Press any key to stop. |
运行 gradding 服务
$ go run . | |
GradingService started. Press any key to stop. | |
Logging service found at: http://localhost:4000 |
# 服务更新
对于一个服务,当所依赖的服务发生变化时,应当发出通知,例如 logservice,当这个服务停止时,应该发出一条通知来告知 gradingservice,启动时也应当发出一条通知,在上面的实现中,gradingservice 只能在启动时发现 logservice,并且 logservice 要先于 gradingservice 启动
在服务启动时就应该进行通知,所以在 add 函数中添加一个通知函数:
func (r *registry) add(reg Registration) error { | |
.... | |
err := r.sendRequiredServices(reg) | |
r.notify(patch{ | |
Added: []patchEntry{ | |
{ | |
Name: reg.ServiceName, | |
URL: reg.ServiceURL, | |
}, | |
}, | |
}) | |
return err | |
} |
通知函数的实现:
func (r *registry) notify(fullPatch patch) { | |
r.mutex.RLock() | |
defer r.mutex.RUnlock() | |
for _, reg := range r.registrations { | |
go func(reg Registration) { | |
for _, reqService := range reg.RequiredServices { | |
p := patch{ | |
Added: []patchEntry{}, | |
Removed: []patchEntry{}, | |
} | |
sendUpdate := false | |
for _, added := range fullPatch.Added { | |
if added.Name == reqService { | |
p.Added = append(p.Added, added) | |
sendUpdate = true | |
} | |
} | |
for _, removed := range fullPatch.Removed { | |
if removed.Name == reqService { | |
p.Removed = append(p.Removed, removed) | |
sendUpdate = true | |
} | |
} | |
if sendUpdate { | |
err := r.sendPatch(p, reg.ServiceUpdateURL) // 发送 patch | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
} | |
} | |
}(reg) | |
} | |
} |
这个 notify 函数接收一个 patch 变量作为参数,其中包含的 added 和 removed,对应增加和移除的服务,函数体中要做的事就是先遍历所有的服务,然后遍历其所有的依赖服务,根据传入的 fullPatch 和 added 和 removed,如果依赖服务是要增加或移除的,则将 sendUpdate 标志设置为 true,产生 patch 并发送,进行更新,这些流程使用 goroutine 来并发地进行
在 registry/client.go 的 ServeHTTP 函数中增加一句提示信息,表示接收到了更新:
func (suh serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
.... | |
fmt.Printf("Updated received %v\n", p) | |
prov.Update(p) | |
} |
对 remove 函数同样要进行扩展:
func (r *registry) remove(url string) error { | |
for i := range reg.registrations { | |
if reg.registrations[i].ServiceURL == url { | |
r.notify(patch{ | |
Removed: []patchEntry{ | |
{ | |
Name: r.registrations[i].ServiceName, | |
URL: r.registrations[i].ServiceURL, | |
}, | |
}, | |
}) | |
... | |
return fmt.Errorf("service at URL %s not found", url) | |
} |
运行测试:先运行注册中心,然后运行 logservice,然后运行 gradingservice,之后关闭 logservice 再启动
注册中心:
$ go run . | |
2022/08/24 18:03:45 Request received | |
2022/08/24 18:03:45 Adding service: LogService with URL: http://localhost:4000 | |
2022/08/24 18:03:50 Request received | |
2022/08/24 18:03:50 Adding service: GradingService with URL: http://localhost:6000 | |
2022/08/24 18:03:53 Request received | |
2022/08/24 18:03:53 Removing servcice at URL: http://localhost:4000 | |
2022/08/24 18:03:53 Request received | |
2022/08/24 18:03:53 Removing servcice at URL: http://localhost:4000 | |
2022/08/24 18:04:12 Request received | |
2022/08/24 18:04:12 Adding service: LogService with URL: http://localhost:4000 |
日志服务:
$ go run . | |
LogService started. Press any key to stop. | |
Updated received {[] []} | |
2022/08/24 18:03:53 http: Server closed | |
Shutting down log service. | |
$ go run . | |
LogService started. Press any key to stop. | |
Updated received {[] []} |
grading 服务:
$ go run . | |
GradingService started. Press any key to stop. | |
Updated received {[{LogService http://localhost:4000}] []} | |
Logging service found at: http://localhost:4000 | |
Updated received {[] [{LogService http://localhost:4000}]} | |
Updated received {[{LogService http://localhost:4000}] []} |
# 服务状态监控
检查所有服务的健康状况,方法是发送心跳请求,得知服务是否能正常响应
在 Registration 中加一条 HeartbeatURL:
type Registration struct { | |
ServiceName ServiceName | |
ServiceURL string | |
RequiredServices []ServiceName | |
ServiceUpdateURL string | |
HeartbeatURL string | |
} |
增加心跳检查函数:
func (r *registry) heartbeat(freq time.Duration) { | |
for { | |
var wg sync.WaitGroup | |
for _, reg := range r.registrations { | |
wg.Add(1) | |
go func(reg Registration) { | |
defer wg.Done() | |
success := true | |
for attemps := 0; attemps < 3; attemps++ { | |
res, err := http.Get(reg.HeartbeatURL) | |
if err != nil { | |
log.Println(err) | |
} else if res.StatusCode == http.StatusOK { | |
log.Printf("Heartbeat check passed for %v", reg.ServiceName) | |
if !success { | |
r.add(reg) | |
} | |
break | |
} | |
log.Printf("Heartbeat check failed for %v", reg.ServiceName) | |
if success { | |
success = false | |
r.remove(reg.ServiceURL) | |
} | |
} | |
}(reg) | |
wg.Wait() | |
time.Sleep(freq) | |
} | |
} | |
} |
上述代码中用到了 waitgroup 和 goroutine,对每个服务的 HeartbeatURL 间隔 freq 的时间发起 get 请求,连续发送三次,如果有一次失败 (状态码不等于 200),那么 success 为 false,那么会调用 remove 来移除服务,如果 success 为 false 又得到了正常的相应,那么又会调用 add 将服务加回来
上述这段代码将会在如下函数中调用,将其定义在 registry/server.go 中,每隔 3 秒检查一次服务健康状况:
var once sync.Once | |
func SetupRegistryService() { | |
once.Do(func() { | |
go reg.heartbeat(3 * time.Second) | |
}) | |
} |
与此同时,所有的客户端都应该知道如何相应这种心跳请求:
func RegisterService(r Registration) error { | |
heartbeatURL, err := url.Parse(r.HeartbeatURL) | |
if err != nil { | |
return err | |
} | |
http.HandleFunc(heartbeatURL.Path, func(w http.ResponseWriter, r *http.Request) { | |
w.WriteHeader(http.StatusOK) | |
}) | |
.... | |
} |
在服务注册时,就在心跳 URL 注册一个 handler,处理心跳请求,最后为 logservice 和 gradingservice 都加上心跳 URL:
r := registry.Registration{ | |
ServiceName: registry.LogService, | |
ServiceURL: serviceAddress, | |
RequiredServices: make([]registry.ServiceName, 0), | |
ServiceUpdateURL: serviceAddress + "/services", | |
HeartbeatURL: serviceAddress + "/heartbeat", | |
} |
测试
启动注册中心和两个服务,中途关掉 logservice
注册中心的输出:
2022/08/24 19:12:24 Adding service: LogService with URL: http://localhost:4000 | |
2022/08/24 19:12:24 Heartbeat check passed for LogService | |
2022/08/24 19:12:26 Request received | |
2022/08/24 19:12:26 Adding service: GradingService with URL: http://localhost:6000 | |
2022/08/24 19:12:27 Heartbeat check passed for LogService | |
2022/08/24 19:12:30 Heartbeat check passed for GradingService | |
2022/08/24 19:12:33 Heartbeat check passed for LogService | |
2022/08/24 19:12:36 Heartbeat check passed for GradingService | |
2022/08/24 19:12:36 Request received | |
2022/08/24 19:12:36 Removing servcice at URL: http://localhost:4000 | |
2022/08/24 19:12:36 Request received | |
2022/08/24 19:12:36 Removing servcice at URL: http://localhost:4000 | |
2022/08/24 19:12:39 Heartbeat check passed for GradingService | |
2022/08/24 19:12:42 Heartbeat check passed for GradingService | |
2022/08/24 19:12:45 Heartbeat check passed for GradingService |