zkevm-node 代码分析

git checkout https://github.com/0xPolygonHermez/zkevm-node.git

下载代码,之后列出目录

 

运行测试部署

#cd test && sudo make run

其中zkevm-json-rpc、zkevm-l2gaspricer、zkevm-aggregator、zkevm-sequence-sender、zkevm-sequencer、zkevm-eth-tx-manager。均由zkevm-node代码作为基础镜像。

并发现 zkevm-node是软件架构使用docker-compose做服务堆栈管理,并创建虚拟网络zkevm(桥接)。

 

根据需求,目前只需要知道zkevm-node中aggregator通讯方式的实现细节。

根据docker-compose.yml 可知

zkevm-aggregator:
    container_name: zkevm-aggregator
    image: zkevm-node
    ports:
      - 50081:50081
      - 9093:9091 # needed if metrics enabled
    environment:
      - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db
      - ZKEVM_NODE_AGGREGATOR_SENDER_ADDRESS=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266
    volumes:
      - ./config/test.node.config.toml:/app/config.toml
      - ./config/test.genesis.config.json:/app/genesis.json
    command:
      - "/bin/sh"
      - "-c"
      - "/app/zkevm-node run --network custom --custom-network-file /app/genesis.json --cfg /app/config.toml --components aggregator
使用test.node.config.tomlzkevm-aggregator进行配置。
再来看看test.node.config.toml。

[Executor]
URI = "zkevm-prover:50071"
MaxGRPCMessageSize = 100000000

可以看到aggregator使用prover的executor 服务,而最终是executor做聚合。

源码片段:https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/service/executor/executor_service.cpp

.....

//从zkevm-node 中的 aggregator接收GRPC服务消息ProcessBatch
::grpc::Status ExecutorServiceImpl::ProcessBatch(::grpc::ServerContext* context, const ::executor::v1::ProcessBatchRequest* request, ::executor::v1::ProcessBatchResponse* response)
{
    // If the process is exiting, do not start new activities
    if (bExitingProcess)
    {
        return Status::CANCELLED;
    }

    //TimerStart(EXECUTOR_PROCESS_BATCH);
    struct timeval EXECUTOR_PROCESS_BATCH_start;
    gettimeofday(&EXECUTOR_PROCESS_BATCH_start,NULL);

#ifdef LOG_SERVICE
    zklog.info("ExecutorServiceImpl::ProcessBatch() got request:\n" + request->DebugString());
#endif

#ifdef LOG_TIME
    lock();
    if ( (firstTotalTime.tv_sec == 0) && (firstTotalTime.tv_usec == 0) )
    {
        gettimeofday(&firstTotalTime, NULL);
        lastTotalTime = firstTotalTime;
    }
    unlock();
#endif

    // 创建了一个processBatch 类别的 ProverRequest 
    ProverRequest proverRequest(fr, config, prt_processBatch);

    // Save request to file
    if (config.saveRequestToFile)
    {
        string2file(request->DebugString(), proverRequest.filePrefix + "executor_request.txt");
    }

    // Get external request ID
    proverRequest.contextId = request->context_id();

......

    //prover处理ProverRequest
    prover.processBatch(&proverRequest);

    //TimerStart(EXECUTOR_PROCESS_BATCH_BUILD_RESPONSE);

    if (proverRequest.result != ZKR_SUCCESS)
    {
        zklog.error("ExecutorServiceImpl::ProcessBatch() detected proverRequest.result=" + to_string(proverRequest.result) + "=" + zkresult2string(proverRequest.result), &proverRequest.tags);
    }
    
    //这里设置 经过prover处理的proverRequest 返回结果,并设置到response上。
    response->set_error(zkresult2error(proverRequest.result));
    //如下:设置真实累计使用的gas费用
    response->set_cumulative_gas_used(proverRequest.pFullTracer->get_cumulative_gas_used());
......
   // Calculate the throughput, for this ProcessBatch call, and for all calls
   
   //这里计算执行时间和数据大小,并汇总到日志中,不开启LOG_TIME不计算。
#ifdef LOG_TIME
    lock();
    counter++;
    uint64_t execGas = response->cumulative_gas_used();
    totalGas += execGas;
    uint64_t execBytes = request->batch_l2_data().size();
    totalBytes += execBytes;
    uint64_t execTX = responses.size();
    totalTX += execTX;
    double execTime = double(TimeDiff(EXECUTOR_PROCESS_BATCH_start, EXECUTOR_PROCESS_BATCH_stop))/1000000;
    totalTime += execTime;
    struct timeval now;
    gettimeofday(&now, NULL);
    double timeSinceLastTotal = zkmax(1, double(TimeDiff(lastTotalTime, now))/1000000);
    if (timeSinceLastTotal >= 10.0)
    {
        totalTPG = double(totalGas - lastTotalGas)/timeSinceLastTotal;
        totalTPB = double(totalBytes - lastTotalBytes)/timeSinceLastTotal;
        totalTPTX = double(totalTX - lastTotalTX)/timeSinceLastTotal;
        lastTotalGas = totalGas;
        lastTotalBytes = totalBytes;
        lastTotalTX = totalTX;
        lastTotalTime = now;
    }
    double timeSinceFirstTotal = zkmax(1, double(TimeDiff(firstTotalTime, now))/1000000);
    double TPG = double(totalGas)/timeSinceFirstTotal;
    double TPB = double(totalBytes)/timeSinceFirstTotal;
    double TPTX = double(totalTX)/timeSinceFirstTotal;
    
    uint64_t nfd = getNumberOfFileDescriptors();

    zklog.info("ExecutorServiceImpl::ProcessBatch() done counter=" + to_string(counter) + " B=" + to_string(execBytes) + " TX=" + to_string(execTX) + " gas=" + to_string(execGas) + " time=" + to_string(execTime) +
        " TP=" + to_string(double(execBytes)/execTime) + "B/s=" + to_string(double(execTX)/execTime) + "TX/s=" + to_string(double(execGas)/execTime) + "gas/s=" + to_string(double(execGas)/double(execBytes)) + "gas/B" +
        " totalTP(10s)=" + to_string(totalTPB) + "B/s=" + to_string(totalTPTX) + "TX/s=" + to_string(totalTPG) + "gas/s=" + to_string(totalTPG/zkmax(1,totalTPB)) + "gas/B" +
        " totalTP(ever)=" + to_string(TPB) + "B/s=" + to_string(TPTX) + "TX/s=" + to_string(TPG) + "gas/s=" + to_string(TPG/zkmax(1,TPB)) + "gas/B" +
        " totalTime=" + to_string(totalTime) +
        " filedesc=" + to_string(nfd),
        &proverRequest.tags);
    
    // If the TP in gas/s is < threshold, log the input, unless it has been done before
    if (!config.logExecutorServerInput && (config.logExecutorServerInputGasThreshold > 0) && ((double(execGas)/execTime) < config.logExecutorServerInputGasThreshold))
    {
        json inputJson;
        proverRequest.input.save(inputJson);
        string inputJsonString = inputJson.dump();
        replace(inputJsonString.begin(), inputJsonString.end(), '"', '\'');
        zklog.info("TP=" + to_string(double(execGas)/execTime) + "gas/s Input=" + inputJsonString, &proverRequest.tags);
    }
    unlock();
#endif

    return Status::OK;
}

源码片段 https://github.com/0xPolygonHermez/zkevm-prover/blob/main/src/prover/prover.cpp

void Prover::processBatch(ProverRequest *pProverRequest)
{
    //TimerStart(PROVER_PROCESS_BATCH);
    zkassert(pProverRequest != NULL);
    zkassert(pProverRequest->type == prt_processBatch);

    if (config.runAggregatorClient)
    {
        zklog.info("Prover::processBatch() timestamp=" + pProverRequest->timestamp + " UUID=" + pProverRequest->uuid);
    }

    // Save input to <timestamp>.input.json, as provided by client
    if (config.saveInputToFile)
    {
        json inputJson;
        pProverRequest->input.save(inputJson);
        json2file(inputJson, pProverRequest->inputFile());
    }

    // Log input if requested
    if (config.logExecutorServerInput)
    {
        json inputJson;
        pProverRequest->input.save(inputJson);
        zklog.info("Input=" + inputJson.dump());
    }

    // Execute the program, in the process batch way
    // 这里调用进入executor (main_sm状态机处理)处理process_batch
    executor.process_batch(*pProverRequest);

    // Save input to <timestamp>.input.json after execution including dbReadLog
    if (config.saveDbReadsToFile)
    {
        json inputJsonEx;
        pProverRequest->input.save(inputJsonEx, *pProverRequest->dbReadLog);
        json2file(inputJsonEx, pProverRequest->inputDbFile());
    }

    //TimerStopAndLog(PROVER_PROCESS_BATCH);
}

至此aggregator处理聚合L1数据的过程分析结束,分析程序运行时的时序图如下: