Bir Monoyu gerçekten asenkron hale nasıl getirebilirim (reaktif değil!) yöntem çağrısı?

0

Soru

Bir yöntemi var

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}

Normal program akışında, bu yöntemi bir Kafka olayı aracılığıyla eşzamansız olarak çağırıyorum.

Test amacıyla yöntemi bir web hizmeti olarak göstermem gerekiyor, ancak yöntem eşzamansız olarak gösterilmelidir: yalnızca HTTP kodu 200 Tamam döndürüyor ("istek kabul edildi") ve arka planda veri işlemeye devam ediyor.

Sadece aramak için Tamam mı (= istenmeyen yan etkileri yok mu) Mono#subscribe() ve denetleyici yönteminden geri dönmek mi?

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

Yoksa bunu böyle yapmak daha mı iyi (burada Intellij'den gelen uyarıyla kafam karıştı, belki de aynı https://youtrack.jetbrains.com/issue/IDEA-276018 ?):

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
    return Mono.empty();
}

Ya da başka bir çözüm?

2

En iyi cevabı

3

Mono#subscribe () öğesini çağırmak ve denetleyici yönteminden geri dönmek Tamam mı (=istenmeyen yan etkileri yok mu)?

Yan etkileri var, ama onlarla yaşamak iyi olabilir:

  • Bu gerçekten ateştir ve unutmaktır-bu, bir başarı hakkında asla bilgilendirilmeyeceğiniz anlamına gelir (çoğu insanın fark ettiği), aynı zamanda bir başarısızlık hakkında asla bilgilendirilmeyeceğiniz anlamına gelir (çok daha az insanın fark ettiği).)
  • İşlem herhangi bir nedenle askıda kalırsa, söz konusu yayımcı hiçbir zaman tamamlanmaz ve bunu bilmenin hiçbir yolu kalmaz. Sınırlı elastik iş parçacığı havuzuna abone olduğunuzdan, bu sınırlı iş parçacıklarından birini de süresiz olarak bağlayacaktır.

İyi olabileceğiniz ilk nokta ya da bir şekilde reaktif zincirin bir yan etkisi olarak daha fazla hata kaydı yapmak isteyebilirsiniz, böylece bir şeyler ters giderse en azından dahili bir bildiriminiz olur.

İkinci nokta için-yöntem çağrınıza (cömert) bir zaman aşımı koymanızı tavsiye ederim, böylece en azından belirli bir sürede tamamlanmadıysa ve artık kaynakları tüketmiyorsa iptal edilir. Eşzamansız bir görev çalıştırıyorsanız, bu yalnızca biraz bellek tüketeceği için büyük bir sorun değildir. Elastik zamanlayıcıda engelleme çağrısı yapıyorsanız, bu iş parçacığındaki bir iş parçacığını süresiz olarak bağladığınız için bu daha da kötüdür.

Ayrıca, sınırlı elastik zamanlayıcıyı neden burada kullanmanız gerektiğini de sorgularım - bu kullanım durumunun temeli gibi görünmeyen engelleme çağrılarını sarmak için kullanılır. (Açık olmak gerekirse, hizmetiniz engelliyorsa, kesinlikle elastik zamanlayıcıya sarmalısınız - ancak değilse, bunu yapmak için hiçbir neden yoktur.)

Son olarak, bu örnek:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}

...bir tür "imposter reaktif yöntem" yarattığınız için ne yapmamanız gerektiğine dair mükemmel bir örnektir-birisi, geri dönen yayıncıya, altta yatan yayıncı tamamlandığında tamamlanacağını düşünerek çok makul bir şekilde abone olabilir, ki bu açıkça burada olan şey değildir. Bir kullanarak void dönüş türü ve dolayısıyla hiçbir şey döndürmemek, bu senaryoda yapılacak doğru şeydir.

2021-11-23 16:54:58
1

Aşağıdaki kodla seçeneğiniz aslında tamam:

@GetMapping
public void processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

Ne bu aslında bir olduğunu @Scheduled sadece hiçbir şey döndürmeyen ve açıkça abone olduğunuz yöntem Mono veya Flux böylece elementler yayılır.

2021-11-23 08:36:44

Diğer dillerde

Bu sayfa diğer dillerde

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................