1
0
Fork 0
golang-github-meilisearch-m.../client.go
Daniel Baumann 5d4914ed7f
Adding upstream version 0.31.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
2025-05-18 21:42:39 +02:00

357 lines
8.5 KiB
Go

package meilisearch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type client struct {
client *http.Client
host string
apiKey string
bufferPool *sync.Pool
encoder encoder
contentEncoding ContentEncoding
retryOnStatus map[int]bool
disableRetry bool
maxRetries uint8
retryBackoff func(attempt uint8) time.Duration
}
type clientConfig struct {
contentEncoding ContentEncoding
encodingCompressionLevel EncodingCompressionLevel
retryOnStatus map[int]bool
disableRetry bool
maxRetries uint8
}
type internalRequest struct {
endpoint string
method string
contentType string
withRequest interface{}
withResponse interface{}
withQueryParams map[string]string
acceptedStatusCodes []int
functionName string
}
func newClient(cli *http.Client, host, apiKey string, cfg clientConfig) *client {
c := &client{
client: cli,
host: host,
apiKey: apiKey,
bufferPool: &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
disableRetry: cfg.disableRetry,
maxRetries: cfg.maxRetries,
retryOnStatus: cfg.retryOnStatus,
}
if c.retryOnStatus == nil {
c.retryOnStatus = map[int]bool{
502: true,
503: true,
504: true,
}
}
if !c.disableRetry && c.retryBackoff == nil {
c.retryBackoff = func(attempt uint8) time.Duration {
return time.Second * time.Duration(attempt)
}
}
if !cfg.contentEncoding.IsZero() {
c.contentEncoding = cfg.contentEncoding
c.encoder = newEncoding(cfg.contentEncoding, cfg.encodingCompressionLevel)
}
return c
}
func (c *client) executeRequest(ctx context.Context, req *internalRequest) error {
internalError := &Error{
Endpoint: req.endpoint,
Method: req.method,
Function: req.functionName,
RequestToString: "empty request",
ResponseToString: "empty response",
MeilisearchApiError: meilisearchApiError{
Message: "empty meilisearch message",
},
StatusCodeExpected: req.acceptedStatusCodes,
encoder: c.encoder,
}
resp, err := c.sendRequest(ctx, req, internalError)
if err != nil {
return err
}
defer func() {
_ = resp.Body.Close()
}()
internalError.StatusCode = resp.StatusCode
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
err = c.handleStatusCode(req, resp.StatusCode, b, internalError)
if err != nil {
return err
}
err = c.handleResponse(req, b, internalError)
if err != nil {
return err
}
return nil
}
func (c *client) sendRequest(
ctx context.Context,
req *internalRequest,
internalError *Error,
) (*http.Response, error) {
apiURL, err := url.Parse(c.host + req.endpoint)
if err != nil {
return nil, fmt.Errorf("unable to parse url: %w", err)
}
if req.withQueryParams != nil {
query := apiURL.Query()
for key, value := range req.withQueryParams {
query.Set(key, value)
}
apiURL.RawQuery = query.Encode()
}
// Create request body
var body io.Reader = nil
if req.withRequest != nil {
if req.method == http.MethodGet || req.method == http.MethodHead {
return nil, ErrInvalidRequestMethod
}
if req.contentType == "" {
return nil, ErrRequestBodyWithoutContentType
}
rawRequest := req.withRequest
buf := c.bufferPool.Get().(*bytes.Buffer)
buf.Reset()
if b, ok := rawRequest.([]byte); ok {
buf.Write(b)
body = buf
} else if reader, ok := rawRequest.(io.Reader); ok {
// If the request body is an io.Reader then stream it directly
body = reader
} else {
// Otherwise convert it to JSON
var (
data []byte
err error
)
if marshaler, ok := rawRequest.(json.Marshaler); ok {
data, err = marshaler.MarshalJSON()
if err != nil {
return nil, internalError.WithErrCode(ErrCodeMarshalRequest,
fmt.Errorf("failed to marshal with MarshalJSON: %w", err))
}
if data == nil {
return nil, internalError.WithErrCode(ErrCodeMarshalRequest,
errors.New("MarshalJSON returned nil data"))
}
} else {
data, err = json.Marshal(rawRequest)
if err != nil {
return nil, internalError.WithErrCode(ErrCodeMarshalRequest,
fmt.Errorf("failed to marshal with json.Marshal: %w", err))
}
}
buf.Write(data)
body = buf
}
if !c.contentEncoding.IsZero() {
// Get the data from the buffer before encoding
var bufData []byte
if buf, ok := body.(*bytes.Buffer); ok {
bufData = buf.Bytes()
encodedBuf, err := c.encoder.Encode(bytes.NewReader(bufData))
if err != nil {
if buf, ok := body.(*bytes.Buffer); ok {
c.bufferPool.Put(buf)
}
return nil, internalError.WithErrCode(ErrCodeMarshalRequest,
fmt.Errorf("failed to encode request body: %w", err))
}
// Return the original buffer to the pool since we have a new one
if buf, ok := body.(*bytes.Buffer); ok {
c.bufferPool.Put(buf)
}
body = encodedBuf
}
}
}
// Create the HTTP request
request, err := http.NewRequestWithContext(ctx, req.method, apiURL.String(), body)
if err != nil {
return nil, fmt.Errorf("unable to create request: %w", err)
}
// adding request headers
if req.contentType != "" {
request.Header.Set("Content-Type", req.contentType)
}
if c.apiKey != "" {
request.Header.Set("Authorization", "Bearer "+c.apiKey)
}
if req.withResponse != nil && !c.contentEncoding.IsZero() {
request.Header.Set("Accept-Encoding", c.contentEncoding.String())
}
if req.withRequest != nil && !c.contentEncoding.IsZero() {
request.Header.Set("Content-Encoding", c.contentEncoding.String())
}
request.Header.Set("User-Agent", GetQualifiedVersion())
resp, err := c.do(request, internalError)
if err != nil {
return nil, err
}
if body != nil {
if buf, ok := body.(*bytes.Buffer); ok {
c.bufferPool.Put(buf)
}
}
return resp, nil
}
func (c *client) do(req *http.Request, internalError *Error) (resp *http.Response, err error) {
retriesCount := uint8(0)
for {
resp, err = c.client.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, internalError.WithErrCode(MeilisearchTimeoutError, err)
}
return nil, internalError.WithErrCode(MeilisearchCommunicationError, err)
}
// Exit if retries are disabled
if c.disableRetry {
break
}
// Check if response status is retryable and we haven't exceeded max retries
if c.retryOnStatus[resp.StatusCode] && retriesCount < c.maxRetries {
retriesCount++
// Close response body to prevent memory leaks
resp.Body.Close()
// Handle backoff with context cancellation support
backoff := c.retryBackoff(retriesCount)
timer := time.NewTimer(backoff)
select {
case <-req.Context().Done():
err := req.Context().Err()
timer.Stop()
return nil, internalError.WithErrCode(MeilisearchTimeoutError, err)
case <-timer.C:
// Retry after backoff
timer.Stop()
}
continue
}
break
}
// Return error if retries exceeded the maximum limit
if !c.disableRetry && retriesCount >= c.maxRetries {
return nil, internalError.WithErrCode(MeilisearchMaxRetriesExceeded, nil)
}
return resp, nil
}
func (c *client) handleStatusCode(req *internalRequest, statusCode int, body []byte, internalError *Error) error {
if req.acceptedStatusCodes != nil {
// A successful status code is required so check if the response status code is in the
// expected status code list.
for _, acceptedCode := range req.acceptedStatusCodes {
if statusCode == acceptedCode {
return nil
}
}
internalError.ErrorBody(body)
if internalError.MeilisearchApiError.Code == "" {
return internalError.WithErrCode(MeilisearchApiErrorWithoutMessage)
}
return internalError.WithErrCode(MeilisearchApiError)
}
return nil
}
func (c *client) handleResponse(req *internalRequest, body []byte, internalError *Error) (err error) {
if req.withResponse != nil {
if !c.contentEncoding.IsZero() {
if err := c.encoder.Decode(body, req.withResponse); err != nil {
return internalError.WithErrCode(ErrCodeResponseUnmarshalBody, err)
}
} else {
internalError.ResponseToString = string(body)
if internalError.ResponseToString == nullBody {
req.withResponse = nil
return nil
}
var err error
if resp, ok := req.withResponse.(json.Unmarshaler); ok {
err = resp.UnmarshalJSON(body)
req.withResponse = resp
} else {
err = json.Unmarshal(body, req.withResponse)
}
if err != nil {
return internalError.WithErrCode(ErrCodeResponseUnmarshalBody, err)
}
}
}
return nil
}