package org.jocean.http.server.impl;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.jocean.http.Feature;
import org.jocean.http.server.HttpServer;
import org.jocean.http.server.impl.DefaultHttpServer;
import org.jocean.http.util.Nettys;
import org.jocean.http.util.RxNettys;
import org.jocean.idiom.ExceptionUtils;
import org.jocean.idiom.InterfaceUtils;
import org.jocean.idiom.rx.OneshotSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;

/* loaded from: classes2.dex */
/**
 * {@link HttpServer.HttpTrade} implementation bound to a single Netty {@link Channel}.
 *
 * <p>Inbound {@link HttpObject}s are pushed into this trade through the
 * {@link Nettys.OnHttpObject} callbacks and fanned out to every subscriber of
 * {@link #request()}. The response is written back to the channel through the observer
 * returned by {@link #responseObserver()}; when that observer terminates, the handlers
 * subscription is released and the channel is handed to the
 * {@link DefaultHttpServer.ChannelRecycler}.
 *
 * <p>The subscriber list is a {@link CopyOnWriteArrayList} and the keep-alive flag is
 * {@code volatile}, so the list can be iterated while subscribers are added or removed
 * and the flag written by the inbound callback is visible to the response observer.
 */
public class DefaultHttpTrade implements HttpServer.HttpTrade, Nettys.OnHttpObject {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpTrade.class);

    /** Channel this trade reads the request from and writes the response to. */
    private final Channel _channel;

    /** Receives the channel once the response has completed or failed. */
    private final DefaultHttpServer.ChannelRecycler _channelRecycler;

    /**
     * Subscription built from the pipeline handler names that differ after the features
     * were applied; presumably unsubscribing removes those handlers again (see
     * {@code RxNettys.removeHandlersSubscription}).
     */
    private final Subscription _removeHandlers;

    /** Currently active subscribers of {@link #request()}. */
    private final List<Subscriber<? super HttpObject>> _subscribers =
            new CopyOnWriteArrayList<Subscriber<? super HttpObject>>();

    /** Keep-alive flag taken from the most recent inbound {@link HttpRequest}. */
    private volatile boolean _isKeepAlive = false;

    /**
     * Registers a subscriber for inbound request objects and arranges for it to be
     * dropped from the list again when it unsubscribes.
     */
    private final Observable.OnSubscribe<HttpObject> _onSubscribeRequest = new Observable.OnSubscribe<HttpObject>() {
        @Override
        public void call(final Subscriber<? super HttpObject> subscriber) {
            if (subscriber.isUnsubscribed()) {
                // Nothing to register: the subscriber is already gone.
                return;
            }
            DefaultHttpTrade.this._subscribers.add(subscriber);
            subscriber.add(new OneshotSubscription() {
                @Override
                protected void doUnsubscribe() {
                    DefaultHttpTrade.this._subscribers.remove(subscriber);
                }
            });
        }
    };

    /**
     * Creates a trade for {@code channel} and applies every feature to its pipeline.
     *
     * @param channel         channel carrying the HTTP exchange
     * @param channelRecycler notified with the channel when the response terminates
     * @param handlerBuilder  builder passed to each feature when it is applied
     * @param featureArr      features applied to the channel's pipeline, in order
     */
    public DefaultHttpTrade(Channel channel, DefaultHttpServer.ChannelRecycler channelRecycler, Feature.HandlerBuilder handlerBuilder, Feature... featureArr) {
        this._channelRecycler = channelRecycler;
        this._channel = channel;
        // Hand this trade to the features that want inbound objects BEFORE the features are
        // applied to the pipeline; the original ordering is kept on purpose.
        OnHttpObjectAware onHttpObjectAware = (OnHttpObjectAware) InterfaceUtils.compositeIncludeType(OnHttpObjectAware.class, featureArr);
        if (onHttpObjectAware != null) {
            onHttpObjectAware.setOnHttpObject(this);
        }
        // Capture the handler names before applying features so that only the difference
        // is passed to removeHandlersSubscription afterwards.
        Func0<String[]> namesDifferenceBuilder = Nettys.namesDifferenceBuilder(channel);
        for (Feature feature : featureArr) {
            feature.call(handlerBuilder, channel.pipeline());
        }
        this._removeHandlers = RxNettys.removeHandlersSubscription(channel, namesDifferenceBuilder.call());
    }

    /**
     * Logs the inbound failure and forwards it to every still-subscribed request subscriber.
     *
     * @param th the error reported for the inbound side
     */
    @Override
    public void onError(Throwable th) {
        LOG.warn("trade({}).onError, detail:{}", this, ExceptionUtils.exception2detail(th));
        for (Subscriber<? super HttpObject> subscriber : this._subscribers) {
            // Same guard as onHttpObject: never signal a subscriber that already unsubscribed.
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(th);
            }
        }
    }

    /**
     * Records the keep-alive flag when the object is an {@link HttpRequest}, then delivers the
     * object to every still-subscribed request subscriber. Subscribers are completed right
     * after receiving a {@link FullHttpRequest} or a {@link LastHttpContent}.
     *
     * @param httpObject the inbound HTTP object
     */
    @Override
    public void onHttpObject(HttpObject httpObject) {
        if (httpObject instanceof HttpRequest) {
            this._isKeepAlive = HttpHeaders.isKeepAlive((HttpRequest) httpObject);
        }
        // Loop-invariant: decide once whether this object terminates the request stream.
        final boolean endOfRequest = (httpObject instanceof FullHttpRequest) || (httpObject instanceof LastHttpContent);
        for (Subscriber<? super HttpObject> subscriber : this._subscribers) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(httpObject);
                if (endOfRequest) {
                    subscriber.onCompleted();
                }
            }
        }
    }

    /**
     * @return an observable of the inbound request objects; each subscription is registered
     *         with this trade and receives objects delivered after it subscribed
     */
    @Override
    public Observable<? extends HttpObject> request() {
        return Observable.create(this._onSubscribeRequest);
    }

    /**
     * @return the event loop of the underlying channel
     */
    @Override
    public Executor requestExecutor() {
        return this._channel.eventLoop();
    }

    /**
     * Creates a new observer for the response side of this trade.
     *
     * <p>{@code onNext} retains the object and writes it to the channel (no flush is issued
     * here). Both {@code onCompleted} and {@code onError} release the handlers subscription
     * and hand the channel to the recycler together with the current keep-alive flag.
     *
     * @return a fresh response observer; every call returns a new instance
     */
    @Override
    public Observer<HttpObject> responseObserver() {
        return new Subscriber<HttpObject>() {
            @Override
            public void onCompleted() {
                DefaultHttpTrade.this.finishResponse();
            }

            @Override
            public void onError(Throwable th) {
                DefaultHttpTrade.LOG.warn("trade({})'s responseObserver.onError, detail:{}", DefaultHttpTrade.this, ExceptionUtils.exception2detail(th));
                DefaultHttpTrade.this.finishResponse();
            }

            @Override
            public void onNext(HttpObject httpObject) {
                // Retain before writing: the write takes over one reference of the object.
                DefaultHttpTrade.this._channel.write(ReferenceCountUtil.retain(httpObject));
            }
        };
    }

    /** Shared response termination: release the handlers subscription, then recycle the channel. */
    private void finishResponse() {
        this._removeHandlers.unsubscribe();
        this._channelRecycler.onResponseCompleted(this._channel, this._isKeepAlive);
    }

    @Override
    public String toString() {
        return "HttpTrade [channel=" + this._channel + ", request's subscribers.size=" + this._subscribers.size() + ", isKeepAlive=" + this._isKeepAlive + "]";
    }

    /**
     * @return the underlying Netty channel
     */
    @Override
    public Object transport() {
        return this._channel;
    }
}
