Reactive API (4.0)

Mark Paluch edited this page Jan 9, 2017 · 1 revision

This guide helps you to understand the Observable pattern and aims to give you a general understanding of how to build reactive applications.

Motivation

Asynchronous and reactive methodologies allow you to make better use of system resources instead of wasting threads that wait for network or disk I/O. Those threads can be fully utilized to perform other work instead.

A broad range of technologies exists to facilitate this style of programming, ranging from the very limited and less usable java.util.concurrent.Future to complete libraries and runtimes like Akka. One library, RxJava, has a very rich set of operators to compose asynchronous workflows, has no further dependencies on other frameworks, and supports the very mature Rx model. For more information about Reactive Extensions, see http://reactivex.io

Understanding observables

Asynchronous processing decouples I/O or computation from the thread that invoked the operation. A handle to the result is given back, usually a java.util.concurrent.Future or similar, that returns either a single object, a collection, or an exception. Retrieving a result that was fetched asynchronously is usually not the end of processing a flow. Once data is obtained, further requests can be issued, either always or conditionally. With Java 8 or the Promise pattern, futures can be chained linearly so that subsequent asynchronous requests are issued. Once conditional processing is needed, the asynchronous flow has to be interrupted and synchronized. While this approach is possible, it does not take full advantage of asynchronous processing.

In contrast to the preceding examples, Observable objects answer the multiplicity and asynchronous questions in a different fashion: by inverting the Pull pattern into a Push pattern.

An Observable is the asynchronous/push “dual” to the synchronous/pull Iterable.

event            Iterable (pull)     Observable (push)
retrieve data    T next()            onNext(T)
discover error   throws Exception    onError(Exception)
complete         !hasNext()          onCompleted()
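The duality in the table can be sketched in plain Java without RxJava. The Listener interface and the PullPushSketch class below are hypothetical names made up for this illustration; they only mirror the shape of onNext/onError/onCompleted and are not RxJava types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical callback interface mirroring onNext/onError/onCompleted.
interface Listener<T> {
    void onNext(T item);
    void onError(Exception e);
    void onCompleted();
}

class PullPushSketch {

    // Pull: the consumer asks for each element and sets the pace.
    static List<String> pull(Iterable<String> source) {
        List<String> events = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {                 // completion is discovered via !hasNext()
            events.add("next:" + it.next());   // data is retrieved via T next()
        }
        events.add("completed");
        return events;
    }

    // Push: the producer drives the listener; the consumer only reacts.
    static void push(Iterable<String> source, Listener<String> listener) {
        try {
            for (String item : source) {
                listener.onNext(item);
            }
        } catch (Exception e) {
            listener.onError(e);               // errors are delivered as an event, not thrown
            return;
        }
        listener.onCompleted();
    }

    // Records the events a listener receives so both styles can be compared.
    static List<String> pushInto(Iterable<String> source) {
        final List<String> events = new ArrayList<>();
        push(source, new Listener<String>() {
            @Override
            public void onNext(String item) { events.add("next:" + item); }

            @Override
            public void onError(Exception e) { events.add("error:" + e.getMessage()); }

            @Override
            public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Ben", "Michael", "Mark");
        System.out.println(pull(names));
        System.out.println(pushInto(names));
    }
}
```

Both calls record the same sequence of events; the difference is only who is in control: the consumer loop in pull(), the producer loop in push().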

An Observable<T> supports emission sequences of values or even infinite streams, not just the emission of single scalar values (as Futures do), which means an Observable<T> can emit 0 to N events. You will very much appreciate this fact once you start to work on streams instead of single values.

An Observable<T> is not biased toward a particular source of concurrency or asynchronicity, nor toward how the underlying code is executed: synchronously or asynchronously, for example within a thread pool. As a consumer of an Observable<T>, you leave the actual implementation to the supplier, who can change it later on without you having to adapt your code.

The last key point of an Observable<T> is that the underlying processing is not started at the time the Observable<T> is obtained; rather, it is started at the moment an observer subscribes to the Observable<T>. This is a crucial difference from a java.util.concurrent.Future, which is started at some point around the time it is created or obtained. So if no observer ever subscribes to the Observable<T>, nothing will ever happen.
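The difference between work that starts on creation and work that starts on subscription can be sketched with JDK classes only. ColdSource is a hypothetical stand-in written for this example and is not how RxJava implements Observable; CompletableFuture.supplyAsync() is used as the eager counterpart.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class LazyVsEagerSketch {

    // Hypothetical stand-in for a cold source: the work runs once per subscribe() call.
    static final class ColdSource<T> {
        private final Supplier<T> work;

        ColdSource(Supplier<T> work) {
            this.work = work;
        }

        void subscribe(Consumer<T> observer) {
            observer.accept(work.get());
        }
    }

    // Creates a cold source, subscribes the given number of times,
    // and reports how often the underlying work ran.
    static int coldRuns(int subscriptions) {
        AtomicInteger runs = new AtomicInteger();
        ColdSource<Integer> source = new ColdSource<>(runs::incrementAndGet);
        for (int i = 0; i < subscriptions; i++) {
            source.subscribe(value -> { });
        }
        return runs.get();
    }

    // Creates a future and reports how often the work ran
    // although no callback was ever registered on it.
    static int futureRuns() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(runs::incrementAndGet);
        future.join(); // only waits for the already submitted work to finish
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("cold, never subscribed: " + coldRuns(0));
        System.out.println("cold, subscribed twice: " + coldRuns(2));
        System.out.println("future, no callback:    " + futureRuns());
    }
}
```

The cold source does nothing until subscribe() is called and repeats the work for each subscriber, while the future runs its work exactly once as soon as it is created.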

A word on the lettuce Reactive API

All commands return an Observable<T> to which an Observer can subscribe. That Observer reacts to whatever item or sequence of items the Observable<T> emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Observable to emit objects. Instead, it creates a sentry in the form of an Observer that stands ready to react appropriately at whatever future time the Observable does so.

Consuming observables

The first thing you want to do when working with observables is to consume them. Consuming an observable means subscribing to it. Here is an example that subscribes and prints out all the items emitted:

Observable.just("Ben", "Michael", "Mark").doOnCompleted(new Action0() {

    // Action1 only receives onNext, so completion is observed via doOnCompleted
    @Override
    public void call() {
        System.out.println("Completed");
    }
}).subscribe(new Action1<String>() {

    @Override
    public void call(String s) {
        System.out.println("Hello " + s + "!");
    }
});

The example prints the following lines:

Hello Ben!
Hello Michael!
Hello Mark!
Completed

You can see that the Subscriber (or Observer) gets notified of every event and also receives the completed event. An Observable<T> emits items until either an exception is raised or the Observable<T> finishes the emission calling onCompleted. No further elements are emitted after that time.

A call to the subscribe method returns a Subscription that allows you to unsubscribe and thus stop receiving further events. Observables can react to the un-subscription and free resources once a subscriber has unsubscribed from the Observable<T>.

You can control the elements that are processed by your Subscriber using operators. The take() operator limits the number of emitted items if you are interested in the first N elements only.

Observable.just("Ben", "Michael", "Mark").take(2).doOnCompleted(new Action0() {

    // Action1 only receives onNext, so completion is observed via doOnCompleted
    @Override
    public void call() {
        System.out.println("Completed");
    }
}).subscribe(new Action1<String>() {

    @Override
    public void call(String s) {
        System.out.println("Hello " + s + "!");
    }
});

The example prints the following lines:

Hello Ben!
Hello Michael!
Completed

Note that the take operator implicitly unsubscribes from the Observable<T> once the expected count of elements was emitted.

You can subscribe to an Observable<T> either with another Observable or with a Subscriber. Unless you are implementing a custom Observer, always use Subscriber. The Action1 used in the examples above does not handle exceptions, so once an exception is thrown you will see a stack trace like this:

Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: Example exception
    at rx.Observable$30.onError(Observable.java:7540)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
...
Caused by: java.lang.RuntimeException: Example exception
...
    at rx.Observable$10.onNext(Observable.java:4396)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:79)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: 2
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:104)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:81)

It is always recommended to implement an error handler right from the beginning. At a certain point, things can and will go wrong.

A fully implemented subscriber declares the onCompleted and onError methods, allowing you to react to these events:

Observable.just("Ben", "Michael", "Mark").subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError: " + e);
    }

    @Override
    public void onNext(String s) {
        System.out.println("Hello " + s + "!");
    }
});

From push to pull

The examples above illustrated how observables can be set up without being opinionated about blocking or non-blocking execution. An Observable<T> can be converted explicitly into a BlockingObservable<T>, which then behaves very much like an Iterable<T>.

String last = Observable.just("Ben", "Michael", "Mark").toBlocking().last();
System.out.println(last);

The example prints the following line:

Mark

A blocking observable can be used to synchronize the observable chain and find a way back to the plain and well-known Pull pattern.

List<String> list = Observable.just("Ben", "Michael", "Mark").toList().toBlocking().single();
System.out.println(list);

The toList operator collects all emitted elements and passes the list through the BlockingObservable<T>.

The example prints the following line:

[Ben, Michael, Mark]
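What such a push-to-pull conversion has to do can be sketched with JDK classes only. This is a minimal illustration, assuming a hypothetical asynchronous source emitAsync() that pushes items from another thread; it is not how BlockingObservable is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

class BlockingBridgeSketch {

    // Hypothetical asynchronous source: pushes all items from another thread,
    // then signals completion.
    static void emitAsync(List<String> items, Consumer<String> onNext, Runnable onCompleted) {
        Thread producer = new Thread(() -> {
            items.forEach(onNext);
            onCompleted.run();
        });
        producer.start();
    }

    // Blocks the calling thread until the source has completed
    // and returns everything it emitted.
    static List<String> toBlockingList(List<String> items) {
        List<String> collected = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        emitAsync(items, collected::add, done::countDown);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(toBlockingList(Arrays.asList("Ben", "Michael", "Mark")));
    }
}
```

The caller gets a plain List back, but pays for it with a parked thread, which is why blocking is best kept to the edges of an application.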

Creating observables using lettuce

There are many ways to establish observables. You have already seen just(), take() and toList(). Refer to the RxJava documentation for many more methods that you can use to create observables.

lettuce observables can be used for initial and chaining operations. When using lettuce observables, you will notice the non-blocking behavior. This is because all I/O and command processing are handled asynchronously using the netty EventLoop.

lettuce exposes its observables on the Standalone, Sentinel, Publish/Subscribe and Cluster APIs.

Connecting to Redis is insanely simple:

RedisClient client = RedisClient.create("redis://localhost");
RedisReactiveCommands<String, String> commands = client.connect().reactive();

In the next step, obtaining a value from a key requires the GET operation:

commands.get("key").subscribe(new Action1<String>() {
        @Override
        public void call(String value) {
            System.out.println(value);
        }
    });

Alternatively, written in Java 8 lambdas:

commands
   .get("key")
   .subscribe(value -> System.out.println(value));

The execution is handled asynchronously, and the invoking thread can continue with other work while the operation completes on the netty EventLoop threads. Due to its decoupled nature, the calling method can be left before the execution of the Observable<T> is finished. You need to synchronize the execution of the Observable<T> if you wish to wait for the Observable<T> to finish.

lettuce observables can be used within the context of observable chaining to load multiple keys asynchronously:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return commands.get(s);
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String document) {
            System.out.println("Got value: " + document);
        }
    });

Alternatively, written in Java 8 lambdas:

Observable
   .just("Ben", "Michael", "Mark")
   .flatMap(commands::get)
   .subscribe(document -> System.out.println("Got value: " + document));

Hot and Cold Observables

There is a distinction between observables that has not been covered yet:

  • A cold Observable waits for a subscription until it emits values and does this freshly for every subscriber.

  • A hot Observable begins emitting values upfront and presents them to every subscriber subsequently.

All Observables returned from the Redis Standalone, Redis Cluster, and Redis Sentinel APIs are cold, meaning that no I/O happens until they are subscribed to. As such, an observer is guaranteed to see the whole sequence from the beginning. Just creating an Observable does not cause any network I/O, so creating and discarding Observables is cheap. Observables created for Publish/Subscribe emit PatternMessages and ChannelMessages once they are subscribed to. Observables guarantee to emit all items from beginning to end. This also holds for Publish/Subscribe observables; messages can still be missed, but that is caused by the nature of subscribing to a channel or pattern (messages published before the subscription are not delivered) rather than by the hot/cold distinction of observables.
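The hot/cold distinction can be sketched in a few lines of plain Java. ColdSource and HotSource are hypothetical classes written for this illustration only; they are neither RxJava nor lettuce types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class HotColdSketch {

    // Cold: every subscriber triggers a fresh replay of the full sequence.
    static final class ColdSource {
        private final List<String> items;

        ColdSource(List<String> items) {
            this.items = items;
        }

        void subscribe(Consumer<String> subscriber) {
            items.forEach(subscriber);
        }
    }

    // Hot: items are broadcast to whoever is subscribed at emission time.
    static final class HotSource {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(String item) {
            subscribers.forEach(subscriber -> subscriber.accept(item));
        }
    }

    // A second, late subscriber to a cold source still sees every item.
    static List<String> lateColdSubscriber() {
        ColdSource cold = new ColdSource(Arrays.asList("a", "b", "c"));
        cold.subscribe(item -> { });
        List<String> seen = new ArrayList<>();
        cold.subscribe(seen::add);
        return seen;
    }

    // A subscriber that joins a hot source after the first emission misses it.
    static List<String> lateHotSubscriber() {
        HotSource hot = new HotSource();
        hot.emit("a"); // nobody is listening yet
        List<String> seen = new ArrayList<>();
        hot.subscribe(seen::add);
        hot.emit("b");
        hot.emit("c");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("cold: " + lateColdSubscriber()); // cold: [a, b, c]
        System.out.println("hot:  " + lateHotSubscriber());  // hot:  [b, c]
    }
}
```

The missed "a" in the hot case corresponds to a message that was published to a channel before the subscription existed.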

Transforming observables

Observables can transform the emitted values in various ways. One of the most basic transformations is flatMap(), which you have seen in the examples above and which converts the incoming value into a different one. Another one is map(). The difference between map() and flatMap() is that the function passed to flatMap() returns an Observable<T>, which lets you perform the transformation with further asynchronous calls.

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        return commands.get(key);
    }
}).flatMap(new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String value) {
        return commands.rpush("result", value);
    }
}).subscribe();

The first flatMap() function is used to retrieve a value and the second flatMap() function appends the value to a Redis list named result. The function passed to flatMap() returns an Observable, whereas the function passed to map() returns a plain value. You will use flatMap() a lot when dealing with flows like this; you’ll become good friends.

An aggregation of values can be achieved using the scan() transformation. It applies a function to each value emitted by an Observable<T>, sequentially and emits each successive value. We can use it to aggregate values, to count the number of elements in multiple Redis sets:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<Long>>() {
    @Override
    public Observable<Long> call(String key) {
        return commands.scard(key);
    }
}).scan(new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long sum, Long current) {
        return sum + current;
    }
}).subscribe(new Action1<Long>() {
    @Override
    public void call(Long result) {
        System.out.println("Number of elements in sets: " + result);
    }
});

scan() emits an intermediate result for each value it receives, so three results are emitted in the example above. If you want only the last value, which denotes the final result containing the number of elements in all Redis sets, apply the last() transformation:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<Long>>() {
    @Override
    public Observable<Long> call(String key) {
        return commands.scard(key);
    }
}).scan(new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long sum, Long current) {
        return sum + current;
    }
}).last().subscribe(new Action1<Long>() {
    @Override
    public void call(Long result) {
        System.out.println("Number of elements in sets: " + result);
    }
});
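The relationship between the running results of scan() and the single result of last() can be illustrated without Redis or RxJava. The scan() helper below is a hypothetical list-based analogue written for this example, and the three cardinalities are made-up numbers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {

    // Running aggregation: the first value passes through unchanged, each later
    // value is combined with the previous result, and every intermediate
    // result is kept.
    static <T> List<T> scan(List<T> values, BinaryOperator<T> accumulator) {
        List<T> results = new ArrayList<>();
        T current = null;
        boolean first = true;
        for (T value : values) {
            current = first ? value : accumulator.apply(current, value);
            first = false;
            results.add(current);
        }
        return results;
    }

    public static void main(String[] args) {
        // Made-up set cardinalities, standing in for three SCARD results.
        List<Long> cardinalities = Arrays.asList(3L, 5L, 2L);

        List<Long> running = scan(cardinalities, Long::sum);
        System.out.println(running);                         // [3, 8, 10]
        System.out.println(running.get(running.size() - 1)); // 10
    }
}
```

The list of running sums corresponds to what a subscriber sees after scan(), and its final element to what remains after last().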

Now let’s take a look at grouping observables. The following example emits three values and groups them by their first character.

Observable.just("Ben", "Michael", "Mark").groupBy(new Func1<String, String>() {
    @Override
    public String call(String key) {
        return key.substring(0, 1);
    }
}).subscribe(new Action1<GroupedObservable<String, String>>() {
    @Override
    public void call(final GroupedObservable<String, String> groupedObservable) {
        groupedObservable.toList().subscribe(new Action1<List<String>>() {
            @Override
            public void call(List<String> strings) {
                System.out.println("First character: " + groupedObservable.getKey() + ", elements: " + strings);
            }
        });
    }
});

Alternatively, written in Java 8 lambdas:

Observable
   .just("Ben", "Michael", "Mark")
   .groupBy(key -> key.substring(0, 1))
   .subscribe(grouped ->
      grouped
         .toList()
         .subscribe(strings ->
            System.out.println("First character: " + grouped.getKey() + ", elements: " + strings)));

The example prints the following lines:

First character: B, elements: [Ben]
First character: M, elements: [Michael, Mark]

Absent values

The presence and absence of values is an essential part of reactive programming. Traditional approaches consider null as an absence of a particular value. With Java 8, Optional<T> was introduced to encapsulate nullability.

In the scope of Redis, an absent value is an empty list, a non-existent key or any other empty data structure. Reactive programming discourages the use of null as a value. Newer reactive specifications, like reactive-streams.org, prohibit the use of null. The reactive answer to absent values is simply not to emit any value, which is possible due to the 0 to N nature of Observable<T>.

Suppose we have the keys Ben and Michael, each set to the value "value". We query those and another, absent key with the following code:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String s) {
        return commands.get(s);
    }
}).doOnNext(new Action1<String>() {
    @Override
    public void call(String value) {
        System.out.println(value);
    }
}).subscribe();

The example prints the following lines:

value
value

The output is just two values. The GET to the absent key Mark does not emit a value.

The reactive API provides operators to work with empty results when you require a value. You can use one of the following operators:

  • defaultIfEmpty: Emit a default value if the Observable<T> did not emit any value at all

  • switchIfEmpty: Switch to a fallback Observable<T> to emit values

  • isEmpty: Emit an Observable<Boolean> that contains a flag whether the original Observable<T> is empty

  • firstOrDefault/singleOrDefault/lastOrDefault/elementAtOrDefault: Positional operators to retrieve the first/last/Nth element or emit a default value

Differing behavior regarding null values

The lettuce API behaves differently for three commands: MGET, HMGET and EXEC. These commands retrieve a List of keys/fields/operations and emit values in the order of the specified keys/field names. In the synchronous and asynchronous APIs, absent values are returned as null elements inside the resulting List.

Suppressing null values causes ambiguity about keys/fields/operations. If a value is missing, it’s no longer possible to correlate to which key/field/operation the absent value belongs and at which offset the values are present again. MGET, HMGET and EXEC will emit null values to indicate absence for a particular key/field/operation.

This code example uses MGET to retrieve multiple keys in one operation. Ben and Michael are again set to the value "value", and Mark is a non-existent key.

commands.mget("Ben", "Michael", "Mark").doOnNext(new Action1<String>() {
    @Override
    public void call(String value) {
        System.out.println(value);
    }
}).subscribe();

The example prints the following lines:

value
value
null

It’s foreseeable this behavior will change with future releases of the reactive API.
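Why positional results need a placeholder for absent values can be shown with a small plain-Java sketch. The correlate() helper is hypothetical and not part of lettuce; it pairs each requested key with the value at the same offset.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PositionalNullsSketch {

    // Pairs each requested key with the value found at the same position.
    // A null value marks a key that does not exist.
    static Map<String, String> correlate(List<String> keys, List<String> values) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException(
                    "Cannot correlate " + keys.size() + " keys with " + values.size() + " values");
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            result.put(keys.get(i), values.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("Ben", "Michael", "Mark");

        // With null placeholders every value can be attributed to its key.
        System.out.println(correlate(keys, Arrays.asList("value", "value", null)));

        // With the absent value suppressed, the offsets no longer line up.
        try {
            correlate(keys, Arrays.asList("value", "value"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With only two values for three keys there is no way to tell which key was absent, which is the ambiguity described above.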

Filtering observables

The values emitted by an Observable<T> can be filtered in case you need only specific results. Filtering does not change the emitted values themselves. Filters affect how many items are emitted and at which point (if at all).

Observable.just("Ben", "Michael", "Mark").filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(String s) {
        return s.startsWith("M");
    }
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String s) {
        return commands.get(s);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String document) {
        System.out.println("Got value: " + document);
    }
});

The code will fetch only the keys Michael and Mark but not Ben. The filter criterion is whether the key starts with an M.

You already met the last() filter to retrieve the last value:

Observable.just("Ben", "Michael", "Mark").last().subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        System.out.println("Got value: " + value);
    }
});

The extended variant of last(), takeLast(), allows you to take the last N values:

Observable.just("Ben", "Michael", "Mark").takeLast(2).subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        System.out.println("Got value: " + value);
    }
});

The example from above takes the last 2 values.

The opposite of last() is the first() filter, which is used to retrieve the first value:

Observable.just("Ben", "Michael", "Mark").first().subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        System.out.println("Got value: " + value);
    }
});

Error handling

Error handling is an indispensable component of every real-world application and should be considered from the beginning. RxJava provides several mechanisms to deal with errors.

In general, you want to react in the following ways:

  • Return a default value instead

  • Use a backup observable

  • Retry the observable (immediately or with delay)

The following code falls back to a default value after it throws an exception at the first emitted item:

Observable.just("Ben", "Michael", "Mark").doOnNext(new Action1<String>() {
    @Override
    public void call(String s) {
        throw new IllegalStateException("Takes way too long");
    }
}).onErrorReturn(new Func1<Throwable, String>() {
    @Override
    public String call(Throwable throwable) {
        return "Default value";
    }
}).subscribe();

You can use a backup Observable<T> which will be called if the first one fails.

Observable.just("Ben", "Michael", "Mark").doOnNext(new Action1<String>() {
    @Override
    public void call(String s) {
        throw new IllegalStateException("Takes way too long");
    }
}).onErrorResumeNext(commands.get("Default Key")).subscribe();
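For readers more familiar with the JDK, the two fallback strategies have rough counterparts on CompletableFuture. The sketch below is only an analogy for single values, not RxJava code; the class and method names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;

class FallbackSketch {

    // A future that has already failed, standing in for a failing source.
    static CompletableFuture<String> failing() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("Takes way too long"));
        return future;
    }

    // Counterpart to onErrorReturn: replace the error with a default value.
    static String withDefault() {
        return failing()
                .exceptionally(throwable -> "Default value")
                .join();
    }

    // Counterpart to onErrorResumeNext: switch to a backup source on error.
    static String withBackup() {
        CompletableFuture<String> backup = CompletableFuture.completedFuture("Backup value");
        return failing()
                .<CompletableFuture<String>>handle((value, error) ->
                        error == null ? CompletableFuture.completedFuture(value) : backup)
                .thenCompose(future -> future)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withDefault()); // Default value
        System.out.println(withBackup());  // Backup value
    }
}
```

In both cases the error never reaches the caller; it is replaced either by a constant or by the result of another asynchronous source.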

It is possible to retry the observable by re-subscribing. Re-subscribing can be done as soon as possible, or with a wait interval, which is preferred when external resources are involved.

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        return commands.get(key);
    }
}).retry().subscribe();

Use the following code if you want to retry with backoff:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String key) {
        return commands.get(key);
    }
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {

        return attempts.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
            @Override
            public Integer call(Throwable throwable, Integer integer) {
                return integer;
            }
        }).flatMap(new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer i) {
                System.out.println("delay retry by " + i + " second(s)");
                return Observable.timer(i, TimeUnit.SECONDS);
            }
        });
    }
}).subscribe();

Alternatively, written in Java 8 lambdas:

Observable
   .just("Ben", "Michael", "Mark")
   .flatMap(key -> commands.get(key))
   .retryWhen(attempts ->
      attempts
         .zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
         .flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
          }))
   .subscribe();

The attempts get passed into the retryWhen() method and are zipped with the number of seconds to wait, which also limits the number of retries to three. The timer() method emits once its delay has elapsed, and that emission triggers the re-subscription.
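The same idea, a bounded number of retries with a growing delay, can be sketched as a blocking helper in plain Java. retryWithBackoff() is a hypothetical helper for illustration; unlike the reactive version it blocks the calling thread while waiting, and it rethrows the last failure once the retries are exhausted, which is a design choice of this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class BackoffRetrySketch {

    // Runs the operation; on failure waits attempt * baseDelayMillis and tries
    // again, for at most maxRetries retries. Rethrows the last failure afterwards.
    static <T> T retryWithBackoff(Supplier<T> operation, int maxRetries, long baseDelayMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (attempt > maxRetries) {
                    throw e; // retries exhausted
                }
                sleep(attempt * baseDelayMillis);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }

    // Demo: an operation that fails twice and succeeds on the third call.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        return retryWithBackoff(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("Takes way too long");
            }
            return "succeeded after " + calls.get() + " calls";
        }, 3, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // succeeded after 3 calls
    }
}
```

The delays here are 10 and 20 milliseconds to keep the demo fast; the reactive example above waits 1, 2 and 3 seconds.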

Schedulers and threads

Schedulers in RxJava control on which threads work is executed. Some operators have variants that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.

RxJava ships with a set of preconfigured Schedulers, which are all accessible through the Schedulers class:

  • Schedulers.computation(): Executes the computational work such as event-loops and callback processing.

  • Schedulers.immediate(): Executes the work immediately in the current thread

  • Schedulers.io(): Executes I/O-bound work such as asynchronous performance of blocking I/O. This scheduler is backed by a thread pool that grows as needed

  • Schedulers.newThread(): Executes the work on a new thread

  • Schedulers.trampoline(): Queues work to begin on the current thread after any already-queued work

  • Schedulers.from(): Create a scheduler from a java.util.concurrent.Executor

  • Schedulers.test(): Test scheduler that allows you to exercise fine-tuned manual control over how the Scheduler’s clock behaves.

Do not use the computational scheduler for I/O.

Observables can be executed on a scheduler in the following different ways:

  • Using an operator that makes use of a scheduler

  • Explicitly by passing the Scheduler to such an operator

  • By using subscribeOn(Scheduler)

  • By using observeOn(Scheduler)

Operators like buffer, replay, skip, delay, parallel, and so forth use a Scheduler by default if not instructed otherwise. The RxJava documentation lists the default Schedulers of the Observable operators.

All of the listed operators allow you to pass in a custom scheduler if needed. Sticking most of the time with the defaults is a good idea.

If you want the subscribe chain to be executed on a specific scheduler, use the subscribeOn() operator. Without a scheduler set, the code is executed on the calling thread, here the main thread:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(key);
    }
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String value) {
        System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(value);
    }
}).subscribe();

The example prints the following lines:

Map 1: Ben (main)
Map 2: Ben (main)
Map 1: Michael (main)
Map 2: Michael (main)
Map 1: Mark (main)
Map 2: Mark (main)

This example shows the subscribeOn() method added to the flow (it does not matter where you add it):

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(key);
    }
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String value) {
        System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(value);
    }
}).subscribeOn(Schedulers.computation()).subscribe();

The output of the example shows the effect of subscribeOn(). The whole Observable is still executed on a single thread, but that thread now belongs to the computation thread pool:

Map 1: Ben (RxComputationThreadPool-1)
Map 2: Ben (RxComputationThreadPool-1)
Map 1: Michael (RxComputationThreadPool-1)
Map 2: Michael (RxComputationThreadPool-1)
Map 1: Mark (RxComputationThreadPool-1)
Map 2: Mark (RxComputationThreadPool-1)

If you apply the same code to lettuce, you’ll notice a small difference:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
        return commands.set(key, key);
    }
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String value) {
        System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(value);
    }
}).subscribeOn(Schedulers.computation()).subscribe();

The example prints the following lines:

Map 1: Ben (RxComputationThreadPool-1)
Map 1: Michael (RxComputationThreadPool-1)
Map 1: Mark (RxComputationThreadPool-1)
Map 2: OK (nioEventLoopGroup-3-1)
Map 2: OK (nioEventLoopGroup-3-1)
Map 2: OK (nioEventLoopGroup-3-1)

Two things differ from the standalone examples:

  1. The values are set concurrently rather than sequentially

  2. The second flatMap() transformation prints the netty EventLoop thread name

This is because the lettuce observables are executed and completed on the netty EventLoop threads by default.

observeOn instructs an Observable to call its observer’s onNext, onError, and onCompleted methods on a particular Scheduler. Here, the order matters:

Observable.just("Ben", "Michael", "Mark").flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        System.out.println("Map 1: " + key + " (" + Thread.currentThread().getName() + ")");
        return commands.set(key, key);
    }
}).observeOn(Schedulers.computation()).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String value) {
        System.out.println("Map 2: " + value + " (" + Thread.currentThread().getName() + ")");
        return Observable.just(value);
    }
}).subscribe();

Everything before the observeOn() call is executed on the main thread, everything after it on the scheduler:

Map 1: Ben (main)
Map 1: Michael (main)
Map 1: Mark (main)
Map 2: OK (RxComputationThreadPool-3)
Map 2: OK (RxComputationThreadPool-3)
Map 2: OK (RxComputationThreadPool-3)

Schedulers allow direct scheduling of operations. Refer to the RxJava documentation for further information.
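The thread hand-over that observeOn() performs has a rough JDK analogue in CompletableFuture with explicit executors. The sketch below is only an analogy; the executor names sketch-io and sketch-computation are made up and unrelated to the RxJava scheduler thread names shown above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Single-threaded executor whose worker thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Runs step 1 on one executor and step 2 on another;
    // returns the names of the threads that executed each step.
    static String[] run() {
        ExecutorService io = named("sketch-io");
        ExecutorService computation = named("sketch-computation");
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    .thenApplyAsync(
                            first -> new String[] { first, Thread.currentThread().getName() },
                            computation)
                    .join();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("Step 1: " + threads[0]); // Step 1: sketch-io
        System.out.println("Step 2: " + threads[1]); // Step 2: sketch-computation
    }
}
```

As with observeOn(), the position of the hand-over matters: only the steps after it run on the second executor.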

Redis Transactions

Other examples

Blocking example

RedisStringReactiveCommands<String, String> reactive = client.connect().reactive();
Observable<String> set = reactive.set("key", "value");
set.toBlocking().first();

Non-blocking example

RedisStringReactiveCommands<String, String> reactive = client.connect().reactive();
Observable<String> set = reactive.set("key", "value");
set.subscribe();