fix: enqueue assets in batches for uploads

This commit is contained in:
shenlong-tanwen
2025-10-23 22:51:21 +05:30
parent 9b58d5663a
commit fe40aa5540
5 changed files with 98 additions and 21 deletions

View File

@@ -81,7 +81,7 @@ class DriftBackupRepository extends DriftDatabaseRepository {
);
}
Future<List<LocalAsset>> getCandidates(String userId, {bool onlyHashed = true}) async {
Future<List<LocalAsset>> getCandidates(String userId, {bool onlyHashed = true, int? limit}) async {
final selectedAlbumIds = _db.localAlbumEntity.selectOnly(distinct: true)
..addColumns([_db.localAlbumEntity.id])
..where(_db.localAlbumEntity.backupSelection.equalsValue(BackupSelection.selected));
@@ -112,6 +112,10 @@ class DriftBackupRepository extends DriftDatabaseRepository {
query.where((lae) => lae.checksum.isNotNull());
}
if (limit != null) {
query.limit(limit);
}
return query.map((localAsset) => localAsset.toDto()).get();
}
}

View File

@@ -25,6 +25,7 @@ import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provide
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:immich_mobile/providers/infrastructure/upload_timer.provider.dart';
import 'package:immich_mobile/providers/locale_provider.dart';
import 'package:immich_mobile/providers/routes.provider.dart';
import 'package:immich_mobile/providers/theme.provider.dart';
@@ -211,6 +212,8 @@ class ImmichAppState extends ConsumerState<ImmichApp> with WidgetsBindingObserve
WidgetsBinding.instance.addPostFrameCallback((_) {
// needs to be delayed so that EasyLocalization is working
if (Store.isBetaTimelineEnabled) {
// Start upload timer
ref.read(uploadTimerProvider.notifier).start();
ref.read(backgroundServiceProvider).disableService();
ref.read(backgroundWorkerFgServiceProvider).enable();
if (Platform.isAndroid) {

View File

@@ -16,6 +16,7 @@ import 'package:immich_mobile/providers/backup/ios_background_settings.provider.
import 'package:immich_mobile/providers/backup/manual_upload.provider.dart';
import 'package:immich_mobile/providers/gallery_permission.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:immich_mobile/providers/infrastructure/upload_timer.provider.dart';
import 'package:immich_mobile/providers/memory.provider.dart';
import 'package:immich_mobile/providers/notification_permission.provider.dart';
import 'package:immich_mobile/providers/server_info.provider.dart';
@@ -139,6 +140,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
Future<void> _handleBetaTimelineResume() async {
_ref.read(backupProvider.notifier).cancelBackup();
_ref.read(uploadTimerProvider.notifier).start();
unawaited(_ref.read(backgroundWorkerLockServiceProvider).lock());
// Give isolates time to complete any ongoing database transactions
@@ -216,6 +218,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
try {
if (Store.isBetaTimelineEnabled) {
_ref.read(uploadTimerProvider.notifier).stop();
unawaited(_ref.read(backgroundWorkerLockServiceProvider).unlock());
}
await _performPause();
@@ -250,6 +253,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
state = AppLifeCycleEnum.detached;
if (Store.isBetaTimelineEnabled) {
_ref.read(uploadTimerProvider.notifier).stop();
unawaited(_ref.read(backgroundWorkerLockServiceProvider).unlock());
}

View File

@@ -0,0 +1,76 @@
import 'dart:async';
import 'package:background_downloader/background_downloader.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/providers/app_settings.provider.dart';
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
import 'package:immich_mobile/providers/user.provider.dart';
import 'package:immich_mobile/services/app_settings.service.dart';
import 'package:logging/logging.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
class UploadTimerNotifier extends Notifier<bool> {
Timer? _timer;
final _timerLogger = Logger('UploadTimer');
static const _refreshDuration = Duration(seconds: 10);
void start() {
if (state) {
return;
}
state = true;
_schedule();
}
void stop() {
if (!state) {
return;
}
_timer?.cancel();
_timer = null;
state = false;
}
void _schedule() {
_timer?.cancel();
_timer = Timer(_refreshDuration, () async {
if (!state) {
return;
}
await _backup();
if (state) {
_schedule();
}
});
}
Future<void> _backup() async {
final isBackupEnabled = ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
if (!isBackupEnabled) {
_timerLogger.fine("UploadTimer: Backup is disabled, skipping backup start.");
return;
}
final tasks = await FileDownloader().allTasks(group: kBackupGroup);
final currentUserId = ref.read(currentUserProvider)?.id;
if (tasks.isEmpty && currentUserId != null) {
ref.read(driftBackupProvider.notifier).startBackup(currentUserId);
} else {
_timerLogger.fine("UploadTimer: There are still active upload tasks - ${tasks.length}, skipping backup start.");
}
}
@override
bool build() {
Future.microtask(start);
ref.onDispose(() {
_timer?.cancel();
});
// Timer is not running yet
return false;
}
}
final uploadTimerProvider = NotifierProvider<UploadTimerNotifier, bool>(UploadTimerNotifier.new);

View File

@@ -121,33 +121,23 @@ class UploadService {
shouldAbortQueuingTasks = false;
final candidates = await _backupRepository.getCandidates(userId);
final candidates = await _backupRepository.getCandidates(userId, limit: 100);
if (candidates.isEmpty) {
return;
}
const batchSize = 100;
int count = 0;
for (int i = 0; i < candidates.length; i += batchSize) {
if (shouldAbortQueuingTasks) {
break;
List<UploadTask> tasks = [];
for (final asset in candidates) {
final task = await _getUploadTask(asset);
if (task != null) {
tasks.add(task);
}
}
final batch = candidates.skip(i).take(batchSize).toList();
List<UploadTask> tasks = [];
for (final asset in batch) {
final task = await _getUploadTask(asset);
if (task != null) {
tasks.add(task);
}
}
if (tasks.isNotEmpty && !shouldAbortQueuingTasks) {
await enqueueTasks(tasks);
if (tasks.isNotEmpty && !shouldAbortQueuingTasks) {
count += tasks.length;
await enqueueTasks(tasks);
onEnqueueTasks(EnqueueStatus(enqueueCount: count, totalCount: candidates.length));
}
onEnqueueTasks(EnqueueStatus(enqueueCount: tasks.length, totalCount: candidates.length));
}
}