Source Code Analysis of Apache SeaTunnel Zeta Engine (Part 3): Server-Side Task Submission | HackerNoon

This is the last piece of the series articles to analyze the Apache SeaTunnel Zeta Engine Source Code; review the previous series to get the full picture:

Let’s review the components that execute after the server starts:

  • CoordinatorService: Enabled only on master/standby nodes, listens to cluster status, and handles master-standby failovers.
  • SlotService: Enabled on worker nodes, periodically reports its status to the master.
  • TaskExecutionService: Enabled on worker nodes, periodically updates task metrics to IMAP.

When no tasks are received by the cluster, these components run. However, when a client sends a SeaTunnelSubmitJobCodec message to the server, how does the server handle it?

Message Reception

Since the client and server are on different machines, method calls cannot be used; instead, message passing is employed. Upon receiving a message, how does the server process it?

Firstly, the client sends a message of type SeaTunnelSubmitJobCodec:

// Client code
ClientMessage request =
        SeaTunnelSubmitJobCodec.encodeRequest(
                jobImmutableInformation.getJobId(),
                seaTunnelHazelcastClient
                        .getSerializationService()
                        .toData(jobImmutableInformation),
                jobImmutableInformation.isStartWithSavePoint());
PassiveCompletableFuture<Void> submitJobFuture =
        seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);

In the SeaTunnelSubmitJobCodec class, it is associated with a SeaTunnelMessageTaskFactoryProvider class, which maps message types to MessageTask classes. For SeaTunnelSubmitJobCodec, it maps to the SubmitJobTaskclass:

private final Int2ObjectHashMap<MessageTaskFactory> factories = new Int2ObjectHashMap<>(60);

private void initFactories() {
    factories.put(
            SeaTunnelPrintMessageCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) ->
                    new PrintMessageTask(clientMessage, node, connection));
    factories.put(
            SeaTunnelSubmitJobCodec.REQUEST_MESSAGE_TYPE,
            (clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
    .....
}

When examining the SubmitJobTask class, it invokes the SubmitJobOperation class:

@Override
protected Operation prepareOperation() {
    return new SubmitJobOperation(
            parameters.jobId,
            parameters.jobImmutableInformation,
            parameters.isStartWithSavePoint);
}

In the SubmitJobOperation class, the task information is handed over to the CoordinatorService component via its submitJob method:

@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
    SeaTunnelServer seaTunnelServer = getService();
    return seaTunnelServer
            .getCoordinatorService()
            .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
}

At this point, a client message is effectively handed over to the server for method invocation. Other types of operations can be traced in a similar manner.

CoordinatorService

Next, let’s explore how CoordinatorService handles job submissions:

public PassiveCompletableFuture<Void> submitJob(
        long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
    CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

    // First, check if a job with the same ID already exists
    if (getJobMaster(jobId) != null) {
        logger.warning(
                String.format(
                        "The job %s is currently running; no need to submit again.", jobId));
        jobSubmitFuture.complete(null);
        return new PassiveCompletableFuture<>(jobSubmitFuture);
    }

    // Initialize JobMaster object
    JobMaster jobMaster =
            new JobMaster(
                    jobImmutableInformation,
                    this.nodeEngine,
                    executorService,
                    getResourceManager(),
                    getJobHistoryService(),
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    ownedSlotProfilesIMap,
                    runningJobInfoIMap,
                    metricsImap,
                    engineConfig,
                    seaTunnelServer);
    
    executorService.submit(
            () -> {
                try {
                    // Ensure no duplicate tasks with the same ID
                    if (!isStartWithSavePoint
                            && getJobHistoryService().getJobMetrics(jobId) != null) {
                        throw new JobException(
                                String.format(
                                        "The job id %s has already been submitted and is not starting with a savepoint.",
                                        jobId));
                    }
                    // Add task info to IMAP
                    runningJobInfoIMap.put(
                            jobId,
                            new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
                    runningJobMasterMap.put(jobId, jobMaster);
                    // Initialize JobMaster
                    jobMaster.init(
                            runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
                    // Task creation successful
                    jobSubmitFuture.complete(null);
                } catch (Throwable e) {
                    String errorMsg = ExceptionUtils.getMessage(e);
                    logger.severe(String.format("submit job %s error %s ", jobId, errorMsg));
                    jobSubmitFuture.completeExceptionally(new JobException(errorMsg));
                }
                if (!jobSubmitFuture.isCompletedExceptionally()) {
                    // Start job execution
                    try {
                        jobMaster.run();
                    } finally {
                        // Remove jobMaster from map if not cancelled
                        if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
                            runningJobMasterMap.remove(jobId);
                        }
                    }
                } else {
                    runningJobInfoIMap.remove(jobId);
                    runningJobMasterMap.remove(jobId);
                }
            });
    return new PassiveCompletableFuture<>(jobSubmitFuture);
}

In the server, a JobMaster object is created to manage the individual task. During the JobMaster creation, it retrieves the resource manager via getResourceManager() and job history information via getJobHistoryService(). The jobHistoryService is initialized at startup, while ResourceManager is lazily loaded upon the first task submission:

/** Lazy load for resource manager */
public ResourceManager getResourceManager() {
    if (resourceManager == null) {
        synchronized (this) {
            if (resourceManager == null) {
                ResourceManager manager =
                        new ResourceManagerFactory(nodeEngine, engineConfig)
                                .getResourceManager();
                manager.init();
                resourceManager = manager;
            }
        }
    }
    return resourceManager;
}

ResourceManager

Currently, SeaTunnel supports only standalone deployment. When initializing ResourceManager, it gathers all cluster nodes and sends a SyncWorkerProfileOperation to get node information, updating the internal registerWorker state:

@Override
public void init() {
    log.info("Init ResourceManager");
    initWorker();
}

private void initWorker() {
    log.info("initWorker... ");
    List<Address> aliveNode =
            nodeEngine.getClusterService().getMembers().stream()
                    .map(Member::getAddress)
                    .collect(Collectors.toList());
    log.info("init live nodes: {}", aliveNode);
    List<CompletableFuture<Void>> futures =
            aliveNode.stream()
                    .map(
                            node ->
                                    sendToMember(new SyncWorkerProfileOperation(), node)
                                            .thenAccept(
                                                    p -> {
                                                        if (p != null) {
                                                            registerWorker.put(
                                                                    node, (WorkerProfile) p);
                                                            log.info(
                                                                    "received new worker register: "
                                                                            + ((WorkerProfile)
                                                                                            p)
                                                                                    .getAddress());
                                                        }
                                                    }))
                    .collect(Collectors.toList());
    futures.forEach(CompletableFuture::join);
    log.info("registerWorker: {}", registerWorker);
}

Previously, we observed that SlotService sends heartbeat messages to the master from each node periodically. Upon receiving these heartbeats, the ResourceManager updates the node statuses in its internal state.

@Override
public void heartbeat(WorkerProfile workerProfile) {
    if (!registerWorker.containsKey(workerProfile.getAddress())) {
        log.info("received new worker register: " + workerProfile.getAddress());
        sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
    } else {
        log.debug("received worker heartbeat from: " + workerProfile.getAddress());
    }
    registerWorker.put(workerProfile.getAddress(), workerProfile);
}

JobMaster

In the CoordinatorService, a JobMaster instance is created and its init method is called. When the init method is completed, it is considered that the task creation is successful. The run method is then called to formally execute the task.

Let’s look at the initialization and init method.

public JobMaster(
        @NonNull Data jobImmutableInformationData,
        @NonNull NodeEngine nodeEngine,
        @NonNull ExecutorService executorService,
        @NonNull ResourceManager resourceManager,
        @NonNull JobHistoryService jobHistoryService,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull IMap ownedSlotProfilesIMap,
        @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
        @NonNull IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap,
        EngineConfig engineConfig,
        SeaTunnelServer seaTunnelServer) {
    this.jobImmutableInformationData = jobImmutableInformationData;
    this.nodeEngine = nodeEngine;
    this.executorService = executorService;
    flakeIdGenerator =
            this.nodeEngine
                    .getHazelcastInstance()
                    .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME);
    this.ownedSlotProfilesIMap = ownedSlotProfilesIMap;
    this.resourceManager = resourceManager;
    this.jobHistoryService = jobHistoryService;
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.runningJobInfoIMap = runningJobInfoIMap;
    this.engineConfig = engineConfig;
    this.metricsImap = metricsImap;
    this.seaTunnelServer = seaTunnelServer;
    this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
}

During initialization, only simple variable assignments are performed without any significant operations. We need to focus on the init method.

public synchronized void init(long initializationTimestamp, boolean restart) throws Exception {
    // The server receives a binary object from the client,
    // which is first converted to a JobImmutableInformation object, the same object sent by the client
    jobImmutableInformation =
            nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
    // Get the checkpoint configuration, such as the interval, timeout, etc.
    jobCheckpointConfig =
            createJobCheckpointConfig(
                    engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig());

    LOGGER.info(
            String.format(
                    "Init JobMaster for Job %s (%s) ",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId()));
    LOGGER.info(
            String.format(
                    "Job %s (%s) needed jar urls %s",
                    jobImmutableInformation.getJobConfig().getName(),
                    jobImmutableInformation.getJobId(),
                    jobImmutableInformation.getPluginJarsUrls()));
    ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
    // Get the ClassLoader
    ClassLoader classLoader =
            seaTunnelServer
                    .getClassLoaderService()
                    .getClassLoader(
                            jobImmutableInformation.getJobId(),
                            jobImmutableInformation.getPluginJarsUrls());
    // Deserialize the logical DAG from the client-provided information
    logicalDag =
            CustomClassLoadedObject.deserializeWithCustomClassLoader(
                    nodeEngine.getSerializationService(),
                    classLoader,
                    jobImmutableInformation.getLogicalDag());
    try {
        Thread.currentThread().setContextClassLoader(classLoader);
        // Execute save mode functionality, such as table creation and deletion
        if (!restart
                && !logicalDag.isStartWithSavePoint()
                && ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
                        .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
                        .equals(SaveModeExecuteLocation.CLUSTER)) {
            logicalDag.getLogicalVertexMap().values().stream()
                    .map(LogicalVertex::getAction)
                    .filter(action -> action instanceof SinkAction)
                    .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
                    .forEach(JobMaster::handleSaveMode);
        }
        // Parse the logical plan into a physical plan
        final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
                PlanUtils.fromLogicalDAG(
                        logicalDag,
                        nodeEngine,
                        jobImmutableInformation,
                        initializationTimestamp,
                        executorService,
                        flakeIdGenerator,
                        runningJobStateIMap,
                        runningJobStateTimestampsIMap,
                        engineConfig.getQueueType(),
                        engineConfig);
        this.physicalPlan = planTuple.f0();
        this.physicalPlan.setJobMaster(this);
        this.checkpointPlanMap = planTuple.f1();
    } finally {
        // Reset the current thread's ClassLoader and release the created classLoader
        Thread.currentThread().setContextClassLoader(appClassLoader);
        seaTunnelServer
                .getClassLoaderService()
                .releaseClassLoader(
                        jobImmutableInformation.getJobId(),
                        jobImmutableInformation.getPluginJarsUrls());
    }
    Exception initException = null;
    try {
        // Initialize the checkpoint manager
        this.initCheckPointManager(restart);
    } catch (Exception e) {
        initException = e;
    }
    // Add some callback functions for job state listening
    this.initStateFuture();
    if (initException != null) {
        if (restart) {
            cancelJob();
        }
        throw initException;
    }
}

Lastly, let’s look at the run method:

public void run() {
    try {
        physicalPlan.startJob();
    } catch (Throwable e) {
        LOGGER.severe(
                String.format(
                        "Job %s (%s) run error with: %s",
                        physicalPlan.getJobImmutableInformation().getJobConfig().getName(),
                        physicalPlan.getJobImmutableInformation().getJobId(),
                        ExceptionUtils.getMessage(e)));
    } finally {
        jobMasterCompleteFuture.join();
        if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
            List<ConnectorJarIdentifier> pluginJarIdentifiers =
                    jobImmutableInformation.getPluginJarIdentifiers();
            seaTunnelServer
                    .getConnectorPackageService()
                    .cleanUpWhenJobFinished(
                            jobImmutableInformation.getJobId(), pluginJarIdentifiers);
        }
    }
}

This method is relatively straightforward, calling physicalPlan.startJob() to execute the generated physical plan.

From the above code, it is evident that after the server receives a client task submission request, it initializes the JobMasterclass, which generates the physical plan from the logical plan, and then executes the physical plan.

Next, we need to delve into how the logical plan is converted into a physical plan.

Conversion From Logical Plan to Physical Plan

The generation of the physical plan is done by calling the following method in JobMaster:

final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
        PlanUtils.fromLogicalDAG(
                logicalDag,
                nodeEngine,
                jobImmutableInformation,
                initializationTimestamp,
                executorService,
                flakeIdGenerator,
                runningJobStateIMap,
                runningJobStateTimestampsIMap,
                engineConfig.getQueueType(),
                engineConfig);

In the method for generating the physical plan, the logical plan is first converted into an execution plan, and then the execution plan is converted into a physical plan.

public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> fromLogicalDAG(
        @NonNull LogicalDag logicalDag,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType,
        @NonNull EngineConfig engineConfig) {
    return new PhysicalPlanGenerator(
                    new ExecutionPlanGenerator(
                                    logicalDag, jobImmutableInformation, engineConfig)
                            .generate(),
                    nodeEngine,
                    jobImmutableInformation,
                    initializationTimestamp,
                    executorService,
                    flakeIdGenerator,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap,
                    queueType)
            .generate();
}

Generation of the Execution Plan

public ExecutionPlanGenerator(
        @NonNull LogicalDag logicalPlan,
        @NonNull JobImmutableInformation jobImmutableInformation,
        @NonNull EngineConfig engineConfig) {
    checkArgument(
            logicalPlan.getEdges().size() > 0, "ExecutionPlan Builder must have LogicalPlan.");
    this.logicalPlan = logicalPlan;
    this.jobImmutableInformation = jobImmutableInformation;
    this.engineConfig = engineConfig;
}

public ExecutionPlan generate() {
    log.debug("Generate execution plan using logical plan:");

    Set<ExecutionEdge> executionEdges = generateExecutionEdges(logicalPlan.getEdges());
    log.debug("Phase 1: generate execution edge list {}", executionEdges);

    executionEdges = generateShuffleEdges(executionEdges);
    log.debug("Phase 2: generate shuffle edge list {}", executionEdges);

    executionEdges = generateTransformChainEdges(executionEdges);
    log.debug("Phase 3: generate transform chain edge list {}", executionEdges);

    List<Pipeline> pipelines = generatePipelines(executionEdges);
    log.debug("Phase 4: generate pipeline list {}", pipelines);

    ExecutionPlan executionPlan = new ExecutionPlan(pipelines, jobImmutableInformation);
    log.debug("Phase 5

: generate execution plan {}", executionPlan);
    return executionPlan;
}

The ExecutionPlanGenerator class takes a logical plan and produces an execution plan through a series of steps, including generating execution edges, shuffle edges, transform chain edges, and finally, pipelines.

Generation of the Physical Plan

The PhysicalPlanGenerator class converts the execution plan into a physical plan:

public PhysicalPlanGenerator(
        @NonNull ExecutionPlan executionPlan,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType) {
    this.executionPlan = executionPlan;
    this.nodeEngine = nodeEngine;
    this.jobImmutableInformation = jobImmutableInformation;
    this.initializationTimestamp = initializationTimestamp;
    this.executorService = executorService;
    this.flakeIdGenerator = flakeIdGenerator;
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.queueType = queueType;
}

public PhysicalPlan generate() {
    List<PhysicalVertex> physicalVertices = generatePhysicalVertices(executionPlan);
    List<PhysicalEdge> physicalEdges = generatePhysicalEdges(executionPlan);

    PhysicalPlan physicalPlan = new PhysicalPlan(physicalVertices, physicalEdges);
    log.debug("Generate physical plan: {}", physicalPlan);
    return physicalPlan;
}

Let’s examine the contents of these two classes.

public class ExecutionPlan {
    private final List<Pipeline> pipelines;
    private final JobImmutableInformation jobImmutableInformation;
}

public class Pipeline {
    /** The ID of the pipeline. */
    private final Integer id;
    private final List<ExecutionEdge> edges;
    private final Map<Long, ExecutionVertex> vertexes;
    
 }
 
 public class ExecutionEdge {
    private ExecutionVertex leftVertex;
    private ExecutionVertex rightVertex;  
 }   

public class ExecutionVertex {
    private Long vertexId;
    private Action action;
    private int parallelism;
}        

Let’s compare it with the logical plan:

public class LogicalDag implements IdentifiedDataSerializable {
    @Getter private JobConfig jobConfig;
    private final Set<LogicalEdge> edges = new LinkedHashSet<>();
    private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
    private IdGenerator idGenerator;
    private boolean isStartWithSavePoint = false;
  }
  
 public class LogicalEdge implements IdentifiedDataSerializable {
    private LogicalVertex inputVertex;
    private LogicalVertex targetVertex;
    private Long inputVertexId;
    private Long targetVertexId;
 }
  
public class LogicalVertex implements IdentifiedDataSerializable {
    private Long vertexId;
    private Action action;
    private int parallelism;  
 }        

It seems that each Pipeline resembles a logical plan. Why do we need this transformation step? Let’s take a closer look at the process of generating a logical plan.

As shown above, generating an execution plan involves five steps, which we will review one by one.

  • Step 1: Converting Logical Plan to Execution Plan
// Input is a set of logical plan edges, where each edge stores upstream and downstream nodes
private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges) {
    Set<ExecutionEdge> executionEdges = new LinkedHashSet<>();

    Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new HashMap();
    // Sort in order: first by input node, then by output node
    List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
    Collections.sort(
            sortedLogicalEdges,
            (o1, o2) -> {
                if (o1.getInputVertexId() != o2.getInputVertexId()) {
                    return o1.getInputVertexId() > o2.getInputVertexId() ? 1 : -1;
                }
                if (o1.getTargetVertexId() != o2.getTargetVertexId()) {
                    return o1.getTargetVertexId() > o2.getTargetVertexId() ? 1 : -1;
                }
                return 0;
            });
    // Loop to convert each logical plan edge to an execution plan edge
    for (LogicalEdge logicalEdge : sortedLogicalEdges) {
        LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
        ExecutionVertex executionInputVertex =
                logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                        logicalInputVertex.getVertexId(),
                        vertexId -> {
                            long newId = idGenerator.getNextId();
                            // Recreate Action for each logical plan node
                            Action newLogicalInputAction =
                                    recreateAction(
                                            logicalInputVertex.getAction(),
                                            newId,
                                            logicalInputVertex.getParallelism());
                            // Convert to execution plan node
                            return new ExecutionVertex(
                                    newId,
                                    newLogicalInputAction,
                                    logicalInputVertex.getParallelism());
                        });
        // Similarly, recreate execution plan nodes for target nodes
        LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
        ExecutionVertex executionTargetVertex =
                logicalVertexIdToExecutionVertexMap.computeIfAbsent(
                        logicalTargetVertex.getVertexId(),
                        vertexId -> {
                            long newId = idGenerator.getNextId();
                            Action newLogicalTargetAction =
                                    recreateAction(
                                            logicalTargetVertex.getAction(),
                                            newId,
                                            logicalTargetVertex.getParallelism());
                            return new ExecutionVertex(
                                    newId,
                                    newLogicalTargetAction,
                                    logicalTargetVertex.getParallelism());
                        });
        // Generate execution plan edge
        ExecutionEdge executionEdge =
                new ExecutionEdge(executionInputVertex, executionTargetVertex);
        executionEdges.add(executionEdge);
    }
    return executionEdges;
}
private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> executionEdges) {
    // Map of upstream node ID to list of downstream nodes
    Map<Long, List<ExecutionVertex>> targetVerticesMap = new LinkedHashMap<>();
    // Store only nodes of type Source
    Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
    executionEdges.forEach(
            edge -> {
                ExecutionVertex leftVertex = edge.getLeftVertex();
                ExecutionVertex rightVertex = edge.getRightVertex();
                if (leftVertex.getAction() instanceof SourceAction) {
                    sourceExecutionVertices.add(leftVertex);
                }
                targetVerticesMap
                        .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                        .add(rightVertex);
            });
    if (sourceExecutionVertices.size() != 1) {
        return executionEdges;
    }
    ExecutionVertex sourceExecutionVertex = sourceExecutionVertices.stream().findFirst().get();
    Action sourceAction = sourceExecutionVertex.getAction();
    List<CatalogTable> producedCatalogTables = new ArrayList<>();
    if (sourceAction instanceof SourceAction) {
        try {
            producedCatalogTables =
                    ((SourceAction<?, ?, ?>) sourceAction)
                            .getSource()
                            .getProducedCatalogTables();
        } catch (UnsupportedOperationException e) {
        }
    } else if (sourceAction instanceof TransformChainAction) {
        return executionEdges;
    } else {
        throw new SeaTunnelException(
                "source action must be SourceAction or TransformChainAction");
    }
    // If the source produces a single table or
    // the source has only one downstream output, return directly
    if (producedCatalogTables.size() <= 1
            || targetVerticesMap.get(sourceExecutionVertex.getVertexId()).size() <= 1) {
        return executionEdges;
    }

    List<ExecutionVertex> sinkVertices =
            targetVerticesMap.get(sourceExecutionVertex.getVertexId());
    // Check if there are other types of actions, currently downstream nodes should ideally have two types: Transform and Sink; here we check if only Sink type is present
    Optional<ExecutionVertex> hasOtherAction =
            sinkVertices.stream()
                    .filter(vertex -> !(vertex.getAction() instanceof SinkAction))
                    .findFirst();
    
    checkArgument(!hasOtherAction.isPresent());
    // After executing the above code, the current scenario is:
    // There is only one data source, this source produces multiple tables, and multiple sink nodes depend on these tables
    // This means the task has only two types of nodes: a source node that produces multiple tables and a group of sink nodes depending on this source
    // A new shuffle node will be created and added between the source and sinks
    // Modify the dependency relationship to source -> shuffle -> multiple sinks
    Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
    // Shuffle strategy will not be explored in detail here
    ShuffleStrategy shuffleStrategy =
            ShuffleMultipleRowStrategy.builder()
                    .jobId(jobImmutableInformation.getJobId())
                    .inputPartitions(sourceAction.getParallelism())
                    .catalogTables(producedCatalogTables)
                    .queueEmptyQueueTtl(
                            (int)
                                    (engineConfig.getCheckpointConfig().getCheckpointInterval()
                                            * 3))
                    .build();
    ShuffleConfig shuffleConfig =
            ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();

    long shuffleVertexId = idGenerator.getNextId();
    String shuffleActionName = String.format("Shuffle [%s]", sourceAction.getName());
    ShuffleAction shuffleAction =
            new ShuffleAction(shuffleVertexId, shuffleActionName, shuffleConfig);
    shuffleAction.setParallelism(sourceAction.getParallelism());
    ExecutionVertex shuffleVertex =
            new ExecutionVertex(shuffleVertexId, shuffleAction, shuffleAction.getParallelism());
    ExecutionEdge sourceToShuffleEdge = new ExecutionEdge(sourceExecutionVertex, shuffleVertex);
    newExecutionEdges.add(sourceToShuffleEdge);
    // Set the parallelism of multiple sink nodes to 1
    for (ExecutionVertex sinkVertex : sinkVertices) {
        sinkVertex.setParallelism(1);
        sinkVertex.getAction().setParallelism(1);
        ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, sinkVertex);
        newExecutionEdges.add(shuffleToSinkEdge);
    }
    return newExecutionEdges;
}

The Shuffle step addresses specific scenarios where the source supports reading multiple tables, and there are multiple sink nodes depending on this source. In such cases, a shuffle node is added in between.

Step 3

private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge> executionEdges) {
    // Uses three structures: stores all Source nodes and the input/output nodes for each
    // inputVerticesMap stores all upstream input nodes by downstream node id as the key
    // targetVerticesMap stores all downstream output nodes by upstream node id as the key
    Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
    Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
    Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
    executionEdges.forEach(
            edge -> {
                ExecutionVertex leftVertex = edge.getLeftVertex();
                ExecutionVertex rightVertex = edge.getRightVertex();
                if (leftVertex.getAction() instanceof SourceAction) {
                    sourceExecutionVertices.add(leftVertex);
                }
                inputVerticesMap
                        .computeIfAbsent(rightVertex.getVertexId(), id -> new ArrayList<>())
                        .add(leftVertex);
                targetVerticesMap
                        .computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
                        .add(rightVertex);
            });

    Map<Long, ExecutionVertex> transformChainVertexMap = new HashMap<>();
    Map<Long, Long> chainedTransformVerticesMapping = new HashMap<>();
    // Loop over each source, starting with all head nodes in the DAG
    for (ExecutionVertex sourceVertex : sourceExecutionVertices) {
        List<ExecutionVertex> vertices = new ArrayList<>();
        vertices.add(sourceVertex);
        for (int index = 0; index < vertices.size(); index++) {
            ExecutionVertex vertex = vertices.get(index);

            fillChainedTransformExecutionVertex(
                    vertex,
                    chainedTransformVerticesMapping,
                    transformChainVertexMap,
                    executionEdges,
                    Collections.unmodifiableMap(inputVerticesMap),
                    Collections.unmodifiableMap(targetVerticesMap));
            // If the current node has downstream nodes, add all downstream nodes to the list
            // The second loop will recalculate the newly added downstream nodes, which could be Transform nodes or Sink nodes
            if (targetVerticesMap.containsKey(vertex.getVertexId())) {
                vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
            }
        }
    }
    // After looping, chained Transform nodes will be chained, and the chainable edges will be removed from the execution plan
    // Therefore, the logical plan at this point cannot form the graph relationship and needs to be rebuilt
    Set<ExecutionEdge> transformChainEdges = new LinkedHashSet<>();
    // Loop over existing relationships
    for (ExecutionEdge executionEdge : executionEdges) {
        ExecutionVertex leftVertex = executionEdge.getLeftVertex();
        ExecutionVertex rightVertex = executionEdge.getRightVertex();
        boolean needRebuild = false;
        // Check if the input or output nodes of the current edge are in the chain mapping
        // If so, the node has been chained, and we need to find the chained node in the mapping
        // and rebuild the DAG
        if (chainedTransformVerticesMapping.containsKey(leftVertex.getVertexId())) {
            needRebuild = true;
            leftVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(leftVertex.getVertexId()));
        }
        if (chainedTransformVerticesMapping.containsKey(rightVertex.getVertexId())) {
            needRebuild = true;
            rightVertex =
                    transformChainVertexMap.get(
                            chainedTransformVerticesMapping.get(rightVertex.getVertexId()));
        }
        if (needRebuild) {
            executionEdge = new ExecutionEdge(leftVertex, rightVertex);
        }
        transformChainEdges.add(executionEdge);
    }
    return transformChainEdges;
}

private void fillChainedTransformExecutionVertex(
        ExecutionVertex currentVertex,
        Map<Long, Long> chainedTransformVerticesMapping,
        Map<Long, ExecutionVertex> transformChainVertexMap,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    // Exit if the map already contains the current node
    if (chainedTransformVerticesMapping.containsKey(currentVertex.getVertexId())) {
        return;
    }

    List<ExecutionVertex> transformChainedVertices = new ArrayList<>();
    collectChainedVertices(
            currentVertex,
            transformChainedVertices,
            executionEdges,
            inputVerticesMap,
            targetVerticesMap);
    // If the list is not empty, it means the Transform nodes in the list can be merged into one
    if (transformChainedVertices.size() > 0) {
        long newVertexId = idGenerator.getNextId();
        List<SeaTunnelTransform> transforms = new ArrayList<>(transformChainedVertices.size());
        List<String> names = new ArrayList<>(transformChainedVertices.size());
        Set<URL> jars = new HashSet<>();
        Set<ConnectorJarIdentifier> identifiers = new HashSet<>();

        transformChainedVertices.stream()
                .peek(
                        // Add all historical node IDs and new node IDs to the mapping
                        vertex ->
                                chainedTransformVerticesMapping.put(
                                        vertex.getVertexId(), newVertexId))
                .map(ExecutionVertex::getAction)
                .map(action -> (TransformAction) action)
                .forEach(
                        action -> {
                            transforms.add(action.getTransform());
                            jars.addAll(action.getJarUrls());
                            identifiers.addAll(action.getConnectorJarIdentifiers());
                            names.add(action.getName());
                        });
        String transformChainActionName =
                String.format("TransformChain[%s]", String.join("->", names));
        // Merge multiple TransformActions into one TransformChainAction
        TransformChainAction transformChainAction =
                new TransformChainAction(
                        newVertexId, transformChainActionName, jars, identifiers, transforms);
        transformChainAction.setParallelism(currentVertex.getAction().getParallelism());

        ExecutionVertex executionVertex =
                new ExecutionVertex(
                        newVertexId, transformChainAction, currentVertex.getParallelism());
        // Store the modified node information in the state
        transformChainVertexMap.put(newVertexId, executionVertex);
        chainedTransformVerticesMapping.put(
                currentVertex.getVertexId(), executionVertex.getVertexId());
    }
}

private void collectChainedVertices(
        ExecutionVertex currentVertex,
        List<ExecutionVertex> chainedVertices,
        Set<ExecutionEdge> executionEdges,
        Map<Long, List<ExecutionVertex>> inputVerticesMap,
        Map<Long, List<ExecutionVertex>> targetVerticesMap) {
    Action action = currentVertex.getAction();
    // Only merge TransformAction
    if (action instanceof TransformAction) {
        if (chainedVertices.size() == 0) {
            // If the list of vertices to be merged is empty, add itself to the list
            // The condition for entering this branch is that the current node is a TransformAction and the list to be merged is empty
            // There may be several scenarios: the first Transform node enters, and this Transform node has no constraints
            chainedVertices.add(currentVertex);
        } else if (inputVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
            // When this condition is entered, it means:
            // The list of vertices to be merged already has at least one TransformAction
            // The scenario at this point is that the upstream Transform node has only one downstream node, i.e., the current node. This constraint is ensured by the following judgment
            // Chain the current TransformAction node with the previous TransformAction node
            // Delete this relationship from the execution plan
            executionEdges.remove(
                    new ExecutionEdge(
                            chainedVertices.get(chainedVertices.size() - 1), currentVertex));
            // Add itself to the list of nodes to be merged
            chainedVertices.add(currentVertex);
        } else {
            return;
        }
    } else {
        return;
    }

    // It cannot chain to any target vertex if it has multiple target vertices.
    if (targetVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
        // If the current node has only one downstream node, try chaining again
        // If the current node has multiple downstream nodes, it will not chain the downstream nodes, so it can be ensured that the above chaining is a one-to-one relationship
        // This call occurs when the Transform node has only one downstream node
        collectChainedVertices(
                targetVerticesMap.get(currentVertex.getVertexId()).get(0),
                chainedVertices,
                executionEdges,
                inputVerticesMap,
                targetVerticesMap);
    }
}

Step 4

private List<Pipeline> generatePipelines(Set<ExecutionEdge> executionEdges) {
    // Stores each execution plan node
    Set<ExecutionVertex> executionVertices = new LinkedHashSet<>();
    for (ExecutionEdge edge : executionEdges) {
        executionVertices.add(edge.getLeftVertex());
        executionVertices.add(edge.getRightVertex());
    }
    // Calls the Pipeline generator to convert the execution plan into Pipelines
    PipelineGenerator pipelineGenerator =
            new PipelineGenerator(executionVertices, new ArrayList<>(executionEdges));
    List<Pipeline> pipelines = pipelineGenerator.generatePipelines();

    Set<String> duplicatedActionNames = new HashSet<>();
    Set<String> actionNames = new HashSet<>();
    for (Pipeline pipeline : pipelines) {
        Integer pipelineId = pipeline.getId();
        for (ExecutionVertex vertex : pipeline.getVertexes().values()) {
            // Get each execution node of the current Pipeline, reset the Action name, and add the pipeline name
            Action action = vertex.getAction();
            String actionName = String.format("pipeline-%s [%s]", pipelineId, action.getName());
            action.setName(actionName);
            if (actionNames.contains(actionName)) {
                duplicatedActionNames.add(actionName);
            }
            actionNames.add(action

Name);
        }
    }
    if (duplicatedActionNames.size() > 0) {
        throw new RuntimeException(
                String.format(
                        "Duplicated Action names found: %s", duplicatedActionNames));
    }
    return pipelines;
}

public PipelineGenerator(Collection<ExecutionVertex> vertices, List<ExecutionEdge> edges) {
    this.vertices = vertices;
    this.edges = edges;
}

public List<Pipeline> generatePipelines() {
    List<ExecutionEdge> executionEdges = expandEdgeByParallelism(edges);

    // Split the execution plan into unrelated execution plans based on their relationships
    // Divide into several unrelated execution plans
    List<List<ExecutionEdge>> edgesList = splitUnrelatedEdges(executionEdges);

    edgesList =
            edgesList.stream()
                    .flatMap(e -> this.splitUnionEdge(e).stream())
                    .collect(Collectors.toList());

    // Just convert execution plan to pipeline at now. We should split it to multi pipeline with
    // cache in the future
    IdGenerator idGenerator = new IdGenerator();
    // Convert execution plan graph to Pipeline
    return edgesList.stream()
            .map(
                    e -> {
                        Map<Long, ExecutionVertex> vertexes = new HashMap<>();
                        List<ExecutionEdge> pipelineEdges =
                                e.stream()
                                        .map(
                                                edge -> {
                                                    if (!vertexes.containsKey(
                                                            edge.getLeftVertexId())) {
                                                        vertexes.put(
                                                                edge.getLeftVertexId(),
                                                                edge.getLeftVertex());
                                                    }
                                                    ExecutionVertex source =
                                                            vertexes.get(
                                                                    edge.getLeftVertexId());
                                                    if (!vertexes.containsKey(
                                                            edge.getRightVertexId())) {
                                                        vertexes.put(
                                                                edge.getRightVertexId(),
                                                                edge.getRightVertex());
                                                    }
                                                    ExecutionVertex destination =
                                                            vertexes.get(
                                                                    edge.getRightVertexId());
                                                    return new ExecutionEdge(
                                                            source, destination);
                                                })
                                        .collect(Collectors.toList());
                        return new Pipeline(
                                (int) idGenerator.getNextId(), pipelineEdges, vertexes);
                    })
            .collect(Collectors.toList());
}

Step five involves generating the execution plan instances, passing the Pipeline parameters generated in step four.

Summary:

The execution plan performs the following tasks on the logical plan:

  1. When a source generates multiple tables and multiple sink nodes depend on this source, a shuffle node is added in between.
  2. Attempt to chain merge transform nodes, combining multiple transform nodes into one node.
  3. Split the tasks, dividing a configuration file/LogicalDag into several unrelated tasks represented as List<Pipeline>.

Physical Plan Generation

Before delving into physical plan generation, let’s first review what information is included in the generated physical plan and examine its internal components.

public class PhysicalPlan {
    private final List<SubPlan> pipelineList;
    private final AtomicInteger finishedPipelineNum = new AtomicInteger(0);
    private final AtomicInteger canceledPipelineNum = new AtomicInteger(0);
    private final AtomicInteger failedPipelineNum = new AtomicInteger(0);
    private final JobImmutableInformation jobImmutableInformation;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<JobResult> jobEndFuture;
    private final AtomicReference<String> errorBySubPlan = new AtomicReference<>();
    private final String jobFullName;
    private final long jobId;
    private JobMaster jobMaster;
    private boolean makeJobEndWhenPipelineEnded = true;
    private volatile boolean isRunning = false;
}

In this class, a key field is pipelineList, which is a list of SubPlan instances:

public class SubPlan {
    private final int pipelineMaxRestoreNum;
    private final int pipelineRestoreIntervalSeconds;
    private final List<PhysicalVertex> physicalVertexList;
    private final List<PhysicalVertex> coordinatorVertexList;
    private final int pipelineId;
    private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
    private final AtomicInteger canceledTaskNum = new AtomicInteger(0);
    private final AtomicInteger failedTaskNum = new AtomicInteger(0);
    private final String pipelineFullName;
    private final IMap<Object, Object> runningJobStateIMap;
    private final Map<String, String> tags;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private CompletableFuture<PipelineExecutionState> pipelineFuture;
    private final PipelineLocation pipelineLocation;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
    private final ExecutorService executorService;
    private JobMaster jobMaster;
    private PassiveCompletableFuture<Void> reSchedulerPipelineFuture;
    private Integer pipelineRestoreNum;
    private final Object restoreLock = new Object();
    private volatile PipelineStatus currPipelineStatus;
    public volatile boolean isRunning = false;
    private Map<TaskGroupLocation, SlotProfile> slotProfiles;
}

The SubPlan class maintains a list of PhysicalVertexinstances, divided into physical plan nodes and coordinator nodes:

public class PhysicalVertex {
    private final TaskGroupLocation taskGroupLocation;
    private final String taskFullName;
    private final TaskGroupDefaultImpl taskGroup;
    private final ExecutorService executorService;
    private final FlakeIdGenerator flakeIdGenerator;
    private final Set<URL> pluginJarsUrls;
    private final Set<ConnectorJarIdentifier> connectorJarIdentifiers;
    private final IMap<Object, Object> runningJobStateIMap;
    private CompletableFuture<TaskExecutionState> taskFuture;
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private final NodeEngine nodeEngine;
    private JobMaster jobMaster;
    private volatile ExecutionState currExecutionState = ExecutionState.CREATED;
    public volatile boolean isRunning = false;
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
}
public class TaskGroupDefaultImpl implements TaskGroup {
    private final TaskGroupLocation taskGroupLocation;

    private final String taskGroupName;
    // Stores the tasks that the physical node needs to execute
    // Each task could be for reading data, writing data, data splitting, checkpoint tasks, etc.
    private final Map<Long, Task> tasks;
}

The PhysicalPlanGenerator is responsible for converting the execution plan into SeaTunnelTask and adding various coordination tasks such as data splitting, data committing, and checkpoint tasks during execution.

public PhysicalPlanGenerator(
        @NonNull ExecutionPlan executionPlan,
        @NonNull NodeEngine nodeEngine,
        @NonNull JobImmutableInformation jobImmutableInformation,
        long initializationTimestamp,
        @NonNull ExecutorService executorService,
        @NonNull FlakeIdGenerator flakeIdGenerator,
        @NonNull IMap runningJobStateIMap,
        @NonNull IMap runningJobStateTimestampsIMap,
        @NonNull QueueType queueType) {
    this.pipelines = executionPlan.getPipelines();
    this.nodeEngine = nodeEngine;
    this.jobImmutableInformation = jobImmutableInformation;
    this.initializationTimestamp = initializationTimestamp;
    this.executorService = executorService;
    this.flakeIdGenerator = flakeIdGenerator;
    // the checkpoint of a pipeline
    this.pipelineTasks = new HashSet<>();
    this.startingTasks = new HashSet<>();
    this.subtaskActions = new HashMap<>();
    this.runningJobStateIMap = runningJobStateIMap;
    this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
    this.queueType = queueType;
}

public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
    // Get the node filter conditions from user configuration to select the nodes where tasks will run
    Map<String, String> tagFilter =
            (Map<String, String>)
                    jobImmutableInformation
                            .getJobConfig()
                            .getEnvOptions()
                            .get(EnvCommonOptions.NODE_TAG_FILTER.key());
    // TODO Determine which tasks do not need to be restored according to state
    CopyOnWriteArrayList<PassiveCompletableFuture<PipelineStatus>>
            waitForCompleteBySubPlanList = new CopyOnWriteArrayList<>();

    Map<Integer, CheckpointPlan> checkpointPlans = new HashMap<>();
    final int totalPipelineNum = pipelines.size();
    Stream<SubPlan> subPlanStream =
            pipelines.stream()
                    .map(
                            pipeline -> {
                                // Clear the state each time
                                this.pipelineTasks.clear();
                                this.startingTasks.clear();
                                this.subtaskActions.clear();
                                final int pipelineId = pipeline.getId();
                                // Get current task information
                                final List<ExecutionEdge> edges = pipeline.getEdges();
                                // Get all SourceActions
                                List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
                                // Generate Source data slice tasks, i.e., SourceSplitEnumeratorTask
                                // This task calls the SourceSplitEnumerator class in the connector if supported
                                List<PhysicalVertex> coordinatorVertexList =
                                        getEnumeratorTask(
                                                sources, pipelineId, totalPipelineNum);
                                // Generate Sink commit tasks, i.e., SinkAggregatedCommitterTask
                                // This task calls the SinkAggregatedCommitter class in the connector if supported
                                // These two tasks are executed as coordination tasks
                                coordinatorVertexList.addAll(
                                        getCommitterTask(edges, pipelineId, totalPipelineNum));
                                List<PhysicalVertex> physicalVertexList =
                                        getSourceTask(
                                                edges, sources, pipelineId, totalPipelineNum);
                                //
                                physicalVertexList.addAll(
                                        getShuffleTask(edges, pipelineId, totalPipelineNum));

                                CompletableFuture<PipelineStatus> pipelineFuture =
                                        new CompletableFuture<>();
                                waitForCompleteBySubPlanList.add(
                                        new PassiveCompletableFuture<>(pipelineFuture));
                                // Add checkpoint tasks
                                checkpointPlans.put(
                                        pipelineId,
                                        CheckpointPlan.builder()
                                                .pipelineId(pipelineId)
                                                .pipelineSubtasks(pipelineTasks)
                                                .startingSubtasks(startingTasks)
                                                .pipelineActions(pipeline.getActions())
                                                .subtaskActions(subtaskActions)
                                                .build());
                                return new SubPlan(
                                        pipelineId,
                                        totalPipelineNum,
                                        initializationTimestamp,
                                        physicalVertexList,
                                        coordinatorVertexList,
                                        jobImmutableInformation,
                                        executorService,
                                        runningJobStateIMap,
                                        runningJobStateTimestampsIMap,
                                        tagFilter);
                            });

    PhysicalPlan physicalPlan =
            new PhysicalPlan(
                    subPlanStream.collect(Collectors.toList()),
                    executorService,
                    jobImmutableInformation,
                    initializationTimestamp,
                    runningJobStateIMap,
                    runningJobStateTimestampsIMap);
    return Tuple2.tuple2(physicalPlan, checkpointPlans);
}

The process of generating the physical plan involves converting the execution plan into SeaTunnelTask and adding various coordination tasks, such as data splitting tasks, data committing tasks, and checkpoint tasks.

In SeaTunnelTask, tasks are converted into SourceFlowLifeCycleSinkFlowLifeCycleTransformFlowLifeCycleShuffleSinkFlowLifeCycleShuffleSourceFlowLifeCycle.

For example, the SourceFlowLifeCycle and SinkFlowLifeCycle classes are as follows:

  • SourceFlowLifeCycle
@Override
public void init() throws Exception {
    this.splitSerializer = sourceAction.getSource().getSplitSerializer();
    this.reader =
            sourceAction
                    .getSource()
                    .createReader(
                            new SourceReaderContext(
                                    indexID,
                                    sourceAction.getSource().getBoundedness(),
                                    this,
                                    metricsContext,
                                    eventListener));
    this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}

@Override
public void open() throws Exception {
    reader.open();
    register();
}

public void collect() throws Exception {
    if (!prepareClose) {
        if (schemaChanging()) {
            log.debug("schema is changing, stop reader collect records");

            Thread.sleep(200);
            return;
        }

        reader.pollNext(collector);
        if (collector.isEmptyThisPollNext()) {
            Thread.sleep(100);
        } else {
            collector.resetEmptyThisPollNext();
            /**
             * The current thread obtain a checkpoint lock in the method {@link
             * SourceReader#pollNext(

Collector)}. When trigger the checkpoint or savepoint,
             * other threads try to obtain the lock in the method {@link
             * SourceFlowLifeCycle#triggerBarrier(Barrier)}. When high CPU load, checkpoint
             * process may be blocked as long time. So we need sleep to free the CPU.
             */
            Thread.sleep(0L);
        }

        if (collector.captureSchemaChangeBeforeCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
            runningTask.triggerSchemaChangeBeforeCheckpoint().get();
            log.info("triggered schema-change-before checkpoint, stopping collect data");
        } else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
            if (schemaChangePhase.get() != null) {
                throw new IllegalStateException(
                        "previous schema changes in progress, schemaChangePhase: "
                                + schemaChangePhase.get());
            }
            schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
            runningTask.triggerSchemaChangeAfterCheckpoint().get();
            log.info("triggered schema-change-after checkpoint, stopping collect data");
        }
    } else {
        Thread.sleep(100);
    }
}

In SourceFlowLifeCycle, data reading is performed in the collect method. Once data is read, it is placed into SeaTunnelSourceCollector. When data is received, the collector updates metrics and sends the data to downstream components.

@Override
public void collect(T row) {
    try {
        if (row instanceof SeaTunnelRow) {
            String tableId = ((SeaTunnelRow) row).getTableId();
            int size;
            if (rowType instanceof SeaTunnelRowType) {
                size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType);
            } else if (rowType instanceof MultipleRowType) {
                size = ((SeaTunnelRow) row).getBytesSize(rowTypeMap.get(tableId));
            } else {
                throw new SeaTunnelEngineException(
                        "Unsupported row type: " + rowType.getClass().getName());
            }
            sourceReceivedBytes.inc(size);
            sourceReceivedBytesPerSeconds.markEvent(size);
            flowControlGate.audit((SeaTunnelRow) row);
            if (StringUtils.isNotEmpty(tableId)) {
                String tableName = getFullName(TablePath.of(tableId));
                Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
                if (Objects.nonNull(sourceTableCounter)) {
                    sourceTableCounter.inc();
                } else {
                    Counter counter =
                            metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName);
                    counter.inc();
                    sourceReceivedCountPerTable.put(tableName, counter);
                }
            }
        }
        sendRecordToNext(new Record<>(row));
        emptyThisPollNext = false;
        sourceReceivedCount.inc();
        sourceReceivedQPS.markEvent();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

public void sendRecordToNext(Record<?> record) throws IOException {
    synchronized (checkpointLock) {
        for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
            output.received(record);
        }
    }
}
  • SinkFlowLifeCycle
@Override
public void received(Record<?> record) {
    try {
        if (record.getData() instanceof Barrier) {
            long startTime = System.currentTimeMillis();

            Barrier barrier = (Barrier) record.getData();
            if (barrier.prepareClose(this.taskLocation)) {
                prepareClose = true;
            }
            if (barrier.snapshot()) {
                try {
                    lastCommitInfo = writer.prepareCommit();
                } catch (Exception e) {
                    writer.abortPrepare();
                    throw e;
                }
                List<StateT> states = writer.snapshotState(barrier.getId());
                if (!writerStateSerializer.isPresent()) {
                    runningTask.addState(
                            barrier, ActionStateKey.of(sinkAction), Collections.emptyList());
                } else {
                    runningTask.addState(
                            barrier,
                            ActionStateKey.of(sinkAction),
                            serializeStates(writerStateSerializer.get(), states));
                }
                if (containAggCommitter) {
                    CommitInfoT commitInfoT = null;
                    if (lastCommitInfo.isPresent()) {
                        commitInfoT = lastCommitInfo.get();
                    }
                    runningTask
                            .getExecutionContext()
                            .sendToMember(
                                    new SinkPrepareCommitOperation<CommitInfoT>(
                                            barrier,
                                            committerTaskLocation,
                                            commitInfoSerializer.isPresent()
                                                    ? commitInfoSerializer
                                                            .get()
                                                            .serialize(commitInfoT)
                                                    : null),
                                    committerTaskAddress)
                            .join();
                }
            } else {
                if (containAggCommitter) {
                    runningTask
                            .getExecutionContext()
                            .sendToMember(
                                    new BarrierFlowOperation(barrier, committerTaskLocation),
                                    committerTaskAddress)
                            .join();
                }
            }
            runningTask.ack(barrier);

            log.debug(
                    "trigger barrier [{}] finished, cost {}ms. taskLocation [{}]",
                    barrier.getId(),
                    System.currentTimeMillis() - startTime,
                    taskLocation);
        } else if (record.getData() instanceof SchemaChangeEvent) {
            if (prepareClose) {
                return;
            }
            SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
            writer.applySchemaChange(event);
        } else {
            if (prepareClose) {
                return;
            }
            writer.write((T) record.getData());
            sinkWriteCount.inc();
            sinkWriteQPS.markEvent();
            if (record.getData() instanceof SeaTunnelRow) {
                long size = ((SeaTunnelRow) record.getData()).getBytesSize();
                sinkWriteBytes.inc(size);
                sinkWriteBytesPerSeconds.markEvent(size);
                String tableId = ((SeaTunnelRow) record.getData()).getTableId();
                if (StringUtils.isNotBlank(tableId)) {
                    String tableName = getFullName(TablePath.of(tableId));
                    Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
                    if (Objects.nonNull(sinkTableCounter)) {
                        sinkTableCounter.inc();
                    } else {
                        Counter counter =
                                metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName);
                        counter.inc();
                        sinkWriteCountPerTable.put(tableName, counter);
                    }
                }
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Task Execution

In the CoordinatorService, a physical plan is generated through the init method, and then the run method is called to actually start the task.

CoordinatorService {
    jobMaster.init(
            runningJobInfoIMap.get(jobId).getInitializationTimestamp(), false);
    ...
    jobMaster.run();
    
 }
 
 JobMaster { 
    public void run() {
        ... 
        physicalPlan.startJob();
        ...
    }
}    

In JobMaster, when starting the task, it calls the startJobmethod of PhysicalPlan.

public void startJob() {
    isRunning = true;
    log.info("{} state process is start", getJobFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process is stopped", jobFullName));
        return;
    }
    switch (getJobStatus()) {
        case CREATED:
            updateJobState(JobStatus.SCHEDULED);
            break;
        case SCHEDULED:
            getPipelineList()
                    .forEach(
                            subPlan -> {
                                if (PipelineStatus.CREATED.equals(
                                        subPlan.getCurrPipelineStatus())) {
                                    subPlan.startSubPlanStateProcess();
                                }
                            });
            updateJobState(JobStatus.RUNNING);
            break;
        case RUNNING:
        case DOING_SAVEPOINT:
            break;
        case FAILING:
        case CANCELING:
            jobMaster.neverNeedRestore();
            getPipelineList().forEach(SubPlan::cancelPipeline);
            break;
        case FAILED:
        case CANCELED:
        case SAVEPOINT_DONE:
        case FINISHED:
            stopJobStateProcess();
            jobEndFuture.complete(new JobResult(getJobStatus(), errorBySubPlan.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Job State: " + getJobStatus());
    }
}

In PhysicalPlan, starting a task updates the task’s status to SCHEDULED and then continues to call the start method of SubPlan.

public void startSubPlanStateProcess() {
    isRunning = true;
    log.info("{} state process is start", getPipelineFullName());
    stateProcess();
}

private synchronized void stateProcess() {
    if (!isRunning) {
        log.warn(String.format("%s state process not start", pipelineFullName));
        return;
    }
    PipelineStatus state = getCurrPipelineStatus();
    switch (state) {
        case CREATED:
            updatePipelineState(PipelineStatus.SCHEDULED);
            break;
        case SCHEDULED:
            try {
                ResourceUtils.applyResourceForPipeline(jobMaster.getResourceManager(), this);
                log.debug(
                        "slotProfiles: {}, PipelineLocation: {}",
                        slotProfiles,
                        this.getPipelineLocation());
                updatePipelineState(PipelineStatus.DEPLOYING);
            } catch (Exception e) {
                makePipelineFailing(e);
            }
            break;
        case DEPLOYING:
            coordinatorVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });

            physicalVertexList.forEach(
                    task -> {
                        if (task.getExecutionState().equals(ExecutionState.CREATED)) {
                            task.startPhysicalVertex();
                            task.makeTaskGroupDeploy();
                        }
                    });
            updatePipelineState(PipelineStatus.RUNNING);
            break;
        case RUNNING:
            break;
        case FAILING:
        case CANCELING:
            coordinatorVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });

            physicalVertexList.forEach(
                    task -> {
                        task.startPhysicalVertex();
                        task.cancel();
                    });
            break;
        case FAILED:
        case CANCELED:
            if (checkNeedRestore(state) && prepareRestorePipeline()) {
                jobMaster.releasePipelineResource(this);
                restorePipeline();
                return;
            }
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(pipelineId, state, errorByPhysicalVertex.get()));
            return;
        case FINISHED:
            subPlanDone(state);
            stopSubPlanStateProcess();
            pipelineFuture.complete(
                    new PipelineExecutionState(
                            pipelineId, getPipelineState(), errorByPhysicalVertex.get()));
            return;
        default:
            throw new IllegalArgumentException("Unknown Pipeline State: " + getPipelineState());
    }
}

In a SubPlan, resources are applied for all tasks. Resource application is done through the ResourceManager. During resource application, nodes are selected based on user-defined tags to ensure that tasks run on specific nodes, achieving resource isolation.

public static void applyResourceForPipeline(
        @NonNull ResourceManager resourceManager, @NonNull SubPlan subPlan) {
    Map<TaskGroupLocation, CompletableFuture<SlotProfile>> futures = new HashMap<>();
    Map<TaskGroupLocation, SlotProfile> slotProfiles = new HashMap<>();
    // TODO If there is no enough resources for tasks, we need add some wait profile
    subPlan.getCoordinatorVertexList()
            .forEach(
                    coordinator ->
                            futures.put(
                                    coordinator.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, coordinator, subPlan.getTags())));

    subPlan.getPhysicalVertexList()
            .forEach(
                    task ->
                            futures.put(
                                    task.getTaskGroupLocation(),
                                    applyResourceForTask(
                                            resourceManager, task, subPlan.getTags())));

    futures.forEach(
            (key, value) -> {
                try {
                    slotProfiles.put(key, value == null ? null : value.join());
                } catch (CompletionException e) {
                    // do nothing
                }
            });
    // set it first, avoid can't get it when get resource not enough exception and need release
    // applied resource
    subPlan.getJobMaster().setOwnedSlotProfiles(subPlan.getPipelineLocation(), slotProfiles);
    if (futures.size() != slotProfiles.size()) {
        throw new NoEnoughResourceException();
    }
}

public static CompletableFuture<SlotProfile> applyResourceForTask(
        ResourceManager resourceManager, PhysicalVertex task, Map<String, String> tags) {
    // TODO custom resource size
    return resourceManager.applyResource(
            task.getTaskGroupLocation().getJobId(), new ResourceProfile(), tags);
}


public CompletableFuture<List<SlotProfile>> applyResources(
        long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter)
        throws NoEnoughResourceException {
    waitingWorkerRegister();
    ConcurrentMap<Address, WorkerProfile> matchedWorker = filterWorkerByTag(tagFilter);
    if (matchedWorker.isEmpty()) {
        log.error("No matched worker with tag filter {}.", tagFilter);
        throw new NoEnoughResourceException();
    }
    return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this)
            .request(tagFilter);
}

When all available nodes are obtained, the nodes are shuffled and a node with resources larger than the required resources is randomly selected. The node is then contacted, and a RequestSlotOperation is sent to it.

public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
    // Shuffle the order to ensure random selection of workers
    List<WorkerProfile> workerProfiles =
            Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0]));
    Collections.shuffle(workerProfiles);
    // Check if there are still unassigned slots
    Optional<WorkerProfile> workerProfile =
            workerProfiles.stream()
                    .filter(
                            worker ->
                                    Arrays.stream(worker.getUnassignedSlots())
                                            .anyMatch(
                                                    slot ->
                                                            slot.getResourceProfile()
                                                                    .enoughThan(r)))
                    .findAny();

    if (!workerProfile.isPresent()) {
        // Check if there are still unassigned resources
        workerProfile =
                workerProfiles.stream()
                        .filter(WorkerProfile::isDynamicSlot)
                        .filter(worker -> worker.getUnassignedResource().enoughThan(r))
                        .findAny();
    }

    return workerProfile;
}

private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(
        int i, ResourceProfile r, WorkerProfile workerProfile) {
    CompletableFuture<SlotAndWorkerProfile> future =
            resourceManager.sendToMember(
                    new RequestSlotOperation(jobId, r), workerProfile.getAddress());
    return future.whenComplete(
            withTryCatch(
                    LOGGER,
                    (slotAndWorkerProfile, error) -> {
                        if (error != null) {
                            throw new RuntimeException(error);
                        } else {
                            resourceManager.heartbeat(slotAndWorkerProfile.getWorkerProfile());
                            addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
                        }
                    }));
}

When the node’s SlotService receives the requestSlotrequest, it updates its own information and returns it to the master node. If the resource request does not meet the expected result, a NoEnoughResourceException is thrown, indicating task failure. When resource allocation succeeds, task deployment begins with task.makeTaskGroupDeploy(), which sends the task to the worker node for execution.

TaskDeployState deployState =
        deploy(jobMaster.getOwnedSlotProfiles(taskGroupLocation));
        
        
public TaskDeployState deploy(@NonNull SlotProfile slotProfile) {
    try {
        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
            return deployOnLocal(slotProfile);
        } else {
            return deployOnRemote(slotProfile);
        }
    } catch (Throwable th) {
        return TaskDeployState.failed(th);
    }
}


private TaskDeployState deployOnRemote(@Non

Null SlotProfile slotProfile) {
    return deployInternal(
            taskGroupImmutableInformation -> {
                try {
                    return (TaskDeployState)
                            NodeEngineUtil.sendOperationToMemberNode(
                                            nodeEngine,
                                            new DeployTaskOperation(
                                                    slotProfile,
                                                    nodeEngine
                                                            .getSerializationService()
                                                            .toData(
                                                                    taskGroupImmutableInformation)),
                                            slotProfile.getWorker())
                                    .get();
                } catch (Exception e) {
                    if (getExecutionState().isEndState()) {
                        log.warn(ExceptionUtils.getMessage(e));
                        log.warn(
                                String.format(
                                        "%s deploy error, but the state is already in end state %s, skip this error",
                                        getTaskFullName(), currExecutionState));
                        return TaskDeployState.success();
                    } else {
                        return TaskDeployState.failed(e);
                    }
                }
            });
}

Task Deployment

When deploying a task, the task information is sent to the node obtained during resource allocation:

public TaskDeployState deployTask(@NonNull Data taskImmutableInformation) {
    TaskGroupImmutableInformation taskImmutableInfo =
            nodeEngine.getSerializationService().toObject(taskImmutableInformation);
    return deployTask(taskImmutableInfo);
}

public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImmutableInfo) {
    logger.info(
            String.format(
                    "received deploying task executionId [%s]",
                    taskImmutableInfo.getExecutionId()));
    TaskGroup taskGroup = null;
    try {
        Set<ConnectorJarIdentifier> connectorJarIdentifiers =
                taskImmutableInfo.getConnectorJarIdentifiers();
        Set<URL> jars = new HashSet<>();
        ClassLoader classLoader;
        if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
            // Prioritize obtaining the jar package file required for the current task execution
            // from the local, if it does not exist locally, it will be downloaded from the
            // master node.
            jars =
                    serverConnectorPackageClient.getConnectorJarFromLocal(
                            connectorJarIdentifiers);
        } else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars())) {
            jars = taskImmutableInfo.getJars();
        }
        classLoader =
                classLoaderService.getClassLoader(
                        taskImmutableInfo.getJobId(), Lists.newArrayList(jars));
        if (jars.isEmpty()) {
            taskGroup =
                    nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
        } else {
            taskGroup =
                    CustomClassLoadedObject.deserializeWithCustomClassLoader(
                            nodeEngine.getSerializationService(),
                            classLoader,
                            taskImmutableInfo.getGroup());
        }

        logger.info(
                String.format(
                        "deploying task %s, executionId [%s]",
                        taskGroup.getTaskGroupLocation(), taskImmutableInfo.getExecutionId()));

        synchronized (this) {
            if (executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
                throw new RuntimeException(
                        String.format(
                                "TaskGroupLocation: %s already exists",
                                taskGroup.getTaskGroupLocation()));
            }
            deployLocalTask(taskGroup, classLoader, jars);
            return TaskDeployState.success();
        }
    } catch (Throwable t) {
        logger.severe(
                String.format(
                        "TaskGroupID : %s  deploy error with Exception: %s",
                        taskGroup != null && taskGroup.getTaskGroupLocation() != null
                                ? taskGroup.getTaskGroupLocation().toString()
                                : "taskGroupLocation is null",
                        ExceptionUtils.getMessage(t)));
        return TaskDeployState.failed(t);
    }
}

When a worker node receives the task, it calls the deployTaskmethod of TaskExecutionService to submit the task to the thread pool created at startup.

When the task is submitted to the thread pool:

private final class BlockingWorker implements Runnable {

    private final TaskTracker tracker;
    private final CountDownLatch startedLatch;

    private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
        this.tracker = tracker;
        this.startedLatch = startedLatch;
    }

    @Override
    public void run() {
        TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker =
                tracker.taskGroupExecutionTracker;
        ClassLoader classLoader =
                executionContexts
                        .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                        .getClassLoader();
        ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        final Task t = tracker.task;
        ProgressState result = null;
        try {
            startedLatch.countDown();
            t.init();
            do {
                result = t.call();
            } while (!result.isDone()
                    && isRunning
                    && !taskGroupExecutionTracker.executionCompletedExceptionally());
        ...
    }
}

The Task.call method is invoked, and thus data synchronization tasks are truly executed.

ClassLoader

In SeaTunnel, the default ClassLoader has been modified to prioritize subclasses to avoid conflicts with other component classes:

@Override
public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars) {
    log.debug("Get classloader for job {} with jars {}", jobId, jars);
    if (cacheMode) {
        // with cache mode, all jobs share the same classloader if the jars are the same
        jobId = 1L;
    }
    if (!classLoaderCache.containsKey(jobId)) {
        classLoaderCache.put(jobId, new ConcurrentHashMap<>());
        classLoaderReferenceCount.put(jobId, new ConcurrentHashMap<>());
    }
    Map<String, ClassLoader> classLoaderMap = classLoaderCache.get(jobId);
    String key = covertJarsToKey(jars);
    if (classLoaderMap.containsKey(key)) {
        classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
        return classLoaderMap.get(key);
    } else {
        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(jars);
        log.info("Create classloader for job {} with jars {}", jobId, jars);
        classLoaderMap.put(key, classLoader);
        classLoaderReferenceCount.get(jobId).put(key, new AtomicInteger(1));
        return classLoader;
    }
}

REST API Task Submission

SeaTunnel also supports task submission via REST API. To enable this feature, add the following configuration to the hazelcast.yaml file:

network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true

After adding this configuration, the Hazelcast node will be able to receive HTTP requests.

Using REST API for task submission, the client becomes the node sending the HTTP request, and the server is the SeaTunnel cluster.

When the server receives the request, it will call the appropriate method based on the request URI:

public void handle(HttpPostCommand httpPostCommand) {
    String uri = httpPostCommand.getURI();
    try {
        if (uri.startsWith(SUBMIT_JOB_URL)) {
            handleSubmitJob(httpPostCommand, uri);
        } else if (uri.startsWith(STOP_JOB_URL)) {
            handleStopJob(httpPostCommand, uri);
        } else if (uri.startsWith(ENCRYPT_CONFIG)) {
            handleEncrypt(httpPostCommand);
        } else {
            original.handle(httpPostCommand);
        }
    } catch (IllegalArgumentException e) {
        prepareResponse(SC_400, httpPostCommand, exceptionResponse(e));
    } catch (Throwable e) {
        logger.warning("An error occurred while handling request " + httpPostCommand, e);
        prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
    }

    this.textCommandService.sendResponse(httpPostCommand);
}

The method to handle the job submission request is determined by the path:

private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
        throws IllegalArgumentException {
    Map<String, String> requestParams = new HashMap<>();
    RestUtil.buildRequestParams(requestParams, uri);
    Config config = RestUtil.buildConfig(requestHandle(httpPostCommand), false);
    ReadonlyConfig envOptions = ReadonlyConfig.fromConfig(config.getConfig("env"));
    String jobName = envOptions.get(EnvCommonOptions.JOB_NAME);

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName(
            StringUtils.isEmpty(requestParams.get(RestConstant.JOB_NAME))
                    ? jobName
                    : requestParams.get(RestConstant.JOB_NAME));

    boolean startWithSavePoint =
            Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
    String jobIdStr = requestParams.get(RestConstant.JOB_ID);
    Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null;
    SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
    RestJobExecutionEnvironment restJobExecutionEnvironment =
            new RestJobExecutionEnvironment(
                    seaTunnelServer,
                    jobConfig,
                    config,
                    textCommandService.getNode(),
                    startWithSavePoint,
                    finalJobId);
    JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build();
    long jobId = jobImmutableInformation.getJobId();
    if (!seaTunnelServer.isMasterNode()) {

        NodeEngineUtil.sendOperationToMasterNode(
                        getNode().nodeEngine,
                        new SubmitJobOperation(
                                jobId,
                                getNode().nodeEngine.toData(jobImmutableInformation),
                                jobImmutableInformation.isStartWithSavePoint()))
                .join();

    } else {
        submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
    }

    this.prepareResponse(
            httpPostCommand,
            new JsonObject()
                    .add(RestConstant.JOB_ID, String.valueOf(jobId))
                    .add(RestConstant.JOB_NAME, jobConfig.getName()));
}

The logic here is similar to the client-side. Since there is no local mode, a local service does not need to be created.

On the client side, the ClientJobExecutionEnvironment class is used for logical plan parsing, and similarly, the RestJobExecutionEnvironment class performs the same tasks.

When submitting a task, if the current node is not the master node, it will send information to the master node. The master node will handle task submission similarly to how it handles commands from the command-line client.

If the current node is the master node, it will directly call the submitJob method, which invokes the coordinatorService.submitJob method for subsequent processing:

private void

 submitJob(
        SeaTunnelServer seaTunnelServer,
        JobImmutableInformation jobImmutableInformation,
        JobConfig jobConfig) {
    CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
    Data data =
            textCommandService
                    .getNode()
                    .nodeEngine
                    .getSerializationService()
                    .toData(jobImmutableInformation);
    PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
            coordinatorService.submitJob(
                    Long.parseLong(jobConfig.getJobContext().getJobId()),
                    data,
                    jobImmutableInformation.isStartWithSavePoint());
    voidPassiveCompletableFuture.join();
}

Both submission methods involve parsing the logical plan on the submission side and then sending the information to the master node. The master node then performs the physical plan parsing, allocation, and other operations.