commit 416da2e80a
4 changed files with 103 additions and 29 deletions

README.md

@@ -1,9 +1,13 @@
 # GeoStat
-### Version 2.2
+### Version 2.3
 ![Alt text](https://github.com/ratibor78/geostat/blob/master/geostat.png?raw=true "Grafana dashboard example")


-GeoStat it's a Python-based script for parsing Nginx and Apache log files and getting GEO data from incoming IPs from it. This script converts parsed data into JSON format and sends it to the InfluxDB database, so you can use it for building nice Grafana dashboards for example. The application runs as SystemD service and parses log files in "tailf" style. Also, you can run it as a Docker container if you wish.
+GeoStat is a Python-based script for parsing Nginx and Apache log files and getting GEO data about the incoming IPs in them. It converts the parsed data into JSON format and sends it to an InfluxDB database, so you can use it, for example, to build nice Grafana dashboards. The program now supports both the old InfluxDB 1.8 and the modern InfluxDB 2. The application runs as a SystemD service and parses log files in "tail -f" style. You can also run it as a Docker container if you wish.

+# New in version 2.3
+- Added InfluxDB 2 support: data can now be sent not only to the old InfluxDB 1.8 but also to the modern InfluxDB 2.x.
+- Fixed several small bugs.
+
 # New in version 2.2
 - The application was rewritten so that it can parse more than one log file at a time; you can now handle multiple separate websites on the same host. To do that, configure each virtual host or website to write its log file to a different location.
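
The multi-site setup mentioned in the version 2.2 note above boils down to the `logpath` value in settings.ini holding one `name:path` entry per website. Below is a minimal sketch of how such a value can be turned into (website, logfile) pairs; the whitespace split mirrors the `.split()` call in the geoparser.py diff further down, while splitting each entry on the first `:` is an assumption made here for illustration only.

```python
import configparser

# Sample config in the same shape as the project's settings.ini
sample = """
[NGINX_LOGS]
logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log
"""

config = configparser.ConfigParser()
config.read_string(sample)

# One whitespace-separated entry per website (matches CONFIG.get(...).split() in the diff)
for entry in config.get("NGINX_LOGS", "logpath").split():
    website, logfile = entry.split(":", 1)  # assumed "name:path" layout, split on the first ":"
    print(website, "->", logfile)           # website1 -> /var/log/website1/access.log
```
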
geoparser.py (91 changed lines)

@@ -5,6 +5,7 @@
 # geoip, which is going away r.s.n.
 # Added possibility of processing more than one Nginx log file,
 # by adding threading support. 2022 July by Alexey Nizhegolenko
+# Added InfluxDB 2 support. 2022/07/21 by Alexey Nizhegolenko

 import os
 import re

@@ -16,6 +17,8 @@ import logging.handlers
 import geoip2.database
 import configparser
 from influxdb import InfluxDBClient
+from influxdb_client import InfluxDBClient as InfluxDBClient2
+from influxdb_client.client.write_api import SYNCHRONOUS
 from IPy import IP as ipadd
 import threading

@@ -33,14 +36,20 @@ root = logging.getLogger(__name__)
 root.setLevel(os.environ.get("LOGLEVEL", "INFO"))
 root.addHandler(handler)


-def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE): # NOQA
+def logparse(LOGPATH, WEBSITE, MEASUREMENT, GEOIPDB, INODE, INFLUXDB_VERSION,
+             INFLUXHOST=None, INFLUXPORT=None, URL=None, INFLUXDBDB=None, INFLUXUSER=None,
+             INFLUXUSERPASS=None, INFLUXDBTOKEN=None, INFLUXDBBUCKET=None, INFLUXDBORG=None): # NOQA
     # Preparing variables and params
     IPS = {}
     COUNT = {}
     GEOHASH = {}
     HOSTNAME = os.uname()[1]
-    CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT,
-                            username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA
+    if INFLUXDB_VERSION == "1":
+        CLIENT = InfluxDBClient(host=INFLUXHOST, port=INFLUXPORT,
+                                username=INFLUXUSER, password=INFLUXUSERPASS, database=INFLUXDBDB) # NOQA
+    elif INFLUXDB_VERSION == "2":
+        CLIENT = InfluxDBClient2(url=URL, token=INFLUXDBTOKEN, org=INFLUXDBORG) # NOQA

     re_IPV4 = re.compile('(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
     re_IPV6 = re.compile('(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))') # NOQA

@@ -86,7 +95,11 @@ def logparse(LOGPATH, WEBSITE, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, I
             METRICS.append(IPS)
             # Sending json data to InfluxDB
             try:
-                CLIENT.write_points(METRICS)
+                if INFLUXDB_VERSION == "1":
+                    CLIENT.write_points(METRICS)
+                elif INFLUXDB_VERSION == "2":
+                    write_api = CLIENT.write_api(write_options=SYNCHRONOUS) # NOQA
+                    write_api.write(INFLUXDBBUCKET, INFLUXDBORG, record=METRICS) # NOQA
             except Exception:
                 logging.exception("Cannot establish connection with InfluxDB server: ") # NOQA
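
Both branches in the try block above send the same METRICS list; the point construction itself sits outside this hunk. The sketch below only shows the generic dict shape that both `write_points()` (v1) and `write_api.write()` (v2) accept, with placeholder connection values and illustrative tag/field names that are not taken from the repository; it needs a reachable InfluxDB 2.x server to actually run the write.

```python
from influxdb_client import InfluxDBClient as InfluxDBClient2
from influxdb_client.client.write_api import SYNCHRONOUS

# Generic InfluxDB point layout; GeoStat's real tags/fields are built elsewhere in logparse()
METRICS = [{
    "measurement": "geodata",                         # measurement name from settings.ini
    "tags": {"host": "web01", "country_code": "US"},  # illustrative tags only
    "fields": {"count": 1},
}]

# Placeholder connection values, not project defaults
client = InfluxDBClient2(url="http://localhost:8086", token="MY_TOKEN", org="MY_ORG")
write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write("MY_BUCKET", "MY_ORG", record=METRICS)  # same call shape as in the diff
```
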
@@ -96,16 +109,28 @@ def main():
     PWD = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
     CONFIG = configparser.ConfigParser()
     CONFIG.read(f'{PWD}/settings.ini')
+    # Get the InfluxDB version so we can parse only needed part of config
+    INFLUXDB_VERSION = CONFIG.get('INFLUXDB_VERSION', 'version')

-    # Getting params from config
-    GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
-    LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
-    INFLUXHOST = CONFIG.get('INFLUXDB', 'host')
-    INFLUXPORT = CONFIG.get('INFLUXDB', 'port')
-    INFLUXDBDB = CONFIG.get('INFLUXDB', 'database')
-    INFLUXUSER = CONFIG.get('INFLUXDB', 'username')
-    MEASUREMENT = CONFIG.get('INFLUXDB', 'measurement')
-    INFLUXUSERPASS = CONFIG.get('INFLUXDB', 'password')
+    if INFLUXDB_VERSION == "1":
+        # Getting params from config for version 1
+        GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
+        LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
+        INFLUXHOST = CONFIG.get('INFLUXDB1', 'host')
+        INFLUXPORT = CONFIG.get('INFLUXDB1', 'port')
+        INFLUXDBDB = CONFIG.get('INFLUXDB1', 'database')
+        INFLUXUSER = CONFIG.get('INFLUXDB1', 'username')
+        MEASUREMENT = CONFIG.get('INFLUXDB1', 'measurement')
+        INFLUXUSERPASS = CONFIG.get('INFLUXDB1', 'password')
+    elif INFLUXDB_VERSION == "2":
+        # Getting params from config for version 2
+        GEOIPDB = CONFIG.get('GEOIP', 'geoipdb')
+        LOGPATH = CONFIG.get('NGINX_LOGS', 'logpath').split()
+        URL = CONFIG.get('INFLUXDB2', 'url')
+        INFLUXDBTOKEN = CONFIG.get('INFLUXDB2', 'token')
+        INFLUXDBBUCKET = CONFIG.get('INFLUXDB2', 'bucket')
+        MEASUREMENT = CONFIG.get('INFLUXDB2', 'measurement')
+        INFLUXDBORG = CONFIG.get('INFLUXDB2', 'organization')

     # Parsing log file and sending metrics to Influxdb
     while True:
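
One detail worth keeping in mind when reading the hunk above: configparser returns every option as a string, which is why the version checks compare against "1" and "2" rather than integers. A tiny self-contained illustration:

```python
import configparser

cfg = configparser.ConfigParser()
cfg.read_string("[INFLUXDB_VERSION]\nversion = 1\n")

version = cfg.get('INFLUXDB_VERSION', 'version')
print(repr(version))    # '1' -- a str, not an int
print(version == "1")   # True: the comparison geoparser.py relies on
```
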
@@ -121,17 +146,35 @@ def main():
                 logging.info('Nginx log file %s not found', log)
                 print('Nginx log file %s not found' % log)
                 return
-            # Run the main loop and grep data in separate threads
-            t = website
-            if os.path.exists(log):
-                t = threading.Thread(target=logparse, args=[log, website, INFLUXHOST, INFLUXPORT, INFLUXDBDB, INFLUXUSER, INFLUXUSERPASS, MEASUREMENT, GEOIPDB, INODE], daemon=True, name=website) # NOQA
-                for thread in threading.enumerate():
-                    thread_names.append(thread.name)
-                if website not in thread_names:
-                    t.start()
-            else:
-                logging.info('Nginx log file %s not found', log)
-                print('Nginx log file %s not found' % log)
+            if INFLUXDB_VERSION == "1":
+                # Run the main loop and grep data in separate threads
+                t = website
+                if os.path.exists(log):
+                    t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': log, 'INFLUXHOST': INFLUXHOST,
+                                         'INODE': INODE, 'WEBSITE': website, 'INFLUXPORT': INFLUXPORT, 'INFLUXDBDB': INFLUXDBDB,
+                                         'INFLUXUSER': INFLUXUSER, 'MEASUREMENT': MEASUREMENT,
+                                         'INFLUXUSERPASS': INFLUXUSERPASS, 'INFLUXDB_VERSION': INFLUXDB_VERSION}, daemon=True, name=website) # NOQA
+                    for thread in threading.enumerate():
+                        thread_names.append(thread.name)
+                    if website not in thread_names:
+                        t.start()
+                else:
+                    logging.info('Nginx log file %s not found', log)
+                    print('Nginx log file %s not found' % log)
+            elif INFLUXDB_VERSION == "2":
+                # Run the main loop and grep data in separate threads
+                t = website
+                if os.path.exists(log):
+                    t = threading.Thread(target=logparse, kwargs={'GEOIPDB': GEOIPDB, 'LOGPATH': log, 'URL': URL, 'INFLUXDBTOKEN': INFLUXDBTOKEN,
+                                         'INFLUXDBBUCKET': INFLUXDBBUCKET, 'MEASUREMENT': MEASUREMENT, 'INFLUXDB_VERSION': INFLUXDB_VERSION,
+                                         'INODE': INODE, 'WEBSITE': website, 'INFLUXDBORG': INFLUXDBORG}, daemon=True, name=website) # NOQA
+                    for thread in threading.enumerate():
+                        thread_names.append(thread.name)
+                    if website not in thread_names:
+                        t.start()
+                else:
+                    logging.info('Nginx log file %s not found', log)
+                    print('Nginx log file %s not found' % log)


 if __name__ == '__main__':

requirements.txt

@@ -2,3 +2,4 @@ configparser==3.5.0
 influxdb==5.2.0
 geoip2==2.9.0
 IPy==1.00
+influxdb-client

settings.ini

@@ -1,12 +1,20 @@
 [NGINX_LOGS]
-#Path for the log file (Nginx)
+# Path for the log file (Nginx)
 logpath = website1:/var/log/website1/access.log website2:/var/log/website2/access.log

 [GEOIP]
-#Path for the GEOIP DB file
+# Path for the GEOIP DB file
 geoipdb = ./GeoLite2-City.mmdb

-[INFLUXDB]
+[INFLUXDB_VERSION]
+# InfluxDB version: 1 = InfluxDB 1.8 and earlier, 2 = InfluxDB 2.0 and later
+# Set this parameter to 1 if you use an old InfluxDB version such as 1.8,
+# or set it to 2 if you plan to use InfluxDB 2.x or newer
+version = 1
+
+[INFLUXDB1]
+# This part of the config is used only when [INFLUXDB_VERSION] is set to 1
+
 # Database URL
 host = INFLUXDB_SERVER_IP
 port = 8086

@@ -20,3 +28,21 @@ password = INFLUXDB_USER_PASSWORD

 # Measurement name
 measurement = geodata
+
+[INFLUXDB2]
+# This part of the config is used only when [INFLUXDB_VERSION] is set to 2
+
+# InfluxDB server URL
+url = INFLUXDB_SERVER_IP:PORT
+
+# Token for authentication
+token = ANY EXISTING USER TOKEN
+
+# Organization is the name of the organization you wish to write to
+organization = ORGANIZATION NAME
+
+# Destination bucket to write into
+bucket = SOME BUCKET NAME
+
+# Measurement name
+measurement = geodata
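
As a closing illustration, here is a hedged sketch of how the new [INFLUXDB2] section feeds the v2 client in geoparser.py. One caveat: the influxdb_client library expects a full URL including the scheme (for example `http://INFLUXDB_SERVER_IP:8086`), which the placeholder above leaves out. All values below are placeholders, not project defaults.

```python
import configparser
from influxdb_client import InfluxDBClient as InfluxDBClient2

cfg = configparser.ConfigParser()
cfg.read_string("""
[INFLUXDB2]
url = http://influxdb.example.com:8086
token = my-user-token
organization = my-org
bucket = my-bucket
measurement = geodata
""")

# Mirrors the constructor call in the geoparser.py diff above
client = InfluxDBClient2(url=cfg.get('INFLUXDB2', 'url'),
                         token=cfg.get('INFLUXDB2', 'token'),
                         org=cfg.get('INFLUXDB2', 'organization'))
# The bucket and measurement values are used later, at write time.
```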