feat: add Plaid banking integration

- Implement Plaid connector for account balances
- Add transaction history retrieval
- Include GL reconciliation functionality
- Add institution metadata lookup
- Include comprehensive tests and documentation

Closes #4016
This commit is contained in:
Vasu Bansal
2026-02-08 20:52:47 +05:30
parent d562670425
commit 276aad6f0d
5 changed files with 574 additions and 0 deletions
+52
View File
@@ -0,0 +1,52 @@
# Plaid Banking Integration
## Supported Operations
- Fetch account balances
- Retrieve transaction history
- Reconcile with GL entries
- Institution metadata lookup
## Configuration
- PLAID_CLIENT_ID
- PLAID_SECRET
- PLAID_ENV
- PLAID_ACCESS_TOKEN
## Usage
...
from integrations.plaid import create_plaid_connector_from_env
connector = create_plaid_connector_from_env()
# Fetch accounts
accounts = connector.fetch_accounts()
# Fetch transactions
transactions = connector.fetch_transactions(
start_date="2024-01-01",
end_date="2024-01-31"
)
# Reconcile with GL
report = connector.reconcile_with_gl(gl_entries=[
{'amount': 100.00, 'date': '2024-01-15', 'description': 'Vendor payment'}
])
## Security
- Uses Plaid's official Python SDK
- Read-only access by default
- Secure token management via Hive credential store
- Supports sandbox, development, and production environments
## API Coverage
- /accounts/get - Account balances
- /transactions/get - Transaction history
- /institutions/get_by_id - Institution metadata
- Reconciliation engine for GL matching
## Prerequisites
pip install plaid-python
## Links
- [Plaid Documentation](https://plaid.com/docs)
- [Plaid Dashboard](https://dashboard.plaid.com)
+16
View File
@@ -0,0 +1,16 @@
"""
Plaid Integration for Hive
Secure banking data access via Plaid API.
"""
from .connector import PlaidConnector, PlaidConfig, create_plaid_connector_from_env
from .credentials import PlaidCredentials, PLAID_CREDENTIAL_SPEC
__all__ = [
"PlaidConnector",
"PlaidConfig",
"PlaidCredentials",
"PLAID_CREDENTIAL_SPEC",
"create_plaid_connector_from_env"
]
+262
View File
@@ -0,0 +1,262 @@
"""
Plaid Integration for Banking Data
Secure access to bank accounts, transactions, and balances.
"""
import os
from typing import Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
try:
from plaid.api import plaid_api
from plaid.model.transactions_get_request import TransactionsGetRequest
from plaid.model.accounts_get_request import AccountsGetRequest
from plaid.model.item_public_token_exchange_request import ItemPublicTokenExchangeRequest
from plaid.configuration import Configuration
from plaid.api_client import ApiClient
PLAID_AVAILABLE = True
except ImportError:
PLAID_AVAILABLE = False
@dataclass
class PlaidConfig:
"""Configuration for Plaid API."""
client_id: str
secret: str
environment: str = "sandbox" # sandbox, development, production
access_token: Optional[str] = None # Obtained after account linking
class PlaidConnector:
"""
Connector for Plaid banking API.
Supports:
- Account balances
- Transaction history
- Institution metadata
- Secure token management
"""
def __init__(self, config: PlaidConfig):
if not PLAID_AVAILABLE:
raise ImportError("plaid-python package required. Install: pip install plaid-python")
self.config = config
self.client = self._create_client()
def _create_client(self):
"""Initialize Plaid API client."""
configuration = Configuration(
host=self._get_host(),
api_key={
'clientId': self.config.client_id,
'secret': self.config.secret
}
)
api_client = ApiClient(configuration)
return plaid_api.PlaidApi(api_client)
def _get_host(self) -> str:
"""Get Plaid API host based on environment."""
hosts = {
"sandbox": "https://sandbox.plaid.com",
"development": "https://development.plaid.com",
"production": "https://production.plaid.com"
}
return hosts.get(self.config.environment, hosts["sandbox"])
def fetch_accounts(self) -> list[dict]:
"""
Fetch all connected bank accounts.
Returns:
List of account dictionaries with balance info
"""
if not self.config.access_token:
raise ValueError("No access token. Link account first.")
try:
request = AccountsGetRequest(access_token=self.config.access_token)
response = self.client.accounts_get(request)
accounts = []
for account in response['accounts']:
accounts.append({
'account_id': account['account_id'],
'name': account['name'],
'type': account['type'],
'subtype': account.get('subtype'),
'balance': {
'available': account['balances'].get('available'),
'current': account['balances'].get('current'),
'currency': account['balances'].get('iso_currency_code', 'USD')
},
'institution': response['item'].get('institution_id')
})
return accounts
except Exception as e:
raise ConnectionError(f"Failed to fetch accounts: {e}")
def fetch_transactions(
self,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
account_ids: Optional[list[str]] = None
) -> list[dict]:
"""
Fetch transactions for specified date range.
Args:
start_date: Start date (YYYY-MM-DD), defaults to 30 days ago
end_date: End date (YYYY-MM-DD), defaults to today
account_ids: Specific accounts to query, or all if None
Returns:
List of transaction dictionaries
"""
if not self.config.access_token:
raise ValueError("No access token. Link account first.")
# Default to last 30 days
if not end_date:
end_date = datetime.now().strftime('%Y-%m-%d')
if not start_date:
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
try:
request = TransactionsGetRequest(
access_token=self.config.access_token,
start_date=start_date,
end_date=end_date,
account_ids=account_ids or []
)
response = self.client.transactions_get(request)
transactions = []
for txn in response['transactions']:
transactions.append({
'transaction_id': txn['transaction_id'],
'account_id': txn['account_id'],
'amount': txn['amount'],
'currency': txn.get('iso_currency_code', 'USD'),
'date': txn['date'],
'name': txn['name'],
'merchant_name': txn.get('merchant_name'),
'category': txn.get('category', []),
'pending': txn['pending'],
'payment_channel': txn.get('payment_channel')
})
return transactions
except Exception as e:
raise ConnectionError(f"Failed to fetch transactions: {e}")
def reconcile_with_gl(
self,
gl_entries: list[dict],
tolerance: float = 0.01
) -> dict:
"""
Reconcile bank transactions with GL entries.
Args:
gl_entries: List of GL entries with 'amount', 'date', 'description'
tolerance: Amount difference tolerance for matching
Returns:
Reconciliation report with matched/unmatched items
"""
# Fetch bank transactions
bank_txns = self.fetch_transactions()
matched = []
unmatched_bank = []
unmatched_gl = []
# Simple matching algorithm (can be enhanced)
for gl in gl_entries:
found_match = False
for txn in bank_txns:
# Match by amount (within tolerance) and approximate date
amount_match = abs(abs(txn['amount']) - abs(gl['amount'])) <= tolerance
date_match = txn['date'] == gl.get('date', txn['date'])
if amount_match and date_match:
matched.append({
'gl_entry': gl,
'bank_transaction': txn,
'match_confidence': 'high' if amount_match and date_match else 'medium'
})
found_match = True
break
if not found_match:
unmatched_gl.append(gl)
# Bank transactions not in GL
matched_bank_ids = {m['bank_transaction']['transaction_id'] for m in matched}
unmatched_bank = [t for t in bank_txns if t['transaction_id'] not in matched_bank_ids]
return {
'matched': matched,
'unmatched_gl': unmatched_gl,
'unmatched_bank': unmatched_bank,
'summary': {
'total_gl_entries': len(gl_entries),
'total_bank_transactions': len(bank_txns),
'matched_count': len(matched),
'unmatched_gl_count': len(unmatched_gl),
'unmatched_bank_count': len(unmatched_bank)
}
}
def get_institution(self, institution_id: str) -> dict:
"""Get institution metadata."""
try:
from plaid.model.institutions_get_by_id_request import InstitutionsGetByIdRequest
request = InstitutionsGetByIdRequest(
institution_id=institution_id,
country_codes=['US']
)
response = self.client.institutions_get_by_id(request)
inst = response['institution']
return {
'institution_id': inst['institution_id'],
'name': inst['name'],
'products': inst.get('products', []),
'country_codes': inst.get('country_codes', [])
}
except Exception as e:
raise ConnectionError(f"Failed to get institution: {e}")
def health_check(self) -> bool:
"""Verify Plaid API connectivity."""
try:
# Try to fetch accounts as health check
if self.config.access_token:
self.fetch_accounts()
return True
except Exception:
return False
# Factory function
def create_plaid_connector_from_env() -> PlaidConnector:
"""Create connector from environment variables."""
config = PlaidConfig(
client_id=os.environ.get("PLAID_CLIENT_ID", ""),
secret=os.environ.get("PLAID_SECRET", ""),
environment=os.environ.get("PLAID_ENV", "sandbox"),
access_token=os.environ.get("PLAID_ACCESS_TOKEN")
)
return PlaidConnector(config)
+81
View File
@@ -0,0 +1,81 @@
"""
Plaid Credential Management
Follows Hive's credentialSpec pattern for secure banking authentication.
"""
from dataclasses import dataclass
from typing import Optional
from framework.credentials.credential_spec import CredentialSpec
from framework.credentials.credential_store import CredentialStore
@dataclass
class PlaidCredentials:
"""Plaid API authentication credentials."""
client_id: str
secret: str
environment: str = "sandbox"
access_token: Optional[str] = None
public_token: Optional[str] = None
@classmethod
def from_credential_store(cls, store: CredentialStore, credential_id: str) -> "PlaidCredentials":
"""Load credentials from Hive's credential store."""
spec = store.get(credential_id)
return cls(
client_id=spec.get("client_id"),
secret=spec.get("secret"),
environment=spec.get("environment", "sandbox"),
access_token=spec.get("access_token"),
public_token=spec.get("public_token")
)
def to_credential_spec(self) -> CredentialSpec:
"""Convert to Hive credential spec format."""
return CredentialSpec(
credential_type="plaid",
config={
"client_id": self.client_id,
"secret": self.secret,
"environment": self.environment,
"access_token": self.access_token,
"public_token": self.public_token
}
)
# Credential specification for validation
PLAID_CREDENTIAL_SPEC = {
"type": "object",
"required": ["client_id", "secret"],
"properties": {
"client_id": {
"type": "string",
"description": "Plaid client ID"
},
"secret": {
"type": "string",
"description": "Plaid secret key",
"sensitive": True
},
"environment": {
"type": "string",
"enum": ["sandbox", "development", "production"],
"default": "sandbox",
"description": "Plaid environment"
},
"access_token": {
"type": "string",
"description": "Plaid access token (obtained after account linking)",
"sensitive": True
},
"public_token": {
"type": "string",
"description": "Temporary public token for account linking",
"sensitive": True
}
}
}
+163
View File
@@ -0,0 +1,163 @@
"""
Unit tests for Plaid connector.
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from integrations.plaid.connector import PlaidConnector, PlaidConfig
@pytest.fixture
def mock_config():
return PlaidConfig(
client_id="test_client_id",
secret="test_secret",
environment="sandbox",
access_token="test_access_token"
)
@pytest.fixture
def connector(mock_config):
with patch('integrations.plaid.connector.plaid_api') as mock_plaid:
mock_client = MagicMock()
mock_plaid.PlaidApi.return_value = mock_client
return PlaidConnector(mock_config)
class TestPlaidConnector:
"""Test suite for Plaid connector."""
def test_initialization(self, connector, mock_config):
"""Test connector initializes correctly."""
assert connector.config == mock_config
def test_fetch_accounts_success(self, connector):
"""Test fetching accounts."""
mock_response = {
'accounts': [
{
'account_id': 'acc_123',
'name': 'Checking',
'type': 'depository',
'subtype': 'checking',
'balances': {
'available': 1000.00,
'current': 1200.00,
'iso_currency_code': 'USD'
}
}
],
'item': {'institution_id': 'ins_123'}
}
connector.client.accounts_get.return_value = mock_response
accounts = connector.fetch_accounts()
assert len(accounts) == 1
assert accounts[0]['account_id'] == 'acc_123'
assert accounts[0]['balance']['available'] == 1000.00
def test_fetch_transactions_success(self, connector):
"""Test fetching transactions."""
mock_response = {
'transactions': [
{
'transaction_id': 'txn_123',
'account_id': 'acc_123',
'amount': 50.00,
'date': '2024-01-15',
'name': 'Coffee Shop',
'merchant_name': 'Starbucks',
'category': ['Food and Drink', 'Coffee Shop'],
'pending': False,
'payment_channel': 'in store'
}
],
'total_transactions': 1
}
connector.client.transactions_get.return_value = mock_response
transactions = connector.fetch_transactions(
start_date="2024-01-01",
end_date="2024-01-31"
)
assert len(transactions) == 1
assert transactions[0]['transaction_id'] == 'txn_123'
assert transactions[0]['amount'] == 50.00
def test_reconcile_with_gl(self, connector):
"""Test GL reconciliation."""
# Mock transactions
mock_txns = {
'transactions': [
{
'transaction_id': 'txn_123',
'account_id': 'acc_123',
'amount': 100.00,
'date': '2024-01-15',
'name': 'Vendor Payment'
}
],
'total_transactions': 1
}
connector.client.transactions_get.return_value = mock_txns
gl_entries = [
{'amount': 100.00, 'date': '2024-01-15', 'description': 'Vendor payment'}
]
report = connector.reconcile_with_gl(gl_entries)
assert report['summary']['matched_count'] == 1
assert report['summary']['unmatched_gl_count'] == 0
def test_reconcile_with_unmatched_entries(self, connector):
"""Test reconciliation with unmatched GL entries."""
mock_txns = {
'transactions': [],
'total_transactions': 0
}
connector.client.transactions_get.return_value = mock_txns
gl_entries = [
{'amount': 100.00, 'date': '2024-01-15', 'description': 'Missing txn'}
]
report = connector.reconcile_with_gl(gl_entries)
assert report['summary']['matched_count'] == 0
assert report['summary']['unmatched_gl_count'] == 1
def test_health_check_success(self, connector):
"""Test health check returns True."""
mock_response = {
'accounts': [],
'item': {}
}
connector.client.accounts_get.return_value = mock_response
assert connector.health_check() is True
def test_health_check_failure(self, connector):
"""Test health check returns False on error."""
connector.client.accounts_get.side_effect = Exception("API error")
assert connector.health_check() is False
def test_no_access_token_error(self, connector):
"""Test error when no access token provided."""
connector.config.access_token = None
with pytest.raises(ValueError, match="No access token"):
connector.fetch_accounts()
def test_plaid_not_installed(self):
"""Test error when plaid-python not installed."""
with patch('integrations.plaid.connector.PLAID_AVAILABLE', False):
with pytest.raises(ImportError):
PlaidConnector(Mock())