1
0
Fork 0
telegraf/plugins/inputs/openweathermap/openweathermap.go

361 lines
9.1 KiB
Go
Raw Permalink Normal View History

//go:generate ../../../tools/readme_config_includer/generator
package openweathermap
import (
_ "embed"
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
// https://openweathermap.org/current#severalid
// Limit for the number of city IDs per request.
const maxIDsPerBatch int = 20
type OpenWeatherMap struct {
AppID string `toml:"app_id"`
CityID []string `toml:"city_id"`
Lang string `toml:"lang"`
Fetch []string `toml:"fetch"`
BaseURL string `toml:"base_url"`
ResponseTimeout config.Duration `toml:"response_timeout"`
Units string `toml:"units"`
QueryStyle string `toml:"query_style"`
client *http.Client
cityIDBatches []string
baseParsedURL *url.URL
}
func (*OpenWeatherMap) SampleConfig() string {
return sampleConfig
}
func (n *OpenWeatherMap) Init() error {
// Set the default for the base-URL if not given
if n.BaseURL == "" {
n.BaseURL = "https://api.openweathermap.org/"
}
// Check the query-style setting
switch n.QueryStyle {
case "":
n.QueryStyle = "batch"
case "batch", "individual":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown query-style: %s", n.QueryStyle)
}
// Check the unit setting
switch n.Units {
case "":
n.Units = "metric"
case "imperial", "standard", "metric":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown units: %s", n.Units)
}
// Check the language setting
switch n.Lang {
case "":
n.Lang = "en"
case "ar", "bg", "ca", "cz", "de", "el", "en", "fa", "fi", "fr", "gl",
"hr", "hu", "it", "ja", "kr", "la", "lt", "mk", "nl", "pl",
"pt", "ro", "ru", "se", "sk", "sl", "es", "tr", "ua", "vi",
"zh_cn", "zh_tw":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown language: %s", n.Lang)
}
// Check the properties to fetch
if len(n.Fetch) == 0 {
n.Fetch = []string{"weather", "forecast"}
}
for _, fetch := range n.Fetch {
switch fetch {
case "forecast", "weather":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown property to fetch: %s", fetch)
}
}
// Split the city IDs into batches smaller than the maximum size
nBatches := len(n.CityID) / maxIDsPerBatch
if len(n.CityID)%maxIDsPerBatch != 0 {
nBatches++
}
batches := make([][]string, nBatches)
for i, id := range n.CityID {
batch := i / maxIDsPerBatch
batches[batch] = append(batches[batch], id)
}
n.cityIDBatches = make([]string, 0, nBatches)
for _, batch := range batches {
n.cityIDBatches = append(n.cityIDBatches, strings.Join(batch, ","))
}
// Parse the base-URL used later to construct the property API endpoint
u, err := url.Parse(n.BaseURL)
if err != nil {
return err
}
n.baseParsedURL = u
// Create an HTTP client to be used in each collection interval
n.client = &http.Client{
Transport: &http.Transport{},
Timeout: time.Duration(n.ResponseTimeout),
}
return nil
}
func (n *OpenWeatherMap) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, fetch := range n.Fetch {
switch fetch {
case "forecast":
for _, cityID := range n.CityID {
wg.Add(1)
go func(city string) {
defer wg.Done()
acc.AddError(n.gatherForecast(acc, city))
}(cityID)
}
case "weather":
switch n.QueryStyle {
case "individual":
for _, cityID := range n.CityID {
wg.Add(1)
go func(city string) {
defer wg.Done()
acc.AddError(n.gatherWeather(acc, city))
}(cityID)
}
case "batch":
for _, cityIDs := range n.cityIDBatches {
wg.Add(1)
go func(cities string) {
defer wg.Done()
acc.AddError(n.gatherWeatherBatch(acc, cities))
}(cityIDs)
}
}
}
}
wg.Wait()
return nil
}
func (n *OpenWeatherMap) gatherWeather(acc telegraf.Accumulator, city string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/weather", city)
buf, err := n.gatherURL(addr)
if err != nil {
return fmt.Errorf("querying %q failed: %w", addr, err)
}
var e weatherEntry
if err := json.Unmarshal(buf, &e); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
// Construct the metric
tm := time.Unix(e.Dt, 0)
fields := map[string]interface{}{
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": e.rain(),
"snow": e.snow(),
"sunrise": time.Unix(e.Sys.Sunrise, 0).UnixNano(),
"sunset": time.Unix(e.Sys.Sunset, 0).UnixNano(),
"temperature": e.Main.Temp,
"feels_like": e.Main.Feels,
"visibility": e.Visibility,
"wind_degrees": e.Wind.Deg,
"wind_speed": e.Wind.Speed,
}
tags := map[string]string{
"city": e.Name,
"city_id": strconv.FormatInt(e.ID, 10),
"country": e.Sys.Country,
"forecast": "*",
}
if len(e.Weather) > 0 {
fields["condition_description"] = e.Weather[0].Description
fields["condition_icon"] = e.Weather[0].Icon
tags["condition_id"] = strconv.FormatInt(e.Weather[0].ID, 10)
tags["condition_main"] = e.Weather[0].Main
}
acc.AddFields("weather", fields, tags, tm)
return nil
}
func (n *OpenWeatherMap) gatherWeatherBatch(acc telegraf.Accumulator, cities string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/group", cities)
buf, err := n.gatherURL(addr)
if err != nil {
return fmt.Errorf("querying %q failed: %w", addr, err)
}
var status status
if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
// Construct the metrics
for _, e := range status.List {
tm := time.Unix(e.Dt, 0)
fields := map[string]interface{}{
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": e.rain(),
"snow": e.snow(),
"sunrise": time.Unix(e.Sys.Sunrise, 0).UnixNano(),
"sunset": time.Unix(e.Sys.Sunset, 0).UnixNano(),
"temperature": e.Main.Temp,
"feels_like": e.Main.Feels,
"visibility": e.Visibility,
"wind_degrees": e.Wind.Deg,
"wind_speed": e.Wind.Speed,
}
tags := map[string]string{
"city": e.Name,
"city_id": strconv.FormatInt(e.ID, 10),
"country": e.Sys.Country,
"forecast": "*",
}
if len(e.Weather) > 0 {
fields["condition_description"] = e.Weather[0].Description
fields["condition_icon"] = e.Weather[0].Icon
tags["condition_id"] = strconv.FormatInt(e.Weather[0].ID, 10)
tags["condition_main"] = e.Weather[0].Main
}
acc.AddFields("weather", fields, tags, tm)
}
return nil
}
func (n *OpenWeatherMap) gatherForecast(acc telegraf.Accumulator, city string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/forecast", city)
buf, err := n.gatherURL(addr)
if err != nil {
return fmt.Errorf("querying %q failed: %w", addr, err)
}
var status status
if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
// Construct the metric
tags := map[string]string{
"city_id": strconv.FormatInt(status.City.ID, 10),
"forecast": "*",
"city": status.City.Name,
"country": status.City.Country,
}
for i, e := range status.List {
tm := time.Unix(e.Dt, 0)
fields := map[string]interface{}{
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": e.rain(),
"snow": e.snow(),
"temperature": e.Main.Temp,
"feels_like": e.Main.Feels,
"wind_degrees": e.Wind.Deg,
"wind_speed": e.Wind.Speed,
}
if len(e.Weather) > 0 {
fields["condition_description"] = e.Weather[0].Description
fields["condition_icon"] = e.Weather[0].Icon
tags["condition_id"] = strconv.FormatInt(e.Weather[0].ID, 10)
tags["condition_main"] = e.Weather[0].Main
}
tags["forecast"] = fmt.Sprintf("%dh", (i+1)*3)
acc.AddFields("weather", fields, tags, tm)
}
return nil
}
func (n *OpenWeatherMap) formatURL(path, city string) string {
v := url.Values{
"id": []string{city},
"APPID": []string{n.AppID},
"lang": []string{n.Lang},
"units": []string{n.Units},
}
relative := &url.URL{
Path: path,
RawQuery: v.Encode(),
}
return n.baseParsedURL.ResolveReference(relative).String()
}
func (n *OpenWeatherMap) gatherURL(addr string) ([]byte, error) {
resp, err := n.client.Get(addr)
if err != nil {
return nil, fmt.Errorf("error making HTTP request to %q: %w", addr, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%s returned HTTP status %s", addr, resp.Status)
}
mediaType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return nil, err
}
if mediaType != "application/json" {
return nil, fmt.Errorf("%s returned unexpected content type %s", addr, mediaType)
}
return io.ReadAll(resp.Body)
}
func init() {
inputs.Add("openweathermap", func() telegraf.Input {
return &OpenWeatherMap{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}