mirror of
				https://codeberg.org/forgejo/forgejo.git
				synced 2025-11-04 00:11:04 +00:00 
			
		
		
		
	* Vendor: update gitea.com/macaron/session to a177a270 * make vendor * Vendor: update gitea.com/macaron/macaron to 0db5d458 * make vendor * Vendor: update gitea.com/macaron/cache to 905232fb * make vendor * Vendor: update gitea.com/macaron/i18n to 4ca3dd0c * make vendor * Vendor: update gitea.com/macaron/gzip to efa5e847 * make vendor * Vendor: update gitea.com/macaron/captcha to e8597820 * make vendor
		
			
				
	
	
		
			228 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			228 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
package couchbase
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"github.com/couchbase/goutils/logging"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"math/rand"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"time"
 | 
						|
	"unsafe"
 | 
						|
)
 | 
						|
 | 
						|
// Bucket auto-updater gets the latest version of the bucket config from
 | 
						|
// the server. If the configuration has changed then updated the local
 | 
						|
// bucket information. If the bucket has been deleted then notify anyone
 | 
						|
// who is holding a reference to this bucket
 | 
						|
 | 
						|
const MAX_RETRY_COUNT = 5
 | 
						|
const DISCONNECT_PERIOD = 120 * time.Second
 | 
						|
 | 
						|
type NotifyFn func(bucket string, err error)
 | 
						|
type StreamingFn func(bucket *Bucket)
 | 
						|
 | 
						|
// Use TCP keepalive to detect half close sockets
 | 
						|
var updaterTransport http.RoundTripper = &http.Transport{
 | 
						|
	Proxy: http.ProxyFromEnvironment,
 | 
						|
	Dial: (&net.Dialer{
 | 
						|
		Timeout:   30 * time.Second,
 | 
						|
		KeepAlive: 30 * time.Second,
 | 
						|
	}).Dial,
 | 
						|
}
 | 
						|
 | 
						|
var updaterHTTPClient = &http.Client{Transport: updaterTransport}
 | 
						|
 | 
						|
func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
 | 
						|
 | 
						|
	var err error
 | 
						|
	var res *http.Response
 | 
						|
 | 
						|
	for i := 0; i < HTTP_MAX_RETRY; i++ {
 | 
						|
		res, err = updaterHTTPClient.Do(req)
 | 
						|
		if err != nil && isHttpConnError(err) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return res, err
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
 | 
						|
	b.RunBucketUpdater2(nil, notify)
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
 | 
						|
	go func() {
 | 
						|
		err := b.UpdateBucket2(streamingFn)
 | 
						|
		if err != nil {
 | 
						|
			if notify != nil {
 | 
						|
				notify(b.GetName(), err)
 | 
						|
			}
 | 
						|
			logging.Errorf(" Bucket Updater exited with err %v", err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
 | 
						|
	if !bucketLocked {
 | 
						|
		b.Lock()
 | 
						|
		defer b.Unlock()
 | 
						|
	}
 | 
						|
	old := b.connPools
 | 
						|
	b.connPools = unsafe.Pointer(&with)
 | 
						|
	if old != nil {
 | 
						|
		for _, pool := range *(*[]*connectionPool)(old) {
 | 
						|
			if pool != nil && pool.inUse == false {
 | 
						|
				pool.Close()
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) UpdateBucket() error {
 | 
						|
	return b.UpdateBucket2(nil)
 | 
						|
}
 | 
						|
 | 
						|
func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
 | 
						|
	var failures int
 | 
						|
	var returnErr error
 | 
						|
	var poolServices PoolServices
 | 
						|
 | 
						|
	for {
 | 
						|
 | 
						|
		if failures == MAX_RETRY_COUNT {
 | 
						|
			logging.Errorf(" Maximum failures reached. Exiting loop...")
 | 
						|
			return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
 | 
						|
		}
 | 
						|
 | 
						|
		nodes := b.Nodes()
 | 
						|
		if len(nodes) < 1 {
 | 
						|
			return fmt.Errorf("No healthy nodes found")
 | 
						|
		}
 | 
						|
 | 
						|
		startNode := rand.Intn(len(nodes))
 | 
						|
		node := nodes[(startNode)%len(nodes)]
 | 
						|
 | 
						|
		streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, uriAdj(b.GetName()))
 | 
						|
		logging.Infof(" Trying with %s", streamUrl)
 | 
						|
		req, err := http.NewRequest("GET", streamUrl, nil)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		// Lock here to avoid having pool closed under us.
 | 
						|
		b.RLock()
 | 
						|
		err = maybeAddAuth(req, b.pool.client.ah)
 | 
						|
		b.RUnlock()
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		res, err := doHTTPRequestForUpdate(req)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		if res.StatusCode != 200 {
 | 
						|
			bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
 | 
						|
			logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
 | 
						|
			res.Body.Close()
 | 
						|
			returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
 | 
						|
			failures++
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		dec := json.NewDecoder(res.Body)
 | 
						|
 | 
						|
		tmpb := &Bucket{}
 | 
						|
		for {
 | 
						|
 | 
						|
			err := dec.Decode(&tmpb)
 | 
						|
			if err != nil {
 | 
						|
				returnErr = err
 | 
						|
				res.Body.Close()
 | 
						|
				break
 | 
						|
			}
 | 
						|
 | 
						|
			// if we got here, reset failure count
 | 
						|
			failures = 0
 | 
						|
 | 
						|
			if b.pool.client.tlsConfig != nil {
 | 
						|
				poolServices, err = b.pool.client.GetPoolServices("default")
 | 
						|
				if err != nil {
 | 
						|
					returnErr = err
 | 
						|
					res.Body.Close()
 | 
						|
					break
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			b.Lock()
 | 
						|
 | 
						|
			// mark all the old connection pools for deletion
 | 
						|
			pools := b.getConnPools(true /* already locked */)
 | 
						|
			for _, pool := range pools {
 | 
						|
				if pool != nil {
 | 
						|
					pool.inUse = false
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
 | 
						|
			for i := range newcps {
 | 
						|
				// get the old connection pool and check if it is still valid
 | 
						|
				pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
 | 
						|
				if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig {
 | 
						|
					// if the hostname and index is unchanged then reuse this pool
 | 
						|
					newcps[i] = pool
 | 
						|
					pool.inUse = true
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				// else create a new pool
 | 
						|
				var encrypted bool
 | 
						|
				hostport := tmpb.VBSMJson.ServerList[i]
 | 
						|
				if b.pool.client.tlsConfig != nil {
 | 
						|
					hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
 | 
						|
					if err != nil {
 | 
						|
						b.Unlock()
 | 
						|
						return err
 | 
						|
					}
 | 
						|
				}
 | 
						|
				if b.ah != nil {
 | 
						|
					newcps[i] = newConnectionPool(hostport,
 | 
						|
						b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
 | 
						|
 | 
						|
				} else {
 | 
						|
					newcps[i] = newConnectionPool(hostport,
 | 
						|
						b.authHandler(true /* bucket already locked */),
 | 
						|
						false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			b.replaceConnPools2(newcps, true /* bucket already locked */)
 | 
						|
 | 
						|
			tmpb.ah = b.ah
 | 
						|
			b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
 | 
						|
			b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
 | 
						|
			b.Unlock()
 | 
						|
 | 
						|
			if streamingFn != nil {
 | 
						|
				streamingFn(tmpb)
 | 
						|
			}
 | 
						|
			logging.Debugf("Got new configuration for bucket %s", b.GetName())
 | 
						|
 | 
						|
		}
 | 
						|
		// we are here because of an error
 | 
						|
		failures++
 | 
						|
		continue
 | 
						|
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |