package org.thoughtcrime.securesms.jobmanager; import android.app.Application; import android.content.Intent; import android.os.Build; import androidx.annotation.NonNull; import org.session.libsession.messaging.utilities.Data; import org.session.libsession.utilities.Debouncer; import org.session.libsignal.utilities.Log; import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory; import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer; import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; /** * Allows the scheduling of durable jobs that will be run as early as possible. */ public class JobManager implements ConstraintObserver.Notifier { private static final String TAG = JobManager.class.getSimpleName(); private final ExecutorService executor; private final JobController jobController; private final JobRunner[] jobRunners; private final Set emptyQueueListeners = new CopyOnWriteArraySet<>(); public JobManager(@NonNull Application application, @NonNull Configuration configuration) { this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("JobManager"); this.jobRunners = new JobRunner[configuration.getJobThreadCount()]; this.jobController = new JobController(application, configuration.getJobStorage(), configuration.getJobInstantiator(), configuration.getConstraintFactories(), configuration.getDataSerializer(), Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application) : new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)), new Debouncer(500), this::onEmptyQueue); executor.execute(() -> { jobController.init(); for (int i = 0; i < jobRunners.length; i++) { jobRunners[i] = new JobRunner(application, i + 1, jobController); jobRunners[i].start(); } for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) { constraintObserver.register(this); } if (Build.VERSION.SDK_INT < 26) { application.startService(new Intent(application, KeepAliveService.class)); } wakeUp(); }); } /** * Enqueues a single job to be run. */ public void add(@NonNull Job job) { new Chain(this, Collections.singletonList(job)).enqueue(); } /** * Begins the creation of a job chain with a single job. * @see Chain */ public Chain startChain(@NonNull Job job) { return new Chain(this, Collections.singletonList(job)); } /** * Begins the creation of a job chain with a set of jobs that can be run in parallel. * @see Chain */ public Chain startChain(@NonNull List jobs) { return new Chain(this, jobs); } /** * Retrieves a string representing the state of the job queue. Intended for debugging. */ public @NonNull String getDebugInfo() { Future result = executor.submit(jobController::getDebugInfo); try { return result.get(); } catch (ExecutionException | InterruptedException e) { Log.w(TAG, "Failed to retrieve Job info.", e); return "Failed to retrieve Job info."; } } /** * Adds a listener to that will be notified when the job queue has been drained. */ void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { executor.execute(() -> { emptyQueueListeners.add(listener); }); } /** * Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}. */ void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) { executor.execute(() -> { emptyQueueListeners.remove(listener); }); } @Override public void onConstraintMet(@NonNull String reason) { Log.i(TAG, "onConstraintMet(" + reason + ")"); wakeUp(); } /** * Pokes the system to take another pass at the job queue. */ void wakeUp() { executor.execute(jobController::wakeUp); } private void enqueueChain(@NonNull Chain chain) { executor.execute(() -> { jobController.submitNewJobChain(chain.getJobListChain()); wakeUp(); }); } private void onEmptyQueue() { executor.execute(() -> { for (EmptyQueueListener listener : emptyQueueListeners) { listener.onQueueEmpty(); } }); } public interface EmptyQueueListener { void onQueueEmpty(); } /** * Allows enqueuing work that depends on each other. Jobs that appear later in the chain will * only run after all jobs earlier in the chain have been completed. If a job fails, all jobs * that occur later in the chain will also be failed. */ public static class Chain { private final JobManager jobManager; private final List> jobs; private Chain(@NonNull JobManager jobManager, @NonNull List jobs) { this.jobManager = jobManager; this.jobs = new LinkedList<>(); this.jobs.add(new ArrayList<>(jobs)); } public Chain then(@NonNull Job job) { return then(Collections.singletonList(job)); } public Chain then(@NonNull List jobs) { if (!jobs.isEmpty()) { this.jobs.add(new ArrayList<>(jobs)); } return this; } public void enqueue() { jobManager.enqueueChain(this); } private List> getJobListChain() { return jobs; } } public static class Configuration { private final ExecutorFactory executorFactory; private final int jobThreadCount; private final JobInstantiator jobInstantiator; private final ConstraintInstantiator constraintInstantiator; private final List constraintObservers; private final Data.Serializer dataSerializer; private final JobStorage jobStorage; private Configuration(int jobThreadCount, @NonNull ExecutorFactory executorFactory, @NonNull JobInstantiator jobInstantiator, @NonNull ConstraintInstantiator constraintInstantiator, @NonNull List constraintObservers, @NonNull Data.Serializer dataSerializer, @NonNull JobStorage jobStorage) { this.executorFactory = executorFactory; this.jobThreadCount = jobThreadCount; this.jobInstantiator = jobInstantiator; this.constraintInstantiator = constraintInstantiator; this.constraintObservers = constraintObservers; this.dataSerializer = dataSerializer; this.jobStorage = jobStorage; } int getJobThreadCount() { return jobThreadCount; } @NonNull ExecutorFactory getExecutorFactory() { return executorFactory; } @NonNull JobInstantiator getJobInstantiator() { return jobInstantiator; } @NonNull ConstraintInstantiator getConstraintFactories() { return constraintInstantiator; } @NonNull List getConstraintObservers() { return constraintObservers; } @NonNull Data.Serializer getDataSerializer() { return dataSerializer; } @NonNull JobStorage getJobStorage() { return jobStorage; } public static class Builder { private ExecutorFactory executorFactory = new DefaultExecutorFactory(); private int jobThreadCount = 1; private Map jobFactories = new HashMap<>(); private Map constraintFactories = new HashMap<>(); private List constraintObservers = new ArrayList<>(); private Data.Serializer dataSerializer = new JsonDataSerializer(); private JobStorage jobStorage = null; public @NonNull Builder setJobThreadCount(int jobThreadCount) { this.jobThreadCount = jobThreadCount; return this; } public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) { this.executorFactory = executorFactory; return this; } public @NonNull Builder setJobFactories(@NonNull Map jobFactories) { this.jobFactories = jobFactories; return this; } public @NonNull Builder setConstraintFactories(@NonNull Map constraintFactories) { this.constraintFactories = constraintFactories; return this; } public @NonNull Builder setConstraintObservers(@NonNull List constraintObservers) { this.constraintObservers = constraintObservers; return this; } public @NonNull Builder setDataSerializer(@NonNull Data.Serializer dataSerializer) { this.dataSerializer = dataSerializer; return this; } public @NonNull Builder setJobStorage(@NonNull JobStorage jobStorage) { this.jobStorage = jobStorage; return this; } public @NonNull Configuration build() { return new Configuration(jobThreadCount, executorFactory, new JobInstantiator(jobFactories), new ConstraintInstantiator(constraintFactories), new ArrayList<>(constraintObservers), dataSerializer, jobStorage); } } } }