Pular para o conteúdo principal

Magia Assíncrona

Versão PDF


Como transformar objetos comuns em "objetos assíncronos"

Programação Concorrente em Erlang - A inspiração

Joe Armstrong, um dos criadores da linguagem Erlang nos mostrou uma maneira muito elegante de tratar concorrência na programação. Iremos introduzir alguns conceitos de Erlang que tangem a programação concorrente deixando de lado uma séria de sofisticações como o tratamento e propagação de erros, programação remota para focar nos conceitos mais básicos e gerais.
O primeiro comando básico de concorrência é o BIF (built in function) ``spawn''. Essa função é usada para criar um novo ``processo" (na nomenclatura de Erlang, processo seria uma linha de execução (portanto uma thread) que não compartilha dados). A assinatura da função é a seguinte: spawn(Module, Exported Function, List of Arguments). Considere o seguinte módulo (extraído do tutorial do Erlang):
--module(example1).

-export([start/0, say_something/2]).

say_something(_, 0) -> done;

say_something(What, Times)-> 
 io:format("~p~n", [What]),
 say_something(What, Times - 1).

start() ->
 spawn(example1, say_something, [hello, 3]),
 spawn(example1, say_something, [goodbye, 3]).
A função start() deste módulo bem simples cria dois processos que executa a função \verb+say_something+ com os parâmetros $[hello, 3]$ e $[goodbye,3]$. Além disso, a função devolve o id do processo criado (como veremos a frente).
1> c(example1).
{ok,example1}
2> example1:start().
hello
goodbye
<0.43.0>
hello
goodbye
hello
goodbye

Por enquanto, criamos processos que executam ações mas não escutam mensagens. Mas enviar mensagens para um objeto que não responde é como estender a mão a um cego esperando que aperte sua mão. Façamos então o processo enxegar ou ouvir mensagens. Para isso, utilizamos a estrutura receive.


A construção receive é usada para permitir que processos esperem mensagens de outros. Seu formato é:

receive
 pattern1 ->
  actions1;
 pattern2 ->
  actions2;
 ....
 patternN
  actionsN
end.
O que é um tanto quanto intuitivo. Ao receber alguma mensagem que coincida com o pattern1 o processo executará actions1 e assim sucessivamente. Outra construção igualmente importante é a forma como enviamos mensagens para um processo. Para isso usamos o operador ``!''.
Pid ! Message 
A construção acima envia a mensagem Message (que pode ser qualquer ``termo'' Erlang) para processo cujo id seja igual a Pid. A seguir, um exemplo de programa com interações (processos enviando mensagens para outros processos), ilustrando o uso das construções citadas:
-module(example2).

-export([start/0, ping/2, pong/0]).

ping(0, Pong_PID) ->
 Pong_PID ! finished,
 io:format("ping finished~n", []);

ping(N, Pong_PID) ->
 Pong_PID ! {ping, self()},
 receive
  pong -> io:format("Ping received pong~n", [])
 end,
 ping(N - 1, Pong_PID).

pong() ->
 receive
  finished -> io:format("Pong finished~n", []);
  {ping, Ping_PID} ->
   io:format("Pong received ping~n", []),
   Ping_PID ! pong, pong()
 end.
start() ->
 Pong_PID = spawn(example2, pong, []),
 spawn(example2, ping, [3, Pong_PID]).
Ao executar o código acima teremos o seguinte resultado:
1> c(example2.erl).
{ok,example2}
2> example2:start().
Pong received ping
<0.43.0>
Ping received pong
Pong received ping
Ping received pong
Pong received ping
Ping received pong
ping finished
Pong finished

Surpreendentemente simples e útil. Por trás dessas simples construções, Erlang
cria um novo processo (spawn) e uma ``caixa de mensagens'' para este processo.
Sempre que uma mensagem é enviado ao processo (Pid ! Message), a mensagem fica
guardada na caixa de mensagens até que o processo esteja disponível para recebê-lo. O processo então retira a mensagem da caixa (receive) e seleciona qual ação tomará baseado no
padrão que a mesma se encaixa.

Asynchronous Proxy Factory - Java Strike Back

Um dos grandes apelos das linguagens tipadas é a facilidade de saber quais
mensagens são entendidas por objetos dessa linguagem, e sempre que a assinatura
das mensagens enviadas não coincidirem com a interface do objeto tem-se um erro
em tempo de compilação. Isso permite a uma IDE como Eclipse fornecer dicas de
codificação no momento que o programador digita o código.
Em oposição, durante a codificação de mensagens em Erlang com a construção "Pid ! Message"
não é possível saber quais mensagens, afinal, o processo Pid escuta.
Java, não fornece estruturas simples como Erlang para mensageria mas um
complicado e também sofisticado arcabouço de concorrência java.util.concurrent.
Nossa proposta é criar uma biblioteca auxiliar que forneça os mecanismos de
concorrência para objetos que funcionem de forma ``síncrona'' usando apenas as
ferramentas já disponíveis na linguagem.

Versão OO do Ping-Pong

Para uma melhor comparação entre os conceitos de atores de Erlang e os conceitos
de Orientação a Objetos de Java, iremos fazer uma adaptação do programa
ping-pong (example2). Tomaremos o cuidado de fazer uma versão testável, quer
dizer, ao invés dos métodos escreverem String na saída padrão, eles irão
preencher um ``log de conversação'' que será analisado num teste unitário. Comecemos
pelo teste.

@Test
 public void ping(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  PingInterface pong = new Ping(string);
  ping.sendPing(3, pong);
  verifyPingConversation(string, 3);
 }
StringBuffer é uma versão sincronizada do StringBuilder. Isto vem a calhar
porque devemos evitar condições de corrida na escrita do log. A grosso modo,
este método é uma versão junit da função start() do programa example2.
A verificação que implementaremos apenas verifica se todas as falas foram
realizadas durante o diálogo dos atores. Inicializaremos uma mapa com as frases
e a quantidade de vezes que aquelas frases devem aparecer. Sempre que uma frase
for dita, iremos decrementar o contador no mapa até zerá-lo quando riscaremos
a frase do mapa.

private void verifyPingConversation(StringBuffer string, int i) {
  String[] messages = string.toString().split("\n");
  Map expectedResult = 
   new HashMap();
  expectedResult.put("Pong received ping", i);
  expectedResult.put("Ping received pong", i);
  expectedResult.put("Pong finished", 1);
  expectedResult.put("ping finished", 1);
  for (String string2 : messages) {
   Integer count = expectedResult.get(string2);
   count--;
   if (count ==0){
    expectedResult.remove(string2);
   } else {
    expectedResult.put(string2, count);
   }
  }
  Assert.assertTrue(expectedResult.isEmpty());
 }
Agora a versão Java do Ping. Bonus: definimos uma interface de comunicação,
eliminamos os seletores e a classe é testável.

public class Ping implements PingInterface {
 private StringBuffer conversation;


 public Ping(StringBuffer string){
  conversation = string;
 }

 public void sendPing(int n, PingInterface pid) {
  if (n ==0 ){
   pid.finished();
   say("ping finished\n");   
  } else {
   pid.ping(this);
   sendPing(n-1, pid);
  }
 }

 private void say(String string) {
  conversation.append(string);
 }

 public void finished() {
  say("Pong finished\n"); 
 }

 public void pong(PingInterface pong) {
  say("Ping received pong\n");
 }
 

 public void ping(PingInterface ping) {
  say("Pong received ping\n");
  ping.pong(this);
 }
}

Padrão Produtor-Consumidor em Java

Inicialmente, precisaremos de um sistema de mensageria como Erlang. Para
isso, teremos uma ``caixa de mensagens'' que faremos com o uso de uma
BlockingQueue (fila bloqueante do pacote java.util.concurrent). Por questões de
simplicidade utilizaremos a implementação ArrayBlockingQueue.
Deixaremos um consumidor a espera de serviços que chegarão na
``caixa de mensagens''. Este trabalhador terá uma linha de execução
independente do programa principal e um reservatório de trabalhadores para
executar cada uma das mensagens.
Para controlar o ciclo de vida do consumidor, o consumidor irá dispor de:

  • Uma tarefa kill; Ao receber a tarefa kill, o processo deve parar. A ideia é que essa seja a última tarefa a ser executada pelo processo.
  • Uma variável sentinel; caso o valor falso seja atribuido o processo pára independente se outras tarefas existam na fila;
  • um método para interrupção;
Internamente, o consumidor terá controle sobre o reservatório de trabalhadores
através da interface ExecutorService.
Abaixo, propriedades do executor conforme mencionado:

/**
  * caixa de mensagens (tarefas) do executor
  */
 protected BlockingQueue blockingQueue;

 /**
  * reservatorio de trabalhadores
  */
 protected ExecutorService executorService;

 
 // variaveis de controle do ciclo de vida
 private boolean sentinel = true;
 /**
  *  Tarefa que finaliza
  */
 private final T kill;

 /**
  * variavel de sincronizacao: quando chegar a zero, o
  *  processo terminou sua execucao
  */
 private CountDownLatch countDownLatch = new CountDownLatch(1);
O método principal do Executor, é o método público run() da interface Runnable.

@Override
 public void run() {
  try {
   // importante para interromper o loop infinito
   while (isSentinelEnable()) {
    T t = getBlockingQueue().take();
    if (t == kill) {
     kill();
     sentinel = false;
    } else {
     execute(t);
    }
   }
   // anuncia que o programa terminou
   getCountDownLatch().countDown();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 } 
Se uma mensagem kill é recebida, o método kill() será invocado para que o objeto
possa realizar algum procedimento de sanidade. Para as demais mensagens, o
método execute() é invocado.
Enquanto execute() é abstrato o método kill() possui um implementação básica
vazia.

protected abstract void execute(T t);

 protected void kill() {
 }
A classe possui ainda três métodos de razoável importância. O construtor, um
inicializador e um método de sincronização. Os demais métodos são accessors e
mutators, além de possíveis métodos para finalização.

/**
  * Instancia o executor, mas nao inicia seu processamento 
  * @param n tamanho do pool de trabalhadores
  * @param m capacidade maxima da caixa de mensagens
  * @param kill tarefa utilizada para finalizar o processamento
  */
 public Executor(int n, int m, T kill) {
  executorService = Executors.newFixedThreadPool(n);
  blockingQueue = new ArrayBlockingQueue(m);
  this.kill = kill;
 }

 /**
  * Inicia o processamento com uma nova linha de execucao
  */
 public void execute() {
  final Thread thread = new Thread(this);
  thread.start();
 }

 /**
  * aguarda o fim do processamento e finaliza o reservatorio 
  * de trabalhadores
  */
 public void awaitTermination() {
  try {
   getCountDownLatch().await();
   this.executorService.shutdown();
   this.executorService.awaitTermination(1000,
                       TimeUnit.MILLISECONDS);
  } catch (final InterruptedException e) {
   e.printStackTrace();
  }
 }

Quicksort Paralelo em Java

Façamos agora uma pausa, no desenvolvimento da solução para testar a classe
acima. Como prova de conceito iremos implementar uma versão paralela do
Quicksort que utliza o código.
A ideia básica, em implementar um quicksort paralelo usando o padrão
produtor-consumidor é receber uma tarefa com um vetor a ser ordenado
e particioná-los através de um elemento pivo (desse vetor) e
adicionar cada uma das partições como novas tarefas da fila de execução do
executor.

@Override
 public void execute(QuickSortJob t) {
  int size = t.array.length;
  if (size < thresold){
   Arrays.sort(t.array);
   t.result = t.array;
   t.array = null;
   if (t.joinToFather()){
    doneSignal.countDown();
   }
   return ;
  } 
  int rand = random.nextInt(size);
  final T comparable = t.array[rand];
  T[] rights = newInstanceArray(t);
  T[] lefts = newInstanceArray(t);
  T[] mids  = newInstanceArray(t);
  int l = 0, r = 0, m = 0;
  for (int i = 0; i < t.array.length; i++) {
    T array_element = t.array[i];
   if (array_element.compareTo(comparable) < 0){
    lefts[l++] = array_element;
    continue;
   }
   if (array_element.compareTo(comparable) > 0){
    rights[r++] = array_element;
    continue;
   }
   mids[m++] = array_element;
  }  
  BlockingQueue> blockingQueue = getBlockingQueue();
  blockingQueue.add(new QuickSortJob
   (t, QuickSortJob.LEFT, Arrays.copyOf(lefts, l)));
  t.mid = Arrays.copyOf(mids, m);
  blockingQueue.add
   (new QuickSortJob(t, 
    QuickSortJob.RIGHT, Arrays.copyOf(rights, r)));
  t.array = null;
 }
Podemos programar um certificado \footnote{algoritmos de verficação ou
certificados estão relacionados com complexidade computacional. Uma breve
explicação sobre o assunto pode nas do
Paulo Feofiloff para ordenação. Para isto criamos um vetor desordenado, executamos o ordenador e verificamos se os elementos do vetor estão ordenados.

public void testQuickSort() throws Exception { 
  Random random = new Random(); int n = 200000;  
  Integer[] problem = new Integer[n];
  for (int i =0; i < n ; i++){
      problem[i] = random.nextInt();
  }
  final long timeMilis = System.currentTimeMillis();  
  QuickSortWorker.doneSignal = new CountDownLatch(1);
  originalJob =  new QuickSortJob(null, 0, problem);  
  QuickSortWorker executor = 
    new QuickSortWorker(10, n); // executor  
  executor.getBlockingQueue().add(originalJob);
  executor.setCountDownLatch(QuickSortWorker.doneSignal);
  executor.execute();
  executor.awaitTermination();
  
  final long lastMilis = System.currentTimeMillis();
  System.out.println("Tempo Total:" + (lastMilis - timeMilis));
  
  int temp = Integer.MIN_VALUE;
  QuickSortJob job = originalJob;
  for (int i = 0; i < job.result.length; i++) {
   assertTrue(temp <= job.result[i]);
   temp = job.result[i];
  }

 }
Junit verde! Podemos prosseguir com o desenvolvimento do utilitário.


Uma versão estática

O que a construção receive faz em Erlang, um ``switch case'' poderia fazer no
template method execute. Podemos fazer melhor que isto. Em Java, é possível
fazer com que o compilador verifique se as mensagens enviadas são entendidas por
meio de sua assinatura. Para isto, iremos criar um wrapper que implementa
a (mesma) interface de um objeto alvo e transformar a mensagem bloqueante numa
assíncrona. Criaremos uma classe message com todas as informações necessárias
a execução da mensagem o próprio método (java.lang.reflec.Method), o objeto
alvo e os parâmetros da chamada.


public class Message implements Callable{
 
 Object[] parameters;
 Object target;
 Method method;
 
 public Message(Object target2, Method method2, Object[] args) {
  parameters = args;
  this.method = method2;
  this.target = target2;
 }

 public Message() {
 }

 @Override
 public Object call() throws Exception {
  return method.invoke(target, parameters);
 }

}
Agora, iremos criamos uma especialização do Executor que processa uma FutureTask.

public class SpawnExecutor extends Executor{
 public SpawnExecutor(int n, int m) {
  super(n, m, new FutureTask(new Message()));
 }
 
 @Override
 protected void execute(FutureTask task) {
  executorService.execute(task);
 }

 public void shutdown(){
  try {
   getBlockingQueue().put(getKill());
  } catch (InterruptedException e) {
  }
 }
}
Devemos encapsular o objeto Message numa FutureTask. Dessa forma, podemos
notificar os interessados sobre a conclusão de uma mensagem específica. Façamos então uma outra especialização (do Executor) que exporte estas funcionalidades além de
oferecer uma interface mais simples para o envio de mensagens.

public void send(Message message) {
  FutureTask futureTask = new FutureTask(message);
  spawnExecutor.getBlockingQueue().add(futureTask);
  notify(futureTask);
 }

 private void notify(FutureTask futureTask) {
  if (observer != null){
   observer.notify(futureTask);
  }
 }

 private Observer observer;
 
 public void setObserver(Observer observer) {
  this.observer = observer;
 }
 
 public Observer getObserver() {
  return observer;
 }
Na listagem, o efeito é conseguido com o padrão Decorator.
Temos um último problema: popular o objeto Message. Enquanto que objeto alvo e
os parâmetros são conhecidos, ainda precisamos encontrar o objeto Method apartir
do pacote de reflexão (queremos encontrar o método apartir do nome). Para isso,
faremos um objeto mais especializado que recebe uma mensagem e cria e popula um
objeto Message e o coloca na fila de trabalhos.

public abstract class AsynchronousStaticExecutor extends AsynchronousExecutor{
  protected void sendMessage(Method method, Object[] params) {
  if (parameters.length - parameterTypes.length != 0)
   return false;
  for (int i = 0; i < parameters.length; i++) {
   Object object = parameters[i];
   if (!parameterTypes[i].isAssignableFrom(
           object.getClass()))
    return false;
  }
  return true;
 }
}
Agora podemos criar a versão estática assíncrona do programa ping pong.
Mas, como de costume iremos apresentar o teste unitário para tal.

@Test
 public void testAsyncStaticPing(){
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);  
  AsyncPing pong = new AsyncPing(ping);
  // iniciar processo
  pong.start();
  
  ping.sendPing(3, pong);
  
  // finaliza processo
  pong.shutdown();
  pong.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }
E, a classe estática.

public class AsyncPing extends AsynchronousStaticExecutor 
     implements PingInterface {

 private Ping target;

 public AsyncPing(Ping ping) {
  this.target = ping;
 }

 @Override
 protected Object getTarget() {
  return target;
 }

 @Override
 public void finished() {
  sendMessage("finished", new Object[] {});
 }

 @Override
 public void ping(PingInterface ping) {
  sendMessage("ping", new Object[] { ping });
 }

 @Override
 public void pong(PingInterface pong) {
  sendMessage("pong", new Object[] { pong });
 }
}

Java Lang Reflection

Um mago sabe a importância de um espelho. Entre as várias mágicas que o pacote
de reflexão da linguagem está o truque de criar um proxy às chamadas
de um conjunto de interfaces. Essa simples mágica talvez seja a forma mais
fácil de adicionar algum comportamento em tempo de execução para algum objeto
genérico (outras formas seriam a manipulação de byte code ou do próprio
código-fonte). Por tamanha facilidade, a API de reflexão é utilizada nas
chamadas RMI para forjar um representante local de uma classe remota. Este
representante recebe as chamadas locais e delega sua execução a entidade remota
correspondente dando a ilusão ao programador que as chamadas são realizadas localmente.
E, tudo isso graças a uma fábrica de proxies e uma interface de manipulação:

public interface InvocationHandler {
    Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable;
}
public class GenericProxyFactory {
Limitação importante: Não é possível
criar proxy para objetos sem interfaces. Para contornar o problema é
possível utlizar a refatoração: extract interface.

Aplicação: AsynchronousProxy

Vamos agora ao objetivo final desse artigo, apresentar um proxy para transformar
chamadas bloqueantes em chamadas não bloqueantes. O uso básico desse proxy e
permitir que o programador faça uma versão simples do código, sem se preocupar
com concorrência e depois, sem mudança de paradigma ou expressivade, utilizar a
concorrência.
@Test
 public void testAsyncPing() throws Exception {
  StringBuffer string = new StringBuffer();
  Ping ping = new Ping(string);
  Ping pong = new Ping(string);
  // iniciar processo
  AsynchronousProxyFactory asynchronousProxyFactory = new AsynchronousProxyFactory (1, 10, pong);
  PingInterface asyncPong = asynchronousProxyFactory.getAsynchronousProxy(PingInterface.class);
  asynchronousProxyFactory.start();
  
  ping.sendPing(3, asyncPong);
  
  // finalizar processo
  asynchronousProxyFactory.shutdown();
  asynchronousProxyFactory.awaitForTermination();
  
  verifyPingConversation(string, 3);
 }

public class AsynchronousInvocationHandler implements InvocationHandler{
 private final AsynchronousExecutor asynchronousExecutor;
 private final T target; 
 public AsynchronousInvocationHandler
   (AsynchronousExecutor asynchronousObject, T target) {
  this.asynchronousExecutor = asynchronousObject;
  this.target = target;
 }
 @Override
 public Object invoke
   (Object proxy, Method method, Object[] args) throws Throwable {
  asynchronousExecutor.send(new Message(target, method, args));
  return null; 
 }
}

public class AsynchronousProxyFactory extends AsynchronousExecutor {

 private T target;

 public AsynchronousProxyFactory(int n, int m, T obj) {
  super(n, m);
  this.target = obj;
 }

 @SuppressWarnings("unchecked")
 public T getAsynchronousProxy(Class intf) {
  return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),
    new Class[] { intf },
    new AsynchronousInvocationHandler(this, target));
 }
 
 
}

Feito!

Comentários

Postagens mais visitadas deste blog

Expressões, preconceito e racismo

Expressões preconceituosas e racistas Antes de alguma outra frase, primeiro peço licença para falar de mais um assunto do qual não domino. Falo por acreditar que um leigo presta serviço maior ao debater assunto com base em fontes (ainda que seja uma Wikipedia) e no pensamento lógico do que simplesmente se manter mudo a questões do cotidiano. Em voga agora está em falar quais são ou eram as expressões preconceituosas e racistas que até a pouco eram toleradas em muitos meios. Como é covarde dizer que em boca fechada não entra racismo. O racismo não é perpetrado apenas por quem profere mas por quem se cala à agressão perpetrada a outrem. Mas veremos que a questão é muito mais complexa que os cães raivosos do politicamente correto querem dizer. Tomo aqui a palavra racista, como sendo algo usado para impor a dominação de uma “raça” sobre outra. Portanto, a acusação de racismo vai muito além da mera acusação de preconceito. Não tenho o menor apreso por vitimismo barato, onde expressões q...

A hard logic problem - The escape of blue eyed vampires

Once upon a time, a vampire clan lived peacefully on an island (as long as vampire clans can live peacefully). Then, a demon lord came, overwhelmed the vampires and became the ruler of the island. The demon didn't want any vampire to escape so he created a gargoyle to guard the only way out. This gargoyle was a fantastic creature, so powerful that he was kept petrified for the whole time until a vampire appears. Then he awakened and started to fight until seeing no more vampire "alive" (as far a vampire can be alive). All vampires crazy enough to try were killed only left a hundred of vampires. There was a catch, of course. The gargoyle was not perfectly designed. It did not awaken when blue eyes vampires appeared. And all remaining vampire were blue eyes but as you know vampires cannot see him/her selves on reflections. For any reason, they were not aware of their eye colors. Besides all that, blue eyed vampires didn't like each other (so they would never say ...

Curry with JS

Partial application and currying with Javascript In the strict way, currying is the technique of transforming a function that takes multiple arguments (a tuple of arguments) to one function that receive only one. In such way, currying techniques allow transform one multi-parameter function in a chain of functions, each one with a single argument. Looks complicated? Blah.. it is not true. In this little article, we are actually more interesting in partial applications. Let’s take the Mozilla Example for replace function in String. As we know, we can use a “replacer” function as paramenter for replace method in String object. Let’s say that we want to split a String defined by a non-numerical part, a numerical part and finally a non-alphanumeric part. Here is how: function replacer(match, p1, p2, p3, offset, string){ // p1 is nondigits, p2 digits, and p3 non-alphanumerics return [p1, p2, p3].join(' - '); }; We can try it as usual… var newString = "abc12345#$*%...