Исходный код, разобранный в статье, опубликован в этом GitHub репозитории

Императивный подход не работает в мире ИИ. Если создавать кодовую базу на God Object, из трёх сотрудников писать код сможет только один, остальные будут заблокированы конфликтами слияния. Если разделить репозитории, не будет накапливаться база знаний: единственная информация о том, как работает торговая стратегия, находится в самом коде. Теряется синергетический эффект: Coding Agent не сможет читать код из прошлых итераций разработки. Каждая следующая стратегия - это рандом, а не развитие мысли предыдущей
Проблема
OpenSource для программирования автоматизированных торговых стратегий весь на python. На python неудобно писать декларативный код - все примеры это раздутые на тысячи строк императивные классы антипаттерна God Object
class AwesomeStrategy(IStrategy):
position_adjustment_enable = True # глобальный switch, без него DCA не работает
# 1. Параметры лесенки - атрибуты класса
max_entry_position_adjustment = 9 # макс. 9 доливок + 1 первичный вход = 10 шагов
max_dca_multiplier = 5.5 # резервируем 5.5× от стейка под будущие доливки
stoploss = -0.30 # высокий хард-стоп нужен, иначе DCA вылетает
# 2. Первичный вход - урезаем стейк, чтобы хватило на лесенку
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
entry_tag: Optional[str], side: str, **kwargs) -> float:
# Зарезервировать большую часть на возможные DCA-доливки
return proposed_stake / self.max_dca_multiplier
# 3. Сама лесенка - отдельный callback, который Freqtrade зовёт на каждом тике
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float, max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs) -> Optional[float]:
# Текущая просадка недостаточно глубокая - не доливаем
if current_profit > -0.05 and trade.nr_of_successful_entries == 1:
return None
if current_profit > -0.10 and trade.nr_of_successful_entries == 2:
return None
# ... повторить эту лесенку условий для каждого следующего шага
# Достаём список уже заполненных входов из state трейда
filled_entries = trade.select_filled_orders(trade.entry_side)
count_of_entries = trade.nr_of_successful_entries
try:
# Стейк первой покупки × (count_of_entries - для геометрии лесенки)
stake_amount = filled_entries[0].stake_amount
stake_amount = stake_amount * (1 + (count_of_entries * 0.25))
return stake_amount
except Exception:
return None
# 4. populate_indicators / populate_entry_trend / populate_exit_trend всё равно нужны,
# потому что без entry-сигнала первичный вход не случится.
def populate_indicators(self, dataframe, metadata): ...
def populate_entry_trend(self, dataframe, metadata): ...
def populate_exit_trend(self, dataframe, metadata): ...Этот boilerplate обслуживает только докуп лесенкой: упало - докупили. Чтобы понять, когда начинать покупать в принципе, чтобы не обанкротиться, в этот же класс придётся добавлять методы интеграции в базу данных. Прямое нарушение SOLID
class TelegramSignalStrategy(IStrategy):
custom_signals = {} # nested dict: pair -> list of signals
def bot_start(self) -> None:
# стартует cron-задача в отдельном потоке, которая ходит в Telegram
# и пишет результаты в self.custom_signals
threading.Thread(target=self._poll_telegram_loop, daemon=True).start()
def _poll_telegram_loop(self):
while True:
try:
messages = telegram_client.iter_messages("crypto_yoda_channel")
for msg in messages:
parsed = self._parse_signal(msg.text) # regex inline
if parsed:
self.custom_signals.setdefault(parsed['pair'], []).append(parsed)
except Exception:
pass
time.sleep(60)
def _parse_signal(self, text: str) -> Optional[dict]:
m = re.search(r"#([A-Z0-9]+)/USDT.*?(ЛОНГ|ШОРТ).*?зоне\s+\$?([\d.,]+)\s*[-–-]\s*\$?([\d.,]+)", text)
if not m:
return None
return {
"pair": m.group(1) + "/USDT",
"direction": "long" if m.group(2) == "ЛОНГ" else "short",
"entry_from": float(m.group(3).replace(",", ".")),
"entry_to": float(m.group(4).replace(",", ".")),
}
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
signals = self.custom_signals.get(metadata['pair'], [])
# ... тут как-то применить signals к dataframeОтдельное внимание требует точка входа: в freqtrade это json файл. Туда понапиханы и API ключи, и магические константы вида dry_run. Непонятно, коммитить эти изменения или нет: если коммитить, всегда будет конфликт слияния при работе в команде
{
"dry_run": true,
"timeframe": "3m",
"stake_currency": "USDT",
"stake_amount": 200,
"max_open_trades": 5,
"exchange": {
"name": "binance",
"key": "af8ddd35195e9dc500b9a6f799f6f5c93d89193b",
"secret": "08a9dc6db3d7b53e1acebd9275677f4b0a04f1a5",
"pair_whitelist": ["BTC/USDT", "ETH/USDT"]
},
"timerange": "20260401-20260427",
"datadir": "user_data/data/binance"
}При такой архитектуре избежать look ahead bias нельзя: чтобы сделать интеграцию во фронтенд, нужно писать данные в промежуточную базу данных, а удалять через crontab. Этот императивный код математически недоказуем, так как даже близко не похож на чистую функцию
Решение проблемы
До найма сотрудников разработать конвейер, который будет преобразовывать кумулятивный эффект затраченного времени в следующий виток диалектической спирали. Для этого нужно сделать следующую структуру папок с кодом
monorepo/
├── content/
│ ├── apr_2026.strategy/
| ├──modules
│ │ ├──backtest.module.ts # testnet paper
│ ├── apr_2026.strategy.ts # strategy production code
│ ├── apr_2026.test.ts # developer playground
├── modules/
│ ├── backtest.module.ts # mainnet paper
│ ├── live.module.ts # mainnet live
├── packages/
│ ├── core/ # database layer
│ ├── main/ # cli arguments parserN точек входа вместо одной: ветвление на CLI-флаге
Корневой импорт в packages/main/src/index.ts подтягивает четыре файла
import "./main/session";
import "./main/backtest";
import "./main/live";
import "./main/paper";Каждый из которых - самостоятельная точка входа
const main = async () => {
const { values } = getArgs();
if (!values.entry) return;
if (!values.backtest) return;
await waitForReady(true);
const [strategySchema] = await listStrategySchema();
const [exchangeSchema] = await listExchangeSchema();
const [frameSchema] = await listFrameSchema();
if (values.cache) {
await CACHE_CANDLES_FN();
}
for (const symbol of CC_SYMBOL_LIST) {
Backtest.background(symbol, {
exchangeName: exchangeSchema.exchangeName,
strategyName: strategySchema.strategyName,
frameName: frameSchema.frameName,
});
}
};
main();Разделяемый событийный код стратегии
Точку входа, усреднение, stop loss и ранний выход нужно писать функционально чисто, в стиле декларации, так чтобы можно было разнести по разным файлам и использовать повторно
import {
addStrategySchema, listenActivePing, listenError,
Log, Position,
commitClosePending, commitAverageBuy,
getPositionPnlPercent, getPositionEntryOverlap, getPositionEntries,
} from "backtest-kit";
const HARD_STOP = 10.0;
const TARGET_PROFIT = 3;
const LADDER_STEP_COST = 100;
const LADDER_UPPER_STEP = 5;
const LADDER_LOWER_STEP = 1;
const LADDER_MAX_STEPS = 10;
addStrategySchema({
strategyName: "apr_2026_strategy",
getSignal: async (symbol, when, currentPrice) => ({
...Position.moonbag({
position: "long",
currentPrice,
percentStopLoss: HARD_STOP
}),
minuteEstimatedTime: Infinity,
cost: LADDER_STEP_COST,
}),
});
listenActivePing(async ({ symbol, currentPrice }) => {
const { length: steps } = await getPositionEntries(symbol);
if (steps >= LADDER_MAX_STEPS) return;
const hasOverlap = await getPositionEntryOverlap(symbol, currentPrice, {
upperPercent: LADDER_UPPER_STEP,
lowerPercent: LADDER_LOWER_STEP,
});
if (hasOverlap) return;
await commitAverageBuy(symbol, LADDER_STEP_COST);
});
listenActivePing(async ({ symbol }) => {
const currentProfit = await getPositionPnlPercent(symbol);
if (currentProfit < TARGET_PROFIT) {
return;
}
await commitClosePending(symbol, { id: "unknown", note: "# closed by target pnl" });
});Таким образом к любой стратегии можно инкрементально накинуть trailing take не меняя код. Любое изменение кода - это side effect, так как увеличивает таблицу возможных входов и выходов функции, даже если природа мутации вызвана говнокодящей нейронкой, а не обстоятельствами в runtime.
if (GLOBAL_CONFIG.ATTACH_TRAILING_TAKE) {
listenActivePing(async ({ symbol, data }) => {
const peakProfitDistance = await getPositionHighestProfitDistancePnlPercentage(symbol);
const currentProfit = await getPositionPnlPercent(symbol);
if (currentProfit < 0) {
return;
}
if (peakProfitDistance < TRAILING_TAKE) {
return;
}
Log.info("position closed due to the trailing take", {
symbol,
data,
});
await commitClosePending(symbol, {
id: "unknown",
note: str.newline(
"# closed by trailing take",
),
});
});
}Во Freqtrade удаление custom_stoploss означает «надо вспомнить, что там было важного». Мы не теряем информацию, а только накапливаем
Модульная plug-and-play конфигурация под режим
Рядом со стратегией лежит modules/backtest.module.ts. Это файл-сосед, который CLI грузит вместе со стратегией и который описывает «всё, что не сама стратегия» - биржу, исторический фрейм, глобальные параметры
addExchangeSchema({
exchangeName: "ccxt-exchange",
getCandles: async (symbol, interval, since, limit) => {
const exchange = await getExchange();
const candles = await exchange.fetchOHLCV(symbol, interval, since.getTime(), limit);
return candles.map(([timestamp, open, high, low, close, volume]) => ({
timestamp, open, high, low, close, volume,
}));
},
getOrderBook: async (symbol, depth, _from, _to, backtest) => {
if (backtest) throw new Error("Order book not supported in backtest");
const exchange = await getExchange();
const bookData = await exchange.fetchOrderBook(symbol, depth);
return {
symbol,
asks: bookData.asks.map(([price, quantity]) => ({ price: String(price), quantity: String(quantity) })),
bids: bookData.bids.map(([price, quantity]) => ({ price: String(price), quantity: String(quantity) })),
};
},
// ...formatPrice, formatQuantity, getAggregatedTrades
});
addFrameSchema({
frameName: "apr_2026_frame",
interval: "1m",
startDate: new Date("2026-04-01T00:00:00Z"),
endDate: new Date("2026-04-27T00:00:00Z"),
});Новая стратегия - это папка в директории content. Таким образом во вкладках терминала можно крутить сразу несколько стратегий не меняя корневой json
content/apr_2026.strategy/
├── apr_2026.strategy.ts # strategy entry file
├── apr_2026.test.ts # developer playground
└── modules/
└── backtest.module.ts # exchange + timeframeПримечательно, что таймфрейм, на котором запускалась стратегия, не теряется и коммитится как код. Можно понять, почему перестало работать, относительно известного времени, когда точно работало
MVC для слоя данных
Триггером любого значительного движения на рынке является новостной фон. На момент 2026 индикаторы не работают, так как рост цены - не новость. Если держать код парсера новостей в одном классе со стратегией, систему нельзя отлаживать. Нужно держать код парсера отдельно, чтобы его можно было программировать через TDD
export class ScraperService {
private readonly loggerService = inject<LoggerService>(TYPES.loggerService);
public scrapeDay = async (channel: string, date: Date): Promise<ScraperMessage[]> => {
const client = await getTelegram();
const dayStart = new Date(date); dayStart.setUTCHours(0, 0, 0, 0);
const dayEnd = new Date(date); dayEnd.setUTCHours(23, 59, 59, 999);
const rows: ScraperMessage[] = [];
for await (const message of client.iterMessages(channel, {
offsetDate: Math.floor(dayEnd.getTime() / 1000) + 1,
reverse: false,
})) {
if (!message.message) continue;
const ts = message.date * 1000;
if (ts < dayStart.getTime()) break;
rows.push({ id: message.id, content: message.message, channel, date: new Date(ts) });
}
return rows;
}
}То же самое касается интеграции в базу данных. Недостаточно просто написать SQL запрос, перед его исполнением в backtest нужно проверить, не было ли заезда по времени, чтобы избежать look ahead bias. Для реализации необходим класс с методами
class CandleDbService extends BaseCRUD(CandleModel) {
readonly loggerService = inject<LoggerService>(TYPES.loggerService);
public create = async (dto: ICandleDto): Promise<ICandleRow> => {
this.loggerService.log("candleDbService create", { dto });
const filter = {
symbol: dto.symbol,
interval: dto.interval,
timestamp: dto.timestamp,
};
const insertOnly = {
exchangeName: EXCHANGE_NAME,
open: dto.open,
high: dto.high,
low: dto.low,
close: dto.close,
volume: dto.volume,
};
const document = await CandleModel.findOneAndUpdate(
filter,
{ $setOnInsert: insertOnly },
{ upsert: true, new: true, setDefaultsOnInsert: true },
);
const result = readTransform(document.toJSON()) as unknown as ICandleRow;
return result;
};
public hasCandle = async (symbol: string, interval: CandleInterval, timestamp: number): Promise<boolean> => {
this.loggerService.log("candleDbService hasCandle", {
symbol,
interval,
timestamp,
});
const candle = await this.findBySymbolIntervalTimestamp(symbol, interval, timestamp);
return !!candle;
};
public findBySymbolIntervalTimestamp =
async (symbol: string, interval: CandleInterval, timestamp: number): Promise<ICandleRow | null> => {
this.loggerService.log("candleDbService findBySymbolIntervalTimestamp", { symbol, interval, timestamp });
return await await super.findByFilter({ symbol, interval, exchangeName: EXCHANGE_NAME, timestamp });
};
}Производительность
После внедрения ПО выяснилось, что модульный подход не только позволяет создавать рабочие места и ускорять разработку, но и обеспечивает значительно более высокую производительность.
class CandleCacheService extends BaseMap(REDIS_KEY, -1) {
readonly loggerService = inject<LoggerService>(TYPES.loggerService);
private _cacheKey(symbol: string, interval: CandleInterval, exchangeName: string, timestamp: number): string {
return `${exchangeName}:${symbol}:${interval}:${timestamp}`;
}
public async hasCandleId(symbol: string, interval: CandleInterval, exchangeName: string, timestamp: number) {
this.loggerService.log("candleCacheService getCandleId", {
symbol,
interval,
exchangeName,
timestamp,
});
const key = this._cacheKey(symbol, interval, exchangeName, timestamp);
return await this.has(key);
}
public async getCandleId(symbol: string, interval: CandleInterval, exchangeName: string, timestamp: number): Promise<string | null> {
this.loggerService.log("candleCacheService getCandleId", {
symbol,
interval,
exchangeName,
timestamp,
});
const key = this._cacheKey(symbol, interval, exchangeName, timestamp);
const id = <string>await super.get(key);
return id ?? null;
}
public async setCandleId(row: ICandleRow): Promise<string> {
this.loggerService.log(`candleCacheService setCandleId`, {
symbol: row.symbol,
interval: row.interval,
timestamp: row.timestamp
});
const key = this._cacheKey(row.symbol, row.interval, row.exchangeName, row.timestamp);
await super.set(key, row.id);
return row.id;
}
}Все наиболее часто используемые базы данных Postgres, MongoDB, MySQL используют B-Tree для поиска строк, это сложность O(log n), где n - это число строк в таблице. Redis кеш позволяет уменьшить сложность до O(1) через поиск id строки по составному ключу, однако при императивном синтаксисе freqtrade его просто негде применить. Если хранить в redis не только словарь ключ-id, а сами данные, производительность будет потеряна на сериализации/десериализации json объектов. Ниже таблица с полученными метриками производительности
Metric | Value |
|---|---|
Wall-clock span (first → last event) |
|
Total events captured | 297 |
Symbols running in parallel | 9 (BTC, POL, ZEC, HYPE, XAUT, DOGE, SOL, PENGU, HBAR) |
Historical time advanced per symbol |
|
Per-symbol replay speed | 34 min historical ÷ 2.9 s wall = ≈ 703× real-time |
Aggregate replay speed (9 symbols) | 9 × 703 = ≈ 6 326× real-time |
Event throughput | 297 ev / 2.893 s = ≈ 103 events/sec (one Node process) |
Frame coverage |
|
Скорость: ~700x ускорение исторического времени относительно реального, при 9 параллельных контекстах - эффективные ~6300x относительно проверки с ожиданием свечей в реальном времени

























