Выбор лидера при помощи Consul. Part 2
Установка#
Ниже два варианта установки - локально или в кластере. Я выбрал для себя кластер, но приведу пример и для локальной установки.
Docker#
$ docker run -d \
--name=consul-1 \
-p 127.0.0.1:8500:8500 \
-e CONSUL_BIND_INTERFACE=eth0 \
consul:1.8
Убедимся, что агент отлично себя чувствует:
$ docker logs -f consul-1
...
==> Consul agent running!
...
Consul, запущенный из этого образа по умолчанию запускает и web UI. Доступ к нему не ограничен а сам интерфейс находится по адресу http://127.0.0.1:8500. Можно поиграться.
Запустим еще один пару клиентов. Они потребуются для дальнейшей работы. Для связи их друг между другом можно слинковать
контейнеры, но в дальнейшем будет удобнее, если мы передадим IP адрес сервиса явно. Для этого получим текйщий IP ранее
запущенного consul-1
сервиса:
$ docker inspect \
--format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' \
consul-1 | head -1
172.17.0.3
В моем случае этот адрес 172.17.0.3
. У вас же он может отличаться. Далее он потребуется нам для запуска других
агентов:
$ docker run -d \
--name=consul-2 \
-e CONSUL_BIND_INTERFACE=eth0 \
consul:1.8 agent -dev -join=172.17.0.3
$ docker run -d \
--name=consul-3 \
-e CONSUL_BIND_INTERFACE=eth0 \
consul:1.8 agent -dev -join=172.17.0.3
На странице http://127.0.0.1:8500/ui/dc1/nodes можно убедиться в том, что все ноды живы и подключились к кластеру.
Подключаться к нодам можно по адресам 172.17.0.3:8500 и соотв адресам (см docker inspect) других контейнеров. Могут быть проблемы с сетью на Mac OS, но это уже за рамками простого запуска.
Kubernetes#
Для работы нам потребуется kubernetes cluster. Рекомендую создать его в DigitalOcean по ссылке. Это займет пару минут. Выбирайте все самое последнее и самое ближайшее к себе. У меня версия 1.18 в San Francisco. Пул из 4х нод 1vCPU 3GB RAM. После создания DO подскажет вам как к нему подключиться. Все очень просто, работает из коробки.
Для установки используем Helm:
Добавим чарт с Consul в наш индекс и проверим, что он успешно добавлен:
$ helm repo add hashicorp https://helm.releases.hashicorp.com
...
$ helm search repo hashicorp/consul
NAME CHART VERSION APP VERSION DESCRIPTION
hashicorp/consul 0.24.1 1.8.2 Official HashiCorp Consul Chart
Все отлично. Деплоим:
$ helm install consul hashicorp/consul --set global.name=consul --wait
Helm дождется развертывания приложения и отдаст управление обратно. Не прерывайте его. Если хочется посмотреть на то, что происходит - запустите в соседнем терминале k9s.
Установка завершена, все инстансы (три сервера и 4 агента) будут в спсике подов ващего кластера.
Необходимо пробросить порты подов на локальную машину. Я бы посоветовал telepresence, но не у всех он заводится. Гарантировано мы сможем это сделать через port-forward.
Прокидываем порт web UI и порты трех из четырех агентов.
$ kubectl port-forward service/consul-ui 9000:80
$ kubectl get po | grep -v server
NAME READY STATUS RESTARTS AGE
consul-2b44z 1/1 Running 0 23h
consul-jqs6v 1/1 Running 0 23h
consul-mjwr6 1/1 Running 0 24h
consul-ph4sg 1/1 Running 0 24h
# выбираем любые (я выбрал первые три) пода и пробрасываем порты на локальную машину
$ kubectl port-forward pod/consul-2b44z 8500:8500
$ kubectl port-forward pod/consul-jqs6v 8501:8500
$ kubectl port-forward pod/consul-mjwr6 8502:8500
На этом этапе у нас есть доступ к webUI и к трем агентам.
Go#
TLDR: репозиторий
Подключаемся к Consul:
package main
import (
"fmt"
"github.com/hashicorp/consul/api"
"os"
)
func main() {
config := api.DefaultConfig()
config.Address = os.Getenv("CONSUL_ADDR")
consul, err := api.NewClient(config)
if err != nil {
panic(err)
}
fmt.Println(consul.Agent().NodeName())
<-make(chan int)
}
Вместо os.Getenv("CONSUL_ADDR")
конечно же можно использовать адрес, явно указанный в формате 127.0.0.1:8500
.
Для начала нам необходим процесс, который будет предоставлять нам актуальный идентификатор сессии, следить за ее обновлением и в случае проблем - пересоздавать. Результатом работы процесса будет предоставление идентификатора сессии и больше ничего. В силу отсутствия у Consul возможности поиска по списку существующих сессий, нам необходимо написать свою. Искать существующую сессию стоит, хотя этот шаг и не является обязательным. Если сервис был перезапущен, то его сессиия вероятно еще существует, а так же возможно, что именно этот инстанс и является мастером. В таком случае, во избежание переизбрания мастера, необходимо найти свою сессию и попытаться ее продлить. Функция для поиска сессии:
package main
import "context"
func getSessionByName(ctx context.Context, consul *api.Client, nodeName, sessionName string) (*api.SessionEntry, error) {
opts := (&api.QueryOptions{}).WithContext(ctx)
sessions, _, err := consul.Session().Node(nodeName, opts)
if err != nil {
return nil, errors.Wrap(err, "cannot get sessions from consul")
}
for _, session := range sessions {
if session.Name != sessionName {
continue
}
return session, nil
}
return nil, errors.Wrap(ErrNotFound, "session not found")
}
Если сессия найдена, то обновляем ее. В ином случае - пересоздаем. Ниже приведен листинг функции, которая этим занимается. После определения текущей сессии - печатаем ее идентификатор на экран.
package main
import (
"context"
"fmt"
)
func renew(ctx context.Context, consul *api.Client) error {
doneCh := make(chan struct{})
defer close(doneCh)
for {
nodeName, err := consul.Agent().NodeName()
if err != nil {
return errors.Wrap(err, "cannot get agent node name")
}
var sessionID string
session, err := getSessionByName(ctx, consul, nodeName, sessionName)
if err != nil {
opts := new(api.WriteOptions).WithContext(ctx)
sessionID, _, err = consul.Session().Create(&api.SessionEntry{
Name: sessionName,
Behavior: api.SessionBehaviorRelease,
TTL: ttl.String(),
}, opts)
if err != nil {
return errors.Wrap(err, "cannot create session")
}
} else {
sessionID = session.ID
}
fmt.Println("sessionID", sessionID)
opts := new(api.WriteOptions).WithContext(ctx)
err2 := consul.Session().RenewPeriodic(ttl.String(), sessionID, opts, doneCh)
if err2 != nil {
return errors.Wrap(err2, "cannot renew session")
}
}
}
Можно вызвать функцию из main.go и запустить приложение.
package main
import "context"
func main() {
ctx := context.Background()
if err := renew(ctx, consul); err != nil {
panic(err)
}
}
На экран будет выведен идентификатор только что созданной сессии. Данная сессия будет обновляться так долго, как долго
живет текущее приложение, однако если удалить сессию через интерфейс Consul, приложение будет завершено. Для того, что
бы функция renew
запускалась после ошибки нам необходимо завернуть ее в backoff, а для того, что бы получать
обновленный идентификатор сессии - вернуть из функции канал с идентификаторами sessionID chan string
. Перенесем код,
связанный с сессиями в отдельный пакет sessions
:
package sessions
import (
"context"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"go.uber.org/zap"
"time"
)
var ErrNotFound = errors.New("not found")
type Service struct {
consul *api.Client
logger *zap.Logger
sessionIDCh chan string
}
func New(logger *zap.Logger, consul *api.Client) (*Service, error) {
return &Service{
consul: consul,
logger: logger,
sessionIDCh: make(chan string),
}, nil
}
func (s *Service) CreateRenew(ctx context.Context, ttl time.Duration, sessionName string) error {
fn := func() error {
doneCh := make(chan struct{})
defer close(doneCh)
for {
nodeName, err := s.consul.Agent().NodeName()
if err != nil {
return errors.Wrap(err, "cannot get agent node name")
}
var sessionID string
session, err := s.getSessionByName(ctx, nodeName, sessionName)
if err != nil {
opts := new(api.WriteOptions).WithContext(ctx)
sessionID, _, err = s.consul.Session().Create(&api.SessionEntry{
Name: sessionName,
Behavior: api.SessionBehaviorRelease,
TTL: ttl.String(),
}, opts)
if err != nil {
return errors.Wrap(err, "cannot create session")
}
} else {
sessionID = session.ID
}
select {
case <-ctx.Done():
case s.sessionIDCh <- sessionID:
}
opts := new(api.WriteOptions).WithContext(ctx)
err2 := s.consul.Session().RenewPeriodic(ttl.String(), sessionID, opts, doneCh)
if err2 != nil {
return errors.Wrap(err2, "cannot renew session")
}
}
}
if err := backoff.Retry(fn, backoff.NewExponentialBackOff()); err != nil {
return errors.Wrap(err, "backoff error")
}
return nil
}
func (s *Service) getSessionByName(ctx context.Context, nodeName, sessionName string) (*api.SessionEntry, error) {
opts := (&api.QueryOptions{}).WithContext(ctx)
sessions, _, err := s.consul.Session().Node(nodeName, opts)
if err != nil {
return nil, errors.Wrap(err, "cannot get sessions from consul")
}
for _, session := range sessions {
if session.Name != sessionName {
continue
}
return session, nil
}
return nil, errors.Wrap(ErrNotFound, "session not found")
}
func (s *Service) GetSessionID() <-chan string {
return s.sessionIDCh
}
Создав сессию, мы можем от ее имени захватить или создать и захватить ключ в KV:
package main
func main() {
key = &api.KVPair{
Key: "services/my-service/leader",
Value: []byte(nodeName),
Session: sessionID,
}
opts := (&api.WriteOptions{}).WithContext(ctx)
acquire, _, err := s.consul.KV().Acquire(key, opts)
if err != nil {
panic(err)
}
}
Ключ может быть захвачен другой сессией, а так же ключ может быть удален, даже если его захватила текущая сессия. Во
избежание этого необходимо всегда следить за выбранным ключом и проверять его состояние. Consul позволяет создать такой
запрос на получение ключа, который не вернет ответа до следующего его изменения. Этот механизм позволит нам в
большинстве случаев получить изменение ключа почти моментально. Однако механизм не может предоставить всю историю
изменений, а это значит, что в некоторых случаях мы можем пропустить изменение ключа. Можно сделать более точный
механизм детекта изменений на основе некоторых счетчиков в объекте ключа, но мы будем переиодически перезапрашивать его
состояние. Ниже пакет для подписки на изменение ключа keys
:
package keys
import (
"context"
"github.com/hashicorp/consul/api"
"go.uber.org/zap"
"time"
)
type Service struct {
logger *zap.Logger
consul *api.Client
waitTime time.Duration
}
func New(logger *zap.Logger, consul *api.Client, waitTime time.Duration) (*Service, error) {
return &Service{
logger: logger,
consul: consul,
waitTime: waitTime,
}, nil
}
func (s *Service) WatchKeyChanges(ctx context.Context, key string) <-chan *api.KVPair {
ch := make(chan *api.KVPair)
go func() {
defer close(ch)
delay := 300 * time.Millisecond
var waitIndex uint64
for {
select {
case <-ctx.Done():
return
default:
opts := (&api.QueryOptions{
WaitIndex: waitIndex,
WaitTime: s.waitTime,
}).WithContext(ctx)
res, _, err := s.consul.KV().Get(key, opts)
if err != nil {
s.logger.Warn("cannot get key", zap.Error(err))
time.Sleep(delay)
continue
}
if res == nil {
ch <- nil
time.Sleep(delay)
continue
}
ch <- res
waitIndex = res.ModifyIndex
}
}
}()
return ch
}
Здесь мы в цикле отправляем запрос на получение информации о ключе. Однако каждый запрос использует особенность Consul с
ожиданием waitTime
изменения ключа. В случае если ключ не существует - ждем короткое время (сейчас его кто-то
пересоздаст) и повторяем запрос. Изменения ключа отправляем в канал, который будет использоваться для принятия решения
далее.
Соберем все вместе в третьем пакете. Третий сервис запустит сервис с сессиями и сервис с ключами и будет обрабатывать изменения от обоих, в случае необходимости - захватывать ключ от имени новой сессии.
package election
import (
"context"
"github.com/dev-services42/leader-election-lib/leader-election/keys"
"github.com/dev-services42/leader-election-lib/leader-election/sessions"
"github.com/hashicorp/consul/api"
"go.uber.org/zap"
"time"
)
type Service struct {
sessions *sessions.Service
keys *keys.Service
logger *zap.Logger
ttl time.Duration
sessionName string
keyName string
consul *api.Client
}
func New(logger *zap.Logger, consul *api.Client, srvSessions *sessions.Service, srvKeys *keys.Service, ttl time.Duration, sessionName, keyName string) (*Service, error) {
return &Service{
logger: logger,
consul: consul,
sessions: srvSessions,
keys: srvKeys,
ttl: ttl,
sessionName: sessionName,
keyName: keyName,
}, nil
}
func (s *Service) RunLeaderElection(ctx context.Context) <-chan bool {
ch := make(chan bool)
go func() {
err := s.sessions.CreateRenew(ctx, s.ttl, s.sessionName)
if err != nil {
s.logger.Fatal("cannot create/renew session", zap.Error(err))
}
}()
keysChanged := s.keys.WatchKeyChanges(ctx, s.keyName)
go func() {
defer close(ch)
var key *api.KVPair
var sessionID string
for {
select {
case <-ctx.Done():
return
case sid, ok := <-s.sessions.GetSessionID():
if !ok {
return
}
sessionID = sid
case k, ok := <-keysChanged:
if !ok {
return
}
key = k
}
master := s.handleChanges(ctx, sessionID, key)
select {
case <-ctx.Done():
case ch <- master:
}
}
}()
return ch
}
func (s *Service) handleChanges(ctx context.Context, sessionID string, key *api.KVPair) bool {
if sessionID == "" {
// нужно подождать создания сессии
return false
}
nodeName, err := s.consul.Agent().NodeName()
if err != nil {
return false
}
if key == nil || key.Session == "" {
key = &api.KVPair{
Key: s.keyName,
Value: []byte(nodeName),
Session: sessionID,
}
opts := (&api.WriteOptions{}).WithContext(ctx)
acquire, _, err := s.consul.KV().Acquire(key, opts)
if err != nil {
return false
}
return acquire
}
if string(key.Value) != nodeName {
return false
}
if key.Session != sessionID {
return false
}
return true
}
В листинге выше находится реализация теоретической части из пред. статьи. Результирующий main.go, с которым можно поиграть находится здесь.