mirror of
				https://codeberg.org/forgejo/forgejo.git
				synced 2025-11-04 08:21:11 +00:00 
			
		
		
		
	* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missing govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
		
			
				
	
	
		
			397 lines
		
	
	
	
		
			8.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			397 lines
		
	
	
	
		
			8.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package hbase
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
 | 
						|
	pb "github.com/golang/protobuf/proto"
 | 
						|
	"github.com/juju/errors"
 | 
						|
	"github.com/ngaut/log"
 | 
						|
	"github.com/pingcap/go-hbase/proto"
 | 
						|
)
 | 
						|
 | 
						|
// nextKey returns the next key in byte-order, i.e. data interpreted as a
// big-endian counter with one added to it.
//
// For example:
//
//	nil       -> [0]
//	[]        -> [0]
//	[0]       -> [1]
//	[1, 2, 3] -> [1, 2, 4]
//	[1, 255]  -> [2, 0]
//	[255]     -> [0, 0]
//
// The returned slice is always freshly allocated; data itself is never
// modified, so callers can keep using the key they passed in.
func nextKey(data []byte) []byte {
	// nil or []byte{}
	if len(data) == 0 {
		return []byte{0}
	}

	// Work on a copy so the caller's key is left untouched. One spare leading
	// byte is reserved up front so the full-overflow case needs no second
	// allocation.
	buf := make([]byte, len(data)+1)
	next := buf[1:]
	copy(next, data)

	// Increment the last byte and propagate the carry towards the front.
	for i := len(next) - 1; i >= 0; i-- {
		next[i]++
		if next[i] != 0 {
			// This byte did not wrap around, so the carry stops here.
			return next
		}
	}

	// Every byte wrapped to zero, so one more byte is needed for the carry,
	// like [255] -> [0, 0].
	return buf
}
 | 
						|
 | 
						|
// defaultScanMaxRetries is the retry limit given to a Scan when it is created.
const defaultScanMaxRetries = 3
 | 
						|
 | 
						|
type Scan struct {
 | 
						|
	client *client
 | 
						|
	id     uint64
 | 
						|
	table  []byte
 | 
						|
	// row key
 | 
						|
	StartRow     []byte
 | 
						|
	StopRow      []byte
 | 
						|
	families     [][]byte
 | 
						|
	qualifiers   [][][]byte
 | 
						|
	nextStartKey []byte
 | 
						|
	numCached    int
 | 
						|
	closed       bool
 | 
						|
	location     *RegionInfo
 | 
						|
	server       *connection
 | 
						|
	cache        []*ResultRow
 | 
						|
	attrs        map[string][]byte
 | 
						|
	MaxVersions  uint32
 | 
						|
	TsRangeFrom  uint64
 | 
						|
	TsRangeTo    uint64
 | 
						|
	lastResult   *ResultRow
 | 
						|
	// if region split, set startKey = lastResult.Row, but must skip the first
 | 
						|
	skipFirst  bool
 | 
						|
	maxRetries int
 | 
						|
}
 | 
						|
 | 
						|
func NewScan(table []byte, batchSize int, c HBaseClient) *Scan {
 | 
						|
	if batchSize <= 0 {
 | 
						|
		batchSize = 100
 | 
						|
	}
 | 
						|
	return &Scan{
 | 
						|
		client:       c.(*client),
 | 
						|
		table:        table,
 | 
						|
		nextStartKey: nil,
 | 
						|
		families:     make([][]byte, 0),
 | 
						|
		qualifiers:   make([][][]byte, 0),
 | 
						|
		numCached:    batchSize,
 | 
						|
		closed:       false,
 | 
						|
		attrs:        make(map[string][]byte),
 | 
						|
		maxRetries:   defaultScanMaxRetries,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) Close() error {
 | 
						|
	if s.closed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	err := s.closeScan(s.server, s.location, s.id)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	s.closed = true
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddColumn(family, qual []byte) {
 | 
						|
	s.AddFamily(family)
 | 
						|
	pos := s.posOfFamily(family)
 | 
						|
	s.qualifiers[pos] = append(s.qualifiers[pos], qual)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddStringColumn(family, qual string) {
 | 
						|
	s.AddColumn([]byte(family), []byte(qual))
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddFamily(family []byte) {
 | 
						|
	pos := s.posOfFamily(family)
 | 
						|
	if pos == -1 {
 | 
						|
		s.families = append(s.families, family)
 | 
						|
		s.qualifiers = append(s.qualifiers, make([][]byte, 0))
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddStringFamily(family string) {
 | 
						|
	s.AddFamily([]byte(family))
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) posOfFamily(family []byte) int {
 | 
						|
	for p, v := range s.families {
 | 
						|
		if bytes.Equal(family, v) {
 | 
						|
			return p
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return -1
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddAttr(name string, val []byte) {
 | 
						|
	s.attrs[name] = val
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) AddTimeRange(from uint64, to uint64) {
 | 
						|
	s.TsRangeFrom = from
 | 
						|
	s.TsRangeTo = to
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) Closed() bool {
 | 
						|
	return s.closed
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) CreateGetFromScan(row []byte) *Get {
 | 
						|
	g := NewGet(row)
 | 
						|
	for i, family := range s.families {
 | 
						|
		if len(s.qualifiers[i]) > 0 {
 | 
						|
			for _, qual := range s.qualifiers[i] {
 | 
						|
				g.AddColumn(family, qual)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			g.AddFamily(family)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return g
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) getData(startKey []byte, retries int) ([]*ResultRow, error) {
 | 
						|
	server, location, err := s.getServerAndLocation(s.table, startKey)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	req := &proto.ScanRequest{
 | 
						|
		Region: &proto.RegionSpecifier{
 | 
						|
			Type:  proto.RegionSpecifier_REGION_NAME.Enum(),
 | 
						|
			Value: []byte(location.Name),
 | 
						|
		},
 | 
						|
		NumberOfRows: pb.Uint32(uint32(s.numCached)),
 | 
						|
		Scan:         &proto.Scan{},
 | 
						|
	}
 | 
						|
 | 
						|
	// set attributes
 | 
						|
	var attrs []*proto.NameBytesPair
 | 
						|
	for k, v := range s.attrs {
 | 
						|
		p := &proto.NameBytesPair{
 | 
						|
			Name:  pb.String(k),
 | 
						|
			Value: v,
 | 
						|
		}
 | 
						|
		attrs = append(attrs, p)
 | 
						|
	}
 | 
						|
	if len(attrs) > 0 {
 | 
						|
		req.Scan.Attribute = attrs
 | 
						|
	}
 | 
						|
 | 
						|
	if s.id > 0 {
 | 
						|
		req.ScannerId = pb.Uint64(s.id)
 | 
						|
	}
 | 
						|
	req.Scan.StartRow = startKey
 | 
						|
	if s.StopRow != nil {
 | 
						|
		req.Scan.StopRow = s.StopRow
 | 
						|
	}
 | 
						|
	if s.MaxVersions > 0 {
 | 
						|
		req.Scan.MaxVersions = &s.MaxVersions
 | 
						|
	}
 | 
						|
	if s.TsRangeTo > s.TsRangeFrom {
 | 
						|
		req.Scan.TimeRange = &proto.TimeRange{
 | 
						|
			From: pb.Uint64(s.TsRangeFrom),
 | 
						|
			To:   pb.Uint64(s.TsRangeTo),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for i, v := range s.families {
 | 
						|
		req.Scan.Column = append(req.Scan.Column, &proto.Column{
 | 
						|
			Family:    v,
 | 
						|
			Qualifier: s.qualifiers[i],
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	cl := newCall(req)
 | 
						|
	err = server.call(cl)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	msg := <-cl.responseCh
 | 
						|
	rs, err := s.processResponse(msg)
 | 
						|
	if err != nil && (isNotInRegionError(err) || isUnknownScannerError(err)) {
 | 
						|
		if retries <= s.maxRetries {
 | 
						|
			// clean this table region cache and try again
 | 
						|
			s.client.CleanRegionCache(s.table)
 | 
						|
			// create new scanner and set startRow to lastResult
 | 
						|
			s.id = 0
 | 
						|
			if s.lastResult != nil {
 | 
						|
				startKey = s.lastResult.Row
 | 
						|
				s.skipFirst = true
 | 
						|
			}
 | 
						|
			s.server = nil
 | 
						|
			s.location = nil
 | 
						|
			log.Warnf("Retryint get data for %d time(s)", retries+1)
 | 
						|
			retrySleep(retries + 1)
 | 
						|
			return s.getData(startKey, retries+1)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return rs, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) processResponse(response pb.Message) ([]*ResultRow, error) {
 | 
						|
	var res *proto.ScanResponse
 | 
						|
	switch r := response.(type) {
 | 
						|
	case *proto.ScanResponse:
 | 
						|
		res = r
 | 
						|
	case *exception:
 | 
						|
		return nil, errors.New(r.msg)
 | 
						|
	default:
 | 
						|
		return nil, errors.Errorf("Invalid response seen [response: %#v]", response)
 | 
						|
	}
 | 
						|
 | 
						|
	// Check whether response is nil.
 | 
						|
	if res == nil {
 | 
						|
		return nil, errors.Errorf("Empty response: [table=%s] [StartRow=%q] [StopRow=%q] ", s.table, s.StartRow, s.StopRow)
 | 
						|
	}
 | 
						|
 | 
						|
	nextRegion := true
 | 
						|
	s.nextStartKey = nil
 | 
						|
	s.id = res.GetScannerId()
 | 
						|
 | 
						|
	results := res.GetResults()
 | 
						|
	n := len(results)
 | 
						|
 | 
						|
	if (n == s.numCached) ||
 | 
						|
		len(s.location.EndKey) == 0 ||
 | 
						|
		(s.StopRow != nil && bytes.Compare(s.location.EndKey, s.StopRow) > 0 && n < s.numCached) ||
 | 
						|
		res.GetMoreResultsInRegion() {
 | 
						|
		nextRegion = false
 | 
						|
	}
 | 
						|
 | 
						|
	var err error
 | 
						|
	if nextRegion {
 | 
						|
		s.nextStartKey = s.location.EndKey
 | 
						|
		err = s.closeScan(s.server, s.location, s.id)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Trace(err)
 | 
						|
		}
 | 
						|
		s.server = nil
 | 
						|
		s.location = nil
 | 
						|
		s.id = 0
 | 
						|
	}
 | 
						|
 | 
						|
	if n == 0 && !nextRegion {
 | 
						|
		err = s.Close()
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Trace(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if s.skipFirst {
 | 
						|
		results = results[1:]
 | 
						|
		s.skipFirst = false
 | 
						|
		n = len(results)
 | 
						|
	}
 | 
						|
 | 
						|
	tbr := make([]*ResultRow, n)
 | 
						|
	for i, v := range results {
 | 
						|
		if v != nil {
 | 
						|
			tbr[i] = NewResultRow(v)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return tbr, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) nextBatch() int {
 | 
						|
	startKey := s.nextStartKey
 | 
						|
	if startKey == nil {
 | 
						|
		startKey = s.StartRow
 | 
						|
	}
 | 
						|
 | 
						|
	// Notice: ignore error here.
 | 
						|
	// TODO: add error check, now only add a log.
 | 
						|
	rs, err := s.getData(startKey, 0)
 | 
						|
	if err != nil {
 | 
						|
		log.Errorf("scan next batch failed - [startKey=%q], %v", startKey, errors.ErrorStack(err))
 | 
						|
	}
 | 
						|
 | 
						|
	// Current region get 0 data, try switch to next region.
 | 
						|
	if len(rs) == 0 && len(s.nextStartKey) > 0 {
 | 
						|
		// TODO: add error check, now only add a log.
 | 
						|
		rs, err = s.getData(s.nextStartKey, 0)
 | 
						|
		if err != nil {
 | 
						|
			log.Errorf("scan next batch failed - [startKey=%q], %v", s.nextStartKey, errors.ErrorStack(err))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	s.cache = rs
 | 
						|
	return len(s.cache)
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) Next() *ResultRow {
 | 
						|
	if s.closed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	var ret *ResultRow
 | 
						|
	if len(s.cache) == 0 {
 | 
						|
		n := s.nextBatch()
 | 
						|
		// no data returned
 | 
						|
		if n == 0 {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	ret = s.cache[0]
 | 
						|
	s.lastResult = ret
 | 
						|
	s.cache = s.cache[1:]
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) closeScan(server *connection, location *RegionInfo, id uint64) error {
 | 
						|
	if server == nil || location == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	req := &proto.ScanRequest{
 | 
						|
		Region: &proto.RegionSpecifier{
 | 
						|
			Type:  proto.RegionSpecifier_REGION_NAME.Enum(),
 | 
						|
			Value: []byte(location.Name),
 | 
						|
		},
 | 
						|
		ScannerId:    pb.Uint64(id),
 | 
						|
		CloseScanner: pb.Bool(true),
 | 
						|
	}
 | 
						|
 | 
						|
	cl := newCall(req)
 | 
						|
	err := server.call(cl)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO: add exception check.
 | 
						|
	<-cl.responseCh
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scan) getServerAndLocation(table, startRow []byte) (*connection, *RegionInfo, error) {
 | 
						|
	if s.server != nil && s.location != nil {
 | 
						|
		return s.server, s.location, nil
 | 
						|
	}
 | 
						|
 | 
						|
	var err error
 | 
						|
	s.location, err = s.client.LocateRegion(table, startRow, true)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
 | 
						|
	s.server, err = s.client.getClientConn(s.location.Server)
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, errors.Trace(err)
 | 
						|
	}
 | 
						|
	return s.server, s.location, nil
 | 
						|
}
 |