package meilisearch

import (
	"bufio"
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"math"
	"net/http"
	"reflect"
	"strconv"
	"strings"
)

func (i *index) AddDocuments(documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.AddDocumentsWithContext(context.Background(), documentsPtr, primaryKey...)
}
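
// A minimal usage sketch for the JSON variants above; the `movie` struct and
// the `idx` index handle are hypothetical, shown only for illustration:
//
//	type movie struct {
//		ID    int    `json:"id"`
//		Title string `json:"title"`
//	}
//	task, err := idx.AddDocuments([]movie{{ID: 1, Title: "Alien"}}, "id")
//	// task.TaskUID can then be polled until the asynchronous task finishes.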

func (i *index) AddDocumentsWithContext(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.addDocuments(ctx, documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) AddDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.AddDocumentsInBatchesWithContext(context.Background(), documentsPtr, batchSize, primaryKey...)
}

func (i *index) AddDocumentsInBatchesWithContext(ctx context.Context, documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.saveDocumentsInBatches(ctx, documentsPtr, batchSize, i.AddDocumentsWithContext, primaryKey...)
}

func (i *index) AddDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.AddDocumentsCsvWithContext(context.Background(), documents, options)
}

func (i *index) AddDocumentsCsvWithContext(ctx context.Context, documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	// []byte avoids JSON conversion in Client.sendRequest()
	return i.addDocuments(ctx, documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

func (i *index) AddDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	return i.AddDocumentsCsvInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) AddDocumentsCsvInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	// Reuse the io.Reader implementation
	return i.AddDocumentsCsvFromReaderInBatchesWithContext(ctx, bytes.NewReader(documents), batchSize, options)
}

func (i *index) AddDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.AddDocumentsCsvFromReaderInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) AddDocumentsCsvFromReaderInBatchesWithContext(ctx context.Context, documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.saveDocumentsFromReaderInBatches(ctx, documents, batchSize, i.AddDocumentsCsvWithContext, options)
}

func (i *index) AddDocumentsCsvFromReader(documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
	return i.AddDocumentsCsvFromReaderWithContext(context.Background(), documents, options)
}

func (i *index) AddDocumentsCsvFromReaderWithContext(ctx context.Context, documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
	// Using io.Reader would avoid JSON conversion in Client.sendRequest(), but
	// the content is read into memory anyway because of problems with streamed bodies.
	data, err := io.ReadAll(documents)
	if err != nil {
		return nil, fmt.Errorf("could not read documents: %w", err)
	}
	return i.addDocuments(ctx, data, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}
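
// A hedged usage sketch for the CSV reader variant above (the file name is
// illustrative, and the sketch assumes the PrimaryKey field on the
// CsvDocumentsQuery options type defined elsewhere in this package):
//
//	f, err := os.Open("movies.csv")
//	if err != nil { /* handle error */ }
//	defer f.Close()
//	task, err := idx.AddDocumentsCsvFromReader(f, &CsvDocumentsQuery{PrimaryKey: "id"})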

func (i *index) AddDocumentsNdjson(documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.AddDocumentsNdjsonWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) AddDocumentsNdjsonWithContext(ctx context.Context, documents []byte, primaryKey ...string) (*TaskInfo, error) {
	// []byte avoids JSON conversion in Client.sendRequest()
	return i.addDocuments(ctx, documents, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) AddDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.AddDocumentsNdjsonInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	// Reuse the io.Reader implementation
	return i.AddDocumentsNdjsonFromReaderInBatchesWithContext(ctx, bytes.NewReader(documents), batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderInBatches(documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	return i.AddDocumentsNdjsonFromReaderInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderInBatchesWithContext(ctx context.Context, documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	// NDJSON files are supposed to contain one valid JSON document per line,
	// so it's safe to split by lines.
	// Lines are read and sent continuously to avoid reading all content into
	// memory. However, this means that only part of the documents might be
	// added successfully.

	sendNdjsonLines := func(lines []string) (*TaskInfo, error) {
		b := new(bytes.Buffer)
		for _, line := range lines {
			_, err := b.WriteString(line)
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
			err = b.WriteByte('\n')
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
		}

		resp, err := i.AddDocumentsNdjsonWithContext(ctx, b.Bytes(), primaryKey...)
		if err != nil {
			return nil, err
		}
		return resp, nil
	}

	var (
		responses []TaskInfo
		lines     []string
	)

	scanner := bufio.NewScanner(documents)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// Skip empty lines (NDJSON might not allow this, but just to be sure)
		if line == "" {
			continue
		}

		lines = append(lines, line)
		// After reaching batchSize, send the accumulated NDJSON lines
		if len(lines) == batchSize {
			resp, err := sendNdjsonLines(lines)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			lines = nil
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not read NDJSON: %w", err)
	}

	// Send remaining lines as the last batch if there are any
	if len(lines) > 0 {
		resp, err := sendNdjsonLines(lines)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}
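
// A usage sketch for the NDJSON batch variant above (file name and batch size
// are illustrative). Each TaskInfo in the returned slice corresponds to one
// submitted batch, so a 10,000-line file with batchSize 1000 yields up to ten
// tasks:
//
//	f, err := os.Open("movies.ndjson")
//	if err != nil { /* handle error */ }
//	defer f.Close()
//	tasks, err := idx.AddDocumentsNdjsonFromReaderInBatches(f, 1000, "id")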

func (i *index) AddDocumentsNdjsonFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
	return i.AddDocumentsNdjsonFromReaderWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderWithContext(ctx context.Context, documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
	// Using io.Reader would avoid JSON conversion in Client.sendRequest(), but
	// the content is read into memory anyway because of problems with streamed bodies.
	data, err := io.ReadAll(documents)
	if err != nil {
		return nil, fmt.Errorf("could not read documents: %w", err)
	}
	return i.addDocuments(ctx, data, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocuments(documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.UpdateDocumentsWithContext(context.Background(), documentsPtr, primaryKey...)
}

func (i *index) UpdateDocumentsWithContext(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.UpdateDocumentsInBatchesWithContext(context.Background(), documentsPtr, batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsInBatchesWithContext(ctx context.Context, documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.saveDocumentsInBatches(ctx, documentsPtr, batchSize, i.UpdateDocumentsWithContext, primaryKey...)
}

func (i *index) UpdateDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.UpdateDocumentsCsvWithContext(context.Background(), documents, options)
}

func (i *index) UpdateDocumentsCsvWithContext(ctx context.Context, documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

func (i *index) UpdateDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	return i.UpdateDocumentsCsvInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) UpdateDocumentsCsvInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	// Reuse the io.Reader implementation
	return i.updateDocumentsCsvFromReaderInBatches(ctx, bytes.NewReader(documents), batchSize, options)
}

func (i *index) UpdateDocumentsNdjson(documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.UpdateDocumentsNdjsonWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) UpdateDocumentsNdjsonWithContext(ctx context.Context, documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documents, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.UpdateDocumentsNdjsonInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsNdjsonInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.updateDocumentsNdjsonFromReaderInBatches(ctx, bytes.NewReader(documents), batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsByFunction(req *UpdateDocumentByFunctionRequest) (*TaskInfo, error) {
	return i.UpdateDocumentsByFunctionWithContext(context.Background(), req)
}

func (i *index) UpdateDocumentsByFunctionWithContext(ctx context.Context, req *UpdateDocumentByFunctionRequest) (*TaskInfo, error) {
	resp := new(TaskInfo)
	r := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/edit",
		method:              http.MethodPost,
		withRequest:         req,
		withResponse:        resp,
		contentType:         contentTypeJSON,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "UpdateDocumentsByFunction",
	}
	if err := i.client.executeRequest(ctx, r); err != nil {
		return nil, err
	}
	return resp, nil
}
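
// A hedged sketch of the document-edit call above; the filter and function
// strings are illustrative Meilisearch expressions, and the exact fields
// available are those defined on UpdateDocumentByFunctionRequest:
//
//	task, err := idx.UpdateDocumentsByFunction(&UpdateDocumentByFunctionRequest{
//		Filter:   "genre = horror",
//		Function: "doc.title = doc.title.to_upper()",
//	})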

func (i *index) GetDocument(identifier string, request *DocumentQuery, documentPtr interface{}) error {
	return i.GetDocumentWithContext(context.Background(), identifier, request, documentPtr)
}

func (i *index) GetDocumentWithContext(ctx context.Context, identifier string, request *DocumentQuery, documentPtr interface{}) error {
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/" + identifier,
		method:              http.MethodGet,
		withRequest:         nil,
		withResponse:        documentPtr,
		withQueryParams:     map[string]string{},
		acceptedStatusCodes: []int{http.StatusOK},
		functionName:        "GetDocument",
	}
	if request != nil {
		if len(request.Fields) != 0 {
			req.withQueryParams["fields"] = strings.Join(request.Fields, ",")
		}
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return err
	}
	return nil
}

func (i *index) GetDocuments(param *DocumentsQuery, resp *DocumentsResult) error {
	return i.GetDocumentsWithContext(context.Background(), param, resp)
}

func (i *index) GetDocumentsWithContext(ctx context.Context, param *DocumentsQuery, resp *DocumentsResult) error {
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents",
		method:              http.MethodGet,
		contentType:         contentTypeJSON,
		withRequest:         nil,
		withResponse:        resp,
		withQueryParams:     nil,
		acceptedStatusCodes: []int{http.StatusOK},
		functionName:        "GetDocuments",
	}
	if param != nil && param.Filter == nil {
		req.withQueryParams = map[string]string{}
		if param.Limit != 0 {
			req.withQueryParams["limit"] = strconv.FormatInt(param.Limit, 10)
		}
		if param.Offset != 0 {
			req.withQueryParams["offset"] = strconv.FormatInt(param.Offset, 10)
		}
		if len(param.Fields) != 0 {
			req.withQueryParams["fields"] = strings.Join(param.Fields, ",")
		}
	} else if param != nil && param.Filter != nil {
		req.withRequest = param
		req.method = http.MethodPost
		req.endpoint = req.endpoint + "/fetch"
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return VersionErrorHintMessage(err, req)
	}
	return nil
}
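
// Note on GetDocuments above: when DocumentsQuery.Filter is nil the request
// is a plain GET with limit/offset/fields encoded as query parameters; a
// non-nil Filter switches to POST /indexes/{uid}/documents/fetch with the
// query serialized as a JSON body. A sketch with illustrative values:
//
//	var res DocumentsResult
//	err := idx.GetDocuments(&DocumentsQuery{
//		Limit:  20,
//		Fields: []string{"id", "title"},
//	}, &res)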

func (i *index) DeleteDocument(identifier string) (*TaskInfo, error) {
	return i.DeleteDocumentWithContext(context.Background(), identifier)
}

func (i *index) DeleteDocumentWithContext(ctx context.Context, identifier string) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/" + identifier,
		method:              http.MethodDelete,
		withRequest:         nil,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocument",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) DeleteDocuments(identifiers []string) (*TaskInfo, error) {
	return i.DeleteDocumentsWithContext(context.Background(), identifiers)
}

func (i *index) DeleteDocumentsWithContext(ctx context.Context, identifiers []string) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/delete-batch",
		method:              http.MethodPost,
		contentType:         contentTypeJSON,
		withRequest:         identifiers,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocuments",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) DeleteDocumentsByFilter(filter interface{}) (*TaskInfo, error) {
	return i.DeleteDocumentsByFilterWithContext(context.Background(), filter)
}

func (i *index) DeleteDocumentsByFilterWithContext(ctx context.Context, filter interface{}) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:    "/indexes/" + i.uid + "/documents/delete",
		method:      http.MethodPost,
		contentType: contentTypeJSON,
		withRequest: map[string]interface{}{
			"filter": filter,
		},
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocumentsByFilter",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, VersionErrorHintMessage(err, req)
	}
	return resp, nil
}

func (i *index) DeleteAllDocuments() (*TaskInfo, error) {
	return i.DeleteAllDocumentsWithContext(context.Background())
}

func (i *index) DeleteAllDocumentsWithContext(ctx context.Context) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents",
		method:              http.MethodDelete,
		withRequest:         nil,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteAllDocuments",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) addDocuments(ctx context.Context, documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
	resp = new(TaskInfo)
	endpoint := ""
	if options == nil {
		endpoint = "/indexes/" + i.uid + "/documents"
	} else {
		for key, val := range options {
			if key == "primaryKey" {
				i.primaryKey = val
			}
		}
		endpoint = "/indexes/" + i.uid + "/documents?" + generateQueryForOptions(options)
	}
	req := &internalRequest{
		endpoint:            endpoint,
		method:              http.MethodPost,
		contentType:         contentType,
		withRequest:         documentsPtr,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "AddDocuments",
	}
	if err = i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) saveDocumentsFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, documentsCsvFunc func(ctx context.Context, recs []byte, op *CsvDocumentsQuery) (resp *TaskInfo, err error), options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	// Because CSV fields may span multiple lines, it's not safe to split into
	// batches by line; the file has to be parsed and reassembled into smaller
	// parts. RFC 4180 compliant input with a header row is expected.
	// Records are read and sent continuously to avoid reading all content
	// into memory. However, this means that only part of the documents might
	// be added successfully.

	var (
		responses []TaskInfo
		header    []string
		records   [][]string
	)

	r := csv.NewReader(documents)
	for {
		// Read a CSV record (empty lines and comments are already skipped by csv.Reader)
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("could not read CSV record: %w", err)
		}

		// Store the first record as the header
		if header == nil {
			header = record
			continue
		}

		// Add the header record to every batch
		if len(records) == 0 {
			records = append(records, header)
		}

		records = append(records, record)

		// After reaching batchSize (not counting the header record),
		// assemble a CSV file and send the records
		if len(records) == batchSize+1 {
			resp, err := sendCsvRecords(ctx, documentsCsvFunc, records, options)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			records = nil
		}
	}

	// Send remaining records as the last batch if there are any
	if len(records) > 0 {
		resp, err := sendCsvRecords(ctx, documentsCsvFunc, records, options)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}

func (i *index) saveDocumentsInBatches(ctx context.Context, documentsPtr interface{}, batchSize int, documentFunc func(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error), primaryKey ...string) (resp []TaskInfo, err error) {
	arr := reflect.ValueOf(documentsPtr)
	lenDocs := arr.Len()
	numBatches := int(math.Ceil(float64(lenDocs) / float64(batchSize)))
	resp = make([]TaskInfo, numBatches)

	for j := 0; j < numBatches; j++ {
		end := (j + 1) * batchSize
		if end > lenDocs {
			end = lenDocs
		}

		batch := arr.Slice(j*batchSize, end).Interface()

		if len(primaryKey) != 0 {
			respID, err := documentFunc(ctx, batch, primaryKey[0])
			if err != nil {
				return nil, err
			}

			resp[j] = *respID
		} else {
			respID, err := documentFunc(ctx, batch)
			if err != nil {
				return nil, err
			}

			resp[j] = *respID
		}
	}

	return resp, nil
}
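
// Worked example of the batch arithmetic in saveDocumentsInBatches above:
// with lenDocs = 250 and batchSize = 100, numBatches = ceil(250/100) = 3,
// and the loop slices the input as [0:100], [100:200], and [200:250].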

func (i *index) updateDocuments(ctx context.Context, documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
	resp = &TaskInfo{}
	endpoint := ""
	if options == nil {
		endpoint = "/indexes/" + i.uid + "/documents"
	} else {
		for key, val := range options {
			if key == "primaryKey" {
				i.primaryKey = val
			}
		}
		endpoint = "/indexes/" + i.uid + "/documents?" + generateQueryForOptions(options)
	}
	req := &internalRequest{
		endpoint:            endpoint,
		method:              http.MethodPut,
		contentType:         contentType,
		withRequest:         documentsPtr,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "UpdateDocuments",
	}
	if err = i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) updateDocumentsCsvFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.saveDocumentsFromReaderInBatches(ctx, documents, batchSize, i.UpdateDocumentsCsvWithContext, options)
}

func (i *index) updateDocumentsNdjsonFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	// NDJSON files are supposed to contain one valid JSON document per line,
	// so it's safe to split by lines.
	// Lines are read and sent continuously to avoid reading all content into
	// memory. However, this means that only part of the documents might be
	// updated successfully.

	sendNdjsonLines := func(lines []string) (*TaskInfo, error) {
		b := new(bytes.Buffer)
		for _, line := range lines {
			_, err := b.WriteString(line)
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
			err = b.WriteByte('\n')
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
		}

		resp, err := i.UpdateDocumentsNdjsonWithContext(ctx, b.Bytes(), primaryKey...)
		if err != nil {
			return nil, err
		}
		return resp, nil
	}

	var (
		responses []TaskInfo
		lines     []string
	)

	scanner := bufio.NewScanner(documents)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// Skip empty lines (NDJSON might not allow this, but just to be sure)
		if line == "" {
			continue
		}

		lines = append(lines, line)
		// After reaching batchSize, send the accumulated NDJSON lines
		if len(lines) == batchSize {
			resp, err := sendNdjsonLines(lines)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			lines = nil
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not read NDJSON: %w", err)
	}

	// Send remaining lines as the last batch if there are any
	if len(lines) > 0 {
		resp, err := sendNdjsonLines(lines)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}