# Data Migration Architecture Overview
Successful Salesforce data migration requires careful planning, robust architecture, and proven methodologies. This guide provides technical leaders with comprehensive strategies for migrating data from legacy systems to Salesforce while maintaining data integrity and minimizing business disruption.
Data migration is often the most complex and risky part of a Salesforce implementation. With disciplined planning and execution, you can migrate millions of records while preserving data quality and relationships and keeping load windows predictable.
# Migration Planning and Assessment
## Data Migration Readiness Assessment
### Data Profiling and Analysis
```python
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import logging
class DataProfiler:
def __init__(self, source_connection_string):
self.engine = create_engine(source_connection_string)
self.logger = logging.getLogger(__name__)
def profile_table(self, table_name, sample_size=None):
"""
Comprehensive data profiling for migration planning
"""
# Load data
if sample_size:
query = f"SELECT * FROM {table_name} TABLESAMPLE({sample_size} PERCENT)"
else:
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, self.engine)
profile_report = {
'table_name': table_name,
'row_count': len(df),
'column_count': len(df.columns),
'memory_usage': df.memory_usage(deep=True).sum() / 1024**2, # MB
'columns': {}
}
# Analyze each column
for column in df.columns:
col_profile = self.profile_column(df[column])
profile_report['columns'][column] = col_profile
# Identify data quality issues
profile_report['quality_issues'] = self.identify_quality_issues(df)
# Identify relationships
profile_report['potential_keys'] = self.identify_potential_keys(df)
return profile_report
def profile_column(self, series):
"""
Detailed column profiling
"""
profile = {
'data_type': str(series.dtype),
'null_count': series.isnull().sum(),
'null_percentage': (series.isnull().sum() / len(series)) * 100,
'unique_count': series.nunique(),
'unique_percentage': (series.nunique() / len(series)) * 100
}
# Additional profiling for different data types
if series.dtype in ['int64', 'float64']:
profile.update({
'min': series.min(),
'max': series.max(),
'mean': series.mean(),
'median': series.median(),
'std': series.std()
})
elif series.dtype == 'object':
profile.update({
'min_length': series.str.len().min(),
'max_length': series.str.len().max(),
'avg_length': series.str.len().mean(),
'sample_values': series.value_counts().head(10).to_dict()
})
# Check for patterns
profile['patterns'] = self.detect_patterns(series)
return profile
def identify_quality_issues(self, df):
"""
Identify common data quality issues
"""
issues = []
for column in df.columns:
# Check for high null percentage
null_pct = (df[column].isnull().sum() / len(df)) * 100
if null_pct > 50:
issues.append({
'column': column,
'issue': 'high_null_percentage',
'severity': 'high',
'details': f'{null_pct:.2f}% null values'
})
# Check for duplicate values in potential unique fields
if df[column].dtype == 'object' and any(keyword in column.lower() for keyword in ['id', 'code', 'number']):
duplicates = df[column].duplicated().sum()
if duplicates > 0:
issues.append({
'column': column,
'issue': 'duplicate_values',
'severity': 'medium',
'details': f'{duplicates} duplicate values found'
})
# Check for invalid formats
if df[column].dtype == 'object':
# Email validation
if 'email' in column.lower():
invalid_emails = df[~df[column].str.match(
                        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
na=False
)][column].count()
if invalid_emails > 0:
issues.append({
'column': column,
'issue': 'invalid_email_format',
'severity': 'medium',
'details': f'{invalid_emails} invalid email addresses'
})
# Phone validation
if 'phone' in column.lower():
invalid_phones = df[~df[column].str.match(
                        r'^\+?1?\d{10,14}$',
na=False
)][column].count()
if invalid_phones > 0:
issues.append({
'column': column,
'issue': 'invalid_phone_format',
'severity': 'low',
'details': f'{invalid_phones} invalid phone numbers'
})
return issues
def generate_migration_recommendations(self, profile_report):
"""
Generate migration recommendations based on profiling
"""
recommendations = []
# Data volume recommendations
row_count = profile_report['row_count']
if row_count > 5000000:
recommendations.append({
'category': 'volume',
'recommendation': 'Use Bulk API 2.0 for better performance',
'priority': 'high'
})
elif row_count > 1000000:
recommendations.append({
'category': 'volume',
'recommendation': 'Consider batch processing with parallel jobs',
'priority': 'medium'
})
# Data quality recommendations
for issue in profile_report['quality_issues']:
if issue['severity'] == 'high':
recommendations.append({
'category': 'quality',
'recommendation': f"Address {issue['issue']} in {issue['column']} before migration",
'priority': 'high',
'details': issue['details']
})
return recommendations
```
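A minimal usage sketch for the profiler above. The connection string and table name are placeholders, and the helper methods elided from the class (`detect_patterns`, `identify_potential_keys`) are assumed to be implemented:

```python
# Hypothetical usage; connection details are placeholders.
profiler = DataProfiler("postgresql://migration_user:secret@legacy-db.company.com:5432/legacy_crm")

# Profile a 10 percent sample first to keep the assessment fast on large tables
report = profiler.profile_table("customers", sample_size=10)

print(f"{report['table_name']}: {report['row_count']:,} rows, "
      f"{report['column_count']} columns, {report['memory_usage']:.1f} MB in memory")

for issue in report['quality_issues']:
    print(f"[{issue['severity'].upper()}] {issue['column']}: {issue['details']}")

for rec in profiler.generate_migration_recommendations(report):
    print(f"({rec['priority']}) {rec['category']}: {rec['recommendation']}")
```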
### Migration Mapping Configuration
```yaml
# migration_config.yaml
migration:
source:
type: "postgresql"
connection:
host: "legacy-db.company.com"
port: 5432
database: "legacy_crm"
target:
type: "salesforce"
org: "production"
api_version: "59.0"
mappings:
- source_object: "customers"
target_object: "Account"
operation: "upsert"
external_id: "Legacy_ID__c"
field_mappings:
- source: "customer_id"
target: "Legacy_ID__c"
transform: "string"
- source: "company_name"
target: "Name"
transform: "truncate(255)"
- source: "annual_revenue"
target: "AnnualRevenue"
transform: "currency_conversion"
- source: "industry_code"
target: "Industry"
transform: "lookup_mapping"
lookup_table: "industry_mapping"
- source: "created_date"
target: "Legacy_Created_Date__c"
transform: "datetime"
- source_object: "contacts"
target_object: "Contact"
operation: "upsert"
external_id: "Legacy_Contact_ID__c"
parent_relationship:
source_field: "customer_id"
target_field: "AccountId"
lookup_object: "Account"
lookup_field: "Legacy_ID__c"
field_mappings:
- source: "contact_id"
target: "Legacy_Contact_ID__c"
- source: "first_name"
target: "FirstName"
- source: "last_name"
target: "LastName"
- source: "email_address"
target: "Email"
validation: "email"
default_on_error: "noemail@company.com"
transformations:
currency_conversion:
type: "custom"
class: "CurrencyTransformer"
lookup_mapping:
type: "mapping_table"
mappings:
"TECH": "Technology"
"FIN": "Financial Services"
"HEALTH": "Healthcare"
"MFG": "Manufacturing"
validation_rules:
- object: "Account"
field: "Name"
rule: "required"
error_action: "skip_record"
- object: "Contact"
field: "Email"
rule: "unique_in_account"
error_action: "log_warning"
```
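The mapping file can be loaded and sanity-checked before any data moves. A sketch using PyYAML; the helper name and the specific checks are illustrative, not part of the configuration spec:

```python
import yaml  # PyYAML, assumed to be available in the migration tooling environment

def load_migration_config(path: str = "migration_config.yaml") -> dict:
    """Load the mapping file and fail fast on obviously incomplete mappings."""
    with open(path) as fh:
        config = yaml.safe_load(fh)

    migration = config["migration"]
    for mapping in migration["mappings"]:
        # Upserts only work when an external ID field is declared on the target object
        if mapping["operation"] == "upsert" and "external_id" not in mapping:
            raise ValueError(f"{mapping['target_object']}: upsert requires an external_id")
        for fm in mapping["field_mappings"]:
            if "source" not in fm or "target" not in fm:
                raise ValueError(f"{mapping['target_object']}: incomplete field mapping {fm}")
    return migration

migration_config = load_migration_config()
print(f"{len(migration_config['mappings'])} object mappings loaded, "
      f"targeting API v{migration_config['target']['api_version']}")
```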
# Data Extraction and Transformation
## ETL Pipeline Implementation
### Scalable Data Extraction Framework
```python
import asyncio
import aiohttp
from datetime import datetime
import json
from typing import List, Dict, Any
import hashlib
class DataExtractor:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.batch_size = config.get('batch_size', 10000)
self.parallel_jobs = config.get('parallel_jobs', 4)
async def extract_data(self, table_name: str, filters: Dict = None) -> List[Dict]:
"""
Extract data with pagination and parallel processing
"""
# Get total record count
total_count = await self.get_record_count(table_name, filters)
# Calculate batches
batches = []
for offset in range(0, total_count, self.batch_size):
batches.append({
'offset': offset,
'limit': min(self.batch_size, total_count - offset)
})
# Process batches in parallel
tasks = []
for i in range(0, len(batches), self.parallel_jobs):
batch_group = batches[i:i + self.parallel_jobs]
group_tasks = [
self.extract_batch(table_name, batch, filters)
for batch in batch_group
]
tasks.extend(group_tasks)
# Execute all tasks
results = await asyncio.gather(*tasks)
# Flatten results
all_records = []
for batch_result in results:
all_records.extend(batch_result)
return all_records
async def extract_batch(
self,
table_name: str,
batch_info: Dict,
filters: Dict = None
) -> List[Dict]:
"""
Extract a single batch of data
"""
query = self.build_query(table_name, batch_info, filters)
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.config['source_url']}/query",
json={'query': query},
headers={'Authorization': f"Bearer {self.config['api_token']}"}
) as response:
data = await response.json()
return data['records']
def build_query(
self,
table_name: str,
batch_info: Dict,
filters: Dict = None
) -> str:
"""
Build SQL query with filters and pagination
"""
query = f"SELECT * FROM {table_name}"
if filters:
conditions = []
for field, value in filters.items():
if isinstance(value, list):
conditions.append(f"{field} IN ({','.join(map(str, value))})")
else:
conditions.append(f"{field} = {value}")
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
query += f" LIMIT {batch_info['limit']} OFFSET {batch_info['offset']}"
return query
class DataTransformer:
def __init__(self, mapping_config: Dict[str, Any]):
self.mapping_config = mapping_config
self.transformation_stats = {
'total_records': 0,
'transformed_records': 0,
'errors': []
}
def transform_batch(self, records: List[Dict], object_mapping: Dict) -> List[Dict]:
"""
Transform a batch of records according to mapping configuration
"""
transformed_records = []
for record in records:
try:
transformed = self.transform_record(record, object_mapping)
transformed_records.append(transformed)
self.transformation_stats['transformed_records'] += 1
except Exception as e:
self.transformation_stats['errors'].append({
'record_id': record.get('id', 'unknown'),
'error': str(e),
'timestamp': datetime.now().isoformat()
})
self.transformation_stats['total_records'] += 1
return transformed_records
def transform_record(self, source_record: Dict, mapping: Dict) -> Dict:
"""
Transform a single record
"""
target_record = {}
for field_mapping in mapping['field_mappings']:
source_field = field_mapping['source']
target_field = field_mapping['target']
transform_type = field_mapping.get('transform', 'direct')
# Get source value
source_value = self.get_nested_value(source_record, source_field)
# Apply transformation
if transform_type == 'direct':
target_value = source_value
elif transform_type == 'string':
target_value = str(source_value) if source_value is not None else ''
elif transform_type == 'truncate':
max_length = int(field_mapping.get('max_length', 255))
target_value = str(source_value)[:max_length] if source_value else ''
elif transform_type == 'datetime':
target_value = self.transform_datetime(source_value)
elif transform_type == 'lookup_mapping':
target_value = self.apply_lookup(
source_value,
field_mapping.get('lookup_table')
)
elif transform_type == 'custom':
target_value = self.apply_custom_transform(
source_value,
field_mapping.get('custom_function')
)
else:
target_value = source_value
# Apply validation if specified
if 'validation' in field_mapping:
target_value = self.validate_field(
target_value,
field_mapping['validation'],
field_mapping.get('default_on_error')
)
target_record[target_field] = target_value
# Add audit fields
target_record['Migration_Batch_ID__c'] = self.get_batch_id()
target_record['Migration_Date__c'] = datetime.now().isoformat()
target_record['Source_System__c'] = mapping.get('source_system', 'Legacy')
return target_record
def transform_datetime(self, value: Any) -> str:
"""
Transform datetime values to Salesforce format
"""
if value is None:
return None
if isinstance(value, str):
# Parse common datetime formats
for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%m/%d/%Y']:
try:
dt = datetime.strptime(value, fmt)
return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
except ValueError:
continue
return str(value)
def get_batch_id(self) -> str:
"""
Generate unique batch ID for tracking
"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
return f"BATCH_{timestamp}"
class DataValidator:
def __init__(self, validation_rules: List[Dict]):
self.validation_rules = validation_rules
self.validation_results = {
'passed': 0,
'failed': 0,
'warnings': 0,
'errors': []
}
def validate_batch(self, records: List[Dict], object_type: str) -> Dict:
"""
Validate a batch of records
"""
object_rules = [
rule for rule in self.validation_rules
if rule['object'] == object_type
]
validated_records = []
for record in records:
validation_result = self.validate_record(record, object_rules)
if validation_result['status'] == 'passed':
validated_records.append(record)
self.validation_results['passed'] += 1
elif validation_result['status'] == 'warning':
validated_records.append(record)
self.validation_results['warnings'] += 1
else:
self.validation_results['failed'] += 1
self.validation_results['errors'].append({
'record': record,
'errors': validation_result['errors']
})
return {
'valid_records': validated_records,
'validation_summary': self.validation_results
}
def validate_record(self, record: Dict, rules: List[Dict]) -> Dict:
"""
Validate a single record against rules
"""
errors = []
warnings = []
for rule in rules:
field = rule['field']
rule_type = rule['rule']
if rule_type == 'required':
if not record.get(field):
errors.append(f"{field} is required")
elif rule_type == 'unique':
# This would check against existing data
pass
elif rule_type == 'format':
pattern = rule.get('pattern')
if pattern and not self.matches_pattern(record.get(field), pattern):
if rule.get('error_action') == 'warning':
warnings.append(f"{field} format is invalid")
else:
errors.append(f"{field} format is invalid")
if errors:
return {'status': 'failed', 'errors': errors, 'warnings': warnings}
elif warnings:
return {'status': 'warning', 'warnings': warnings}
else:
return {'status': 'passed'}
```
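A sketch of how the extractor, transformer, and validator might be chained for one object mapping. The source endpoint, token, and the `account_mapping`/`account_rules` variables are assumptions, and the extractor also relies on a `get_record_count` helper that is elided above:

```python
import asyncio

async def run_etl_for_mapping(mapping: dict, validation_rules: list) -> dict:
    """Extract, transform, and validate a single object mapping end to end."""
    extractor = DataExtractor({
        'source_url': 'https://legacy-api.company.com',  # placeholder endpoint
        'api_token': 'REPLACE_ME',                       # placeholder credential
        'batch_size': 10000,
        'parallel_jobs': 4,
    })
    transformer = DataTransformer(mapping)
    validator = DataValidator(validation_rules)

    raw_records = await extractor.extract_data(mapping['source_object'])
    transformed = transformer.transform_batch(raw_records, mapping)
    validated = validator.validate_batch(transformed, mapping['target_object'])

    print(f"Extracted {len(raw_records)}, transformed {len(transformed)}, "
          f"valid {len(validated['valid_records'])}, "
          f"transformation errors {len(transformer.transformation_stats['errors'])}")
    return validated

# Example: asyncio.run(run_etl_for_mapping(account_mapping, account_rules))
```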
# Data Loading Strategies
## Salesforce Bulk API 2.0 Implementation
### High-Performance Data Loader
```python
import requests
import csv
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from typing import List, Dict, Generator
class SalesforceBulkLoader:
def __init__(self, instance_url: str, access_token: str):
self.instance_url = instance_url
self.access_token = access_token
self.api_version = 'v59.0'
self.headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
def create_bulk_job(self, object_name: str, operation: str = 'insert') -> str:
"""
Create a bulk job
"""
job_data = {
'object': object_name,
'operation': operation,
'contentType': 'CSV',
'lineEnding': 'LF'
}
response = requests.post(
f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest',
json=job_data,
headers=self.headers
)
if response.status_code == 200:
return response.json()['id']
else:
raise Exception(f"Failed to create job: {response.text}")
    def upload_batch(
        self,
        job_id: str,
        records: List[Dict]
    ) -> None:
        """
        Upload the job data as a single CSV payload.
        Bulk API 2.0 takes the data for an ingest job in one upload and splits it
        into internal batches itself, so very large datasets should be divided
        across multiple jobs rather than multiple uploads.
        """
        csv_data = self.dict_to_csv(records)
        response = requests.put(
            f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/batches',
            data=csv_data.encode('utf-8'),
            headers={
                **self.headers,
                'Content-Type': 'text/csv'
            }
        )
        # A successful upload returns 201 Created with an empty body
        if response.status_code != 201:
            raise Exception(f"Failed to upload job data: {response.text}")
def close_job(self, job_id: str) -> Dict:
"""
Close the job to start processing
"""
response = requests.patch(
f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}',
json={'state': 'UploadComplete'},
headers=self.headers
)
return response.json()
def monitor_job(self, job_id: str, poll_interval: int = 5) -> Dict:
"""
Monitor job progress
"""
while True:
response = requests.get(
f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}',
headers=self.headers
)
job_info = response.json()
state = job_info['state']
if state in ['JobComplete', 'Failed', 'Aborted']:
return job_info
print(f"Job {job_id} - State: {state}, "
f"Processed: {job_info.get('numberRecordsProcessed', 0)}, "
f"Failed: {job_info.get('numberRecordsFailed', 0)}")
time.sleep(poll_interval)
def get_job_results(self, job_id: str) -> Generator[Dict, None, None]:
"""
Get successful and failed records
"""
# Get successful records
response = requests.get(
f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/successfulResults',
headers={**self.headers, 'Accept': 'text/csv'}
)
if response.status_code == 200:
reader = csv.DictReader(io.StringIO(response.text))
for row in reader:
yield {'type': 'success', 'record': row}
# Get failed records
response = requests.get(
f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/failedResults',
headers={**self.headers, 'Accept': 'text/csv'}
)
if response.status_code == 200:
reader = csv.DictReader(io.StringIO(response.text))
for row in reader:
yield {'type': 'failed', 'record': row}
def dict_to_csv(self, records: List[Dict]) -> str:
"""
Convert list of dictionaries to CSV
"""
if not records:
return ""
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)
return output.getvalue()
class ParallelDataLoader:
def __init__(self, sf_loader: SalesforceBulkLoader, max_workers: int = 4):
self.sf_loader = sf_loader
self.max_workers = max_workers
self.load_statistics = {
'total_records': 0,
'successful_records': 0,
'failed_records': 0,
'jobs': []
}
def load_data(
self,
data_batches: List[Dict],
object_name: str,
operation: str = 'insert'
) -> Dict:
"""
Load data using parallel jobs
"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit jobs
future_to_batch = {}
for batch_data in data_batches:
future = executor.submit(
self.load_single_batch,
batch_data['records'],
object_name,
operation,
batch_data.get('batch_id')
)
future_to_batch[future] = batch_data
# Process completed jobs
for future in as_completed(future_to_batch):
batch_data = future_to_batch[future]
try:
result = future.result()
self.load_statistics['jobs'].append(result)
self.load_statistics['total_records'] += result['total_records']
self.load_statistics['successful_records'] += result['successful_records']
self.load_statistics['failed_records'] += result['failed_records']
except Exception as e:
print(f"Batch {batch_data.get('batch_id')} failed: {str(e)}")
self.load_statistics['failed_records'] += len(batch_data['records'])
return self.load_statistics
def load_single_batch(
self,
records: List[Dict],
object_name: str,
operation: str,
batch_id: str = None
) -> Dict:
"""
Load a single batch of data
"""
# Create job
job_id = self.sf_loader.create_bulk_job(object_name, operation)
# Upload data
self.sf_loader.upload_batch(job_id, records)
# Close job
self.sf_loader.close_job(job_id)
# Monitor job
job_result = self.sf_loader.monitor_job(job_id)
# Process results
successful_records = []
failed_records = []
for result in self.sf_loader.get_job_results(job_id):
if result['type'] == 'success':
successful_records.append(result['record'])
else:
failed_records.append(result['record'])
return {
'job_id': job_id,
'batch_id': batch_id,
'total_records': len(records),
'successful_records': len(successful_records),
'failed_records': len(failed_records),
'job_state': job_result['state'],
'processing_time': job_result.get('totalProcessingTime', 0)
}
```
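A usage sketch for the loaders above, assuming an OAuth access token has already been obtained and that `account_records` is an in-memory list of transformed records from the previous stage:

```python
# Hypothetical wiring of the loaders above; the instance URL and token are placeholders.
sf_loader = SalesforceBulkLoader(
    instance_url='https://yourcompany.my.salesforce.com',
    access_token='REPLACE_WITH_OAUTH_TOKEN'
)
parallel_loader = ParallelDataLoader(sf_loader, max_workers=4)

# Each batch becomes its own Bulk API 2.0 job, loaded concurrently
data_batches = [
    {'batch_id': f'BATCH_{i}', 'records': account_records[i:i + 10000]}
    for i in range(0, len(account_records), 10000)
]

stats = parallel_loader.load_data(data_batches, 'Account')
print(f"Loaded {stats['successful_records']} of {stats['total_records']} records "
      f"({stats['failed_records']} failures across {len(stats['jobs'])} jobs)")
```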
### Relationship Management During Migration
```apex
public class RelationshipMigrationHandler {
// Handle parent-child relationships
public static void resolveRelationships(
List<Migration_Staging__c> stagingRecords
) {
// Group records by object type
Map<String, List<Migration_Staging__c>> recordsByType =
new Map<String, List<Migration_Staging__c>>();
for (Migration_Staging__c record : stagingRecords) {
if (!recordsByType.containsKey(record.Object_Type__c)) {
recordsByType.put(record.Object_Type__c, new List<Migration_Staging__c>());
}
recordsByType.get(record.Object_Type__c).add(record);
}
// Process parent objects first
List<String> objectOrder = getObjectProcessingOrder();
for (String objectType : objectOrder) {
if (recordsByType.containsKey(objectType)) {
processObjectRelationships(
objectType,
recordsByType.get(objectType)
);
}
}
}
private static void processObjectRelationships(
String objectType,
List<Migration_Staging__c> records
) {
// Get relationship mappings
List<Relationship_Mapping__mdt> mappings = [
SELECT Source_Field__c, Target_Field__c,
Parent_Object__c, Lookup_Field__c
FROM Relationship_Mapping__mdt
WHERE Child_Object__c = :objectType
];
if (mappings.isEmpty()) {
return;
}
// Build lookup maps
Map<String, Map<String, Id>> lookupMaps =
new Map<String, Map<String, Id>>();
for (Relationship_Mapping__mdt mapping : mappings) {
if (!lookupMaps.containsKey(mapping.Parent_Object__c)) {
lookupMaps.put(
mapping.Parent_Object__c,
buildLookupMap(mapping.Parent_Object__c, mapping.Lookup_Field__c)
);
}
}
// Update staging records with resolved IDs
for (Migration_Staging__c record : records) {
Map<String, Object> data =
(Map<String, Object>) JSON.deserializeUntyped(record.Data__c);
for (Relationship_Mapping__mdt mapping : mappings) {
String sourceValue = (String) data.get(mapping.Source_Field__c);
if (String.isNotBlank(sourceValue)) {
Map<String, Id> lookupMap =
lookupMaps.get(mapping.Parent_Object__c);
if (lookupMap.containsKey(sourceValue)) {
data.put(
mapping.Target_Field__c,
lookupMap.get(sourceValue)
);
} else {
record.Has_Errors__c = true;
record.Error_Message__c =
'Parent record not found: ' + sourceValue;
}
}
}
record.Data__c = JSON.serialize(data);
}
update records;
}
private static Map<String, Id> buildLookupMap(
String objectType,
String lookupField
) {
Map<String, Id> lookupMap = new Map<String, Id>();
String query = String.format(
'SELECT Id, {0} FROM {1} WHERE {0} != null',
new List<String>{lookupField, objectType}
);
for (SObject record : Database.query(query)) {
String lookupValue = (String) record.get(lookupField);
lookupMap.put(lookupValue, record.Id);
}
return lookupMap;
}
// Handle many-to-many relationships
public static void createJunctionRecords(
String junctionObject,
List<Junction_Mapping__c> mappings
) {
List<SObject> junctionRecords = new List<SObject>();
for (Junction_Mapping__c mapping : mappings) {
// Get parent IDs
Id parentId1 = resolveExternalId(
mapping.Parent1_Object__c,
mapping.Parent1_External_Id__c
);
Id parentId2 = resolveExternalId(
mapping.Parent2_Object__c,
mapping.Parent2_External_Id__c
);
if (parentId1 != null && parentId2 != null) {
SObject junction = Schema.getGlobalDescribe()
.get(junctionObject)
.newSObject();
junction.put(mapping.Parent1_Field__c, parentId1);
junction.put(mapping.Parent2_Field__c, parentId2);
// Add any additional fields
if (String.isNotBlank(mapping.Additional_Data__c)) {
Map<String, Object> additionalData =
(Map<String, Object>) JSON.deserializeUntyped(
mapping.Additional_Data__c
);
for (String field : additionalData.keySet()) {
junction.put(field, additionalData.get(field));
}
}
junctionRecords.add(junction);
}
}
if (!junctionRecords.isEmpty()) {
Database.SaveResult[] results =
Database.insert(junctionRecords, false);
handleJunctionResults(results, mappings);
}
}
}
```
# Data Validation and Reconciliation
## Post-Migration Validation Framework
### Automated Data Validation
```python
import pandas as pd
from datetime import datetime
from typing import Dict, List

class DataReconciliation:
def __init__(self, source_connection, salesforce_connection):
self.source_conn = source_connection
self.sf_conn = salesforce_connection
self.reconciliation_results = {
'record_counts': {},
'data_integrity': {},
'relationship_validation': {},
'business_rules': {}
}
def perform_reconciliation(self, migration_config: Dict) -> Dict:
"""
Comprehensive post-migration reconciliation
"""
for mapping in migration_config['mappings']:
source_object = mapping['source_object']
target_object = mapping['target_object']
print(f"Reconciling {source_object} -> {target_object}")
# 1. Record count validation
count_result = self.validate_record_counts(
source_object,
target_object,
mapping.get('filters')
)
self.reconciliation_results['record_counts'][target_object] = count_result
# 2. Data integrity validation
integrity_result = self.validate_data_integrity(
source_object,
target_object,
mapping['field_mappings']
)
self.reconciliation_results['data_integrity'][target_object] = integrity_result
# 3. Relationship validation
if 'parent_relationship' in mapping:
relationship_result = self.validate_relationships(
target_object,
mapping['parent_relationship']
)
self.reconciliation_results['relationship_validation'][target_object] = relationship_result
# 4. Business rules validation
if 'validation_rules' in mapping:
rules_result = self.validate_business_rules(
target_object,
mapping['validation_rules']
)
self.reconciliation_results['business_rules'][target_object] = rules_result
return self.reconciliation_results
def validate_record_counts(
self,
source_object: str,
target_object: str,
filters: Dict = None
) -> Dict:
"""
Compare record counts between source and target
"""
# Get source count
source_query = f"SELECT COUNT(*) as count FROM {source_object}"
if filters:
conditions = [f"{k} = '{v}'" for k, v in filters.items()]
source_query += f" WHERE {' AND '.join(conditions)}"
source_count = self.source_conn.execute(source_query).fetchone()['count']
# Get target count
soql = f"SELECT COUNT() FROM {target_object}"
target_count = self.sf_conn.query(soql)['totalSize']
match_percentage = (target_count / source_count * 100) if source_count > 0 else 0
return {
'source_count': source_count,
'target_count': target_count,
'match_percentage': round(match_percentage, 2),
'status': 'PASS' if match_percentage >= 99.9 else 'FAIL',
'missing_records': source_count - target_count
}
def validate_data_integrity(
self,
source_object: str,
target_object: str,
field_mappings: List[Dict]
) -> Dict:
"""
Sample and compare field values
"""
sample_size = min(1000, self.reconciliation_results['record_counts'][target_object]['target_count'])
# Get sample from source
source_fields = [fm['source'] for fm in field_mappings]
source_query = f"SELECT {', '.join(source_fields)} FROM {source_object} ORDER BY RANDOM() LIMIT {sample_size}"
source_sample = pd.read_sql(source_query, self.source_conn)
# Get corresponding records from Salesforce
target_fields = [fm['target'] for fm in field_mappings if fm['target'] != 'Id']
soql = f"SELECT {', '.join(target_fields)} FROM {target_object} LIMIT {sample_size}"
sf_records = []
for record in self.sf_conn.query_all(soql)['records']:
sf_records.append(record)
sf_sample = pd.DataFrame(sf_records)
        # Compare field values (in practice, join the source and target samples on the external ID so rows line up)
field_match_results = {}
for field_mapping in field_mappings:
source_field = field_mapping['source']
target_field = field_mapping['target']
if source_field in source_sample.columns and target_field in sf_sample.columns:
# Apply transformation if needed
source_values = source_sample[source_field].apply(
lambda x: self.apply_transformation(x, field_mapping.get('transform'))
)
target_values = sf_sample[target_field]
# Calculate match rate
matches = sum(source_values == target_values)
match_rate = (matches / len(source_values)) * 100
field_match_results[target_field] = {
'match_rate': round(match_rate, 2),
'sample_size': len(source_values),
'mismatches': len(source_values) - matches
}
overall_match_rate = sum(f['match_rate'] for f in field_match_results.values()) / len(field_match_results)
return {
'overall_match_rate': round(overall_match_rate, 2),
'field_results': field_match_results,
'status': 'PASS' if overall_match_rate >= 95 else 'FAIL'
}
def validate_relationships(
self,
target_object: str,
relationship_config: Dict
) -> Dict:
"""
Validate parent-child relationships
"""
# Query for orphaned records
parent_field = relationship_config['target_field']
soql = f"""
SELECT COUNT()
FROM {target_object}
WHERE {parent_field} = null
AND Legacy_Parent_Id__c != null
"""
orphaned_count = self.sf_conn.query(soql)['totalSize']
# Query for total records with parent references
soql_total = f"""
SELECT COUNT()
FROM {target_object}
WHERE Legacy_Parent_Id__c != null
"""
total_with_parent = self.sf_conn.query(soql_total)['totalSize']
success_rate = ((total_with_parent - orphaned_count) / total_with_parent * 100) if total_with_parent > 0 else 100
return {
'total_with_parent_reference': total_with_parent,
'orphaned_records': orphaned_count,
'relationship_success_rate': round(success_rate, 2),
'status': 'PASS' if success_rate >= 99 else 'FAIL'
}
class MigrationAuditReport:
def __init__(self, reconciliation_results: Dict):
self.results = reconciliation_results
self.report_timestamp = datetime.now()
def generate_html_report(self) -> str:
"""
Generate comprehensive HTML audit report
"""
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Data Migration Audit Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background-color: #f0f0f0; padding: 20px; }}
.summary {{ margin: 20px 0; }}
.pass {{ color: green; font-weight: bold; }}
.fail {{ color: red; font-weight: bold; }}
table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #4CAF50; color: white; }}
.section {{ margin: 30px 0; }}
</style>
</head>
<body>
<div class="header">
<h1>Data Migration Audit Report</h1>
<p>Generated: {self.report_timestamp.strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
<div class="summary">
<h2>Executive Summary</h2>
{self.generate_summary()}
</div>
<div class="section">
<h2>Record Count Validation</h2>
{self.generate_count_table()}
</div>
<div class="section">
<h2>Data Integrity Validation</h2>
{self.generate_integrity_table()}
</div>
<div class="section">
<h2>Relationship Validation</h2>
{self.generate_relationship_table()}
</div>
<div class="section">
<h2>Recommendations</h2>
{self.generate_recommendations()}
</div>
</body>
</html>
"""
return html
def generate_summary(self) -> str:
"""
Generate executive summary
"""
total_objects = len(self.results['record_counts'])
passed_objects = sum(
1 for r in self.results['record_counts'].values()
if r['status'] == 'PASS'
)
overall_status = 'PASS' if passed_objects == total_objects else 'FAIL'
return f"""
<p>Overall Migration Status: <span class="{overall_status.lower()}">{overall_status}</span></p>
<p>Objects Migrated: {total_objects}</p>
<p>Objects Passed Validation: {passed_objects}</p>
<p>Success Rate: {(passed_objects/total_objects*100):.1f}%</p>
"""
```
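A sketch of running reconciliation after a load and writing the audit report to disk. The simple_salesforce client and SQLAlchemy connection are assumptions about the environment (the classes only need objects exposing `execute` and `query`/`query_all`), and the table-rendering helpers referenced by the report class are elided above:

```python
from simple_salesforce import Salesforce   # assumed Salesforce REST client
from sqlalchemy import create_engine       # assumed driver for the legacy database

source_conn = create_engine(
    "postgresql://migration_user:secret@legacy-db.company.com:5432/legacy_crm"
).connect()
sf_conn = Salesforce(
    instance_url='https://yourcompany.my.salesforce.com',
    session_id='REPLACE_WITH_OAUTH_TOKEN'
)

reconciler = DataReconciliation(source_conn, sf_conn)
results = reconciler.perform_reconciliation(migration_config)  # config loaded earlier

report = MigrationAuditReport(results)
with open('migration_audit_report.html', 'w') as fh:
    fh.write(report.generate_html_report())

failed = [obj for obj, r in results['record_counts'].items() if r['status'] == 'FAIL']
print(f"Reconciliation complete; {len(failed)} object(s) failed count validation: {failed}")
```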
# Performance Optimization and Best Practices
## Migration Performance Optimization
### Parallel Processing Architecture
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import asyncio
import time
from datetime import datetime
from typing import List, Dict, Callable
class OptimizedMigrationEngine:
def __init__(self, config: Dict):
self.config = config
self.cpu_count = multiprocessing.cpu_count()
self.optimal_workers = min(self.cpu_count * 2, 16)
async def migrate_with_optimization(
self,
source_data: List[Dict],
transformation_func: Callable,
load_func: Callable
) -> Dict:
"""
Optimized migration pipeline with parallel processing
"""
start_time = time.time()
# Phase 1: Parallel data transformation
print(f"Starting transformation with {self.optimal_workers} workers...")
transformed_data = await self.parallel_transform(
source_data,
transformation_func
)
# Phase 2: Batch and deduplicate
print("Batching and deduplicating data...")
batched_data = self.optimize_batches(transformed_data)
# Phase 3: Parallel loading
print(f"Loading {len(batched_data)} batches...")
load_results = await self.parallel_load(batched_data, load_func)
# Calculate performance metrics
end_time = time.time()
total_time = end_time - start_time
records_per_second = len(source_data) / total_time
return {
'total_records': len(source_data),
'total_time': total_time,
'records_per_second': records_per_second,
'batch_count': len(batched_data),
'load_results': load_results
}
async def parallel_transform(
self,
data: List[Dict],
transform_func: Callable
) -> List[Dict]:
"""
Transform data using process pool for CPU-intensive operations
"""
chunk_size = max(len(data) // self.optimal_workers, 1000)
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=self.optimal_workers) as executor:
futures = []
for chunk in chunks:
future = loop.run_in_executor(executor, transform_func, chunk)
futures.append(future)
results = await asyncio.gather(*futures)
# Flatten results
transformed_data = []
for chunk_result in results:
transformed_data.extend(chunk_result)
return transformed_data
def optimize_batches(self, data: List[Dict]) -> List[Dict]:
"""
Optimize batch sizes based on data characteristics
"""
# Calculate optimal batch size based on average record size
sample_size = min(100, len(data))
sample_data = data[:sample_size]
avg_record_size = sum(len(str(record)) for record in sample_data) / sample_size
        # Stay well under Salesforce payload limits: Bulk API 1.0 caps batches at 10 MB,
        # and Bulk API 2.0 caps a job's data upload at 150 MB, so 10 MB is a safe target
        max_batch_size_bytes = 10 * 1024 * 1024  # 10MB
        optimal_batch_size = int(max_batch_size_bytes / avg_record_size * 0.9)  # 90% to be safe
        # Cap records per batch (conservative; matches the Bulk API 1.0 batch record limit)
        optimal_batch_size = min(optimal_batch_size, 10000)
# Create batches
batches = []
for i in range(0, len(data), optimal_batch_size):
batch = data[i:i + optimal_batch_size]
batches.append({
'batch_id': f'BATCH_{i//optimal_batch_size}',
'records': batch,
'size': len(batch)
})
return batches
async def parallel_load(
self,
batches: List[Dict],
load_func: Callable
) -> List[Dict]:
"""
Load batches in parallel with rate limiting
"""
# Salesforce allows up to 15 concurrent Bulk API jobs
max_concurrent_jobs = min(15, len(batches))
results = []
# Use semaphore for rate limiting
semaphore = asyncio.Semaphore(max_concurrent_jobs)
async def load_with_limit(batch):
async with semaphore:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, load_func, batch)
# Create tasks for all batches
tasks = [load_with_limit(batch) for batch in batches]
# Execute with progress monitoring
for i, task in enumerate(asyncio.as_completed(tasks)):
result = await task
results.append(result)
print(f"Completed batch {i+1}/{len(batches)}")
return results
class MigrationPerformanceMonitor:
def __init__(self):
self.metrics = {
'extraction': {},
'transformation': {},
'loading': {},
'overall': {}
}
def monitor_phase(self, phase: str, func: Callable, *args, **kwargs):
"""
Monitor performance of a migration phase
"""
import psutil
import gc
# Collect initial metrics
process = psutil.Process()
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
initial_cpu = process.cpu_percent(interval=0.1)
# Execute function
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
# Collect final metrics
final_memory = process.memory_info().rss / 1024 / 1024 # MB
final_cpu = process.cpu_percent(interval=0.1)
# Store metrics
self.metrics[phase] = {
'duration': end_time - start_time,
'memory_used': final_memory - initial_memory,
'cpu_usage': (initial_cpu + final_cpu) / 2,
'timestamp': datetime.now().isoformat()
}
# Force garbage collection
gc.collect()
return result
def generate_performance_report(self) -> Dict:
"""
Generate performance optimization recommendations
"""
recommendations = []
# Check extraction performance
if self.metrics['extraction']['duration'] > 300: # 5 minutes
recommendations.append({
'phase': 'extraction',
'issue': 'Slow data extraction',
'recommendation': 'Consider using parallel extraction or indexing source tables'
})
# Check transformation performance
if self.metrics['transformation']['memory_used'] > 1000: # 1GB
recommendations.append({
'phase': 'transformation',
'issue': 'High memory usage during transformation',
'recommendation': 'Process data in smaller chunks or use streaming transformation'
})
# Check loading performance
loading_duration = self.metrics['loading']['duration']
if loading_duration > 600: # 10 minutes
recommendations.append({
'phase': 'loading',
'issue': 'Slow data loading',
'recommendation': 'Increase parallel jobs or use Bulk API 2.0'
})
return {
'metrics': self.metrics,
'recommendations': recommendations,
'optimization_score': self.calculate_optimization_score()
}
def calculate_optimization_score(self) -> float:
"""
Calculate overall optimization score (0-100)
"""
score = 100.0
# Deduct points for performance issues
total_duration = sum(m['duration'] for m in self.metrics.values() if 'duration' in m)
if total_duration > 1800: # 30 minutes
score -= 20
total_memory = sum(m['memory_used'] for m in self.metrics.values() if 'memory_used' in m)
if total_memory > 2000: # 2GB
score -= 15
avg_cpu = sum(m['cpu_usage'] for m in self.metrics.values() if 'cpu_usage' in m) / len(self.metrics)
if avg_cpu > 80:
score -= 10
return max(0, score)
```
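Driving the optimized engine might look like the sketch below. `transform_chunk` and `load_batch` are placeholder callables (in practice they would wrap the DataTransformer and bulk loader shown earlier), and the dataset is synthetic:

```python
import asyncio

def transform_chunk(records):
    """Placeholder CPU-bound transform; must be module-level so ProcessPoolExecutor can pickle it."""
    return [{**r, 'Name': str(r.get('company_name', '')).strip()[:255]} for r in records]

def load_batch(batch):
    """Placeholder loader; in practice delegate to ParallelDataLoader.load_single_batch."""
    return {'batch_id': batch['batch_id'], 'records_loaded': batch['size']}

if __name__ == '__main__':
    engine = OptimizedMigrationEngine(config={})
    source_data = [{'company_name': f'Account {i}'} for i in range(50000)]  # synthetic dataset

    summary = asyncio.run(
        engine.migrate_with_optimization(source_data, transform_chunk, load_batch)
    )
    print(f"{summary['records_per_second']:.0f} records/sec "
          f"across {summary['batch_count']} batches")
```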
### Migration Best Practices Checklist
```python
class MigrationBestPractices:
@staticmethod
def pre_migration_checklist() -> List[Dict]:
"""
Essential pre-migration checklist
"""
return [
{
'category': 'Data Quality',
'items': [
'Perform data profiling and quality assessment',
'Clean and deduplicate source data',
'Standardize data formats (dates, phone numbers, etc.)',
'Validate required fields have values',
'Resolve circular dependencies'
]
},
{
'category': 'Technical Preparation',
'items': [
'Create full backup of source system',
'Set up staging environment',
'Configure API limits and permissions',
'Install and test migration tools',
'Create rollback procedures'
]
},
{
'category': 'Salesforce Configuration',
'items': [
'Create custom fields for legacy IDs',
'Configure external ID fields',
'Disable validation rules and triggers',
'Increase API limits if needed',
'Configure field-level security'
]
},
{
'category': 'Testing',
'items': [
'Perform test migration with subset of data',
'Validate data transformations',
'Test relationship mappings',
'Verify data access and visibility',
'Load test with production volumes'
]
}
]
@staticmethod
def post_migration_checklist() -> List[Dict]:
"""
Post-migration validation checklist
"""
return [
{
'category': 'Data Validation',
'items': [
'Verify record counts match source',
'Validate key field mappings',
'Check parent-child relationships',
'Verify data visibility and sharing',
'Test business processes end-to-end'
]
},
{
'category': 'System Configuration',
'items': [
'Re-enable validation rules',
'Re-enable triggers and workflows',
'Update integration endpoints',
'Configure backup procedures',
'Update documentation'
]
},
{
'category': 'Performance',
'items': [
'Check system performance metrics',
'Optimize slow queries',
'Review and optimize indexes',
'Monitor API usage',
'Implement data archiving if needed'
]
}
]
```
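One way to turn these checklists into a trackable artifact is to flatten them into a status sheet; the CSV layout below is just an illustration:

```python
import csv

def export_checklists(path: str = "migration_checklist.csv") -> None:
    """Flatten the pre- and post-migration checklists into a single tracking sheet."""
    rows = []
    for phase, checklist in (
        ("pre-migration", MigrationBestPractices.pre_migration_checklist()),
        ("post-migration", MigrationBestPractices.post_migration_checklist()),
    ):
        for section in checklist:
            for item in section['items']:
                rows.append({'phase': phase, 'category': section['category'],
                             'task': item, 'status': 'Not Started'})

    with open(path, 'w', newline='') as fh:
        writer = csv.DictWriter(fh, fieldnames=['phase', 'category', 'task', 'status'])
        writer.writeheader()
        writer.writerows(rows)

export_checklists()
```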