Теоретическая часть

Установка#

Ниже два варианта установки - локально или в кластере. Я выбрал для себя кластер, но приведу пример и для локальной установки.

Docker#

$ docker run -d \
    --name=consul-1 \
    -p 127.0.0.1:8500:8500 \
    -e CONSUL_BIND_INTERFACE=eth0 \
    consul:1.8

Убедимся, что агент отлично себя чувствует:

$ docker logs -f consul-1
...
==> Consul agent running!
...

Consul, запущенный из этого образа по умолчанию запускает и web UI. Доступ к нему не ограничен а сам интерфейс находится по адресу http://127.0.0.1:8500. Можно поиграться.

Запустим еще один пару клиентов. Они потребуются для дальнейшей работы. Для связи их друг между другом можно слинковать контейнеры, но в дальнейшем будет удобнее, если мы передадим IP адрес сервиса явно. Для этого получим текйщий IP ранее запущенного consul-1 сервиса:

$ docker inspect \
    --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' \
    consul-1 | head -1
172.17.0.3

В моем случае этот адрес 172.17.0.3. У вас же он может отличаться. Далее он потребуется нам для запуска других агентов:

$ docker run -d \
    --name=consul-2 \
    -e CONSUL_BIND_INTERFACE=eth0 \
    consul:1.8 agent -dev -join=172.17.0.3
$ docker run -d \
    --name=consul-3 \
    -e CONSUL_BIND_INTERFACE=eth0 \
    consul:1.8 agent -dev -join=172.17.0.3

На странице http://127.0.0.1:8500/ui/dc1/nodes можно убедиться в том, что все ноды живы и подключились к кластеру.

All nodes connected

Подключаться к нодам можно по адресам 172.17.0.3:8500 и соотв адресам (см docker inspect) других контейнеров. Могут быть проблемы с сетью на Mac OS, но это уже за рамками простого запуска.

Kubernetes#

Для работы нам потребуется kubernetes cluster. Рекомендую создать его в DigitalOcean по ссылке. Это займет пару минут. Выбирайте все самое последнее и самое ближайшее к себе. У меня версия 1.18 в San Francisco. Пул из 4х нод 1vCPU 3GB RAM. После создания DO подскажет вам как к нему подключиться. Все очень просто, работает из коробки.

Для установки используем Helm:

Добавим чарт с Consul в наш индекс и проверим, что он успешно добавлен:

$ helm repo add hashicorp https://helm.releases.hashicorp.com
...

$ helm search repo hashicorp/consul
NAME                    CHART VERSION   APP VERSION     DESCRIPTION
hashicorp/consul        0.24.1          1.8.2           Official HashiCorp Consul Chart

Все отлично. Деплоим:

$ helm install consul hashicorp/consul --set global.name=consul --wait

Helm дождется развертывания приложения и отдаст управление обратно. Не прерывайте его. Если хочется посмотреть на то, что происходит - запустите в соседнем терминале k9s.

Установка завершена, все инстансы (три сервера и 4 агента) будут в спсике подов ващего кластера.

Kubernetes: all pods is OK

Необходимо пробросить порты подов на локальную машину. Я бы посоветовал telepresence, но не у всех он заводится. Гарантировано мы сможем это сделать через port-forward.

Прокидываем порт web UI и порты трех из четырех агентов.

$ kubectl port-forward service/consul-ui 9000:80

$ kubectl get po | grep -v server
NAME              READY   STATUS    RESTARTS   AGE
consul-2b44z      1/1     Running   0          23h
consul-jqs6v      1/1     Running   0          23h
consul-mjwr6      1/1     Running   0          24h
consul-ph4sg      1/1     Running   0          24h

# выбираем любые (я выбрал первые три) пода и пробрасываем порты на локальную машину
$ kubectl port-forward pod/consul-2b44z 8500:8500
$ kubectl port-forward pod/consul-jqs6v 8501:8500
$ kubectl port-forward pod/consul-mjwr6 8502:8500

На этом этапе у нас есть доступ к webUI и к трем агентам.

Go#

TLDR: репозиторий

Подключаемся к Consul:

package main

import (
	"fmt"
	"github.com/hashicorp/consul/api"
	"os"
)

func main() {
	config := api.DefaultConfig()
	config.Address = os.Getenv("CONSUL_ADDR")
	consul, err := api.NewClient(config)
	if err != nil {
		panic(err)
	}

	fmt.Println(consul.Agent().NodeName())
	<-make(chan int)
}

Вместо os.Getenv("CONSUL_ADDR") конечно же можно использовать адрес, явно указанный в формате 127.0.0.1:8500.

Для начала нам необходим процесс, который будет предоставлять нам актуальный идентификатор сессии, следить за ее обновлением и в случае проблем - пересоздавать. Результатом работы процесса будет предоставление идентификатора сессии и больше ничего. В силу отсутствия у Consul возможности поиска по списку существующих сессий, нам необходимо написать свою. Искать существующую сессию стоит, хотя этот шаг и не является обязательным. Если сервис был перезапущен, то его сессиия вероятно еще существует, а так же возможно, что именно этот инстанс и является мастером. В таком случае, во избежание переизбрания мастера, необходимо найти свою сессию и попытаться ее продлить. Функция для поиска сессии:

package main

import "context"

func getSessionByName(ctx context.Context, consul *api.Client, nodeName, sessionName string) (*api.SessionEntry, error) {
	opts := (&api.QueryOptions{}).WithContext(ctx)
	sessions, _, err := consul.Session().Node(nodeName, opts)
	if err != nil {
		return nil, errors.Wrap(err, "cannot get sessions from consul")
	}

	for _, session := range sessions {
		if session.Name != sessionName {
			continue
		}

		return session, nil
	}

	return nil, errors.Wrap(ErrNotFound, "session not found")
}

Если сессия найдена, то обновляем ее. В ином случае - пересоздаем. Ниже приведен листинг функции, которая этим занимается. После определения текущей сессии - печатаем ее идентификатор на экран.

package main

import (
	"context"
	"fmt"
)

func renew(ctx context.Context, consul *api.Client) error {
	doneCh := make(chan struct{})
	defer close(doneCh)

	for {
		nodeName, err := consul.Agent().NodeName()
		if err != nil {
			return errors.Wrap(err, "cannot get agent node name")
		}

		var sessionID string
		session, err := getSessionByName(ctx, consul, nodeName, sessionName)
		if err != nil {
			opts := new(api.WriteOptions).WithContext(ctx)
			sessionID, _, err = consul.Session().Create(&api.SessionEntry{
				Name:     sessionName,
				Behavior: api.SessionBehaviorRelease,
				TTL:      ttl.String(),
			}, opts)
			if err != nil {
				return errors.Wrap(err, "cannot create session")
			}
		} else {
			sessionID = session.ID
		}

		fmt.Println("sessionID", sessionID)

		opts := new(api.WriteOptions).WithContext(ctx)
		err2 := consul.Session().RenewPeriodic(ttl.String(), sessionID, opts, doneCh)
		if err2 != nil {
			return errors.Wrap(err2, "cannot renew session")
		}
	}
}

Можно вызвать функцию из main.go и запустить приложение.

package main

import "context"

func main() {
	ctx := context.Background()
	if err := renew(ctx, consul); err != nil {
		panic(err)
	}
}

На экран будет выведен идентификатор только что созданной сессии. Данная сессия будет обновляться так долго, как долго живет текущее приложение, однако если удалить сессию через интерфейс Consul, приложение будет завершено. Для того, что бы функция renew запускалась после ошибки нам необходимо завернуть ее в backoff, а для того, что бы получать обновленный идентификатор сессии - вернуть из функции канал с идентификаторами sessionID chan string. Перенесем код, связанный с сессиями в отдельный пакет sessions:

package sessions

import (
	"context"
	"github.com/cenkalti/backoff/v4"
	"github.com/hashicorp/consul/api"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"time"
)

var ErrNotFound = errors.New("not found")

type Service struct {
	consul *api.Client
	logger *zap.Logger

	sessionIDCh chan string
}

func New(logger *zap.Logger, consul *api.Client) (*Service, error) {
	return &Service{
		consul:      consul,
		logger:      logger,
		sessionIDCh: make(chan string),
	}, nil
}

func (s *Service) CreateRenew(ctx context.Context, ttl time.Duration, sessionName string) error {
	fn := func() error {
		doneCh := make(chan struct{})
		defer close(doneCh)

		for {
			nodeName, err := s.consul.Agent().NodeName()
			if err != nil {
				return errors.Wrap(err, "cannot get agent node name")
			}

			var sessionID string
			session, err := s.getSessionByName(ctx, nodeName, sessionName)
			if err != nil {
				opts := new(api.WriteOptions).WithContext(ctx)
				sessionID, _, err = s.consul.Session().Create(&api.SessionEntry{
					Name:     sessionName,
					Behavior: api.SessionBehaviorRelease,
					TTL:      ttl.String(),
				}, opts)
				if err != nil {
					return errors.Wrap(err, "cannot create session")
				}
			} else {
				sessionID = session.ID
			}

			select {
			case <-ctx.Done():
			case s.sessionIDCh <- sessionID:
			}

			opts := new(api.WriteOptions).WithContext(ctx)
			err2 := s.consul.Session().RenewPeriodic(ttl.String(), sessionID, opts, doneCh)
			if err2 != nil {
				return errors.Wrap(err2, "cannot renew session")
			}
		}
	}

	if err := backoff.Retry(fn, backoff.NewExponentialBackOff()); err != nil {
		return errors.Wrap(err, "backoff error")
	}

	return nil
}

func (s *Service) getSessionByName(ctx context.Context, nodeName, sessionName string) (*api.SessionEntry, error) {
	opts := (&api.QueryOptions{}).WithContext(ctx)
	sessions, _, err := s.consul.Session().Node(nodeName, opts)
	if err != nil {
		return nil, errors.Wrap(err, "cannot get sessions from consul")
	}

	for _, session := range sessions {
		if session.Name != sessionName {
			continue
		}

		return session, nil
	}

	return nil, errors.Wrap(ErrNotFound, "session not found")
}

func (s *Service) GetSessionID() <-chan string {
	return s.sessionIDCh
}

Создав сессию, мы можем от ее имени захватить или создать и захватить ключ в KV:

package main

func main() {
	key = &api.KVPair{
		Key:     "services/my-service/leader",
		Value:   []byte(nodeName),
		Session: sessionID,
	}
	opts := (&api.WriteOptions{}).WithContext(ctx)
	acquire, _, err := s.consul.KV().Acquire(key, opts)
	if err != nil {
		panic(err)
	}
}

Ключ может быть захвачен другой сессией, а так же ключ может быть удален, даже если его захватила текущая сессия. Во избежание этого необходимо всегда следить за выбранным ключом и проверять его состояние. Consul позволяет создать такой запрос на получение ключа, который не вернет ответа до следующего его изменения. Этот механизм позволит нам в большинстве случаев получить изменение ключа почти моментально. Однако механизм не может предоставить всю историю изменений, а это значит, что в некоторых случаях мы можем пропустить изменение ключа. Можно сделать более точный механизм детекта изменений на основе некоторых счетчиков в объекте ключа, но мы будем переиодически перезапрашивать его состояние. Ниже пакет для подписки на изменение ключа keys:

package keys

import (
	"context"
	"github.com/hashicorp/consul/api"
	"go.uber.org/zap"
	"time"
)

type Service struct {
	logger   *zap.Logger
	consul   *api.Client
	waitTime time.Duration
}

func New(logger *zap.Logger, consul *api.Client, waitTime time.Duration) (*Service, error) {
	return &Service{
		logger:   logger,
		consul:   consul,
		waitTime: waitTime,
	}, nil
}

func (s *Service) WatchKeyChanges(ctx context.Context, key string) <-chan *api.KVPair {
	ch := make(chan *api.KVPair)

	go func() {
		defer close(ch)

		delay := 300 * time.Millisecond
		var waitIndex uint64

		for {
			select {
			case <-ctx.Done():
				return
			default:
				opts := (&api.QueryOptions{
					WaitIndex: waitIndex,
					WaitTime:  s.waitTime,
				}).WithContext(ctx)
				res, _, err := s.consul.KV().Get(key, opts)
				if err != nil {
					s.logger.Warn("cannot get key", zap.Error(err))
					time.Sleep(delay)
					continue
				}
				if res == nil {
					ch <- nil
					time.Sleep(delay)
					continue
				}

				ch <- res
				waitIndex = res.ModifyIndex
			}
		}
	}()

	return ch
}

Здесь мы в цикле отправляем запрос на получение информации о ключе. Однако каждый запрос использует особенность Consul с ожиданием waitTime изменения ключа. В случае если ключ не существует - ждем короткое время (сейчас его кто-то пересоздаст) и повторяем запрос. Изменения ключа отправляем в канал, который будет использоваться для принятия решения далее.

Соберем все вместе в третьем пакете. Третий сервис запустит сервис с сессиями и сервис с ключами и будет обрабатывать изменения от обоих, в случае необходимости - захватывать ключ от имени новой сессии.

package election

import (
	"context"
	"github.com/dev-services42/leader-election-lib/leader-election/keys"
	"github.com/dev-services42/leader-election-lib/leader-election/sessions"
	"github.com/hashicorp/consul/api"
	"go.uber.org/zap"
	"time"
)

type Service struct {
	sessions    *sessions.Service
	keys        *keys.Service
	logger      *zap.Logger
	ttl         time.Duration
	sessionName string
	keyName     string
	consul      *api.Client
}

func New(logger *zap.Logger, consul *api.Client, srvSessions *sessions.Service, srvKeys *keys.Service, ttl time.Duration, sessionName, keyName string) (*Service, error) {
	return &Service{
		logger:      logger,
		consul:      consul,
		sessions:    srvSessions,
		keys:        srvKeys,
		ttl:         ttl,
		sessionName: sessionName,
		keyName:     keyName,
	}, nil
}

func (s *Service) RunLeaderElection(ctx context.Context) <-chan bool {
	ch := make(chan bool)

	go func() {
		err := s.sessions.CreateRenew(ctx, s.ttl, s.sessionName)
		if err != nil {
			s.logger.Fatal("cannot create/renew session", zap.Error(err))
		}
	}()

	keysChanged := s.keys.WatchKeyChanges(ctx, s.keyName)

	go func() {
		defer close(ch)

		var key *api.KVPair
		var sessionID string
		for {
			select {
			case <-ctx.Done():
				return
			case sid, ok := <-s.sessions.GetSessionID():
				if !ok {
					return
				}

				sessionID = sid
			case k, ok := <-keysChanged:
				if !ok {
					return
				}

				key = k
			}

			master := s.handleChanges(ctx, sessionID, key)
			select {
			case <-ctx.Done():
			case ch <- master:
			}
		}
	}()

	return ch
}

func (s *Service) handleChanges(ctx context.Context, sessionID string, key *api.KVPair) bool {
	if sessionID == "" {
		// нужно подождать создания сессии
		return false
	}

	nodeName, err := s.consul.Agent().NodeName()
	if err != nil {
		return false
	}

	if key == nil || key.Session == "" {
		key = &api.KVPair{
			Key:     s.keyName,
			Value:   []byte(nodeName),
			Session: sessionID,
		}
		opts := (&api.WriteOptions{}).WithContext(ctx)
		acquire, _, err := s.consul.KV().Acquire(key, opts)
		if err != nil {
			return false
		}

		return acquire
	}

	if string(key.Value) != nodeName {
		return false
	}

	if key.Session != sessionID {
		return false
	}

	return true
}

В листинге выше находится реализация теоретической части из пред. статьи. Результирующий main.go, с которым можно поиграть находится здесь.