Added support for the InfluxDB version 2 or modern
This commit is contained in:
parent
0f26effd16
commit
96ddf180a9
3 changed files with 93 additions and 27 deletions
87
geoparser.py
87
geoparser.py
|
@ -16,6 +16,8 @@ import logging.handlers
|
||||||
import geoip2.database
|
import geoip2.database
|
||||||
import configparser
|
import configparser
|
||||||
from influxdb import InfluxDBClient
|
from influxdb import InfluxDBClient
|
||||||
|
from influxdb_client import InfluxDBClient as InfluxDBClient2
|
||||||
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
from IPy import IP as ipadd
|
from IPy import IP as ipadd
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
@ -33,14 +35,20 @@ root = logging.getLogger(__name__)
|
||||||
root.setLevel(os.environ.get("LOGLEVEL", "INFO"))
|
root.setLevel(os.environ.get("LOGLEVEL", "INFO"))
|
||||||
root.addHandler(handler)
|
root.addHandler(handler)
|
||||||
|
|
||||||
def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE): # NOQA
|
|
||||||
|
def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION,
|
||||||
|
INFLUXHOST=None, INFLUXPORT=None, URL=None, INFLUXDBDB=None, INFLUXUSER=None,
|
||||||
|
INFLUXUSERPASS=None, INFLUXDBTOKEN=None, INFLUXDBBUCKET=None, INFLUXDBORG=None): # NOQA
|
||||||
# Preparing variables and params
|
# Preparing variables and params
|
||||||
IPS = {}
|
IPS = {}
|
||||||
COUNT = {}
|
COUNT = {}
|
||||||
GEOHASH = {}
|
GEOHASH = {}
|
||||||
HOSTNAME = os.uname()[1]
|
HOSTNAME = os.uname()[1]
|
||||||
CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT,
|
if INFLUXDB_VERSION == "1":
|
||||||
username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA
|
CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT,
|
||||||
|
username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA
|
||||||
|
elif INFLUXDB_VERSION == "2":
|
||||||
|
CLIENT = InfluxDBClient(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA
|
||||||
|
|
||||||
re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
|
re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
|
||||||
re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA
|
re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA
|
||||||
|
@ -86,7 +94,11 @@ def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, I
|
||||||
METRICS.append(IPS)
|
METRICS.append(IPS)
|
||||||
# Sending json data itto InfluxDB
|
# Sending json data itto InfluxDB
|
||||||
try:
|
try:
|
||||||
CLIENT.write_points(METRICS)
|
if INFLUXDB_VERSION == "1":
|
||||||
|
CLIENT.write_points(METRICS)
|
||||||
|
elif INFLUXDB_VERSION == "2":
|
||||||
|
write_api = CLIENT.write_api(write_options=SYNCHRONOUS) # NOQA
|
||||||
|
write_api.write(INFLUXDBBUCKET, INFLUXDBORG, record=METRICS) # NOQA
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA
|
logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA
|
||||||
|
|
||||||
|
@ -97,15 +109,28 @@ def main():
|
||||||
CONFIG = configparser.ConfigParser()
|
CONFIG = configparser.ConfigParser()
|
||||||
CONFIG.read(f'{PWD}/settings.ini')
|
CONFIG.read(f'{PWD}/settings.ini')
|
||||||
|
|
||||||
# Getting params from config
|
# Get the InfluxDB version so we can parse only needed part of config
|
||||||
GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
|
INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version')
|
||||||
LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
|
|
||||||
INFLUXHOST = CONFIG.get('INFLUXDB', 'host')
|
if INFLUXDB_VERSION == "1":
|
||||||
INFLUXPORT = CONFIG.get('INFLUXDB', 'port')
|
# Getting params from config for version 1
|
||||||
INFLUXDBDB = CONFIG.get('INFLUXDB', 'database')
|
GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
|
||||||
INFLUXUSER = CONFIG.get('INFLUXDB', 'username')
|
LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
|
||||||
MEASUREMENT = CONFIG.get('INFLUXDB', 'measurement')
|
INFLUXHOST = CONFIG.get('INFLUXDB1', 'host')
|
||||||
INFLUXUSERPASS = CONFIG.get('INFLUXDB', 'password')
|
INFLUXPORT = CONFIG.get('INFLUXDB1', 'port')
|
||||||
|
INFLUXDBDB = CONFIG.get('INFLUXDB1', 'database')
|
||||||
|
INFLUXUSER = CONFIG.get('INFLUXDB1', 'username')
|
||||||
|
MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement')
|
||||||
|
INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password')
|
||||||
|
elif INFLUXDB_VERSION == "2":
|
||||||
|
# Getting params from config for version 2
|
||||||
|
GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
|
||||||
|
LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
|
||||||
|
URL = CONFIG.get('INFLUXDB2', 'url')
|
||||||
|
INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token')
|
||||||
|
INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket')
|
||||||
|
MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement')
|
||||||
|
INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization')
|
||||||
|
|
||||||
# Parsing log file and sending metrics to Influxdb
|
# Parsing log file and sending metrics to Influxdb
|
||||||
while True:
|
while True:
|
||||||
|
@ -121,17 +146,31 @@ def main():
|
||||||
logging.info('Nginx log file %s not found', log)
|
logging.info('Nginx log file %s not found', log)
|
||||||
print('Nginx log file %s not found' % log)
|
print('Nginx log file %s not found' % log)
|
||||||
return
|
return
|
||||||
# Run the main loop and grep data in separate threads
|
|
||||||
t = website
|
if INFLUXDB_VERSION == "1":
|
||||||
if os.path.exists(log):
|
# Run the main loop and grep data in separate threads
|
||||||
t = threading.Thread(target=logparse, args=[log, website, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA
|
t = website
|
||||||
for thread in threading.enumerate():
|
if os.path.exists(log):
|
||||||
thread_names.append(thread.name)
|
t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA
|
||||||
if website not in thread_names:
|
for thread in threading.enumerate():
|
||||||
t.start()
|
thread_names.append(thread.name)
|
||||||
else:
|
if website not in thread_names:
|
||||||
logging.info('Nginx log file %s not found', log)
|
t.start()
|
||||||
print('Nginx log file %s not found' % log)
|
else:
|
||||||
|
logging.info('Nginx log file %s not found', log)
|
||||||
|
print('Nginx log file %s not found' % log)
|
||||||
|
elif INFLUXDB_VERSION == "2":
|
||||||
|
# Run the main loop and grep data in separate threads
|
||||||
|
t = website
|
||||||
|
if os.path.exists(log):
|
||||||
|
t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, URL, INFLUXDBTOKEN, INFLUXDBBUCKET, INFLUXDBORG, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA
|
||||||
|
for thread in threading.enumerate():
|
||||||
|
thread_names.append(thread.name)
|
||||||
|
if website not in thread_names:
|
||||||
|
t.start()
|
||||||
|
else:
|
||||||
|
logging.info('Nginx log file %s not found', log)
|
||||||
|
print('Nginx log file %s not found' % log)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -2,3 +2,4 @@ configparser==3.5.0
|
||||||
influxdb==5.2.0
|
influxdb==5.2.0
|
||||||
geoip2==2.9.0
|
geoip2==2.9.0
|
||||||
IPy==1.00
|
IPy==1.00
|
||||||
|
influxdb-client
|
||||||
|
|
|
@ -1,12 +1,20 @@
|
||||||
[NGINX_LOGS]
|
[NGINX_LOGS]
|
||||||
#Path for the log file (Nginx)
|
# Path for the log file (Nginx)
|
||||||
logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log
|
logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log
|
||||||
|
|
||||||
[GEOIP]
|
[GEOIP]
|
||||||
#Path for the GEOIP DB file
|
# Path for the GEOIP DB file
|
||||||
geoipdb = ./GeoLite2-City.mmdb
|
geoipdb = ./GeoLite2-City.mmdb
|
||||||
|
|
||||||
[INFLUXDB]
|
[INFLUXDB_VERSION]
|
||||||
|
# Version of the InfluxDB, 1 = old 1.8 and early, 2 = new 2.0 and more
|
||||||
|
# Set this parameter to 1 if you want use old InfluxDB version like 1.8
|
||||||
|
# Or set this parameter to 2 if you plan to use InfluxDB version 2.1 or modern
|
||||||
|
version = 1
|
||||||
|
|
||||||
|
[INFLUXDB1]
|
||||||
|
# This part of the config will be used only when [INFLUXDB_VERSION] set to 1
|
||||||
|
|
||||||
# Database URL
|
# Database URL
|
||||||
host = INFLUXDB_SERVER_IP
|
host = INFLUXDB_SERVER_IP
|
||||||
port = 8086
|
port = 8086
|
||||||
|
@ -20,3 +28,21 @@ password = INFLUXDB_USER_PASSWORD
|
||||||
|
|
||||||
# Measurement name
|
# Measurement name
|
||||||
measurement = geodata
|
measurement = geodata
|
||||||
|
|
||||||
|
[INFLUXDB2]
|
||||||
|
# This part of the config will be used only when [INFLUXDB_VERSION] set to 2
|
||||||
|
|
||||||
|
# InfluxDB server URL
|
||||||
|
url = INFLUXDB_SERVER_IP:PORT
|
||||||
|
|
||||||
|
# Token for authentication
|
||||||
|
token = ANY EXISTED USER TOKEN
|
||||||
|
|
||||||
|
# Organization is the name of the organization you wish to write to
|
||||||
|
organization = ORGANIZATION NAME
|
||||||
|
|
||||||
|
# Destination bucket to write into
|
||||||
|
bucket = SOME BUCKET name
|
||||||
|
|
||||||
|
# Measurement name
|
||||||
|
measurement = geodata
|
||||||
|
|
Loading…
Reference in a new issue