tencent cloud

腾讯云分布式缓存数据库(兼容 Redis)

动态与公告
产品动态
公告
新手指引
产品简介
产品概述
产品优势
应用场景
存储引擎
产品系列
产品版本
规格与性能
读写分离
多可用区部署
地域和可用区
名词解释
购买指南
计费概述
定价中心
购买实例
续费说明(包年包月)
退费说明(包年包月)
欠费说明
按量转包年包月
快速入门
快速创建实例
连接实例(Redis/Valkey 版)
操作指南
操作总览
连接数据库实例
管理实例
升级实例
管理节点(Redis/ValKey 版)
管理多可用区
备份与恢复
账号管理
参数配置
慢查询
访问管理
网络与安全
监控与告警
事件管理(Redis/ValKey 版)
数据迁移
Redis 版全球复制
数据库审计
诊断优化
Sentinel 模式
开发准则
命名规则
基本使用准则
Key 与 Value 设计原则
命令使用准则
客户端程序设计准则
连接池配置
命令参考
命令参考概览
Redis 版与 Valkey 版命令兼容性
大版本命令使用差异
Proxy 架构与直连模式的使用差异
命令更多操作(Redis/Valkey 版)
Memcached 版命令兼容性
实践教程
基于 Spring Boot 搭建 Redis 客户端监控
Redis 客户端连接配置策略与实践
集群架构全局 SCAN 使用指南
实例安全下线
热 Key 与 大 key
可用区迁移方案
故障处理
连接异常
Redisson 客户端超时重连异常分析及解决方案
性能排查与调优
API 文档
History
Introduction
API Category
Making API Requests
Instance APIs
Parameter Management APIs
Other APIs
Backup and Restoration APIs
Region APIs
Monitoring and Management APIs
Log APIs
Data Types
Error Codes
常见问题
使用常见问题
连接登录问题
购买相关问题
相关协议
服务等级协议
Terms of Service
词汇表
联系我们

Redigo 连接池使用建议

PDF
聚焦模式
字号
最后更新时间: 2024-11-05 09:39:24
redigo 官网连接池 仅支持从队头拿连接。从队头放回已用连接,导致最热连接一直被使用,不轮询每一个连接,总会引起 Redis 的 Proxy 出现连接或是负载不均衡的问题。建议修改源代码中的 pool.go 文件,增加 pushBack 方法,将已使用的连接加入在队尾。

增加 pushBack 代码示例

// idle connect push list back
func (l *idleList) pushBack(pc *poolConn) {

if l.count == 0 {
l.front = pc
l.back = pc
pc.prev = nil
pc.next = nil
} else {
pc.prev = l.back
l.back.next = pc
l.back = pc
pc.next = nil
}

l.count++
}

// idle connection in the pool method, True: pushBack, False: pushFront, default False
Lifo bool

// fix add pushBack
if p.Lifo == true {
p.idle.pushBack(pc)
} else {
p.idle.pushFront(pc)
}

完整代码示例

====================整个代码修改后pool.go=========================

// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.

package redis

import (
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"errors"
"io"
"strconv"
"sync"
"time"
)

var (
_ ConnWithTimeout = (*activeConn)(nil)
_ ConnWithTimeout = (*errorConn)(nil)
)

var nowFunc = time.Now // for testing

// ErrPoolExhausted is returned from a pool connection method (Do, Send,
// Receive, Flush, Err) when the maximum number of database connections in the
// pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")

var (
errConnClosed = errors.New("redigo: connection closed")
)

// Pool maintains a pool of connections. The application calls the Get method
// to get a connection from the pool and the connection's Close method to
// return the connection's resources to the pool.
//
// The following example shows how to use a pool in a web application. The
// application creates a pool at application startup and makes it available to
// request handlers using a package level variable. The pool configuration used
// here is an example, not a recommendation.
//
// func newPool(addr string) *redis.Pool {
// return &redis.Pool{
// MaxIdle: 3,
// IdleTimeout: 240 * time.Second,
// // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
// Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
// }
// }
//
// var (
// pool *redis.Pool
// redisServer = flag.String("redisServer", ":6379", "")
// )
//
// func main() {
// flag.Parse()
// pool = newPool(*redisServer)
// ...
// }
//
// A request handler gets a connection from the pool and closes the connection
// when the handler is done:
//
// func serveHome(w http.ResponseWriter, r *http.Request) {
// conn := pool.Get()
// defer conn.Close()
// ...
// }
//
// Use the Dial function to authenticate connections with the AUTH command or
// select a database with the SELECT command:
//
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// Dial: func () (redis.Conn, error) {
// c, err := redis.Dial("tcp", server)
// if err != nil {
// return nil, err
// }
// if _, err := c.Do("AUTH", password); err != nil {
// c.Close()
// return nil, err
// }
// if _, err := c.Do("SELECT", db); err != nil {
// c.Close()
// return nil, err
// }
// return c, nil
// },
// }
//
// Use the TestOnBorrow function to check the health of an idle connection
// before the connection is returned to the application. This example PINGs
// connections that have been idle more than a minute:
//
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// if time.Since(t) < time.Minute {
// return nil
// }
// _, err := c.Do("PING")
// return err
// },
// }
//
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
Dial func() (Conn, error)

// DialContext is an application supplied function for creating and configuring a
// connection with the given context.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
DialContext func(ctx context.Context) (Conn, error)

// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error

// Maximum number of idle connections in the pool.
MaxIdle int

// idle connection in the pool method, True: pushBack, False: pushFront, default False
Lifo bool

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int

// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration

// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool

// Close connections older than this duration. If the value is zero, then
// the pool does not close connections based on age.
MaxConnLifetime time.Duration

mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
initOnce sync.Once // the init ch once func
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}

// NewPool creates a new pool.
//
// Deprecated: Initialize the Pool directly as shown in the example.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
return &Pool{Dial: newFn, MaxIdle: maxIdle}
}

// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
// GetContext returns errorConn in the first argument when an error occurs.
c, _ := p.GetContext(context.Background())
return c
}

// GetContext gets a connection using the provided context.
//
// The provided Context must be non-nil. If the context expires before the
// connection is complete, an error is returned. Any expiration on the context
// will not affect the returned connection.
//
// If the function completes without error, then the application must close the
// returned connection.
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
return errorConn{err}, err
}

p.mu.Lock()

if waited > 0 {
p.waitCount++
p.waitDuration += waited
}

// Prune stale connections at the back of the idle list.
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}

// Get idle connection from the front of idle list.
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
return &activeConn{p: p, pc: pc}, nil
}
pc.c.Close()
p.mu.Lock()
p.active--
}

// Check for pool closed before dialing a new connection.
if p.closed {
p.mu.Unlock()
err := errors.New("redigo: get on closed pool")
return errorConn{err}, err
}

// Handle limit for p.Wait == false.
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return errorConn{ErrPoolExhausted}, ErrPoolExhausted
}

p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
if err != nil {
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return errorConn{err}, err
}
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
}

// PoolStats contains pool statistics.
type PoolStats struct {
// ActiveCount is the number of connections in the pool. The count includes
// idle connections and connections in use.
ActiveCount int
// IdleCount is the number of idle connections in the pool.
IdleCount int

// WaitCount is the total number of connections waited for.
// This value is currently not guaranteed to be 100% accurate.
WaitCount int64

// WaitDuration is the total time blocked waiting for a new connection.
// This value is currently not guaranteed to be 100% accurate.
WaitDuration time.Duration
}

// Stats returns pool's statistics.
func (p *Pool) Stats() PoolStats {
p.mu.Lock()
stats := PoolStats{
ActiveCount: p.active,
IdleCount: p.idle.count,
WaitCount: p.waitCount,
WaitDuration: p.waitDuration,
}
p.mu.Unlock()

return stats
}

// ActiveCount returns the number of connections in the pool. The count
// includes idle connections and connections in use.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}

// IdleCount returns the number of idle connections in the pool.
func (p *Pool) IdleCount() int {
p.mu.Lock()
idle := p.idle.count
p.mu.Unlock()
return idle
}

// Close releases the resources used by the pool.
func (p *Pool) Close() error {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil
}
p.closed = true
p.active -= p.idle.count
pc := p.idle.front
p.idle.count = 0
p.idle.front, p.idle.back = nil, nil
if p.ch != nil {
close(p.ch)
}
p.mu.Unlock()
for ; pc != nil; pc = pc.next {
pc.c.Close()
}
return nil
}

func (p *Pool) lazyInit() {
p.initOnce.Do(func() {
p.ch = make(chan struct{}, p.MaxActive)
if p.closed {
close(p.ch)
} else {
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
})
}

// waitVacantConn waits for a vacant connection in pool if waiting
// is enabled and pool size is limited, otherwise returns instantly.
// If ctx expires before that, an error is returned.
//
// If there were no vacant connection in the pool right away it returns the time spent waiting
// for that connection to appear in the pool.
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
if !p.Wait || p.MaxActive <= 0 {
// No wait or no connection limit.
return 0, nil
}

p.lazyInit()

// wait indicates if we believe it will block so its not 100% accurate
// however for stats it should be good enough.
wait := len(p.ch) == 0
var start time.Time
if wait {
start = time.Now()
}

select {
case <-p.ch:
// Additionally check that context hasn't expired while we were waiting,
// because `select` picks a random `case` if several of them are "ready".
select {
case <-ctx.Done():
p.ch <- struct{}{}
return 0, ctx.Err()
default:
}
case <-ctx.Done():
return 0, ctx.Err()
}

if wait {
return time.Since(start), nil
}
return 0, nil
}

func (p *Pool) dial(ctx context.Context) (Conn, error) {
if p.DialContext != nil {
return p.DialContext(ctx)
}
if p.Dial != nil {
return p.Dial()
}
return nil, errors.New("redigo: must pass Dial or DialContext to pool")
}

func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc()

// fix add pushBack
if p.Lifo == true {
p.idle.pushBack(pc)
} else {
p.idle.pushFront(pc)
}

if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()
} else {
pc = nil
}
}

if pc != nil {
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}

if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return nil
}

type activeConn struct {
p *Pool
pc *poolConn
state int
}

var (
sentinel []byte
sentinelOnce sync.Once
)

func initSentinel() {
p := make([]byte, 64)
if _, err := rand.Read(p); err == nil {
sentinel = p
} else {
h := sha1.New()
io.WriteString(h, "Oops, rand failed. Use time instead.") // nolint: errcheck
io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) // nolint: errcheck
sentinel = h.Sum(nil)
}
}

func (ac *activeConn) firstError(errs ...error) error {
for _, err := range errs[:len(errs)-1] {
if err != nil {
return err
}
}
return errs[len(errs)-1]
}

func (ac *activeConn) Close() (err error) {
pc := ac.pc
if pc == nil {
return nil
}
ac.pc = nil

if ac.state&connectionMultiState != 0 {
err = pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {
err = pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if ac.state&connectionSubscribeState != 0 {
err = ac.firstError(err,
pc.c.Send("UNSUBSCRIBE"),
pc.c.Send("PUNSUBSCRIBE"),
)
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
err = ac.firstError(err,
pc.c.Send("ECHO", sentinel),
pc.c.Flush(),
)
for {
p, err2 := pc.c.Receive()
if err2 != nil {
err = ac.firstError(err, err2)
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
ac.state &^= connectionSubscribeState
break
}
}
}
_, err2 := pc.c.Do("")
return ac.firstError(
err,
err2,
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil),
)
}

func (ac *activeConn) Err() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Err()
}

func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}

func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return cwt.DoWithTimeout(timeout, commandName, args...)
}

func (ac *activeConn) Send(commandName string, args ...interface{}) error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Send(commandName, args...)
}

func (ac *activeConn) Flush() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Flush()
}

func (ac *activeConn) Receive() (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
return pc.c.Receive()
}

func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
return cwt.ReceiveWithTimeout(timeout)
}

type errorConn struct{ err error }

func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
return nil, ec.err
}
func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
func (ec errorConn) Err() error { return ec.err }
func (ec errorConn) Close() error { return nil }
func (ec errorConn) Flush() error { return ec.err }
func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }

type idleList struct {
count int
front, back *poolConn
}

type poolConn struct {
c Conn
t time.Time
created time.Time
next, prev *poolConn
}

func (l *idleList) pushFront(pc *poolConn) {
pc.next = l.front
pc.prev = nil
if l.count == 0 {
l.back = pc
} else {
l.front.prev = pc
}
l.front = pc
l.count++
}


// idle connect push list back
func (l *idleList) pushBack(pc *poolConn) {

if l.count == 0 {
l.front = pc
l.back = pc
pc.prev = nil
pc.next = nil
} else {
pc.prev = l.back
l.back.next = pc
l.back = pc
pc.next = nil
}

l.count++
}


func (l *idleList) popFront() {
pc := l.front
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
pc.next.prev = nil
l.front = pc.next
}
pc.next, pc.prev = nil, nil
}

func (l *idleList) popBack() {
pc := l.back
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
pc.prev.next = nil
l.back = pc.prev
}
pc.next, pc.prev = nil, nil
}
==================================================

初始方法:

// 建立连接池
redisClient = &redis.Pool{
MaxIdle: maxIdle,
MaxActive: maxActive,
IdleTimeout: MaxIdleTimeout * time.Second,
Wait: true,
Lifo: true, # 设置为true,这是重点
Dial: func() (redis.Conn, error) {
con, err := redis.Dial("tcp", conf["Host"].(string),
redis.DialPassword(conf["Password"].(string)),
redis.DialDatabase(int(conf["Db"].(int64))),
redis.DialConnectTimeout(timeout*time.Second),
redis.DialReadTimeout(timeout*time.Second),
redis.DialWriteTimeout(timeout*time.Second))
if err != nil {
return nil, err
}
return con, nil
},
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈