Page MenuHomePhabricator
Paste P4064

statsd_exporter upstream relay
ActivePublic

Authored by fgiunchedi on Sep 16 2016, 12:02 PM.
Tags
None
Referenced Files
F4475141: statsd_exporter upstream relay
Sep 16 2016, 12:02 PM
Subscribers
None
diff --git a/exporter.go b/exporter.go
index 02a9a0e..bff756c 100644
--- a/exporter.go
+++ b/exporter.go
@@ -284,7 +284,8 @@ func NewExporter(mapper *metricMapper, addSuffix bool) *Exporter {
}
type StatsDListener struct {
- conn *net.UDPConn
+ conn *net.UDPConn
+ relay_conns []*net.UDPConn
}
func buildEvent(statType, metric string, value float64, labels map[string]string) (Event, error) {
@@ -314,13 +315,32 @@ func buildEvent(statType, metric string, value float64, labels map[string]string
}
}
+func (l *StatsDListener) AddRelay(conn *net.UDPConn) {
+ l.relay_conns = append(l.relay_conns, conn)
+}
+
func (l *StatsDListener) Listen(e chan<- Events) {
+ relay_chan := make(chan []byte)
+ defer close(relay_chan)
+
+ go func(c chan []byte) {
+ for in_bytes := range c {
+ for _, conn := range l.relay_conns {
+ _, err := conn.Write(in_bytes)
+ if err != nil {
+ log.Warn(err)
+ }
+ }
+ }
+ }(relay_chan)
+
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
+ relay_chan <- buf[0:n]
l.handlePacket(buf[0:n], e)
}
}
@@ -366,7 +386,8 @@ func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
} else {
samples = strings.Split(elements[1], ":")
}
- samples: for _, sample := range samples {
+ samples:
+ for _, sample := range samples {
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
diff --git a/main.go b/main.go
index 4242ef4..a36db77 100644
--- a/main.go
+++ b/main.go
@@ -35,6 +35,7 @@ var (
listenAddress = flag.String("web.listen-address", ":9102", "The address on which to expose the web interface and generated Prometheus metrics.")
metricsEndpoint = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.")
statsdListenAddress = flag.String("statsd.listen-address", ":9125", "The UDP address on which to receive statsd metric lines.")
+ statsdRelayAddress = flag.String("statsd.relay-address", "", "The address on which to relay received metrics.")
mappingConfig = flag.String("statsd.mapping-config", "", "Metric mapping configuration file name.")
readBuffer = flag.Int("statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.")
addSuffix = flag.Bool("statsd.add-suffix", true, "Add the metric type (counter/gauge/timer) as suffix to the generated Prometheus metric (NOT recommended, but set by default for backward compatibility).")
@@ -149,6 +150,16 @@ func main() {
}
l := &StatsDListener{conn: conn}
+
+ if *statsdRelayAddress != "" {
+ relayAddr := udpAddrFromString(*statsdRelayAddress)
+ relayConn, err := net.DialUDP("udp", nil, relayAddr)
+ if err != nil {
+ log.Fatal(err)
+ }
+ l.AddRelay(relayConn)
+ }
+
go l.Listen(events)
mapper := &metricMapper{}