Spring WebFlux

Lambda 表达式

Lambda写法示例:

    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("ThreadName:" + Thread.currentThread().getName());
        }
    }).start();

    //Lambda
    new Thread(() -> System.out.println("ThreadName:" + Thread.currentThread().getName())).start();

Runnable有@FunctionalInterface注解表示是函数接口,要求类型是接口有且只有一个抽象方法

Java自带的函数接口

java.util.function路径下有Java自带的各种函数接口 BiConsumer BiFunction BinaryOperator BiPredicate BooleanSupplier Consumer DoubleBinaryOperator DoubleConsumer DoubleFunction DoublePredicate DoubleSupplier DoubleToIntFunction DoubleToLongFunction DoubleUnaryOperator Function IntBinaryOperator IntConsumer IntFunction IntPredicate IntSupplier IntToDoubleFunction IntToLongFunction IntUnaryOperator LongBinaryOperator LongConsumer LongFunction LongPredicate LongSupplier LongToDoubleFunction LongToIntFunction LongUnaryOperator ObjDoubleConsumer ObjIntConsumer ObjLongConsumer Predicate Supplier ToDoubleBiFunction ToDoubleFunction ToIntBiFunction ToIntFunction ToLongBiFunction ToLongFunction UnaryOperator

示例:

    Supplier<String> stringSupplier = () -> "hello";
    System.out.println(stringSupplier.get());

    ToIntFunction toIntFunction = (a) -> Integer.valueOf(a.toString());
    System.out.println(toIntFunction.applyAsInt("322"));

    Consumer<String> consumer = a -> System.out.println(a);
    consumer.accept("你好");

    //输入输出类型不一样
    Function<Integer, Integer> function = a -> {
        return a * a;
    };
    System.out.println("function:" + function.apply(2));

    //输入输出类型一样
    UnaryOperator<Integer> unaryOperator = a -> a * a;
    System.out.println("unaryOperator:" + unaryOperator.apply(2));

    //输入2个输出1个
    BiFunction<Integer, Integer, Integer> biFunction = (a, b) -> a + b;
    System.out.println("biFunction:" + biFunction.apply(2, 1));

Java8的Strean编程

创建流

    String[] arr={"a","b","c","2121"};
    //通过数组创建
    Stream.of(arr).forEach(System.out::println);

    //通过List
    Arrays.asList(arr).stream().forEach(System.out::println);

    //通过Stream
    Stream.of(arr).forEach(System.out::println);

    //迭代器
    Stream.iterate(0,a->a+1).limit(10).forEach(System.out::println);

    //generate手动创建
    Stream.generate(()->new Random().nextInt(10)).limit(10).forEach(System.out::println);

一个流的操作

    String[] arr={"a","b","c","","2121","c"};
    Stream.of(arr).filter(d->!d.isEmpty())
            .distinct()
            .sorted()
            .limit(1)
            .map(d->d.replace("1",""))
            .peek(System.out::println)
            .flatMap(d->Stream.of( d.split("")))
            .peek(System.out::println)  //窥探函数 中间操作,可以用于调试
            .findFirst();               //一个流中只能有一个终止函数,中间操作可以任意个
	//        .forEach(System.out::println);    //终止函数

212.jpg

反应式编程

Spring反应式编程是低延迟高吞吐量 反应式宣言

1.jpg

2.jpg

Java9

system.util.concurrent.Flow

发布者-订阅者

public static void main(String[] args) throws IOException {
    //创建一个发布者
    SubmissionPublisher publisher = new SubmissionPublisher();

    //创建一个订阅者
    Flow.Subscriber subscriber = new Flow.Subscriber() {
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(Object item) {
            System.out.println("onNext"+item);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscription.request(1);

        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError");

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    };

    //建立订阅者
    publisher.subscribe(subscriber);

    for (int i = 0; i < 100; i++) {
        publisher.submit("hello:"+i);
    }

    System.in.read();

}

发布者->订阅者->最终订阅者

//---------------测试---------------
public static void main(String[] args) throws IOException {
    //创建一个发布者
    SubmissionPublisher<String> publisher = new SubmissionPublisher();

    //创建处理器
    ReactiveProcessor processor=new ReactiveProcessor();

    //建立订阅者
    publisher.subscribe(processor);

    //创建最终的订阅者
    Flow.Subscriber subscriber = new Flow.Subscriber() {
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(Object item) {
            System.out.println("onNext"+item);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscription.request(1);

        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError");

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    };
    processor.subscribe(subscriber);

    for (int i = 0; i < 100; i++) {
        publisher.submit("hello:"+i);
    }

    System.in.read();

}

public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {

    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("onSubscribe");
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("onNext"+item);
        this.submit(item); //数据发给订阅者
        subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError");

    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

Mono

Mono.justOrEmpty("32323").subscribe(System.out::println);

Flux

Flux.just("1","2","3").subscribe(System.out::println);
Flux.fromIterable(Arrays.asList("1","2","3")).subscribe(System.out::println);
Flux.fromIterable(Arrays.asList("1","2","3")).map(d->d+d).filter(d->Objects.equals(d,"11")).subscribe(System.out::println);

WebFlux

注解编程

2.jpg

函数式编程

1.jpg

数据库操作

R2DBC r2dbc github

事务操作

##参考资料 官方文档


已有 0 条评论

    欢迎您,新朋友,感谢参与互动!