diff --git a/geoparser.py b/geoparser.py index 5aeeacb..84cdcc2 100755 --- a/geoparser.py +++ b/geoparser.py @@ -16,6 +16,8 @@ import logging.handlers import geoip2.database import configparser from influxdb import InfluxDBClient +from influxdb_client import InfluxDBClient as InfluxDBClient2 +from influxdb_client.client.write_api import SYNCHRONOUS from IPy import IP as ipadd import threading @@ -33,14 +35,20 @@ root = logging.getLogger(__name__) root.setLevel(os.environ.get("LOGLEVEL", "INFO")) root.addHandler(handler) -def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE): # NOQA + +def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION, + INFLUXHOST=None, INFLUXPORT=None, URL=None, INFLUXDBDB=None, INFLUXUSER=None, + INFLUXUSERPASS=None, INFLUXDBTOKEN=None, INFLUXDBBUCKET=None, INFLUXDBORG=None): # NOQA # Preparing variables and params IPS = {} COUNT = {} GEOHASH = {} HOSTNAME = os.uname()[1] - CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT, - username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA + if INFLUXDB_VERSION == "1": + CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT, + username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA + elif INFLUXDB_VERSION == "2": + CLIENT = InfluxDBClient(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})') re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA @@ -86,7 +94,11 @@ def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, I METRICS.append(IPS) # Sending json data itto InfluxDB try: - CLIENT.write_points(METRICS) + if INFLUXDB_VERSION == "1": + CLIENT.write_points(METRICS) + elif INFLUXDB_VERSION == "2": + write_api = CLIENT.write_api(write_options=SYNCHRONOUS) # NOQA + write_api.write(INFLUXDBBUCKET, INFLUXDBORG, record=METRICS) # NOQA except Exception: logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA @@ -97,15 +109,28 @@ def main(): CONFIG = configparser.ConfigParser() CONFIG.read(f'{PWD}/settings.ini') - # Getting params from config - GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') - LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() - INFLUXHOST = CONFIG.get('INFLUXDB', 'host') - INFLUXPORT = CONFIG.get('INFLUXDB', 'port') - INFLUXDBDB = CONFIG.get('INFLUXDB', 'database') - INFLUXUSER = CONFIG.get('INFLUXDB', 'username') - MEASUREMENT = CONFIG.get('INFLUXDB', 'measurement') - INFLUXUSERPASS = CONFIG.get('INFLUXDB', 'password') + # Get the InfluxDB version so we can parse only needed part of config + INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version') + + if INFLUXDB_VERSION == "1": + # Getting params from config for version 1 + GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') + LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() + INFLUXHOST = CONFIG.get('INFLUXDB1', 'host') + INFLUXPORT = CONFIG.get('INFLUXDB1', 'port') + INFLUXDBDB = CONFIG.get('INFLUXDB1', 'database') + INFLUXUSER = CONFIG.get('INFLUXDB1', 'username') + MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement') + INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password') + elif INFLUXDB_VERSION == "2": + # Getting params from config for version 2 + GEOIPDB = CONFIG.get('GEOIP', 'geoipdb') + LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split() + URL = CONFIG.get('INFLUXDB2', 'url') + INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token') + INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket') + MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement') + INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization') # Parsing log file and sending metrics to Influxdb while True: @@ -121,17 +146,31 @@ def main(): logging.info('Nginx log file %s not found', log) print('Nginx log file %s not found' % log) return - # Run the main loop and grep data in separate threads - t = website - if os.path.exists(log): - t = threading.Thread(target=logparse, args=[log, website, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA - for thread in threading.enumerate(): - thread_names.append(thread.name) - if website not in thread_names: - t.start() - else: - logging.info('Nginx log file %s not found', log) - print('Nginx log file %s not found' % log) + + if INFLUXDB_VERSION == "1": + # Run the main loop and grep data in separate threads + t = website + if os.path.exists(log): + t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + for thread in threading.enumerate(): + thread_names.append(thread.name) + if website not in thread_names: + t.start() + else: + logging.info('Nginx log file %s not found', log) + print('Nginx log file %s not found' % log) + elif INFLUXDB_VERSION == "2": + # Run the main loop and grep data in separate threads + t = website + if os.path.exists(log): + t = threading.Thread(target=logparse, args=[log, website, INFLUXDB_VERSION, URL, INFLUXDBTOKEN, INFLUXDBBUCKET, INFLUXDBORG, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA + for thread in threading.enumerate(): + thread_names.append(thread.name) + if website not in thread_names: + t.start() + else: + logging.info('Nginx log file %s not found', log) + print('Nginx log file %s not found' % log) if __name__ == '__main__': diff --git a/requirements.txt b/requirements.txt index b9d0bca..143e29c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ configparser==3.5.0 influxdb==5.2.0 geoip2==2.9.0 IPy==1.00 +influxdb-client diff --git a/settings.ini.back b/settings.ini.back index f827e10..888b753 100644 --- a/settings.ini.back +++ b/settings.ini.back @@ -1,12 +1,20 @@ [NGINX_LOGS] -#Path for the log file (Nginx) +# Path for the log file (Nginx) logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log [GEOIP] -#Path for the GEOIP DB file +# Path for the GEOIP DB file geoipdb = ./GeoLite2-City.mmdb -[INFLUXDB] +[INFLUXDB_VERSION] +# Version of the InfluxDB, 1 = old 1.8 and early, 2 = new 2.0 and more +# Set this parameter to 1 if you want use old InfluxDB version like 1.8 +# Or set this parameter to 2 if you plan to use InfluxDB version 2.1 or modern +version = 1 + +[INFLUXDB1] +# This part of the config will be used only when [INFLUXDB_VERSION] set to 1 + # Database URL host = INFLUXDB_SERVER_IP port = 8086 @@ -20,3 +28,21 @@ password = INFLUXDB_USER_PASSWORD # Measurement name measurement = geodata + +[INFLUXDB2] +# This part of the config will be used only when [INFLUXDB_VERSION] set to 2 + +# InfluxDB server URL +url = INFLUXDB_SERVER_IP:PORT + +# Token for authentication +token = ANY EXISTED USER TOKEN + +# Organization is the name of the organization you wish to write to +organization = ORGANIZATION NAME + +# Destination bucket to write into +bucket = SOME BUCKET name + +# Measurement name +measurement = geodata