10 min read

Salesforce Data Migration Strategy

Technical guide for planning and executing successful data migrations

Data Migration Architecture Overview

Successful Salesforce data migration requires careful planning, robust architecture, and proven methodologies. This guide provides technical leaders with comprehensive strategies for migrating data from legacy systems to Salesforce while maintaining data integrity and minimizing business disruption. Data migration is often the most complex and risky aspect of Salesforce implementations. With proper planning and execution strategies, you can migrate millions of records while ensuring data quality, maintaining relationships, and achieving optimal performance.

Migration Planning and Assessment

## Data Migration Readiness Assessment

### Data Profiling and Analysis

```python
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import logging


class DataProfiler:
    """Profile source tables to plan a migration.

    Loads a table (or a sample of it) into a DataFrame, computes per-column
    statistics, flags common data quality issues and turns the findings
    into migration recommendations.
    """

    # The dot before the TLD must be escaped; unescaped it matches any
    # character, so "user@domaincom" would be accepted as valid.
    EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    # Optional "+", optional country code "1", then 10-14 digits.
    PHONE_PATTERN = r'^\+?1?\d{10,14}$'

    def __init__(self, source_connection_string):
        self.engine = create_engine(source_connection_string)
        self.logger = logging.getLogger(__name__)

    def profile_table(self, table_name, sample_size=None):
        """Build a profiling report for one source table.

        Args:
            table_name: Name of the table to read. It is interpolated into
                the SQL text, so it must come from trusted configuration.
            sample_size: Optional sampling percentage passed to
                TABLESAMPLE; when falsy the whole table is loaded.

        Returns:
            dict with table-level figures, a per-column profile under
            'columns', and the 'quality_issues' / 'potential_keys' results.
        """
        # Load data
        if sample_size:
            query = f"SELECT * FROM {table_name} TABLESAMPLE({sample_size} PERCENT)"
        else:
            query = f"SELECT * FROM {table_name}"

        df = pd.read_sql(query, self.engine)

        profile_report = {
            'table_name': table_name,
            'row_count': len(df),
            'column_count': len(df.columns),
            'memory_usage': df.memory_usage(deep=True).sum() / 1024**2,  # MB
            'columns': {}
        }

        # Analyze each column
        for column in df.columns:
            col_profile = self.profile_column(df[column])
            profile_report['columns'][column] = col_profile

        # Identify data quality issues
        profile_report['quality_issues'] = self.identify_quality_issues(df)

        # Identify relationships
        # NOTE: identify_potential_keys is not defined in this listing.
        profile_report['potential_keys'] = self.identify_potential_keys(df)

        return profile_report

    def profile_column(self, series):
        """Return null/uniqueness statistics plus type-specific details.

        Numeric columns (int64/float64) get min/max/mean/median/std; object
        columns get string-length statistics and the ten most frequent
        values. An empty series yields 0.0 percentages instead of raising
        ZeroDivisionError.
        """
        total = len(series)
        null_count = series.isnull().sum()
        unique_count = series.nunique()

        profile = {
            'data_type': str(series.dtype),
            'null_count': null_count,
            'null_percentage': (null_count / total) * 100 if total else 0.0,
            'unique_count': unique_count,
            'unique_percentage': (unique_count / total) * 100 if total else 0.0
        }

        # Additional profiling for different data types
        if series.dtype in ['int64', 'float64']:
            profile.update({
                'min': series.min(),
                'max': series.max(),
                'mean': series.mean(),
                'median': series.median(),
                'std': series.std()
            })
        elif series.dtype == 'object':
            lengths = series.str.len()
            profile.update({
                'min_length': lengths.min(),
                'max_length': lengths.max(),
                'avg_length': lengths.mean(),
                'sample_values': series.value_counts().head(10).to_dict()
            })

        # Check for patterns
        # NOTE: detect_patterns is not defined in this listing.
        profile['patterns'] = self.detect_patterns(series)

        return profile

    def identify_quality_issues(self, df):
        """Identify common data quality issues.

        Checks every column for a null share above 50%, duplicates in
        object columns whose name contains 'id', 'code' or 'number', and
        malformed values in columns whose name contains 'email' or 'phone'.
        Null values are not counted as malformed.

        Returns:
            list of dicts with 'column', 'issue', 'severity' and 'details'.
        """
        issues = []
        row_count = len(df)

        for column in df.columns:
            # Check for high null percentage
            null_pct = (df[column].isnull().sum() / row_count) * 100 if row_count else 0.0
            if null_pct > 50:
                issues.append({
                    'column': column,
                    'issue': 'high_null_percentage',
                    'severity': 'high',
                    'details': f'{null_pct:.2f}% null values'
                })

            # Check for duplicate values in potential unique fields
            if df[column].dtype == 'object' and any(
                keyword in column.lower() for keyword in ['id', 'code', 'number']
            ):
                duplicates = df[column].duplicated().sum()
                if duplicates > 0:
                    issues.append({
                        'column': column,
                        'issue': 'duplicate_values',
                        'severity': 'medium',
                        'details': f'{duplicates} duplicate values found'
                    })

            # Check for invalid formats
            if df[column].dtype == 'object':
                # Email validation
                if 'email' in column.lower():
                    invalid_emails = df[~df[column].str.match(
                        self.EMAIL_PATTERN, na=False
                    )][column].count()
                    if invalid_emails > 0:
                        issues.append({
                            'column': column,
                            'issue': 'invalid_email_format',
                            'severity': 'medium',
                            'details': f'{invalid_emails} invalid email addresses'
                        })

                # Phone validation
                if 'phone' in column.lower():
                    invalid_phones = df[~df[column].str.match(
                        self.PHONE_PATTERN, na=False
                    )][column].count()
                    if invalid_phones > 0:
                        issues.append({
                            'column': column,
                            'issue': 'invalid_phone_format',
                            'severity': 'low',
                            'details': f'{invalid_phones} invalid phone numbers'
                        })

        return issues

    def generate_migration_recommendations(self, profile_report):
        """Generate migration recommendations based on profiling.

        Adds a volume recommendation above 1M / 5M rows and one
        recommendation per high-severity quality issue.
        """
        recommendations = []

        # Data volume recommendations
        row_count = profile_report['row_count']
        if row_count > 5000000:
            recommendations.append({
                'category': 'volume',
                'recommendation': 'Use Bulk API 2.0 for better performance',
                'priority': 'high'
            })
        elif row_count > 1000000:
            recommendations.append({
                'category': 'volume',
                'recommendation': 'Consider batch processing with parallel jobs',
                'priority': 'medium'
            })

        # Data quality recommendations
        for issue in profile_report['quality_issues']:
            if issue['severity'] == 'high':
                recommendations.append({
                    'category': 'quality',
                    'recommendation': f"Address {issue['issue']} in {issue['column']} before migration",
                    'priority': 'high',
                    'details': issue['details']
                })

        return recommendations
```

### Migration Mapping Configuration

```yaml
# migration_config.yaml
migration:
  source:
    type: "postgresql"
    connection:
      host: "legacy-db.company.com"
      port: 5432
      database: "legacy_crm"

  target:
    type: "salesforce"
    org: "production"
    api_version: "59.0"

  mappings:
    - source_object: "customers"
      target_object: "Account"
      operation: "upsert"
      external_id: "Legacy_ID__c"
      field_mappings:
        - source: "customer_id"
          target: "Legacy_ID__c"
          transform: "string"
        - source: "company_name"
          target: "Name"
          transform: "truncate(255)"
        - source: "annual_revenue"
          target: "AnnualRevenue"
          transform: "currency_conversion"
        - source: "industry_code"
          target: "Industry"
          transform: "lookup_mapping"
          lookup_table: "industry_mapping"
        - source: "created_date"
          target: "Legacy_Created_Date__c"
          transform: "datetime"

    - source_object: "contacts"
      target_object: "Contact"
      operation: "upsert"
      external_id: "Legacy_Contact_ID__c"
      parent_relationship:
        source_field: "customer_id"
        target_field: "AccountId"
        lookup_object: "Account"
        lookup_field: "Legacy_ID__c"
      field_mappings:
        - source: "contact_id"
          target: "Legacy_Contact_ID__c"
        - source: "first_name"
          target: "FirstName"
        - source: "last_name"
          target: "LastName"
        - source: "email_address"
          target: "Email"
          validation: "email"
          default_on_error: "noemail@company.com"

  transformations:
    currency_conversion:
      type: "custom"
      class: "CurrencyTransformer"
    lookup_mapping:
      type: "mapping_table"
      mappings:
        "TECH": "Technology"
        "FIN": "Financial Services"
        "HEALTH": "Healthcare"
        "MFG": "Manufacturing"

  validation_rules:
    - object: "Account"
      field: "Name"
      rule: "required"
      error_action: "skip_record"
    - object: "Contact"
      field: "Email"
      rule: "unique_in_account"
      error_action: "log_warning"
```

Data Extraction and Transformation

## ETL Pipeline Implementation

### Scalable Data Extraction Framework

```python
import asyncio
import aiohttp
from datetime import datetime
import json
from typing import List, Dict, Any, Optional, Tuple
import hashlib


class DataExtractor:
    """Extract source records page by page through an HTTP query endpoint."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.batch_size = config.get('batch_size', 10000)
        self.parallel_jobs = config.get('parallel_jobs', 4)

    async def extract_data(self, table_name: str, filters: Dict = None) -> List[Dict]:
        """Extract all matching rows using pagination and bounded parallelism.

        At most ``parallel_jobs`` batch requests are in flight at any time;
        results are returned in batch (offset) order.
        """
        # Get total record count
        # NOTE: get_record_count is not defined in this listing.
        total_count = await self.get_record_count(table_name, filters)

        # Calculate batches
        batches = [
            {'offset': offset, 'limit': min(self.batch_size, total_count - offset)}
            for offset in range(0, total_count, self.batch_size)
        ]

        # Without the semaphore every batch would be requested at once and
        # the parallel_jobs setting would have no effect.
        semaphore = asyncio.Semaphore(max(1, self.parallel_jobs))

        async def extract_bounded(batch: Dict) -> List[Dict]:
            async with semaphore:
                return await self.extract_batch(table_name, batch, filters)

        # gather() preserves the order of its arguments
        results = await asyncio.gather(*(extract_bounded(batch) for batch in batches))

        # Flatten results
        all_records = []
        for batch_result in results:
            all_records.extend(batch_result)

        return all_records

    async def extract_batch(
        self,
        table_name: str,
        batch_info: Dict,
        filters: Dict = None
    ) -> List[Dict]:
        """Extract a single batch of data.

        Raises:
            aiohttp.ClientResponseError: if the endpoint answers with an
                HTTP error status.
        """
        query = self.build_query(table_name, batch_info, filters)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.config['source_url']}/query",
                json={'query': query},
                headers={'Authorization': f"Bearer {self.config['api_token']}"}
            ) as response:
                # Fail loudly instead of hitting a KeyError on an error payload
                response.raise_for_status()
                data = await response.json()
                return data['records']

    @staticmethod
    def _sql_literal(value: Any) -> str:
        """Render a Python value as a SQL literal, quoting and escaping strings."""
        if isinstance(value, bool):
            return 'TRUE' if value else 'FALSE'
        if isinstance(value, (int, float)):
            return str(value)
        return "'" + str(value).replace("'", "''") + "'"

    def build_query(
        self,
        table_name: str,
        batch_info: Dict,
        filters: Dict = None
    ) -> str:
        """Build the paginated SQL query for one batch.

        String filter values are single-quoted with embedded quotes doubled;
        numbers are emitted as-is; ``None`` becomes ``IS NULL``; an empty
        list matches nothing. Table and field names are interpolated
        verbatim and must come from trusted configuration.
        """
        query = f"SELECT * FROM {table_name}"

        if filters:
            conditions = []
            for field, value in filters.items():
                if isinstance(value, (list, tuple)):
                    if value:
                        literals = ','.join(DataExtractor._sql_literal(v) for v in value)
                        conditions.append(f"{field} IN ({literals})")
                    else:
                        # "IN ()" is invalid SQL; an empty set matches no row
                        conditions.append("1 = 0")
                elif value is None:
                    conditions.append(f"{field} IS NULL")
                else:
                    conditions.append(f"{field} = {DataExtractor._sql_literal(value)}")

            if conditions:
                query += f" WHERE {' AND '.join(conditions)}"

        # NOTE(review): LIMIT/OFFSET without ORDER BY does not guarantee a
        # stable row order between pages; add an ORDER BY on a unique key.
        query += f" LIMIT {batch_info['limit']} OFFSET {batch_info['offset']}"

        return query


class DataTransformer:
    """Map source records onto target fields according to a mapping config."""

    def __init__(self, mapping_config: Dict[str, Any]):
        self.mapping_config = mapping_config
        self.transformation_stats = {
            'total_records': 0,
            'transformed_records': 0,
            'errors': []
        }

    def transform_batch(self, records: List[Dict], object_mapping: Dict) -> List[Dict]:
        """Transform a batch of records according to mapping configuration.

        Records that fail to transform are skipped and recorded in
        ``transformation_stats['errors']``.
        """
        transformed_records = []

        for record in records:
            try:
                transformed = self.transform_record(record, object_mapping)
                transformed_records.append(transformed)
                self.transformation_stats['transformed_records'] += 1
            except Exception as e:
                self.transformation_stats['errors'].append({
                    'record_id': record.get('id', 'unknown'),
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })

            self.transformation_stats['total_records'] += 1

        return transformed_records

    @staticmethod
    def _parse_transform(spec: Any) -> Tuple[str, Optional[str]]:
        """Split a spec such as 'truncate(255)' into ('truncate', '255').

        Specs without an argument return ``(spec, None)``.
        """
        text = str(spec)
        name, paren, rest = text.partition('(')
        if paren and rest.endswith(')'):
            return name.strip(), rest[:-1].strip()
        return text, None

    def transform_record(self, source_record: Dict, mapping: Dict) -> Dict:
        """Transform a single record.

        Supports 'direct', 'string', 'truncate' (either ``truncate(N)`` or a
        separate ``max_length`` key), 'datetime', 'lookup_mapping' and
        'custom' transforms; unknown transform names copy the value as-is.
        Three audit fields are appended to every record.
        """
        target_record = {}

        for field_mapping in mapping['field_mappings']:
            source_field = field_mapping['source']
            target_field = field_mapping['target']
            # The mapping config writes "truncate(255)"; split name and argument
            transform_type, transform_arg = DataTransformer._parse_transform(
                field_mapping.get('transform', 'direct')
            )

            # Get source value
            # NOTE: get_nested_value, apply_lookup, apply_custom_transform and
            # validate_field are not defined in this listing.
            source_value = self.get_nested_value(source_record, source_field)

            # Apply transformation
            if transform_type == 'direct':
                target_value = source_value
            elif transform_type == 'string':
                target_value = str(source_value) if source_value is not None else ''
            elif transform_type == 'truncate':
                max_length = int(transform_arg or field_mapping.get('max_length', 255))
                target_value = str(source_value)[:max_length] if source_value else ''
            elif transform_type == 'datetime':
                target_value = self.transform_datetime(source_value)
            elif transform_type == 'lookup_mapping':
                target_value = self.apply_lookup(
                    source_value,
                    field_mapping.get('lookup_table')
                )
            elif transform_type == 'custom':
                target_value = self.apply_custom_transform(
                    source_value,
                    field_mapping.get('custom_function')
                )
            else:
                target_value = source_value

            # Apply validation if specified
            if 'validation' in field_mapping:
                target_value = self.validate_field(
                    target_value,
                    field_mapping['validation'],
                    field_mapping.get('default_on_error')
                )

            target_record[target_field] = target_value

        # Add audit fields
        target_record['Migration_Batch_ID__c'] = self.get_batch_id()
        target_record['Migration_Date__c'] = datetime.now().isoformat()
        target_record['Source_System__c'] = mapping.get('source_system', 'Legacy')

        return target_record

    def transform_datetime(self, value: Any) -> Optional[str]:
        """Transform datetime values to Salesforce format.

        Returns ``None`` for ``None``, a ``YYYY-MM-DDTHH:MM:SS.000Z`` string
        for ``datetime`` objects and for strings in a recognised format, and
        ``str(value)`` otherwise. The trailing ``Z`` is appended without any
        timezone conversion.
        """
        if value is None:
            return None

        if isinstance(value, datetime):
            return value.strftime('%Y-%m-%dT%H:%M:%S.000Z')

        if isinstance(value, str):
            # Parse common datetime formats
            for fmt in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%m/%d/%Y']:
                try:
                    dt = datetime.strptime(value, fmt)
                    return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
                except ValueError:
                    continue

        return str(value)

    def get_batch_id(self) -> str:
        """Generate a timestamp-based batch ID for tracking.

        NOTE(review): this is evaluated per record with one-second
        resolution, so records of one batch can receive different IDs.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f"BATCH_{timestamp}"


class DataValidator:
    """Validate transformed records against per-object rules."""

    def __init__(self, validation_rules: List[Dict]):
        self.validation_rules = validation_rules
        self.validation_results = {
            'passed': 0,
            'failed': 0,
            'warnings': 0,
            'errors': []
        }

    def validate_batch(self, records: List[Dict], object_type: str) -> Dict:
        """Validate a batch of records.

        Records that pass or only raise warnings are kept; failed records
        are dropped and recorded in the running validation summary.
        """
        object_rules = [
            rule for rule in self.validation_rules
            if rule['object'] == object_type
        ]

        validated_records = []

        for record in records:
            validation_result = self.validate_record(record, object_rules)

            if validation_result['status'] == 'passed':
                validated_records.append(record)
                self.validation_results['passed'] += 1
            elif validation_result['status'] == 'warning':
                validated_records.append(record)
                self.validation_results['warnings'] += 1
            else:
                self.validation_results['failed'] += 1
                self.validation_results['errors'].append({
                    'record': record,
                    'errors': validation_result['errors']
                })

        return {
            'valid_records': validated_records,
            'validation_summary': self.validation_results
        }

    def validate_record(self, record: Dict, rules: List[Dict]) -> Dict:
        """Validate a single record against rules.

        Returns a dict whose 'status' is 'failed', 'warning' or 'passed'.
        """
        errors = []
        warnings = []

        for rule in rules:
            field = rule['field']
            rule_type = rule['rule']

            if rule_type == 'required':
                if not record.get(field):
                    errors.append(f"{field} is required")
            elif rule_type == 'unique':
                # This would check against existing data
                pass
            elif rule_type == 'format':
                pattern = rule.get('pattern')
                # NOTE: matches_pattern is not defined in this listing.
                if pattern and not self.matches_pattern(record.get(field), pattern):
                    if rule.get('error_action') == 'warning':
                        warnings.append(f"{field} format is invalid")
                    else:
                        errors.append(f"{field} format is invalid")

        if errors:
            return {'status': 'failed', 'errors': errors, 'warnings': warnings}
        elif warnings:
            return {'status': 'warning', 'warnings': warnings}
        else:
            return {'status': 'passed'}
```

Data Loading Strategies

## Salesforce Bulk API 2.0 Implementation

### High-Performance Data Loader

```python
import requests
import csv
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from typing import List, Dict, Generator


class SalesforceBulkLoader:
    """Thin client for Salesforce Bulk API 2.0 ingest jobs."""

    def __init__(self, instance_url: str, access_token: str):
        self.instance_url = instance_url
        self.access_token = access_token
        self.api_version = 'v59.0'
        self.headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

    def create_bulk_job(
        self,
        object_name: str,
        operation: str = 'insert',
        external_id_field: str = None
    ) -> str:
        """Create a bulk ingest job and return its ID.

        Args:
            object_name: API name of the target object.
            operation: Ingest operation, e.g. 'insert' or 'upsert'.
            external_id_field: External ID field sent as
                ``externalIdFieldName``; required for 'upsert'.

        Raises:
            ValueError: if an upsert is requested without an external ID.
            Exception: if Salesforce rejects the job creation.
        """
        job_data = {
            'object': object_name,
            'operation': operation,
            'contentType': 'CSV',
            'lineEnding': 'LF'
        }

        if external_id_field:
            job_data['externalIdFieldName'] = external_id_field
        elif operation == 'upsert':
            raise ValueError("upsert jobs require external_id_field")

        response = requests.post(
            f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest',
            json=job_data,
            headers=self.headers
        )

        if response.status_code in (200, 201):
            return response.json()['id']
        else:
            raise Exception(f"Failed to create job: {response.text}")

    def upload_batch(
        self,
        job_id: str,
        records: List[Dict],
        batch_size: int = 10000
    ) -> List[str]:
        """Upload data in batches and return the collected 'Batch-Id' headers.

        NOTE(review): Bulk API 2.0 appears to accept a single data upload
        per job and may not return a 'Batch-Id' header; confirm against the
        Bulk API 2.0 documentation before sending more than ``batch_size``
        records to one job.
        """
        batch_ids = []

        # Split records into batches
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]

            # Convert to CSV
            csv_data = self.dict_to_csv(batch)

            # Upload batch
            response = requests.put(
                f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/batches',
                data=csv_data,
                headers={
                    **self.headers,
                    'Content-Type': 'text/csv'
                }
            )

            if response.status_code == 201:
                batch_ids.append(response.headers.get('Batch-Id'))
            else:
                raise Exception(f"Failed to upload batch: {response.text}")

        return batch_ids

    def close_job(self, job_id: str) -> Dict:
        """Mark the upload as complete so Salesforce starts processing.

        Raises:
            requests.HTTPError: if the state change is rejected.
        """
        response = requests.patch(
            f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}',
            json={'state': 'UploadComplete'},
            headers=self.headers
        )
        response.raise_for_status()

        return response.json()

    def monitor_job(self, job_id: str, poll_interval: int = 5) -> Dict:
        """Poll the job until it reaches a terminal state and return its info.

        Raises:
            requests.HTTPError: if a status request fails, instead of a
                KeyError on the missing 'state' key of an error payload.
        """
        while True:
            response = requests.get(
                f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}',
                headers=self.headers
            )
            response.raise_for_status()

            job_info = response.json()
            state = job_info['state']

            if state in ['JobComplete', 'Failed', 'Aborted']:
                return job_info

            print(f"Job {job_id} - State: {state}, "
                  f"Processed: {job_info.get('numberRecordsProcessed', 0)}, "
                  f"Failed: {job_info.get('numberRecordsFailed', 0)}")

            time.sleep(poll_interval)

    def get_job_results(self, job_id: str) -> Generator[Dict, None, None]:
        """Yield successful rows, then failed rows, as tagged dicts."""
        # Get successful records
        response = requests.get(
            f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/successfulResults',
            headers={**self.headers, 'Accept': 'text/csv'}
        )

        if response.status_code == 200:
            reader = csv.DictReader(io.StringIO(response.text))
            for row in reader:
                yield {'type': 'success', 'record': row}

        # Get failed records
        response = requests.get(
            f'{self.instance_url}/services/data/{self.api_version}/jobs/ingest/{job_id}/failedResults',
            headers={**self.headers, 'Accept': 'text/csv'}
        )

        if response.status_code == 200:
            reader = csv.DictReader(io.StringIO(response.text))
            for row in reader:
                yield {'type': 'failed', 'record': row}

    def dict_to_csv(self, records: List[Dict]) -> str:
        """Convert a list of dictionaries to CSV text with LF line endings.

        The header is the union of all record keys in first-seen order, so
        records with differing keys serialize with empty cells instead of
        raising ValueError.
        """
        if not records:
            return ""

        # dict.fromkeys keeps insertion order while removing duplicates
        fieldnames = list(dict.fromkeys(key for record in records for key in record))

        output = io.StringIO()
        # The job is created with lineEnding=LF; csv defaults to CRLF
        writer = csv.DictWriter(output, fieldnames=fieldnames, lineterminator='\n')
        writer.writeheader()
        writer.writerows(records)

        return output.getvalue()


class ParallelDataLoader:
    """Run several bulk ingest jobs concurrently and aggregate their results."""

    def __init__(self, sf_loader: SalesforceBulkLoader, max_workers: int = 4):
        self.sf_loader = sf_loader
        self.max_workers = max_workers
        self.load_statistics = {
            'total_records': 0,
            'successful_records': 0,
            'failed_records': 0,
            'jobs': []
        }

    def load_data(
        self,
        data_batches: List[Dict],
        object_name: str,
        operation: str = 'insert',
        external_id_field: str = None
    ) -> Dict:
        """Load data using parallel jobs, one job per entry of ``data_batches``.

        Each entry needs a 'records' list and may carry a 'batch_id'.
        Batches whose job raises are counted as failed in full.
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit jobs
            future_to_batch = {}

            for batch_data in data_batches:
                future = executor.submit(
                    self.load_single_batch,
                    batch_data['records'],
                    object_name,
                    operation,
                    batch_data.get('batch_id'),
                    external_id_field
                )
                future_to_batch[future] = batch_data

            # Process completed jobs
            for future in as_completed(future_to_batch):
                batch_data = future_to_batch[future]

                try:
                    result = future.result()
                    self.load_statistics['jobs'].append(result)
                    self.load_statistics['total_records'] += result['total_records']
                    self.load_statistics['successful_records'] += result['successful_records']
                    self.load_statistics['failed_records'] += result['failed_records']
                except Exception as e:
                    print(f"Batch {batch_data.get('batch_id')} failed: {str(e)}")
                    # Keep total = successful + failed for crashed batches too
                    self.load_statistics['total_records'] += len(batch_data['records'])
                    self.load_statistics['failed_records'] += len(batch_data['records'])

        return self.load_statistics

    def load_single_batch(
        self,
        records: List[Dict],
        object_name: str,
        operation: str,
        batch_id: str = None,
        external_id_field: str = None
    ) -> Dict:
        """Create, fill, close and monitor one job; return its statistics."""
        # Create job
        job_id = self.sf_loader.create_bulk_job(object_name, operation, external_id_field)

        # Upload data
        self.sf_loader.upload_batch(job_id, records)

        # Close job
        self.sf_loader.close_job(job_id)

        # Monitor job
        job_result = self.sf_loader.monitor_job(job_id)

        # Process results
        successful_records = []
        failed_records = []

        for result in self.sf_loader.get_job_results(job_id):
            if result['type'] == 'success':
                successful_records.append(result['record'])
            else:
                failed_records.append(result['record'])

        return {
            'job_id': job_id,
            'batch_id': batch_id,
            'total_records': len(records),
            'successful_records': len(successful_records),
            'failed_records': len(failed_records),
            'job_state': job_result['state'],
            'processing_time': job_result.get('totalProcessingTime', 0)
        }
```

### Relationship Management During Migration

```apex
public class RelationshipMigrationHandler {

    // Handle parent-child relationships
    public static void resolveRelationships(
        List<Migration_Staging__c> stagingRecords
    ) {
        // Group records by object type
        Map<String, List<Migration_Staging__c>> recordsByType =
            new Map<String, List<Migration_Staging__c>>();

        for (Migration_Staging__c record : stagingRecords) {
            if (!recordsByType.containsKey(record.Object_Type__c)) {
                recordsByType.put(record.Object_Type__c, new List<Migration_Staging__c>());
            }
            recordsByType.get(record.Object_Type__c).add(record);
        }

        // Process parent objects first
        List<String> objectOrder = getObjectProcessingOrder();

        for (String objectType : objectOrder) {
            if (recordsByType.containsKey(objectType)) {
                processObjectRelationships(
                    objectType,
                    recordsByType.get(objectType)
                );
            }
        }
    }

    private static void processObjectRelationships(
        String objectType,
        List<Migration_Staging__c> records
    ) {
        // Get relationship mappings
        List<Relationship_Mapping__mdt> mappings = [
            SELECT Source_Field__c, Target_Field__c, Parent_Object__c, Lookup_Field__c
            FROM Relationship_Mapping__mdt
            WHERE Child_Object__c = :objectType
        ];

        if (mappings.isEmpty()) {
            return;
        }

        // Build lookup maps
        Map<String, Map<String, Id>> lookupMaps = new Map<String, Map<String, Id>>();

        for (Relationship_Mapping__mdt mapping : mappings) {
            if (!lookupMaps.containsKey(mapping.Parent_Object__c)) {
                lookupMaps.put(
                    mapping.Parent_Object__c,
                    buildLookupMap(mapping.Parent_Object__c, mapping.Lookup_Field__c)
                );
            }
        }

        // Update staging records with resolved IDs
        for (Migration_Staging__c record : records) {
            Map<String, Object> data =
                (Map<String, Object>) JSON.deserializeUntyped(record.Data__c);

            for (Relationship_Mapping__mdt mapping : mappings) {
                String sourceValue = (String) data.get(mapping.Source_Field__c);

                if (String.isNotBlank(sourceValue)) {
                    Map<String, Id> lookupMap = lookupMaps.get(mapping.Parent_Object__c);

                    if (lookupMap.containsKey(sourceValue)) {
                        data.put(
                            mapping.Target_Field__c,
                            lookupMap.get(sourceValue)
                        );
                    } else {
                        record.Has_Errors__c = true;
                        record.Error_Message__c = 'Parent record not found: ' + sourceValue;
                    }
                }
            }

            record.Data__c = JSON.serialize(data);
        }

        update records;
    }

    private static Map<String, Id> buildLookupMap(
        String objectType,
        String lookupField
    ) {
        Map<String, Id> lookupMap = new Map<String, Id>();

        String query = String.format(
            'SELECT Id, {0} FROM {1} WHERE {0} != null',
            new List<String>{lookupField, objectType}
        );

        for (SObject record : Database.query(query)) {
            String lookupValue = (String) record.get(lookupField);
            lookupMap.put(lookupValue, record.Id);
        }

        return lookupMap;
    }

    // Handle many-to-many relationships
    public static void createJunctionRecords(
        String junctionObject,
        List<Junction_Mapping__c> mappings
    ) {
        List<SObject> junctionRecords = new List<SObject>();

        for (Junction_Mapping__c mapping : mappings) {
            // Get parent IDs
            Id parentId1 = resolveExternalId(
                mapping.Parent1_Object__c,
                mapping.Parent1_External_Id__c
            );
            Id parentId2 = resolveExternalId(
                mapping.Parent2_Object__c,
                mapping.Parent2_External_Id__c
            );

            if (parentId1 != null && parentId2 != null) {
                SObject junction = Schema.getGlobalDescribe()
                    .get(junctionObject)
                    .newSObject();

                junction.put(mapping.Parent1_Field__c, parentId1);
                junction.put(mapping.Parent2_Field__c, parentId2);

                // Add any additional fields
                if (String.isNotBlank(mapping.Additional_Data__c)) {
                    Map<String, Object> additionalData =
                        (Map<String, Object>) JSON.deserializeUntyped(
                            mapping.Additional_Data__c
                        );

                    for (String field : additionalData.keySet()) {
                        junction.put(field, additionalData.get(field));
                    }
                }

                junctionRecords.add(junction);
            }
        }

        if (!junctionRecords.isEmpty()) {
            Database.SaveResult[] results = Database.insert(junctionRecords, false);
            handleJunctionResults(results, mappings);
        }
    }
}
```

Data Validation and Reconciliation

## Post-Migration Validation Framework

### Automated Data Validation

```python
from datetime import datetime
from typing import Dict, List

import pandas as pd


class DataReconciliation:
    """Compare source and Salesforce data after a migration run."""

    def __init__(self, source_connection, salesforce_connection):
        self.source_conn = source_connection
        self.sf_conn = salesforce_connection
        self.reconciliation_results = {
            'record_counts': {},
            'data_integrity': {},
            'relationship_validation': {},
            'business_rules': {}
        }

    def perform_reconciliation(self, migration_config: Dict) -> Dict:
        """Run every reconciliation check for each configured mapping.

        Returns:
            The accumulated results, keyed by check type and target object.
        """
        for mapping in migration_config['mappings']:
            source_object = mapping['source_object']
            target_object = mapping['target_object']

            print(f"Reconciling {source_object} -> {target_object}")

            # 1. Record count validation
            count_result = self.validate_record_counts(
                source_object,
                target_object,
                mapping.get('filters')
            )
            self.reconciliation_results['record_counts'][target_object] = count_result

            # 2. Data integrity validation
            integrity_result = self.validate_data_integrity(
                source_object,
                target_object,
                mapping['field_mappings']
            )
            self.reconciliation_results['data_integrity'][target_object] = integrity_result

            # 3. Relationship validation
            if 'parent_relationship' in mapping:
                relationship_result = self.validate_relationships(
                    target_object,
                    mapping['parent_relationship']
                )
                self.reconciliation_results['relationship_validation'][target_object] = relationship_result

            # 4. Business rules validation
            if 'validation_rules' in mapping:
                # NOTE: validate_business_rules is not defined in this listing.
                rules_result = self.validate_business_rules(
                    target_object,
                    mapping['validation_rules']
                )
                self.reconciliation_results['business_rules'][target_object] = rules_result

        return self.reconciliation_results

    def validate_record_counts(
        self,
        source_object: str,
        target_object: str,
        filters: Dict = None
    ) -> Dict:
        """Compare record counts between source and target.

        Filters are applied to the source count only. An empty source with
        an empty target counts as a 100% match.
        """
        # Get source count
        source_query = f"SELECT COUNT(*) as count FROM {source_object}"
        if filters:
            conditions = [f"{k} = '{v}'" for k, v in filters.items()]
            source_query += f" WHERE {' AND '.join(conditions)}"

        source_count = self.source_conn.execute(source_query).fetchone()['count']

        # Get target count
        soql = f"SELECT COUNT() FROM {target_object}"
        target_count = self.sf_conn.query(soql)['totalSize']

        if source_count > 0:
            match_percentage = target_count / source_count * 100
        else:
            # Nothing to migrate: a match only if nothing was loaded either
            match_percentage = 100.0 if target_count == 0 else 0.0

        return {
            'source_count': source_count,
            'target_count': target_count,
            'match_percentage': round(match_percentage, 2),
            'status': 'PASS' if match_percentage >= 99.9 else 'FAIL',
            'missing_records': source_count - target_count
        }

    def validate_data_integrity(
        self,
        source_object: str,
        target_object: str,
        field_mappings: List[Dict]
    ) -> Dict:
        """Sample and compare field values.

        Requires the record-count result for ``target_object`` to be present
        already. Returns a 0.0 match rate with status FAIL when no field
        could be compared.

        NOTE(review): the source sample is random while the target sample
        is the first N rows, so rows are compared by position, not by
        record identity; join both samples on the external ID field before
        trusting these match rates.
        """
        sample_size = min(1000, self.reconciliation_results['record_counts'][target_object]['target_count'])

        # Get sample from source
        source_fields = [fm['source'] for fm in field_mappings]
        source_query = f"SELECT {', '.join(source_fields)} FROM {source_object} ORDER BY RANDOM() LIMIT {sample_size}"
        source_sample = pd.read_sql(source_query, self.source_conn)

        # Get corresponding records from Salesforce
        target_fields = [fm['target'] for fm in field_mappings if fm['target'] != 'Id']
        soql = f"SELECT {', '.join(target_fields)} FROM {target_object} LIMIT {sample_size}"

        sf_records = []
        for record in self.sf_conn.query_all(soql)['records']:
            sf_records.append(record)

        sf_sample = pd.DataFrame(sf_records)

        # Compare field values
        field_match_results = {}

        for field_mapping in field_mappings:
            source_field = field_mapping['source']
            target_field = field_mapping['target']

            if source_field in source_sample.columns and target_field in sf_sample.columns:
                # Apply transformation if needed
                # NOTE: apply_transformation is not defined in this listing.
                transform = field_mapping.get('transform')
                source_values = source_sample[source_field].apply(
                    lambda x: self.apply_transformation(x, transform)
                )
                target_values = sf_sample[target_field]

                # Samples may differ in length; compare the common prefix
                compared = min(len(source_values), len(target_values))
                if compared == 0:
                    continue

                left = source_values.iloc[:compared].reset_index(drop=True)
                right = target_values.iloc[:compared].reset_index(drop=True)

                # Calculate match rate
                matches = int((left == right).sum())
                match_rate = (matches / compared) * 100

                field_match_results[target_field] = {
                    'match_rate': round(match_rate, 2),
                    'sample_size': compared,
                    'mismatches': compared - matches
                }

        if field_match_results:
            overall_match_rate = sum(
                f['match_rate'] for f in field_match_results.values()
            ) / len(field_match_results)
        else:
            overall_match_rate = 0.0

        return {
            'overall_match_rate': round(overall_match_rate, 2),
            'field_results': field_match_results,
            'status': 'PASS' if overall_match_rate >= 95 else 'FAIL'
        }

    def validate_relationships(
        self,
        target_object: str,
        relationship_config: Dict
    ) -> Dict:
        """Validate parent-child relationships.

        Counts records that carry a Legacy_Parent_Id__c but whose parent
        lookup field is empty.
        """
        # Query for orphaned records
        parent_field = relationship_config['target_field']

        soql = f"""
            SELECT COUNT()
            FROM {target_object}
            WHERE {parent_field} = null
            AND Legacy_Parent_Id__c != null
        """
        orphaned_count = self.sf_conn.query(soql)['totalSize']

        # Query for total records with parent references
        soql_total = f"""
            SELECT COUNT()
            FROM {target_object}
            WHERE Legacy_Parent_Id__c != null
        """
        total_with_parent = self.sf_conn.query(soql_total)['totalSize']

        success_rate = ((total_with_parent - orphaned_count) / total_with_parent * 100) if total_with_parent > 0 else 100

        return {
            'total_with_parent_reference': total_with_parent,
            'orphaned_records': orphaned_count,
            'relationship_success_rate': round(success_rate, 2),
            'status': 'PASS' if success_rate >= 99 else 'FAIL'
        }


class MigrationAuditReport:
    """Render reconciliation results as an HTML audit report."""

    def __init__(self, reconciliation_results: Dict):
        self.results = reconciliation_results
        self.report_timestamp = datetime.now()

    def generate_html_report(self) -> str:
        """Generate comprehensive HTML audit report.

        NOTE: generate_count_table, generate_integrity_table,
        generate_relationship_table and generate_recommendations are not
        defined in this listing.
        """
        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Data Migration Audit Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .header {{ background-color: #f0f0f0; padding: 20px; }}
                .summary {{ margin: 20px 0; }}
                .pass {{ color: green; font-weight: bold; }}
                .fail {{ color: red; font-weight: bold; }}
                table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #4CAF50; color: white; }}
                .section {{ margin: 30px 0; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Data Migration Audit Report</h1>
                <p>Generated: {self.report_timestamp.strftime('%Y-%m-%d %H:%M:%S')}</p>
            </div>

            <div class="summary">
                <h2>Executive Summary</h2>
                {self.generate_summary()}
            </div>

            <div class="section">
                <h2>Record Count Validation</h2>
                {self.generate_count_table()}
            </div>

            <div class="section">
                <h2>Data Integrity Validation</h2>
                {self.generate_integrity_table()}
            </div>

            <div class="section">
                <h2>Relationship Validation</h2>
                {self.generate_relationship_table()}
            </div>

            <div class="section">
                <h2>Recommendations</h2>
                {self.generate_recommendations()}
            </div>
        </body>
        </html>
        """

        return html

    def generate_summary(self) -> str:
        """Generate executive summary.

        With no migrated objects the summary reports FAIL and a 0.0%
        success rate instead of raising ZeroDivisionError.
        """
        total_objects = len(self.results['record_counts'])
        passed_objects = sum(
            1 for r in self.results['record_counts'].values()
            if r['status'] == 'PASS'
        )

        all_passed = total_objects > 0 and passed_objects == total_objects
        overall_status = 'PASS' if all_passed else 'FAIL'
        success_rate = (passed_objects / total_objects * 100) if total_objects else 0.0

        return f"""
        <p>Overall Migration Status: <span class="{overall_status.lower()}">{overall_status}</span></p>
        <p>Objects Migrated: {total_objects}</p>
        <p>Objects Passed Validation: {passed_objects}</p>
        <p>Success Rate: {success_rate:.1f}%</p>
        """
```

Performance Optimization and Best Practices

## Migration Performance Optimization
### Parallel Processing Architecture

import asyncio
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime
from typing import Callable, Dict, List


class OptimizedMigrationEngine:
    """Three-phase migration pipeline: parallel transform, batching, parallel load.

    The limits below were hard-coded in the method bodies; they are exposed as
    class attributes so a subclass (or an instance) can override them.
    """

    # Upper bound on simultaneously running load calls
    # (per the original note: "Salesforce allows up to 15 concurrent Bulk API jobs").
    MAX_CONCURRENT_JOBS = 15
    # Per-batch payload budget (per the original note: "Salesforce Bulk API has
    # 10MB limit per batch").
    MAX_BATCH_BYTES = 10 * 1024 * 1024
    # Per-batch record cap (per the original note: "Salesforce limit").
    MAX_BATCH_RECORDS = 10000
    # Fraction of MAX_BATCH_BYTES actually used, as a safety margin.
    BATCH_SAFETY_FACTOR = 0.9

    def __init__(self, config: Dict):
        """Store *config* and derive the worker count from the CPU count."""
        self.config = config
        self.cpu_count = multiprocessing.cpu_count()
        # Two workers per core, never more than 16.
        self.optimal_workers = min(self.cpu_count * 2, 16)

    async def migrate_with_optimization(
        self,
        source_data: List[Dict],
        transformation_func: Callable,
        load_func: Callable
    ) -> Dict:
        """Run the full pipeline and return timing and result statistics.

        Args:
            source_data: Records to migrate.
            transformation_func: Called with a list of records (a chunk) in a
                worker process; must return a list of transformed records and
                must be picklable.
            load_func: Called with one batch dict (see ``optimize_batches``)
                in a worker thread; its return value is collected as-is.

        Returns:
            Dict with ``total_records``, ``total_time`` (seconds),
            ``records_per_second``, ``batch_count`` and ``load_results``.
        """
        start_time = time.perf_counter()

        # Phase 1: Parallel data transformation
        print(f"Starting transformation with {self.optimal_workers} workers...")
        transformed_data = await self.parallel_transform(
            source_data, transformation_func
        )

        # Phase 2: Batch
        # NOTE(review): the message mentions deduplication, but
        # optimize_batches() only splits the data; it does not remove duplicates.
        print("Batching and deduplicating data...")
        batched_data = self.optimize_batches(transformed_data)

        # Phase 3: Parallel loading
        print(f"Loading {len(batched_data)} batches...")
        load_results = await self.parallel_load(batched_data, load_func)

        # Calculate performance metrics
        total_time = time.perf_counter() - start_time
        # A zero elapsed time (coarse clock, empty input) must not crash the run.
        records_per_second = (
            len(source_data) / total_time if total_time > 0 else 0.0
        )

        return {
            'total_records': len(source_data),
            'total_time': total_time,
            'records_per_second': records_per_second,
            'batch_count': len(batched_data),
            'load_results': load_results
        }

    async def parallel_transform(
        self,
        data: List[Dict],
        transform_func: Callable
    ) -> List[Dict]:
        """Transform *data* chunk by chunk in a process pool.

        Chunks hold at least 1000 records so small inputs are not split into
        many tiny tasks. The output keeps the chunk order of the input.
        """
        if not data:
            # Nothing to do; avoid starting worker processes.
            return []

        chunk_size = max(len(data) // self.optimal_workers, 1000)
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

        loop = asyncio.get_running_loop()

        with ProcessPoolExecutor(max_workers=self.optimal_workers) as executor:
            futures = [
                loop.run_in_executor(executor, transform_func, chunk)
                for chunk in chunks
            ]
            results = await asyncio.gather(*futures)

        # Flatten the per-chunk results into one list.
        transformed_data = []
        for chunk_result in results:
            transformed_data.extend(chunk_result)

        return transformed_data

    def optimize_batches(self, data: List[Dict]) -> List[Dict]:
        """Split *data* into batches sized from the average record size.

        The record size is estimated as ``len(str(record))`` over the first
        100 records (or fewer).

        Returns:
            List of dicts with ``batch_id`` (``'BATCH_<n>'``), ``records`` and
            ``size``. Empty input yields an empty list.
        """
        if not data:
            return []

        # Estimate the average record size from a leading sample.
        sample_data = data[:100]
        avg_record_size = (
            sum(len(str(record)) for record in sample_data) / len(sample_data)
        )

        optimal_batch_size = int(
            self.MAX_BATCH_BYTES / max(avg_record_size, 1) * self.BATCH_SAFETY_FACTOR
        )
        # Clamp to [1, MAX_BATCH_RECORDS]: a record larger than the byte
        # budget would otherwise give a step of 0 and break range().
        optimal_batch_size = max(1, min(optimal_batch_size, self.MAX_BATCH_RECORDS))

        return [
            {
                'batch_id': f'BATCH_{index}',
                'records': data[start:start + optimal_batch_size],
                'size': len(data[start:start + optimal_batch_size])
            }
            for index, start in enumerate(range(0, len(data), optimal_batch_size))
        ]

    async def parallel_load(
        self,
        batches: List[Dict],
        load_func: Callable
    ) -> List[Dict]:
        """Load *batches* concurrently, at most MAX_CONCURRENT_JOBS at a time.

        ``load_func`` runs in the event loop's default executor. Results are
        returned in completion order, which may differ from the input order.
        """
        if not batches:
            return []

        max_concurrent_jobs = min(self.MAX_CONCURRENT_JOBS, len(batches))

        # The semaphore caps how many load calls are in flight at once.
        semaphore = asyncio.Semaphore(max_concurrent_jobs)
        loop = asyncio.get_running_loop()

        async def load_with_limit(batch):
            async with semaphore:
                return await loop.run_in_executor(None, load_func, batch)

        # Create tasks for all batches
        tasks = [load_with_limit(batch) for batch in batches]

        # Execute with progress monitoring
        results = []
        for completed, task in enumerate(asyncio.as_completed(tasks), start=1):
            results.append(await task)
            print(f"Completed batch {completed}/{len(batches)}")

        return results


class MigrationPerformanceMonitor:
    """Collects per-phase timing, memory and CPU metrics and scores the run."""

    def __init__(self):
        # One entry per phase; a phase stays an empty dict until monitored.
        self.metrics = {
            'extraction': {},
            'transformation': {},
            'loading': {},
            'overall': {}
        }

    def monitor_phase(self, phase: str, func: Callable, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` and record its metrics under *phase*.

        Records ``duration`` (seconds), ``memory_used`` (change in this
        process's RSS in MB, which may be negative), ``cpu_usage`` (mean of a
        sample taken before and one taken after the call) and ``timestamp``.

        Returns:
            Whatever *func* returns. If *func* raises, nothing is recorded.
        """
        import psutil
        import gc

        # Collect initial metrics
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        initial_cpu = process.cpu_percent(interval=0.1)

        # Execute function
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time

        # Collect final metrics
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        final_cpu = process.cpu_percent(interval=0.1)

        # Store metrics
        self.metrics[phase] = {
            'duration': duration,
            'memory_used': final_memory - initial_memory,
            'cpu_usage': (initial_cpu + final_cpu) / 2,
            'timestamp': datetime.now().isoformat()
        }

        # Force garbage collection
        gc.collect()

        return result

    def generate_performance_report(self) -> Dict:
        """Return the metrics, tuning recommendations and an overall score.

        Phases that were never monitored are treated as having zero duration
        and zero memory use, so they produce no recommendation.
        """
        recommendations = []

        # Check extraction performance
        if self.metrics['extraction'].get('duration', 0) > 300:  # 5 minutes
            recommendations.append({
                'phase': 'extraction',
                'issue': 'Slow data extraction',
                'recommendation': 'Consider using parallel extraction or indexing source tables'
            })

        # Check transformation performance
        if self.metrics['transformation'].get('memory_used', 0) > 1000:  # 1GB
            recommendations.append({
                'phase': 'transformation',
                'issue': 'High memory usage during transformation',
                'recommendation': 'Process data in smaller chunks or use streaming transformation'
            })

        # Check loading performance
        if self.metrics['loading'].get('duration', 0) > 600:  # 10 minutes
            recommendations.append({
                'phase': 'loading',
                'issue': 'Slow data loading',
                'recommendation': 'Increase parallel jobs or use Bulk API 2.0'
            })

        return {
            'metrics': self.metrics,
            'recommendations': recommendations,
            'optimization_score': self.calculate_optimization_score()
        }

    def calculate_optimization_score(self) -> float:
        """Return a 0-100 score, deducting points for slow or heavy runs.

        Deductions: 20 if total duration exceeds 30 minutes, 15 if total
        memory growth exceeds 2000 MB, 10 if mean CPU usage exceeds 80%.
        """
        score = 100.0
        phases = list(self.metrics.values())

        total_duration = sum(m['duration'] for m in phases if 'duration' in m)
        if total_duration > 1800:  # 30 minutes
            score -= 20

        total_memory = sum(m['memory_used'] for m in phases if 'memory_used' in m)
        if total_memory > 2000:  # 2GB
            score -= 15

        # Average only over phases that reported CPU usage; dividing by the
        # number of all phases would understate the average.
        cpu_samples = [m['cpu_usage'] for m in phases if 'cpu_usage' in m]
        avg_cpu = sum(cpu_samples) / len(cpu_samples) if cpu_samples else 0.0
        if avg_cpu > 80:
            score -= 10

        return max(0.0, score)


### Migration Best Practices Checklist

class MigrationBestPractices:
    """Static checklists of tasks to complete before and after a migration."""

    @staticmethod
    def pre_migration_checklist() -> List[Dict]:
        """Return the pre-migration checklist as ``{'category', 'items'}`` dicts."""
        return [
            {
                'category': 'Data Quality',
                'items': [
                    'Perform data profiling and quality assessment',
                    'Clean and deduplicate source data',
                    'Standardize data formats (dates, phone numbers, etc.)',
                    'Validate required fields have values',
                    'Resolve circular dependencies'
                ]
            },
            {
                'category': 'Technical Preparation',
                'items': [
                    'Create full backup of source system',
                    'Set up staging environment',
                    'Configure API limits and permissions',
                    'Install and test migration tools',
                    'Create rollback procedures'
                ]
            },
            {
                'category': 'Salesforce Configuration',
                'items': [
                    'Create custom fields for legacy IDs',
                    'Configure external ID fields',
                    'Disable validation rules and triggers',
                    'Increase API limits if needed',
                    'Configure field-level security'
                ]
            },
            {
                'category': 'Testing',
                'items': [
                    'Perform test migration with subset of data',
                    'Validate data transformations',
                    'Test relationship mappings',
                    'Verify data access and visibility',
                    'Load test with production volumes'
                ]
            }
        ]

    @staticmethod
    def post_migration_checklist() -> List[Dict]:
        """Return the post-migration checklist as ``{'category', 'items'}`` dicts."""
        return [
            {
                'category': 'Data Validation',
                'items': [
                    'Verify record counts match source',
                    'Validate key field mappings',
                    'Check parent-child relationships',
                    'Verify data visibility and sharing',
                    'Test business processes end-to-end'
                ]
            },
            {
                'category': 'System Configuration',
                'items': [
                    'Re-enable validation rules',
                    'Re-enable triggers and workflows',
                    'Update integration endpoints',
                    'Configure backup procedures',
                    'Update documentation'
                ]
            },
            {
                'category': 'Performance',
                'items': [
                    'Check system performance metrics',
                    'Optimize slow queries',
                    'Review and optimize indexes',
                    'Monitor API usage',
                    'Implement data archiving if needed'
                ]
            }
        ]

Certified Partner

Salesforce certified consultants

5-Star Rated

Consistently high client satisfaction

200+ Projects

Successfully delivered

Enterprise Ready

Fortune 500 trusted

Ready to Transform Your Business?

Get expert Salesforce consulting tailored to your needs. Schedule a free consultation to discuss your project.

Free consultation • No obligation • Expert advice