基于环状队列和迭代器如何实现分布式任务RR分配策略

76次阅读
没有评论

共计 6353 个字符,预计需要花费 16 分钟才能阅读完成。

这期内容当中丸趣 TV 小编将会给大家带来有关基于环状队列和迭代器如何实现分布式任务 RR 分配策略,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

# 背景

## 分布式任务分配

在很多运维场景下,我们都会执行一些长时间的任务,比如装机、部署环境、打包镜像等长时间任务,而通常我们的任务节点数量通常是有限的 (排除基于 k8s 的 hpa、或者 knative 等自动伸缩场景)。

那么当我们有一个任务如何根据当前的 worker 和 corrdinator 和任务来进行合理的分配,分配其实也比较复杂,往复杂里面做,可以根据当前系统的负载、每个任务的执行资源消耗、当前集群的任务数量等,这里我们就搞一个最简单的,基于任务和当前 worker 的 RR 算法

## 系统架构

在 worker 和任务队列之间,添加一层协调调度层 Coordinator,由它来根据当前集群任务的状态来进行任务的分配,同时感知当前集群 worker 和 task 的状态,协调整个集群任务的执行、终止等操作

# 单机实现

## 整体设计

members: 表示当前集群中所有的 worker

tasks: 就是当前的任务

Coordinator: 就是我们的协调者,负责根据 members 和 tasks 进行任务的分配

result: 就是分配的结果

## CircularIterator

CircularIterator 就是我们的环状对立迭代器, 拥有两个方法,一个是 add 添加 member, 一个 Next 返回基于 rr 的下一个 member

“`go

// CircularIterator 环状迭代器

type CircularIterator struct {

list []interface{}    // 保存所有的成员变量

next int

}

// Next 返回下一个元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任务

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

“`

## Member Task

Member 就是负责执行任务的 worker, 有一个 AddTask 方法和 Execute 方法负责任务的执行和添加任务

Task 标识一个任务

“`go

// Member 任务组成员

type Member struct {

id    int

tasks []*Task

}

// ID 返回当前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 为 member 添加任务

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 执行任务

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任务

type Task struct {

name string

}

// Execute 执行 task 返回结果

func (t *Task) Execute() string {

return Task + t.name + run success

}

“`

## Coordinator

Coordinator 是协调器,负责根据 Member 和 task 进行集群任务的协调调度

“`go

// Task 任务

type Task struct {

name string

}

// Execute 执行 task 返回结果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 协调者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 为 member 分配任务

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 构建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通过当前成员, 构造成员队列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 组成员

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任务

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

“`

## 测试

我们首先创建一堆 member 和 task, 然后调用 coordinator 进行任务分配,执行任务结果

“`go

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

“`

## 结果

可以看到每个 worker 均匀的得到任务分配

“`bash

Member 6 run task Task task 6 run success

Member 6 run task Task task 16 run success

Member 6 run task Task task 26 run success

Member 8 run task Task task 8 run success

Member 8 run task Task task 18 run success

Member 8 run task Task task 28 run success

Member 0 run task Task task 0 run success

Member 0 run task Task task 10 run success

Member 0 run task Task task 20 run success

Member 3 run task Task task 3 run success

Member 3 run task Task task 13 run success

Member 3 run task Task task 23 run success

Member 4 run task Task task 4 run success

Member 4 run task Task task 14 run success

Member 4 run task Task task 24 run success

Member 7 run task Task task 7 run success

Member 7 run task Task task 17 run success

Member 7 run task Task task 27 run success

Member 9 run task Task task 9 run success

Member 9 run task Task task 19 run success

Member 9 run task Task task 29 run success

Member 1 run task Task task 1 run success

Member 1 run task Task task 11 run success

Member 1 run task Task task 21 run success

Member 2 run task Task task 2 run success

Member 2 run task Task task 12 run success

Member 2 run task Task task 22 run success

Member 5 run task Task task 5 run success

Member 5 run task Task task 15 run success

Member 5 run task Task task 25 run success

“`

## 完整代码

“`go

package main

import fmt

// CircularIterator 环状迭代器

type CircularIterator struct {

list []interface{}

next int

}

// Next 返回下一个元素

func (c *CircularIterator) Next() interface{} {

item := c.list[c.next]

c.next = (c.next + 1) % len(c.list)

return item

}

// Add 添加任务

func (c *CircularIterator) Add(v interface{}) bool {

for _, item := range c.list {

if v == item {

return false

}

}

c.list = append(c.list, v)

return true

}

// Member 任务组成员

type Member struct {

id    int

tasks []*Task

}

// ID 返回当前 memberID

func (m *Member) ID() int {

return m.id

}

// AddTask 为 member 添加任务

func (m *Member) AddTask(t *Task) bool {

for _, task := range m.tasks {

if task == t {

return false

}

}

m.tasks = append(m.tasks, t)

return true

}

// Execute 执行任务

func (m *Member) Execute() {

for _, task := range m.tasks {

fmt.Printf(Member %d run task %s\n , m.ID(), task.Execute())

}

}

// Task 任务

type Task struct {

name string

}

// Execute 执行 task 返回结果

func (t *Task) Execute() string {

return Task + t.name + run success

}

// Coordinator 协调者

type Coordinator struct {

members []*Member

tasks   []*Task

}

// TaskAssignments 为 member 分配任务

func (c *Coordinator) TaskAssignments() map[int]*Member {

taskAssignments := make(map[int]*Member)

// 构建迭代器

memberIt := c.getMemberIterator()

for _, task := range c.tasks {

member := memberIt.Next().(*Member)

_, err := taskAssignments[member.ID()]

if err == false {

taskAssignments[member.ID()] = member

}

member.AddTask(task)

}

return taskAssignments

}

func (c *Coordinator) getMemberIterator() *CircularIterator {

// 通过当前成员, 构造成员队列

members := make([]interface{}, len(c.members))

for index, member := range c.members {

members[index] = member

}

return NewCircularIterftor(members)

}

// AddMember 添加 member 组成员

func (c *Coordinator) AddMember(m *Member) bool {

for _, member := range c.members {

if member == m {

return false

}

}

c.members = append(c.members, m)

return true

}

// AddTask 添加任务

func (c *Coordinator) AddTask(t *Task) bool {

for _, task := range c.tasks {

if task == t {

return false

}

}

c.tasks = append(c.tasks, t)

return true

}

// NewCircularIterftor 返回迭代器

func NewCircularIterftor(list []interface{}) *CircularIterator {

iterator := CircularIterator{}

for _, item := range list {

iterator.Add(item)

}

return iterator

}

// NewCoordinator 返回协调器

func NewCoordinator() *Coordinator {

c := Coordinator{}

return c

}

func main() {

coordinator := NewCoordinator()

for i := 0; i i++ {

m := Member{id: i}

coordinator.AddMember(m)

}

for i := 0; i i++ {

t := Task{name: fmt.Sprintf( task %d , i)}

coordinator.AddTask(t)

}

result := coordinator.TaskAssignments()

for _, member := range result {

member.Execute()

}

}

“`

任务协调是一个非常复杂的事情,内部的任务平台,虽然实现了基于任务的组合和 app 化,但是任务调度分配着一块,仍然没有去做,只是简单的根据树形任务去简单的做一些分支任务的执行,未来有时间再做吧,要继续研究下一个模块了。

上述就是丸趣 TV 小编为大家分享的基于环状队列和迭代器如何实现分布式任务 RR 分配策略了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计6353字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)