# Flat JSON rendering of the standard syslog properties. option.jsonf="on"
# makes rsyslog emit the surrounding braces and separators itself.
# "host" and "logsource" are both filled from the hostname property.
template(name="syslog_json" type="list" option.jsonf="on") {
  property(name="timereported" outname="timestamp" dateformat="rfc3339" format="jsonf")
  property(name="msg" outname="message" format="jsonf")
  property(name="hostname" outname="host" format="jsonf")
  property(name="hostname" outname="logsource" format="jsonf")
  property(name="syslogseverity-text" outname="severity" format="jsonf")
  property(name="syslogfacility-text" outname="facility" format="jsonf")
  property(name="programname" outname="program" format="jsonf")
}

# Same properties under ECS-style dotted field names, plus a constant
# ecs.version of 1.7.0. The severity text is emitted twice
# (log.syslog.severity.name and log.level).
template(name="ecs_170" type="list" option.jsonf="on") {
  property(name="timereported" outname="timestamp" dateformat="rfc3339" format="jsonf")
  property(name="msg" outname="message" format="jsonf")
  property(name="hostname" outname="host.name" format="jsonf")
  property(name="syslogseverity" outname="log.syslog.severity.code" format="jsonf")
  property(name="syslogseverity-text" outname="log.syslog.severity.name" format="jsonf")
  property(name="syslogseverity-text" outname="log.level" format="jsonf")
  property(name="syslogfacility" outname="log.syslog.facility.code" format="jsonf")
  property(name="syslogfacility-text" outname="log.syslog.facility.name" format="jsonf")
  property(name="pri" outname="log.syslog.priority" format="jsonf")
  property(name="programname" outname="service.type" format="jsonf")
  constant(value="1.7.0" outname="ecs.version" format="jsonf")
}

# Append all properties found by mmjsonparse to "standard" syslog_json fields
# The template cannot use option.json="on" or "$!all-json" property would be json-escaped
#
# The JSON object is therefore assembled by hand: the opening brace and every
# ", " separator are literal constants, and the object is closed by the tail
# of "$!all-json" itself.
#
# NOTE(review): the keys parsed from the message are appended after the
# standard fields and are not filtered here. A sender that supplies a key such
# as "host", "program" or "severity" produces a duplicate key in the output;
# many JSON parsers keep the last occurrence, so consumers of this stream
# should not treat those fields as authoritative without checking how their
# parser resolves duplicates.
template(name="syslog_cee" type="list") {
  constant(value="{")
  property(name="timereported" outname="timestamp" format="jsonf" dateformat="rfc3339")
  constant(value=", ")
  property(name="hostname" outname="logsource" format="jsonf")
  constant(value=", ")
  property(name="hostname" outname="host" format="jsonf")
  constant(value=", ")
  property(name="programname" outname="program" format="jsonf")
  constant(value=", ")
  property(name="syslogseverity-text" outname="severity" format="jsonf")
  constant(value=", ")
  property(name="syslogfacility-text" outname="facility" format="jsonf")
  constant(value=", ")
  # Add the kubernetes-specific data we're collecting
  # Each value is the output of a backtick `echo $VAR` constant placed between
  # literal quotes; no JSON escaping is applied to it in this template, so a
  # value containing a double quote or backslash would yield an invalid
  # document. An unset variable yields an empty string.
  # NOTE(review): backtick constants are expanded when rsyslog loads the
  # config, not per message - confirm for the deployed version.
  constant(value="\"kubernetes.host\":\"")
  constant(value=`echo $KUBERNETES_NODE`)
  constant(value="\", ")
  constant(value="\"kubernetes.namespace_name\":\"")
  constant(value=`echo $KUBERNETES_NAMESPACE`)
  constant(value="\", ")
  constant(value="\"kubernetes.pod_name\":\"")
  constant(value=`echo $KUBERNETES_POD_NAME`)
  constant(value="\", ")
  # Skip leading curly brace
  # position.from="2" drops the first character of "$!all-json"; the remainder
  # (fields plus the closing brace) completes the object opened above.
  # NOTE(review): this relies on "$!all-json" always holding at least one
  # field; verify what is emitted when mmjsonparse finds no JSON payload,
  # since the preceding ", " would otherwise leave a trailing comma.
  property(name="$!all-json" position.from="2")
}

module(load="imudp")
module(load="mmjsonparse")
module(load="omkafka")

# Kafka topic name template: "udp_localhost-" followed by the lowercased
# severity text of the message.
template(name="udp_localhost_topic" type="string" string="udp_localhost-%syslogseverity-text:::lowercase%")

# Use a separate (in memory) queue to limit message processing to this ruleset only.
ruleset(name="udp_localhost_to_kafka" queue.type="LinkedList") {
  # Parse the message body as JSON so its fields become "$!" properties.
  action(type="mmjsonparse" name="mmjsonparse_udp_localhost")

  # Ship to Kafka using the syslog_cee template. With dynatopic="on" the
  # "topic" value is a template name, so the topic varies with the message
  # severity.
  #
  # Queue: the action has its own LinkedList queue of 10000 entries, and
  # queue.filename is set, which in rsyslog makes an in-memory queue
  # disk-assisted (watermarks 7000/6000). No queue.maxDiskSpace or
  # queue.saveOnShutdown is set in this block, so spool growth is not capped
  # here and queued messages may not survive a shutdown - confirm this is
  # intended. Spooled messages are written to rsyslog's work directory, whose
  # permissions are set elsewhere.
  #
  # Transport: security.protocol=ssl with a single CA file. No client
  # certificate or key is configured, so this block provides no mutual-TLS
  # producer authentication. The broker is addressed by IP.
  # NOTE(review): whether the broker certificate's name is checked depends on
  # librdkafka's ssl.endpoint.identification.algorithm, which is not set here
  # and whose default differs between librdkafka releases - confirm for the
  # installed version.
  action(type="omkafka"
         broker=["192.168.1.53:9002"]
         topic="udp_localhost_topic"
         dynatopic="on"
         dynatopic.cachesize="1000"
         partitions.auto="on"
         template="syslog_cee"
         queue.type="LinkedList"
         queue.size="10000"
         queue.filename="udp_localhost_compat"
         queue.highWatermark="7000"
         queue.lowWatermark="6000"
         queue.checkpointInterval="5"
         confParam=[ "security.protocol=ssl",
                     "ssl.ca.location=/usr/share/ca-certificates/wikimedia/Puppet_Internal_CA.crt",
                     "compression.codec=snappy",
                     "socket.timeout.ms=60000",
                     "socket.keepalive.enable=true",
                     "queue.buffering.max.ms=50",
                     "batch.num.messages=1000" ]
  )
}

# UDP listener bound to localhost only, port 10514, feeding the ruleset above.
# Syslog over UDP carries no sender authentication, so every field of a
# received message (PRI, program name, JSON payload) is chosen by whichever
# local process sent it.
input(type="imudp" port="10514" address="localhost" ruleset="udp_localhost_to_kafka")