Python for System Administrators: Advanced Automation and Scripting
Python has become the de facto language for modern system administrators. Its clear syntax, extensive libraries, and automation capabilities make it a perfect fit for managing infrastructure, monitoring systems, and automating repetitive tasks.
🐍 Goal: Master Python as applied to system administration, from basic scripts to full automation, monitoring, and infrastructure-management solutions.
🚀 Why Python for SysAdmins
Advantages over bash scripting
Bash:
# Hard to maintain and scale
for server in $(cat servers.txt); do
  # Note: \$5 must be escaped so awk, not the local shell, expands it
  ssh "$server" "df -h | grep -E '^/dev' | awk '{print \$5}' | sed 's/%//'" | while read usage; do
    if [ "$usage" -gt 80 ]; then
      echo "ALERT: $server disk usage: $usage%"
    fi
  done
done
Python equivalent:
import paramiko
from concurrent.futures import ThreadPoolExecutor

def check_disk_usage(server):
    """Check disk usage on a remote server."""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(server)
        stdin, stdout, stderr = ssh.exec_command(
            "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'"
        )
        usage = int(stdout.read().decode().strip())
        if usage > 80:
            return f"ALERT: {server} disk usage: {usage}%"
        return f"OK: {server} disk usage: {usage}%"
    except Exception as e:
        return f"ERROR: {server} - {e}"
    finally:
        ssh.close()

# Parallel execution
with open('servers.txt') as f:
    servers = [line.strip() for line in f]

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(check_disk_usage, servers))

for result in results:
    print(result)
Essential libraries for SysAdmins
# Operating system management
import os, sys, subprocess, shutil, glob
from pathlib import Path

# Working with files and text
import json, yaml, csv, configparser
import re  # regular expressions

# Networking and protocols
import socket, requests, urllib
import paramiko  # SSH
import smtplib   # Email

# Concurrency and parallelism
import threading, multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Dates and time
import datetime, time
from dateutil import parser

# Monitoring and metrics
import psutil   # system information
import logging  # structured logging

# Databases
import sqlite3, pymongo
from sqlalchemy import create_engine
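These imports are a toolbox rather than a single script. As a quick taste of how the pieces combine, here is a minimal sketch pairing psutil with logging; the 90% threshold is an arbitrary illustrative choice:

import logging
import psutil

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

# Warn about any partition filled beyond 90%
for part in psutil.disk_partitions():
    usage = psutil.disk_usage(part.mountpoint)
    level = logging.WARNING if usage.percent > 90 else logging.INFO
    logging.log(level, "Partition %s at %.1f%%", part.mountpoint, usage.percent)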
📁 File and Directory Management
Basic operations with pathlib
from pathlib import Path
from datetime import datetime, timedelta
import shutil

class FileManager:
    """Advanced filesystem management."""

    def __init__(self, base_path):
        self.base_path = Path(base_path)

    def find_files(self, pattern, recursive=True):
        """Find files matching a pattern."""
        if recursive:
            return list(self.base_path.rglob(pattern))
        return list(self.base_path.glob(pattern))

    def clean_old_files(self, days_old=7, pattern="*", dry_run=True):
        """Delete files older than days_old."""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        removed_files = []
        for file_path in self.find_files(pattern):
            if file_path.is_file():
                file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
                if file_time < cutoff_date:
                    removed_files.append(str(file_path))
                    if not dry_run:
                        file_path.unlink()
                        print(f"Deleted: {file_path}")
        return removed_files

    def backup_directory(self, dest_path, exclude_patterns=None):
        """Back up a directory, honoring exclusion patterns."""
        exclude_patterns = exclude_patterns or []
        dest_path = Path(dest_path)

        def should_exclude(path):
            return any(path.match(pattern) for pattern in exclude_patterns)

        for item in self.base_path.rglob('*'):
            if should_exclude(item):
                continue
            relative_path = item.relative_to(self.base_path)
            dest_item = dest_path / relative_path
            if item.is_dir():
                dest_item.mkdir(parents=True, exist_ok=True)
            else:
                dest_item.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(item, dest_item)

    def set_permissions_recursive(self, file_perms=0o644, dir_perms=0o755):
        """Set permissions recursively."""
        for item in self.base_path.rglob('*'):
            if item.is_file():
                item.chmod(file_perms)
            elif item.is_dir():
                item.chmod(dir_perms)

# Practical usage
fm = FileManager('/var/log')
old_logs = fm.clean_old_files(days_old=30, pattern="*.log", dry_run=False)
print(f"Deleted {len(old_logs)} old log files")
Monitoring file changes
import time
import hashlib
import subprocess
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class ConfigFileMonitor(FileSystemEventHandler):
    """Monitor configuration files for changes."""

    def __init__(self, callback=None):
        self.callback = callback
        self.file_hashes = {}

    def on_modified(self, event):
        if event.is_directory:
            return
        file_path = event.src_path
        current_hash = self.get_file_hash(file_path)
        if file_path in self.file_hashes:
            if self.file_hashes[file_path] != current_hash:
                print(f"File modified: {file_path}")
                if self.callback:
                    self.callback(file_path)
        self.file_hashes[file_path] = current_hash

    def get_file_hash(self, file_path):
        """Compute the MD5 hash of a file."""
        try:
            with open(file_path, 'rb') as f:
                return hashlib.md5(f.read()).hexdigest()
        except IOError:
            return None

def reload_nginx_config(file_path):
    """Callback to reload the nginx configuration."""
    if 'nginx' in file_path:
        test = subprocess.run(['nginx', '-t'])  # validate the config first
        if test.returncode == 0:
            subprocess.run(['systemctl', 'reload', 'nginx'])
            print("Nginx reloaded successfully")

# Real-time monitoring
monitor = ConfigFileMonitor(callback=reload_nginx_config)
observer = Observer()
observer.schedule(monitor, '/etc/nginx/', recursive=True)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
🌐 Network and SSH Automation
A robust SSH client
import paramiko
import yaml
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

class SSHManager:
    """Robust SSH connection management."""

    def __init__(self, hostname, username, password=None, key_filename=None, port=22):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.key_filename = key_filename
        self.port = port
        self.client = None

    @contextmanager
    def connect(self):
        """Context manager for SSH connections."""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Try key-based auth first, then fall back to password
            try:
                self.client.connect(
                    hostname=self.hostname,
                    username=self.username,
                    key_filename=self.key_filename,
                    port=self.port,
                    timeout=10
                )
            except paramiko.AuthenticationException:
                if self.password:
                    self.client.connect(
                        hostname=self.hostname,
                        username=self.username,
                        password=self.password,
                        port=self.port,
                        timeout=10
                    )
                else:
                    raise
            yield self
        except Exception as e:
            print(f"Error connecting to {self.hostname}: {e}")
            raise
        finally:
            if self.client:
                self.client.close()

    def execute_command(self, command, timeout=30):
        """Run a remote command."""
        stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()
        return {
            'exit_code': exit_code,
            'stdout': stdout.read().decode('utf-8'),
            'stderr': stderr.read().decode('utf-8')
        }

    def upload_file(self, local_path, remote_path):
        """Upload a file via SFTP."""
        sftp = self.client.open_sftp()
        try:
            sftp.put(local_path, remote_path)
            return True
        except Exception as e:
            print(f"Error uploading file: {e}")
            return False
        finally:
            sftp.close()

    def download_file(self, remote_path, local_path):
        """Download a file via SFTP."""
        sftp = self.client.open_sftp()
        try:
            sftp.get(remote_path, local_path)
            return True
        except Exception as e:
            print(f"Error downloading file: {e}")
            return False
        finally:
            sftp.close()

# Managing multiple servers
class ServerFleet:
    """Management of a fleet of servers."""

    def __init__(self, config_file):
        self.servers = self.load_config(config_file)

    def load_config(self, config_file):
        """Load the server configuration."""
        with open(config_file) as f:
            return yaml.safe_load(f)

    def execute_on_all(self, command, parallel=True):
        """Run a command on every server."""
        if parallel:
            return self._execute_parallel(command)
        return self._execute_sequential(command)

    def _execute_parallel(self, command):
        """Parallel execution with ThreadPoolExecutor."""
        def execute_on_server(server_config):
            ssh = SSHManager(**server_config)
            with ssh.connect():
                result = ssh.execute_command(command)
            return {
                'server': server_config['hostname'],
                'result': result
            }

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(execute_on_server, server)
                       for server in self.servers]
            return [future.result() for future in futures]

    def _execute_sequential(self, command):
        """Sequential execution."""
        results = []
        for server_config in self.servers:
            ssh = SSHManager(**server_config)
            with ssh.connect():
                result = ssh.execute_command(command)
            results.append({
                'server': server_config['hostname'],
                'result': result
            })
        return results

# Example usage
# servers.yaml
"""
- hostname: web1.example.com
  username: admin
  key_filename: /home/user/.ssh/id_rsa
- hostname: web2.example.com
  username: admin
  key_filename: /home/user/.ssh/id_rsa
- hostname: db1.example.com
  username: admin
  password: secret_password
"""

fleet = ServerFleet('servers.yaml')
results = fleet.execute_on_all('uptime')
for result in results:
    print(f"{result['server']}: {result['result']['stdout'].strip()}")
Network and port monitoring
import socket
import subprocess
import time
import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

class NetworkMonitor:
    """Network service monitor."""

    def __init__(self):
        self.results = []

    def check_port(self, host, port, timeout=5):
        """Check whether a port is open."""
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except (socket.timeout, socket.error):
            return False

    def check_http_service(self, url, expected_status=200, timeout=10):
        """Check an HTTP/HTTPS service."""
        try:
            response = requests.get(url, timeout=timeout)
            return {
                'url': url,
                'status_code': response.status_code,
                'response_time': response.elapsed.total_seconds(),
                'success': response.status_code == expected_status
            }
        except requests.RequestException as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

    def ping_host(self, host):
        """Ping a host using subprocess."""
        try:
            result = subprocess.run(
                ['ping', '-c', '1', '-W', '2', host],
                capture_output=True,
                text=True
            )
            return result.returncode == 0
        except Exception:
            return False

    def comprehensive_check(self, target):
        """Full check of a single target."""
        results = {
            'target': target,
            'timestamp': datetime.now().isoformat(),
            'checks': {}
        }
        # Parse the target (either a URL or host:port)
        if target.startswith('http'):
            results['checks']['http'] = self.check_http_service(target)
            host = target.split('//')[1].split('/')[0].split(':')[0]
        else:
            host = target.split(':')[0]
            port = int(target.split(':')[1]) if ':' in target else 80
            results['checks']['port'] = self.check_port(host, port)
        results['checks']['ping'] = self.ping_host(host)
        return results

    def monitor_services(self, targets, interval=60):
        """Continuous service monitoring."""
        while True:
            print(f"\n--- Service check: {datetime.now()} ---")
            with ThreadPoolExecutor(max_workers=len(targets)) as executor:
                futures = {executor.submit(self.comprehensive_check, target): target
                           for target in targets}
                for future in futures:
                    try:
                        result = future.result()
                        self.print_status(result)
                        self.results.append(result)
                    except Exception as e:
                        print(f"Error checking {futures[future]}: {e}")
            time.sleep(interval)

    def print_status(self, result):
        """Print the status of a check."""
        target = result['target']
        checks = result['checks']
        status_symbols = {True: '✅', False: '❌'}
        print(f"{target}:")
        for check_type, check_result in checks.items():
            if isinstance(check_result, bool):
                print(f"  {check_type}: {status_symbols[check_result]}")
            elif isinstance(check_result, dict):
                success = check_result.get('success', False)
                print(f"  {check_type}: {status_symbols[success]}")
                if 'response_time' in check_result:
                    print(f"    Response time: {check_result['response_time']:.2f}s")

# Using the monitor
targets = [
    'https://google.com',
    'https://github.com',
    'web1.example.com:80',
    'db1.example.com:3306'
]

monitor = NetworkMonitor()
# monitor.monitor_services(targets, interval=30)  # Uncomment for continuous monitoring
📊 System Monitoring with psutil
A complete system monitor
import psutil
import json
import time
import socket
from datetime import datetime
import smtplib
from email.mime.text import MIMEText

class SystemMonitor:
    """Complete system monitor."""

    def __init__(self, config=None):
        # Merge user overrides onto defaults so partial configs keep the SMTP keys
        self.config = {**self.default_config(), **(config or {})}
        self.alerts_sent = set()

    def default_config(self):
        return {
            'cpu_threshold': 80,
            'memory_threshold': 85,
            'disk_threshold': 90,
            'smtp_server': 'localhost',
            'smtp_port': 587,
            'alert_email': 'admin@example.com',
            'check_interval': 60
        }

    def get_cpu_info(self):
        """Detailed CPU information."""
        return {
            'usage_percent': psutil.cpu_percent(interval=1),
            'usage_per_cpu': psutil.cpu_percent(interval=1, percpu=True),
            'load_average': psutil.getloadavg(),
            'cpu_count': psutil.cpu_count(),
            'cpu_freq': psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None
        }

    def get_memory_info(self):
        """Memory information."""
        virtual = psutil.virtual_memory()
        swap = psutil.swap_memory()
        return {
            'virtual': {
                'total': virtual.total,
                'available': virtual.available,
                'used': virtual.used,
                'percent': virtual.percent
            },
            'swap': {
                'total': swap.total,
                'used': swap.used,
                'percent': swap.percent
            }
        }

    def get_disk_info(self):
        """Disk information."""
        disk_info = []
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_info.append({
                    'device': partition.device,
                    'mountpoint': partition.mountpoint,
                    'fstype': partition.fstype,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free,
                    'percent': (usage.used / usage.total) * 100
                })
            except PermissionError:
                continue
        return disk_info

    def get_network_info(self):
        """Network information."""
        net_io = psutil.net_io_counters()
        connections = len(psutil.net_connections())
        return {
            'io_counters': {
                'bytes_sent': net_io.bytes_sent,
                'bytes_recv': net_io.bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv
            },
            'connections_count': connections
        }

    def get_process_info(self, limit=10):
        """Top processes by CPU and memory."""
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'username']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        top_cpu = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:limit]
        top_memory = sorted(processes, key=lambda x: x['memory_percent'] or 0, reverse=True)[:limit]
        return {
            'top_cpu': top_cpu,
            'top_memory': top_memory,
            'total_processes': len(processes)
        }

    def collect_metrics(self):
        """Gather all metrics."""
        return {
            'timestamp': datetime.now().isoformat(),
            'cpu': self.get_cpu_info(),
            'memory': self.get_memory_info(),
            'disk': self.get_disk_info(),
            'network': self.get_network_info(),
            'processes': self.get_process_info()
        }

    def check_alerts(self, metrics):
        """Check alert conditions."""
        alerts = []
        if metrics['cpu']['usage_percent'] > self.config['cpu_threshold']:
            alerts.append(f"HIGH CPU: {metrics['cpu']['usage_percent']:.1f}%")
        if metrics['memory']['virtual']['percent'] > self.config['memory_threshold']:
            alerts.append(f"HIGH MEMORY: {metrics['memory']['virtual']['percent']:.1f}%")
        for disk in metrics['disk']:
            if disk['percent'] > self.config['disk_threshold']:
                alerts.append(f"HIGH DISK: {disk['mountpoint']} {disk['percent']:.1f}%")
        return alerts

    def send_alert(self, alerts):
        """Send an alert by email."""
        if not alerts:
            return
        alert_key = "|".join(sorted(alerts))
        if alert_key in self.alerts_sent:
            return  # Don't resend duplicate alerts
        subject = f"System Alert - {socket.gethostname()}"
        body = "The following alerts were detected:\n\n" + "\n".join(alerts)
        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = 'system@example.com'
            msg['To'] = self.config['alert_email']
            with smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port']) as server:
                server.send_message(msg)
            self.alerts_sent.add(alert_key)
            print(f"Alert sent: {subject}")
        except Exception as e:
            print(f"Error sending alert: {e}")

    def save_metrics(self, metrics, filename=None):
        """Append metrics to a file."""
        if filename is None:
            filename = f"/var/log/system_metrics_{datetime.now().strftime('%Y%m%d')}.json"
        with open(filename, 'a') as f:
            f.write(json.dumps(metrics) + '\n')

    def run_monitoring(self):
        """Run continuous monitoring."""
        print("Starting system monitoring...")
        while True:
            try:
                metrics = self.collect_metrics()
                alerts = self.check_alerts(metrics)
                # Show current status
                print(f"\n[{metrics['timestamp']}] System:")
                print(f"  CPU: {metrics['cpu']['usage_percent']:.1f}%")
                print(f"  Memory: {metrics['memory']['virtual']['percent']:.1f}%")
                print(f"  Processes: {metrics['processes']['total_processes']}")
                if alerts:
                    print(f"  🚨 ALERTS: {', '.join(alerts)}")
                    self.send_alert(alerts)
                self.save_metrics(metrics)
                time.sleep(self.config['check_interval'])
            except KeyboardInterrupt:
                print("\nMonitoring stopped by user")
                break
            except Exception as e:
                print(f"Error in monitoring: {e}")
                time.sleep(self.config['check_interval'])

# Using the monitor
if __name__ == "__main__":
    monitor = SystemMonitor({
        'cpu_threshold': 75,
        'memory_threshold': 80,
        'disk_threshold': 85,
        'check_interval': 30
    })
    # monitor.run_monitoring()  # Uncomment to run
🗄️ Database Management
A universal database client
import os
import shutil
import subprocess
import sqlite3
import psycopg2
import pymongo
from contextlib import contextmanager

class DatabaseManager:
    """Universal database manager."""

    def __init__(self, db_type, **kwargs):
        self.db_type = db_type.lower()
        self.config = kwargs
        self.connection = None

    @contextmanager
    def connect(self):
        """Context manager for connections."""
        try:
            if self.db_type == 'sqlite':
                self.connection = sqlite3.connect(self.config['database'])
                self.connection.row_factory = sqlite3.Row
            elif self.db_type == 'postgresql':
                self.connection = psycopg2.connect(**self.config)
            elif self.db_type == 'mysql':
                import mysql.connector
                self.connection = mysql.connector.connect(**self.config)
            elif self.db_type == 'mongodb':
                client = pymongo.MongoClient(**self.config)
                self.connection = client[self.config.get('database', 'admin')]
            yield self
        except Exception as e:
            print(f"Error connecting to database: {e}")
            raise
        finally:
            if self.connection and self.db_type != 'mongodb':
                self.connection.close()

    def execute_query(self, query, params=None):
        """Execute a SQL query."""
        if self.db_type == 'mongodb':
            raise ValueError("Use MongoDB-specific methods")
        cursor = self.connection.cursor()
        try:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            if query.strip().upper().startswith('SELECT'):
                return cursor.fetchall()
            self.connection.commit()
            return cursor.rowcount
        except Exception:
            self.connection.rollback()
            raise
        finally:
            cursor.close()

    def backup_database(self, output_file):
        """Create a database backup."""
        if self.db_type == 'postgresql':
            cmd = [
                'pg_dump',
                '-h', self.config.get('host', 'localhost'),
                '-U', self.config['user'],
                '-d', self.config['database'],
                '-f', output_file
            ]
            # Preserve the existing environment so pg_dump can still find PATH
            subprocess.run(cmd, env={**os.environ, 'PGPASSWORD': self.config['password']})
        elif self.db_type == 'mysql':
            cmd = [
                'mysqldump',
                '-h', self.config.get('host', 'localhost'),
                '-u', self.config['user'],
                f'-p{self.config["password"]}',
                self.config['database']
            ]
            with open(output_file, 'w') as f:
                subprocess.run(cmd, stdout=f)
        elif self.db_type == 'sqlite':
            shutil.copy2(self.config['database'], output_file)
        elif self.db_type == 'mongodb':
            cmd = [
                'mongodump',
                '--host', f"{self.config.get('host', 'localhost')}:{self.config.get('port', 27017)}",
                '--db', self.config['database'],
                '--out', output_file
            ]
            subprocess.run(cmd)

# Database health monitoring example
class DatabaseMonitor:
    """Database health monitor."""

    def __init__(self):
        self.databases = {}

    def add_database(self, name, db_manager):
        """Register a database for monitoring."""
        self.databases[name] = db_manager

    def check_connection(self, name):
        """Check connectivity."""
        try:
            with self.databases[name].connect():
                return True
        except Exception as e:
            print(f"Error connecting to {name}: {e}")
            return False

    def get_database_size(self, name):
        """Get the database size."""
        db_manager = self.databases[name]
        with db_manager.connect():
            if db_manager.db_type == 'postgresql':
                query = "SELECT pg_size_pretty(pg_database_size(current_database()))"
                result = db_manager.execute_query(query)
                return result[0][0]
            elif db_manager.db_type == 'mysql':
                query = """
                    SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 1)
                    FROM information_schema.tables
                    WHERE table_schema = DATABASE()
                """
                result = db_manager.execute_query(query)
                return f"{result[0][0]} MB"
            elif db_manager.db_type == 'sqlite':
                file_size = os.path.getsize(db_manager.config['database'])
                return f"{file_size / 1024 / 1024:.1f} MB"

    def monitor_all(self):
        """Monitor every registered database."""
        print("Database status:")
        for name, db_manager in self.databases.items():
            connected = self.check_connection(name)
            status = "✅ Online" if connected else "❌ Offline"
            if connected:
                try:
                    size = self.get_database_size(name)
                    print(f"  {name}: {status} - Size: {size}")
                except Exception as e:
                    print(f"  {name}: {status} - Error getting size: {e}")
            else:
                print(f"  {name}: {status}")

# Example configuration
pg_config = {
    'host': 'localhost',
    'database': 'myapp',
    'user': 'admin',
    'password': 'password'
}

mysql_config = {
    'host': 'localhost',
    'database': 'myapp',
    'user': 'admin',
    'password': 'password'
}

# Using the monitor
monitor = DatabaseMonitor()
monitor.add_database('postgres', DatabaseManager('postgresql', **pg_config))
monitor.add_database('mysql', DatabaseManager('mysql', **mysql_config))
monitor.monitor_all()
📈 Logging and Metrics
A structured logging system
import logging
import json
import sys
import time
import socket
from datetime import datetime
from logging.handlers import RotatingFileHandler, SysLogHandler

class StructuredLogger:
    """Structured logger for system applications."""

    def __init__(self, name, level=logging.INFO):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        # Avoid duplicate handlers
        if not self.logger.handlers:
            self.setup_handlers()

    def setup_handlers(self):
        """Configure logging handlers."""
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_formatter)

        # Rotating file handler
        file_handler = RotatingFileHandler(
            '/var/log/sysadmin.log',
            maxBytes=10*1024*1024,  # 10 MB
            backupCount=5
        )
        file_handler.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        )
        file_handler.setFormatter(file_formatter)

        # Syslog handler
        try:
            syslog_handler = SysLogHandler(address='/dev/log')
            syslog_handler.setLevel(logging.WARNING)
            syslog_formatter = logging.Formatter(
                '%(name)s: %(levelname)s %(message)s'
            )
            syslog_handler.setFormatter(syslog_formatter)
            self.logger.addHandler(syslog_handler)
        except Exception:
            pass  # Syslog not available

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

    def log_structured(self, level, message, **kwargs):
        """Log with structured data."""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'message': message,
            'hostname': socket.gethostname(),
            **kwargs
        }
        # Log as JSON for easy parsing
        self.logger.log(level, json.dumps(log_data))

    def info(self, message, **kwargs):
        self.log_structured(logging.INFO, message, **kwargs)

    def warning(self, message, **kwargs):
        self.log_structured(logging.WARNING, message, **kwargs)

    def error(self, message, **kwargs):
        self.log_structured(logging.ERROR, message, **kwargs)

    def critical(self, message, **kwargs):
        self.log_structured(logging.CRITICAL, message, **kwargs)

# Decorator for automatic logging
def log_execution(logger):
    """Decorator that logs function execution."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Starting execution of {func.__name__}",
                        function=func.__name__, args=str(args)[:100])
            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time
                logger.info(f"Completed {func.__name__}",
                            function=func.__name__,
                            execution_time=execution_time,
                            success=True)
                return result
            except Exception as e:
                execution_time = time.time() - start_time
                logger.error(f"Error in {func.__name__}: {e}",
                             function=func.__name__,
                             execution_time=execution_time,
                             error=str(e),
                             success=False)
                raise
        return wrapper
    return decorator

# Using the logging system
logger = StructuredLogger('sysadmin_scripts')

@log_execution(logger)
def deploy_application(app_name, version):
    """Example function with automatic logging."""
    logger.info("Starting deployment", app=app_name, version=version)
    time.sleep(2)  # Simulate deployment work
    if app_name == "fail_app":
        raise Exception("Deployment failed")
    logger.info("Deployment completed successfully",
                app=app_name, version=version)
    return True

# Example usage
try:
    deploy_application("web_app", "1.2.3")
    deploy_application("fail_app", "1.0.0")
except Exception:
    pass
🎓 Conclusion
Python offers powerful tools to automate practically any system administration task:
Key points covered:
- File management with pathlib and change monitoring
- SSH automation for managing server fleets
- System monitoring with psutil and automated alerting
- Cross-platform database management
- Structured logging for debugging and auditing
- Network and service monitoring
🧠 Advanced Theory: Foundations of Automation
Programming Paradigms in System Administration
Imperative vs Declarative Programming:
Imperative (how to do it):
# Imperative approach: spell out every step (ssh here stands for a generic client object)
servers = ['web1', 'web2', 'web3']
for server in servers:
    ssh.connect(server)
    ssh.execute('systemctl restart nginx')
    ssh.execute('systemctl enable nginx')
    ssh.disconnect()
Declarative (what we want):
# Declarative approach: describe the desired state
desired_state = {
    'service': 'nginx',
    'state': 'running',
    'enabled': True
}

# The framework works out the steps needed to reach it
for server in servers:
    ensure_service_state(server, desired_state)
Advantages of the declarative approach:
- Idempotence: running it several times produces the same result (see the sketch after this list)
- Predictability: the final state is known and verifiable
- Reversibility: easy rollback to previous states
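To make idempotence concrete, here is a minimal sketch of what ensure_service_state could look like, reusing the SSHManager class from the SSH section above; the systemctl probe commands and the 'admin' username are illustrative assumptions:

def ensure_service_state(server, desired_state):
    """Idempotently converge a service to its desired state."""
    service = desired_state['service']
    ssh = SSHManager(server, 'admin')  # illustrative username
    with ssh.connect():
        # Read the current state first; only act on a mismatch
        active = ssh.execute_command(f"systemctl is-active {service}")['stdout'].strip()
        if desired_state['state'] == 'running' and active != 'active':
            ssh.execute_command(f"systemctl start {service}")
        enabled = ssh.execute_command(f"systemctl is-enabled {service}")['stdout'].strip()
        if desired_state['enabled'] and enabled != 'enabled':
            ssh.execute_command(f"systemctl enable {service}")

Because it checks before it acts, a second run finds everything already converged and changes nothing.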
Concurrency and Parallelism Theory
Fundamental differences:
Concurrency:
- Definition: multiple tasks appear to run simultaneously
- Implementation: threading, asyncio
- Use: I/O-bound operations (SSH, HTTP requests, file operations)
Parallelism:
- Definition: multiple tasks genuinely run simultaneously
- Implementation: multiprocessing
- Use: CPU-bound operations (heavy computation, compression)
Global Interpreter Lock (GIL) in Python:
import multiprocessing
import asyncio

# Threading offers no true parallelism for CPU-bound work: the GIL
# lets only one thread execute Python bytecode at a time
def cpu_bound_task(n):
    return sum(i * i for i in range(n))

# Multiprocessing: real parallelism across separate interpreter processes
def parallel_processing():
    with multiprocessing.Pool() as pool:
        return pool.map(cpu_bound_task, [1_000_000] * 4)

# AsyncIO: concurrency for I/O-bound work
async def io_bound_task():
    await asyncio.sleep(1)  # simulates an I/O operation
    return "completed"

async def main():
    # Run ten I/O tasks concurrently; total wall time stays ~1 second
    return await asyncio.gather(*[io_bound_task() for _ in range(10)])

# asyncio.run(main())
Design Patterns in Automation
1. Factory Pattern - creating objects:
class ConnectionFactory:
    @staticmethod
    def create_connection(conn_type, **kwargs):
        if conn_type == 'ssh':
            return SSHConnection(**kwargs)
        elif conn_type == 'database':
            return DatabaseConnection(**kwargs)
        elif conn_type == 'http':
            return HTTPConnection(**kwargs)
        else:
            raise ValueError(f"Unknown connection type: {conn_type}")
2. Observer Pattern - monitoring events:
class SystemEventObserver:
    def __init__(self):
        self.observers = []

    def attach(self, observer):
        self.observers.append(observer)

    def notify(self, event):
        for observer in self.observers:
            observer.update(event)

class AlertManager:
    def update(self, event):
        if event.severity == 'critical':
            self.send_alert(event)
3. Strategy Pattern - multiple algorithms:
class BackupStrategy:
    def backup(self, data):
        raise NotImplementedError

class IncrementalBackup(BackupStrategy):
    def backup(self, data):
        # Incremental backup logic: copy only what changed since the last run
        ...

class FullBackup(BackupStrategy):
    def backup(self, data):
        # Full backup logic: copy everything
        ...
Distributed Systems Theory
Fundamental properties (CAP theorem):
- Consistency: all nodes see the same data at the same time
- Availability: every request gets a response, even while nodes are failing
- Partition tolerance: the system keeps working despite network failures
Implications for SysAdmins:
# Eventual consistency in distributed configuration
class DistributedConfig:
    def __init__(self):
        self.nodes = []
        self.consistency_level = 'eventual'

    def update_config(self, key, value):
        # Push the update to as many nodes as possible
        success_count = 0
        for node in self.nodes:
            if node.update(key, value):
                success_count += 1
        # Quorum-based consistency: succeed if a majority accepted it
        return success_count > len(self.nodes) // 2
Error Handling and Resilience Patterns
Circuit Breaker Pattern:
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = 1     # Normal operation
    OPEN = 2       # Failing fast
    HALF_OPEN = 3  # Testing recovery

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
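Wiring the breaker around a flaky dependency takes a single call; the health_check function and its URL below are hypothetical:

import requests

breaker = CircuitBreaker(failure_threshold=3, timeout=30)

def health_check():
    # Hypothetical health endpoint; raises on network or HTTP errors
    response = requests.get('http://web1.example.com/health', timeout=5)
    response.raise_for_status()
    return response.status_code

try:
    print(f"Service healthy: {breaker.call(health_check)}")
except Exception as e:
    print(f"Health check failed or circuit open: {e}")

After failure_threshold consecutive failures, the breaker fails fast for timeout seconds instead of hammering a service that is already down.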
Retry Pattern with Exponential Backoff:
import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    # Exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s")
                    time.sleep(delay + jitter)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, base_delay=2)
def unreliable_api_call():
    # A function that may fail intermittently
    pass
Observability Theory
The Three Pillars of Observability:
1. Logging (what happened):
# Structured logging, designed for machines to parse
logger.info("User login",
            user_id=12345,
            ip_address="192.168.1.100",
            session_id="abc123",
            timestamp=datetime.now().isoformat())
2. Metrics (how much happened):
# Time-series data points
metrics = {
    'cpu_usage': 75.5,
    'memory_usage': 82.1,
    'disk_io_read': 1024000,
    'network_bytes_in': 5500000
}
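To expose metrics like these to a scraper, one route is the Prometheus Python client listed in the resources near the end. A minimal sketch, assuming prometheus_client is installed; the metric names are illustrative:

from prometheus_client import Gauge, start_http_server
import psutil
import time

cpu_gauge = Gauge('system_cpu_usage_percent', 'CPU usage percent')
mem_gauge = Gauge('system_memory_usage_percent', 'Memory usage percent')

start_http_server(8000)  # metrics served at http://localhost:8000/metrics
while True:
    cpu_gauge.set(psutil.cpu_percent(interval=1))
    mem_gauge.set(psutil.virtual_memory().percent)
    time.sleep(15)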
3. Tracing (which path it took):
# Distributed tracing (trace_span is a hypothetical decorator/context manager)
@trace_span("database_query")
def get_user(user_id):
    with trace_span("validate_input"):
        validate_user_id(user_id)
    with trace_span("database_lookup"):
        # Parameterized query; never interpolate user input into SQL
        return database.query("SELECT * FROM users WHERE id = %s", (user_id,))
Testing in Automation
The Testing Pyramid:
        E2E Tests (few, slow, expensive)
              ↗        ↖
     Integration Tests (some, mid-sized)
              ↗        ↖
   Unit Tests (many, fast, cheap)
Practical implementation:
import unittest
from unittest.mock import patch, MagicMock

class TestSystemMonitor(unittest.TestCase):
    def setUp(self):
        self.monitor = SystemMonitor()

    @patch('psutil.cpu_percent')
    def test_cpu_alert_threshold(self, mock_cpu):
        # Unit test: a single component in isolation
        mock_cpu.return_value = 85
        metrics = self.monitor.collect_metrics()
        alerts = self.monitor.check_alerts(metrics)
        self.assertIn('HIGH CPU', alerts[0])

    @patch('paramiko.SSHClient')
    def test_ssh_connection_integration(self, mock_client_cls):
        # Integration test: several components together (paramiko itself is mocked
        # so no real connection is made)
        mock_client = mock_client_cls.return_value
        mock_stdout = MagicMock()
        mock_stdout.channel.recv_exit_status.return_value = 0
        mock_stdout.read.return_value = b"uptime output"
        mock_stderr = MagicMock()
        mock_stderr.read.return_value = b""
        mock_client.exec_command.return_value = (MagicMock(), mock_stdout, mock_stderr)
        ssh = SSHManager('test-server', 'test-user')
        with ssh.connect():
            result = ssh.execute_command('uptime')
        self.assertEqual(result['exit_code'], 0)

# Property-based testing for edge cases
from hypothesis import given, strategies as st

@given(st.floats(min_value=0, max_value=100))
def test_cpu_alert_fires_only_above_threshold(cpu_percent):
    monitor = SystemMonitor()
    metrics = {
        'cpu': {'usage_percent': cpu_percent},
        'memory': {'virtual': {'percent': 0}},
        'disk': []
    }
    alerts = monitor.check_alerts(metrics)
    # The alert fires exactly when the threshold is exceeded
    assert any('HIGH CPU' in a for a in alerts) == (cpu_percent > monitor.config['cpu_threshold'])
Infrastructure as Code - Theoretical Principles
Immutable Infrastructure:
# Instead of mutating existing servers...
def patch_server(server):
    server.update_package('nginx')
    server.restart_service('nginx')

# ...create a fresh instance with the desired configuration
def deploy_new_version(config):
    new_server = create_server(config)
    test_server(new_server)
    swap_traffic(old_server, new_server)
    terminate_server(old_server)
GitOps Workflow:
Git Repository (Source of Truth)
↓
CI/CD Pipeline
↓
Automated Deployment
↓
Production Environment
↓
Monitoring & Feedback
↓
Git Repository (Updates)
Best practices:
- Use context managers for resources (files, connections); a short sketch combining this with external configuration follows the list
- Implement structured logging from the start
- Robust error handling with try/except
- Parallelize operations across multiple servers
- Keep configuration external (YAML, JSON) for flexibility
- Test critical scripts automatically
- Version your scripts and configurations
- Document inline and externally
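A minimal sketch tying context managers to external configuration, reusing the SSHManager class from the SSH section (servers.yaml follows the same illustrative layout shown there):

import yaml

# The file handle is a context manager; the configuration lives outside the code
with open('servers.yaml') as f:
    servers = yaml.safe_load(f)

for server in servers:
    ssh = SSHManager(**server)
    with ssh.connect():  # the connection is a context manager too
        print(ssh.execute_command('uptime')['stdout'].strip())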
Recommended resources:
- Fabric: simplified deployment and remote execution (see the sketch after this list)
- Ansible: infrastructure automation
- Docker SDK: container management from Python
- Kubernetes Python Client: container orchestration
- Prometheus Python Client: metrics and monitoring
- pytest: advanced testing framework
- Black: automatic code formatting
- mypy: type checking for Python
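As a taste of the first item, Fabric collapses most of the SSHManager boilerplate from earlier into a few lines. A minimal sketch, assuming Fabric 2.x is installed and using illustrative hostnames:

from fabric import Connection

for host in ['web1.example.com', 'web2.example.com']:
    result = Connection(host).run('uptime', hide=True)
    print(f"{host}: {result.stdout.strip()}")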
📝 Final Reflections
Python has revolutionized modern system administration because it combines:
- Syntactic simplicity that lets you focus on the logic of the problem
- A rich ecosystem of specialized libraries
- Integration capabilities with practically any system
- Scalability from simple scripts to complex distributed systems
As system administrators, Python lets us elevate our work from repetitive manual tasks to sophisticated systems engineering. The key is understanding not only how to use the tools, but why they work the way they do and when to apply each pattern.
The future of system administration lies in intelligent automation, where systems manage, heal, and optimize themselves. Python is our main tool for building that future.
Do you have a specific administration task you want to automate? Share your case and I'll help you implement it!
Andrés Nuñez - t4ifi