Scheduler Maintaining Dedicated Server Connections

I agree. I think we are choosing between a DIS message and a write on a direct connection, and there is only aesthetics to guide whether one is better than the other. Both approaches involve sending an “end of cycle” message to the server. Personally, I don’t see any difference, except of course that everything should have been DIS anyway; mixing two protocols can be confusing for everybody.

This is quite similar to the overall approach I was thinking about. Perhaps we should fork that discussion out onto its own thread?

Thanks for reviewing and sharing your thoughts, @bhroam. Please find my understanding below.

After discussions within our team, we don’t want to call this a new IFL call; rather, it is just a new batch request, PBS_BATCH_SchedCycleEnd. I have modified the design document accordingly.

Yes, @bhroam. This is exactly what we are planning to do, and hence we came up with the following requirement, which we are going to take up immediately after the current enhancement.

“Scheduler should decide the priorities of scheduling commands instead of Server”.

Doing a couple of the things proposed in the current enhancement (i.e. making persistent connections between server(s) and scheduler(s), plus shifting the responsibilities of the primary and secondary connections) helps us achieve the above requirement.

Going forward, multiple servers will talk to the scheduler in parallel, and the scheduler should be able to correctly decide the priorities of all commands coming from one or more servers and act accordingly. So if we dedicate a connection just to reading, collating, prioritising and de-duplicating commands, we can very well have a thread to accomplish the same, which also helps us achieve better scalability.

And yes, when we implement this second enhancement (“Scheduler should decide the priorities of scheduling commands instead of Server”) together with the current proposal, the server should send all scheduling commands to the scheduler (in which case I feel we might not need PBS_BATCH_SchedCycleEnd). The scheduler, upon receiving them, puts them into a queue in priority order after it decides the priorities, and it also removes any duplicate commands coming from one or more servers. So we believe we are on the same page as far as this requirement is concerned.
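To make the ordering and de-duplication concrete, here is a minimal sketch. The command names are loosely modelled on the scheduler command names, but the priority values, class names and de-duplication rule are all invented for illustration and are not the actual PBS constants or code:

```python
import heapq

# Hypothetical priorities: lower number = handled first.
# These values are illustrative, not the real PBS priorities.
CMD_PRIORITY = {
    "SCH_SCHEDULE_RESTART_CYCLE": 0,
    "SCH_CONFIGURE": 1,
    "SCH_SCHEDULE_NEW": 2,
    "SCH_SCHEDULE_TERM": 3,
}

class SchedCommandQueue:
    """Collects commands from one or more servers, ordered by priority and
    arrival order, dropping duplicates that are already queued."""

    def __init__(self):
        self._heap = []          # (priority, seq, server_id, cmd)
        self._seen = set()       # (server_id, cmd) pairs already queued
        self._seq = 0            # tie-breaker: FIFO within the same priority

    def push(self, server_id, cmd):
        key = (server_id, cmd)
        if key in self._seen:    # duplicate command from this server: ignore
            return
        self._seen.add(key)
        heapq.heappush(self._heap,
                       (CMD_PRIORITY.get(cmd, 99), self._seq, server_id, cmd))
        self._seq += 1

    def pop(self):
        _, _, server_id, cmd = heapq.heappop(self._heap)
        self._seen.discard((server_id, cmd))
        return server_id, cmd

q = SchedCommandQueue()
q.push("server1", "SCH_SCHEDULE_NEW")
q.push("server2", "SCH_SCHEDULE_NEW")
q.push("server1", "SCH_SCHEDULE_NEW")   # duplicate, dropped
q.push("server2", "SCH_CONFIGURE")
print(q.pop())   # ('server2', 'SCH_CONFIGURE') -- highest priority first
```

A dedicated reader thread could own such a queue, which is what makes the “dedicate a connection just to read, collate, prioritise” part attractive.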

It looks to me like this is a drastic design change, where the scheduler talks directly to the database, and the impact seems huge with respect to changing both server and scheduler code. I am just curious whether going in this direction has already been decided, and what further details exist about it. I don’t know when this will happen, but when it does we will certainly need persistent connections between the server(s) and scheduler(s), so this part of the enhancement is needed in any case.

As we are planning to remove the DIS library and replace it completely with the FlatBuffers library going forward, we feel it is better not to make any major changes to a DIS library that we will no longer use in the future.

True, but keeping everything in the db up to date all the time might be expensive and something we might want to avoid. As an example: to update the resources_assigned information for a job in Postgres, we’ll have to lock the node, server and queue tables for each job that’s run, which might be very expensive, especially when there are multiple servers. So it might be better to discuss Suresh’s feature keeping the existing architecture in mind.

@suresht So is it safe to assume that using FlatBuffers (or anything other than DIS) will get us away from the problem you are facing with DIS today? If so, then we shouldn’t do anything special for sending the scheduler-related commands.
Your document introduces a new batch request (or IFL call), but it does not clearly explain how it will be used in a multi-server scenario. Do you expect the scheduler to send this new batch request to all the connected servers when a cycle is over? How will all servers know that the scheduler is in the middle of running a cycle?

Don’t all IFL calls have batch requests? Isn’t this just another IFL call with a different name?

I guess I’m a little confused about your design document. Are the two parts of the document two different phases of work? Is phase 1 implementing persistent connections and the new IFL-like call, and phase 2 removing the IFL-like call and sending all the commands to the scheduler? If that is the case, we should skip directly to phase 2 and not implement code we will immediately stop using and remove.

This is a drastic change to the scheduler. It’s where we want to go in the future. We haven’t made strides towards it because all of the data we need isn’t stored in the database. Nothing you are talking about in your design really affects this. We’ll still need to get scheduler event commands. We will still be using IFL to communicate back to the server (e.g. running jobs). We just won’t be querying the universe using IFL any more. The second part of your design will just help us more.

Talking about this only in the current architecture is short sighted. We might design ourselves into a corner in a way which will make the future of the scheduler much harder to implement. If your team has blinders on and only sees the server, then you might design in a way that makes it harder for other parts of the product.

Bhroam

Yes, @arungrover, we believe that FlatBuffers will relieve us of the problem we are facing.

We are in fact planning to do this in phases.

Phase-1: meant only for a single server, but keeping the future (i.e. multiple servers) in mind:

  • Make persistent connections between the server and scheduler(s)
  • Shift the responsibilities of the primary and secondary connections
  • Add a new batch request to send the end-of-cycle indication

Since the current design only talks about phase-1, I did not explain the multi-server behaviour, which is phase-2.

Phase-2: Extend this for multiple servers. In this case the end-of-cycle message will be sent only to the server that triggered the scheduling cycle. Other servers can still send a single scheduling command (each server keeps a global flag marking the start of a cycle, so it sends only one command until it gets an end-of-cycle reply), but that command will remain in the corresponding receive buffer until the scheduler finishes the current cycle. (A rough sketch of this flow is included below, after the list.)

Phase-3: Implement the following requirement, which I have discussed in detail earlier:
“Scheduler should decide the priorities of scheduling commands instead of Server”.

We thought of adding all these in an incremental fashion, which is why there are phases. This helps us to:

  1. Not break any existing functionality while still introducing the new changes one by one.
  2. Keep testing thorough and flexible.
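To make the phase-2 behaviour concrete, here is a toy sketch of the flow I have in mind. The connection objects and function names are invented purely for illustration; they are not the actual server or scheduler code:

```python
import queue

# Toy stand-ins for the persistent server connections: each "connection" is
# just an in-memory queue of scheduling commands plus an outbox for replies.
class FakeServerConn:
    def __init__(self, name):
        self.name = name
        self.inbox = queue.Queue()    # commands this server sent to the scheduler
        self.outbox = []              # messages the scheduler sent back

def wait_for_command(conns):
    """Return (conn, cmd) for the first connection with a pending command."""
    for conn in conns:
        try:
            return conn, conn.inbox.get_nowait()
        except queue.Empty:
            continue
    return None, None

def send_cycle_end(conn):
    # Stands in for the proposed PBS_BATCH_SchedCycleEnd batch request.
    conn.outbox.append("SchedCycleEnd")

def run_one_cycle(conns):
    triggering_conn, cmd = wait_for_command(conns)
    if triggering_conn is None:
        return
    # ... query the universe and run the actual scheduling cycle here ...
    # Only the server that triggered the cycle gets the end-of-cycle message;
    # the other servers' single pending command stays buffered until the
    # scheduler reads it for the next cycle.
    send_cycle_end(triggering_conn)

s1, s2 = FakeServerConn("server1"), FakeServerConn("server2")
s1.inbox.put("SCH_SCHEDULE_NEW")
s2.inbox.put("SCH_SCHEDULE_NEW")      # stays buffered during this cycle
run_one_cycle([s1, s2])
print(s1.outbox, s2.outbox)           # ['SchedCycleEnd'] []
```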

Of course, as you might have observed, there is another discussion going on: if we club phase-1 and phase-3 together, we might not need the new batch request at all. But I don’t know whether it is better to put these two phases together or to do them phase-wise so that the Multi-Server feature keeps moving. Doing it phase-wise means other upcoming features do not have a dependency on us completing phase-3.

I didn’t mean to suggest that we should only think about the server for now and deal with the scheduler later; in fact, how to handle the scheduler’s stat is one of the major topics of discussion in the team :slight_smile: I just meant to say that for the scheduler querying the universe, there might be a better approach than the sched reading from the db directly. I think it’s too soon to know.

@suresht - I think the proposal, the way it is right now, is a bit confusing. I suggest we clean it up by clearly depicting and doing the following:

  1. Separate out the part about “shifting connection responsibilities” into a future proposal (I see no immediate need for this change)

  2. Cleanly depict the two actual needs we have:
    a) We need to have persistent connections - why? Because in multi-server, the scheduler might want to run a job via a server instance different from the one that initiated a sched cycle. Keeping the connections persistent thus saves a lot of connection handling and improves performance.

    b) The connection end was earlier used as an indicator of cycle-end. Thus, with persistent connections, we need to introduce some other way of indicating end-of-cycle. We propose to send a simple message to the server (the one that invoked the sched cycle) to indicate the same.

Problems with the idea of not having a cycle-end message:

  • A server instance will not know whether a sched cycle is in progress, and thus has to “nudge” the scheduler for every event that happens at its end, like a new job or a job finishing. In an HTC situation this might mean the server sends a SCHD_JOB command a few thousand times per second, and the pipe between the server and the sched could get full. On the other hand, the end-of-cycle message from the sched to the server does no harm; in fact, it avoids the need for the server to send multiple sched_job commands back to back (a sketch of this suppression follows below).
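To illustrate, here is a toy sketch of the suppression the end-of-cycle message enables. The class and method names are made up for the example, not actual server code:

```python
# Toy server-side view of why the end-of-cycle message helps in HTC:
# at most one scheduling command is in flight per server, no matter how
# many job events arrive while a cycle is running.

class ServerSideSchedLink:
    def __init__(self, send_fn):
        self._send = send_fn          # whatever actually writes to the scheduler
        self._cmd_in_flight = False   # the "global flag" mentioned above

    def on_job_event(self):
        """Called for every job submission/end/etc. Sends at most one SCHD_JOB."""
        if self._cmd_in_flight:
            return                    # scheduler already nudged; nothing to send
        self._cmd_in_flight = True
        self._send("SCHD_JOB")

    def on_cycle_end(self):
        """Called when the end-of-cycle message arrives from the scheduler."""
        self._cmd_in_flight = False

sent = []
link = ServerSideSchedLink(sent.append)
for _ in range(10000):                # a burst of job submissions
    link.on_job_event()
link.on_cycle_end()
link.on_job_event()
print(sent)                           # ['SCHD_JOB', 'SCHD_JOB'] -- only two writes
```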

Of course, if we could do away with any sched_job message altogether that would be ideal, but that calls for a global flag, and we are not ready yet to introduce ZooKeeper-type things into the code base.

There is another, somewhat radical, idea that we could possibly try.
The whole problem here is that we need persistent connections because objects are sharded and owned by different servers.
What if we use a message broker between the server and scheduler, and use it for communicating transient changes from server to scheduler or scheduler to server?

Here is what I think could happen:

  • The scheduler comes up for the first time and queries everything from the servers.
  • The scheduler then keeps that connection open and subscribes to updates from the servers. These updates could be transient changes like node state changes, job submit requests, resv submit requests, job deletes, etc. This way all servers can publish messages about the objects they own and let the scheduler know about the changes.
  • The scheduler will use the open connection to communicate with a server only when it has to perform an action on objects (like running a job, confirming a reservation, or preempting jobs). This is because it needs to know whether the action was successful. All this communication will happen using existing IFL calls.
  • The scheduler can send job updates (comment, estimated, accrue-type) to the server on a message broker channel, and all servers can subscribe to it.
  • Another benefit is that the scheduler does not have to tell the servers whether it is running a cycle or not. Internally, after running a cycle, the scheduler can just go back to reading messages from the broker, reconcile its cache, and run a cycle again (if needed). A rough sketch of this pattern follows this list.
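To show the publish/subscribe shape of the idea, here is a minimal sketch using RabbitMQ via the pika client purely as an example; the exchange name, the event format, and the way the scheduler folds events into its cache are all assumptions, not a proposal for a specific broker or message schema:

```python
import json
import pika  # RabbitMQ client, used here only as an example broker

EXCHANGE = "pbs.events"   # made-up exchange name

# --- server side: publish the transient changes it owns ---
def publish_event(channel, event):
    channel.basic_publish(exchange=EXCHANGE, routing_key="",
                          body=json.dumps(event))

# --- scheduler side: subscribe and fold events into its cached universe ---
def consume_events(channel, cache):
    result = channel.queue_declare(queue="", exclusive=True)  # private queue
    channel.queue_bind(exchange=EXCHANGE, queue=result.method.queue)

    def on_message(ch, method, properties, body):
        event = json.loads(body)
        # e.g. {"type": "node_state", "node": "n1", "state": "down"}
        key = event.get("node") or event.get("job")
        cache.setdefault(event["type"], {})[key] = event

    channel.basic_consume(queue=result.method.queue,
                          on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()   # the scheduler would run this in its own loop

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.exchange_declare(exchange=EXCHANGE, exchange_type="fanout")
publish_event(ch, {"type": "node_state", "node": "n1", "state": "down"})
```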

Let me know what you think.

Hey Arun,

Interesting idea, but I have a lot of questions; I apologize in advance :slight_smile:

  • Just trying to understand how you envision this working: if the scheduler subscribes to job submit requests, will the server(s) send it every job immediately as they receive it, or will they wait for a certain amount of time and then send a batch of jobs out?
  • Considering that some sites can have a very high rate of job submission, will the recv buffer on sched side be able to handle so many job delta messages from the servers?
  • What will the delta look like when jobs/nodes/queues get deleted?
  • If a large number of deltas come in, won’t the sched spend considerable time “applying” each delta on its cached state?
  • What happens when one of the servers goes down?

Can you please provide more details about how this will work? Is a message broker a new daemon? Is it connected to all servers? If yes, then how is that better than the sched being connected directly to all servers? Also, why are only job updates/pbs_alterjob being sent via the message broker, and not the other IFL calls?

Hey Ravi,

I don’t see any problem with sending updates as and when they are received by the server. Even if we make the server throttle them (I don’t see why we would), I guess that would work too. When the scheduler notices that new jobs have come in (it looks for relevant events like job submission, job end, etc.), it will just go ahead and run a cycle with the cache of the universe that it has. In the meantime, if more updates come in, the scheduler will read them after the cycle ends and decide whether to run the next cycle based on those updates.

That is true, there could be an overwhelming number of messages and it may not be able to get all of them, but we can make it either accept a certain number of cycle-triggering events (probably configurable) and then trigger a cycle, or decide to run a cycle when there is nothing else to read.

I guess delete is just another state transformation for jobs. In the case of a node/queue, we will have to make it so that the scheduler understands that the update is a queue/node delete.

You are right, a large number of deltas can come in, but like I said before, we can make it configurable in the scheduler when to decide that it has read enough and whether it needs to run a cycle with what it has (a rough sketch of such a loop is below).
We can also optimize in the server and introduce a new flag that we can apply to the attributes the scheduler absolutely needs (just like there is a flag to send attributes to the mom). The server then does not have to send every update, only those that are relevant.
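Roughly, the loop I have in mind looks like the following sketch; the threshold, the event names, and the helper functions are all made up for illustration:

```python
import queue

# Toy version of "read enough events, then decide whether to run a cycle".
MAX_EVENTS_BEFORE_CYCLE = 100                             # hypothetical tunable
CYCLE_TRIGGERING = {"job_submit", "job_end", "node_up"}   # illustrative set

def apply_to_cache(cache, event):
    # Fold the delta into the scheduler's cached universe (details elided).
    cache.setdefault(event["type"], []).append(event)

def drain_and_maybe_run_cycle(event_q, cache, run_cycle):
    triggers = 0
    while triggers < MAX_EVENTS_BEFORE_CYCLE:
        try:
            event = event_q.get_nowait()
        except queue.Empty:
            break                         # nothing else to read: decide now
        apply_to_cache(cache, event)
        if event["type"] in CYCLE_TRIGGERING:
            triggers += 1
    if triggers:
        run_cycle(cache)                  # at least one relevant event arrived

events = queue.Queue()
for i in range(5):
    events.put({"type": "job_submit", "job": f"{i}.server1"})
drain_and_maybe_run_cycle(events, {}, lambda cache: print("running a cycle"))
```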

I will have to spend more time thinking about the details of how it would work; this was just a thought that occurred to me today. A message broker is definitely a new daemon, and there is already existing technology we can take advantage of (like RabbitMQ or Kafka).

Some benefits of using a broker over the scheduler connecting to the server could be:

  • It lets the scheduler get the most recent data from all connected servers (where the servers could be sending updates from their caches).
  • Servers can themselves subscribe to updates and reconcile caches based on updates sent by other servers.
  • This will also give us a platform for multiple schedulers to talk to each other (maybe in the future, to share load/resources).
  • This also removes the need for the server to be aware of the scheduling cycle; it does not have to care about cycles and can just push updates.
  • The scheduler today does not do much after a cycle ends; this way it can use that time to build its universe and start a cycle on its own.

Now all this could just be me smoking the good stuff and it is possible all of this can not happen, but I just wanted to bring it out anyways.

Thanks for clarifying my concerns, Arun. I still have some questions, but I would like to focus on the pros and cons of your suggestion first:

Maybe I’m not understanding it correctly, but how will this be any more recent than the data we get by doing stat calls? We will trigger a sched cycle when a new piece of data comes in and work off of that throughout the cycle; is that really much different from doing a stat at the beginning of a cycle and working off of that data throughout the cycle?

Don’t you think it would be faster to have them share such data on a distributed cache instead of sending messages via a separate daemon, which has to read the messages and route them etc.?

Again, I think it might be faster to share data via a distributed cache than to go through a separate daemon.

So, this isn’t really that big of a problem. We’ve been talking about eliminating the need to send an end-of-cycle message primarily because it adds an extra message between the sched and the server. Your proposal will cause many, many more messages to be exchanged between the server and the sched, so sending the end-of-cycle message might actually be much cheaper than pushing all updates from the server to the sched.

For sites where scheduler has enough time between sched cycles to do anything significant, I don’t think we need to worry much about which approach we take :slight_smile:

I think it will be more recent (and maybe more efficient too) because even if the scheduler uses stat calls, it will be connecting to one server, which will have to send the updates. Now I assume (and please correct me if I am wrong) that to send the most recent data, this server will have to update its own cache by reading from the database. Reading from a message queue would be more efficient, because the server does not have to update its cache to respond to a stat call.

Sure, it would be nice to have a distributed cache, but does a cache come without any overhead? Wouldn’t we need synchronization mechanisms to make sure the cache is sane, and wouldn’t that be more time-consuming for the server? I don’t understand when you say that a daemon has to read the messages and route them; can you please elaborate? If you are talking about a message broker, then I don’t think a broker reads the messages in order to route them. Some brokers support throughput of millions of messages per second, too.

I am not sure how a cache is useful here. It is more about requests being sent than about a cache being shared between schedulers. Again, a cache IMO does not come for free: we would need synchronization, and this cache would not be the same as the cache the scheduler/server would share, so someone has to manage (create/delete) this cache as well. It would be nice if you could share a write-up about caching; maybe it is a better way to deal with this and I am just not able to understand it well.

I thought it was a problem that we wanted to address. There is a post above which says, “This would be a perfect opportunity to ask this question: Why do we need to demarcate start and end of scheduling cycle?”
I think it is more efficient than reading in the whole universe via stat calls. In a scenario where the workload is huge and a few jobs end every second, with what we have today the scheduler has to read the whole universe just to update the resources released by those few jobs. With what I am suggesting, it reads only the things that have changed and updates its internal cache.
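For example (the data structures below are illustrative only, not the scheduler’s real cache), handling a job-end delta only touches the affected job and node entries instead of re-reading everything:

```python
# When a job ends, the scheduler patches just the two affected objects in
# its cache instead of re-stat'ing the whole universe.

cache = {
    "nodes": {"n1": {"ncpus_assigned": 8}, "n2": {"ncpus_assigned": 0}},
    "jobs":  {"123.server1": {"state": "R", "exec_vnode": "n1", "ncpus": 8}},
}

def apply_job_end_delta(cache, delta):
    """delta example: {'type': 'job_end', 'job': '123.server1'}"""
    job = cache["jobs"].pop(delta["job"], None)
    if job:
        node = cache["nodes"].get(job["exec_vnode"])
        if node:
            node["ncpus_assigned"] -= job["ncpus"]   # release the resources

apply_job_end_delta(cache, {"type": "job_end", "job": "123.server1"})
print(cache["nodes"]["n1"])    # {'ncpus_assigned': 0}
```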

It is not only about doing something between cycles; it is also about leaving it to the scheduler to decide when to run a cycle. At sites where things do not change at a rapid pace, the scheduler can decide to wait for more events to occur before it starts a cycle.

In any case, I don’t think this idea of mine is getting much traction from folks, and I do not want to digress from the current requirements Suresh is working on :slight_smile:

The use cases the scheduler needs are the following:

  1. Ability to query the universe (can happen now)
  2. The ability to get the delta of the universe between now and the last time we queried.

This can happen in several different ways. I jumped on the database as the way to do it because I thought that each server in the multi-server cluster does exactly that. It queries everything it doesn’t have, and then updates that cache by getting the delta. I thought we could do the same thing. It seems this is causing trouble, so it isn’t the only way to accomplish our use cases.

@agrawalravi90 mentioned possibly some sort of stat daemon between the scheduler and the server. As long as this stat daemon has the ability to give us deltas, then we’re fine.

I also like the idea of this data broker. From what @arungrover said, brokers can easily keep up with what we need (millions of pieces of data per second). We can first use the stat IFL calls to query our universe, and then the server can push the delta to the data broker and we can read it from there.
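Concretely, the flow I am imagining looks roughly like the sketch below. The function names stand in for the existing stat IFL calls and for whatever broker client we end up with; none of this is a real API:

```python
# Rough shape of "stat once, then live off deltas".

def query_full_universe(servers):
    """Stands in for the existing pbs_stat* IFL calls at scheduler startup."""
    universe = {"jobs": {}, "nodes": {}, "queues": {}}
    for srv in servers:
        universe["jobs"].update(srv.get("jobs", {}))
        universe["nodes"].update(srv.get("nodes", {}))
        universe["queues"].update(srv.get("queues", {}))
    return universe

def consume_deltas(universe, delta_stream):
    """Stands in for reading server-pushed deltas off the data broker."""
    for kind, key, value in delta_stream:   # e.g. ("jobs", "42.server1", {...} or None)
        if value is None:
            universe[kind].pop(key, None)   # object deleted
        else:
            universe[kind][key] = value     # object created or changed

universe = query_full_universe([{"nodes": {"n1": {"state": "free"}}}])
consume_deltas(universe, [("jobs", "42.server1", {"state": "Q"}),
                          ("nodes", "n1", {"state": "down"})])
print(universe)
```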

Does anyone else have an idea? In the world of big data, is there some existing technology that we can use?

Bhroam

The sched can send data to one of n servers via the message broker; how will the message broker know which server to send the data to? Similarly, each server might have to send a message to one of n schedulers. So how can the message broker route messages without parsing their address?

I’d like to separate two things in your proposal: one is sending incremental updates from the server to the sched, and the other is using a message broker to communicate between server and sched. For speed, even if we do send incremental updates from all servers to the sched, without each server having to update its cache from the db, won’t this be faster over a dedicated, direct connection between the two rather than via another daemon? I find it hard to believe that a message broker can be faster than direct connections between the entities talking.

I was talking about using a distributed-cache-based db like Redis or Aerospike, so the server won’t spend any time maintaining it; it will just write to it and read from it. Anyway, we need to first define the use case for sharing data between servers before comparing different approaches, so let’s drop this for now.

I agree, but I think that this is just as achievable over dedicated connections as it is via a message broker. Once we design how the server can send “diffs” of information, the transport mechanism will be secondary, right?