Skip to content

rotor: performance optimizations #1192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cli/build-scripts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
"@babel/preset-env": "^7.23.2",
"@babel/preset-typescript": "^7.23.2",
"babel-loader": "^9.1.3",
"webpack": "^5.95.0",
"webpack": "^5.99.5",
"webpack-cli": "^6.0.1",
"juava": "workspace:*"
}
}
4 changes: 2 additions & 2 deletions cli/jitsu-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"ts-jest": "^29.1.1",
"ts-loader": "^9.5.1",
"ts-node": "^10.9.2",
"webpack": "^5.95.0",
"webpack-cli": "^5.1.4"
"webpack": "^5.99.5",
"webpack-cli": "^6.0.1"
}
}
1 change: 1 addition & 0 deletions libs/core-functions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@amplitude/ua-parser-js": "^0.7.33",
"@hubspot/api-client": "^11.1.0",
"@mongodb-js/zstd": "^1.2.2",
"undici": "^6.21.2",
"agentkeepalive": "4.3.0",
"dayjs": "^1.11.10",
"google-ads-api": "^17.1.0-rest-beta",
Expand Down
36 changes: 24 additions & 12 deletions libs/core-functions/src/functions/bulker-destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,25 @@ import {
} from "@jitsu/functions-lib";
import { AnalyticsServerEvent, DataLayoutType } from "@jitsu/protocols/analytics";

import { request, Agent } from "undici";
import omit from "lodash/omit";
import { MetricsMeta } from "./lib";
import { UserRecognitionParameter } from "./user-recognition";
import { parseNumber } from "juava";

const JitsuInternalProperties = [TableNameParameter, UserRecognitionParameter];

const concurrency = parseNumber(process.env.CONCURRENCY, 10);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Environment variables used without documentation

const fetchTimeoutMs = parseNumber(process.env.FETCH_TIMEOUT_MS, 2000);

export const undiciAgent = new Agent({
connections: concurrency, // Limit concurrent kept-alive connections to not run out of resources
maxRequestsPerClient: 5000,
headersTimeout: fetchTimeoutMs,
connectTimeout: fetchTimeoutMs,
bodyTimeout: fetchTimeoutMs,
});

export type MappedEvent = {
event: any;
table: string;
Expand Down Expand Up @@ -228,19 +241,18 @@ const BulkerDestination: JitsuFunction<AnalyticsServerEvent, BulkerDestinationCo
if (streamOptions && Object.keys(streamOptions).length > 0) {
headers["streamOptions"] = JSON.stringify(streamOptions);
}
const res = await ctx.fetch(
`${bulkerEndpoint}/post/${destinationId}?tableName=${table}`,
{
method: "POST",
headers,
body: payload,
},
{ log: false }
);
if (!res.ok) {
throw new HTTPError(`HTTP Error: ${res.status} ${res.statusText}`, res.status, await res.text());
const res = await request(`${bulkerEndpoint}/post/${destinationId}?tableName=${table}`, {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original fetch call included a { log: false } option that's been removed in the new implementation, which might affect logging behavior

method: "POST",
headers,
body: payload,
bodyTimeout: fetchTimeoutMs,
headersTimeout: fetchTimeoutMs,
dispatcher: undiciAgent,
});
if (res.statusCode != 200) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition only checks for exactly status code 200, while the previous code with res.ok accepted any 2xx status codes (200-299). API might return valid success responses with other 2xx codes (201, 204, etc.)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed error handling logic may reject valid success responses

throw new HTTPError(`HTTP Error: ${res.statusCode}`, res.statusCode, await res.body.text());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message is missing statusText, which was present in the original code. This reduces debugging information when errors occur

} else {
ctx.log.debug(`HTTP Status: ${res.status} ${res.statusText} Response: ${await res.text()}`);
ctx.log.debug(`HTTP Status: ${res.statusCode} Response: ${await res.body.text()}`);
}
}
return event;
Expand Down
45 changes: 7 additions & 38 deletions libs/core-functions/src/functions/lib/profiles-udf-wrapper-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,6 @@ class RetryError extends Error {
export { DropRetryErrorName, RetryError, RetryErrorName };`;

export const chainWrapperCode = `//** @UDF_FUNCTIONS_IMPORT **//
import {DropRetryErrorName, RetryError, RetryErrorName} from "@jitsu/functions-lib";

global.RetryError = RetryError;

export function checkError(chainRes) {
for (const el of chainRes.execLog) {
const error = el.error;
if (error) {
throw error;
// _jitsu_log.error.apply(undefined, [{
// function: {
// ..._jitsu_funcCtx.function,
// id: error.functionId || el.functionId
// }
// }, \`Function execution failed\`, error.name, error.message], {arguments: {copy: true}});
}
}
}

function isDropResult(result) {
return result === "drop" || (Array.isArray(result) && result.length === 0) || result === null || result === false;
Expand All @@ -41,27 +23,16 @@ async function runChain(
user,
ctx
) {
const execLog = [];
const f = chain[0];
let result = undefined;
try {
result = await f.f(events, user, ctx);
} catch (err) {
if (err.name === DropRetryErrorName) {
result = "drop";
}
if (f.meta?.retryPolicy) {
err.retryPolicy = f.meta.retryPolicy;
const result = await f.f(events, user, ctx);
if (isDropResult(result)) {
return undefined
}
execLog.push({
functionId: f.id,
error: err,
});
}
if (isDropResult(result)) {
result = undefined
return result
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result handling has changed from {result, execLog} object to direct return of result

} catch (err) {
throw err;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed retry policy annotation on errors

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry policy metadata is no longer attached to errors, potentially breaking retry functionality.

}
return {result, execLog};
}

const wrappedFunctionChain = async function (eventsProvider, userProvider, ctx) {
Expand Down Expand Up @@ -129,9 +100,7 @@ const wrappedFunctionChain = async function (eventsProvider, userProvider, ctx)
},
};

const chainRes = await runChain(chain, iterator, user, ctx);
checkError(chainRes);
return chainRes.result;
return runChain(chain, iterator, user, ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execution log and error tracking have been completely removed

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error handling flow has fundamentally changed, removing the intermediary execution log.

};

const wrappedUserFunction = (id, f, funcCtx) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,4 @@
//** @UDF_FUNCTIONS_IMPORT **//
import {DropRetryErrorName, RetryError, RetryErrorName} from "@jitsu/functions-lib";

global.RetryError = RetryError;

export function checkError(chainRes) {
for (const el of chainRes.execLog) {
const error = el.error;
if (error) {
throw error;
// _jitsu_log.error.apply(undefined, [{
// function: {
// ..._jitsu_funcCtx.function,
// id: error.functionId || el.functionId
// }
// }, `Function execution failed`, error.name, error.message], {arguments: {copy: true}});
}
}
}

function isDropResult(result) {
return result === "drop" || (Array.isArray(result) && result.length === 0) || result === null || result === false;
Expand All @@ -28,27 +10,16 @@ async function runChain(
user,
ctx
) {
const execLog = [];
const f = chain[0];
let result = undefined;
try {
result = await f.f(events, user, ctx);
} catch (err) {
if (err.name === DropRetryErrorName) {
result = "drop";
}
if (f.meta?.retryPolicy) {
err.retryPolicy = f.meta.retryPolicy;
const result = await f.f(events, user, ctx);
if (isDropResult(result)) {
return undefined
}
execLog.push({
functionId: f.id,
error: err,
});
}
if (isDropResult(result)) {
result = undefined
return result
} catch (err) {
throw err;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry policy information is no longer attached to errors, potentially breaking retry mechanisms

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loss of detailed error information from execLog

}
return {result, execLog};
}

const wrappedFunctionChain = async function (eventsProvider, userProvider, ctx) {
Expand Down Expand Up @@ -116,9 +87,7 @@ const wrappedFunctionChain = async function (eventsProvider, userProvider, ctx)
},
};

const chainRes = await runChain(chain, iterator, user, ctx);
checkError(chainRes);
return chainRes.result;
return runChain(chain, iterator, user, ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return value structure has changed from {result, execLog} to just the result

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed return structure could break consumers expecting {result, execLog} format

};

const wrappedUserFunction = (id, f, funcCtx) => {
Expand Down
6 changes: 2 additions & 4 deletions libs/core-functions/src/functions/lib/profiles-udf-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ function wrap(connectionId: string, isolate: Isolate, context: Context, wrapper:
undefined,
[eventsProviderRef, userProviderRef, ctxCopy.copyInto({ release: true, transferIn: true })],
{
result: { promise: true },
result: { promise: true, copy: true },
}
);
switch (typeof res) {
Expand All @@ -292,9 +292,7 @@ function wrap(connectionId: string, isolate: Isolate, context: Context, wrapper:
case "boolean":
return undefined;
default:
const r = (res as Reference).copy();
(res as Reference).release();
return r;
return res as any;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed reference release may cause memory leaks

}
} catch (e: any) {
if (isolate.isDisposed) {
Expand Down
47 changes: 32 additions & 15 deletions libs/core-functions/src/functions/lib/udf-wrapper-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,41 @@ function isDropResult(result) {
return result === "drop" || (Array.isArray(result) && result.length === 0) || result === null || result === false;
}

async function runSingle(
f,
event,
ctx
) {
let execLog = [];
let events = [];
let result = undefined;
try {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runSingle function doesn't respect the fastFunctions option that runChain uses, creating inconsistent behavior

result = await f.f(event, ctx);
} catch (err) {
if (err.name === DropRetryErrorName) {
result = "drop";
}
if (f.meta?.retryPolicy) {
err.retryPolicy = f.meta.retryPolicy;
}
execLog = [{
functionId: f.id,
error: err,
}];
}
if (!isDropResult(result)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent event handling between runSingle and runChain. If result is falsy but not a drop result, runChain preserves the original event while runSingle sets events = result (undefined), breaking the expected behavior pattern.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runSingle function doesn't handle array results consistently with runChain. In runChain, array results are spread into the newEvents array, whereas runSingle assigns them directly to events.

events = result;
}
return {events, execLog};
}

async function runChain(
chain,
event,
ctx
) {
const execLog = [];
const fastFunctions = !!ctx.connection?.options?.fastFunctions
let events = [event];
for (let k = 0; k < chain.length; k++) {
const f = chain[k];
Expand All @@ -406,24 +435,14 @@ async function runChain(
for (let i = 0; i < events.length; i++) {
const event = events[i];
let result = undefined;
// const execLogMeta = {
// eventIndex: i,
// receivedAt: rat && rat != "Invalid Date" ? rat : new Date(),
// functionId: f.id,
// };
try {
result = await f.f(deepCopy(event), ctx);
result = await f.f(fastFunctions ? event : deepCopy(event), ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fastFunctions option disables event copying between functions, which can lead to unexpected side effects if functions mutate the event object.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enabling fastFunctions option could lead to unexpected side effects if functions modify event objects


if (k < chain.length - 1 && Array.isArray(result) && result.length > 1) {
const l = result.length;
result = undefined;
throw new Error("Got " + l + " events as result of function #" + (k + 1) + " of " + chain.length + ". Only the last function in a chain is allowed to multiply events.");
}
// execLog.push({
// ...execLogMeta,
// ms: Date.now() - sw,
// dropped: isDropResult(result),
// });
} catch (err) {
if (err.name === DropRetryErrorName) {
result = "drop";
Expand All @@ -434,9 +453,6 @@ async function runChain(
execLog.push({
functionId: f.id,
error: err,
//event,
// ms: Date.now() - sw,
// dropped: isDropResult(result),
});
}
if (!isDropResult(result)) {
Expand All @@ -462,7 +478,8 @@ async function runChain(
const wrappedFunctionChain = async function (event, ctx) {
let chain = [];
//** @UDF_FUNCTIONS_CHAIN **//
const chainRes = await runChain(chain, event, ctx);

const chainRes = chain.length === 1 ? await runSingle(chain[0], event, ctx) : await runChain(chain, event, ctx);
checkError(chainRes);
if (Array.isArray(chainRes.events) && chainRes.events.length === 1) {
return chainRes.events[0];
Expand Down
Loading
Loading