The Problem
Naive multi-pool data fetching creates performance bottlenecks:
- Sequential requests are slow (5+ pools = 25+ seconds)
- Concurrent requests hit RPC rate limits
- Redundant data fetching wastes bandwidth
- No caching leads to repeated expensive operations
Solution: Batched Concurrent Fetching
Use controlled concurrency with intelligent batching:
// src/services/MultiPoolDataFetcher.ts
import { Connection, PublicKey } from '@solana/web3.js';
import { DLMM } from '@saros-finance/dlmm-sdk';
import { ComposedPoolData } from '../types/pool';
import { DLMMDataComposer } from './DLMMDataComposer';
/**
 * Fetches composed DLMM pool data for many pools at once, using batched
 * concurrency, a TTL cache, and per-pool error isolation so one bad pool
 * never aborts the whole run.
 */
export class MultiPoolDataFetcher {
  private composer: DLMMDataComposer;
  private concurrencyLimit: number;
  // Keyed by `pool-<address>`; entries expire after `cacheTTL` ms.
  private cache: Map<string, { data: ComposedPoolData; timestamp: number }>;
  private cacheTTL: number;

  /**
   * @param rpcUrl           RPC endpoint handed to the data composer.
   * @param concurrencyLimit Max pools fetched in parallel per batch.
   * @param cacheTTL         Cache entry lifetime in milliseconds.
   */
  constructor(
    rpcUrl: string,
    concurrencyLimit: number = 5,
    cacheTTL: number = 30000 // 30 seconds
  ) {
    this.composer = new DLMMDataComposer(rpcUrl);
    this.concurrencyLimit = concurrencyLimit;
    this.cache = new Map();
    this.cacheTTL = cacheTTL;
  }

  /** Render an unknown catch value as a message string (strict-mode safe). */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Fetch multiple pools with controlled concurrency.
   *
   * Pools are processed in batches of `concurrencyLimit`. Individual
   * failures are collected and logged; only successful results are returned.
   */
  async fetchMultiplePools(poolAddresses: string[]): Promise<ComposedPoolData[]> {
    console.log(`Fetching data for ${poolAddresses.length} pools (concurrency: ${this.concurrencyLimit})`);
    const results: ComposedPoolData[] = [];
    const errors: Array<{ pool: string; error: string }> = [];
    // Process pools in batches to control concurrency
    for (let i = 0; i < poolAddresses.length; i += this.concurrencyLimit) {
      const batch = poolAddresses.slice(i, i + this.concurrencyLimit);
      console.log(`Processing batch ${Math.floor(i / this.concurrencyLimit) + 1}/${Math.ceil(poolAddresses.length / this.concurrencyLimit)}`);
      const batchPromises = batch.map(async (poolAddress): Promise<ComposedPoolData | null> => {
        try {
          // Check cache first
          const cacheKey = `pool-${poolAddress}`;
          const cached = this.getFromCache(cacheKey);
          if (cached) {
            console.log(`Cache hit for ${poolAddress}`);
            return cached;
          }
          // Fetch fresh data
          const data = await this.composer.composePoolData(poolAddress);
          this.setCache(cacheKey, data);
          return data;
        } catch (error: unknown) {
          // catch variables are `unknown` under strict mode — narrow before use.
          const message = MultiPoolDataFetcher.errorMessage(error);
          console.warn(`Failed to fetch pool ${poolAddress}:`, message);
          errors.push({ pool: poolAddress, error: message });
          return null; // keep going; the failure is reported at the end
        }
      });
      const batchResults = await Promise.all(batchPromises);
      // Type predicate so the compiler narrows the nulls away.
      results.push(...batchResults.filter((result): result is ComposedPoolData => result !== null));
      // Add small delay between batches to be respectful to RPC
      if (i + this.concurrencyLimit < poolAddresses.length) {
        await this.delay(100);
      }
    }
    console.log(`Successfully fetched ${results.length}/${poolAddresses.length} pools`);
    if (errors.length > 0) {
      console.log(`Errors for ${errors.length} pools:`, errors);
    }
    return results;
  }

  /**
   * Fetch pools with priority (high-priority pools fetched first).
   * The two groups are fetched sequentially so the high-priority group
   * gets the RPC budget first.
   */
  async fetchPoolsWithPriority(
    highPriorityPools: string[],
    lowPriorityPools: string[]
  ): Promise<{ highPriority: ComposedPoolData[]; lowPriority: ComposedPoolData[] }> {
    console.log(`Fetching ${highPriorityPools.length} high-priority pools first`);
    const highPriorityResults = await this.fetchMultiplePools(highPriorityPools);
    console.log(`Fetching ${lowPriorityPools.length} low-priority pools`);
    const lowPriorityResults = await this.fetchMultiplePools(lowPriorityPools);
    return {
      highPriority: highPriorityResults,
      lowPriority: lowPriorityResults
    };
  }

  /**
   * Continuously monitor multiple pools, invoking `callback` after each
   * refresh cycle.
   *
   * @returns A stop function. Note: stopping takes effect after the current
   *          cycle (including its delay) completes; the loop is not
   *          interrupted mid-sleep.
   */
  startPoolMonitoring(
    poolAddresses: string[],
    callback: (poolData: ComposedPoolData[]) => void,
    refreshInterval: number = 30000 // 30 seconds
  ): () => void {
    console.log(`Starting monitoring for ${poolAddresses.length} pools`);
    let isRunning = true;
    const monitoringLoop = async () => {
      while (isRunning) {
        try {
          const poolData = await this.fetchMultiplePools(poolAddresses);
          callback(poolData);
        } catch (error: unknown) {
          console.error('Monitoring error:', error);
        }
        await this.delay(refreshInterval);
      }
    };
    // Fire-and-forget: errors are handled inside the loop, so the promise
    // cannot reject. `void` marks the intentional lack of awaiting.
    void monitoringLoop();
    // Return stop function
    return () => {
      console.log('Stopping pool monitoring');
      isRunning = false;
    };
  }

  // Cache management

  /** Return a cached entry, or null if absent/expired (expired entries are evicted). */
  private getFromCache(key: string): ComposedPoolData | null {
    const cached = this.cache.get(key);
    if (!cached) return null;
    if (Date.now() - cached.timestamp > this.cacheTTL) {
      this.cache.delete(key);
      return null;
    }
    return cached.data;
  }

  /** Store data under `key`, stamped with the current time. */
  private setCache(key: string, data: ComposedPoolData): void {
    this.cache.set(key, {
      data,
      timestamp: Date.now()
    });
  }

  /** Promise-based sleep. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Advanced Optimization Patterns
Selective Data Fetching
Only fetch the data components you need:
/**
 * Flags controlling which data components a pool fetch should include,
 * letting callers skip expensive parts for lightweight displays.
 */
export interface PoolDataOptions {
  /** Include per-bin liquidity distribution (the most expensive component). */
  includeLiquidityDistribution?: boolean;
  /** Include current pricing data. */
  includePricingData?: boolean;
  /** Include historical data. */
  includeHistoricalData?: boolean;
  /** Number of bins around the active bin to fetch — presumably a radius; TODO confirm. */
  binRange?: number;
}
/**
 * MultiPoolDataFetcher variant that exposes preset fetch profiles
 * (lightweight overview vs. full analytics) via PoolDataOptions.
 */
export class OptimizedMultiPoolFetcher extends MultiPoolDataFetcher {
  /**
   * Fetch minimal pool data for overview displays — skips bin-level
   * liquidity distribution and historical data.
   * NOTE(review): `PoolOverview` is not imported in this file — confirm
   * where it is declared and that the selective fetch actually produces it.
   */
  async fetchPoolOverviews(poolAddresses: string[]): Promise<PoolOverview[]> {
    const options: PoolDataOptions = {
      includeLiquidityDistribution: false,
      includePricingData: true,
      includeHistoricalData: false
    };
    return this.fetchSelectiveData(poolAddresses, options);
  }

  /**
   * Fetch detailed pool data for analysis: full liquidity distribution,
   * pricing, and history over a 30-bin range.
   */
  async fetchPoolAnalytics(poolAddresses: string[]): Promise<ComposedPoolData[]> {
    const options: PoolDataOptions = {
      includeLiquidityDistribution: true,
      includePricingData: true,
      includeHistoricalData: true,
      binRange: 30
    };
    return this.fetchSelectiveData(poolAddresses, options);
  }

  /**
   * Shared entry point for option-driven fetching.
   * NOTE(review): the options are currently only logged — the composer
   * does not yet honor them, so this returns full pool data regardless.
   * The `any[]` return papers over the PoolOverview/ComposedPoolData split;
   * tighten once selective fetching is actually implemented.
   */
  private async fetchSelectiveData(
    poolAddresses: string[],
    options: PoolDataOptions
  ): Promise<any[]> {
    // Implementation would modify the composer to only fetch requested data
    console.log(`Fetching selective data for ${poolAddresses.length} pools`);
    console.log(`Options:`, options);
    // Batch processing with selective fetching
    return this.fetchMultiplePools(poolAddresses);
  }
}
Connection Pool Management
Manage multiple RPC connections for better performance:
/**
 * Round-robins requests across several RPC endpoints and can probe each
 * endpoint's health.
 */
export class ConnectionPoolManager {
  private connections: Connection[];
  private currentIndex: number = 0;

  /**
   * @param rpcUrls One or more RPC endpoint URLs.
   * @throws Error if `rpcUrls` is empty — an empty pool would make
   *         getConnection() return `undefined` at runtime.
   */
  constructor(rpcUrls: string[]) {
    if (rpcUrls.length === 0) {
      throw new Error('ConnectionPoolManager requires at least one RPC URL');
    }
    this.connections = rpcUrls.map(url => new Connection(url, 'confirmed'));
  }

  /** Return the next connection in round-robin order. */
  getConnection(): Connection {
    const connection = this.connections[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.connections.length;
    return connection;
  }

  /**
   * Probe every endpoint concurrently.
   * @returns One boolean per connection, in constructor order; never rejects.
   */
  async healthCheck(): Promise<boolean[]> {
    const healthChecks = this.connections.map(async (conn, index) => {
      try {
        await conn.getLatestBlockhash();
        return true;
      } catch (error: unknown) {
        // catch variables are `unknown` under strict mode — narrow before use.
        const message = error instanceof Error ? error.message : String(error);
        console.warn(`RPC ${index} health check failed:`, message);
        return false;
      }
    });
    return Promise.all(healthChecks);
  }
}
/**
 * MultiPoolDataFetcher backed by a rotating pool of RPC connections.
 */
export class ResilientMultiPoolFetcher extends MultiPoolDataFetcher {
  private connectionPool: ConnectionPoolManager;

  /**
   * @param rpcUrls One or more RPC endpoint URLs.
   * @throws Error if `rpcUrls` is empty — otherwise `rpcUrls[0]` would pass
   *         `undefined` to the base constructor.
   */
  constructor(rpcUrls: string[]) {
    if (rpcUrls.length === 0) {
      throw new Error('ResilientMultiPoolFetcher requires at least one RPC URL');
    }
    super(rpcUrls[0]); // Use first URL for base constructor
    this.connectionPool = new ConnectionPoolManager(rpcUrls);
  }

  // NOTE(review): the base class shown in this file does not declare a
  // getConnection() hook, so nothing invokes this override yet — confirm the
  // base exposes `protected getConnection()` before relying on rotation.
  protected getConnection(): Connection {
    return this.connectionPool.getConnection();
  }
}
Usage Examples
Portfolio Analysis
/**
 * Fetch every pool in a user's portfolio and compute summary metrics.
 *
 * Handles the all-fetches-failed case explicitly: the original
 * `reduce(...)` calls have no initial value and would throw a TypeError on
 * an empty array, and the average would be NaN (division by zero).
 * NOTE(review): relies on `calculateRiskDistribution`, defined elsewhere.
 */
async function analyzePortfolio(userPools: string[]) {
  const fetcher = new MultiPoolDataFetcher('https://api.mainnet-beta.solana.com');
  console.log('Analyzing portfolio...');
  // Fetch all pool data efficiently
  const poolData = await fetcher.fetchMultiplePools(userPools);

  // Guard: every pool may have failed to fetch.
  if (poolData.length === 0) {
    const emptyPortfolio = {
      totalPools: 0,
      totalTVL: 0,
      averageConcentration: 0,
      bestPerformingPool: null,
      riskDistribution: calculateRiskDistribution(poolData)
    };
    console.log('Portfolio Summary:', emptyPortfolio);
    return emptyPortfolio;
  }

  // Calculate portfolio metrics
  const portfolio = {
    totalPools: poolData.length,
    totalTVL: poolData.reduce((sum, pool) => sum + pool.liquidity.totalValueLocked, 0),
    averageConcentration: poolData.reduce((sum, pool) => sum + pool.liquidity.concentrationRatio, 0) / poolData.length,
    // "Best" here means lowest price impact.
    bestPerformingPool: poolData.reduce((best, pool) =>
      pool.pricing.priceImpact < best.pricing.priceImpact ? pool : best
    ),
    riskDistribution: calculateRiskDistribution(poolData)
  };
  console.log('Portfolio Summary:', portfolio);
  return portfolio;
}
Market Scanning
/**
 * Two-phase market scan: cheap overviews for every candidate pool first,
 * then full analytics only for the pools that pass the screening filter.
 */
async function scanTopPools() {
  const topPoolAddresses = [
    '2wT4jHyF6o5fFzJJM9cHH8WKrZt1kqEKnEwqYZgQy8wq',
    '3nExkXBEPtjXrTD8ctwxjZyB1G8vLRoHPpg9SLrE2EMY',
    // ... more pool addresses
  ];
  const fetcher = new OptimizedMultiPoolFetcher('https://api.mainnet-beta.solana.com');

  // Phase 1: lightweight overviews for initial screening.
  const overviews = await fetcher.fetchPoolOverviews(topPoolAddresses);

  // Phase 2: keep only pools that clear both TVL and concentration bars.
  const interestingPools: string[] = [];
  for (const overview of overviews) {
    const deepEnough = overview.tvl > 1000000;
    const concentratedEnough = overview.concentration > 0.7;
    if (deepEnough && concentratedEnough) {
      interestingPools.push(overview.address);
    }
  }
  console.log(`Found ${interestingPools.length} interesting pools from ${topPoolAddresses.length} scanned`);

  // Detailed analytics only for the survivors.
  return fetcher.fetchPoolAnalytics(interestingPools);
}
Real-time Monitoring
/**
 * Monitor a set of pools for one hour, surfacing alerts and pushing each
 * refresh to the dashboard.
 * NOTE(review): relies on `updateDashboard`, defined elsewhere.
 */
async function monitorPortfolio(poolAddresses: string[]) {
  const fetcher = new MultiPoolDataFetcher('https://api.mainnet-beta.solana.com');
  const stopMonitoring = fetcher.startPoolMonitoring(
    poolAddresses,
    (poolData) => {
      // Process updated pool data
      const alerts = checkForAlerts(poolData);
      if (alerts.length > 0) {
        console.log('Portfolio Alerts:', alerts);
      }
      // Update dashboard or notify users
      updateDashboard(poolData);
    },
    30000 // Update every 30 seconds
  );
  // Stop monitoring after 1 hour
  setTimeout(stopMonitoring, 3600000);
}
/**
 * Scan fetched pool data for conditions worth alerting on.
 *
 * @param poolData Composed pool snapshots to inspect.
 * @returns Human-readable alert strings: one for each pool whose price
 *          impact exceeds 5%, and one for each whose liquidity
 *          concentration ratio is below 0.3.
 */
function checkForAlerts(poolData: ComposedPoolData[]): string[] {
  // Explicit element type: a bare `[]` is an implicit any[] under strict mode.
  const alerts: string[] = [];
  for (const pool of poolData) {
    const pair = `${pool.pool.tokenX.symbol}/${pool.pool.tokenY.symbol}`;
    if (pool.pricing.priceImpact > 5) {
      alerts.push(`High price impact in ${pair}: ${pool.pricing.priceImpact.toFixed(2)}%`);
    }
    if (pool.liquidity.concentrationRatio < 0.3) {
      alerts.push(`Low liquidity concentration in ${pair}: ${(pool.liquidity.concentrationRatio * 100).toFixed(1)}%`);
    }
  }
  return alerts;
}
Performance Benchmarks
Expected performance improvements:
- Sequential fetching: 10 pools = ~50 seconds
- Basic concurrent: 10 pools = ~15 seconds
- Optimized batching: 10 pools = ~8 seconds
- With caching: 10 pools = ~3 seconds (subsequent requests)
Best Practices
- Respect Rate Limits: Use concurrency limits (3-5 concurrent requests)
- Implement Caching: Cache pool data for 30-60 seconds
- Handle Failures Gracefully: Some pools may fail - continue with successful ones
- Use Appropriate Data Granularity: Don't fetch full bin distributions for overview displays
- Monitor RPC Health: Implement RPC endpoint rotation and health checking
- Add Request Delays: Small delays between batches prevent overwhelming servers