From 96ddf180a98a4cab94e16c6b49396275194dd7e5 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Wed, 20 Jul 2022 23:09:55 +0300 Subject: [PATCH 1/9] Added support for the InfluxDB version 2 or modern --- geoparser.py | 87 ++++++++++++++++++++++++++++++++++------------- requirements.txt | 1 + settings.ini.back | 32 +++++++++++++++-- 3 files changed, 93 insertions(+), 27 deletions(-) diff --git a/geoparser.py b/geoparser.py index 5aeeacb..84cdcc2 100755 --- a/geoparser.py +++ b/geoparser.py @@ -16,6 +16,8 @@ import logging.handlers import geoip2.database import configparser from influxdb import InfluxDBClient +from influxdb_client import InfluxDBClient as InfluxDBClient2 +from influxdb_client.client.write_api import SYNCHRONOUS from IPy import IP as ipadd import threading @@ -33,14 +35,20 @@ root = logging.getLogger(__name__) root.setLevel(os.environ.get("LOGLEVEL", "INFO")) root.addHandler(handler) -def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE): # NOQA + +def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION, + INFLUXHOST=None, INFLUXPORT=None, URL=None, INFLUXDBDB=None, INFLUXUSER=None, + INFLUXUSERPASS=None, INFLUXDBTOKEN=None, INFLUXDBBUCKET=None, INFLUXDBORG=None): # NOQA # Preparing variables and params IPS = {} COUNT = {} GEOHASH = {} HOSTNAME = os.uname()[1] - CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT, - username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA + if INFLUXDB_VERSION == "1": + CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT, + username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA + elif INFLUXDB_VERSION == "2": + CLIENT = InfluxDBClient(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})') re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA @@ -86,7 +94,11 @@ def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, I METRICS.append(IPS) # Sending json data itto InfluxDB try: - CLIENT.write_points(METRICS) + if INFLUXDB_VERSION == "1": + CLIENT.write_points(METRICS) + elif INFLUXDB_VERSION == "2": + write_api = CLIENT.write_api(write_options=SYNCHRONOUS) # NOQA + write_api.write(INFLUXDBBUCKET, INFLUXDBORG, record=METRICS) # NOQA except Exception: logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA @@ -97,15 +109,28 @@ def main(): CONFIG = configparser.ConfigParser() CONFIG.read(f'{PWD}/settings.ini') - # Getting params from config - GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') - LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() - INFLUXHOST = CONFIG.get('INFLUXDB', 'host') - INFLUXPORT = CONFIG.get('INFLUXDB', 'port') - INFLUXDBDB = CONFIG.get('INFLUXDB', 'database') - INFLUXUSER = CONFIG.get('INFLUXDB', 'username') - MEASUREMENT = CONFIG.get('INFLUXDB', 'measurement') - INFLUXUSERPASS = CONFIG.get('INFLUXDB', 'password') + # Get the InfluxDB version so we can parse only needed part of config + INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version') + + if INFLUXDB_VERSION == "1": + # Getting params from config for version 1 + GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') + LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() + INFLUXHOST = CONFIG.get('INFLUXDB1', 'host') + INFLUXPORT = CONFIG.get('INFLUXDB1', 'port') + INFLUXDBDB = CONFIG.get('INFLUXDB1', 'database') + INFLUXUSER = CONFIG.get('INFLUXDB1', 'username') + MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement') + INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password') + elif INFLUXDB_VERSION == "2": + # Getting params from config for version 2 + GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') + LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() + URL = CONFIG.get('INFLUXDB2', 'url') + INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token') + INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') + MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') + INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') # Parsing log file and sending metrics to Influxdb while True: @@ -121,17 +146,31 @@ def main(): logging.info('Nginx log file %s not found', log) print('Nginx log file %s not found' % log) return - # Run the main loop and grep data in separate threads - t = website - if os.path.exists(log): - t = threading.Thread(target=logparse, args=[log, website, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA - for thread in threading.enumerate(): - thread_names.append(thread.name) - if website not in thread_names: - t.start() - else: - logging.info('Nginx log file %s not found', log) - print('Nginx log file %s not found' % log) + + if INFLUXDB_VERSION == "1": + # Run the main loop and grep data in separate threads + t = website + if os.path.exists(log): + t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + for thread in threading.enumerate(): + thread_names.append(thread.name) + if website not in thread_names: + t.start() + else: + logging.info('Nginx log file %s not found', log) + print('Nginx log file %s not found' % log) + elif INFLUXDB_VERSION == "2": + # Run the main loop and grep data in separate threads + t = website + if os.path.exists(log): + t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, URL, INFLUXDBTOKEN, INFLUXDBBUCKET, INFLUXDBORG, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + for thread in threading.enumerate(): + thread_names.append(thread.name) + if website not in thread_names: + t.start() + else: + logging.info('Nginx log file %s not found', log) + print('Nginx log file %s not found' % log) if __name__ == '__main__': diff --git a/requirements.txt b/requirements.txt index b9d0bca..143e29c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ configparser==3.5.0 influxdb==5.2.0 geoip2==2.9.0 IPy==1.00 +influxdb-client diff --git a/settings.ini.back b/settings.ini.back index f827e10..888b753 100644 --- a/settings.ini.back +++ b/settings.ini.back @@ -1,12 +1,20 @@ [NGINX_LOGS] -#Path for the log file (Nginx) +# Path for the log file (Nginx) logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log [GEOIP] -#Path for the GEOIP DB file +# Path for the GEOIP DB file geoipdb = ./GeoLite2-City.mmdb -[INFLUXDB] +[INFLUXDB_VERSION] +# Version of the InfluxDB, 1 = old 1.8 and early, 2 = new 2.0 and more +# Set this parameter to 1 if you want use old InfluxDB version like 1.8 +# Or set this parameter to 2 if you plan to use InfluxDB version 2.1 or modern +version = 1 + +[INFLUXDB1] +# This part of the config will be used only when [INFLUXDB_VERSION] set to 1 + # Database URL host = INFLUXDB_SERVER_IP port = 8086 @@ -20,3 +28,21 @@ password = INFLUXDB_USER_PASSWORD # Measurement name measurement = geodata + +[INFLUXDB2] +# This part of the config will be used only when [INFLUXDB_VERSION] set to 2 + +# InfluxDB server URL +url = INFLUXDB_SERVER_IP:PORT + +# Token for authentication +token = ANY EXISTED USER TOKEN + +# Organization is the name of the organization you wish to write to +organization = ORGANIZATION NAME + +# Destination bucket to write into +bucket = SOME BUCKET name + +# Measurement name +measurement = geodata From 156453cf7650a2bf5ae7b2c1bb7e227a8b3bfb5d Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 13:59:18 +0300 Subject: [PATCH 2/9] fixed logparse function arguments pass --- geoparser.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/geoparser.py b/geoparser.py index 84cdcc2..f1e79d4 100755 --- a/geoparser.py +++ b/geoparser.py @@ -48,7 +48,7 @@ def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION, CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT, username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA elif INFLUXDB_VERSION == "2": - CLIENT = InfluxDBClient(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA + CLIENT = InfluxDBClient2(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})') re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA @@ -108,7 +108,8 @@ def main(): PWD = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) CONFIG = configparser.ConfigParser() CONFIG.read(f'{PWD}/settings.ini') - + KWARGS1 = {} + KWARGS2 = {} # Get the InfluxDB version so we can parse only needed part of config INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version') @@ -122,15 +123,21 @@ def main(): INFLUXUSER = CONFIG.get('INFLUXDB1', 'username') MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement') INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password') + KWARGS1 = {'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, + 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, + 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, + 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION} # NOQA elif INFLUXDB_VERSION == "2": # Getting params from config for version 2 - GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() URL = CONFIG.get('INFLUXDB2', 'url') INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token') INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') + KWARGS2 = {'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, + 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, + 'INFLUXDBORG': INFLUXDBORG} # NOQA # Parsing log file and sending metrics to Influxdb while True: @@ -151,7 +158,7 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + t = threading.Thread(target=logparse, kwargs=KWARGS1, daemon=True, name=website) # NOQA for thread in threading.enumerate(): thread_names.append(thread.name) if website not in thread_names: @@ -163,7 +170,7 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, URL, INFLUXDBTOKEN, INFLUXDBBUCKET, INFLUXDBORG, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + t = threading.Thread(target=logparse, kwargs=KWARGS2, daemon=True, name=website) # NOQA for thread in threading.enumerate(): thread_names.append(thread.name) if website not in thread_names: From 7acc4727df08838380237f573d1654186f7305b8 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 14:23:44 +0300 Subject: [PATCH 3/9] changing threads func args to keyargs --- geoparser.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/geoparser.py b/geoparser.py index f1e79d4..a9f333c 100755 --- a/geoparser.py +++ b/geoparser.py @@ -123,10 +123,6 @@ def main(): INFLUXUSER = CONFIG.get('INFLUXDB1', 'username') MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement') INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password') - KWARGS1 = {'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, - 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, - 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, - 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION} # NOQA elif INFLUXDB_VERSION == "2": # Getting params from config for version 2 LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() @@ -135,9 +131,6 @@ def main(): INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') - KWARGS2 = {'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, - 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, - 'INFLUXDBORG': INFLUXDBORG} # NOQA # Parsing log file and sending metrics to Influxdb while True: @@ -153,9 +146,16 @@ def main(): logging.info('Nginx log file %s not found', log) print('Nginx log file %s not found' % log) return - if INFLUXDB_VERSION == "1": # Run the main loop and grep data in separate threads + KWARGS1 = {'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, + 'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, + 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, + 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION} # NOQA + + KWARGS2 = {'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, + 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, + 'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG} # NOQA t = website if os.path.exists(log): t = threading.Thread(target=logparse, kwargs=KWARGS1, daemon=True, name=website) # NOQA From 943a7345117d0e1f53d4a2efdbacf203b4a23dfc Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 14:30:50 +0300 Subject: [PATCH 4/9] Fixing URL variable --- geoparser.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/geoparser.py b/geoparser.py index a9f333c..b464868 100755 --- a/geoparser.py +++ b/geoparser.py @@ -108,8 +108,6 @@ def main(): PWD = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) CONFIG = configparser.ConfigParser() CONFIG.read(f'{PWD}/settings.ini') - KWARGS1 = {} - KWARGS2 = {} # Get the InfluxDB version so we can parse only needed part of config INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version') @@ -131,7 +129,8 @@ def main(): INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') - + KWARGS1 = {} + KWARGS2 = {} # Parsing log file and sending metrics to Influxdb while True: logs = [] From 9032c74b8606688bb9ba5a6ad11640e26733deed Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 14:41:38 +0300 Subject: [PATCH 5/9] Add dict as keywards to thread func --- geoparser.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/geoparser.py b/geoparser.py index b464868..f341abf 100755 --- a/geoparser.py +++ b/geoparser.py @@ -129,8 +129,8 @@ def main(): INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') - KWARGS1 = {} - KWARGS2 = {} + + # Parsing log file and sending metrics to Influxdb while True: logs = [] @@ -147,17 +147,12 @@ def main(): return if INFLUXDB_VERSION == "1": # Run the main loop and grep data in separate threads - KWARGS1 = {'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, - 'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, - 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, - 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION} # NOQA - - KWARGS2 = {'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, - 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, - 'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG} # NOQA t = website if os.path.exists(log): - t = threading.Thread(target=logparse, kwargs=KWARGS1, daemon=True, name=website) # NOQA + t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, + 'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, + 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, + 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION}, daemon=True, name=website) # NOQA for thread in threading.enumerate(): thread_names.append(thread.name) if website not in thread_names: @@ -169,7 +164,9 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, kwargs=KWARGS2, daemon=True, name=website) # NOQA + t = threading.Thread(target=logparse, kwargs={'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, + 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, + 'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG}, daemon=True, name=website) # NOQA for thread in threading.enumerate(): thread_names.append(thread.name) if website not in thread_names: From 8918b173acaf7de7f5b0b006a3b804df0734baf5 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 14:44:28 +0300 Subject: [PATCH 6/9] Fixed LOGPATH var --- geoparser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geoparser.py b/geoparser.py index f341abf..6173f30 100755 --- a/geoparser.py +++ b/geoparser.py @@ -149,7 +149,7 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': LOGPATH, 'INFLUXHOST': INFLUXHOST, + t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': log, 'INFLUXHOST': INFLUXHOST, 'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB, 'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT, 'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION}, daemon=True, name=website) # NOQA @@ -164,7 +164,7 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, kwargs={'LOGPATH': LOGPATH, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, + t = threading.Thread(target=logparse, kwargs={'LOGPATH': log, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, 'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG}, daemon=True, name=website) # NOQA for thread in threading.enumerate(): From 056b37af2a5b2d4439251b7fc675d875cd830ee2 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 17:42:51 +0300 Subject: [PATCH 7/9] Added missed GEOIPDB var into influxdb2 parsing config func --- geoparser.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/geoparser.py b/geoparser.py index 6173f30..f77b581 100755 --- a/geoparser.py +++ b/geoparser.py @@ -5,6 +5,7 @@ # geoip, which is going away r.s.n. # Added possibility of processing more than one Nginx log file, # by adding threading support. 2022 July by Alexey Nizhegolenko +# Added InfluxDB 2 support. 2022/07/21 by Alexey Nizhegolenko import os import re @@ -130,7 +131,6 @@ def main(): MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') - # Parsing log file and sending metrics to Influxdb while True: logs = [] @@ -164,8 +164,8 @@ def main(): # Run the main loop and grep data in separate threads t = website if os.path.exists(log): - t = threading.Thread(target=logparse, kwargs={'LOGPATH': log, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, - 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, + t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': log, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN, + 'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, 'INFLUXDB_VERSION': INFLUXDB_VERSION, 'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG}, daemon=True, name=website) # NOQA for thread in threading.enumerate(): thread_names.append(thread.name) From c3f773cf87119675e8b0bfeae2de9edfc578d639 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 17:43:55 +0300 Subject: [PATCH 8/9] Added missed GEOIPDB parsing from settings.ini --- geoparser.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geoparser.py b/geoparser.py index f77b581..05a41db 100755 --- a/geoparser.py +++ b/geoparser.py @@ -124,6 +124,7 @@ def main(): INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password') elif INFLUXDB_VERSION == "2": # Getting params from config for version 2 + GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() URL = CONFIG.get('INFLUXDB2', 'url') INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token') From 77c20f9779dc9ad9fcc37c2a9534404d49e33440 Mon Sep 17 00:00:00 2001 From: Alexey Nizhegolenko Date: Thu, 21 Jul 2022 20:51:47 +0300 Subject: [PATCH 9/9] Update README.md --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cab0e6c..d3d44e3 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,13 @@ # GeoStat -### Version 2.2 +### Version 2.3 ![Alt text](https://github.com/ratibor78/geostat/blob/master/geostat.png?raw=true "Grafana dashboard example") -GeoStat it's a Python-based script for parsing Nginx and Apache log files and getting GEO data from incoming IPs from it. This script converts parsed data into JSON format and sends it to the InfluxDB database, so you can use it for building nice Grafana dashboards for example. The application runs as SystemD service and parses log files in "tailf" style. Also, you can run it as a Docker container if you wish. +GeoStat a Python-based script for parsing Nginx and Apache log files and getting GEO data about incoming IPs from them. This script converts parsed data into JSON format and sends it to the InfluxDB database, so you can use it for building nice Grafana dashboards. Now, this program supports old InfluxDB 1.8 and modern InfluxDB 2. The application runs as SystemD service and parses log files in "tailf" style. Also, you can run it as a Docker container if you wish. + +# New in version 2.3 +- Was added the InfluxDB 2 support, now you can use not only old InfluxDB 1.8 but also send data into modern InfluxDB 2.* +- Was fixed small bugs also. # New in version 2.2 - The application was rewritten with adding the availability of parsing more than one log file at one time, now you can parse multiple separated websites on the host. To do that please set up all virtual hosts or websites to save their log files in different places.