This commit is contained in:
Sun
2023-11-08 21:53:07 +08:00
commit 211c3071dc
245 changed files with 39293 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
package queue
// 队列器
type Queuer interface {
// 左侧插入
LPush(value ...interface{}) error
// 右侧插入
RPush(value ...interface{}) error
// 删除元素
Delete(value interface{}) error
// 使用下标获取值
GetByIndex(index int64, v interface{}) error
// 左侧读取并删除
LPop(v interface{}) error
// 右侧读取并删除
RPop(v interface{}) error
// 队列长度
Length() (int64, error)
// 清空队列
Flush() error
}
+105
View File
@@ -0,0 +1,105 @@
package queueMemory
import (
"encoding/json"
"errors"
"reflect"
"sync"
)
type Pool struct {
Values [][]byte
Lock sync.RWMutex
}
func New() *Pool {
return &Pool{}
}
func (k *Pool) LPush(value ...interface{}) error {
k.Lock.Lock()
defer k.Lock.Unlock()
for i := 0; i < len(value); i++ {
v, _ := json.Marshal(value[i])
k.Values = append([][]byte{v}, k.Values...)
}
return nil
}
func (k *Pool) RPush(value ...interface{}) error {
k.Lock.Lock()
defer k.Lock.Unlock()
for i := 0; i < len(value); i++ {
v, _ := json.Marshal(value[i])
k.Values = append(k.Values, v)
}
return nil
}
func (k *Pool) Delete(value interface{}) error {
k.Lock.Lock()
defer k.Lock.Unlock()
var index int64
v, _ := json.Marshal(value)
for i, item := range k.Values {
if reflect.DeepEqual(item, v) {
index = int64(i)
k.removeIndex(index)
return nil
}
}
return nil
}
// 取出值
func (k *Pool) GetByIndex(index int64, v interface{}) error {
k.Lock.RLock()
defer k.Lock.RUnlock()
if int64(len(k.Values)) >= index {
json.Unmarshal(k.Values[index], v)
return nil
} else {
return errors.New("index non-existent")
}
}
// 左-取出并删除
func (k *Pool) LPop(v interface{}) error {
if err := k.GetByIndex(0, v); err != nil {
return err
} else {
k.removeIndex(0)
}
return nil
}
// 右-取出并删除
func (k *Pool) RPop(v interface{}) error {
index := int64(len(k.Values) - 1)
if err := k.GetByIndex(index, v); err != nil {
return err
} else {
k.removeIndex(index)
}
return nil
}
func (k *Pool) removeIndex(index int64) error {
k.Lock.Lock()
defer k.Lock.Unlock()
k.Values = append(k.Values[:index], k.Values[index+1:]...)
return nil
}
func (k *Pool) Length() (int64, error) {
return int64(len(k.Values)), nil
}
func (k *Pool) Flush() error {
k.Lock.Lock()
defer k.Lock.Unlock()
k.Values = nil
return nil
}
+96
View File
@@ -0,0 +1,96 @@
package queueRedis
import (
"context"
"encoding/json"
"github.com/redis/go-redis/v9"
)
type Pool struct {
Ctx context.Context
Redis *redis.Client
Name string
}
func New(redisDb *redis.Client, name string) *Pool {
ctx := context.Background()
return &Pool{
Ctx: ctx,
Redis: redisDb,
Name: name,
}
}
func (k *Pool) LPush(value ...interface{}) error {
var values []interface{}
for _, v := range value {
values = append(values, k.encode(v))
}
return k.Redis.LPush(k.Ctx, k.Name, values...).Err()
}
func (k *Pool) RPush(value ...interface{}) error {
var values []interface{}
for _, v := range value {
values = append(values, k.encode(v))
}
return k.Redis.RPush(k.Ctx, k.Name, values...).Err()
}
func (k *Pool) Delete(value interface{}) error {
return k.Redis.LRem(k.Ctx, k.Name, 0, k.encode(value)).Err()
}
// 取出值
func (k *Pool) GetByIndex(index int64, v interface{}) error {
if d, err := k.Redis.LIndex(k.Ctx, k.Name, index).Result(); err != nil {
return err
} else {
k.decode(d, v)
return nil
}
}
// 左-取出并删除
func (k *Pool) LPop(v interface{}) error {
if d, err := k.Redis.LPop(k.Ctx, k.Name).Result(); err != nil {
return err
} else {
k.decode(d, v)
return nil
}
}
// 右-取出并删除
func (k *Pool) RPop(v interface{}) error {
if d, err := k.Redis.RPop(k.Ctx, k.Name).Result(); err != nil {
return err
} else {
k.decode(d, v)
return nil
}
}
func (k *Pool) encode(value any) string {
data, err := json.Marshal(value)
if err != nil {
return "{}"
}
return string(data)
}
func (k *Pool) decode(v string, value interface{}) {
err := json.Unmarshal([]byte(v), value)
_ = err
}
func (k *Pool) Length() (int64, error) {
r := k.Redis.LLen(k.Ctx, k.Name)
return r.Result()
}
func (k *Pool) Flush() error {
return k.Redis.Del(k.Ctx, k.Name).Err()
}