Como transformar objetos comuns em "objetos assíncronos"
Programação Concorrente em Erlang - A inspiração
Joe Armstrong, um dos criadores da linguagem Erlang nos mostrou uma maneira muito elegante de tratar concorrência na programação. Iremos introduzir alguns conceitos de Erlang que tangem a programação concorrente deixando de lado uma séria de sofisticações como o tratamento e propagação de erros, programação remota para focar nos conceitos mais básicos e gerais.O primeiro comando básico de concorrência é o BIF (built in function) ``spawn''. Essa função é usada para criar um novo ``processo" (na nomenclatura de Erlang, processo seria uma linha de execução (portanto uma thread) que não compartilha dados). A assinatura da função é a seguinte: spawn(Module, Exported Function, List of Arguments). Considere o seguinte módulo (extraído do tutorial do Erlang):
--module(example1).
-export([start/0, say_something/2]).
say_something(_, 0) -> done;
say_something(What, Times)->
io:format("~p~n", [What]),
say_something(What, Times - 1).
start() ->
spawn(example1, say_something, [hello, 3]),
spawn(example1, say_something, [goodbye, 3]).A função start() deste módulo bem simples cria dois processos que executa a função \verb+say_something+ com os parâmetros $[hello, 3]$ e $[goodbye,3]$. Além disso, a função devolve o id do processo criado (como veremos a frente). 1> c(example1).
{ok,example1}
2> example1:start().
hello
goodbye
<0.43.0>
hello
goodbye
hello
goodbye
Por enquanto, criamos processos que executam ações mas não escutam mensagens. Mas enviar mensagens para um objeto que não responde é como estender a mão a um cego esperando que aperte sua mão. Façamos então o processo enxegar ou ouvir mensagens. Para isso, utilizamos a estrutura receive.
A construção receive é usada para permitir que processos esperem mensagens de outros. Seu formato é:
receive pattern1 -> actions1; pattern2 -> actions2; .... patternN actionsN end.O que é um tanto quanto intuitivo. Ao receber alguma mensagem que coincida com o pattern1 o processo executará actions1 e assim sucessivamente. Outra construção igualmente importante é a forma como enviamos mensagens para um processo. Para isso usamos o operador ``!''.
Pid ! MessageA construção acima envia a mensagem Message (que pode ser qualquer ``termo'' Erlang) para processo cujo id seja igual a Pid. A seguir, um exemplo de programa com interações (processos enviando mensagens para outros processos), ilustrando o uso das construções citadas:
-module(example2).
-export([start/0, ping/2, pong/0]).
ping(0, Pong_PID) ->
Pong_PID ! finished,
io:format("ping finished~n", []);
ping(N, Pong_PID) ->
Pong_PID ! {ping, self()},
receive
pong -> io:format("Ping received pong~n", [])
end,
ping(N - 1, Pong_PID).
pong() ->
receive
finished -> io:format("Pong finished~n", []);
{ping, Ping_PID} ->
io:format("Pong received ping~n", []),
Ping_PID ! pong, pong()
end.
start() ->
Pong_PID = spawn(example2, pong, []),
spawn(example2, ping, [3, Pong_PID]).
Ao executar o código acima teremos o seguinte resultado:1> c(example2.erl).
{ok,example2}
2> example2:start().
Pong received ping
<0.43.0>
Ping received pong
Pong received ping
Ping received pong
Pong received ping
Ping received pong
ping finished
Pong finished
Surpreendentemente simples e útil. Por trás dessas simples construções, Erlang
cria um novo processo (spawn) e uma ``caixa de mensagens'' para este processo.
Sempre que uma mensagem é enviado ao processo (Pid ! Message), a mensagem fica
guardada na caixa de mensagens até que o processo esteja disponível para recebê-lo. O processo então retira a mensagem da caixa (receive) e seleciona qual ação tomará baseado no
padrão que a mesma se encaixa.
Asynchronous Proxy Factory - Java Strike Back
Um dos grandes apelos das linguagens tipadas é a facilidade de saber quaismensagens são entendidas por objetos dessa linguagem, e sempre que a assinatura
das mensagens enviadas não coincidirem com a interface do objeto tem-se um erro
em tempo de compilação. Isso permite a uma IDE como Eclipse fornecer dicas de
codificação no momento que o programador digita o código.
Em oposição, durante a codificação de mensagens em Erlang com a construção "Pid ! Message"
não é possível saber quais mensagens, afinal, o processo Pid escuta.
Java, não fornece estruturas simples como Erlang para mensageria mas um
complicado e também sofisticado arcabouço de concorrência java.util.concurrent.
Nossa proposta é criar uma biblioteca auxiliar que forneça os mecanismos de
concorrência para objetos que funcionem de forma ``síncrona'' usando apenas as
ferramentas já disponíveis na linguagem.
Versão OO do Ping-Pong
Para uma melhor comparação entre os conceitos de atores de Erlang e os conceitosde Orientação a Objetos de Java, iremos fazer uma adaptação do programa
ping-pong (example2). Tomaremos o cuidado de fazer uma versão testável, quer
dizer, ao invés dos métodos escreverem String na saída padrão, eles irão
preencher um ``log de conversação'' que será analisado num teste unitário. Comecemos
pelo teste.
@Test
public void ping(){
StringBuffer string = new StringBuffer();
Ping ping = new Ping(string);
PingInterface pong = new Ping(string);
ping.sendPing(3, pong);
verifyPingConversation(string, 3);
}StringBuffer é uma versão sincronizada do StringBuilder. Isto vem a calharporque devemos evitar condições de corrida na escrita do log. A grosso modo,
este método é uma versão junit da função start() do programa example2.
A verificação que implementaremos apenas verifica se todas as falas foram
realizadas durante o diálogo dos atores. Inicializaremos uma mapa com as frases
e a quantidade de vezes que aquelas frases devem aparecer. Sempre que uma frase
for dita, iremos decrementar o contador no mapa até zerá-lo quando riscaremos
a frase do mapa.
private void verifyPingConversation(StringBuffer string, int i) {
String[] messages = string.toString().split("\n");
Map expectedResult =
new HashMap();
expectedResult.put("Pong received ping", i);
expectedResult.put("Ping received pong", i);
expectedResult.put("Pong finished", 1);
expectedResult.put("ping finished", 1);
for (String string2 : messages) {
Integer count = expectedResult.get(string2);
count--;
if (count ==0){
expectedResult.remove(string2);
} else {
expectedResult.put(string2, count);
}
}
Assert.assertTrue(expectedResult.isEmpty());
}
Agora a versão Java do Ping. Bonus: definimos uma interface de comunicação,eliminamos os seletores e a classe é testável.
public class Ping implements PingInterface {
private StringBuffer conversation;
public Ping(StringBuffer string){
conversation = string;
}
public void sendPing(int n, PingInterface pid) {
if (n ==0 ){
pid.finished();
say("ping finished\n");
} else {
pid.ping(this);
sendPing(n-1, pid);
}
}
private void say(String string) {
conversation.append(string);
}
public void finished() {
say("Pong finished\n");
}
public void pong(PingInterface pong) {
say("Ping received pong\n");
}
public void ping(PingInterface ping) {
say("Pong received ping\n");
ping.pong(this);
}
}Padrão Produtor-Consumidor em Java
Inicialmente, precisaremos de um sistema de mensageria como Erlang. Paraisso, teremos uma ``caixa de mensagens'' que faremos com o uso de uma
BlockingQueue (fila bloqueante do pacote java.util.concurrent). Por questões de
simplicidade utilizaremos a implementação ArrayBlockingQueue.
Deixaremos um consumidor a espera de serviços que chegarão na
``caixa de mensagens''. Este trabalhador terá uma linha de execução
independente do programa principal e um reservatório de trabalhadores para
executar cada uma das mensagens.
Para controlar o ciclo de vida do consumidor, o consumidor irá dispor de:
- Uma tarefa kill; Ao receber a tarefa kill, o processo deve parar. A ideia é que essa seja a última tarefa a ser executada pelo processo.
- Uma variável sentinel; caso o valor falso seja atribuido o processo pára independente se outras tarefas existam na fila;
- um método para interrupção;
através da interface ExecutorService.
Abaixo, propriedades do executor conforme mencionado:
/** * caixa de mensagens (tarefas) do executor */ protected BlockingQueueO método principal do Executor, é o método público run() da interface Runnable.blockingQueue; /** * reservatorio de trabalhadores */ protected ExecutorService executorService; // variaveis de controle do ciclo de vida private boolean sentinel = true; /** * Tarefa que finaliza */ private final T kill; /** * variavel de sincronizacao: quando chegar a zero, o * processo terminou sua execucao */ private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void run() {
try {
// importante para interromper o loop infinito
while (isSentinelEnable()) {
T t = getBlockingQueue().take();
if (t == kill) {
kill();
sentinel = false;
} else {
execute(t);
}
}
// anuncia que o programa terminou
getCountDownLatch().countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
} Se uma mensagem kill é recebida, o método kill() será invocado para que o objetopossa realizar algum procedimento de sanidade. Para as demais mensagens, o
método execute() é invocado.
Enquanto execute() é abstrato o método kill() possui um implementação básica
vazia.
protected abstract void execute(T t);
protected void kill() {
}A classe possui ainda três métodos de razoável importância. O construtor, uminicializador e um método de sincronização. Os demais métodos são accessors e
mutators, além de possíveis métodos para finalização.
/**
* Instancia o executor, mas nao inicia seu processamento
* @param n tamanho do pool de trabalhadores
* @param m capacidade maxima da caixa de mensagens
* @param kill tarefa utilizada para finalizar o processamento
*/
public Executor(int n, int m, T kill) {
executorService = Executors.newFixedThreadPool(n);
blockingQueue = new ArrayBlockingQueue(m);
this.kill = kill;
}
/**
* Inicia o processamento com uma nova linha de execucao
*/
public void execute() {
final Thread thread = new Thread(this);
thread.start();
}
/**
* aguarda o fim do processamento e finaliza o reservatorio
* de trabalhadores
*/
public void awaitTermination() {
try {
getCountDownLatch().await();
this.executorService.shutdown();
this.executorService.awaitTermination(1000,
TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
e.printStackTrace();
}
} Quicksort Paralelo em Java
Façamos agora uma pausa, no desenvolvimento da solução para testar a classeacima. Como prova de conceito iremos implementar uma versão paralela do
Quicksort que utliza o código.
A ideia básica, em implementar um quicksort paralelo usando o padrão
produtor-consumidor é receber uma tarefa com um vetor a ser ordenado
e particioná-los através de um elemento pivo (desse vetor) e
adicionar cada uma das partições como novas tarefas da fila de execução do
executor.
@Override public void execute(QuickSortJobPodemos programar um certificado \footnote{algoritmos de verficação out) { int size = t.array.length; if (size < thresold){ Arrays.sort(t.array); t.result = t.array; t.array = null; if (t.joinToFather()){ doneSignal.countDown(); } return ; } int rand = random.nextInt(size); final T comparable = t.array[rand]; T[] rights = newInstanceArray(t); T[] lefts = newInstanceArray(t); T[] mids = newInstanceArray(t); int l = 0, r = 0, m = 0; for (int i = 0; i < t.array.length; i++) { T array_element = t.array[i]; if (array_element.compareTo(comparable) < 0){ lefts[l++] = array_element; continue; } if (array_element.compareTo(comparable) > 0){ rights[r++] = array_element; continue; } mids[m++] = array_element; } BlockingQueue > blockingQueue = getBlockingQueue(); blockingQueue.add(new QuickSortJob (t, QuickSortJob.LEFT, Arrays.copyOf(lefts, l))); t.mid = Arrays.copyOf(mids, m); blockingQueue.add (new QuickSortJob (t, QuickSortJob.RIGHT, Arrays.copyOf(rights, r))); t.array = null; }
certificados estão relacionados com complexidade computacional. Uma breve
explicação sobre o assunto pode nas do
Paulo Feofiloff para ordenação. Para isto criamos um vetor desordenado, executamos o ordenador e verificamos se os elementos do vetor estão ordenados.
public void testQuickSort() throws Exception {
Random random = new Random(); int n = 200000;
Integer[] problem = new Integer[n];
for (int i =0; i < n ; i++){
problem[i] = random.nextInt();
}
final long timeMilis = System.currentTimeMillis();
QuickSortWorker.doneSignal = new CountDownLatch(1);
originalJob = new QuickSortJob(null, 0, problem);
QuickSortWorker executor =
new QuickSortWorker(10, n); // executor
executor.getBlockingQueue().add(originalJob);
executor.setCountDownLatch(QuickSortWorker.doneSignal);
executor.execute();
executor.awaitTermination();
final long lastMilis = System.currentTimeMillis();
System.out.println("Tempo Total:" + (lastMilis - timeMilis));
int temp = Integer.MIN_VALUE;
QuickSortJob job = originalJob;
for (int i = 0; i < job.result.length; i++) {
assertTrue(temp <= job.result[i]);
temp = job.result[i];
}
}
Junit verde! Podemos prosseguir com o desenvolvimento do utilitário.Uma versão estática
O que a construção receive faz em Erlang, um ``switch case'' poderia fazer notemplate method execute. Podemos fazer melhor que isto. Em Java, é possível
fazer com que o compilador verifique se as mensagens enviadas são entendidas por
meio de sua assinatura. Para isto, iremos criar um wrapper que implementa
a (mesma) interface de um objeto alvo e transformar a mensagem bloqueante numa
assíncrona. Criaremos uma classe message com todas as informações necessárias
a execução da mensagem o próprio método (java.lang.reflec.Method), o objeto
alvo e os parâmetros da chamada.
public class Message implements Callable{
Object[] parameters;
Object target;
Method method;
public Message(Object target2, Method method2, Object[] args) {
parameters = args;
this.method = method2;
this.target = target2;
}
public Message() {
}
@Override
public Object call() throws Exception {
return method.invoke(target, parameters);
}
}Agora, iremos criamos uma especialização do Executor que processa uma FutureTask.public class SpawnExecutor extends ExecutorDevemos encapsular o objeto Message numa FutureTask. Dessa forma, podemos{ public SpawnExecutor(int n, int m) { super(n, m, new FutureTask(new Message())); } @Override protected void execute(FutureTask task) { executorService.execute(task); } public void shutdown(){ try { getBlockingQueue().put(getKill()); } catch (InterruptedException e) { } } }
notificar os interessados sobre a conclusão de uma mensagem específica. Façamos então uma outra especialização (do Executor) que exporte estas funcionalidades além de
oferecer uma interface mais simples para o envio de mensagens.
public void send(Message message) {
FutureTask futureTask = new FutureTask(message);
spawnExecutor.getBlockingQueue().add(futureTask);
notify(futureTask);
}
private void notify(FutureTask futureTask) {
if (observer != null){
observer.notify(futureTask);
}
}
private Observer observer;
public void setObserver(Observer observer) {
this.observer = observer;
}
public Observer getObserver() {
return observer;
}Na listagem, o efeito é conseguido com o padrão Decorator.Temos um último problema: popular o objeto Message. Enquanto que objeto alvo e
os parâmetros são conhecidos, ainda precisamos encontrar o objeto Method apartir
do pacote de reflexão (queremos encontrar o método apartir do nome). Para isso,
faremos um objeto mais especializado que recebe uma mensagem e cria e popula um
objeto Message e o coloca na fila de trabalhos.
public abstract class AsynchronousStaticExecutor extends AsynchronousExecutor{
protected void sendMessage(Method method, Object[] params) {
if (parameters.length - parameterTypes.length != 0)
return false;
for (int i = 0; i < parameters.length; i++) {
Object object = parameters[i];
if (!parameterTypes[i].isAssignableFrom(
object.getClass()))
return false;
}
return true;
}
}
Agora podemos criar a versão estática assíncrona do programa ping pong.Mas, como de costume iremos apresentar o teste unitário para tal.
@Test
public void testAsyncStaticPing(){
StringBuffer string = new StringBuffer();
Ping ping = new Ping(string);
AsyncPing pong = new AsyncPing(ping);
// iniciar processo
pong.start();
ping.sendPing(3, pong);
// finaliza processo
pong.shutdown();
pong.awaitForTermination();
verifyPingConversation(string, 3);
}E, a classe estática.public class AsyncPing extends AsynchronousStaticExecutor
implements PingInterface {
private Ping target;
public AsyncPing(Ping ping) {
this.target = ping;
}
@Override
protected Object getTarget() {
return target;
}
@Override
public void finished() {
sendMessage("finished", new Object[] {});
}
@Override
public void ping(PingInterface ping) {
sendMessage("ping", new Object[] { ping });
}
@Override
public void pong(PingInterface pong) {
sendMessage("pong", new Object[] { pong });
}
}Java Lang Reflection
Um mago sabe a importância de um espelho. Entre as várias mágicas que o pacotede reflexão da linguagem está o truque de criar um proxy às chamadas
de um conjunto de interfaces. Essa simples mágica talvez seja a forma mais
fácil de adicionar algum comportamento em tempo de execução para algum objeto
genérico (outras formas seriam a manipulação de byte code ou do próprio
código-fonte). Por tamanha facilidade, a API de reflexão é utilizada nas
chamadas RMI para forjar um representante local de uma classe remota. Este
representante recebe as chamadas locais e delega sua execução a entidade remota
correspondente dando a ilusão ao programador que as chamadas são realizadas localmente.
E, tudo isso graças a uma fábrica de proxies e uma interface de manipulação:
public interface InvocationHandler {
Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}public class GenericProxyFactory {
Limitação importante: Não é possívelcriar proxy para objetos sem interfaces. Para contornar o problema é
possível utlizar a refatoração: extract interface.
Aplicação: AsynchronousProxy
Vamos agora ao objetivo final desse artigo, apresentar um proxy para transformarchamadas bloqueantes em chamadas não bloqueantes. O uso básico desse proxy e
permitir que o programador faça uma versão simples do código, sem se preocupar
com concorrência e depois, sem mudança de paradigma ou expressivade, utilizar a
concorrência.
@Test
public void testAsyncPing() throws Exception {
StringBuffer string = new StringBuffer();
Ping ping = new Ping(string);
Ping pong = new Ping(string);
// iniciar processo
AsynchronousProxyFactory asynchronousProxyFactory = new AsynchronousProxyFactory (1, 10, pong);
PingInterface asyncPong = asynchronousProxyFactory.getAsynchronousProxy(PingInterface.class);
asynchronousProxyFactory.start();
ping.sendPing(3, asyncPong);
// finalizar processo
asynchronousProxyFactory.shutdown();
asynchronousProxyFactory.awaitForTermination();
verifyPingConversation(string, 3);
}
public class AsynchronousInvocationHandler implements InvocationHandler{
private final AsynchronousExecutor asynchronousExecutor;
private final T target;
public AsynchronousInvocationHandler
(AsynchronousExecutor asynchronousObject, T target) {
this.asynchronousExecutor = asynchronousObject;
this.target = target;
}
@Override
public Object invoke
(Object proxy, Method method, Object[] args) throws Throwable {
asynchronousExecutor.send(new Message(target, method, args));
return null;
}
}
public class AsynchronousProxyFactory extends AsynchronousExecutor {
private T target;
public AsynchronousProxyFactory(int n, int m, T obj) {
super(n, m);
this.target = obj;
}
@SuppressWarnings("unchecked")
public T getAsynchronousProxy(Class intf) {
return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),
new Class[] { intf },
new AsynchronousInvocationHandler(this, target));
}
}
Feito!
Comentários