package meilisearch

import (
	"bufio"
	"bytes"
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"math"
	"net/http"
	"reflect"
	"strconv"
	"strings"
)

func (i *index) AddDocuments(documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.AddDocumentsWithContext(context.Background(), documentsPtr, primaryKey...)
}

func (i *index) AddDocumentsWithContext(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.addDocuments(ctx, documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) AddDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.AddDocumentsInBatchesWithContext(context.Background(), documentsPtr, batchSize, primaryKey...)
}

func (i *index) AddDocumentsInBatchesWithContext(ctx context.Context, documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.saveDocumentsInBatches(ctx, documentsPtr, batchSize, i.AddDocumentsWithContext, primaryKey...)
}

func (i *index) AddDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.AddDocumentsCsvWithContext(context.Background(), documents, options)
}

func (i *index) AddDocumentsCsvWithContext(ctx context.Context, documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	// []byte avoids JSON conversion in Client.sendRequest()
	return i.addDocuments(ctx, documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

func (i *index) AddDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	return i.AddDocumentsCsvInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) AddDocumentsCsvInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	// Reuse io.Reader implementation
	return i.AddDocumentsCsvFromReaderInBatchesWithContext(ctx, bytes.NewReader(documents), batchSize, options)
}

func (i *index) AddDocumentsCsvFromReaderInBatches(documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.AddDocumentsCsvFromReaderInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) AddDocumentsCsvFromReaderInBatchesWithContext(ctx context.Context, documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.saveDocumentsFromReaderInBatches(ctx, documents, batchSize, i.AddDocumentsCsvWithContext, options)
}

func (i *index) AddDocumentsCsvFromReader(documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
	return i.AddDocumentsCsvFromReaderWithContext(context.Background(), documents, options)
}

func (i *index) AddDocumentsCsvFromReaderWithContext(ctx context.Context, documents io.Reader, options *CsvDocumentsQuery) (resp *TaskInfo, err error) {
	// Passing the io.Reader through directly would avoid the JSON conversion
	// in Client.sendRequest(), but we read the content into memory anyway
	// because of problems with streamed bodies.
	data, err := io.ReadAll(documents)
	if err != nil {
		return nil, fmt.Errorf("could not read documents: %w", err)
	}
	return i.addDocuments(ctx, data, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}
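
// A minimal usage sketch, not part of the upstream API: adding CSV documents
// from an in-memory reader. The header row names the fields; the
// CsvDocumentsQuery field name is taken from this SDK's types.
func exampleAddDocumentsCsvFromReader(ctx context.Context, idx *index) (*TaskInfo, error) {
	csvDocs := "id,title\n1,Alien\n2,Brazil\n"
	// PrimaryKey tells Meilisearch which column uniquely identifies documents.
	return idx.AddDocumentsCsvFromReaderWithContext(ctx, strings.NewReader(csvDocs), &CsvDocumentsQuery{PrimaryKey: "id"})
}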

func (i *index) AddDocumentsNdjson(documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.AddDocumentsNdjsonWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) AddDocumentsNdjsonWithContext(ctx context.Context, documents []byte, primaryKey ...string) (*TaskInfo, error) {
	// []byte avoids JSON conversion in Client.sendRequest()
	return i.addDocuments(ctx, documents, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) AddDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.AddDocumentsNdjsonInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	// Reuse io.Reader implementation
	return i.AddDocumentsNdjsonFromReaderInBatchesWithContext(ctx, bytes.NewReader(documents), batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderInBatches(documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	return i.AddDocumentsNdjsonFromReaderInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderInBatchesWithContext(ctx context.Context, documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	// NDJSON files are supposed to contain one valid JSON document per line,
	// so it is safe to split the input by lines.
	// Lines are read and sent continuously to avoid reading all content into
	// memory. However, this means that only part of the documents might be
	// added successfully. Note that bufio.Scanner's default token limit
	// (64 KiB) applies to each line.
	sendNdjsonLines := func(lines []string) (*TaskInfo, error) {
		b := new(bytes.Buffer)
		for _, line := range lines {
			_, err := b.WriteString(line)
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
			err = b.WriteByte('\n')
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
		}

		resp, err := i.AddDocumentsNdjsonWithContext(ctx, b.Bytes(), primaryKey...)
		if err != nil {
			return nil, err
		}
		return resp, nil
	}

	var (
		responses []TaskInfo
		lines     []string
	)

	scanner := bufio.NewScanner(documents)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// Skip empty lines (NDJSON might not allow them, but skip just to be sure)
		if line == "" {
			continue
		}

		lines = append(lines, line)
		// After reaching batchSize, send the buffered NDJSON lines
		if len(lines) == batchSize {
			resp, err := sendNdjsonLines(lines)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			lines = nil
		}
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not read NDJSON: %w", err)
	}

	// Send the remaining lines as the last batch if there are any
	if len(lines) > 0 {
		resp, err := sendNdjsonLines(lines)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}
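
// A minimal usage sketch, not part of the upstream API: streaming NDJSON in
// batches so the whole payload never has to sit in memory at once. The
// strings.NewReader stands in for what would typically be an *os.File.
func exampleAddDocumentsNdjsonInBatches(ctx context.Context, idx *index) ([]TaskInfo, error) {
	ndjson := "{\"id\":1,\"title\":\"Alien\"}\n{\"id\":2,\"title\":\"Brazil\"}\n"
	// Each group of up to 1000 lines becomes one task.
	return idx.AddDocumentsNdjsonFromReaderInBatchesWithContext(ctx, strings.NewReader(ndjson), 1000, "id")
}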

func (i *index) AddDocumentsNdjsonFromReader(documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
	return i.AddDocumentsNdjsonFromReaderWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) AddDocumentsNdjsonFromReaderWithContext(ctx context.Context, documents io.Reader, primaryKey ...string) (resp *TaskInfo, err error) {
	// Passing the io.Reader through directly would avoid the JSON conversion
	// in Client.sendRequest(), but we read the content into memory anyway
	// because of problems with streamed bodies.
	data, err := io.ReadAll(documents)
	if err != nil {
		return nil, fmt.Errorf("could not read documents: %w", err)
	}
	return i.addDocuments(ctx, data, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocuments(documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.UpdateDocumentsWithContext(context.Background(), documentsPtr, primaryKey...)
}

func (i *index) UpdateDocumentsWithContext(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documentsPtr, contentTypeJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocumentsInBatches(documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.UpdateDocumentsInBatchesWithContext(context.Background(), documentsPtr, batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsInBatchesWithContext(ctx context.Context, documentsPtr interface{}, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.saveDocumentsInBatches(ctx, documentsPtr, batchSize, i.UpdateDocumentsWithContext, primaryKey...)
}

func (i *index) UpdateDocumentsCsv(documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.UpdateDocumentsCsvWithContext(context.Background(), documents, options)
}

func (i *index) UpdateDocumentsCsvWithContext(ctx context.Context, documents []byte, options *CsvDocumentsQuery) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documents, contentTypeCSV, transformCsvDocumentsQueryToMap(options))
}

func (i *index) UpdateDocumentsCsvInBatches(documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	return i.UpdateDocumentsCsvInBatchesWithContext(context.Background(), documents, batchSize, options)
}

func (i *index) UpdateDocumentsCsvInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, options *CsvDocumentsQuery) ([]TaskInfo, error) {
	// Reuse io.Reader implementation
	return i.updateDocumentsCsvFromReaderInBatches(ctx, bytes.NewReader(documents), batchSize, options)
}

func (i *index) UpdateDocumentsNdjson(documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.UpdateDocumentsNdjsonWithContext(context.Background(), documents, primaryKey...)
}

func (i *index) UpdateDocumentsNdjsonWithContext(ctx context.Context, documents []byte, primaryKey ...string) (*TaskInfo, error) {
	return i.updateDocuments(ctx, documents, contentTypeNDJSON, transformStringVariadicToMap(primaryKey...))
}

func (i *index) UpdateDocumentsNdjsonInBatches(documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.UpdateDocumentsNdjsonInBatchesWithContext(context.Background(), documents, batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsNdjsonInBatchesWithContext(ctx context.Context, documents []byte, batchSize int, primaryKey ...string) ([]TaskInfo, error) {
	return i.updateDocumentsNdjsonFromReaderInBatches(ctx, bytes.NewReader(documents), batchSize, primaryKey...)
}

func (i *index) UpdateDocumentsByFunction(req *UpdateDocumentByFunctionRequest) (*TaskInfo, error) {
	return i.UpdateDocumentsByFunctionWithContext(context.Background(), req)
}

func (i *index) UpdateDocumentsByFunctionWithContext(ctx context.Context, req *UpdateDocumentByFunctionRequest) (*TaskInfo, error) {
	resp := new(TaskInfo)
	r := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/edit",
		method:              http.MethodPost,
		withRequest:         req,
		withResponse:        resp,
		contentType:         contentTypeJSON,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "UpdateDocumentsByFunction",
	}
	if err := i.client.executeRequest(ctx, r); err != nil {
		return nil, err
	}
	return resp, nil
}
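
// A minimal usage sketch, not part of the upstream API, of the edit-by-function
// endpoint (experimental in Meilisearch). The Filter and Function field names
// are assumed from this SDK's request type; the Rhai snippet is illustrative.
func exampleUpdateDocumentsByFunction(ctx context.Context, idx *index) (*TaskInfo, error) {
	return idx.UpdateDocumentsByFunctionWithContext(ctx, &UpdateDocumentByFunctionRequest{
		Filter:   "genre = comedy",
		Function: "doc.title = doc.title.to_upper()",
	})
}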

func (i *index) GetDocument(identifier string, request *DocumentQuery, documentPtr interface{}) error {
	return i.GetDocumentWithContext(context.Background(), identifier, request, documentPtr)
}

func (i *index) GetDocumentWithContext(ctx context.Context, identifier string, request *DocumentQuery, documentPtr interface{}) error {
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/" + identifier,
		method:              http.MethodGet,
		withRequest:         nil,
		withResponse:        documentPtr,
		withQueryParams:     map[string]string{},
		acceptedStatusCodes: []int{http.StatusOK},
		functionName:        "GetDocument",
	}
	if request != nil {
		if len(request.Fields) != 0 {
			req.withQueryParams["fields"] = strings.Join(request.Fields, ",")
		}
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return err
	}
	return nil
}
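
// A minimal usage sketch, not part of the upstream API: the response is
// decoded into the pointer passed as documentPtr, optionally restricted to a
// few fields. The document id is hypothetical.
func exampleGetDocument(ctx context.Context, idx *index) (map[string]interface{}, error) {
	var doc map[string]interface{}
	err := idx.GetDocumentWithContext(ctx, "25684", &DocumentQuery{
		Fields: []string{"id", "title"},
	}, &doc)
	return doc, err
}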

func (i *index) GetDocuments(param *DocumentsQuery, resp *DocumentsResult) error {
	return i.GetDocumentsWithContext(context.Background(), param, resp)
}

func (i *index) GetDocumentsWithContext(ctx context.Context, param *DocumentsQuery, resp *DocumentsResult) error {
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents",
		method:              http.MethodGet,
		contentType:         contentTypeJSON,
		withRequest:         nil,
		withResponse:        resp,
		withQueryParams:     nil,
		acceptedStatusCodes: []int{http.StatusOK},
		functionName:        "GetDocuments",
	}
	if param != nil && param.Filter == nil {
		req.withQueryParams = map[string]string{}
		if param.Limit != 0 {
			req.withQueryParams["limit"] = strconv.FormatInt(param.Limit, 10)
		}
		if param.Offset != 0 {
			req.withQueryParams["offset"] = strconv.FormatInt(param.Offset, 10)
		}
		if len(param.Fields) != 0 {
			req.withQueryParams["fields"] = strings.Join(param.Fields, ",")
		}
	} else if param != nil && param.Filter != nil {
		// A filter cannot be passed as a query parameter, so filtered
		// requests use POST /documents/fetch with a JSON body instead.
		req.withRequest = param
		req.method = http.MethodPost
		req.endpoint = req.endpoint + "/fetch"
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return VersionErrorHintMessage(err, req)
	}
	return nil
}
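
// A minimal usage sketch, not part of the upstream API: without Filter the
// call is a plain GET with query parameters; setting Filter switches it to
// POST /documents/fetch, which needs a Meilisearch version that supports it.
func exampleGetDocuments(ctx context.Context, idx *index) (*DocumentsResult, error) {
	var res DocumentsResult
	err := idx.GetDocumentsWithContext(ctx, &DocumentsQuery{
		Limit:  20,
		Fields: []string{"id", "title"},
	}, &res)
	return &res, err
}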

func (i *index) DeleteDocument(identifier string) (*TaskInfo, error) {
	return i.DeleteDocumentWithContext(context.Background(), identifier)
}

func (i *index) DeleteDocumentWithContext(ctx context.Context, identifier string) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/" + identifier,
		method:              http.MethodDelete,
		withRequest:         nil,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocument",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) DeleteDocuments(identifiers []string) (*TaskInfo, error) {
	return i.DeleteDocumentsWithContext(context.Background(), identifiers)
}

func (i *index) DeleteDocumentsWithContext(ctx context.Context, identifiers []string) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents/delete-batch",
		method:              http.MethodPost,
		contentType:         contentTypeJSON,
		withRequest:         identifiers,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocuments",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) DeleteDocumentsByFilter(filter interface{}) (*TaskInfo, error) {
	return i.DeleteDocumentsByFilterWithContext(context.Background(), filter)
}

func (i *index) DeleteDocumentsByFilterWithContext(ctx context.Context, filter interface{}) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:    "/indexes/" + i.uid + "/documents/delete",
		method:      http.MethodPost,
		contentType: contentTypeJSON,
		withRequest: map[string]interface{}{
			"filter": filter,
		},
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteDocumentsByFilter",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, VersionErrorHintMessage(err, req)
	}
	return resp, nil
}
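
// A minimal usage sketch, not part of the upstream API: the filter argument
// accepts Meilisearch filter expressions, either a single string or an array
// of strings. The attribute name is hypothetical and must be listed in the
// index's filterableAttributes.
func exampleDeleteDocumentsByFilter(ctx context.Context, idx *index) (*TaskInfo, error) {
	return idx.DeleteDocumentsByFilterWithContext(ctx, "release_date < 631152000")
}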

func (i *index) DeleteAllDocuments() (*TaskInfo, error) {
	return i.DeleteAllDocumentsWithContext(context.Background())
}

func (i *index) DeleteAllDocumentsWithContext(ctx context.Context) (*TaskInfo, error) {
	resp := new(TaskInfo)
	req := &internalRequest{
		endpoint:            "/indexes/" + i.uid + "/documents",
		method:              http.MethodDelete,
		withRequest:         nil,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "DeleteAllDocuments",
	}
	if err := i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) addDocuments(ctx context.Context, documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
	resp = new(TaskInfo)
	endpoint := ""
	if options == nil {
		endpoint = "/indexes/" + i.uid + "/documents"
	} else {
		for key, val := range options {
			// Keep the locally cached primary key in sync with the request
			if key == "primaryKey" {
				i.primaryKey = val
			}
		}
		endpoint = "/indexes/" + i.uid + "/documents?" + generateQueryForOptions(options)
	}
	req := &internalRequest{
		endpoint:            endpoint,
		method:              http.MethodPost,
		contentType:         contentType,
		withRequest:         documentsPtr,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "AddDocuments",
	}
	if err = i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) saveDocumentsFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, documentsCsvFunc func(ctx context.Context, recs []byte, op *CsvDocumentsQuery) (resp *TaskInfo, err error), options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	// Because CSV fields can span multiple lines, it is not safe to split the
	// input into batches by lines; instead we parse the file and reassemble
	// it into smaller parts. RFC 4180 compliant input with a header row is
	// expected.
	// Records are read and sent continuously to avoid reading all content
	// into memory. However, this means that only part of the documents might
	// be added successfully.
	var (
		responses []TaskInfo
		header    []string
		records   [][]string
	)

	r := csv.NewReader(documents)
	for {
		// Read a CSV record (empty lines and comments are already skipped by csv.Reader)
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("could not read CSV record: %w", err)
		}

		// Store the first record as the header
		if header == nil {
			header = record
			continue
		}

		// Add the header record to every batch
		if len(records) == 0 {
			records = append(records, header)
		}

		records = append(records, record)

		// After reaching batchSize (not counting the header record), assemble
		// a CSV payload and send the records
		if len(records) == batchSize+1 {
			resp, err := sendCsvRecords(ctx, documentsCsvFunc, records, options)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			records = nil
		}
	}

	// Send the remaining records as the last batch if there are any
	if len(records) > 0 {
		resp, err := sendCsvRecords(ctx, documentsCsvFunc, records, options)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}

func (i *index) saveDocumentsInBatches(ctx context.Context, documentsPtr interface{}, batchSize int, documentFunc func(ctx context.Context, documentsPtr interface{}, primaryKey ...string) (resp *TaskInfo, err error), primaryKey ...string) (resp []TaskInfo, err error) {
	// documentsPtr must hold a slice (or array); reflection lets us slice it
	// into batches without knowing the element type.
	arr := reflect.ValueOf(documentsPtr)
	lenDocs := arr.Len()
	numBatches := int(math.Ceil(float64(lenDocs) / float64(batchSize)))
	resp = make([]TaskInfo, numBatches)

	for j := 0; j < numBatches; j++ {
		end := (j + 1) * batchSize
		if end > lenDocs {
			end = lenDocs
		}

		batch := arr.Slice(j*batchSize, end).Interface()

		if len(primaryKey) != 0 {
			respID, err := documentFunc(ctx, batch, primaryKey[0])
			if err != nil {
				return nil, err
			}
			resp[j] = *respID
		} else {
			respID, err := documentFunc(ctx, batch)
			if err != nil {
				return nil, err
			}
			resp[j] = *respID
		}
	}

	return resp, nil
}

func (i *index) updateDocuments(ctx context.Context, documentsPtr interface{}, contentType string, options map[string]string) (resp *TaskInfo, err error) {
	resp = &TaskInfo{}
	endpoint := ""
	if options == nil {
		endpoint = "/indexes/" + i.uid + "/documents"
	} else {
		for key, val := range options {
			// Keep the locally cached primary key in sync with the request
			if key == "primaryKey" {
				i.primaryKey = val
			}
		}
		endpoint = "/indexes/" + i.uid + "/documents?" + generateQueryForOptions(options)
	}
	req := &internalRequest{
		endpoint:            endpoint,
		method:              http.MethodPut,
		contentType:         contentType,
		withRequest:         documentsPtr,
		withResponse:        resp,
		acceptedStatusCodes: []int{http.StatusAccepted},
		functionName:        "UpdateDocuments",
	}
	if err = i.client.executeRequest(ctx, req); err != nil {
		return nil, err
	}
	return resp, nil
}

func (i *index) updateDocumentsCsvFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, options *CsvDocumentsQuery) (resp []TaskInfo, err error) {
	return i.saveDocumentsFromReaderInBatches(ctx, documents, batchSize, i.UpdateDocumentsCsvWithContext, options)
}

func (i *index) updateDocumentsNdjsonFromReaderInBatches(ctx context.Context, documents io.Reader, batchSize int, primaryKey ...string) (resp []TaskInfo, err error) {
	// NDJSON files are supposed to contain one valid JSON document per line,
	// so it is safe to split the input by lines.
	// Lines are read and sent continuously to avoid reading all content into
	// memory. However, this means that only part of the documents might be
	// updated successfully.
	sendNdjsonLines := func(lines []string) (*TaskInfo, error) {
		b := new(bytes.Buffer)
		for _, line := range lines {
			_, err := b.WriteString(line)
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
			err = b.WriteByte('\n')
			if err != nil {
				return nil, fmt.Errorf("could not write NDJSON line: %w", err)
			}
		}

		resp, err := i.UpdateDocumentsNdjsonWithContext(ctx, b.Bytes(), primaryKey...)
		if err != nil {
			return nil, err
		}
		return resp, nil
	}

	var (
		responses []TaskInfo
		lines     []string
	)

	scanner := bufio.NewScanner(documents)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// Skip empty lines (NDJSON might not allow them, but skip just to be sure)
		if line == "" {
			continue
		}

		lines = append(lines, line)
		// After reaching batchSize, send the buffered NDJSON lines
		if len(lines) == batchSize {
			resp, err := sendNdjsonLines(lines)
			if err != nil {
				return nil, err
			}
			responses = append(responses, *resp)
			lines = nil
		}
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not read NDJSON: %w", err)
	}

	// Send the remaining lines as the last batch if there are any
	if len(lines) > 0 {
		resp, err := sendNdjsonLines(lines)
		if err != nil {
			return nil, err
		}
		responses = append(responses, *resp)
	}

	return responses, nil
}