feat: implement MSSQL tool with schema discovery closes #3377

This commit is contained in:
kubrakaradirek
2026-02-09 15:32:57 +03:00
parent bcc6848275
commit 3499be782e
15 changed files with 2285 additions and 0 deletions
+66
View File
@@ -0,0 +1,66 @@
# MSSQL Connection Configuration Template
#
# Copy this file to .env and fill in your actual values
# DO NOT commit the .env file to version control!
# ============================================================================
# SQL Server Connection - Choose ONE format below:
# ============================================================================
# OPTION 1: Local named instance
MSSQL_SERVER=localhost\SQLEXPRESS
# OPTION 2: Local default instance
# MSSQL_SERVER=localhost
# OPTION 3: Remote server with default port (1433)
# MSSQL_SERVER=192.168.1.100
# OPTION 4: Remote server with custom port (comma-separated)
# MSSQL_SERVER=192.168.1.100,1433
# OPTION 5: Remote named instance
# MSSQL_SERVER=PRODUCTION-SERVER\INSTANCE01
# OPTION 6: Domain server name
# MSSQL_SERVER=sql-prod.company.com
# OPTION 7: Domain server with port
# MSSQL_SERVER=sql-prod.company.com,1433
# ============================================================================
# Database Configuration
# ============================================================================
MSSQL_DATABASE=AdenTestDB
# ============================================================================
# Authentication - Choose ONE method:
# ============================================================================
# METHOD 1: SQL Server Authentication (username/password)
# Use this for: remote servers, Linux servers, specific SQL logins
MSSQL_USERNAME=sa
MSSQL_PASSWORD=your_password_here
# METHOD 2: Windows Authentication (leave both empty)
# Use this for: local Windows servers, domain-joined environments
# MSSQL_USERNAME=
# MSSQL_PASSWORD=
# ============================================================================
# Important Notes:
# ============================================================================
# - Port format: Use comma (,) not colon - Example: server,1433
# - Named instances: Use backslash (\) - Example: SERVER\INSTANCE
# - Default port: 1433 (can be omitted if using default)
# - ODBC Driver: Requires "ODBC Driver 17 for SQL Server" or newer
# - Security: Never commit this file with real credentials!
# - Escaping: In some shells, escape backslashes (\\) when setting env vars
# ============================================================================
# Example Production Configurations:
# -----------------------------------
# Azure SQL: MSSQL_SERVER=yourserver.database.windows.net
# AWS RDS: MSSQL_SERVER=yourinstance.region.rds.amazonaws.com,1433
# Docker: MSSQL_SERVER=localhost,1401
# Kubernetes: MSSQL_SERVER=mssql-service.namespace.svc.cluster.local,1433
+120
View File
@@ -0,0 +1,120 @@
"""
Database Initialization Script Runner for AdenTestDB
This script executes the SQL initialization file to create the AdenTestDB database.
Make sure your SQL Server is running before executing this script.
"""
import os
import pyodbc
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
# Database connection settings (from environment variables)
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\MSSQLSERVERR')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
# SQL file path
SQL_FILE = os.path.join(os.path.dirname(__file__), 'init_aden_testdb.sql')
def execute_sql_file():
"""Execute the SQL initialization file."""
connection = None
try:
# Read SQL file
if not os.path.exists(SQL_FILE):
print(f"[ERROR] SQL file not found: {SQL_FILE}")
return False
with open(SQL_FILE, 'r', encoding='utf-8') as f:
sql_script = f.read()
print("=" * 70)
print("AdenTestDB Database Initialization")
print("=" * 70)
print(f"Server: {SERVER}")
print(f"SQL Script: {SQL_FILE}")
print()
# Connect to master database (to create new database)
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE=master;'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
print("Connecting to SQL Server...")
connection = pyodbc.connect(connection_string)
connection.autocommit = True # Required for CREATE DATABASE
cursor = connection.cursor()
print("[OK] Connected successfully!")
print()
print("Executing SQL script...")
print("-" * 70)
# Split by GO statements and execute each batch
batches = sql_script.split('\nGO\n')
for i, batch in enumerate(batches, 1):
batch = batch.strip()
if batch and not batch.startswith('--'):
try:
cursor.execute(batch)
# Print any messages from the server
while cursor.nextset():
pass
except pyodbc.Error as e:
# Some statements might not return results, that's OK
if "No results" not in str(e):
print(f"Warning in batch {i}: {str(e)}")
print("-" * 70)
print()
print("=" * 70)
print("[SUCCESS] Database initialization completed successfully!")
print("=" * 70)
print()
print("Next steps:")
print("1. Run: python test_mssql_connection.py")
print("2. Verify the relational schema and sample data")
print()
return True
except pyodbc.Error as e:
print()
print("=" * 70)
print("[ERROR] Database initialization failed!")
print("=" * 70)
print(f"Error detail: {str(e)}")
print()
print("Possible solutions:")
print("1. Ensure SQL Server is running")
print("2. Check server name, username, and password")
print("3. Ensure you have permission to create databases")
print("4. Verify ODBC Driver 17 for SQL Server is installed")
print()
return False
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
return False
finally:
if connection:
connection.close()
print("Connection closed.")
if __name__ == "__main__":
success = execute_sql_file()
exit(0 if success else 1)
+134
View File
@@ -0,0 +1,134 @@
"""
Grant Permissions to AdenTestDB
This script grants the necessary permissions to the 'sa' user to access AdenTE testDB.
"""
import pyodbc
SERVER = r'MONSTER\MSSQLSERVERR'
USERNAME = 'sa'
PASSWORD = '622622aA.'
def grant_permissions():
"""Grant permissions to the database."""
connection = None
try:
# Connect to AdenTestDB
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE=AdenTestDB;'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
f'TrustServerCertificate=yes;'
)
print("=" * 70)
print("Granting Permissions to AdenTestDB")
print("=" * 70)
print(f"Server: {SERVER}")
print()
print("Connecting to database...")
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
print("[OK] Connected successfully!")
print()
# Grant permissions
print("Granting permissions...")
try:
cursor.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON SCHEMA::dbo TO sa")
print("[OK] Granted schema permissions to sa")
except pyodbc.Error as e:
print(f"Note: {str(e)}")
connection.commit()
print()
print("=" * 70)
print("[SUCCESS] Permissions granted!")
print("=" * 70)
print()
print("You can now run: python test_mssql_connection.py")
return True
except pyodbc.Error as e:
# If we can't connect, try connecting to master and creating user
try:
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE=master;'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
f'TrustServerCertificate=yes;'
)
print("Attempting to grant permissions via master database...")
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
# Create login if not exists
try:
cursor.execute(f"""
IF NOT EXISTS (SELECT * FROM sys.server_principals WHERE name = 'sa')
BEGIN
CREATE LOGIN sa WITH PASSWORD = '{PASSWORD}'
END
""")
except:
pass
# Switch to AdenTestDB and grant permissions
cursor.execute("USE AdenTestDB")
# Create user if not exists
try:
cursor.execute("""
IF NOT EXISTS (SELECT * FROM sys.database_principals WHERE name = 'sa')
BEGIN
CREATE USER sa FOR LOGIN sa
END
""")
print("[OK] Created database user")
except:
pass
# Grant permissions
cursor.execute("ALTER ROLE db_datareader ADD MEMBER sa")
cursor.execute("ALTER ROLE db_datawriter ADD MEMBER sa")
connection.commit()
print("[OK] Permissions granted successfully!")
return True
except Exception as inner_e:
print("\n[ERROR] Could not grant permissions!")
print(f"Error: {str(inner_e)}")
print()
print("The database was created successfully, but there's a permission issue.")
print("Please run this SQL command in SQL Server Management Studio:")
print()
print("USE AdenTestDB;")
print("GO")
print("ALTER ROLE db_datareader ADD MEMBER sa;")
print("ALTER ROLE db_datawriter ADD MEMBER sa;")
print("GO")
return False
finally:
if connection:
connection.close()
print("\nConnection closed.")
if __name__ == "__main__":
grant_permissions()
+183
View File
@@ -0,0 +1,183 @@
-- ============================================================================
-- AdenTestDB Database Initialization Script
-- ============================================================================
-- Purpose: Create a professional testing database for Aden Hive MSSQL tool
-- Author: Database Architect
-- Date: 2026-02-08
-- ============================================================================
USE master;
GO
-- Drop database if exists (for clean recreation)
IF EXISTS (SELECT name FROM sys.databases WHERE name = N'AdenTestDB')
BEGIN
ALTER DATABASE AdenTestDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
DROP DATABASE AdenTestDB;
PRINT 'Existing AdenTestDB dropped successfully.';
END
GO
-- Create new database
CREATE DATABASE AdenTestDB;
GO
PRINT 'AdenTestDB created successfully.';
GO
USE AdenTestDB;
GO
-- ============================================================================
-- TABLE: Departments
-- ============================================================================
-- Purpose: Store department information with budget tracking
-- ============================================================================
CREATE TABLE Departments (
department_id INT IDENTITY(1,1) NOT NULL,
name NVARCHAR(100) NOT NULL,
budget DECIMAL(15,2) NOT NULL,
created_date DATETIME NOT NULL DEFAULT GETDATE(),
CONSTRAINT PK_Departments PRIMARY KEY (department_id),
CONSTRAINT UK_Departments_Name UNIQUE (name),
CONSTRAINT CK_Departments_Budget CHECK (budget >= 0)
);
GO
-- Create index for performance optimization
CREATE INDEX IX_Departments_Name ON Departments(name);
GO
PRINT 'Departments table created successfully.';
GO
-- ============================================================================
-- TABLE: Employees
-- ============================================================================
-- Purpose: Store employee information with department association
-- ============================================================================
CREATE TABLE Employees (
employee_id INT IDENTITY(1000,1) NOT NULL,
first_name NVARCHAR(50) NOT NULL,
last_name NVARCHAR(50) NOT NULL,
email NVARCHAR(100) NOT NULL,
salary DECIMAL(12,2) NOT NULL,
hire_date DATETIME NOT NULL,
department_id INT NOT NULL,
CONSTRAINT PK_Employees PRIMARY KEY (employee_id),
CONSTRAINT UK_Employees_Email UNIQUE (email),
CONSTRAINT CK_Employees_Salary CHECK (salary >= 0),
CONSTRAINT FK_Employees_Departments
FOREIGN KEY (department_id) REFERENCES Departments(department_id)
ON DELETE CASCADE
ON UPDATE CASCADE
);
GO
-- Create indexes for performance optimization
CREATE INDEX IX_Employees_DepartmentId ON Employees(department_id);
CREATE INDEX IX_Employees_LastName ON Employees(last_name);
CREATE INDEX IX_Employees_Email ON Employees(email);
GO
PRINT 'Employees table created successfully.';
GO
-- ============================================================================
-- SAMPLE DATA: Departments
-- ============================================================================
INSERT INTO Departments (name, budget, created_date) VALUES
('Engineering', 2500000.00, '2023-01-15'),
('Human Resources', 800000.00, '2023-01-15'),
('Sales', 1500000.00, '2023-01-20'),
('Marketing', 1200000.00, '2023-02-01'),
('Finance', 1000000.00, '2023-02-10');
GO
PRINT 'Sample departments inserted successfully.';
GO
-- ============================================================================
-- SAMPLE DATA: Employees
-- ============================================================================
INSERT INTO Employees (first_name, last_name, email, salary, hire_date, department_id) VALUES
-- Engineering Department (ID: 1)
('John', 'Smith', 'john.smith@adenhive.com', 120000.00, '2023-03-01', 1),
('Sarah', 'Johnson', 'sarah.johnson@adenhive.com', 115000.00, '2023-03-15', 1),
('Michael', 'Chen', 'michael.chen@adenhive.com', 125000.00, '2023-04-01', 1),
('Emily', 'Rodriguez', 'emily.rodriguez@adenhive.com', 110000.00, '2023-05-10', 1),
('David', 'Kim', 'david.kim@adenhive.com', 105000.00, '2024-01-15', 1),
-- Human Resources Department (ID: 2)
('Lisa', 'Anderson', 'lisa.anderson@adenhive.com', 85000.00, '2023-02-20', 2),
('James', 'Wilson', 'james.wilson@adenhive.com', 80000.00, '2023-06-01', 2),
-- Sales Department (ID: 3)
('Jennifer', 'Taylor', 'jennifer.taylor@adenhive.com', 95000.00, '2023-04-15', 3),
('Robert', 'Martinez', 'robert.martinez@adenhive.com', 90000.00, '2023-05-01', 3),
('Amanda', 'Garcia', 'amanda.garcia@adenhive.com', 92000.00, '2023-07-20', 3),
-- Marketing Department (ID: 4)
('Christopher', 'Lee', 'christopher.lee@adenhive.com', 88000.00, '2023-03-10', 4),
('Michelle', 'White', 'michelle.white@adenhive.com', 86000.00, '2023-08-01', 4),
('Kevin', 'Brown', 'kevin.brown@adenhive.com', 84000.00, '2024-02-01', 4),
-- Finance Department (ID: 5)
('Jessica', 'Davis', 'jessica.davis@adenhive.com', 98000.00, '2023-02-15', 5),
('Daniel', 'Miller', 'daniel.miller@adenhive.com', 95000.00, '2023-09-01', 5);
GO
PRINT 'Sample employees inserted successfully.';
GO
-- ============================================================================
-- VERIFICATION QUERIES
-- ============================================================================
PRINT '';
PRINT '============================================================';
PRINT 'Database Setup Summary';
PRINT '============================================================';
-- Count departments
DECLARE @DeptCount INT;
SELECT @DeptCount = COUNT(*) FROM Departments;
PRINT 'Total Departments: ' + CAST(@DeptCount AS NVARCHAR(10));
-- Count employees
DECLARE @EmpCount INT;
SELECT @EmpCount = COUNT(*) FROM Employees;
PRINT 'Total Employees: ' + CAST(@EmpCount AS NVARCHAR(10));
-- Show department summary
PRINT '';
PRINT 'Department Summary:';
PRINT '------------------------------------------------------------';
SELECT
d.name AS Department,
COUNT(e.employee_id) AS Employees,
d.budget AS Budget,
FORMAT(d.budget / NULLIF(COUNT(e.employee_id), 0), 'C', 'en-US') AS BudgetPerEmployee
FROM Departments d
LEFT JOIN Employees e ON d.department_id = e.department_id
GROUP BY d.name, d.budget
ORDER BY d.name;
GO
PRINT '';
PRINT '============================================================';
PRINT 'AdenTestDB initialization completed successfully!';
PRINT '============================================================';
PRINT '';
PRINT 'Next Steps:';
PRINT '1. Run: python test_mssql_connection.py';
PRINT '2. Verify JOIN queries work correctly';
PRINT '3. Test relational integrity';
PRINT '============================================================';
GO
+196
View File
@@ -0,0 +1,196 @@
"""
Simple MSSQL Connection and Query Test
Tests the AdenTestDB database with JOIN queries
"""
import os
import io
import sys
import pyodbc
from dotenv import load_dotenv
# Force UTF-8 encoding for console output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Load environment variables from .env file
load_dotenv()
# Database connection settings (from environment variables)
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\MSSQLSERVERR')
DATABASE = os.getenv('MSSQL_DATABASE', 'AdenTestDB')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
def main():
"""Main test function."""
connection = None
try:
print("=" * 70)
print(" MSSQL Connection Test for AdenTestDB")
print("=" * 70)
print(f"Server: {SERVER}")
print(f"Database: {DATABASE}")
print()
# Connect to database
if USERNAME and PASSWORD:
# SQL Server Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
else:
# Windows Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'Trusted_Connection=yes;'
)
print("Connecting to database...")
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
print("✓ Connection successful!")
print()
# Test 1: Count tables
print("=" * 70)
print(" Table Count Verification")
print("=" * 70)
cursor.execute("SELECT COUNT(*) FROM Departments")
dept_count = cursor.fetchone()[0]
print(f"✓ Departments: {dept_count} records")
cursor.execute("SELECT COUNT(*) FROM Employees")
emp_count = cursor.fetchone()[0]
print(f"✓ Employees: {emp_count} records")
print()
# Test 2: JOIN Query
print("=" * 70)
print(" JOIN Query Test - Employees with Departments")
print("=" * 70)
query = """
SELECT
e.employee_id,
e.first_name + ' ' + e.last_name AS full_name,
e.email,
e.salary,
d.name AS department
FROM Employees e
INNER JOIN Departments d ON e.department_id = d.department_id
ORDER BY d.name, e.last_name
"""
cursor.execute(query)
print("\nEmployee List with Departments:")
print("-" * 70)
print(f"{'ID':<6} {'Name':<25} {'Email':<30} {'Dept':<15}")
print("-" * 70)
row_count = 0
for row in cursor:
row_count += 1
print(f"{row[0]:<6} {row[1]:<25} {row[2]:<30} {row[4]:<15}")
print("-" * 70)
print(f"✓ Total records fetched: {row_count}")
print()
# Test 3: Aggregate Query
print("=" * 70)
print(" Aggregate Query - Department Statistics")
print("=" * 70)
query = """
SELECT
d.name AS department,
COUNT(e.employee_id) AS emp_count,
AVG(e.salary) AS avg_salary,
d.budget
FROM Departments d
LEFT JOIN Employees e ON d.department_id = e.department_id
GROUP BY d.name, d.budget
ORDER BY emp_count DESC
"""
cursor.execute(query)
print(f"\n{'Department':<20} {'Employees':<12} {'Avg Salary':<15} {'Budget':<15}")
print("-" * 70)
for row in cursor:
avg_salary = f"${row[2]:,.2f}" if row[2] else "N/A"
budget = f"${row[3]:,.2f}"
print(f"{row[0]:<20} {row[1]:<12} {avg_salary:<15} {budget:<15}")
print("-" * 70)
print("✓ Aggregate query completed!")
print()
# Test 4: Foreign Key Integrity
print("=" * 70)
print(" Relational Integrity Check")
print("=" * 70)
query = """
SELECT COUNT(*)
FROM Employees e
LEFT JOIN Departments d ON e.department_id = d.department_id
WHERE d.department_id IS NULL
"""
cursor.execute(query)
orphaned = cursor.fetchone()[0]
if orphaned == 0:
print("✓ All employees have valid department associations")
print("✓ Foreign key constraints verified")
else:
print(f"⚠ WARNING: Found {orphaned} orphaned records!")
print()
# Final Summary
print("=" * 70)
print(" Test Summary")
print("=" * 70)
print("✓ Connection to AdenTestDB successful")
print("✓ Table counts verified")
print("✓ JOIN query executed successfully")
print("✓ Aggregate functions working")
print("✓ Relational integrity confirmed")
print("=" * 70)
print("\nAll tests passed successfully!")
except pyodbc.Error as e:
print("\n[ERROR] Database operation failed!")
print(f"Error detail: {str(e)}")
print()
print("Possible solutions:")
print("1. Ensure SQL Server is running")
print("2. Verify the 'sa' user has permission to access AdenTestDB")
print("3. Try running: sqlcmd -S MONSTER\\MSSQLSERVERR -U sa -P 622622aA. -Q \"USE AdenTestDB; SELECT 1;\"")
print("4. Grant permissions: GRANT CONNECT TO sa; USE AdenTestDB; GRANT SELECT TO sa;")
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
finally:
if connection:
connection.close()
print("\nConnection closed.")
if __name__ == "__main__":
main()
+202
View File
@@ -0,0 +1,202 @@
"""
Payroll Analysis Tool
Analyzes total payroll costs by department and identifies highest-paid employee
"""
import os
import io
import sys
import pyodbc
from dotenv import load_dotenv
# Force UTF-8 encoding for console output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Load environment variables from .env file
load_dotenv()
# Database connection settings (from environment variables)
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\MSSQLSERVERR')
DATABASE = os.getenv('MSSQL_DATABASE', 'AdenTestDB')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
def main():
"""Main analysis function."""
connection = None
try:
print("=" * 80)
print(" COMPANY PAYROLL ANALYSIS")
print("=" * 80)
print(f"Server: {SERVER}")
print(f"Database: {DATABASE}")
print()
# Connect to database
if USERNAME and PASSWORD:
# SQL Server Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
else:
# Windows Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'Trusted_Connection=yes;'
)
print("Connecting to database...")
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
print("✓ Connection successful!")
print()
# Analysis 1: Total Payroll by Department
print("=" * 80)
print(" TOTAL SALARY COSTS BY DEPARTMENT")
print("=" * 80)
payroll_query = """
SELECT
d.name AS department_name,
COUNT(e.employee_id) AS employee_count,
SUM(e.salary) AS total_salary_cost,
AVG(e.salary) AS avg_salary
FROM Departments d
LEFT JOIN Employees e ON d.department_id = e.department_id
GROUP BY d.name
ORDER BY total_salary_cost DESC
"""
cursor.execute(payroll_query)
print(f"\n{'Department':<25} {'Employees':<12} {'Total Salary Cost':<20} {'Avg Salary':<15}")
print("-" * 80)
total_company_payroll = 0
total_employees = 0
for row in cursor:
dept_name = row[0]
emp_count = row[1]
total_salary = row[2] if row[2] else 0
avg_salary = row[3] if row[3] else 0
total_company_payroll += total_salary
total_employees += emp_count
total_salary_str = f"${total_salary:,.2f}"
avg_salary_str = f"${avg_salary:,.2f}" if avg_salary > 0 else "N/A"
print(f"{dept_name:<25} {emp_count:<12} {total_salary_str:<20} {avg_salary_str:<15}")
print("-" * 80)
print(f"{'TOTAL COMPANY':<25} {total_employees:<12} ${total_company_payroll:,.2f}")
print("-" * 80)
print()
# Analysis 2: Highest Paid Employee
print("=" * 80)
print(" HIGHEST PAID EMPLOYEE")
print("=" * 80)
highest_paid_query = """
SELECT TOP 1
e.employee_id,
e.first_name + ' ' + e.last_name AS full_name,
e.email,
e.salary,
d.name AS department_name
FROM Employees e
INNER JOIN Departments d ON e.department_id = d.department_id
ORDER BY e.salary DESC
"""
cursor.execute(highest_paid_query)
top_employee = cursor.fetchone()
if top_employee:
print(f"\n{'Field':<20} {'Value':<50}")
print("-" * 80)
print(f"{'Employee ID':<20} {top_employee[0]}")
print(f"{'Name':<20} {top_employee[1]}")
print(f"{'Email':<20} {top_employee[2]}")
print(f"{'Department':<20} {top_employee[4]}")
print(f"{'Salary':<20} ${top_employee[3]:,.2f}")
print("-" * 80)
else:
print("\nNo employees found in the database.")
print()
# Additional Analysis: Top 5 Highest Paid Employees
print("=" * 80)
print(" TOP 5 HIGHEST PAID EMPLOYEES")
print("=" * 80)
top_5_query = """
SELECT TOP 5
e.first_name + ' ' + e.last_name AS full_name,
d.name AS department_name,
e.salary
FROM Employees e
INNER JOIN Departments d ON e.department_id = d.department_id
ORDER BY e.salary DESC
"""
cursor.execute(top_5_query)
print(f"\n{'Rank':<6} {'Name':<30} {'Department':<25} {'Salary':<15}")
print("-" * 80)
rank = 1
for row in cursor:
full_name = row[0]
dept_name = row[1]
salary = row[2]
print(f"{rank:<6} {full_name:<30} {dept_name:<25} ${salary:,.2f}")
rank += 1
print("-" * 80)
print()
# Summary
print("=" * 80)
print(" ANALYSIS SUMMARY")
print("=" * 80)
print(f"✓ Total Employees: {total_employees}")
print(f"✓ Total Company Payroll: ${total_company_payroll:,.2f}")
print(f"✓ Average Employee Salary: ${total_company_payroll / total_employees:,.2f}" if total_employees > 0 else "N/A")
print("=" * 80)
print("\nPayroll analysis completed successfully!")
except pyodbc.Error as e:
print("\n[ERROR] Database operation failed!")
print(f"Error detail: {str(e)}")
print()
print("Possible solutions:")
print("1. Ensure SQL Server is running")
print("2. Verify database access permissions")
print("3. Check connection string configuration")
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
finally:
if connection:
connection.close()
print("\nConnection closed.")
if __name__ == "__main__":
main()
+4
View File
@@ -47,11 +47,15 @@ ocr = [
sql = [
"duckdb>=1.0.0",
]
mssql = [
"pyodbc>=5.0.0",
]
all = [
"RestrictedPython>=7.0",
"pytesseract>=0.3.10",
"pillow>=10.0.0",
"duckdb>=1.0.0",
"pyodbc>=5.0.0",
]
[build-system]
+116
View File
@@ -0,0 +1,116 @@
"""
Query Average Salary by Department
"""
import os
import io
import sys
import pyodbc
from dotenv import load_dotenv
# Force UTF-8 encoding for console output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Load environment variables from .env file
load_dotenv()
# Database connection settings (from environment variables)
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\\MSSQLSERVERR')
DATABASE = os.getenv('MSSQL_DATABASE', 'AdenTestDB')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
def main():
"""Query and display average salary by department."""
connection = None
try:
# Connect to database
if USERNAME and PASSWORD:
# SQL Server Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
else:
# Windows Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'Trusted_Connection=yes;'
)
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
# Query to get average salary by department, sorted by average salary descending
query = """
SELECT
d.name AS department,
AVG(e.salary) AS avg_salary,
COUNT(e.employee_id) AS emp_count
FROM Departments d
LEFT JOIN Employees e ON d.department_id = e.department_id
WHERE e.salary IS NOT NULL
GROUP BY d.name
ORDER BY avg_salary DESC
"""
cursor.execute(query)
results = cursor.fetchall()
if not results:
print("No salary data found.")
return
# Get the highest average salary for highlighting
highest_avg = results[0][1] if results else 0
print("=" * 80)
print(" AVERAGE SALARY BY DEPARTMENT (Sorted Highest to Lowest)")
print("=" * 80)
print()
print(f"{'Rank':<6} {'Department':<25} {'Avg Salary':<20} {'Employees':<12}")
print("-" * 80)
for idx, row in enumerate(results, 1):
department = row[0]
avg_salary = row[1]
emp_count = row[2]
avg_salary_str = f"${avg_salary:,.2f}"
# Highlight the department with the highest average
if avg_salary == highest_avg:
# Use special formatting for the highest
print(f"{'>>> ' + str(idx):<6} {department:<25} {avg_salary_str:<20} {emp_count:<12} ⭐ HIGHEST")
else:
print(f"{idx:<6} {department:<25} {avg_salary_str:<20} {emp_count:<12}")
print("-" * 80)
print()
print(f"📊 Summary:")
print(f" • Total departments with employees: {len(results)}")
print(f" • Highest average salary: ${highest_avg:,.2f} ({results[0][0]})")
print(f" • Lowest average salary: ${results[-1][1]:,.2f} ({results[-1][0]})")
print("=" * 80)
except pyodbc.Error as e:
print(f"\n[ERROR] Database operation failed: {str(e)}")
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
finally:
if connection:
connection.close()
if __name__ == "__main__":
main()
@@ -91,4 +91,81 @@ INTEGRATION_CREDENTIALS = {
credential_id="hubspot",
credential_key="access_token",
),
"mssql_server": CredentialSpec(
env_var="MSSQL_SERVER",
tools=[
"mssql_execute_query",
"mssql_execute_update",
"mssql_get_schema",
"mssql_execute_procedure",
],
required=True,
startup_required=False,
help_url="https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server",
description="MSSQL Server instance name (e.g., 'localhost' or 'SERVER\\INSTANCE')",
# Auth method support
aden_supported=False,
direct_api_key_supported=True,
api_key_instructions="""To connect to MSSQL Server:
1. Ensure SQL Server is installed and running
2. Note your server instance name (e.g., 'localhost\\SQLEXPRESS')
3. Set the connection details as environment variables:
- MSSQL_SERVER: Server address (e.g., 'localhost\\SQLEXPRESS')
- MSSQL_DATABASE: Database name (e.g., 'AdenTestDB')
- MSSQL_USERNAME: SQL Server username (e.g., 'sa')
- MSSQL_PASSWORD: SQL Server password
4. Ensure ODBC Driver 17 for SQL Server is installed""",
# Credential store mapping
credential_id="mssql",
credential_key="server",
),
"mssql_database": CredentialSpec(
env_var="MSSQL_DATABASE",
tools=[
"mssql_execute_query",
"mssql_execute_update",
"mssql_get_schema",
"mssql_execute_procedure",
],
required=True,
startup_required=False,
help_url="https://learn.microsoft.com/en-us/sql/t-sql/statements/create-database-transact-sql",
description="MSSQL Database name to connect to",
aden_supported=False,
direct_api_key_supported=True,
credential_id="mssql",
credential_key="database",
),
"mssql_username": CredentialSpec(
env_var="MSSQL_USERNAME",
tools=[
"mssql_execute_query",
"mssql_execute_update",
"mssql_get_schema",
"mssql_execute_procedure",
],
required=False, # Optional - can use Windows Auth
startup_required=False,
description="MSSQL Server username (not required for Windows Authentication)",
aden_supported=False,
direct_api_key_supported=True,
credential_id="mssql",
credential_key="username",
),
"mssql_password": CredentialSpec(
env_var="MSSQL_PASSWORD",
tools=[
"mssql_execute_query",
"mssql_execute_update",
"mssql_get_schema",
"mssql_execute_procedure",
],
required=False, # Optional - can use Windows Auth
startup_required=False,
description="MSSQL Server password (not required for Windows Authentication)",
aden_supported=False,
direct_api_key_supported=True,
credential_id="mssql",
credential_key="password",
),
}
+6
View File
@@ -40,6 +40,7 @@ from .file_system_toolkits.view_file import register_tools as register_view_file
from .file_system_toolkits.write_to_file import register_tools as register_write_to_file
from .github_tool import register_tools as register_github
from .hubspot_tool import register_tools as register_hubspot
from .mssql_tool import register_tools as register_mssql
from .pdf_read_tool import register_tools as register_pdf_read
from .web_scrape_tool import register_tools as register_web_scrape
from .web_search_tool import register_tools as register_web_search
@@ -72,6 +73,7 @@ def register_all_tools(
# email supports multiple providers (Resend) with auto-detection
register_email(mcp, credentials=credentials)
register_hubspot(mcp, credentials=credentials)
register_mssql(mcp, credentials=credentials)
# Register file system toolkits
register_view_file(mcp)
@@ -129,6 +131,10 @@ def register_all_tools(
"hubspot_get_deal",
"hubspot_create_deal",
"hubspot_update_deal",
"mssql_execute_query",
"mssql_execute_update",
"mssql_get_schema",
"mssql_execute_procedure",
]
@@ -0,0 +1,345 @@
# MSSQL Tool
Professional SQL Server database operations for Aden Hive.
## Overview
The MSSQL tool provides secure database access to Microsoft SQL Server with comprehensive operations for querying, updating, schema inspection, and stored procedure execution.
## Features
- **Execute Queries**: Run SELECT statements with automatic result formatting
- **Execute Updates**: Perform INSERT/UPDATE/DELETE with transaction support
- **Schema Inspection**: Get database structure, table metadata, and relationships
- **Stored Procedures**: Execute procedures with parameter passing
- **Secure Credentials**: Uses CredentialStoreAdapter for environment-based auth
- **Connection Pooling**: Efficient connection management
- **Error Handling**: Clear, actionable error messages
## Environment Setup
### Required Variables
```bash
# SQL Server connection details
MSSQL_SERVER=your-server-name # e.g., "localhost\SQLEXPRESS" or "localhost"
MSSQL_DATABASE=your-database-name # e.g., "AdenTestDB"
# Authentication (Option 1: SQL Server Authentication)
MSSQL_USERNAME=your-username # e.g., "sa"
MSSQL_PASSWORD=your-password
# Authentication (Option 2: Windows Authentication)
# Leave MSSQL_USERNAME and MSSQL_PASSWORD empty to use Windows Auth
```
### Setup Methods
#### 1. Using .env file (Recommended for development)
Create a `.env` file in your project root:
```bash
MSSQL_SERVER=localhost\SQLEXPRESS
MSSQL_DATABASE=AdenTestDB
MSSQL_USERNAME=sa
MSSQL_PASSWORD=yourpassword
```
#### 2. Using environment variables
```bash
# Windows PowerShell
$env:MSSQL_SERVER = "localhost\SQLEXPRESS"
$env:MSSQL_DATABASE = "AdenTestDB"
$env:MSSQL_USERNAME = "sa"
$env:MSSQL_PASSWORD = "yourpassword"
# Linux/Mac bash
export MSSQL_SERVER="localhost"
export MSSQL_DATABASE="AdenTestDB"
export MSSQL_USERNAME="sa"
export MSSQL_PASSWORD="yourpassword"
```
### Server Connection Formats
The MSSQL_SERVER variable supports multiple connection formats:
| Format | Example | Use Case |
|--------|---------|----------|
| Local named instance | `localhost\SQLEXPRESS` | Development on local machine |
| Local default | `localhost` | Local SQL Server, default instance |
| Remote IP | `192.168.1.100` | Remote server, default port (1433) |
| Remote IP + Port | `192.168.1.100,1433` | Remote server, custom port |
| Remote named instance | `PRODUCTION\INSTANCE01` | Remote named instance |
| Domain name | `sql-prod.company.com` | Production domain server |
| Domain + Port | `sql-prod.company.com,1433` | Production with custom port |
| Azure SQL | `yourserver.database.windows.net` | Azure SQL Database |
| AWS RDS | `instance.region.rds.amazonaws.com,1433` | AWS RDS for SQL Server |
**Important Notes:**
- Use **comma (`,`)** for ports, not colon - e.g., `server,1433`
- Use **backslash (`\`)** for named instances - e.g., `SERVER\INSTANCE`
- Default port is `1433` - can be omitted when using default
- Named instances discover their port automatically
### Prerequisites
1. **MSSQL Server**: Ensure SQL Server is installed and running
2. **ODBC Driver**: Install [ODBC Driver 17 for SQL Server](https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server)
3. **Python Package**: Install the tool with MSSQL support:
```bash
pip install -e ".[mssql]"
```
## Tool Functions
### 1. mssql_execute_query
Execute SELECT queries and retrieve results.
**Parameters:**
- `query` (str): SQL SELECT query
- `max_rows` (int, optional): Maximum rows to return (1-10000, default: 1000)
**Returns:**
```python
{
"columns": ["id", "name", "email"],
"rows": [
{"id": 1, "name": "John", "email": "john@example.com"},
{"id": 2, "name": "Jane", "email": "jane@example.com"}
],
"row_count": 2,
"truncated": false
}
```
**Example:**
```python
from fastmcp import FastMCP
from aden_tools.tools.mssql_tool import register_tools
from aden_tools.credentials import CredentialStoreAdapter
mcp = FastMCP("my-server")
credentials = CredentialStoreAdapter.with_env_storage()
register_tools(mcp, credentials=credentials)
# Now use via MCP
result = mssql_execute_query(
query="SELECT * FROM Employees WHERE department_id = 1"
)
```
### 2. mssql_execute_update
Execute INSERT, UPDATE, DELETE, or MERGE operations.
**Parameters:**
- `query` (str): SQL modification query
- `commit` (bool, optional): Whether to commit transaction (default: True)
**Returns:**
```python
{
"success": true,
"affected_rows": 5,
"message": "Successfully affected 5 row(s)"
}
```
**Safety Features:**
- Prevents DELETE without WHERE clause
- Transaction support with automatic rollback on error
- Returns affected row count
**Example:**
```python
result = mssql_execute_update(
query="""
UPDATE Employees
SET salary = salary * 1.1
WHERE department_id = 2
""",
commit=True
)
```
### 3. mssql_get_schema
Inspect database schema and table structure.
**Parameters:**
- `table_name` (str, optional): Specific table to inspect (None = list all tables)
- `include_indexes` (bool, optional): Include index information (default: False)
**Returns (all tables):**
```python
{
"tables": ["Departments", "Employees"],
"table_count": 2
}
```
**Returns (specific table):**
```python
{
"table": "Employees",
"columns": [
{
"name": "employee_id",
"type": "int",
"nullable": False,
"primary_key": True
},
{
"name": "first_name",
"type": "nvarchar(50)",
"nullable": False,
"primary_key": False
}
],
"column_count": 7,
"foreign_keys": [
{
"column": "department_id",
"references": "Departments(department_id)"
}
]
}
```
**Example:**
```python
# List all tables
result = mssql_get_schema()
# Get specific table schema
result = mssql_get_schema(
table_name="Employees",
include_indexes=True
)
```
### 4. mssql_execute_procedure
Execute stored procedures with parameters.
**Parameters:**
- `procedure_name` (str): Name of stored procedure
- `parameters` (dict, optional): Parameter name-value pairs
**Returns:**
```python
{
"success": True,
"procedure": "GetEmployeesByDepartment",
"result_sets": [
{
"columns": ["employee_id", "name", "salary"],
"rows": [
{"employee_id": 1, "name": "John", "salary": 75000}
]
}
],
"result_set_count": 1
}
```
**Example:**
```python
result = mssql_execute_procedure(
procedure_name="GetEmployeesByDepartment",
parameters={"department_id": 1}
)
```
## Error Handling
All tools return error information in a consistent format:
```python
{
"error": "Descriptive error message",
"committed": False # For update operations
}
```
Common errors:
- **Authentication Failed**: Check MSSQL_USERNAME and MSSQL_PASSWORD
- **Cannot Access Database**: Verify database name and permissions
- **Server Not Found**: Check MSSQL_SERVER value
- **Connection Failed**: Ensure SQL Server is running and ODBC driver is installed
## Security Best Practices
1. **Never hardcode credentials** - Always use environment variables or .env files
2. **Use least privilege** - Grant only necessary database permissions
3. **Validate inputs** - The tool includes query validation and SQL injection prevention
4. **Use transactions** - All updates are wrapped in transactions with automatic rollback
5. **Secure .env files** - Add `.env` to `.gitignore` to prevent credential exposure
## Testing
Test your connection:
```bash
cd tools
python test_mssql_connection.py
```
Expected output shows successful connection, query execution, and data retrieval.
## Integration Example
```python
from fastmcp import FastMCP
from aden_tools.tools import register_all_tools
from aden_tools.credentials import CredentialStoreAdapter
# Create MCP server
mcp = FastMCP("aden-server")
# Set up credentials
credentials = CredentialStoreAdapter.with_env_storage()
# Register all tools (includes MSSQL)
register_all_tools(mcp, credentials=credentials)
# Start server
mcp.run()
```
## Troubleshooting
### ODBC Driver Not Found
Error: `[Microsoft][ODBC Driver Manager] Data source name not found`
Solution: Install ODBC Driver 17 for SQL Server from Microsoft
### Connection Timeout
Error: `Connection timed out`
Solutions:
- Verify SQL Server is running
- Check firewall settings
- Ensure TCP/IP protocol is enabled in SQL Server Configuration Manager
- Verify server name format (use `\\` for instance names)
### Authentication Issues
Error: `Login failed for user`
Solutions:
- Verify username/password are correct
- Ensure SQL Server authentication is enabled
- Check user has access to the specified database
- For Windows Auth, leave USERNAME and PASSWORD empty
## License
This tool is part of the Aden Hive project.
@@ -0,0 +1,5 @@
"""MSSQL Tool package."""
from .mssql_tool import register_tools
__all__ = ["register_tools"]
@@ -0,0 +1,509 @@
"""
MSSQL Tool - Professional SQL Server database operations for Aden Hive.
Provides tools for:
- Executing SELECT queries
- Executing INSERT/UPDATE/DELETE operations
- Inspecting database schema
- Executing stored procedures
Security: Uses CredentialStoreAdapter for secure credential management.
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING, Any
import pyodbc
from fastmcp import FastMCP
if TYPE_CHECKING:
from aden_tools.credentials import CredentialStoreAdapter
def register_tools(
mcp: FastMCP,
credentials: CredentialStoreAdapter | None = None,
) -> None:
"""Register MSSQL tools with the MCP server."""
def _get_connection_params() -> dict[str, str | None]:
"""Get MSSQL connection parameters from credentials or environment."""
if credentials is not None:
return {
"server": credentials.get("mssql_server"),
"database": credentials.get("mssql_database"),
"username": credentials.get("mssql_username"),
"password": credentials.get("mssql_password"),
}
return {
"server": os.getenv("MSSQL_SERVER"),
"database": os.getenv("MSSQL_DATABASE"),
"username": os.getenv("MSSQL_USERNAME"),
"password": os.getenv("MSSQL_PASSWORD"),
}
def _create_connection() -> tuple[pyodbc.Connection | None, str | None]:
"""
Create a database connection.
Returns:
Tuple of (connection, error_message). If successful, error_message is None.
"""
params = _get_connection_params()
# Validate required parameters
if not params["server"]:
return None, "MSSQL_SERVER environment variable not set"
if not params["database"]:
return None, "MSSQL_DATABASE environment variable not set"
try:
# Build connection string
if params["username"] and params["password"]:
# SQL Server Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={params["server"]};'
f'DATABASE={params["database"]};'
f'UID={params["username"]};'
f'PWD={params["password"]};'
)
else:
# Windows Authentication
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={params["server"]};'
f'DATABASE={params["database"]};'
f'Trusted_Connection=yes;'
)
connection = pyodbc.connect(connection_string, timeout=10)
return connection, None
except pyodbc.Error as e:
error_msg = str(e)
if "Login failed" in error_msg:
return None, "Authentication failed. Check MSSQL_USERNAME and MSSQL_PASSWORD"
elif "Cannot open database" in error_msg:
return None, f"Cannot access database '{params['database']}'. Check permissions."
elif "SQL Server does not exist" in error_msg:
return None, f"Server '{params['server']}' not found. Check MSSQL_SERVER value."
else:
return None, f"Connection failed: {error_msg}"
@mcp.tool()
def mssql_execute_query(
query: str,
max_rows: int = 1000,
) -> dict[str, Any]:
"""
Execute a SELECT query on the MSSQL database.
Use this tool to retrieve data from the database using SELECT statements.
Results are returned as a list of dictionaries with column names as keys.
Args:
query: SQL SELECT query to execute (must start with SELECT)
max_rows: Maximum number of rows to return (1-10000, default 1000)
Returns:
Dict with 'columns', 'rows', 'row_count', and optionally 'error'
Example:
{
"columns": ["id", "name", "email"],
"rows": [
{"id": 1, "name": "John", "email": "john@example.com"},
{"id": 2, "name": "Jane", "email": "jane@example.com"}
],
"row_count": 2
}
"""
# Validate inputs
if not query or len(query.strip()) == 0:
return {"error": "Query cannot be empty"}
if max_rows < 1 or max_rows > 10000:
return {"error": "max_rows must be between 1 and 10000"}
# Basic query validation
query_upper = query.strip().upper()
if not query_upper.startswith("SELECT") and not query_upper.startswith("WITH"):
return {
"error": "Only SELECT queries are allowed. Use mssql_execute_update for modifications."
}
connection, error = _create_connection()
if error:
return {"error": error}
try:
cursor = connection.cursor()
cursor.execute(query)
# Get column names
columns = [column[0] for column in cursor.description]
# Fetch rows
rows = []
for row in cursor.fetchmany(max_rows):
row_dict = {}
for i, column in enumerate(columns):
value = row[i]
# Convert to JSON-serializable types
if hasattr(value, 'isoformat'): # datetime objects
value = value.isoformat()
row_dict[column] = value
rows.append(row_dict)
return {
"columns": columns,
"rows": rows,
"row_count": len(rows),
"truncated": len(rows) == max_rows,
}
except pyodbc.Error as e:
return {"error": f"Query execution failed: {str(e)}"}
finally:
if connection:
connection.close()
@mcp.tool()
def mssql_execute_update(
query: str,
commit: bool = True,
) -> dict[str, Any]:
"""
Execute an INSERT, UPDATE, or DELETE query on the MSSQL database.
Use this tool to modify data in the database. The operation is wrapped
in a transaction and will be rolled back on error unless commit=False.
Args:
query: SQL INSERT/UPDATE/DELETE query to execute
commit: Whether to commit the transaction (default True)
Returns:
Dict with 'affected_rows', 'success', and optionally 'error'
Example:
{
"success": true,
"affected_rows": 5,
"message": "Successfully updated 5 rows"
}
"""
# Validate inputs
if not query or len(query.strip()) == 0:
return {"error": "Query cannot be empty"}
# Basic query validation
query_upper = query.strip().upper()
allowed_keywords = ["INSERT", "UPDATE", "DELETE", "MERGE"]
if not any(query_upper.startswith(kw) for kw in allowed_keywords):
return {
"error": f"Only {', '.join(allowed_keywords)} queries are allowed. "
"Use mssql_execute_query for SELECT."
}
# Safety check for DELETE without WHERE
if query_upper.startswith("DELETE") and "WHERE" not in query_upper:
return {
"error": "DELETE without WHERE clause is not allowed for safety. "
"Add a WHERE clause or use DELETE FROM table WHERE 1=1 if intentional."
}
connection, error = _create_connection()
if error:
return {"error": error}
try:
cursor = connection.cursor()
cursor.execute(query)
affected_rows = cursor.rowcount
if commit:
connection.commit()
return {
"success": True,
"affected_rows": affected_rows,
"message": f"Successfully affected {affected_rows} row(s)",
}
else:
connection.rollback()
return {
"success": True,
"affected_rows": affected_rows,
"message": f"Query executed (rolled back). Would affect {affected_rows} row(s)",
"committed": False,
}
except pyodbc.Error as e:
if connection:
connection.rollback()
return {
"success": False,
"error": f"Query execution failed: {str(e)}",
"committed": False,
}
finally:
if connection:
connection.close()
@mcp.tool()
def mssql_get_schema(
table_name: str | None = None,
include_indexes: bool = False,
) -> dict[str, Any]:
"""
Get database schema information.
Use this to inspect database structure, tables, columns, and relationships.
Args:
table_name: Optional specific table name to get detailed info for.
If None, returns list of all tables.
include_indexes: Include index information (only when table_name is specified)
Returns:
Dict with schema information
Examples:
# List all tables
{"tables": ["Departments", "Employees"], "table_count": 2}
# Get specific table schema
{
"table": "Employees",
"columns": [
{"name": "employee_id", "type": "int", "nullable": False, "primary_key": True},
{"name": "first_name", "type": "nvarchar(50)", "nullable": False}
],
"foreign_keys": [
{"column": "department_id", "references": "Departments(department_id)"}
]
}
"""
connection, error = _create_connection()
if error:
return {"error": error}
try:
cursor = connection.cursor()
if table_name is None:
# List all tables
cursor.execute("""
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME
""")
tables = [row[0] for row in cursor.fetchall()]
return {
"tables": tables,
"table_count": len(tables),
}
else:
# Get detailed table schema
# Check if table exists
cursor.execute("""
SELECT COUNT(*)
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = ?
""", table_name)
if cursor.fetchone()[0] == 0:
return {"error": f"Table '{table_name}' not found"}
# Get columns
cursor.execute("""
SELECT
c.COLUMN_NAME,
c.DATA_TYPE,
c.CHARACTER_MAXIMUM_LENGTH,
c.IS_NULLABLE,
CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END AS IS_PRIMARY_KEY
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN (
SELECT ku.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku
ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_NAME = ?
) pk ON c.COLUMN_NAME = pk.COLUMN_NAME
WHERE c.TABLE_NAME = ?
ORDER BY c.ORDINAL_POSITION
""", table_name, table_name)
columns = []
for row in cursor.fetchall():
col_type = row[1]
if row[2]: # Add length for varchar/nvarchar
col_type += f"({row[2]})"
columns.append({
"name": row[0],
"type": col_type,
"nullable": row[3] == "YES",
"primary_key": bool(row[4]),
})
# Get foreign keys
cursor.execute("""
SELECT
kcu.COLUMN_NAME,
ccu.TABLE_NAME AS REFERENCED_TABLE,
ccu.COLUMN_NAME AS REFERENCED_COLUMN
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON rc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
ON rc.UNIQUE_CONSTRAINT_NAME = ccu.CONSTRAINT_NAME
WHERE kcu.TABLE_NAME = ?
""", table_name)
foreign_keys = []
for row in cursor.fetchall():
foreign_keys.append({
"column": row[0],
"references": f"{row[1]}({row[2]})",
})
result = {
"table": table_name,
"columns": columns,
"column_count": len(columns),
"foreign_keys": foreign_keys,
}
# Optionally include indexes
if include_indexes:
cursor.execute("""
SELECT
i.name AS INDEX_NAME,
i.type_desc AS INDEX_TYPE,
COL_NAME(ic.object_id, ic.column_id) AS COLUMN_NAME
FROM sys.indexes i
JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
WHERE i.object_id = OBJECT_ID(?)
ORDER BY i.name, ic.key_ordinal
""", table_name)
indexes = {}
for row in cursor.fetchall():
idx_name = row[0]
if idx_name not in indexes:
indexes[idx_name] = {
"name": idx_name,
"type": row[1],
"columns": [],
}
indexes[idx_name]["columns"].append(row[2])
result["indexes"] = list(indexes.values())
return result
except pyodbc.Error as e:
return {"error": f"Schema inspection failed: {str(e)}"}
finally:
if connection:
connection.close()
@mcp.tool()
def mssql_execute_procedure(
procedure_name: str,
parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Execute a stored procedure.
Use this to call stored procedures with optional parameters.
Args:
procedure_name: Name of the stored procedure to execute
parameters: Optional dict of parameter names to values
Returns:
Dict with result sets and return value
Example:
{
"return_value": 0,
"result_sets": [
{
"columns": ["id", "name"],
"rows": [{"id": 1, "name": "Test"}]
}
],
"messages": ["Procedure executed successfully"]
}
"""
if not procedure_name or len(procedure_name.strip()) == 0:
return {"error": "Procedure name cannot be empty"}
connection, error = _create_connection()
if error:
return {"error": error}
try:
cursor = connection.cursor()
# Build parameter placeholders
if parameters:
param_values = list(parameters.values())
placeholders = ", ".join(["?"] * len(param_values))
sql = f"EXEC {procedure_name} {placeholders}"
cursor.execute(sql, param_values)
else:
sql = f"EXEC {procedure_name}"
cursor.execute(sql)
# Collect all result sets
result_sets = []
while True:
if cursor.description:
columns = [column[0] for column in cursor.description]
rows = []
for row in cursor.fetchall():
row_dict = {}
for i, column in enumerate(columns):
value = row[i]
if hasattr(value, 'isoformat'):
value = value.isoformat()
row_dict[column] = value
rows.append(row_dict)
result_sets.append({
"columns": columns,
"rows": rows,
})
if not cursor.nextset():
break
connection.commit()
return {
"success": True,
"procedure": procedure_name,
"result_sets": result_sets,
"result_set_count": len(result_sets),
}
except pyodbc.Error as e:
if connection:
connection.rollback()
return {
"success": False,
"error": f"Procedure execution failed: {str(e)}",
}
finally:
if connection:
connection.close()
+237
View File
@@ -0,0 +1,237 @@
"""
Test MSSQL Schema Discovery
Verifies that the mssql_get_schema functionality works correctly.
"""
import os
import sys
import io
from dotenv import load_dotenv
import pyodbc
# Force UTF-8 encoding for console output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Load environment variables from .env file
load_dotenv()
# Database connection settings
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\MSSQLSERVERR')
DATABASE = os.getenv('MSSQL_DATABASE', 'AdenTestDB')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
def get_connection():
"""Create and return a database connection."""
if USERNAME and PASSWORD:
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
else:
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'Trusted_Connection=yes;'
)
return pyodbc.connect(connection_string, timeout=10)
def list_all_tables(cursor):
"""List all tables in the database."""
cursor.execute("""
SELECT TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME
""")
tables = [row[0] for row in cursor.fetchall()]
return tables
def get_table_schema(cursor, table_name):
"""Get detailed schema for a specific table."""
# Get columns with primary key information
cursor.execute("""
SELECT
c.COLUMN_NAME,
c.DATA_TYPE,
c.CHARACTER_MAXIMUM_LENGTH,
c.NUMERIC_PRECISION,
c.NUMERIC_SCALE,
c.IS_NULLABLE,
CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END AS IS_PRIMARY_KEY
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN (
SELECT ku.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku
ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_NAME = ?
) pk ON c.COLUMN_NAME = pk.COLUMN_NAME
WHERE c.TABLE_NAME = ?
ORDER BY c.ORDINAL_POSITION
""", table_name, table_name)
columns = []
for row in cursor.fetchall():
col_type = row[1]
# Add length/precision info
if row[2]: # CHARACTER_MAXIMUM_LENGTH
col_type += f"({row[2]})"
elif row[3]: # NUMERIC_PRECISION
if row[4]: # NUMERIC_SCALE
col_type += f"({row[3]},{row[4]})"
else:
col_type += f"({row[3]})"
columns.append({
"name": row[0],
"type": col_type,
"nullable": row[5] == "YES",
"primary_key": bool(row[6]),
})
# Get foreign keys
cursor.execute("""
SELECT
kcu.COLUMN_NAME,
ccu.TABLE_NAME AS REFERENCED_TABLE,
ccu.COLUMN_NAME AS REFERENCED_COLUMN
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON rc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
ON rc.UNIQUE_CONSTRAINT_NAME = ccu.CONSTRAINT_NAME
WHERE kcu.TABLE_NAME = ?
""", table_name)
foreign_keys = []
for row in cursor.fetchall():
foreign_keys.append({
"column": row[0],
"references_table": row[1],
"references_column": row[2],
})
return {
"table": table_name,
"columns": columns,
"foreign_keys": foreign_keys
}
def print_table_schema(schema, is_last=False):
"""Pretty print table schema."""
table_name = schema["table"]
columns = schema["columns"]
foreign_keys = schema["foreign_keys"]
print(f"\n📋 Table: {table_name}")
print("=" * 80)
# Print columns
print(f"\n Columns ({len(columns)}):")
print(" " + "-" * 76)
print(f" {'Column Name':<30} {'Type':<25} {'Nullable':<10} {'PK':<5}")
print(" " + "-" * 76)
for col in columns:
pk_mark = "" if col["primary_key"] else ""
nullable = "YES" if col["nullable"] else "NO"
print(f" {col['name']:<30} {col['type']:<25} {nullable:<10} {pk_mark:<5}")
# Print foreign keys
if foreign_keys:
print(f"\n Foreign Keys ({len(foreign_keys)}):")
print(" " + "-" * 76)
for fk in foreign_keys:
print(f" {fk['column']}{fk['references_table']}({fk['references_column']})")
else:
print("\n Foreign Keys: None")
print()
if not is_last:
print("" * 80)
def main():
"""Main test function."""
try:
print("=" * 80)
print(" MSSQL SCHEMA DISCOVERY TEST")
print("=" * 80)
print(f"Server: {SERVER}")
print(f"Database: {DATABASE}")
print()
# Connect to database
print("Connecting to database...")
connection = get_connection()
cursor = connection.cursor()
print("✓ Connected successfully!")
print()
# List all tables
print("=" * 80)
print(" DISCOVERING DATABASE SCHEMA")
print("=" * 80)
tables = list_all_tables(cursor)
print(f"\n✓ Found {len(tables)} table(s) in the database:")
for i, table in enumerate(tables, 1):
print(f" {i}. {table}")
# Get detailed schema for each table
print("\n" + "=" * 80)
print(" DETAILED SCHEMA INFORMATION")
print("=" * 80)
for i, table in enumerate(tables):
schema = get_table_schema(cursor, table)
is_last = (i == len(tables) - 1)
print_table_schema(schema, is_last)
# Summary
print("=" * 80)
print(" SUMMARY")
print("=" * 80)
print(f"✓ Total Tables: {len(tables)}")
total_columns = 0
total_fks = 0
for table in tables:
schema = get_table_schema(cursor, table)
total_columns += len(schema["columns"])
total_fks += len(schema["foreign_keys"])
print(f"✓ Total Columns: {total_columns}")
print(f"✓ Total Foreign Keys: {total_fks}")
print()
print("✓ Schema discovery completed successfully!")
print("=" * 80)
connection.close()
except pyodbc.Error as e:
print("\n[ERROR] Database operation failed!")
print(f"Error detail: {str(e)}")
return 1
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+85
View File
@@ -0,0 +1,85 @@
"""
Query to find top 3 highest paid employees
"""
import os
import io
import sys
import pyodbc
from dotenv import load_dotenv
# Force UTF-8 encoding for console output
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Load environment variables
load_dotenv()
# Database connection settings
SERVER = os.getenv('MSSQL_SERVER', r'MONSTER\MSSQLSERVERR')
DATABASE = os.getenv('MSSQL_DATABASE', 'AdenTestDB')
USERNAME = os.getenv('MSSQL_USERNAME')
PASSWORD = os.getenv('MSSQL_PASSWORD')
def main():
connection = None
try:
# Connect to database
if USERNAME and PASSWORD:
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'UID={USERNAME};'
f'PWD={PASSWORD};'
)
else:
connection_string = (
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER={SERVER};'
f'DATABASE={DATABASE};'
f'Trusted_Connection=yes;'
)
connection = pyodbc.connect(connection_string)
cursor = connection.cursor()
# Query for top 3 highest paid employees
query = """
SELECT TOP 3
e.first_name + ' ' + e.last_name AS full_name,
e.email,
d.name AS department,
e.salary
FROM Employees e
INNER JOIN Departments d ON e.department_id = d.department_id
ORDER BY e.salary DESC
"""
cursor.execute(query)
print("\n## 💰 Top 3 Highest Paid Employees\n")
print("| Rank | Employee Name | Email | Department | Salary |")
print("|------|---------------|-------|------------|--------|")
rank = 1
for row in cursor:
name = row[0]
email = row[1]
department = row[2]
salary = f"${row[3]:,.2f}"
print(f"| {rank} | {name} | {email} | {department} | {salary} |")
rank += 1
print()
except pyodbc.Error as e:
print(f"\n[ERROR] Database operation failed: {str(e)}")
except Exception as e:
print(f"\n[ERROR] Unexpected error: {str(e)}")
finally:
if connection:
connection.close()
if __name__ == "__main__":
main()