/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.blob.implementation.util;

import com.azure.core.util.logging.ClientLogger;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * A sink of {@link ByteBuffer}s whose producer side blocks instead of failing
 * when the internal buffer is full.
 *
 * <p>The underlying Reactor unicast sink is backed by a bounded queue (capacity
 * {@value #QUEUE_CAPACITY}) whose {@code offer} is overridden to wait for space,
 * so a call to {@link #emitNext(ByteBuffer)} may block the calling thread until
 * the subscriber of {@link #asFlux()} has drained the previous element.
 */
public final class StorageBlockingSink {
    private static final ClientLogger LOGGER = new ClientLogger(StorageBlockingSink.class);

    /** Maximum number of buffers held between the producer and the subscriber. */
    private static final int QUEUE_CAPACITY = 1;

    private final Sinks.Many<ByteBuffer> writeSink;
    private final LinkedBlockingQueue<ByteBuffer> writeLimitQueue =
        new ProducerBlockingQueue<>(QUEUE_CAPACITY, LOGGER);

    /**
     * Creates a sink that buffers emitted elements in the bounded, producer-blocking queue.
     */
    public StorageBlockingSink() {
        this.writeSink = Sinks.many().unicast().onBackpressureBuffer(this.writeLimitQueue);
    }

    /**
     * Emits a buffer into the sink.
     *
     * @param buffer the buffer to hand to the subscriber
     * @throws IllegalStateException if the emission fails for any reason; the
     *         original failure is preserved as the cause
     */
    public void emitNext(ByteBuffer buffer) {
        try {
            this.writeSink.tryEmitNext(buffer).orThrow();
        } catch (Exception e) {
            // Any failure (including an interrupted blocking offer) is surfaced uniformly to callers.
            throw LOGGER.logExceptionAsError(
                new IllegalStateException("Faulted stream due to underlying sink write failure", e));
        }
    }

    /**
     * Signals completion to the sink, propagating whatever exception
     * {@code orThrow()} raises when the completion signal cannot be emitted.
     */
    public void emitCompleteOrThrow() {
        this.writeSink.tryEmitComplete().orThrow();
    }

    /**
     * Returns the consumer-side view of this sink.
     *
     * @return a {@link Flux} of the emitted buffers
     */
    public Flux<ByteBuffer> asFlux() {
        return this.writeSink.asFlux();
    }

    /**
     * A {@link LinkedBlockingQueue} whose {@link #offer(Object)} waits for
     * capacity rather than returning {@code false} when the queue is full.
     *
     * @param <T> the element type
     */
    private static final class ProducerBlockingQueue<T> extends LinkedBlockingQueue<T> {
        private static final long serialVersionUID = 1L;

        private final transient ClientLogger logger;

        ProducerBlockingQueue(int queueSize, ClientLogger logger) {
            super(queueSize);
            this.logger = logger;
        }

        /**
         * Inserts the element, blocking until space is available.
         *
         * @param o the element to add
         * @return always {@code true} once the element has been enqueued
         * @throws RuntimeException wrapping the {@link InterruptedException} if
         *         the thread is interrupted while waiting; the interrupt flag is
         *         restored before throwing
         */
        @Override
        public boolean offer(T o) {
            try {
                // Deliberately delegate to the blocking put() so a full queue stalls the producer.
                super.put(o);
                return true;
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw this.logger.logExceptionAsError(new RuntimeException(e));
            }
        }
    }
}

