The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]



"прячем ssh"
Вариант для распечатки  
Пред. тема | След. тема 
Форум Информационная безопасность (Безопасность системы / Linux)
Изначальное сообщение [ Отслеживать ]

"прячем ssh"  +/
Сообщение от грустныйслоник (-), 23-Мрт-26, 16:03 
Привет.
Сижу на больничном, решил обломать ботов долбящих ssh на моем сервачке.
Оцените кому не лень пару скриптов  на питоне, может пользу принесут.

Сервер:
Запускается, проверяет наличие открытого ключа, если его нет просит ввести и пишет в базу, слушает порт 9999.
Тем кто подключается выдает строку 512 байт и ждет в ответ её escda подпись.
Проверяет подпись, если верна создает отсутствующий файл /tmp/_Good_check_ или удаляет его если он существует.
Если подпись не верна или передан мусор рвет соединение.

Клиент:
Содает пару ключей, пишет в базу закрытый, открытый показывает для ввода на сервере.
подключается к серверу, получает строку, подписывает, отправляет обратно.

Смысл этого в том чтобы на наличие или отсутствие файла /tmp/_Good_check_ повесить задание в кроне,
например файл существует - запускаем sshd, файла нет - выключаем sshd, или правила файрвола менять.
Скрипт не требует привилегий можно обмазать firejail-ом или чем-то подобным, протестил на скармливание больших данных, мусора, большую нагрузку.

#==========================================>server:

#!/usr/bin/env python3
import sqlite3
import socket
import os
import tempfile
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes

def init_db():
    conn = sqlite3.connect('server.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS server_config (id INTEGER PRIMARY KEY, public_key BLOB)''')
    conn.commit()
    conn.close()

def get_public_key():
    conn = sqlite3.connect('server.db')
    c = conn.cursor()
    c.execute("SELECT public_key FROM server_config LIMIT 1")
    result = c.fetchone()
    conn.close()
    return result[0] if result else None

def main():
    init_db()
    public_key_bytes = get_public_key()
    
    if public_key_bytes is None:
        print("No public key found. Enter client's public key (DER hex):")
        user_input = input().strip()
        try:
            public_key_bytes = bytes.fromhex(user_input)
            if len(public_key_bytes) < 10:
                raise ValueError("Invalid public key length")
            conn = sqlite3.connect('server.db')
            c = conn.cursor()
            c.execute("INSERT INTO server_config (public_key) VALUES (?)", (public_key_bytes,))
            conn.commit()
            conn.close()
            print("Public key stored.")
        except ValueError as e:
            print(f"Invalid hex input: {e}. Exiting.")
            exit(1)
#ipv4
#    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#    server_socket.bind(('0.0.0.0', 9999)) #ipv4
#    server_socket.listen()
#ipv6
#    server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
#    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#    server_socket.bind(('::', 9999))  # :: - это все IPv6 адреса
#    server_socket.listen()
#universal Используем AF_INET6 с IPV6_V6ONLY = 0 для совместимости
    server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)  # Позволяет IPv4 через IPv6
    server_socket.bind(('::', 9999))
    server_socket.listen()

    print("Server listening on port 9999...")
    print("Server will reject connections if signature exceeds 80 bytes")
    
    while True:
        try:
            client_socket, addr = server_socket.accept()
            print(f"Connection from {addr}")
            
            challenge = os.urandom(512)
            client_socket.sendall(challenge)
            
            # Ограничение на размер подписи до 80 байт #от 70 до 72 видел, может больше бывает, ставим 80.
            signature = b''
            total_received = 0
            max_signature_size = 80
            
            try:
                while total_received < max_signature_size:
                    chunk = client_socket.recv(1024)
                    if not chunk:
                        break
                    signature += chunk
                    total_received += len(chunk)
                    
                    # Проверка на превышение лимита
                    if total_received >= max_signature_size:
                        print(f"Signature size limit exceeded: {total_received} bytes")
                        client_socket.close()
                        break
                        
            except Exception as e:
                print(f"Error receiving signature: {e}")
                client_socket.close()
                continue
            
            # Закрываем соединение после получения данных
            client_socket.close()
            
            # Проверяем размер подписи
            if total_received > max_signature_size:
                print(f"Signature too large: {total_received} bytes (limit: {max_signature_size})")
                continue
            
            # Проверяем, что получили данные
            if len(signature) == 0:
                print("Empty signature received")
                continue
            
            print(f"Received signature: {len(signature)} bytes")
            
            try:
                public_key = serialization.load_der_public_key(public_key_bytes)
                public_key.verify(signature, challenge, ec.ECDSA(hashes.SHA256()))

                if os.path.isfile('/tmp/_Good_check_'):
                # Файл существует - удаляем его
                    os.remove('/tmp/_Good_check_')
                    print(f"Файл /tmp/_Good_check_ удален.")
                else:
                # Файла нет - создаем его
                    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.check') as f:
                      f.write('Signature valid')
                      temp_path = f.name
                    os.rename(temp_path, '/tmp/_Good_check_')
                    print(f"Файл /tmp/_Good_check_ создан.")

                print("Signature verified successfully")
            except Exception as e:
                print(f"Verification failed: {str(e)}")
                
        except Exception as e:
            print(f"Error handling connection: {e}")
            continue

if __name__ == '__main__':
    main()

#============================================>client:

#!/usr/bin/env python3

import sqlite3
import socket
import os
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend

def init_db():
    conn = sqlite3.connect('client.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS client_keys (id INTEGER PRIMARY KEY, private_key BLOB, public_key BLOB)''')
    conn.commit()
    conn.close()

def get_client_keys():
    conn = sqlite3.connect('client.db')
    c = conn.cursor()
    c.execute("SELECT private_key, public_key FROM client_keys LIMIT 1")
    result = c.fetchone()
    conn.close()
    return result if result else (None, None)

def generate_keys():
    priv_key = ec.generate_private_key(ec.SECP256K1(), backend=default_backend())
    pub_key = priv_key.public_key()
    
    private_bytes = priv_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    public_bytes = pub_key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    
    conn = sqlite3.connect('client.db')
    c = conn.cursor()
    c.execute("INSERT INTO client_keys (private_key, public_key) VALUES (?, ?)", (private_bytes, public_bytes))
    conn.commit()
    conn.close()
    
    print(f"Public key (DER hex): {public_bytes.hex()}")

def main():
    init_db()
    private_key_bytes, public_key_bytes = get_client_keys()
    
    if private_key_bytes is None:
        generate_keys()
        private_key_bytes, public_key_bytes = get_client_keys()
    
    try:
        private_key = serialization.load_der_private_key(
            private_key_bytes,
            password=None,
            backend=default_backend()
        )
    except Exception as e:
        print(f"Key load failed: {str(e)}")
        exit(1)
    
#ipv4
#    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#    client_socket.connect(('localhost', 9999))
#ipv6
#    client_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
#    client_socket.connect(('x:x:x:x:x:x:x:x', 9999))
#universal
    addr_info = socket.getaddrinfo('127.0.0.1', 9999, socket.AF_UNSPEC, socket.SOCK_STREAM)
    client_socket = socket.socket(addr_info[0][0], addr_info[0][1])
    client_socket.connect(addr_info[0][4])

    
    challenge = client_socket.recv(4096)
    if not challenge:
        print("No challenge received.")
        exit(1)
    
    try:
        signature = private_key.sign(challenge, ec.ECDSA(hashes.SHA256()))
    except Exception as e:
        print(f"Signing failed: {str(e)}")
        exit(1)
    
    client_socket.sendall(signature)    #sig
#    client_socket.sendall(challenge)    #trash
    client_socket.close()
    sig_len=len(signature)
    print("Signature sent. Sig_len:", sig_len)

#    chall = challenge.hex()
#    print("c:",chall)
#    sig = signature.hex()
#    print("s:",sig)

if __name__ == '__main__':
    main()

Ответить | Правка | Cообщить модератору


Архив | Удалить

Рекомендовать для помещения в FAQ | Индекс форумов | Темы | Пред. тема | След. тема



Партнёры:
PostgresPro
Inferno Solutions
Hosting by Hoster.ru
Хостинг:

Закладки на сайте
Проследить за страницей
Created 1996-2026 by Maxim Chirkov
Добавить, Поддержать, Вебмастеру