mirror of
				https://codeberg.org/forgejo/forgejo.git
				synced 2025-11-04 08:21:11 +00:00 
			
		
		
		
	* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missiong govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
		
			
				
	
	
		
			101 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			101 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package hbase
 | 
						|
 | 
						|
import (
 | 
						|
	pb "github.com/golang/protobuf/proto"
 | 
						|
	"github.com/juju/errors"
 | 
						|
	"github.com/ngaut/log"
 | 
						|
	"github.com/pingcap/go-hbase/proto"
 | 
						|
)
 | 
						|
 | 
						|
type action interface {
 | 
						|
	ToProto() pb.Message
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) innerCall(table, row []byte, action action, useCache bool) (*call, error) {
 | 
						|
	region, err := c.LocateRegion(table, row, useCache)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	conn, err := c.getClientConn(region.Server)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	regionSpecifier := &proto.RegionSpecifier{
 | 
						|
		Type:  proto.RegionSpecifier_REGION_NAME.Enum(),
 | 
						|
		Value: []byte(region.Name),
 | 
						|
	}
 | 
						|
 | 
						|
	var cl *call
 | 
						|
	switch a := action.(type) {
 | 
						|
	case *Get:
 | 
						|
		cl = newCall(&proto.GetRequest{
 | 
						|
			Region: regionSpecifier,
 | 
						|
			Get:    a.ToProto().(*proto.Get),
 | 
						|
		})
 | 
						|
	case *Put, *Delete:
 | 
						|
		cl = newCall(&proto.MutateRequest{
 | 
						|
			Region:   regionSpecifier,
 | 
						|
			Mutation: a.ToProto().(*proto.MutationProto),
 | 
						|
		})
 | 
						|
 | 
						|
	case *CoprocessorServiceCall:
 | 
						|
		cl = newCall(&proto.CoprocessorServiceRequest{
 | 
						|
			Region: regionSpecifier,
 | 
						|
			Call:   a.ToProto().(*proto.CoprocessorServiceCall),
 | 
						|
		})
 | 
						|
	default:
 | 
						|
		return nil, errors.Errorf("Unknown action - %T - %v", action, action)
 | 
						|
	}
 | 
						|
 | 
						|
	err = conn.call(cl)
 | 
						|
	if err != nil {
 | 
						|
		// If failed, remove bad server conn cache.
 | 
						|
		cachedKey := cachedConnKey(region.Server, ClientService)
 | 
						|
		delete(c.cachedConns, cachedKey)
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	return cl, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) innerDo(table, row []byte, action action, useCache bool) (pb.Message, error) {
 | 
						|
	// Try to create and send a new resuqest call.
 | 
						|
	cl, err := c.innerCall(table, row, action, useCache)
 | 
						|
	if err != nil {
 | 
						|
		log.Warnf("inner call failed - %v", errors.ErrorStack(err))
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Wait and receive the result.
 | 
						|
	return <-cl.responseCh, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *client) do(table, row []byte, action action, useCache bool) (pb.Message, error) {
 | 
						|
	var (
 | 
						|
		result pb.Message
 | 
						|
		err    error
 | 
						|
	)
 | 
						|
 | 
						|
LOOP:
 | 
						|
	for i := 0; i < c.maxRetries; i++ {
 | 
						|
		result, err = c.innerDo(table, row, action, useCache)
 | 
						|
		if err == nil {
 | 
						|
			switch r := result.(type) {
 | 
						|
			case *exception:
 | 
						|
				err = errors.New(r.msg)
 | 
						|
				// If get an execption response, clean old region cache.
 | 
						|
				c.CleanRegionCache(table)
 | 
						|
			default:
 | 
						|
				break LOOP
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		useCache = false
 | 
						|
		log.Warnf("Retrying action for the %d time(s), error - %v", i+1, errors.ErrorStack(err))
 | 
						|
		retrySleep(i + 1)
 | 
						|
	}
 | 
						|
 | 
						|
	return result, errors.Trace(err)
 | 
						|
}
 |