geostat/geoparser.py

191 lines
8.7 KiB
Python
Raw Normal View History

2018-10-08 22:25:40 +02:00
# Getting GEO information from Nginx access.log by IP's.
2018-10-08 17:13:00 +02:00
# Alexey Nizhegolenko 2018
# Parts added by Remko Lodder, 2019.
# Added: IPv6 matching, make query based on geoip2 instead of
# geoip, which is going away r.s.n.
# Added possibility of processing more than one Nginx log file,
# by adding threading support. 2022 July by Alexey Nizhegolenko
2018-10-08 17:13:00 +02:00
import os
import re
2018-10-08 22:25:40 +02:00
import sys
2018-10-08 17:13:00 +02:00
import time
import geohash
import logging
import logging.handlers
import geoip2.database
2018-10-08 17:13:00 +02:00
import configparser
2018-10-08 18:00:33 +02:00
from influxdb import InfluxDBClient
from influxdb_client import InfluxDBClient as InfluxDBClient2
from influxdb_client.client.write_api import SYNCHRONOUS
from IPy import IP as ipadd
import threading
2018-10-08 17:13:00 +02:00
class SyslogBOMFormatter(logging.Formatter):
def format(self, record):
result = super().format(record)
return "ufeff" + result
handler = logging.handlers.SysLogHandler('/dev/log')
formatter = SyslogBOMFormatter(logging.BASIC_FORMAT)
handler.setFormatter(formatter)
root = logging.getLogger(__name__)
root.setLevel(os.environ.get("LOGLEVEL", "INFO"))
root.addHandler(handler)
def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION,
INFLUXHOST=None, INFLUXPORT=None, URL=None, INFLUXDBDB=None, INFLUXUSER=None,
INFLUXUSERPASS=None, INFLUXDBTOKEN=None, INFLUXDBBUCKET=None, INFLUXDBORG=None): # NOQA
2018-10-08 22:25:40 +02:00
# Preparing variables and params
IPS = {}
COUNT = {}
GEOHASH = {}
2018-10-12 20:38:06 +02:00
HOSTNAME = os.uname()[1]
if INFLUXDB_VERSION == "1":
CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT,
username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA
elif INFLUXDB_VERSION == "2":
2022-07-21 12:59:18 +02:00
CLIENT = InfluxDBClient2(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA
re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA
GI = geoip2.database.Reader(GEOIPDB)
# Main loop that parses log file in tailf style with sending metrics out
2018-10-08 18:00:33 +02:00
with open(LOGPATH, "r") as FILE:
STR_RESULTS = os.stat(LOGPATH)
2018-10-08 17:13:00 +02:00
ST_SIZE = STR_RESULTS[6]
FILE.seek(ST_SIZE)
while True:
2018-10-08 20:09:42 +02:00
METRICS = []
2018-10-08 17:13:00 +02:00
WHERE = FILE.tell()
LINE = FILE.readline()
INODENEW = os.stat(LOGPATH).st_ino
if INODE != INODENEW:
return
2018-10-08 17:13:00 +02:00
if not LINE:
time.sleep(1)
FILE.seek(WHERE)
else:
if re_IPV4.match(LINE):
m = re_IPV4.match(LINE)
IP = m.group(1)
elif re_IPV6.match(LINE):
m = re_IPV6.match(LINE)
IP = m.group(1)
if ipadd(IP).iptype() == 'PUBLIC' and IP:
INFO = GI.city(IP)
2018-10-08 17:13:00 +02:00
if INFO is not None:
HASH = geohash.encode(INFO.location.latitude, INFO.location.longitude) # NOQA
2018-10-08 18:00:33 +02:00
COUNT['count'] = 1
2018-10-08 22:25:40 +02:00
GEOHASH['geohash'] = HASH
2018-10-12 20:38:06 +02:00
GEOHASH['host'] = HOSTNAME
GEOHASH['website'] = WEBSITE
GEOHASH['country_code'] = INFO.country.iso_code
GEOHASH['country_name'] = INFO.country.name
GEOHASH['city_name'] = INFO.city.name
2018-10-08 18:00:33 +02:00
IPS['tags'] = GEOHASH
IPS['fields'] = COUNT
2018-10-08 22:25:40 +02:00
IPS['measurement'] = MEASUREMENT
2018-10-08 20:07:59 +02:00
METRICS.append(IPS)
# Sending json data itto InfluxDB
try:
if INFLUXDB_VERSION == "1":
CLIENT.write_points(METRICS)
elif INFLUXDB_VERSION == "2":
write_api = CLIENT.write_api(write_options=SYNCHRONOUS) # NOQA
write_api.write(INFLUXDBBUCKET, INFLUXDBORG, record=METRICS) # NOQA
except Exception:
logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA
2018-10-08 17:13:00 +02:00
2018-10-08 18:00:33 +02:00
2018-10-08 22:25:40 +02:00
def main():
# Preparing for reading the config file
2018-10-08 18:00:33 +02:00
PWD = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
CONFIG = configparser.ConfigParser()
CONFIG.read(f'{PWD}/settings.ini')
2022-07-21 12:59:18 +02:00
KWARGS1 = {}
KWARGS2 = {}
# Get the InfluxDB version so we can parse only needed part of config
INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version')
if INFLUXDB_VERSION == "1":
# Getting params from config for version 1
GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
INFLUXHOST = CONFIG.get('INFLUXDB1', 'host')
INFLUXPORT = CONFIG.get('INFLUXDB1', 'port')
INFLUXDBDB = CONFIG.get('INFLUXDB1', 'database')
INFLUXUSER = CONFIG.get('INFLUXDB1', 'username')
MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement')
INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password')
elif INFLUXDB_VERSION == "2":
# Getting params from config for version 2
LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
URL = CONFIG.get('INFLUXDB2', 'url')
INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token')
INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket')
MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement')
INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization')
2018-10-08 18:00:33 +02:00
# Parsing log file and sending metrics to Influxdb
while True:
logs = []
thread_names = []
for logitem in LOGPATH:
logs.append(logitem.split(":"))
for website, log in logs:
# Get inode from log file
if os.path.exists(log):
INODE = os.stat(log).st_ino
else:
logging.info('Nginx log file %s not found', log)
print('Nginx log file %s not found' % log)
return
if INFLUXDB_VERSION == "1":
# Run the main loop and grep data in separate threads
2022-07-21 13:23:44 +02:00
KWARGS1 = {'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST,
'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB,
'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT,
'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION} # NOQA
KWARGS2 = {'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN,
'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT,
'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG} # NOQA
t = website
if os.path.exists(log):
2022-07-21 12:59:18 +02:00
t = threading.Thread(target=logparse, kwargs=KWARGS1, daemon=True, name=website) # NOQA
for thread in threading.enumerate():
thread_names.append(thread.name)
if website not in thread_names:
t.start()
else:
logging.info('Nginx log file %s not found', log)
print('Nginx log file %s not found' % log)
elif INFLUXDB_VERSION == "2":
# Run the main loop and grep data in separate threads
t = website
if os.path.exists(log):
2022-07-21 12:59:18 +02:00
t = threading.Thread(target=logparse, kwargs=KWARGS2, daemon=True, name=website) # NOQA
for thread in threading.enumerate():
thread_names.append(thread.name)
if website not in thread_names:
t.start()
else:
logging.info('Nginx log file %s not found', log)
print('Nginx log file %s not found' % log)
2018-10-08 17:13:00 +02:00
if __name__ == '__main__':
2018-10-08 22:25:40 +02:00
try:
main()
except Exception:
logging.exception("Exception in main(): ")
2018-10-08 22:25:40 +02:00
except KeyboardInterrupt:
logging.exception("Exception KeyboardInterrupt: ")
2018-10-10 20:48:48 +02:00
sys.exit(0)