
nGrinder task execution: a source code walkthrough

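Contents
  1. Source download
  2. PerfTestRunnable task execution
    2.1. Entry point: the init() method of PerfTestRunnable
    2.2. startPeriodically(): starting task execution
    2.3. doTest(): starting the test task
      2.3.1. Preparing the console (taken from a console pool)
      2.3.2. Preparing the script files and agents, and sending the script to the agents
      2.3.3. runTestOn(): running the test
      2.3.4. Agent startup and execution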

These are my notes from reading the source code that drives test execution in nGrinder.
nGrinder version: 3.4.1


Source download

https://github.com/naver/ngrinder



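PerfTestRunnable task execution

The overall nGrinder architecture diagram describes quite clearly how work flows through the system; reading the source alongside that diagram makes it much easier to follow.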


Entry point: the init() method of PerfTestRunnable

In the ngrinder-controller subproject:

org.ngrinder.perftest.service.PerfTestRunnable

@PostConstruct
public void init() {
    // Clean up db first.
    doFinish(true);

    this.startRunnable = new Runnable() {
        @Override
        public void run() {
            startPeriodically();
        }
    };
    scheduledTaskService.addFixedDelayedScheduledTask(startRunnable, PERFTEST_RUN_FREQUENCY_MILLISECONDS);
    this.finishRunnable = new Runnable() {
        @Override
        public void run() {
            finishPeriodically();
        }
    };
    scheduledTaskService.addFixedDelayedScheduledTask(finishRunnable, PERFTEST_RUN_FREQUENCY_MILLISECONDS);
}

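init() is annotated with @PostConstruct, so it runs once at application startup, after the bean has been constructed and its dependencies injected.

It first calls doFinish(true) to clean up the database (H2 by default; the database is configured in /root/.ngrinder/database.conf).

It then registers two scheduled tasks, startPeriodically() and finishPeriodically(), each with a fixed delay of PERFTEST_RUN_FREQUENCY_MILLISECONDS (1 s). The core logic lives in startPeriodically().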
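
To make the fixed-delay behaviour concrete, here is a minimal sketch that uses only java.util.concurrent. It is not nGrinder's scheduledTaskService; the class name FixedDelaySketch, the method countRuns and the shortened 50 ms delay are assumptions made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelaySketch {
    // Hypothetical stand-in for PERFTEST_RUN_FREQUENCY_MILLISECONDS, shortened for the demo.
    static final long RUN_FREQUENCY_MILLISECONDS = 50;

    /** Schedules a counting task with a fixed delay, waits, and returns the number of runs. */
    public static int countRuns(long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        // Plays the role of startRunnable: one unit of periodic work.
        Runnable startRunnable = runs::incrementAndGet;
        // Fixed delay: the next run starts RUN_FREQUENCY_MILLISECONDS after the previous one ends,
        // so a slow run never overlaps with the next one.
        scheduler.scheduleWithFixedDelay(startRunnable, 0, RUN_FREQUENCY_MILLISECONDS, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + countRuns(300));
    }
}
```

With a fixed delay (as opposed to a fixed rate), a slow doStart() pass simply pushes the next pass back, which is probably why the periodic methods can stay free of re-entrancy guards.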


startPeriodically(): starting task execution

/**
 * Scheduled method for test execution. This method dispatches the test
 * candidates and run one of them. This method is responsible until a test
 * is executed.
 */
public void startPeriodically() {
    doStart();
}

void doStart() {
    if (config.hasNoMoreTestLock()) {
        return;
    }
    // Block if the count of testing exceed the limit
    if (!canExecuteMore()) {
        // LOG MORE
        List<PerfTest> currentlyRunningTests = perfTestService.getCurrentlyRunningTest();
        LOG.debug("Currently running test is {}. No more tests can not run.", currentlyRunningTests.size());
        return;
    }
    // Find out next ready perftest
    PerfTest runCandidate = getRunnablePerfTest();
    if (runCandidate == null) {
        return;
    }

    if (!isScheduledNow(runCandidate)) {
        // this test project is reserved,but it isn't yet going to run test
        // right now.
        return;
    }

    if (!hasEnoughFreeAgents(runCandidate)) {
        return;
    }

    doTest(runCandidate);
}

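Early on, doStart() checks whether the number of running tests has reached the limit (10 by default, configurable in system.conf).

It then looks up the next test that is ready to run (status READY):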

public PerfTest getNextRunnablePerfTestPerfTestCandidate() {
    List<PerfTest> readyPerfTests = getPerfTestRepository().findAllByStatusAndRegionOrderByScheduledTimeAsc(
            Status.READY, getConfig().getRegion());

    List<PerfTest> usersFirstPerfTests = filterCurrentlyRunningTestUsersTest(readyPerfTests);
    return usersFirstPerfTests.isEmpty() ? null : readyPerfTests.get(0);
}
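
The selection rule can be sketched in memory as follows. This is an illustration only: Item is a hypothetical, trimmed-down stand-in for PerfTest, and the sketch assumes (judging by the method name alone) that filterCurrentlyRunningTestUsersTest drops tests whose owner already has a test running.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NextCandidateSketch {
    enum Status { READY, TESTING }

    /** Hypothetical, trimmed-down stand-in for PerfTest. */
    static final class Item {
        final String name;
        final String owner;
        final Status status;
        final long scheduledTime;

        Item(String name, String owner, Status status, long scheduledTime) {
            this.name = name;
            this.owner = owner;
            this.status = status;
            this.scheduledTime = scheduledTime;
        }
    }

    /** Returns the earliest-scheduled READY item whose owner has nothing running, or null. */
    static Item nextCandidate(List<Item> all) {
        // Assumed filter rule: owners with a running test must wait.
        Set<String> busyOwners = new HashSet<>();
        for (Item each : all) {
            if (each.status == Status.TESTING) {
                busyOwners.add(each.owner);
            }
        }
        List<Item> ready = new ArrayList<>();
        for (Item each : all) {
            if (each.status == Status.READY && !busyOwners.contains(each.owner)) {
                ready.add(each);
            }
        }
        // Mirrors the "OrderByScheduledTimeAsc" part of the repository query.
        ready.sort(Comparator.comparingLong(item -> item.scheduledTime));
        return ready.isEmpty() ? null : ready.get(0);
    }

    public static String demo() {
        List<Item> tests = Arrays.asList(
                new Item("alice-2", "alice", Status.READY, 100),
                new Item("alice-1", "alice", Status.TESTING, 50),
                new Item("bob-2", "bob", Status.READY, 300),
                new Item("bob-1", "bob", Status.READY, 200));
        Item next = nextCandidate(tests);
        return next == null ? "none" : next.name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "bob-1"
    }
}
```

In the demo, alice's READY test is scheduled earliest but is skipped because she already has a test in TESTING, so bob's earlier test is picked.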

Next, it checks whether enough free agents that the test's owner is allowed to use are available:

protected boolean hasEnoughFreeAgents(PerfTest test) {
    int size = agentManager.getAllFreeApprovedAgentsForUser(test.getCreatedUser()).size();
    if (test.getAgentCount() != null && test.getAgentCount() > size) {
        perfTestService.markProgress(test, "The test is tried to execute but there is not enough free agents."
                + "\n- Current free agent count : " + size + " / Requested : " + test.getAgentCount() + "\n");
        return false;
    }
    return true;
}

doTest(): starting the test task

The actual test run starts in this method; everything before it can be regarded as the preparation phase.

/**
 * Run the given test.
 * <p/>
 * If fails, it marks STOP_BY_ERROR in the given {@link PerfTest} status
 *
 * @param perfTest perftest instance;
 */
public void doTest(final PerfTest perfTest) {
    SingleConsole singleConsole = null;
    try {
        singleConsole = startConsole(perfTest);
        ScriptHandler prepareDistribution = perfTestService.prepareDistribution(perfTest);
        GrinderProperties grinderProperties = perfTestService.getGrinderProperties(perfTest, prepareDistribution);
        startAgentsOn(perfTest, grinderProperties, checkCancellation(singleConsole));
        distributeFileOn(perfTest, checkCancellation(singleConsole));

        singleConsole.setReportPath(perfTestService.getReportFileDirectory(perfTest));
        runTestOn(perfTest, grinderProperties, checkCancellation(singleConsole));
    } catch (SingleConsoleCancellationException ex) {
        // In case of error, mark the occurs error on perftest.
        doCancel(perfTest, singleConsole);
        notifyFinish(perfTest, StopReason.CANCEL_BY_USER);
    } catch (Exception e) {
        // In case of error, mark the occurs error on perftest.
        LOG.error("Error while executing test: {} - {} ", perfTest.getTestIdentifier(), e.getMessage());
        LOG.debug("Stack Trace is : ", e);
        doTerminate(perfTest, singleConsole);
        notifyFinish(perfTest, StopReason.ERROR_WHILE_PREPARE);
    }
}
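
The way doTest() wraps each step in checkCancellation(singleConsole) can be illustrated with a small stand-alone sketch. Everything here is hypothetical: Console, ConsoleCancelledException, runSteps and the returned status strings are invented for the example, and the body of checkCancellation is an assumption about the general pattern rather than a copy of nGrinder's implementation.

```java
public class CancellationSketch {
    /** Hypothetical counterpart of SingleConsoleCancellationException. */
    static class ConsoleCancelledException extends RuntimeException {
        ConsoleCancelledException(String message) {
            super(message);
        }
    }

    /** Hypothetical console that only tracks a cancel flag. */
    static class Console {
        volatile boolean cancelled;
    }

    /** Assumed shape of the guard: throw if cancelled, otherwise hand the console back. */
    static Console checkCancellation(Console console) {
        if (console.cancelled) {
            throw new ConsoleCancelledException("console is cancelled");
        }
        return console;
    }

    /**
     * Runs three preparation steps. A cancel request arrives before step cancelBeforeStep,
     * and step failAtStep throws; pass 0 to disable either. Returns an illustrative status.
     */
    public static String runSteps(int cancelBeforeStep, int failAtStep) {
        Console console = new Console();
        try {
            for (int step = 1; step <= 3; step++) {
                if (step == cancelBeforeStep) {
                    console.cancelled = true;
                }
                checkCancellation(console); // guard before every step, as in doTest()
                if (step == failAtStep) {
                    throw new IllegalStateException("step " + step + " failed");
                }
            }
            return "TESTING";
        } catch (ConsoleCancelledException e) {
            return "CANCELED"; // user cancellation is handled separately from errors
        } catch (Exception e) {
            return "STOP_BY_ERROR";
        }
    }

    public static void main(String[] args) {
        System.out.println(runSteps(0, 0)); // prints "TESTING"
        System.out.println(runSteps(2, 0)); // prints "CANCELED"
        System.out.println(runSteps(0, 3)); // prints "STOP_BY_ERROR"
    }
}
```

Giving cancellation its own exception type lets the caller tell a deliberate user cancel apart from a genuine failure, which matches the two catch blocks in doTest().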

Preparing the console (taken from a console pool)

/**
 * Get an available console.
 * <p/>
 * If there is no available console, it waits until available console is returned back. If the specific time is
 * elapsed, the timeout error occurs and throws {@link org.ngrinder.common.exception.NGrinderRuntimeException} . The
 * timeout can be adjusted by overriding {@link #getMaxWaitingMilliSecond()}.
 *
 * @param baseConsoleProperties base {@link net.grinder.console.model.ConsoleProperties}
 * @return console
 */
public SingleConsole getAvailableConsole(ConsoleProperties baseConsoleProperties) {
    ConsoleEntry consoleEntry = null;
    try {
        consoleEntry = consoleQueue.poll(getMaxWaitingMilliSecond(), TimeUnit.MILLISECONDS);
        if (consoleEntry == null) {
            throw processException("no console entry available");
        }
        synchronized (this) {
            consoleEntry.releaseSocket();
            // FIXME : It might fail here
            ConsoleCommunicationSetting consoleCommunicationSetting = ConsoleCommunicationSetting.asDefault();
            if (config.getInactiveClientTimeOut() > 0) {
                consoleCommunicationSetting.setInactiveClientTimeOut(config.getInactiveClientTimeOut());
            }
            SingleConsole singleConsole = new SingleConsole(config.getCurrentIP(), consoleEntry.getPort(),
                    consoleCommunicationSetting, baseConsoleProperties);

            getConsoleInUse().add(singleConsole);
            singleConsole.setCsvSeparator(config.getCsvSeparator());
            return singleConsole;
        }
    } catch (Exception e) {
        if (consoleEntry != null) {
            consoleQueue.add(consoleEntry);
        }
        throw processException("no console entry available");
    }
}

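The ConsoleEntry objects are held in consoleQueue, which the ConsoleManager class fills in its init() method. The queue holds at most 10 entries by default; the size can be changed in system.conf.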

/**
 * Prepare console queue.
 */
@PostConstruct
public void init() {
    int consoleSize = getConsoleSize();
    consoleQueue = new ArrayBlockingQueue<ConsoleEntry>(consoleSize);
    final String currentIP = config.getCurrentIP();
    for (int each : getAvailablePorts(currentIP, consoleSize, getConsolePortBase(), MAX_PORT_NUMBER)) {
        final ConsoleEntry e = new ConsoleEntry(config.getCurrentIP(), each);
        try {
            e.occupySocket();
            consoleQueue.add(e);
        } catch (Exception ex) {
            LOG.error("socket binding to {}:{} is failed", config.getCurrentIP(), each);
        }
    }
}
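
Taken together, init() and getAvailableConsole() form a bounded pool: the queue is filled once, borrowers poll it with a timeout, and an entry goes back when it is no longer needed. The sketch below shows only that pooling idea with plain port numbers. ConsolePoolSketch, borrow, giveBack and the port values are hypothetical, and no sockets are bound or released as the real ConsoleEntry does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsolePoolSketch {
    private final BlockingQueue<Integer> freePorts;
    private final long maxWaitMillis;

    /** Fills the pool once with size consecutive ports, like init() fills consoleQueue. */
    public ConsolePoolSketch(int basePort, int size, long maxWaitMillis) {
        this.freePorts = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            freePorts.add(basePort + i);
        }
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Waits up to maxWaitMillis for a free port; fails if none becomes available in time. */
    public int borrow() {
        try {
            Integer port = freePorts.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
            if (port == null) {
                throw new IllegalStateException("no console entry available");
            }
            return port;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a console", e);
        }
    }

    /** Returns a port to the pool so that another test can use it. */
    public void giveBack(int port) {
        freePorts.add(port);
    }

    public int available() {
        return freePorts.size();
    }

    public static void main(String[] args) {
        ConsolePoolSketch pool = new ConsolePoolSketch(12000, 2, 50);
        int first = pool.borrow();
        int second = pool.borrow();
        System.out.println(first + " " + second); // prints "12000 12001"
        pool.giveBack(first);
        System.out.println(pool.available()); // prints "1"
    }
}
```

Because the queue is bounded, the pool size doubles as the cap on how many tests can hold a console at the same time.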

Preparing the script files and agents, and sending the script to the agents

// Prepare the script
ScriptHandler prepareDistribution = perfTestService.prepareDistribution(perfTest);
// Build the Grinder properties from the prepared script
GrinderProperties grinderProperties = perfTestService.getGrinderProperties(perfTest, prepareDistribution);
// Start the agents
startAgentsOn(perfTest, grinderProperties, checkCancellation(singleConsole));
// Send the script to the agents
distributeFileOn(perfTest, checkCancellation(singleConsole));

The scripts you write are stored on the server, under /root/.ngrinder/perftest/ by default.


runTestOn(): running the test

/**
 * Run a given {@link PerfTest} with the given {@link GrinderProperties} and
 * the {@link SingleConsole} .
 *
 * @param perfTest perftest
 * @param grinderProperties grinder properties
 * @param singleConsole console to be used.
 */
void runTestOn(final PerfTest perfTest, GrinderProperties grinderProperties, final SingleConsole singleConsole) {
    // start target monitor
    for (OnTestLifeCycleRunnable run : pluginManager.getEnabledModulesByClass(OnTestLifeCycleRunnable.class)) {
        run.start(perfTest, perfTestService, config.getVersion());
    }

    // Run test
    perfTestService.markStatusAndProgress(perfTest, START_TESTING, "The test is ready to start.");
    // Add listener to detect abnormal condition and mark the perfTest
    singleConsole.addListener(new ConsoleShutdownListener() {
        @Override
        public void readyToStop(StopReason stopReason) {
            perfTestService.markAbnormalTermination(perfTest, stopReason);
            LOG.error("Abnormal test {} due to {}", perfTest.getId(), stopReason.name());
        }
    });

    long startTime = singleConsole.startTest(grinderProperties);
    perfTest.setStartTime(new Date(startTime));
    addSamplingListeners(perfTest, singleConsole);
    perfTestService.markStatusAndProgress(perfTest, TESTING, "The test is started.");
    singleConsole.startSampling();
}

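runTestOn() starts target monitoring (including any custom plugins) and registers a listener that detects abnormal termination.

It then starts collecting the test results: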

/**
 * Start sampling with sampling ignore count.
 */
public void startSampling() {
    this.sampleModel = getConsoleComponent(SampleModelImplementationEx.class);
    this.sampleModel.addTotalSampleListener(this);
    this.sampleModel.addModelListener(new SampleModel.Listener() {
        @Override
        public void stateChanged() {
            capture = SingleConsole.this.sampleModel.getState().isCapturing();
        }

        @Override
        public void resetTests() {
            intervalStatisticMapPerTest.clear();
            accumulatedStatisticMapPerTest.clear();
        }

        @Override
        public void newTests(Set<Test> newTests, ModelTestIndex modelTestIndex) {
            for (final Test each : newTests) {
                SingleConsole.this.sampleModel.addSampleListener(each, new SampleListener() {
                    @Override
                    public void update(StatisticsSet intervalStatistics, StatisticsSet cumulativeStatistics) {
                        intervalStatisticMapPerTest.put(each, intervalStatistics.snapshot());
                        accumulatedStatisticMapPerTest.put(each, cumulativeStatistics.snapshot());
                    }
                });
            }
        }

        @Override
        public void newSample() {
        }
    });
    informTestSamplingStart();
    this.sampleModel.start();
    LOGGER.info("Sampling is started");
}
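
The core of startSampling() is the per-test listener that stores the latest interval statistics and the running totals in two maps. A stripped-down sketch of that pattern follows. The nested SampleListener and SampleModel types here are hypothetical simplifications written for this example (they are not The Grinder's classes of the same name), and the statistics are reduced to a single executed-test counter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SamplingSketch {
    /** Hypothetical listener: receives the last interval's count and the cumulative count. */
    interface SampleListener {
        void update(long intervalCount, long cumulativeCount);
    }

    /** Hypothetical model that tracks totals and notifies one listener per test name. */
    static final class SampleModel {
        private final Map<String, SampleListener> listeners = new ConcurrentHashMap<>();
        private final Map<String, Long> totals = new ConcurrentHashMap<>();

        void addSampleListener(String test, SampleListener listener) {
            listeners.put(test, listener);
        }

        /** Reports how many executions of the test happened in the latest interval. */
        void report(String test, long executed) {
            long total = totals.merge(test, executed, Long::sum);
            SampleListener listener = listeners.get(test);
            if (listener != null) {
                listener.update(executed, total);
            }
        }
    }

    // Counterparts of intervalStatisticMapPerTest and accumulatedStatisticMapPerTest.
    final Map<String, Long> intervalPerTest = new ConcurrentHashMap<>();
    final Map<String, Long> accumulatedPerTest = new ConcurrentHashMap<>();

    /** Registers one listener per test; each update overwrites that test's latest values. */
    void startSampling(SampleModel model, String... tests) {
        for (String each : tests) {
            model.addSampleListener(each, (interval, total) -> {
                intervalPerTest.put(each, interval);
                accumulatedPerTest.put(each, total);
            });
        }
    }

    public static long[] demo() {
        SampleModel model = new SampleModel();
        SamplingSketch console = new SamplingSketch();
        console.startSampling(model, "login");
        model.report("login", 5);
        model.report("login", 7);
        return new long[] {console.intervalPerTest.get("login"), console.accumulatedPerTest.get("login")};
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println(result[0] + " " + result[1]); // prints "7 12"
    }
}
```

Keeping the interval and cumulative values in separate maps means a reader can get both the current rate and the running total for a test without recomputing either.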

Agent startup and execution

Agent handling is set up in the init method of the AgentManager class:

/**
 * Initialize agent manager.
 */
@PostConstruct
public void init() {
    int port = config.getControllerPort();

    ConsoleCommunicationSetting consoleCommunicationSetting = ConsoleCommunicationSetting.asDefault();
    if (config.getInactiveClientTimeOut() > 0) {
        consoleCommunicationSetting.setInactiveClientTimeOut(config.getInactiveClientTimeOut());
    }

    agentControllerServerDaemon = new AgentControllerServerDaemon(config.getCurrentIP(), port, consoleCommunicationSetting);
    agentControllerServerDaemon.start();
    agentControllerServerDaemon.setAgentDownloadRequestListener(this);
    agentControllerServerDaemon.addLogArrivedListener(new LogArrivedListener() {
        @Override
        public void logArrived(String testId, AgentAddress agentAddress, byte[] logs) {
            AgentControllerIdentityImplementation agentIdentity = convert(agentAddress.getIdentity());
            if (ArrayUtils.isEmpty(logs)) {
                LOGGER.error("Log is arrived from {} but no log content", agentIdentity.getIp());
            }
            File logFile = null;
            try {
                logFile = new File(config.getHome().getPerfTestLogDirectory(testId.replace("test_", "")),
                        agentIdentity.getName() + "-" + agentIdentity.getRegion() + "-log.zip");

                FileUtils.writeByteArrayToFile(logFile, logs);
            } catch (IOException e) {
                LOGGER.error("Error while write logs from {} to {}", agentAddress.getIdentity().getName(),
                        logFile.getAbsolutePath());

                LOGGER.error("Error is following", e);
            }
        }
    });
}

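Once an agent has started, it connects to the controller and drives the test flow until the test ends. When the test is finished, the agents it used are returned to the AgentControllerServer so that other consoles can use them.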