356 lines
11 KiB
Go
356 lines
11 KiB
Go
// Copyright 2022 Google LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package firestore
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
vkit "cloud.google.com/go/firestore/apiv1"
|
|
pb "cloud.google.com/go/firestore/apiv1/firestorepb"
|
|
"golang.org/x/time/rate"
|
|
"google.golang.org/api/support/bundler"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
const (
|
|
// maxBatchSize is the max number of writes to send in a request
|
|
maxBatchSize = 20
|
|
// maxRetryAttempts is the max number of times to retry a write
|
|
maxRetryAttempts = 10
|
|
// defaultStartingMaximumOpsPerSecond is the starting max number of requests to the service per second
|
|
defaultStartingMaximumOpsPerSecond = 500
|
|
// maxWritesPerSecond is the starting limit of writes allowed to callers per second
|
|
maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond
|
|
)
|
|
|
|
var (
|
|
batchWriteRetryCodes = map[codes.Code]bool{
|
|
codes.ResourceExhausted: true,
|
|
codes.Unavailable: true,
|
|
codes.Aborted: true,
|
|
}
|
|
)
|
|
|
|
// bulkWriterResult contains the WriteResult or error results from an individual
|
|
// write to the database.
|
|
type bulkWriterResult struct {
|
|
result *pb.WriteResult // (cached) result from the operation
|
|
err error // (cached) any errors that occurred
|
|
}
|
|
|
|
// BulkWriterJob provides read-only access to the results of a BulkWriter write attempt.
|
|
type BulkWriterJob struct {
|
|
resultChan chan bulkWriterResult // send errors and results to this channel
|
|
write *pb.Write // the writes to apply to the database
|
|
attempts int // number of times this write has been attempted
|
|
resultsLock sync.Mutex // guards the cached wr and e values for the job
|
|
result *WriteResult // (cached) result from the operation
|
|
err error // (cached) any errors that occurred
|
|
ctx context.Context // context for canceling/timing out results
|
|
}
|
|
|
|
// Results gets the results of the BulkWriter write attempt.
|
|
// This method blocks if the results for this BulkWriterJob haven't been
|
|
// received.
|
|
func (j *BulkWriterJob) Results() (*WriteResult, error) {
|
|
j.resultsLock.Lock()
|
|
defer j.resultsLock.Unlock()
|
|
if j.result == nil && j.err == nil {
|
|
j.result, j.err = j.processResults() // cache the results for additional calls
|
|
}
|
|
return j.result, j.err
|
|
}
|
|
|
|
// processResults checks for errors returned from send() and packages up the
|
|
// results as WriteResult objects
|
|
func (j *BulkWriterJob) processResults() (*WriteResult, error) {
|
|
select {
|
|
case <-j.ctx.Done():
|
|
return nil, j.ctx.Err()
|
|
case bwr := <-j.resultChan:
|
|
if bwr.err != nil {
|
|
return nil, bwr.err
|
|
}
|
|
return writeResultFromProto(bwr.result)
|
|
}
|
|
}
|
|
|
|
// setError ensures that an error is returned on the error channel of BulkWriterJob.
|
|
func (j *BulkWriterJob) setError(e error) {
|
|
bwr := bulkWriterResult{
|
|
err: e,
|
|
result: nil,
|
|
}
|
|
j.resultChan <- bwr
|
|
close(j.resultChan)
|
|
}
|
|
|
|
// A BulkWriter supports concurrent writes to multiple documents. The BulkWriter
|
|
// submits document writes in maximum batches of 20 writes per request. Each
|
|
// request can contain many different document writes: create, delete, update,
|
|
// and set are all supported.
|
|
//
|
|
// Only one operation (create, set, update, delete) per document is allowed.
|
|
// BulkWriter cannot promise atomicity: individual writes can fail or succeed
|
|
// independent of each other. Bulkwriter does not apply writes in any set order;
|
|
// thus a document can't have set on it immediately after creation.
|
|
type BulkWriter struct {
|
|
database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE]
|
|
start time.Time // when this BulkWriter was started; used to calculate qps and rate increases
|
|
vc *vkit.Client // internal client
|
|
maxOpsPerSecond int // number of requests that can be sent per second
|
|
docUpdatePaths map[string]bool // document paths with corresponding writes in the queue
|
|
limiter rate.Limiter // limit requests to server to <= 500 qps
|
|
bundler *bundler.Bundler // handle bundling up writes to Firestore
|
|
ctx context.Context // context for canceling all BulkWriter operations
|
|
isOpenLock sync.RWMutex // guards against setting isOpen concurrently
|
|
isOpen bool // flag that the BulkWriter is closed
|
|
}
|
|
|
|
// newBulkWriter creates a new instance of the BulkWriter.
|
|
func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter {
|
|
// Although typically we shouldn't store Context objects, in this case we
|
|
// need to pass this Context through to the Bundler handler.
|
|
ctx = withResourceHeader(ctx, c.path())
|
|
|
|
bw := &BulkWriter{
|
|
database: database,
|
|
start: time.Now(),
|
|
vc: c.c,
|
|
isOpen: true,
|
|
maxOpsPerSecond: defaultStartingMaximumOpsPerSecond,
|
|
docUpdatePaths: make(map[string]bool),
|
|
ctx: ctx,
|
|
limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1),
|
|
}
|
|
|
|
// can't initialize within struct above; need instance reference to BulkWriter.send()
|
|
bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send)
|
|
bw.bundler.HandlerLimit = bw.maxOpsPerSecond
|
|
bw.bundler.BundleCountThreshold = maxBatchSize
|
|
|
|
return bw
|
|
}
|
|
|
|
// End sends all enqueued writes in parallel and closes the BulkWriter to new requests.
|
|
// After calling End(), calling any additional method automatically returns
|
|
// with an error. This method completes when there are no more pending writes
|
|
// in the queue.
|
|
func (bw *BulkWriter) End() {
|
|
bw.isOpenLock.Lock()
|
|
bw.isOpen = false
|
|
bw.isOpenLock.Unlock()
|
|
bw.Flush()
|
|
}
|
|
|
|
// Flush commits all writes that have been enqueued up to this point in parallel.
|
|
// This method blocks execution.
|
|
func (bw *BulkWriter) Flush() {
|
|
bw.bundler.Flush()
|
|
}
|
|
|
|
// Create adds a document creation write to the queue of writes to send.
|
|
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
|
|
func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) {
|
|
bw.isOpenLock.RLock()
|
|
defer bw.isOpenLock.RUnlock()
|
|
err := bw.checkWriteConditions(doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
w, err := doc.newCreateWrites(datum)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("firestore: cannot create %v with %v. %w", doc.ID, datum, err)
|
|
}
|
|
|
|
if len(w) > 1 {
|
|
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
|
|
}
|
|
|
|
j := bw.write(w[0])
|
|
return j, nil
|
|
}
|
|
|
|
// Delete adds a document deletion write to the queue of writes to send.
|
|
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
|
|
func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) {
|
|
bw.isOpenLock.RLock()
|
|
defer bw.isOpenLock.RUnlock()
|
|
err := bw.checkWriteConditions(doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
w, err := doc.newDeleteWrites(preconds)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("firestore: cannot delete doc %v. %w", doc.ID, err)
|
|
}
|
|
|
|
if len(w) > 1 {
|
|
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
|
|
}
|
|
|
|
j := bw.write(w[0])
|
|
return j, nil
|
|
}
|
|
|
|
// Set adds a document set write to the queue of writes to send.
|
|
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
|
|
func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) {
|
|
bw.isOpenLock.RLock()
|
|
defer bw.isOpenLock.RUnlock()
|
|
err := bw.checkWriteConditions(doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
w, err := doc.newSetWrites(datum, opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("firestore: cannot set %v on doc %v. %w", datum, doc.ID, err)
|
|
}
|
|
|
|
if len(w) > 1 {
|
|
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
|
|
}
|
|
|
|
j := bw.write(w[0])
|
|
return j, nil
|
|
}
|
|
|
|
// Update adds a document update write to the queue of writes to send.
|
|
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
|
|
func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) {
|
|
bw.isOpenLock.RLock()
|
|
defer bw.isOpenLock.RUnlock()
|
|
err := bw.checkWriteConditions(doc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
w, err := doc.newUpdatePathWrites(updates, preconds)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("firestore: cannot update doc %v. %w", doc.ID, err)
|
|
}
|
|
|
|
if len(w) > 1 {
|
|
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
|
|
}
|
|
|
|
j := bw.write(w[0])
|
|
return j, nil
|
|
}
|
|
|
|
// checkConditions determines whether this write attempt is valid. It returns
|
|
// an error if either the BulkWriter has already been closed or if it
|
|
// receives a nil document reference.
|
|
func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error {
|
|
if !bw.isOpen {
|
|
return errors.New("firestore: BulkWriter has been closed")
|
|
}
|
|
|
|
if doc == nil {
|
|
return errors.New("firestore: nil document contents")
|
|
}
|
|
|
|
_, havePath := bw.docUpdatePaths[doc.shortPath]
|
|
if havePath {
|
|
return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath)
|
|
}
|
|
|
|
bw.docUpdatePaths[doc.shortPath] = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// write packages up write requests into bulkWriterJob objects.
|
|
func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob {
|
|
|
|
j := &BulkWriterJob{
|
|
resultChan: make(chan bulkWriterResult, 1),
|
|
write: w,
|
|
ctx: bw.ctx,
|
|
}
|
|
|
|
bw.limiter.Wait(bw.ctx)
|
|
// ignore operation size constraints and related errors; can't be inferred at compile time
|
|
// Bundler is set to accept an unlimited amount of bytes
|
|
_ = bw.bundler.Add(j, 0)
|
|
|
|
return j
|
|
}
|
|
|
|
// send transmits writes to the service and matches response results to job channels.
|
|
func (bw *BulkWriter) send(i interface{}) {
|
|
bwj := i.([]*BulkWriterJob)
|
|
|
|
if len(bwj) == 0 {
|
|
return
|
|
}
|
|
|
|
var ws []*pb.Write
|
|
for _, w := range bwj {
|
|
ws = append(ws, w.write)
|
|
}
|
|
|
|
bwr := &pb.BatchWriteRequest{
|
|
Database: bw.database,
|
|
Writes: ws,
|
|
Labels: map[string]string{},
|
|
}
|
|
|
|
select {
|
|
case <-bw.ctx.Done():
|
|
return
|
|
default:
|
|
resp, err := bw.vc.BatchWrite(bw.ctx, bwr)
|
|
if err != nil {
|
|
// Do we need to be selective about what kind of errors we send?
|
|
for _, j := range bwj {
|
|
j.setError(err)
|
|
}
|
|
return
|
|
}
|
|
// Match write results with BulkWriterJob objects
|
|
for i, res := range resp.WriteResults {
|
|
s := resp.Status[i]
|
|
c := s.GetCode()
|
|
if c != 0 { // Should we do an explicit check against rpc.Code enum?
|
|
j := bwj[i]
|
|
j.attempts++
|
|
|
|
// Do we need separate retry bundler?
|
|
_, isRetryable := batchWriteRetryCodes[codes.Code(s.Code)]
|
|
if j.attempts < maxRetryAttempts && isRetryable {
|
|
// ignore operation size constraints and related errors; job size can't be inferred at compile time
|
|
// Bundler is set to accept an unlimited amount of bytes
|
|
_ = bw.bundler.Add(j, 0)
|
|
} else {
|
|
j.setError(status.Error(codes.Code(s.Code), s.Message))
|
|
}
|
|
continue
|
|
}
|
|
|
|
bwj[i].resultChan <- bulkWriterResult{err: nil, result: res}
|
|
close(bwj[i].resultChan)
|
|
}
|
|
}
|
|
}
|